Redisson客户端的初始化流程及Netty处理器数据编解码《

2023-12-26  本文已影响0人  Teddy_b

以单节点为例,看下Redisson是怎么实现到Redis服务器的连接的

Redisson 单节点模式

Redisson中支持Redis的多种模式,其中单节点模式是作为一种特殊的主从模式实现的

public class SingleConnectionManager extends MasterSlaveConnectionManager {

    public SingleConnectionManager(SingleServerConfig cfg, ServiceManager serviceManager) {
        super(create(cfg), serviceManager);
    }
}

所以单节点的配置是直接继承主从模式的配置的,不一样的地方是读和订阅都会连接到主节点

newconfig.setReadMode(ReadMode.MASTER);
newconfig.setSubscriptionMode(SubscriptionMode.MASTER);

读写连接的初始化

Redisson中初始化到Redis服务器的连接时,会分别初始化读写连接和发布订阅连接,我们先看下读写连接是怎么初始化的

private CompletableFuture<RedisClient> setupMasterEntry(RedisClient client) {
        CompletableFuture<InetSocketAddress> addrFuture = client.resolveAddr();
        return addrFuture.thenCompose(res -> {
            masterEntry = new ClientConnectionsEntry(
                    client,
                    config.getMasterConnectionMinimumIdleSize(),
                    config.getMasterConnectionPoolSize(),
                    idleConnectionWatcher,
                    NodeType.MASTER,
                    config);

            List<CompletableFuture<Void>> futures = new ArrayList<>();

            CompletableFuture<Void> writeFuture = writeConnectionPool.initConnections(masterEntry);
            futures.add(writeFuture);

            return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        }).whenComplete((r, e) -> {
            if (e != null) {
                client.shutdownAsync();
            }
        }).thenApply(r -> {
            writeConnectionPool.addEntry(masterEntry);
            return client;
        });
    }

再Redisson的源码中大量使用了CompletableFuture,所以读起来会相对晦涩一点,但是基本上都是异步调用、然后注册回调的模式

这里初始化连接的时候就是这样

创建读写连接

与创建读写链接比较相关的几个配置:

private void createConnection(ClientConnectionsEntry entry,
                                  CompletableFuture<Void> initPromise, int minimumIdleSize, AtomicInteger initializedConnections) {

        CompletableFuture<Void> f = acquireConnection(entry, null);
        f.thenAccept(r -> {
            CompletableFuture<T> promise = new CompletableFuture<T>();
            createConnection(entry, promise);
            promise.whenComplete((conn, e) -> {
                // 正常创建连接
                if (e == null) {
                    if (changeUsage()) {
                        conn.decUsage();
                    }
                     // 如果初始化还没完成,将刚刚创建的连接缓存起来
                    // 如果初始化已经完成了,说明刚刚创建的连接是多余的了,直接关闭连接
                    if (!initPromise.isDone()) {
                        entry.addConnection(conn);
                    } else {
                        conn.closeAsync();
                    }
                }

                 // 连接创建成功了,释放最大连接数量,让后续的创建连接请求能够继续创建连接
                releaseConnection(entry);
        
                // 创建连接异常了
                if (e != null) {
                    if (initPromise.isDone()) {
                        return;
                    }

                    // 关闭所有缓存的连接
                    for (RedisConnection connection : entry.getAllConnections()) {
                        if (!connection.isClosed()) {
                            connection.closeAsync();
                        }
                    }
                    entry.getAllConnections().clear();

                    // 异常初始化完成
                    initPromise.completeExceptionally(cause);
                    return;
                }

                // 创建的连接数是否达到了最小空闲连接数量
                // 已经达到最小连接数量了,则初始化正常完成
                // 还没达到最小连接数量, 则递归继续创建连接
                int value = initializedConnections.decrementAndGet();
                if (value == 0) {
                    if (initPromise.complete(null)) {
                        log.info("{} connections initialized for {}", minimumIdleSize, entry.getClient().getAddr());
                    }
                } else if (value > 0 && !initPromise.isDone()) {
                    createConnection(entry, initPromise, minimumIdleSize, initializedConnections);
                }
            });
        });
    }

这个是创建连接比较重要的一个方法,我们分别看下

public CompletableFuture<Void> acquire() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        listeners.add(future);
        tryRun();
        return future;
    }

private void tryRun() {
        while (true) {
            if (counter.decrementAndGet() >= 0) {
                CompletableFuture<Void> future = listeners.poll();
                if (future == null) {
                    counter.incrementAndGet();
                    return;
                }

                if (future.complete(null)) {
                    return;
                }
            }

            if (counter.incrementAndGet() <= 0) {
                return;
            }
        }
    }

这里可以学习下这种用法,这里通过AtomicInteger维护了最大连接数量,尝试获取连接时会先创建一个CompletableFuture,并加入到ConcurrentLinkedQueue这个链表中

tryRun的时候会先尝试减少最大连接数量,减少后仍大于等于0,则正常完成刚刚创建的CompletableFuture

如果减少后小于0了,说明不能创建更多连接了,则撤回本次减少后,一直保存再链表中,等待有连接释放后唤醒它

private void createConnection(ClientConnectionsEntry entry, CompletableFuture<T> promise) {
        CompletionStage<T> connFuture = connect(entry);
        connFuture.whenComplete((conn, e) -> {
            if (e != null) {
                releaseConnection(entry);
                promiseFailure(entry, promise, e);
                return;
            }

            if (changeUsage()) {
                promise.thenApply(c -> c.incUsage());
            }
            connectedSuccessful(entry, promise, conn);
        });
    }

public CompletionStage<RedisConnection> connect() {
        CompletionStage<RedisConnection> future = client.connectAsync();
        return future.whenComplete((conn, e) -> {
            if (e != null) {
                return;
            }

            allConnections.add(conn);
        });
    }

public RFuture<RedisConnection> connectAsync() {
         // 解析处Redis服务器的地址
        CompletableFuture<InetSocketAddress> addrFuture = resolveAddr();
        CompletableFuture<RedisConnection> f = addrFuture.thenCompose(res -> {
            CompletableFuture<RedisConnection> r = new CompletableFuture<>();
             // 通过Netty客户端发送异步连接
            ChannelFuture channelFuture = bootstrap.connect(res);
            // 注册一个ChannelFutureListener,在连接建立后会进行回调
            channelFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(final ChannelFuture future) throws Exception {
                    if (bootstrap.config().group().isShuttingDown()) {
                        RedisConnectionException cause = new RedisConnectionException("RedisClient is shutdown");
                        r.completeExceptionally(cause);
                        return;
                    }

                    if (future.isSuccess()) {
                        RedisConnection c = RedisConnection.getFrom(future.channel());
                        c.getConnectionPromise().whenComplete((res, e) -> {
                            bootstrap.config().group().execute(new Runnable() {
                                @Override
                                public void run() {
                                    if (e == null) {
                                        if (!r.complete(c)) {
                                            c.closeAsync();
                                        } else {
                                            if (config.getConnectedListener() != null) {
                                                config.getConnectedListener().accept(getAddr());
                                            }
                                        }
                                    } else {
                                        r.completeExceptionally(e);
                                        c.closeAsync();
                                    }
                                }
                            });
                        });
                    } else {
                        bootstrap.config().group().execute(new Runnable() {
                            public void run() {
                                r.completeExceptionally(future.cause());
                            }
                        });
                    }
                }
            });
            return r;
        });
        return new CompletableFutureWrapper<>(f);
    }

这里实际的创建链接过程

public void release() {
        counter.incrementAndGet();
        tryRun();
    }

释放连接数量就是对之前的AtomicInteger进行自增,然后再tryRun中尝试唤醒一个仍然链表中的CompletableFuture进行完成,让这个CompletableFuture完成然后继续它的创建连接过程

Netty客户端

上面的创建链接最终是通过Netty客户端来完成的,Netty客户端的Bootstrap在初始化的时候会创建

private Bootstrap createBootstrap(RedisClientConfig config, Type type) {
        Bootstrap bootstrap = new Bootstrap()
                        .resolver(config.getResolverGroup())
                        .channel(config.getSocketChannelClass())
                        .group(config.getGroup());

        bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type));
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());
        bootstrap.option(ChannelOption.SO_KEEPALIVE, config.isKeepAlive());
        bootstrap.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay());

        applyChannelOptions(config, bootstrap);

        config.getNettyHook().afterBoostrapInitialization(bootstrap);
        return bootstrap;
    }

这里主要的处理器是RedisChannelInitializer,我们重点看下这个

@Override
    protected void initChannel(Channel ch) throws Exception {
        initSsl(config, ch);
        
        if (type == Type.PLAIN) {
            ch.pipeline().addLast(new RedisConnectionHandler(redisClient));
        } 

        ch.pipeline().addLast(
            connectionWatchdog,
            new CommandEncoder(config.getCommandMapper()),
            CommandBatchEncoder.INSTANCE);

        if (type == Type.PLAIN) {
            ch.pipeline().addLast(new CommandsQueue());
        }

        if (pingConnectionHandler != null) {
            ch.pipeline().addLast(pingConnectionHandler);
        }
        
        if (type == Type.PLAIN) {
            ch.pipeline().addLast(new CommandDecoder(config.getAddress().getScheme()));
        }

        ch.pipeline().addLast(new ErrorsLoggingHandler());

    }

这里首先是初始化SSL,我们先不考虑SSL的连接

然后Netty客户端中InboundHandler和OutboundHandler的处理流程

public void channelActive(ChannelHandlerContext ctx) {
        List<CompletableFuture<Object>> futures = new ArrayList<>(5);

        InetSocketAddress addr = redisClient.resolveAddr().getNow(null);
        RedisClientConfig config = redisClient.getConfig();
        CompletionStage<Object> f = config.getCredentialsResolver().resolve(addr)
                .thenCompose(credentials -> {
                    String password = Objects.toString(config.getAddress().getPassword(),
                            Objects.toString(credentials.getPassword(), config.getPassword()));
                    if (password != null) {
                        
                            future = connection.async(RedisCommands.AUTH, password);

                        return future;
                    }
                    return CompletableFuture.completedFuture(null);
                });
        futures.add(f.toCompletableFuture());

        if (config.getPingConnectionInterval() > 0) {
            CompletionStage<Object> future = connection.async(RedisCommands.PING);
            futures.add(future.toCompletableFuture());
        }

        CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        future.whenComplete((res, e) -> {
            if (e != null) {
                ...
                connection.closeAsync();
                connectionPromise.completeExceptionally(e);
                return;
            }

            ctx.fireChannelActive();
            connectionPromise.complete(connection);
        });
    }

这个处理器在连接建立后(channelActive),会根据我们知否配置了密码,来发送Redis命令AUTH到服务器,同时根据是否配置了心跳检测,来发送Redis命令PING,发送命令也是异步的

然后等待这两个命令CompletableFuture完成,如果正常完成,则对创建连接的CompletableFuture进行完成,返回的就是这个连接

如果CompletableFuture异常完成,则对创建连接的CompletableFuture进行异常完成,返回的是异常信息

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (msg instanceof QueueCommand) {
            QueueCommand data = (QueueCommand) msg;
            QueueCommandHolder holder = new QueueCommandHolder(data, promise);

            Queue<QueueCommandHolder> queue = ctx.channel().attr(COMMANDS_QUEUE).get();

            while (true) {
                if (lock.compareAndSet(false, true)) {
                    try {
                        queue.add(holder);
                        try {
                            holder.getChannelPromise().addListener(future -> {
                                if (!future.isSuccess()) {
                                    queue.remove(holder);
                                }
                            });
                            ctx.writeAndFlush(data, holder.getChannelPromise());
                        } catch (Exception e) {
                            queue.remove(holder);
                            throw e;
                        }
                    } finally {
                        lock.set(false);
                    }
                    break;
                }
            }
        } else {
            super.write(ctx, msg, promise);
        }
    }

这个处理器维护了一个双向队列,发送的Redis命令都会先进行入队操作,然后再发送这个命令到后面的处理器

可以学习下这里的写法,通过AtomicBoolean变量来确保并发时命令的顺序,只有先CAS操作成功的命令才会先入队

protected void encode(ChannelHandlerContext ctx, CommandData<?, ?> msg, ByteBuf out) throws Exception {
        try {
            out.writeByte("*");
            int len = 1 + msg.getParams().length;
            if (msg.getCommand().getSubName() != null) {
                len++;
            }
            out.writeBytes(longToString(len));
            out.writeBytes(CRLF);

            String name = commandMapper.map(msg.getCommand().getName());
            writeArgument(out, name.getBytes(CharsetUtil.UTF_8));
            if (msg.getCommand().getSubName() != null) {
                writeArgument(out, msg.getCommand().getSubName().getBytes(CharsetUtil.UTF_8));
            }

            for (Object param : msg.getParams()) {
                ByteBuf buf = encode(param);
                writeArgument(out, buf);
                if (!(param instanceof ByteBuf)) {
                    buf.release();
                }
            }

        } catch (Exception e) {
            msg.tryFailure(e);
            throw e;
        }
    }

可以看到这里都是按照redis协议的数组进行编码

private void writeArgument(ByteBuf out, byte[] arg) {
        out.writeByte("$");
        out.writeBytes(longToString(arg.length));
        out.writeBytes(CRLF);
        out.writeBytes(arg);
        out.writeBytes(CRLF);
      }

比如AUTH命令的格式就是:

*2\r\n
$4\r\nAUTH\r\n
$6\r\n123456\r\n

PING命令的格式就是:

*1\r\n
$4\r\nPING\r\n
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        QueueCommandHolder holder = getCommand(ctx);
        QueueCommand data = null;
        if (holder != null) {
            data = holder.getCommand();
        }

        if (state() == null) {
            state(new State());
        }
        
        if (data == null) {
            while (in.writerIndex() > in.readerIndex()) {
                int endIndex = skipCommand(in);

                try {
                    decode(ctx, in, null, 0);
                } catch (Exception e) {
                    in.readerIndex(endIndex);
                    throw e;
                }
            }
        } else {
            if (holder.getChannelPromise().isDone() && !holder.getChannelPromise().isSuccess()) {
                sendNext(ctx.channel());
                // throw REPLAY error
                in.indexOf(Integer.MAX_VALUE/2, Integer.MAX_VALUE, (byte) 0);
                return;
            }

            int endIndex = 0;
            if (!(data instanceof CommandsData)) {
                endIndex = skipCommand(in);
            } else {
                endIndex = skipBatchCommand(in, (CommandsData) data);
            }
            if (data.isExecuted()) {
                in.readerIndex(endIndex);
                sendNext(ctx.channel());
                return;
            }

            decode(ctx, in, data, endIndex);
        }
    }

这里说明下几个方法有利于理解解码过程

首先是这个skipCommand方法,它主要功能是找到不同命令的响应结果之间的分隔下标,因为不同命令都是先加入到队列中,然后一起发出去的,所以同一通道不同命令的响应结果可能是一起解析的

protected int skipCommand(ByteBuf in) throws Exception {
        in.markReaderIndex();
        skipDecode(in);
        int res = in.readerIndex();
        in.resetReaderIndex();
        return res;
    }
    
    protected void skipDecode(ByteBuf in) throws IOException{
        int code = in.readByte();
        if (code == '+') {
            skipString(in);
        } else if (code == '-') {
            skipString(in);
        } else if (code == ':') {
            skipString(in);
        } else if (code == '$') {
            skipBytes(in);
        } else if (code == '*') {
            long size = readLong(in);
            for (int i = 0; i < size; i++) {
                skipDecode(in);
            }
        }
    }

private void skipString(ByteBuf in) {
        int len = in.bytesBefore((byte) '\r');
        in.skipBytes(len + 2);
    }

这里只是为了找到不同命令响应数据之间的分隔下标,所以会通过markReaderIndex先把读下标保存起来,然后找到分隔下标后,再通过resetReaderIndex重置读下标,一遍再后续的解码中仍然能够处理到当前命令的响应结果

然后redis协议对响应数据的规范,单行字符串响应以+开头、错误响应以-开头、整数响应以:开头、多行字符串响应以$开头、数组响应以*开头

最后skipString就是读到\r\n换行符,这中间的长度就是第一个命令的完整响应长度了

记录这个长度是为了第一个命令处理响应结果处理异常时,方便处理第二个命令的响应结果

try {
                decode(in, cmd, null, channel, false, null);
                sendNext(channel, data);
 } catch (Exception e) {
                // 上一个命令解码异常,直接设置读下标为分隔下标
                in.readerIndex(endIndex);
                sendNext(channel);
                cmd.tryFailure(e);
                throw e;
 }

protected void sendNext(Channel channel) {
        Queue<QueueCommandHolder> queue = channel.attr(CommandsQueue.COMMANDS_QUEUE).get();
        queue.poll();
        state(null);
    }

再解码的时候,如果上一个命令解码异常,直接设置读下标为分隔下标,然后进行出队操作移除队首的上一个命令(这里才是出队)

这样再解码下一个命令的响应结果时,就可以直接从下一个命令的响应结果开始处理了

最后命令的响应数据的解码操作就是重新读取这个命令的响应结果,这主要是因为在找分隔下标的时候通过暂存读下标和重置读下标来完成

然后按照第一个字符区别处理即可

如单行字符串,就是先读取第一个字节,然后一直读取到换行符\r\n得到结果,然后通过命令结果的转换器对结果进行转换,最后使用这个结果完成通道CompletableFuture

int code = in.readByte();
if (code == '+') {
            int len = in.bytesBefore((byte) '\r');
        String result = in.toString(in.readerIndex(), len, CharsetUtil.UTF_8);
        in.skipBytes(len + 2);

            if (data != null && !skipConvertor) {
            result = data.getCommand().getConvertor().convert(result);
        }
        if (parts != null) {
            parts.add(result);
        } else {
            if (data != null) {
            data.getPromise().complete(result);
        }
        }
        }

如AUTH命令的响应结果:

+OK\r\n

再如AUTH和PING再同一通道一起发送时的响应结果,这时候找到的分隔下标就是6

+OK\r\n+PONG\r\n

至此,才完成了读写连接的创建,创建的连接会缓存下来,并且会同时缓存最小空闲连接数量(24)个连接

发布订阅连接的初始化

发布订阅连接的初始化过程和读写连接的初始化过程很多地方都是一致的,只在一些地方通过子类进行了重写

与发布订阅连接比较相关的几个配置:

在创建发布订阅连接时也会遵循这个配置

在创建连接时,除了类型不一样,其它的都是一样的

bootstrap = createBootstrap(copy, Type.PLAIN);
pubSubBootstrap = createBootstrap(copy, Type.PUBSUB);

对于类型Type.PUBSUB,处理器会有些不同

@Override
    protected void initChannel(Channel ch) throws Exception {
        initSsl(config, ch);
        
        else {
            ch.pipeline().addLast(new RedisPubSubConnectionHandler(redisClient));
        }

        ch.pipeline().addLast(
            connectionWatchdog,
            new CommandEncoder(config.getCommandMapper()),
            CommandBatchEncoder.INSTANCE);

        else {
            ch.pipeline().addLast(new CommandsQueuePubSub());
        }

        if (pingConnectionHandler != null) {
            ch.pipeline().addLast(pingConnectionHandler);
        }
        
        else {
            ch.pipeline().addLast(new CommandPubSubDecoder(config));
        }

        ch.pipeline().addLast(new ErrorsLoggingHandler());

    }
RedisPubSubConnection createConnection(ChannelHandlerContext ctx) {
        return new RedisPubSubConnection(redisClient, ctx.channel(), connectionPromise);
    }
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (msg instanceof QueueCommand) {
            QueueCommand data = (QueueCommand) msg;
            QueueCommandHolder holder = queue.peek();
            if (holder != null && holder.getCommand() == data) {
                super.write(ctx, msg, promise);
            } else {
                queue.add(new QueueCommandHolder(data, promise));
                sendData(ctx.channel());
            }
        } else {
            super.write(ctx, msg, promise);
        }
    }

private void sendData(Channel ch) {
        QueueCommandHolder holder = queue.peek();
        if (holder != null && holder.trySend()) {
            QueueCommand data = holder.getCommand();
            List<CommandData<Object, Object>> pubSubOps = data.getPubSubOperations();
            if (!pubSubOps.isEmpty()) {
                for (CommandData<Object, Object> cd : pubSubOps) {
                    for (Object channel : cd.getParams()) {
                        ch.pipeline().get(CommandPubSubDecoder.class).addPubSubCommand((ChannelName) channel, cd);
                    }
                }
            } else {
                ch.attr(CURRENT_COMMAND).set(holder);
            }

            holder.getChannelPromise().addListener(listener);
            ch.writeAndFlush(data, holder.getChannelPromise());
        }
    }

这里也同时维护了一个链表队列和一个通道属性,发送命令时

看到这里可能有点懵,但是先别急,再看下解码的处理器就能理解了

最后再看下订阅响应数据的解码过程,需要注意的是这里的命令data是为null的;SUBSCRIBE命令的响应一般情况下是长这样

*3\r\n
$9\r\nsubscribe\r\n
$7\r\nmyTopic\r\n
:1\r\n

是一个长度为3的数组,第一个元素是多行字符串,订阅成功时固定为subscribe;第二个也是多行字符串,为订阅的频道名称;第三个是整数,为订阅的频道数量

因此再解码的时候会首先解析code=*,然后再递归解析每一个数组元素,对应的就会解析到code=$code=:

protected void decode(ByteBuf in, CommandData<Object, Object> data, List<Object> parts, Channel channel, boolean skipConvertor, List<CommandData<?, ?>> commandsData) throws IOException {
      int code = in.readByte();
      
      } else if (code == ':') {
          Long result = readLong(in);
          handleResult(data, parts, result, false);
      } else if (code == '$') {
          ByteBuf buf = readBytes(in);
          Object result = null;
          if (buf != null) {
              Decoder<Object> decoder = selectDecoder(data, parts);
              result = decoder.decode(buf, state());
          }
          handleResult(data, parts, result, false);
      } else if (code == '*') {
          long size = readLong(in);
          List<Object> respParts = new ArrayList<Object>(Math.max((int) size, 0));
          
          state().incLevel();
          
          for (int i = respParts.size(); i < size; i++) {
              decode(in, data, respParts, channel, skipConvertor, null);
          }
          
          state().decLevel();
          
      }
  }

通过上面的解析之后,解析到的数据都会先保存在respParts这个数组中,然后还需要获取订阅命令的解码器,这时会遇到一个问题是由于当前命令data为null,无法知道它的解码器是什么;但是在CommandsQueuePubSub编码的时候,已经将订阅的频道名称和订阅命令之间的映射关系保存在CommandPubSubDecoder处理器中,而频道的名称现在已经解析到了,就可以通过频道名称获取到当前命令了

@Override
    protected MultiDecoder<Object> messageDecoder(CommandData<Object, Object> data, List<Object> parts) {
        if (parts.isEmpty() || parts.get(0) == null) {
            return null;
        }
        // 响应的第一个元素在订阅成功时固定为subscribe
        String command = parts.get(0).toString();
        if ("subscribe".contains(command)) {
             // 响应的第二个元素是频道名称
            ChannelName channelName = new ChannelName((byte[]) parts.get(1));
            PubSubKey key = new PubSubKey(channelName, command);
            CommandData<Object, Object> commandData = commands.get(key);
            if (commandData == null) {
                return null;
            }
            return commandData.getCommand().getReplayMultiDecoder();
        }
    }

对于SUBSCRIBE命令,它的解码器就是PubSubStatusDecoder,可以看到就是通过第一个元素和第二个元素构建一个PubSubStatusMessage对象

RedisCommand<Object> SUBSCRIBE = new RedisCommand<Object>("SUBSCRIBE", new PubSubStatusDecoder());

public class PubSubStatusDecoder implements MultiDecoder<Object> {

    @Override
    public PubSubStatusMessage decode(List<Object> parts, State state) {
        PubSubType type = PubSubType.valueOf(parts.get(0).toString().toUpperCase());
        ChannelName name = new ChannelName((byte[]) parts.get(1));
        return new PubSubStatusMessage(type, name);
    }

}

最后再对这个结果进行分发,PubSubStatusMessage是实现了Message接口的

protected void decodeResult(CommandData<Object, Object> data, List<Object> parts, Channel channel,
            Object result) throws IOException {
        
        if (result instanceof Message) {
            checkpoint();

             // 先把频道名称、当前命令信息保存到entries这个Map里
            RedisPubSubConnection pubSubConnection = RedisPubSubConnection.getFrom(channel);
            ChannelName channelName = ((Message) result).getChannel();
            if (result instanceof PubSubStatusMessage) {
                String operation = ((PubSubStatusMessage) result).getType().name().toLowerCase();
                PubSubKey key = new PubSubKey(channelName, operation);
                CommandData<Object, Object> d = commands.get(key);
                if ("SUBSCRIBE".contains(d.getCommand().getName())) {
                    commands.remove(key);
                    entries.put(channelName, new PubSubEntry(d.getMessageDecoder()));
                }
            }

            // 发布订阅的消息是否需要按顺序发送,默认是true
            if (config.isKeepPubSubOrder()) {
               
                PubSubEntry entry = entries.get(channelName);
                if (entry != null) {
                    enqueueMessage(result, pubSubConnection, entry);
                }
            } else {
                config.getExecutor().execute(new Runnable() {
                    @Override
                    public void run() {
                        if (result instanceof PubSubStatusMessage) {
                            pubSubConnection.onMessage((PubSubStatusMessage) result);
                        } else if (result instanceof PubSubMessage) {
                            pubSubConnection.onMessage((PubSubMessage) result);
                        } else if (result instanceof PubSubPatternMessage) {
                            pubSubConnection.onMessage((PubSubPatternMessage) result);
                        }
                    }
                });
            }
        } else {
            if (data != null && data.getCommand().getName().equals("PING")) {
                super.decodeResult(data, parts, channel, result);
            }
        }
    }

这里的处理流程

至此就完成了Redisson客户端的初始化流程

源码阅读最大的感受是Redisson中基本看不到同步锁操作(也有),大量使用无锁CAS思维来解决并发问题,值得借鉴学习

另外大量使用CompletableFuture,这种异步编程思路也值得借鉴学习

参考

上一篇 下一篇

猜你喜欢

热点阅读