Redisjava进阶干货

redis之管道应用场景及源码分析

2017-10-10  本文已影响0人  jerrik
一、redis通信基础

我们都知道,redis的通信是建立在tcp基础上的,也就是说每一次命令(get、set)都需要经过tcp三次握手,而且redis一般都是部署在局域网内,网络开销非常小,针对频次较低的操作,网络开销都是可以忽略的。

二、什么情况下需要使用redis的管道?

在redis通信基础中 我已经讲到了。每一次操作redis的时候我们都需要和服务端建立连接,针对量小的情况下网络延迟都是可以忽略的,但是针对大批量的业务,就会产生雪崩效应。假如一次操作耗时2ms,理论上100万次操作就会有2ms*100万ms延迟,中间加上服务器处理开销,耗时可能更多.对应客户端来讲,这种长时间的耗时是不能接受的。所以为了解决这个问题,redis的管道pipeline就派上用场了。 恰好公司的对账业务使用了redis的sdiff功能,数据量比较大,刚开始没有pipeline导致延迟非常严重。后来wireshark抓包分析原因确实发现不停的建立tcp连接(发送数据,接收数据)。使用pipeline后性能大幅度提升。

三、使用实例
  1. 不使用pipeline的情况
   private static void withoutPipeline(int count){
        try {
            for(int i =0; i<count; i++){
              CacheUtils.sadd("testWithout", "key_" + i);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
  1. 使用pipeline的情况
    private static void usePipeline(int count) {
        try {
            Pipeline pipe = CacheUtils.pipelined();
            for (int i = 0; i < count; i++) {
                pipe.sadd("test", "key_" + i);
            }
            pipe.sync();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
  1. 测试一下(循环操作1万次,看耗时情况)
int count = 10000;
long start = System.currentTimeMillis();
withoutPipeline(count);
long end = System.currentTimeMillis();
System.out.println("withoutPipeline: " + (end - start));

start = System.currentTimeMillis();
usePipeline(count);
end = System.currentTimeMillis();
System.out.println("usePipeline: " + (end - start));
output:
操作10000次的结果:
withoutPipeline: 9266
usePipeline: 66

操作1000000次的结果:
withoutPipeline: 834535
usePipeline: 4803

可想而知,使用pipeline的性能要比不使用管道快很多倍。

四、jredis pipeline()源码分析
 public Pipeline pipelined() {
    Pipeline pipeline = new Pipeline();
    pipeline.setClient(client);
    return pipeline;
  }

//使用管道的sadd
public Response<Long> sadd(String key, String... member) {
    getClient(key).sadd(key, member);
    return getResponse(BuilderFactory.LONG);//没有手动调用flush(),而是返回一个固定值。
  }

//让我们来看看pipeline的sync()方法:
 public void sync() {
    if (getPipelinedResponseLength() > 0) {
      List<Object> unformatted = client.getAll();
      for (Object o : unformatted) {
        generateResponse(o);
      }
    }
  }

//client.getAll()调用了getAll(0)
public List<Object> getAll(int except) {
    List<Object> all = new ArrayList<Object>();
    flush();//也就是说在调用pipeline.sync()时手动触发的flush()方法,一次pipeline操作真正意思上只有一次tcp
    while (pipelinedCommands > except) {
      try {
        all.add(readProtocolWithCheckingBroken());
      } catch (JedisDataException e) {
        all.add(e);
      }
      pipelinedCommands--;
    }
    return all;
  }


//未使用管道的sadd方法
public Long sadd(final String key, final String... members) {
    checkIsInMulti();
    client.sadd(key, members);//这里的sadd将会执行下面的sendCommand发送指令
    return client.getIntegerReply();//客户端立即发送数据到服务端。客户端等待服务端返回
  }

//所有的发送指令都要调用该方法,但是该方法并没有真正发送数据。
 protected Connection sendCommand(final ProtocolCommand cmd, final byte[]... args) {
    try {
      connect();
      Protocol.sendCommand(outputStream, cmd, args);
      pipelinedCommands++;
      return this;
    } catch (JedisConnectionException ex) {
      // Any other exceptions related to connection?
      broken = true;
      throw ex;
    }
  }

 public Long getIntegerReply() {
    flush();//sendCommand方法调用后,还没有真正将数据写到服务端,当调用flush()后才真正触发发送数据
    pipelinedCommands--;
    return (Long) readProtocolWithCheckingBroken();
  }

本文就先到这里了。。。

上一篇 下一篇

猜你喜欢

热点阅读