redis深度历险

2018-12-20  本文已影响96人  zlmind

一、redis常用指令及数据结构

image.png
STRING命令

更多command https://redis.io/commands

Redis底层数据结构

字符串(SDS)、链表(LinkedList)、Hash表(Dict)、整数集合(IntSet)、压缩列表(ZipList)、跳跃表(SkipList)

详见:https://www.jianshu.com/p/0206ef2cffa4
Redis 3.2.8 源码剖析注释
https://blog.csdn.net/men_wen/column/info/15428

二、底层通讯原理

Redis服务器与客户端通过RESP(REdis Serialization Protocol)协议通信。
Redis在TCP端口6379上监听到来的连接,客户端连接到来时,Redis服务器为此创建一个TCP连接。在客户端与服务器端之间传输的每个Redis命令或者数据都以\r\n结尾。

三、redis使用模式

1、单节点模式

单节点模式是最简单的Redis模式,就是一个redis实例,如果只是自己测试缓存或者小程序,数据量很小,仅仅做一个小型的KEY/VALUE型数据库,完全足够。

2、主从模式(master/slaver)

image.png

Redis不具备自动容错和恢复功能,主机从机的宕机都会导致前端部分读写请求失败,需要等待机器重启或者手动切换前端的IP才能恢复。
主机宕机,宕机前有部分数据未能及时同步到从机,切换IP后还会引入数据不一致的问题,降低了系统的可用性。
Redis的主从复制采用全量复制,复制过程中主机会fork出一个子进程对内存做一份快照,并将子进程的内存快照保存为文件发送给从机,这一过程需要确保主机有足够多的空余内存。若快照文件较大,对集群的服务能力会产生较大的影响,而且复制过程是在从机新加入集群或者从机和主机网络断开重连时都会进行,也就是网络波动都会造成主机和从机间的一次全量的数据复制,这对实际的系统运营造成了不小的麻烦。
Redis较难支持在线扩容,在集群容量达到上限时在线扩容会变得很复杂。为避免这一问题,运维人员在系统上线时必须确保有足够的空间,这对资源造成了很大的浪费。

3、Sentinel(哨兵)模式

Sentinel(哨兵)进程是用于监控redis集群中Master主服务器工作的状态,在Master主服务器发生故障的时候,可以实现Master和Slave服务器的切换,保证系统的高可用。


image.png

优点:
哨兵集群模式是基于主从模式的,所有主从的优点,哨兵模式同样具有。
主从可以切换,故障可以转移,系统可用性更好。
哨兵模式是主从模式的升级,系统更健壮,可用性更高。

缺点:
Redis较难支持在线扩容,在集群容量达到上限时在线扩容会变得很复杂。为避免这一问题,运维人员在系统上线时必须确保有足够的空间,这对资源造成了很大的浪费。
配置复杂

4、Cluster(集群)模式

Redis Cluster是一种服务器Sharding技术,3.0版本开始正式提供。
在这个图中,每一个蓝色的圈都代表着一个redis的服务器节点。它们任何两个节点之间都是相互连通的。客户端可以与任何一个节点相连接,然后就可以访问集群中的任何一个节点。对其进行存取和其他操作

image.png

Redis 3.0的集群方案有以下两个问题。

一个Redis实例具备了“数据存储”和“路由重定向”,完全去中心化的设计。这带来的好处是部署非常简单,直接部署Redis就行,不像Codis有那么多的组件和依赖。但带来的问题是很难对业务进行无痛的升级,如果哪天Redis集群出了什么严重的Bug,就只能回滚整个Redis集群。
对协议进行了较大的修改,对应的Redis客户端也需要升级。升级Redis客户端后谁能确保没有Bug?而且对于线上已经大规模运行的业务,升级代码中的Redis客户端也是一个很麻烦的事情。
Redis Cluster是Redis 3.0以后才正式推出,时间较晚,目前能证明在大规模生产环境下成功的案例还不是很多,需要时间检验。

5、中间件代理

中间件的作用是将我们需要存入redis中的数据的key通过一套算法计算得出一个值。然后根据这个值找到对应的redis节点,将这些数据存在这个redis的节点中

Twemproxy

https://github.com/twitter/twemproxy

image.png
Twemproxy最大的痛点在于,无法平滑地扩容/缩容
运维不友好,甚至没有控制面板

Codis

Codis引入了Group的概念,每个Group包括1个Redis Master及至少1个Redis Slave,这是和Twemproxy的区别之一。这样做的好处是,如果当前Master有问题,则运维人员可通过Dashboard“自助式”切换到Slave,而不需要小心翼翼地修改程序配置文件。为支持数据热迁移(Auto Rebalance),出品方修改了Redis Server源码,并称之为Codis Server。Codis采用预先分片(Pre-Sharding)机制,事先规定好了,分成1024个slots(也就是说,最多能支持后端1024个Codis Server),这些路由信息保存在ZooKeeper中。

https://github.com/CodisLabs/codis

image.png image.png

著名的开源方案
Jedis https://github.com/xetorthio/jedis
Redisson https://github.com/redisson/redisson

推特开源Twemproxy https://github.com/twitter/twemproxy
豌豆荚团队开源codis https://github.com/CodisLabs/codis
官方高可用方案:sentinel(哨兵)

四、客户端源码分析

Jedis https://github.com/xetorthio/jedis
本文使用2.7.2版本

对象池设计:Pool,JedisPool,GenericObjectPool,BasePoolableObjectFactory,JedisFactory
面向用户的redis操作封装:BinaryJedisCommands,JedisCommands,BinaryJedis,Jedis
面向redis服务器的操作封装:Commands,Client,BinaryClient,Connection,Protocol

类图设计如下:


image.png

其他类作用


image.png
ShardedJedis实现分析
ShardedJedis是基于一致性哈希算法实现的分布式Redis集群客户端;ShardedJedis的设计分为以下几块:

对象池设计:Pool,ShardedJedisPool,ShardedJedisFactory
面向用户的操作封装:BinaryShardedJedis,BinaryShardedJedis
一致性哈希实现:Sharded

关于ShardedJedis设计,忽略了Jedis的设计细节,设计类图如下:


image.png

关于ShardedJedis类图设计,省略了对象池,以及Jedis设计的以下细节介绍:


image.png

reids客户端配置

 <bean id="jedisPool" class="redis.clients.jedis.ShardedJedisPool">
        <constructor-arg index="0" ref="jedisConfig"/>
        <constructor-arg index="1">
            <list>
                <bean class="redis.clients.jedis.JedisShardInfo">
                    <constructor-arg index="0" value="${redis.host}"/>
                    <constructor-arg index="1" value="${redis.port}"/>
                    <constructor-arg index="2" value="${redis.timeout}" type="int"/>
                    <!-- <constructor-arg index="3" value="@{redis1.weight}" type="int"/>
                     -->
                </bean>
            </list>
        </constructor-arg>
    </bean>
    <bean id="jedisConfig" class="redis.clients.jedis.JedisPoolConfig">
        <property name="maxTotal" value="${redis.maxTotal}"/>
        <property name="maxIdle" value="${redis.maxIdle}"/>
        <property name="minIdle" value="${redis.minIdle}"/>
        <property name="testOnBorrow" value="${redis.testOnBorrow}"/>
        <property name="maxWaitMillis" value="${redis.maxWait}"/>
    </bean>

redis.clients.jedis.ShardedJedisPool

//构造函数注入JedisShardInfo,JedisPoolConfig
public class ShardedJedisPool extends Pool<ShardedJedis> {
    
  //构造函数注入JedisShardInfo,JedisPoolConfig
  public ShardedJedisPool(final GenericObjectPoolConfig poolConfig, List<JedisShardInfo> shards) {
    this(poolConfig, shards, Hashing.MURMUR_HASH);
  }
。。。。

JedisShardInfo注入

JedisShardInfo继承ShardInfo,很简单,主要是服务端信息配置及soTimeout的注入,这个参数用来控制connection,和socket的连接超时

public class JedisShardInfo extends ShardInfo<Jedis> {
。。。
  public JedisShardInfo(String host, int port, int timeout) {
    this(host, port, timeout, timeout, Sharded.DEFAULT_WEIGHT);
  }
。。。

  public JedisShardInfo(String host, int port, int connectionTimeout, int soTimeout, int weight) {
    super(weight);
    this.host = host;
    this.port = port;
    this.connectionTimeout = connectionTimeout;
    this.soTimeout = soTimeout;
  }
。。。。
}

JedisPoolConfig信息初始化及注入

image.png

由上图看出,JedisPoolConfig 继承apache commons-pool2.4.3.jar的GenericObjectPool,设置配置

public class JedisPoolConfig extends GenericObjectPoolConfig {
  public JedisPoolConfig() {
    // defaults to make your life with connection pool easier :)
    setTestWhileIdle(true);
    setMinEvictableIdleTimeMillis(60000);
    setTimeBetweenEvictionRunsMillis(30000);
    setNumTestsPerEvictionRun(-1);
  }
}

GenericObjectPool

//GenericObjectPool构造函数
  public GenericObjectPool(PooledObjectFactory<T> factory,
            GenericObjectPoolConfig config) {

        super(config, ONAME_BASE, config.getJmxNamePrefix());

        if (factory == null) {
            jmxUnregister(); // tidy up
            throw new IllegalArgumentException("factory may not be null");
        }
        this.factory = factory;

        idleObjects = new LinkedBlockingDeque<PooledObject<T>>(config.getFairness());

        setConfig(config);
        //启动自动回收空闲连接的代码
        startEvictor(getTimeBetweenEvictionRunsMillis());
    }
//设置配置文件数据
public void setConfig(final GenericObjectPoolConfig conf) {
        setLifo(conf.getLifo());
        setMaxIdle(conf.getMaxIdle());
        setMinIdle(conf.getMinIdle());
        setMaxTotal(conf.getMaxTotal());
        setMaxWaitMillis(conf.getMaxWaitMillis());
        setBlockWhenExhausted(conf.getBlockWhenExhausted());
        setTestOnCreate(conf.getTestOnCreate());
        setTestOnBorrow(conf.getTestOnBorrow());
        setTestOnReturn(conf.getTestOnReturn());
        setTestWhileIdle(conf.getTestWhileIdle());
        setNumTestsPerEvictionRun(conf.getNumTestsPerEvictionRun());
        setMinEvictableIdleTimeMillis(conf.getMinEvictableIdleTimeMillis());
        setTimeBetweenEvictionRunsMillis(
                conf.getTimeBetweenEvictionRunsMillis());
        setSoftMinEvictableIdleTimeMillis(
                conf.getSoftMinEvictableIdleTimeMillis());
        setEvictionPolicyClassName(conf.getEvictionPolicyClassName());
        setEvictorShutdownTimeoutMillis(conf.getEvictorShutdownTimeoutMillis());
    }

Jedis连接池

连接池初始化

Pool class

public abstract class Pool<T> implements Closeable {
  protected GenericObjectPool<T> internalPool;
。。。。

  public void initPool(final GenericObjectPoolConfig poolConfig, PooledObjectFactory<T> factory) {

    if (this.internalPool != null) {
      try {
        closeInternalPool();
      } catch (Exception e) {
      }
    }

    this.internalPool = new GenericObjectPool<T>(factory, poolConfig);
  }
//org.apache.commons.pool2.impl.GenericObjectPool.GenericObjectPool(PooledObjectFactory<T> factory, GenericObjectPoolConfig config)

 public GenericObjectPool(PooledObjectFactory<T> factory,
            GenericObjectPoolConfig config) {

        super(config, ONAME_BASE, config.getJmxNamePrefix());

        if (factory == null) {
            jmxUnregister(); // tidy up
            throw new IllegalArgumentException("factory may not be null");
        }
        this.factory = factory;

        idleObjects = new LinkedBlockingDeque<PooledObject<T>>(config.getFairness());

        setConfig(config);

        startEvictor(getTimeBetweenEvictionRunsMillis());
    }

//初始化idleObjects
 private void ensureIdle(int idleCount, boolean always) throws Exception {
        if (idleCount < 1 || isClosed() || (!always && !idleObjects.hasTakeWaiters())) {
            return;
        }

        while (idleObjects.size() < idleCount) {
            PooledObject<T> p = create();
            if (p == null) {
                // Can't create objects, no reason to think another call to
                // create will work. Give up.
                break;
            }
            if (getLifo()) {
                idleObjects.addFirst(p);
            } else {
                idleObjects.addLast(p);
            }
        }
        if (isClosed()) {
            // Pool closed while object was being added to idle objects.
            // Make sure the returned object is destroyed rather than left
            // in the idle object pool (which would effectively be a leak)
            clear();
        }
    }

连接池使用

ShardedJedis jedis = null;
        try {
            //连接池获取redis实例
            jedis = jedisPool.getResource();

//redis.clients.jedis.ShardedJedisPool
public class ShardedJedisPool extends Pool<ShardedJedis> {
。。。
 @Override
  public ShardedJedis getResource() {
    ShardedJedis jedis = super.getResource();
    jedis.setDataSource(this);
    return jedis;
  }

T redis.clients.util.Pool.getResource()

 public T getResource() {
    try {
      return internalPool.borrowObject();
    } catch (Exception e) {
      throw new JedisConnectionException("Could not get a resource from the pool", e);
    }
  }
public class GenericObjectPool<T> extends BaseGenericObjectPool<T>
        implements ObjectPool<T>, GenericObjectPoolMXBean, UsageTracking<T> {

。。。。。
public T borrowObject(long borrowMaxWaitMillis) throws Exception {
        assertOpen();

        AbandonedConfig ac = this.abandonedConfig;
        if (ac != null && ac.getRemoveAbandonedOnBorrow() &&
                (getNumIdle() < 2) &&
                (getNumActive() > getMaxTotal() - 3) ) {
            removeAbandoned(ac);
        }

        PooledObject<T> p = null;

        // Get local copy of current config so it is consistent for entire
        // method execution
        //没有空闲连接的时候是否阻塞的等待连接
        boolean blockWhenExhausted = getBlockWhenExhausted();

        boolean create;
        long waitTime = System.currentTimeMillis();

        while (p == null) {
            create = false;
             // 没有空闲连接时等待
            if (blockWhenExhausted) {
                p = idleObjects.pollFirst();
                if (p == null) {
                    p = create();
                    if (p != null) {
                        create = true;
                    }
                }
                 // 如果连接创建失败, 则继续从idleObjects中阻塞的获取连接
                if (p == null) {
                    if (borrowMaxWaitMillis < 0) {
                         // 无限制的等待, 不会超时
                        p = idleObjects.takeFirst();
                    } else {
                          // 有超时时间的等待
                        p = idleObjects.pollFirst(borrowMaxWaitMillis,
                                TimeUnit.MILLISECONDS);
                    }
                }
                if (p == null) {
                    throw new NoSuchElementException(
                            "Timeout waiting for idle object");
                }
                if (!p.allocate()) {
                    p = null;
                }
            } else {
                // 没有空闲连接时直接抛出异常
                p = idleObjects.pollFirst();
                if (p == null) {
                    p = create();
                    if (p != null) {
                        create = true;
                    }
                }
                if (p == null) {
                    throw new NoSuchElementException("Pool exhausted");
                }
                if (!p.allocate()) {
                    p = null;
                }
            }

            if (p != null) {
                try {
                    factory.activateObject(p);
                } catch (Exception e) {
                    try {
                        destroy(p);
                    } catch (Exception e1) {
                        // Ignore - activation failure is more important
                    }
                    p = null;
                    if (create) {
                        NoSuchElementException nsee = new NoSuchElementException(
                                "Unable to activate object");
                        nsee.initCause(e);
                        throw nsee;
                    }
                }
                if (p != null && (getTestOnBorrow() || create && getTestOnCreate())) {
                    boolean validate = false;
                    Throwable validationThrowable = null;
                    try {
                        validate = factory.validateObject(p);
                    } catch (Throwable t) {
                        PoolUtils.checkRethrow(t);
                        validationThrowable = t;
                    }
                    if (!validate) {
                        try {
                            destroy(p);
                            destroyedByBorrowValidationCount.incrementAndGet();
                        } catch (Exception e) {
                            // Ignore - validation failure is more important
                        }
                        p = null;
                        if (create) {
                            NoSuchElementException nsee = new NoSuchElementException(
                                    "Unable to validate object");
                            nsee.initCause(validationThrowable);
                            throw nsee;
                        }
                    }
                }
            }
        }

        updateStatsBorrow(p, System.currentTimeMillis() - waitTime);

        return p.getObject();
    }

//创建
  private PooledObject<T> create() throws Exception {
        // 判断当前已经创建的连接是否已经超过设置的最大连接数
        int localMaxTotal = getMaxTotal();
        long newCreateCount = createCount.incrementAndGet();
        if (localMaxTotal > -1 && newCreateCount > localMaxTotal ||
                newCreateCount > Integer.MAX_VALUE) {
            createCount.decrementAndGet();
            return null;
        }

        final PooledObject<T> p;
        try {
               // 创建一个到redis server的连接
            p = factory.makeObject();
        } catch (Exception e) {
            createCount.decrementAndGet();
            throw e;
        }

        AbandonedConfig ac = this.abandonedConfig;
        if (ac != null && ac.getLogAbandoned()) {
            p.setLogAbandoned(true);
        }

        createdCount.incrementAndGet();
        allObjects.put(p.getObject(), p);
        return p;
    }

ShardedJedisPool内部类ShardedJedisFactory

private static class ShardedJedisFactory implements PooledObjectFactory<ShardedJedis> {
    private List<JedisShardInfo> shards;
    private Hashing algo;
    private Pattern keyTagPattern;

    public ShardedJedisFactory(List<JedisShardInfo> shards, Hashing algo, Pattern keyTagPattern) {
      this.shards = shards;
      this.algo = algo;
      this.keyTagPattern = keyTagPattern;
    }
。。。。。
@Override
    public PooledObject<ShardedJedis> makeObject() throws Exception {
      ShardedJedis jedis = new ShardedJedis(shards, algo, keyTagPattern);
      return new DefaultPooledObject<ShardedJedis>(jedis);
    }

实际项目中CacheUtil使用

@Resource
    private ShardedJedisPool jedisPool;
。。。。
。。。。

 /**
     * @param key 缓存的key
     * @return String
     */
    public String getCache(String key) {
        ShardedJedis jedis = null;
        try {
            //连接池获取redis实例
            jedis = jedisPool.getResource();
            //连接服务端取值
            return jedis.get(key);
        } catch (JedisConnectionException e) {
            LOGGER.info("get cache {} error...", key, e);
            broken(jedis);
            return null;
        } finally {
            release(jedis);
        }
    }

jedis.get(key)时序图

image.png

Connection类中sendCommand

主要看connect();
Protocol.sendCommand(outputStream, cmd, args);

  protected Connection sendCommand(final ProtocolCommand cmd, final byte[]... args) {
    try {
      //连接服务器
      connect();
      //发送指令
      Protocol.sendCommand(outputStream, cmd, args);
      pipelinedCommands++;
      return this;
    } catch (JedisConnectionException ex) {
      // Any other exceptions related to connection?
      broken = true;
      throw ex;
    }
  }

//connect函数
public void connect() {
    if (!isConnected()) {
      try {
        socket = new Socket();
        // ->@wjw_add
        socket.setReuseAddress(true);
        socket.setKeepAlive(true); // Will monitor the TCP connection is
        // valid
        socket.setTcpNoDelay(true); // Socket buffer Whetherclosed, to
        // ensure timely delivery of data
        socket.setSoLinger(true, 0); // Control calls close () method,
        // the underlying socket is closed
        // immediately
        // <-@wjw_add

        socket.connect(new InetSocketAddress(host, port), connectionTimeout);
        socket.setSoTimeout(soTimeout);
        outputStream = new RedisOutputStream(socket.getOutputStream());
        inputStream = new RedisInputStream(socket.getInputStream());
      } catch (IOException ex) {
        broken = true;
        throw new JedisConnectionException(ex);
      }
    }
  }

//sendCommand函数
  public static void sendCommand(final RedisOutputStream os, final ProtocolCommand command,
      final byte[]... args) {
    sendCommand(os, command.getRaw(), args);
  }

  private static void sendCommand(final RedisOutputStream os, final byte[] command,
      final byte[]... args) {
    try {
      os.write(ASTERISK_BYTE);
      os.writeIntCrLf(args.length + 1);
      os.write(DOLLAR_BYTE);
      os.writeIntCrLf(command.length);
      os.write(command);
      os.writeCrLf();

      for (final byte[] arg : args) {
        os.write(DOLLAR_BYTE);
        os.writeIntCrLf(arg.length);
        os.write(arg);
        os.writeCrLf();
      }
    } catch (IOException e) {
      throw new JedisConnectionException(e);
    }
  }

五、Jedis之ShardedJedis一致性哈希分析

https://www.cnblogs.com/vhua/p/redis_2.html
https://my.oschina.net/xinxingegeya/blog/391713

参考:
https://www.jianshu.com/p/83ac498e1b2c
https://blog.csdn.net/keketrtr/article/details/78802571
https://blog.csdn.net/wy0123/article/details/79583506
https://cloud.tencent.com/developer/news/8009
哨兵模式客户端连接:https://blog.csdn.net/holdbelief/article/details/79791162
Jedis之ShardedJedis一致性哈希分析https://my.oschina.net/xinxingegeya/blog/391713
Jedis 与 ShardedJedis 设计
https://my.oschina.net/tantexian/blog/688019

上一篇下一篇

猜你喜欢

热点阅读