基础知识

使用redis创建布隆过滤器

2019-10-22  本文已影响0人  无醉_1866

布隆过滤器

是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都比一般的算法要好的多,缺点是有一定的误识别率和删除困难。但是布隆过滤器可以控制错误率。

具体的布隆过滤器相关的内容可查找相关资料,非常详细,其优势就是占用内存比hash表要小得多,非常适合用于做过滤的场景

Guava中的布隆过滤器

Guava是google开发的java基础库,其中提供了布隆过滤器的实现,即名为BloomFilter的类,其使用方式类似如下:

image

使用Redis实现布隆过滤器

当布隆过滤器也需要使用大量内存,并要求在多台机器之间共享时,Guava提供的BloomFilter就难以满足需求了。BloomFilter在数据存在上,实际上可以认为是一个非常大了位图,而redis支持bitmap数据结构,正好可以用于实现布隆过滤器。

然而,我们如何实现BloomFilter呢,我们可以先看看guava中的BloomFilter的实现方式:

image

BloomFilter.put()方法中,直接调用了strategy.put(),我们可以继续进入到这个Strategy中:

image

可以看到,Strategy是BloomFilter类中的内部接口,是用于当布隆过滤器存储的对象转换成bits,guava中提供的实现是一个enum:

image

我们继续看看其put方法的实现:

image

其中,除了hash以外,就是对LockFreeBitArray的操作,因此,如果我们能通过redis实现一个新的LockFreeBitArray,那我们就能实现一个基于redis的布隆过滤器了,但是很可惜,LockFreeBitArray是final的类,且是包访问权限,我们无法从LockFreeBitArray类做扩展。

那么我们只有使用两种方式:

  1. 自己从头开始实现BloomFilter
  2. 拿来主义,都是开源的了,抄代码吧,把BloomFilter相关的代码copy出来,替换掉LockFreeBitArray

我这里使得了第二种方式,将guava中的BloomFilter复制一份,并加上JedisPool参数用于访问redis,然后基于redis实现一个LockFreeBitArray,其中基于redis的LockFreeBitArray的实现如下:

  static final class LockFreeBitArray {
    private static final Logger logger = LoggerFactory.getLogger(BloomFilterStrategies.class);

    private static final int LONG_ADDRESSABLE_BITS = 6;
    private final JedisPool jedisPool;
    private final String redisKey;
    private final long numBits;

    // Used by serialization
    LockFreeBitArray(final long numBits, final String redisKey, final JedisPool jedisPool) {
      checkNotNull(jedisPool, "jedisPool is null!");
      checkArgument(!Strings.isNullOrEmpty(redisKey), "redisKey is empty!");
      this.jedisPool = jedisPool;
      this.redisKey = redisKey;
      this.numBits = numBits;
    }

    /**
     * Returns true if the bit changed value.
     */
    boolean set(long... bitIndexes) {
      final Closer closer = Closer.create();
      try {
        final Jedis jedis = closer.register(jedisPool.getResource());
        final Pipeline pipeline = closer.register(jedis.pipelined());
        for (long bitIndex : bitIndexes) {
          pipeline.setbit(redisKey, bitIndex >>> LONG_ADDRESSABLE_BITS, true);
        }
        final Response<List<Object>> responses = pipeline.exec();
        boolean changed = false;
        final List<Object> rsts = responses.get();
        for (Object rst : rsts) {
          changed |= (Boolean) rst;
        }
        return changed;
      } finally {
        try {
          closer.close();
        } catch (IOException e) {
          logger.error("close resource failed", e);
        }
      }
    }

    boolean get(long... bitIndexes) {
      final Closer closer = Closer.create();
      try {
        final Jedis jedis = closer.register(jedisPool.getResource());
        final Pipeline pipeline = closer.register(jedis.pipelined());
        for (long bitIndex : bitIndexes) {
          pipeline.getbit(redisKey, bitIndex >>> LONG_ADDRESSABLE_BITS);
        }
        final Response<List<Object>> responses = pipeline.exec();
        final List<Object> rsts = responses.get();
        for (Object rst : rsts) {
          if (!(Boolean) rst) {
            return false;
          }
        }
        return true;
      } finally {
        try {
          closer.close();
        } catch (IOException e) {
          logger.error("close resource failed", e);
        }
      }
    }

    long bitSize() {
      return numBits;
    }

    long bitCount() {
      try (final Jedis jedis = jedisPool.getResource()) {
        return jedis.bitcount(redisKey);
      }
    }

    @Override
    public boolean equals(@Nullable Object o) {
      if (o instanceof LockFreeBitArray) {
        LockFreeBitArray lockFreeBitArray = (LockFreeBitArray) o;
        return Objects.equals(redisKey, lockFreeBitArray.redisKey);
      }
      return false;
    }

    @Override
    public int hashCode() {
      return Objects.hashCode(redisKey);
    }
  }

可以看到,本质上就是通过一个key创建出一个bitmap,代码本身只是将原来guava的LockFreeBitArray中的byte数据替换成了redis和bitmap

整个BloomFilterStrategies的重新实现如下:

enum BloomFilterStrategies implements RedisBloomFilter.Strategy {

  MURMUR128_MITZ_32() {
    @Override
    public <T> boolean put(
        T object, Funnel<? super T> funnel, int numHashFunctions, LockFreeBitArray bits) {
      long bitSize = bits.bitSize();
      long hash64 = Hashing.murmur3_128().hashObject(object, funnel).asLong();
      int hash1 = (int) hash64;
      int hash2 = (int) (hash64 >>> 32);

      long[] indexes = new long[numHashFunctions];
      for (int i = 1; i <= numHashFunctions; i++) {
        int combinedHash = hash1 + (i * hash2);
        // Flip all the bits if it's negative (guaranteed positive number)
        if (combinedHash < 0) {
          combinedHash = ~combinedHash;
        }
        indexes[i] = combinedHash & bitSize;
      }
      return bits.set(indexes);
    }

    @Override
    public <T> boolean mightContain(
        T object, Funnel<? super T> funnel, int numHashFunctions, LockFreeBitArray bits) {
      long bitSize = bits.bitSize();
      long hash64 = Hashing.murmur3_128().hashObject(object, funnel).asLong();
      int hash1 = (int) hash64;
      int hash2 = (int) (hash64 >>> 32);

      for (int i = 1; i <= numHashFunctions; i++) {
        int combinedHash = hash1 + (i * hash2);
        // Flip all the bits if it's negative (guaranteed positive number)
        if (combinedHash < 0) {
          combinedHash = ~combinedHash;
        }
        if (!bits.get(combinedHash % bitSize)) {
          return false;
        }
      }
      return true;
    }
  },
  /**
   * This strategy uses all 128 bits of {@link Hashing#murmur3_128} when hashing. It looks different
   * than the implementation in MURMUR128_MITZ_32 because we're avoiding the multiplication in the
   * loop and doing a (much simpler) += hash2\. We're also changing the index to a positive number by
   * AND'ing with Long.MAX_VALUE instead of flipping the bits.
   */
  MURMUR128_MITZ_64() {
    @Override
    public <T> boolean put(
        T object, Funnel<? super T> funnel, int numHashFunctions, LockFreeBitArray bits) {
      long bitSize = bits.bitSize();
      byte[] bytes = Hashing.murmur3_128().hashObject(object, funnel).asBytes();
      long hash1 = lowerEight(bytes);
      long hash2 = upperEight(bytes);

      long combinedHash = hash1;
      long[] indexes = new long[numHashFunctions];
      for (int i = 0; i < numHashFunctions; i++) {
        // Make the combined hash positive and indexable
        indexes[i] = (combinedHash & Long.MAX_VALUE) % bitSize;
        combinedHash += hash2;
      }
      return bits.set(indexes);
    }

    @Override
    public <T> boolean mightContain(
        T object, Funnel<? super T> funnel, int numHashFunctions, LockFreeBitArray bits) {
      long bitSize = bits.bitSize();
      byte[] bytes = Hashing.murmur3_128().hashObject(object, funnel).asBytes();
      long hash1 = lowerEight(bytes);
      long hash2 = upperEight(bytes);

      long combinedHash = hash1;
      for (int i = 0; i < numHashFunctions; i++) {
        // Make the combined hash positive and indexable
        if (!bits.get((combinedHash & Long.MAX_VALUE) % bitSize)) {
          return false;
        }
        combinedHash += hash2;
      }
      return true;
    }

    private /* static */ long lowerEight(byte[] bytes) {
      return Longs.fromBytes(
          bytes[7], bytes[6], bytes[5], bytes[4], bytes[3], bytes[2], bytes[1], bytes[0]);
    }

    private /* static */ long upperEight(byte[] bytes) {
      return Longs.fromBytes(
          bytes[15], bytes[14], bytes[13], bytes[12], bytes[11], bytes[10], bytes[9], bytes[8]);
    }
  };

  static final class LockFreeBitArray {
    private static final Logger logger = LoggerFactory.getLogger(BloomFilterStrategies.class);

    private static final int LONG_ADDRESSABLE_BITS = 6;
    private final JedisPool jedisPool;
    private final String redisKey;
    private final long numBits;

    // Used by serialization
    LockFreeBitArray(final long numBits, final String redisKey, final JedisPool jedisPool) {
      checkNotNull(jedisPool, "jedisPool is null!");
      checkArgument(!Strings.isNullOrEmpty(redisKey), "redisKey is empty!");
      this.jedisPool = jedisPool;
      this.redisKey = redisKey;
      this.numBits = numBits;
    }

    /**
     * Returns true if the bit changed value.
     */
    boolean set(long... bitIndexes) {
      final Closer closer = Closer.create();
      try {
        final Jedis jedis = closer.register(jedisPool.getResource());
        final Pipeline pipeline = closer.register(jedis.pipelined());
        for (long bitIndex : bitIndexes) {
          pipeline.setbit(redisKey, bitIndex >>> LONG_ADDRESSABLE_BITS, true);
        }
        final Response<List<Object>> responses = pipeline.exec();
        boolean changed = false;
        final List<Object> rsts = responses.get();
        for (Object rst : rsts) {
          changed |= (Boolean) rst;
        }
        return changed;
      } finally {
        try {
          closer.close();
        } catch (IOException e) {
          logger.error("close resource failed", e);
        }
      }
    }

    boolean get(long... bitIndexes) {
      final Closer closer = Closer.create();
      try {
        final Jedis jedis = closer.register(jedisPool.getResource());
        final Pipeline pipeline = closer.register(jedis.pipelined());
        for (long bitIndex : bitIndexes) {
          pipeline.getbit(redisKey, bitIndex >>> LONG_ADDRESSABLE_BITS);
        }
        final Response<List<Object>> responses = pipeline.exec();
        final List<Object> rsts = responses.get();
        for (Object rst : rsts) {
          if (!(Boolean) rst) {
            return false;
          }
        }
        return true;
      } finally {
        try {
          closer.close();
        } catch (IOException e) {
          logger.error("close resource failed", e);
        }
      }
    }

    long bitSize() {
      return numBits;
    }

    long bitCount() {
      try (final Jedis jedis = jedisPool.getResource()) {
        return jedis.bitcount(redisKey);
      }
    }

    @Override
    public boolean equals(@Nullable Object o) {
      if (o instanceof LockFreeBitArray) {
        LockFreeBitArray lockFreeBitArray = (LockFreeBitArray) o;
        return Objects.equals(redisKey, lockFreeBitArray.redisKey);
      }
      return false;
    }

    @Override
    public int hashCode() {
      return Objects.hashCode(redisKey);
    }
  }
}

最后是与BloomFilter几乎一样的RedisBloomFilter:

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Predicate;
import com.google.common.hash.Funnel;
import com.google.common.math.DoubleMath;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import org.checkerframework.checker.nullness.qual.Nullable;
import redis.clients.jedis.JedisPool;

import java.io.Serializable;
import java.math.RoundingMode;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

/**
 * @see com.google.common.hash.BloomFilter
 */
public final class RedisBloomFilter<T> implements Predicate<T>, Serializable {

  interface Strategy extends java.io.Serializable {

    <T> boolean put(
        T object, Funnel<? super T> funnel, int numHashFunctions, BloomFilterStrategies.LockFreeBitArray bits);

    <T> boolean mightContain(
        T object, Funnel<? super T> funnel, int numHashFunctions, BloomFilterStrategies.LockFreeBitArray bits);

    int ordinal();
  }

  private final BloomFilterStrategies.LockFreeBitArray bits;

  private final int numHashFunctions;

  private final Funnel<? super T> funnel;

  private final Strategy strategy;

  private RedisBloomFilter(
      BloomFilterStrategies.LockFreeBitArray bits, int numHashFunctions, Funnel<? super T> funnel, Strategy strategy) {
    checkArgument(numHashFunctions > 0, "numHashFunctions (%s) must be > 0", numHashFunctions);
    checkArgument(
        numHashFunctions <= 255, "numHashFunctions (%s) must be <= 255", numHashFunctions);
    this.bits = checkNotNull(bits);
    this.numHashFunctions = numHashFunctions;
    this.funnel = checkNotNull(funnel);
    this.strategy = checkNotNull(strategy);
  }

  public boolean mightContain(T object) {
    return strategy.mightContain(object, funnel, numHashFunctions, bits);
  }

  @Deprecated
  @Override
  public boolean apply(T input) {
    return mightContain(input);
  }

  @CanIgnoreReturnValue
  public boolean put(T object) {
    return strategy.put(object, funnel, numHashFunctions, bits);
  }

  public double expectedFpp() {
    // You down with FPP? (Yeah you know me!) Who's down with FPP? (Every last homie!)
    return Math.pow((double) bits.bitCount() / bitSize(), numHashFunctions);
  }

  public long approximateElementCount() {
    long bitSize = bits.bitSize();
    long bitCount = bits.bitCount();

    double fractionOfBitsSet = (double) bitCount / bitSize;
    return DoubleMath.roundToLong(
        -Math.log1p(-fractionOfBitsSet) * bitSize / numHashFunctions, RoundingMode.HALF_UP);
  }

  @VisibleForTesting
  long bitSize() {
    return bits.bitSize();
  }

  public boolean isCompatible(RedisBloomFilter<T> that) {
    checkNotNull(that);
    return this != that
        && this.numHashFunctions == that.numHashFunctions
        && this.bitSize() == that.bitSize()
        && this.strategy.equals(that.strategy)
        && this.funnel.equals(that.funnel);
  }

  @Override
  public boolean equals(@Nullable Object object) {
    if (object == this) {
      return true;
    }
    if (object instanceof RedisBloomFilter) {
      RedisBloomFilter<?> that = (RedisBloomFilter<?>) object;
      return this.numHashFunctions == that.numHashFunctions
          && this.funnel.equals(that.funnel)
          && this.bits.equals(that.bits)
          && this.strategy.equals(that.strategy);
    }
    return false;
  }

  @Override
  public int hashCode() {
    return Objects.hashCode(numHashFunctions, funnel, strategy, bits);
  }

  public static <T> RedisBloomFilter<T> create(
      Funnel<? super T> funnel, long expectedInsertions, double fpp, JedisPool jedisPool, String redisKey) {
    return create(funnel, expectedInsertions, fpp, BloomFilterStrategies.MURMUR128_MITZ_64, jedisPool, redisKey);
  }

  @VisibleForTesting
  static <T> RedisBloomFilter<T> create(
      Funnel<? super T> funnel, long expectedInsertions, double fpp, Strategy strategy, JedisPool jedisPool, String key) {
    checkNotNull(funnel);
    checkArgument(
        expectedInsertions >= 0, "Expected insertions (%s) must be >= 0", expectedInsertions);
    checkArgument(fpp > 0.0, "False positive probability (%s) must be > 0.0", fpp);
    checkArgument(fpp < 1.0, "False positive probability (%s) must be < 1.0", fpp);
    checkNotNull(strategy);
    checkNotNull(jedisPool);

    if (expectedInsertions == 0) {
      expectedInsertions = 1;
    }

    long numBits = optimalNumOfBits(expectedInsertions, fpp);
    int numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, numBits);
    try {
      return new RedisBloomFilter<T>(new BloomFilterStrategies.LockFreeBitArray(numBits, key, jedisPool), numHashFunctions, funnel, strategy);
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException("Could not create RedisBloomFilter of " + numBits + " bits", e);
    }
  }

  public static <T> RedisBloomFilter<T> create(Funnel<? super T> funnel, long expectedInsertions, JedisPool jedisPool, String redisKey) {
    return create(funnel, expectedInsertions, 0.03, jedisPool, redisKey); // FYI, for 3%, we always get 5 hash functions
  }

  @VisibleForTesting
  static int optimalNumOfHashFunctions(long n, long m) {
    // (m / n) * log(2), but avoid truncation due to division!
    return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
  }

  @VisibleForTesting
  static long optimalNumOfBits(long n, double p) {
    if (p == 0) {
      p = Double.MIN_VALUE;
    }
    return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
  }

}

待优化点

目前的环境中使得的redis是单机的,所以这样使用是没问题的,但是对于使用redis集群而言,这样做就不太好了,因为整个BloomFilter只关联了一个key,无法分散到redis集群中的各台机器上,因此可以针对集群做一个优化,一种可行的思路就是将一个BloomFilter拆分成多个BloomFilter,生成不同的key,将BloomFilter的数据分散到redis集群中不同的redis机器上,这样可充分发挥出redis集群的性能和缓存的容量

上一篇下一篇

猜你喜欢

热点阅读