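Creating a Bloom Filter with Redis
Bloom filters
A Bloom filter consists of a long bit vector and a set of hash (random mapping) functions. It is used to test whether an element is a member of a set. Its advantage is that both space usage and query time are far better than those of general-purpose structures; its drawbacks are a nonzero false-positive rate and the difficulty of deleting elements. The false-positive rate, however, can be controlled by choosing the size of the bit vector and the number of hash functions.
Detailed material on Bloom filters is easy to find and very thorough, so it is not repeated here. The key advantage is that a Bloom filter uses far less memory than a hash table, which makes it well suited to filtering scenarios.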
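To make the description concrete, here is a minimal in-memory sketch of the idea: a bit vector plus k derived hash positions per element. It is illustrative only and is not Guava's implementation; the class name SimpleBloomFilter and the FNV-1a based hashing are assumptions chosen to keep the example free of external dependencies.

```java
import java.nio.charset.StandardCharsets;
import java.util.BitSet;

/** Minimal in-memory Bloom filter sketch (hypothetical class, not Guava's code). */
final class SimpleBloomFilter {
    private final BitSet bits;
    private final int numBits;
    private final int numHashFunctions;

    SimpleBloomFilter(int numBits, int numHashFunctions) {
        if (numBits <= 0 || numHashFunctions <= 0) {
            throw new IllegalArgumentException("numBits and numHashFunctions must be > 0");
        }
        this.bits = new BitSet(numBits);
        this.numBits = numBits;
        this.numHashFunctions = numHashFunctions;
    }

    /** Sets the k bit positions derived from the element. */
    void put(String element) {
        for (int i = 0; i < numHashFunctions; i++) {
            bits.set(index(element, i));
        }
    }

    /** Returns false if the element was definitely never added; true means "possibly added". */
    boolean mightContain(String element) {
        for (int i = 0; i < numHashFunctions; i++) {
            if (!bits.get(index(element, i))) {
                return false;
            }
        }
        return true;
    }

    /** Derives the i-th position from two base hashes (double hashing). */
    private int index(String element, int i) {
        byte[] data = element.getBytes(StandardCharsets.UTF_8);
        int h1 = fnv1a(data, 0x811C9DC5);
        int h2 = fnv1a(data, 0x01000193) | 1; // force odd so the step is never zero
        return Math.floorMod(h1 + i * h2, numBits);
    }

    /** 32-bit FNV-1a style hash with a caller-supplied starting value. */
    private static int fnv1a(byte[] data, int seed) {
        int h = seed;
        for (byte b : data) {
            h ^= (b & 0xff);
            h *= 16777619;
        }
        return h;
    }
}
```

An element that was added always reports true; an element that was never added usually reports false but may occasionally report true, which is the false-positive behaviour described above.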
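Bloom filters in Guava
Guava is a core Java library developed by Google. It includes a Bloom filter implementation, the BloomFilter class, which is used roughly as follows:
[Image: Guava BloomFilter usage example]
Implementing a Bloom filter with Redis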
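When the Bloom filter itself needs a large amount of memory and has to be shared across multiple machines, Guava's BloomFilter no longer fits, because its bits live in the memory of a single process. In terms of data storage, a Bloom filter is essentially one very large bitmap, and Redis supports bitmap operations (SETBIT, GETBIT, BITCOUNT), so it is a natural fit for implementing a Bloom filter.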
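So how do we implement it? Let's first look at how Guava implements BloomFilter:
[Image: source of BloomFilter.put()]
BloomFilter.put() simply delegates to strategy.put(), so we continue into Strategy:
[Image: source of the Strategy interface]
As shown, Strategy is an interface nested inside the BloomFilter class. It is responsible for turning the objects stored in the Bloom filter into bits. The implementation Guava provides is an enum:
[Image: source of the strategy enum]
Next, look at the implementation of its put method:
[Image: source of the put method]
Apart from the hashing, everything it does is an operation on LockFreeBitArray. So if we could provide a new LockFreeBitArray backed by Redis, we would have a Redis-based Bloom filter. Unfortunately, LockFreeBitArray is a final class with package-private access, so it cannot be extended.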
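That leaves two options:
- Implement BloomFilter from scratch
- Reuse what already exists: the code is open source, so copy the BloomFilter-related code and replace LockFreeBitArray
I went with the second option: copy Guava's BloomFilter, add a JedisPool parameter for accessing Redis, and implement a Redis-backed LockFreeBitArray. The Redis-based LockFreeBitArray looks like this: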
static final class LockFreeBitArray {
private static final Logger logger = LoggerFactory.getLogger(BloomFilterStrategies.class);
private static final int LONG_ADDRESSABLE_BITS = 6;
private final JedisPool jedisPool;
private final String redisKey;
private final long numBits;
// Used by serialization
LockFreeBitArray(final long numBits, final String redisKey, final JedisPool jedisPool) {
checkNotNull(jedisPool, "jedisPool is null!");
checkArgument(!Strings.isNullOrEmpty(redisKey), "redisKey is empty!");
this.jedisPool = jedisPool;
this.redisKey = redisKey;
this.numBits = numBits;
}
/**
* Returns true if the bit changed value.
*/
boolean set(long... bitIndexes) {
final Closer closer = Closer.create();
try {
final Jedis jedis = closer.register(jedisPool.getResource());
final Pipeline pipeline = closer.register(jedis.pipelined());
for (long bitIndex : bitIndexes) {
// Redis bitmaps are addressed by bit offset, so pass the bit index as-is
// (the >>> 6 shift only applies when indexing into a long[] of 64-bit words).
pipeline.setbit(redisKey, bitIndex, true);
}
// Send the queued commands and read all replies; exec() is only valid after multi().
final List<Object> rsts = pipeline.syncAndReturnAll();
boolean changed = false;
for (Object rst : rsts) {
// SETBIT replies with the previous bit value, so the bit changed only if it was 0 before.
changed |= !(Boolean) rst;
}
return changed;
} finally {
try {
closer.close();
} catch (IOException e) {
logger.error("close resource failed", e);
}
}
}
boolean get(long... bitIndexes) {
final Closer closer = Closer.create();
try {
final Jedis jedis = closer.register(jedisPool.getResource());
final Pipeline pipeline = closer.register(jedis.pipelined());
for (long bitIndex : bitIndexes) {
// Read the same bit offsets that set() writes.
pipeline.getbit(redisKey, bitIndex);
}
// Send the queued commands and read all replies; exec() is only valid after multi().
final List<Object> rsts = pipeline.syncAndReturnAll();
for (Object rst : rsts) {
if (!(Boolean) rst) {
return false;
}
}
return true;
} finally {
try {
closer.close();
} catch (IOException e) {
logger.error("close resource failed", e);
}
}
}
long bitSize() {
return numBits;
}
long bitCount() {
try (final Jedis jedis = jedisPool.getResource()) {
return jedis.bitcount(redisKey);
}
}
@Override
public boolean equals(@Nullable Object o) {
if (o instanceof LockFreeBitArray) {
LockFreeBitArray lockFreeBitArray = (LockFreeBitArray) o;
return Objects.equals(redisKey, lockFreeBitArray.redisKey);
}
return false;
}
@Override
public int hashCode() {
return Objects.hashCode(redisKey);
}
}
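As you can see, the idea is simply to back the filter with a bitmap stored under a single Redis key; the code just replaces the in-memory array in Guava's original LockFreeBitArray with Redis bitmap operations.
The complete reimplementation of BloomFilterStrategies is as follows: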
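One detail worth checking when porting the bit array: an in-memory array of 64-bit words is addressed by word index (the bit index shifted right by 6) plus a mask inside the word, whereas Redis SETBIT and GETBIT take the bit offset directly. The sketch below only illustrates that difference; the class and method names are hypothetical.

```java
/** Illustrates word addressing (long[]) versus bit addressing (Redis bitmap). Hypothetical helper. */
final class BitAddressingDemo {
    /** 2^6 = 64 bits fit in one long. */
    static final int LONG_ADDRESSABLE_BITS = 6;

    /** Index of the 64-bit word that holds the given bit in a long[]-backed array. */
    static int wordIndex(long bitIndex) {
        return (int) (bitIndex >>> LONG_ADDRESSABLE_BITS);
    }

    /** Mask selecting the bit inside its word (Java uses only the low 6 bits of the shift count). */
    static long bitMask(long bitIndex) {
        return 1L << (int) bitIndex;
    }

    /** Offset to pass to a bit-addressed store such as a Redis bitmap: the bit index itself. */
    static long bitmapOffset(long bitIndex) {
        return bitIndex;
    }
}
```

Bit indexes 0 through 63 all share word index 0, so using the shifted value as a Redis offset would collapse 64 distinct bits into one.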
enum BloomFilterStrategies implements RedisBloomFilter.Strategy {
MURMUR128_MITZ_32() {
@Override
public <T> boolean put(
T object, Funnel<? super T> funnel, int numHashFunctions, LockFreeBitArray bits) {
long bitSize = bits.bitSize();
long hash64 = Hashing.murmur3_128().hashObject(object, funnel).asLong();
int hash1 = (int) hash64;
int hash2 = (int) (hash64 >>> 32);
long[] indexes = new long[numHashFunctions];
for (int i = 1; i <= numHashFunctions; i++) {
int combinedHash = hash1 + (i * hash2);
// Flip all the bits if it's negative (guaranteed positive number)
if (combinedHash < 0) {
combinedHash = ~combinedHash;
}
// i runs from 1 to numHashFunctions, so store at i - 1; reduce into [0, bitSize) with %.
indexes[i - 1] = combinedHash % bitSize;
}
return bits.set(indexes);
}
@Override
public <T> boolean mightContain(
T object, Funnel<? super T> funnel, int numHashFunctions, LockFreeBitArray bits) {
long bitSize = bits.bitSize();
long hash64 = Hashing.murmur3_128().hashObject(object, funnel).asLong();
int hash1 = (int) hash64;
int hash2 = (int) (hash64 >>> 32);
for (int i = 1; i <= numHashFunctions; i++) {
int combinedHash = hash1 + (i * hash2);
// Flip all the bits if it's negative (guaranteed positive number)
if (combinedHash < 0) {
combinedHash = ~combinedHash;
}
if (!bits.get(combinedHash % bitSize)) {
return false;
}
}
return true;
}
},
/**
* This strategy uses all 128 bits of {@link Hashing#murmur3_128} when hashing. It looks different
* than the implementation in MURMUR128_MITZ_32 because we're avoiding the multiplication in the
* loop and doing a (much simpler) += hash2. We're also changing the index to a positive number by
* AND'ing with Long.MAX_VALUE instead of flipping the bits.
*/
MURMUR128_MITZ_64() {
@Override
public <T> boolean put(
T object, Funnel<? super T> funnel, int numHashFunctions, LockFreeBitArray bits) {
long bitSize = bits.bitSize();
byte[] bytes = Hashing.murmur3_128().hashObject(object, funnel).asBytes();
long hash1 = lowerEight(bytes);
long hash2 = upperEight(bytes);
long combinedHash = hash1;
long[] indexes = new long[numHashFunctions];
for (int i = 0; i < numHashFunctions; i++) {
// Make the combined hash positive and indexable
indexes[i] = (combinedHash & Long.MAX_VALUE) % bitSize;
combinedHash += hash2;
}
return bits.set(indexes);
}
@Override
public <T> boolean mightContain(
T object, Funnel<? super T> funnel, int numHashFunctions, LockFreeBitArray bits) {
long bitSize = bits.bitSize();
byte[] bytes = Hashing.murmur3_128().hashObject(object, funnel).asBytes();
long hash1 = lowerEight(bytes);
long hash2 = upperEight(bytes);
long combinedHash = hash1;
for (int i = 0; i < numHashFunctions; i++) {
// Make the combined hash positive and indexable
if (!bits.get((combinedHash & Long.MAX_VALUE) % bitSize)) {
return false;
}
combinedHash += hash2;
}
return true;
}
private /* static */ long lowerEight(byte[] bytes) {
return Longs.fromBytes(
bytes[7], bytes[6], bytes[5], bytes[4], bytes[3], bytes[2], bytes[1], bytes[0]);
}
private /* static */ long upperEight(byte[] bytes) {
return Longs.fromBytes(
bytes[15], bytes[14], bytes[13], bytes[12], bytes[11], bytes[10], bytes[9], bytes[8]);
}
};
static final class LockFreeBitArray {
private static final Logger logger = LoggerFactory.getLogger(BloomFilterStrategies.class);
private static final int LONG_ADDRESSABLE_BITS = 6;
private final JedisPool jedisPool;
private final String redisKey;
private final long numBits;
// Used by serialization
LockFreeBitArray(final long numBits, final String redisKey, final JedisPool jedisPool) {
checkNotNull(jedisPool, "jedisPool is null!");
checkArgument(!Strings.isNullOrEmpty(redisKey), "redisKey is empty!");
this.jedisPool = jedisPool;
this.redisKey = redisKey;
this.numBits = numBits;
}
/**
* Returns true if the bit changed value.
*/
boolean set(long... bitIndexes) {
final Closer closer = Closer.create();
try {
final Jedis jedis = closer.register(jedisPool.getResource());
final Pipeline pipeline = closer.register(jedis.pipelined());
for (long bitIndex : bitIndexes) {
// Redis bitmaps are addressed by bit offset, so pass the bit index as-is
// (the >>> 6 shift only applies when indexing into a long[] of 64-bit words).
pipeline.setbit(redisKey, bitIndex, true);
}
// Send the queued commands and read all replies; exec() is only valid after multi().
final List<Object> rsts = pipeline.syncAndReturnAll();
boolean changed = false;
for (Object rst : rsts) {
// SETBIT replies with the previous bit value, so the bit changed only if it was 0 before.
changed |= !(Boolean) rst;
}
return changed;
} finally {
try {
closer.close();
} catch (IOException e) {
logger.error("close resource failed", e);
}
}
}
boolean get(long... bitIndexes) {
final Closer closer = Closer.create();
try {
final Jedis jedis = closer.register(jedisPool.getResource());
final Pipeline pipeline = closer.register(jedis.pipelined());
for (long bitIndex : bitIndexes) {
// Read the same bit offsets that set() writes.
pipeline.getbit(redisKey, bitIndex);
}
// Send the queued commands and read all replies; exec() is only valid after multi().
final List<Object> rsts = pipeline.syncAndReturnAll();
for (Object rst : rsts) {
if (!(Boolean) rst) {
return false;
}
}
return true;
} finally {
try {
closer.close();
} catch (IOException e) {
logger.error("close resource failed", e);
}
}
}
long bitSize() {
return numBits;
}
long bitCount() {
try (final Jedis jedis = jedisPool.getResource()) {
return jedis.bitcount(redisKey);
}
}
@Override
public boolean equals(@Nullable Object o) {
if (o instanceof LockFreeBitArray) {
LockFreeBitArray lockFreeBitArray = (LockFreeBitArray) o;
return Objects.equals(redisKey, lockFreeBitArray.redisKey);
}
return false;
}
@Override
public int hashCode() {
return Objects.hashCode(redisKey);
}
}
}
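The 64-bit strategy derives all of its bit positions from just two base hashes by repeatedly adding the second to the first. The following standalone sketch isolates that step so it can be tried without Guava or Redis; the class name DoubleHashing is hypothetical, and the two hash values are passed in rather than computed with murmur3.

```java
/** Standalone sketch of deriving k bit indexes from two base hashes (hypothetical helper). */
final class DoubleHashing {
    /**
     * Returns numHashFunctions indexes in [0, bitSize), computed as
     * (hash1 + i * hash2) made non-negative and reduced modulo bitSize.
     */
    static long[] indexes(long hash1, long hash2, int numHashFunctions, long bitSize) {
        long[] result = new long[numHashFunctions];
        long combined = hash1;
        for (int i = 0; i < numHashFunctions; i++) {
            // Clear the sign bit so the value is non-negative before taking the remainder.
            result[i] = (combined & Long.MAX_VALUE) % bitSize;
            combined += hash2;
        }
        return result;
    }
}
```

For example, indexes(5, 3, 4, 10) yields 5, 8, 1, 4. Because put and mightContain derive the same sequence for the same object, a lookup checks exactly the bits an earlier insert set.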
Finally, here is RedisBloomFilter, which is almost identical to BloomFilter:
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Predicate;
import com.google.common.hash.Funnel;
import com.google.common.math.DoubleMath;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import org.checkerframework.checker.nullness.qual.Nullable;
import redis.clients.jedis.JedisPool;
import java.io.Serializable;
import java.math.RoundingMode;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* @see com.google.common.hash.BloomFilter
*/
public final class RedisBloomFilter<T> implements Predicate<T>, Serializable {
interface Strategy extends java.io.Serializable {
<T> boolean put(
T object, Funnel<? super T> funnel, int numHashFunctions, BloomFilterStrategies.LockFreeBitArray bits);
<T> boolean mightContain(
T object, Funnel<? super T> funnel, int numHashFunctions, BloomFilterStrategies.LockFreeBitArray bits);
int ordinal();
}
private final BloomFilterStrategies.LockFreeBitArray bits;
private final int numHashFunctions;
private final Funnel<? super T> funnel;
private final Strategy strategy;
private RedisBloomFilter(
BloomFilterStrategies.LockFreeBitArray bits, int numHashFunctions, Funnel<? super T> funnel, Strategy strategy) {
checkArgument(numHashFunctions > 0, "numHashFunctions (%s) must be > 0", numHashFunctions);
checkArgument(
numHashFunctions <= 255, "numHashFunctions (%s) must be <= 255", numHashFunctions);
this.bits = checkNotNull(bits);
this.numHashFunctions = numHashFunctions;
this.funnel = checkNotNull(funnel);
this.strategy = checkNotNull(strategy);
}
public boolean mightContain(T object) {
return strategy.mightContain(object, funnel, numHashFunctions, bits);
}
@Deprecated
@Override
public boolean apply(T input) {
return mightContain(input);
}
@CanIgnoreReturnValue
public boolean put(T object) {
return strategy.put(object, funnel, numHashFunctions, bits);
}
public double expectedFpp() {
// You down with FPP? (Yeah you know me!) Who's down with FPP? (Every last homie!)
return Math.pow((double) bits.bitCount() / bitSize(), numHashFunctions);
}
public long approximateElementCount() {
long bitSize = bits.bitSize();
long bitCount = bits.bitCount();
double fractionOfBitsSet = (double) bitCount / bitSize;
return DoubleMath.roundToLong(
-Math.log1p(-fractionOfBitsSet) * bitSize / numHashFunctions, RoundingMode.HALF_UP);
}
@VisibleForTesting
long bitSize() {
return bits.bitSize();
}
public boolean isCompatible(RedisBloomFilter<T> that) {
checkNotNull(that);
return this != that
&& this.numHashFunctions == that.numHashFunctions
&& this.bitSize() == that.bitSize()
&& this.strategy.equals(that.strategy)
&& this.funnel.equals(that.funnel);
}
@Override
public boolean equals(@Nullable Object object) {
if (object == this) {
return true;
}
if (object instanceof RedisBloomFilter) {
RedisBloomFilter<?> that = (RedisBloomFilter<?>) object;
return this.numHashFunctions == that.numHashFunctions
&& this.funnel.equals(that.funnel)
&& this.bits.equals(that.bits)
&& this.strategy.equals(that.strategy);
}
return false;
}
@Override
public int hashCode() {
return Objects.hashCode(numHashFunctions, funnel, strategy, bits);
}
public static <T> RedisBloomFilter<T> create(
Funnel<? super T> funnel, long expectedInsertions, double fpp, JedisPool jedisPool, String redisKey) {
return create(funnel, expectedInsertions, fpp, BloomFilterStrategies.MURMUR128_MITZ_64, jedisPool, redisKey);
}
@VisibleForTesting
static <T> RedisBloomFilter<T> create(
Funnel<? super T> funnel, long expectedInsertions, double fpp, Strategy strategy, JedisPool jedisPool, String key) {
checkNotNull(funnel);
checkArgument(
expectedInsertions >= 0, "Expected insertions (%s) must be >= 0", expectedInsertions);
checkArgument(fpp > 0.0, "False positive probability (%s) must be > 0.0", fpp);
checkArgument(fpp < 1.0, "False positive probability (%s) must be < 1.0", fpp);
checkNotNull(strategy);
checkNotNull(jedisPool);
if (expectedInsertions == 0) {
expectedInsertions = 1;
}
long numBits = optimalNumOfBits(expectedInsertions, fpp);
int numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, numBits);
try {
return new RedisBloomFilter<T>(new BloomFilterStrategies.LockFreeBitArray(numBits, key, jedisPool), numHashFunctions, funnel, strategy);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Could not create RedisBloomFilter of " + numBits + " bits", e);
}
}
public static <T> RedisBloomFilter<T> create(Funnel<? super T> funnel, long expectedInsertions, JedisPool jedisPool, String redisKey) {
return create(funnel, expectedInsertions, 0.03, jedisPool, redisKey); // FYI, for 3%, we always get 5 hash functions
}
@VisibleForTesting
static int optimalNumOfHashFunctions(long n, long m) {
// (m / n) * log(2), but avoid truncation due to division!
return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
}
@VisibleForTesting
static long optimalNumOfBits(long n, double p) {
if (p == 0) {
p = Double.MIN_VALUE;
}
return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
}
}
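The sizing helpers optimalNumOfBits and optimalNumOfHashFunctions correspond to the standard Bloom filter formulas. Writing n for the expected insertions, p for the target false-positive probability, m for the number of bits, k for the number of hash functions and X for the number of bits currently set (the symbols are introduced here for illustration):

```latex
m = -\frac{n \ln p}{(\ln 2)^2},
\qquad
k = \max\!\left(1,\ \operatorname{round}\!\left(\frac{m}{n}\,\ln 2\right)\right)

\text{expectedFpp} = \left(\frac{X}{m}\right)^{k},
\qquad
\text{approximateElementCount} \approx -\frac{m}{k}\,\ln\!\left(1 - \frac{X}{m}\right)
```

As a worked example, n = 1,000,000 and p = 0.03 give m of roughly 7.3 million bits (about 0.9 MB) and k = round(5.06) = 5, which agrees with the comment in the code that 3% always yields 5 hash functions.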
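Possible improvements
The Redis in my current environment is a single instance, so this approach works fine. With a Redis cluster it is less suitable: the whole BloomFilter is tied to a single key, so its data cannot be spread across the machines in the cluster. One workable optimization for clusters is to split one BloomFilter into several smaller ones with different keys, so that the data is distributed over different Redis nodes and the cluster's throughput and memory capacity are put to full use.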
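One way to sketch the split is to route each element to one of several sub-filters by hashing the element and deriving a per-shard Redis key. This is only an outline under assumptions: the class ShardedKeyRouter, the key format, and the use of CRC32 for routing are all hypothetical, and each shard would still need its own RedisBloomFilter sized for its share of the expected insertions.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/** Routes an element to one of several per-shard Redis keys (hypothetical sketch). */
final class ShardedKeyRouter {
    private final String keyPrefix;
    private final int numShards;

    ShardedKeyRouter(String keyPrefix, int numShards) {
        if (keyPrefix == null || keyPrefix.isEmpty() || numShards <= 0) {
            throw new IllegalArgumentException("keyPrefix must be non-empty and numShards > 0");
        }
        this.keyPrefix = keyPrefix;
        this.numShards = numShards;
    }

    /** Returns the Redis key of the sub-filter responsible for the element. */
    String keyFor(String element) {
        CRC32 crc = new CRC32();
        crc.update(element.getBytes(StandardCharsets.UTF_8));
        // CRC32.getValue() is a non-negative long, so the remainder is in [0, numShards).
        int shard = (int) (crc.getValue() % numShards);
        return keyPrefix + ":" + shard;
    }
}
```

Because the same element always maps to the same key, put and mightContain for that element hit the same sub-filter. Which node each key lands on is decided by the cluster's own key-to-slot mapping, so distinct keys are likely, though not guaranteed, to be spread across nodes.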