Dubbo 3.0源码剖析

2023-02-24  本文已影响0人  王侦

1.手写模拟Dubbo

总结:

2.服务导出

几个核心步骤:

Dubbo接口级的服务注册 vs SpringCloud应用级的服务注册:


@EnableDubbo
-> @EnableDubboConfig
-> @Import(DubboConfigConfigurationRegistrar.class)
-> DubboConfigConfigurationRegistrar是个ImportBeanDefinitionRegistrar,调用registerBeanDefinitions()
-> DubboSpringInitializer#initialize
-> DubboBeanUtils#registerCommonBeans
-> 注册DubboDeployApplicationListener监听器监听ContextRefreshedEvent事件。
-> DubboDeployApplicationListener#onContextRefreshedEvent
-> DefaultModuleDeployer#start
1)服务导出exportServices();
2)服务引入referServices();
3)应用级注册onModuleStarted();

DubboDeployApplicationListener#onApplicationEvent,监听ContextRefreshedEvent事件。

    public void onApplicationEvent(ApplicationContextEvent event) {
        if (nullSafeEquals(applicationContext, event.getSource())) {
            if (event instanceof ContextRefreshedEvent) {
                onContextRefreshedEvent((ContextRefreshedEvent) event);
            } else if (event instanceof ContextClosedEvent) {
                onContextClosedEvent((ContextClosedEvent) event);
            }
        }
    }

2.1 Dubbo3.0之前接口级的服务注册

注册过程:

应用的每个接口都会注册到注册中心里面:


2.2 Dubbo3.0应用级的服务注册

Dubbo3.0为了兼容之前版本,既会进行接口级服务注册,也会进行应用级的服务注册。

注册过程:

如果服务暴露了Triple协议+20880端口,并且metadata-type为local,则应用级注册为:ip + 20881(优先是Dubbo的ip+port,这里是MetadataService)。因为此时MetadataService会暴露为Dubbo协议+20881(默认的20880被占用)。

怎么判断端口是否被占用?

只根据应用级注册的信息,服务消费者(调用者)怎么办?

那这里,如果应用配置相同的协议(dubbo)有两个端口,在dubbo.endpoints里怎么处理?

如果配置Triple协议,有两个端口,强制走接口FORCE_INTERFACE,会有另外一个Bug,也只会调用一个端口,另一个端口无法调用。

元数据中心特点:

如上图,应用级注册的值里面也会标识metadata是local还是remote,也就是指示服务消费者是到元数据中心查询还是通过Dubbo调用服务提供者的MetadataService来获取元数据信息。

另外还有一个重要应用,通过元数据信息只能查到<接口/服务, 协议>,是没有端口的,那这个端口在哪里?就在应用级注册的值里面:

1)元数据默认方式:

2)元数据向元数据中心注册的方式:


3.服务引入

@DubboReference
-> ReferenceConfig
-> ReferenceConfig#get 生成代理对象
-> RegistryProtocol#refer
-> RegistryProtocol#doRefer 生成MigrationInvoker
A)FORCE_INTERFACE,强制使用接口级服务引入
B)FORCE_APPLICATION,强制使用应用级服务引入
C)APPLICATION_FIRST,智能选择是接口级还是应用级,默认就是这个

总结一下整体服务消费者(调用者)的查找流程:

ServiceDiscoveryRegistryDirectory#subscribe
-> ServiceDiscoveryRegistry#doSubscribe
-> serviceNameMapping.getAndListen(),获取接口对应的应用名
-> serviceDiscovery.getInstances() 根据应用名,从/services/应用名节点查出所有实例
-> serviceDiscovery.getRemoteMetadata(),获取应用的元数据(从元数据中心或元数据服务获取)

服务消费者监听什么变化?

MigrationInvoker

# dubbo.application.service-discovery.migration 仅支持通过 -D 以及 全局配置中心 两种方式进行配置。
dubbo.application.service-discovery.migration=APPLICATION_FIRST

# 可选值 
# FORCE_INTERFACE,强制使用接口级服务引入
# FORCE_APPLICATION,强制使用应用级服务引入
# APPLICATION_FIRST,智能选择是接口级还是应用级,默认就是这个

事实上,在进行某个服务的服务引入时,会统一利用InterfaceCompatibleRegistryProtocol的refer来生成一个MigrationInvoker对象,在MigrationInvoker中有三个属性:

private volatile ClusterInvoker<T> invoker;  // 用来记录接口级ClusterInvoker
private volatile ClusterInvoker<T> serviceDiscoveryInvoker; // 用来记录应用级的ClusterInvoker
private volatile ClusterInvoker<T> currentAvailableInvoker; // 用来记录当前使用的ClusterInvoker,要么是接口级,要么应用级

RegistryProtocol#doRefer

MigrationInvoker#invoke

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        // currentAvailableInvoker要么是接口级ClusterInvoker,要么是应用级ClusterInvoker
        if (currentAvailableInvoker != null) {
            if (step == APPLICATION_FIRST) {

                // call ratio calculation based on random value
                // 在同时支持接口级和应用级的情况下,如果promotion小于100,则每次调用时,生成一个100以内的随机数,如果随机数大于promotion,则走接口级ClusterInvoker进行服务调用
                // 表示支持部分走接口级调用,部分走应用级调用,看随机数
                // promotion默认等于100,所以默认不会支持部分
                if (promotion < 100 && ThreadLocalRandom.current().nextDouble(100) > promotion) {
                    return invoker.invoke(invocation);
                }
            }

            return currentAvailableInvoker.invoke(invocation);
        }

        switch (step) {
            case APPLICATION_FIRST:
                if (checkInvokerAvailable(serviceDiscoveryInvoker)) {
                    currentAvailableInvoker = serviceDiscoveryInvoker;
                } else if (checkInvokerAvailable(invoker)) {
                    currentAvailableInvoker = invoker;
                } else {
                    currentAvailableInvoker = serviceDiscoveryInvoker;
                }
                break;
            case FORCE_APPLICATION:
                currentAvailableInvoker = serviceDiscoveryInvoker;
                break;
            case FORCE_INTERFACE:
            default:
                currentAvailableInvoker = invoker;
        }

        return currentAvailableInvoker.invoke(invocation);
    }

4.服务调用

4.1 服务导出TripleProtocol协议的流程

TripleProtocol#export

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
        checkProtobufVersion(url);
        // 本地注册--->实例类
        String key = serviceKey(url);

        // 服务导出器,用来卸载服务时做一些善后处理
        final AbstractExporter<T> exporter = new AbstractExporter<T>(invoker) {
            @Override
            public void afterUnExport() {
                pathResolver.remove(url.getServiceKey());
                pathResolver.add(url.getServiceModel().getServiceModel().getInterfaceName(),
                    invoker);
                // set service status
                triBuiltinService.getHealthStatusManager()
                    .setStatus(url.getServiceKey(), ServingStatus.NOT_SERVING);
                triBuiltinService.getHealthStatusManager()
                    .setStatus(url.getServiceInterface(), ServingStatus.NOT_SERVING);
                exporterMap.remove(key);
            }
        };

        exporterMap.put(key, exporter);

        invokers.add(invoker);

        pathResolver.add(url.getServiceKey(), invoker); // url 20882
        pathResolver.add(url.getServiceModel().getServiceModel().getInterfaceName(), invoker);

        // set service status
        triBuiltinService.getHealthStatusManager()
            .setStatus(url.getServiceKey(), HealthCheckResponse.ServingStatus.SERVING);
        triBuiltinService.getHealthStatusManager()
            .setStatus(url.getServiceInterface(), HealthCheckResponse.ServingStatus.SERVING);

        // 启动服务器,用来处理HTTP2的请求
        PortUnificationExchanger.bind(invoker.getUrl());
        return exporter;
    }

PortUnificationExchanger#bind

    public static void bind(URL url) {
        // servers表示可以同时运行多个PortUnificationServer,只需要绑定的host+port不一样即可
        servers.computeIfAbsent(url.getAddress(), addr -> {
            final PortUnificationServer server = new PortUnificationServer(url);
            // 运行NettyServer,并绑定ip和port
            server.bind();
            return server;
        });
    }
    public void bind() {
        if (channel == null) {
            doOpen();
        }
    }
    protected void doOpen() {
        bootstrap = new ServerBootstrap();

        bossGroup = NettyEventLoopFactory.eventLoopGroup(1, EVENT_LOOP_BOSS_POOL_NAME);
        workerGroup = NettyEventLoopFactory.eventLoopGroup(
            getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            EVENT_LOOP_WORKER_POOL_NAME);

        bootstrap.group(bossGroup, workerGroup)
            .channel(NettyEventLoopFactory.serverSocketChannelClass())
            .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // FIXME: should we use getTimeout()?
                    int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                    // ch是Socket连接
                    final ChannelPipeline p = ch.pipeline();
//                        p.addLast(new LoggingHandler(LogLevel.DEBUG));

                    final boolean enableSsl = getUrl().getParameter(SSL_ENABLED_KEY, false);
                    if (enableSsl) {
                        p.addLast("negotiation-ssl", new SslServerTlsHandler(getUrl()));
                    }

                    // 初始化SocketChannel,并在pipeline中绑定PortUnificationServerHandler
                    final PortUnificationServerHandler puHandler = new PortUnificationServerHandler(url, protocols);
                    p.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS));
                    p.addLast("negotiation-protocol", puHandler);
                    channelGroup = puHandler.getChannels();
                }
            });
        // bind

        String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
        if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
            bindIp = ANYHOST_VALUE;
        }
        InetSocketAddress bindAddress = new InetSocketAddress(bindIp, bindPort);
        ChannelFuture channelFuture = bootstrap.bind(bindAddress);
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();
    }

核心是PortUnificationServerHandler。


ByteToMessageDecoder#channelRead

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                first = cumulation == null;
                cumulation = cumulator.cumulate(ctx.alloc(),
                        first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Exception e) {
                throw new DecoderException(e);
            } finally {
                try {
                    if (cumulation != null && !cumulation.isReadable()) {
                        numReads = 0;
                        cumulation.release();
                        cumulation = null;
                    } else if (++numReads >= discardAfterReads) {
                        // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                        // See https://github.com/netty/netty/issues/4275
                        numReads = 0;
                        discardSomeReadBytes();
                    }

                    int size = out.size();
                    firedChannelRead |= out.insertSinceRecycled();
                    fireChannelRead(ctx, out, size);
                } finally {
                    out.recycle();
                }
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

ByteToMessageDecoder#callDecode

    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while (in.isReadable()) {
                int outSize = out.size();

                if (outSize > 0) {
                    fireChannelRead(ctx, out, outSize);
                    out.clear();

                    // Check if this handler was removed before continuing with decoding.
                    // If it was removed, it is not safe to continue to operate on the buffer.
                    //
                    // See:
                    // - https://github.com/netty/netty/issues/4635
                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }

                int oldInputLength = in.readableBytes();
                decodeRemovalReentryProtection(ctx, in, out);

                // Check if this handler was removed before continuing the loop.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See https://github.com/netty/netty/issues/1664
                if (ctx.isRemoved()) {
                    break;
                }

                if (outSize == out.size()) {
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                        continue;
                    }
                }

                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                                    ".decode() did not read anything but decoded a message.");
                }

                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception cause) {
            throw new DecoderException(cause);
        }
    }
    final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
            throws Exception {
        decodeState = STATE_CALLING_CHILD_DECODE;
        try {
            decode(ctx, in, out);
        } finally {
            boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
            decodeState = STATE_INIT;
            if (removePending) {
                fireChannelRead(ctx, out, out.size());
                out.clear();
                handlerRemoved(ctx);
            }
        }
    }

这里会调用PortUnificationServerHandler#decode

    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
       // ctx.channel() 拿到的是NioSocketChannel 接收到数据ByteBuf后,会先解码,然后再触发Handler的channelRead

        // 根据前5个字节确定对应的协议
        // Will use the first five bytes to detect a protocol.
        if (in.readableBytes() < 5) {
            return;
        }

        // 看是不是HTTP2.0
        for (final WireProtocol protocol : protocols) {
            in.markReaderIndex();
            final ProtocolDetector.Result result = protocol.detector().detect(ctx, in);
            in.resetReaderIndex();
            switch (result) {
                case UNRECOGNIZED:
                    continue;
                case RECOGNIZED:
                    // 符合个某个协议后,再給Socket连接对应的pipeline绑定Handler
                    protocol.configServerPipeline(url, ctx.pipeline(), sslCtx);
                    ctx.pipeline().remove(this);
                case NEED_MORE_DATA:
                    return;
                default:
                    return;
            }
        }
        // Unknown protocol; discard everything and close the connection.
        in.clear();
        ctx.close();
    }

总结一下后续流程:

重点看一下TripleHttp2Protocol#configServerPipeline

        // Http2MultiplexHandler是用来创建子Channel的,并且ChannelInitializer是用来初始化子Channel
        // 一个Socket连接对应一个NioSocketChannel,下层可以设置多个子Channel,每个子Channel对应一个HTTP2Stream
        final Http2MultiplexHandler handler = new Http2MultiplexHandler(

            // Channel初始化器,用来初始化传入进来的Channel,比如HTTP2Stream所对应的Channel
            new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel ch) {
                    final ChannelPipeline p = ch.pipeline();
                    // 将QueuedCommand转换成Http2StreamFrame,然后再发出去
                    p.addLast(new TripleCommandOutBoundHandler());

                    // TripleHttp2FrameServerHandler处理的是HTTP2Stream所对应的子Channel
                    // 核心Handler, 用来处理Http2HeadersFrame、Http2DataFrame, lookupExecutor会根据服务url得到一个线程池
                    // 每个子Channel对应一个线程池?还是共享一个线程池?默认是共享一个
                    p.addLast(new TripleHttp2FrameServerHandler(frameworkModel, lookupExecutor(url),
                        filters));
                }
            });

核心是TripleHttp2FrameServerHandler。

4.2 服务引入TripleProtocol协议的流程

    public TripleInvoker(Class<T> serviceType,
        URL url,
        String acceptEncodings,
        ConnectionManager connectionManager,
        Set<Invoker<?>> invokers,
        ExecutorService streamExecutor) {
        super(serviceType, url, new String[]{INTERFACE_KEY, GROUP_KEY, TOKEN_KEY});
        this.invokers = invokers;
        // 与服务提供者建立Socket连接
        this.connection = connectionManager.connect(url);
        this.acceptEncodings = acceptEncodings;
        this.streamExecutor = streamExecutor;
    }

MultiplexProtocolConnectionManager#connect

    public Connection connect(URL url) {
        // 协议相同的URL将对应同一个ConnectionManager
        final ConnectionManager manager = protocols.computeIfAbsent(url.getProtocol(), this::createSingleProtocolConnectionManager);
        return manager.connect(url);
    }

SingleProtocolConnectionManager#connect

    public Connection connect(URL url) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        // 不同的address对应不同的Connection对象
        return connections.compute(url.getAddress(), (address, conn) -> {
            if (conn == null) {
                final Connection created = new Connection(url);
                created.getClosePromise().addListener(future -> connections.remove(address, created));
                return created;
            } else {
                conn.retain();
                return conn;
            }
        });
    }
    public Connection(URL url) {
        url = ExecutorUtil.setThreadName(url, "DubboClientHandler");
        url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
        this.url = url;
        this.protocol = ExtensionLoader.getExtensionLoader(WireProtocol.class).getExtension(url.getProtocol());
        this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
        this.remote = getConnectAddress();
        // 只是创建一个Bootstrap对象,并不会建立Socket连接
        this.bootstrap = create();
    }
    private Bootstrap create() {
        final Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(NettyEventLoopFactory.NIO_EVENT_LOOP_GROUP.get())
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .remoteAddress(remote)
                .channel(socketChannelClass());

        final ConnectionHandler connectionHandler = new ConnectionHandler(this);
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                SslContext sslContext = null;
                if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                    pipeline.addLast("negotiation", new SslClientTlsHandler(url));
                }

                //.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                // TODO support IDLE
//                int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
                pipeline.addLast(connectionHandler);
                protocol.configClientPipeline(url, pipeline, sslContext);
                // TODO support Socks5
            }
        });
        return bootstrap;
    }

构造TripleInvoker,创建connection只是创建一个Bootstrap对象,并不会建立Socket连接。

4.3 调用逻辑

4.3.1 服务消费者发送请求

那Socket连接什么时候创建呢?

 StreamObserver<String> streamObserver = demoService.sayHelloBiStream(new ZhouyuResultStreamObserver());
 streamObserver.onNext("zhouyu1");
 streamObserver.onCompleted();

TripleInvoker#doInvoke

TripleInvoker#invokeBiOrClientStream

TripleInvoker#streamCall

ClientCall#start

ClientStream#createWriteQueue

    private WriteQueue createWriteQueue(Channel parent) {
        // 利用Netty开启一个Http2StreamChannel,也就是HTTP2中的流
        final Http2StreamChannelBootstrap bootstrap = new Http2StreamChannelBootstrap(parent);
        final Future<Http2StreamChannel> future = bootstrap.open().syncUninterruptibly();
        if (!future.isSuccess()) {
            throw new IllegalStateException("Create remote stream failed. channel:" + parent);
        }

        // 并绑定两个Handler,一个工作在发送数据时,一个工作在接收数据时
        final Http2StreamChannel channel = future.getNow();
        channel.pipeline()
            .addLast(new TripleCommandOutBoundHandler())
            // TripleHttp2ClientResponseHandler是用来接收响应的
            .addLast(new TripleHttp2ClientResponseHandler(createTransportListener()));

        // 基于Http2StreamChannel创建一个WriteQueue
        // 后续把要发送的数据,添加到WriteQueue中就能发送出去了
        return new WriteQueue(channel);
    }

当用户程序调用streamObserver.onNext()发送数据时,实际调用的是:

发送的核心逻辑WriteQueue#enqueue(QueuedCommand):

具体的发送:

DataQueueCommand#doSend

    public void doSend(ChannelHandlerContext ctx, ChannelPromise promise) {
        if (data == null) {
            ctx.write(new DefaultHttp2DataFrame(endStream), promise);
        } else {
            ByteBuf buf = ctx.alloc().buffer();
            // 第一个字节记录请求体是否被压缩
            buf.writeByte(compressFlag);
            // 后四个字节记录请求体的长度
            buf.writeInt(data.length);
            // 真实的数据
            buf.writeBytes(data);
            // 发送
            ctx.write(new DefaultHttp2DataFrame(buf, endStream), promise);
        }
    }

总结一下:

4.3.2 服务提供者处理请求逻辑

TripleHttp2FrameServerHandler#channelRead

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof Http2HeadersFrame) {
            onHeadersRead(ctx, (Http2HeadersFrame) msg);
        } else if (msg instanceof Http2DataFrame) {
            onDataRead(ctx, (Http2DataFrame) msg);
        } else if (msg instanceof ReferenceCounted) {
            // ignored
            ReferenceCountUtil.release(msg);
        }
    }

处理请求头TripleHttp2FrameServerHandler#onHeadersRead

处理请求体TripleHttp2FrameServerHandler#onDataRead

UNARY调用逻辑(同步阻塞)

SERVER_STREAM调用逻辑(异步)

在Invoker#invoker里面就可以调用responseObserver向客户端发送响应。

    public void sayHelloServerStream(String name, StreamObserver<String> response) {

        response.onNext(name + " hello");

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        response.onNext(name + " world");

        response.onCompleted();

    }

BI_STREAM或CLIENT_STREAM(异步)

    public StreamObserver<String> sayHelloBiStream(StreamObserver<String> response) {
        return new StreamObserver<String>() {
            @Override
            public void onNext(String name) {
                System.out.println(name);
                response.onNext("hello: "+name);
            }

            @Override
            public void onError(Throwable throwable) {
            }

            @Override
            public void onCompleted() {
                System.out.println("completed");
            }
        };
    }

4.3.3 客户端接收到响应TripleHttp2ClientResponseHandler

TripleHttp2ClientResponseHandler#channelRead0

    protected void channelRead0(ChannelHandlerContext ctx, Http2StreamFrame msg) throws Exception {
        if (msg instanceof Http2HeadersFrame) {
            final Http2HeadersFrame headers = (Http2HeadersFrame) msg;
            transportListener.onHeader(headers.headers(), headers.isEndStream());
        } else if (msg instanceof Http2DataFrame) {
            final Http2DataFrame data = (Http2DataFrame) msg;
            transportListener.onData(data.content(), data.isEndStream());
        } else {
            super.channelRead(ctx, msg);
        }
    }

ClientStream.ClientTransportListener#onData

        public void onData(ByteBuf data, boolean endStream) {
            executor.execute(() -> {

                // transportError不等于null,表示处理响应头时就有问题了
                if (transportError != null) {
                    transportError.appendDescription(
                        "Data:" + data.toString(StandardCharsets.UTF_8));
                    // 释放内存空间
                    ReferenceCountUtil.release(data);
                    //
                    if (transportError.description.length() > 512 || endStream) {
                        handleH2TransportError(transportError);

                    }
                    return;
                }

                if (!headerReceived) {
                    handleH2TransportError(TriRpcStatus.INTERNAL.withDescription(
                        "headers not received before payload"));
                    return;
                }

                // 接收到响应体数据后,把数据添加到accumulate中进行保存
                deframer.deframe(data);
            });
        }

ClientStream.ClientTransportListener#onData
-> TriDecoder#deframe
-> TriDecoder#deliver
-> TriDecoder#processBody
-> ClientCall.ClientStreamListenerImpl#onMessage
-> listener.onMessage(unpacked);有两种方式:同步和异步
UnaryClientCallListener#onMessage同步
ObserverToClientCallListenerAdapter#onMessage异步

ObserverToClientCallListenerAdapter#onMessage异步

    public void onMessage(Object message) {
        // 接收到一个响应结果,回调StreamObserver
        delegate.onNext(message);
        // 继续处理下一个响应结果
        if (call.isAutoRequestN()) {
            call.requestN(1);
        }
    }

上面的delegate就是服务消费者这边创建的ZhouyuResultStreamObserver:

    static class ZhouyuResultStreamObserver implements StreamObserver<String> {


        @Override
        public void onNext(String data) {
            System.out.println(data);
        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println(throwable);
        }

        @Override
        public void onCompleted() {
            System.out.println("complete");
        }
    }

梳理一下客户端AbstractInvoker#invoke整个流程

同步阻塞什么时候唤醒?

4.3.4 SERVER_STREAM的一个BUG

服务提供者逻辑如下。

    public void sayHelloServerStream(String name, StreamObserver<String> response) {

        response.onNext(name + " hello");

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        response.onNext(name + " world");

        response.onCompleted();

    }

预期是服务消费者拿到一个数据帧后,停3s,然后再拿到后续的数据。但是Dubbo这里是直接等3s,然后一下子拿到了所有三个数据。

整体流程:

建议更改:

SerializingExecutor#execute

    public void execute(Runnable r) {
        // SerializingExecutor会对加入到runQueue中的Runnable用一个线程进行串行处理

        // 将Runnable任务添加到队列
        runQueue.add(r);

        // 使用内部线程池executor中的一个线程来运行
        schedule(r);
    }

SerializingExecutor#schedule

    private void schedule(Runnable removable) {
        if (atomicBoolean.compareAndSet(false, true)) {
            boolean success = false;
            try {
                // SerializingExecutor内部保护了一个线程池executor,这个线程池是根据服务url创建出来的
                // 注意:这里并不是把runQueue队列中的Runnable任务拿出来用线程去执行
                // 而是把SerializingExecutor自己作为一个Runnable交给线程池中的一个线程去执行
                // 这里其实就是利用一个线程去执行SerializingExecutor中的run方法,从而获取runQueue中的任务进行执行
                executor.execute(this);
                success = true;
            } finally {
                // It is possible that at this point that there are still tasks in
                // the queue, it would be nice to keep trying but the error may not
                // be recoverable.  So we update our state and propagate so that if
                // our caller deems it recoverable we won't be stuck.
                if (!success) {
                    if (removable != null) {
                        // This case can only be reached if 'this' was not currently running, and we failed to
                        // reschedule.  The item should still be in the queue for removal.
                        // ConcurrentLinkedQueue claims that null elements are not allowed, but seems to not
                        // throw if the item to remove is null.  If removable is present in the queue twice,
                        // the wrong one may be removed.  It doesn't seem possible for this case to exist today.
                        // This is important to run in case of RejectedExecutionException, so that future calls
                        // to execute don't succeed and accidentally run a previous runnable.
                        runQueue.remove(removable);
                    }
                    atomicBoolean.set(false);
                }
            }
        }
    }

SerializingExecutor#run

    public void run() {
        Runnable r;
        try {
            while ((r = runQueue.poll()) != null) {
                try {
                    r.run();
                } catch (RuntimeException e) {
                    LOGGER.error("Exception while executing runnable " + r, e);
                }
            }
        } finally {
            atomicBoolean.set(false);
        }

        // 如果队列中不为空,则继续获取一个线程执行run(),继续获取队列中的任务进行执行
        if (!runQueue.isEmpty()) {
            // we didn't enqueue anything but someone else did.
            schedule(null);
        }
    }
上一篇 下一篇

猜你喜欢

热点阅读