redis之管道应用场景及源码分析
2017-10-10 本文已影响0人
jerrik
一、redis通信基础
我们都知道,redis的通信是建立在tcp基础上的,也就是说每一次命令(get、set)都需要经过tcp三次握手,而且redis一般都是部署在局域网内,网络开销非常小,针对频次较低的操作,网络开销都是可以忽略的。
二、什么情况下需要使用redis的管道?
在redis通信基础中 我已经讲到了。每一次操作redis的时候我们都需要和服务端建立连接,针对量小的情况下网络延迟都是可以忽略的,但是针对大批量的业务,就会产生雪崩效应。假如一次操作耗时2ms,理论上100万次操作就会有2ms*100万ms延迟,中间加上服务器处理开销,耗时可能更多.对应客户端来讲,这种长时间的耗时是不能接受的。所以为了解决这个问题,redis的管道pipeline就派上用场了。 恰好公司的对账业务使用了redis的sdiff功能,数据量比较大,刚开始没有pipeline导致延迟非常严重。后来wireshark抓包分析原因确实发现不停的建立tcp连接(发送数据,接收数据)。使用pipeline后性能大幅度提升。
三、使用实例
- 不使用pipeline的情况
private static void withoutPipeline(int count){
try {
for(int i =0; i<count; i++){
CacheUtils.sadd("testWithout", "key_" + i);
}
} catch (Exception e) {
e.printStackTrace();
}
}
- 使用pipeline的情况
private static void usePipeline(int count) {
try {
Pipeline pipe = CacheUtils.pipelined();
for (int i = 0; i < count; i++) {
pipe.sadd("test", "key_" + i);
}
pipe.sync();
} catch (Exception e) {
e.printStackTrace();
}
}
- 测试一下(循环操作1万次,看耗时情况)
int count = 10000;
long start = System.currentTimeMillis();
withoutPipeline(count);
long end = System.currentTimeMillis();
System.out.println("withoutPipeline: " + (end - start));
start = System.currentTimeMillis();
usePipeline(count);
end = System.currentTimeMillis();
System.out.println("usePipeline: " + (end - start));
output:
操作10000次的结果:
withoutPipeline: 9266
usePipeline: 66
操作1000000次的结果:
withoutPipeline: 834535
usePipeline: 4803
可想而知,使用pipeline的性能要比不使用管道快很多倍。
四、jredis pipeline()源码分析
public Pipeline pipelined() {
Pipeline pipeline = new Pipeline();
pipeline.setClient(client);
return pipeline;
}
//使用管道的sadd
public Response<Long> sadd(String key, String... member) {
getClient(key).sadd(key, member);
return getResponse(BuilderFactory.LONG);//没有手动调用flush(),而是返回一个固定值。
}
//让我们来看看pipeline的sync()方法:
public void sync() {
if (getPipelinedResponseLength() > 0) {
List<Object> unformatted = client.getAll();
for (Object o : unformatted) {
generateResponse(o);
}
}
}
//client.getAll()调用了getAll(0)
public List<Object> getAll(int except) {
List<Object> all = new ArrayList<Object>();
flush();//也就是说在调用pipeline.sync()时手动触发的flush()方法,一次pipeline操作真正意思上只有一次tcp
while (pipelinedCommands > except) {
try {
all.add(readProtocolWithCheckingBroken());
} catch (JedisDataException e) {
all.add(e);
}
pipelinedCommands--;
}
return all;
}
//未使用管道的sadd方法
public Long sadd(final String key, final String... members) {
checkIsInMulti();
client.sadd(key, members);//这里的sadd将会执行下面的sendCommand发送指令
return client.getIntegerReply();//客户端立即发送数据到服务端。客户端等待服务端返回
}
//所有的发送指令都要调用该方法,但是该方法并没有真正发送数据。
protected Connection sendCommand(final ProtocolCommand cmd, final byte[]... args) {
try {
connect();
Protocol.sendCommand(outputStream, cmd, args);
pipelinedCommands++;
return this;
} catch (JedisConnectionException ex) {
// Any other exceptions related to connection?
broken = true;
throw ex;
}
}
public Long getIntegerReply() {
flush();//sendCommand方法调用后,还没有真正将数据写到服务端,当调用flush()后才真正触发发送数据
pipelinedCommands--;
return (Long) readProtocolWithCheckingBroken();
}
本文就先到这里了。。。