dubbo服务端接收消息处理

2020-06-14  本文已影响0人  剑道_7ffc

NettyHandler#messageReceived

从NettyServer的doOpen方法可以看出，服务端接收消息的处理入口是NettyHandler。

/**
 * Netty callback for an inbound message: wraps the raw Netty channel in a
 * {@code NettyChannel} and forwards the message payload to {@code handler}.
 *
 * @param ctx the Netty handler context the message arrived on
 * @param e   the Netty event carrying the message
 * @throws Exception whatever the downstream handler throws
 */
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    final NettyChannel nettyChannel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
    try {
        final Object payload = e.getMessage();
        handler.received(nettyChannel, payload);
    } finally {
        // Runs on both the normal and the exceptional path.
        NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
    }
}

handler的对象是NettyServer(MultiMessageHandler(HeartbeatHandler(AllDispatcher(DecodeHandler(HeaderExchangeHandler(ExchangeHandlerAdapter))))))，消息沿这条链逐层向内传递：先经过HeaderExchangeHandler的received方法，最终交给DubboProtocol$ExchangeHandlerAdapter处理

HeaderExchangeHandler.received

/**
 * Dispatches an incoming message according to its type: {@code Request},
 * {@code Response}, {@code String} (telnet), or anything else.
 *
 * @param channel the channel the message arrived on
 * @param message the decoded message
 * @throws RemotingException if a downstream handler fails
 */
public void received(Channel channel, Object message) throws RemotingException {
    // Stamp the channel with the time of this read.
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {
            // Request: event, two-way call, or one-way call.
            final Request request = (Request) message;
            if (request.isEvent()) {
                handlerEvent(channel, request);
            } else if (request.isTwoWay()) {
                handleRequest(exchangeChannel, request);
            } else {
                handler.received(exchangeChannel, request.getData());
            }
            return;
        }

        if (message instanceof Response) {
            handleResponse(channel, (Response) message);
            return;
        }

        if (!(message instanceof String)) {
            // Any other payload goes straight to the wrapped handler.
            handler.received(exchangeChannel, message);
            return;
        }

        // Plain string message: only handled (as telnet) when not on the client side.
        if (isClientSide(channel)) {
            Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
            logger.error(e.getMessage(), e);
            return;
        }

        final String echo = handler.telnet(channel, (String) message);
        if (echo != null && !echo.isEmpty()) {
            channel.send(echo);
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

DubboProtocol$ExchangeHandlerAdapter.received

/**
 * Handles a message that reached the protocol-level handler: an
 * {@code Invocation} is passed to {@code reply}; anything else is delegated
 * to the superclass.
 *
 * @param channel the channel the message arrived on
 * @param message the received payload
 * @throws RemotingException if handling fails
 */
public void received(Channel channel, Object message) throws RemotingException {
    if (!(message instanceof Invocation)) {
        super.received(channel, message);
        return;
    }
    // Invocation payload (e.g. RpcInvocation).
    reply((ExchangeChannel) channel, message);
}

ExchangeHandlerAdapter#reply

/**
 * Executes the invocation carried by {@code message} against the matching
 * server-side invoker and returns a future of the result.
 *
 * @param channel the channel the request arrived on
 * @param message the request payload; cast to {@code Invocation}
 * @return a future completing with the invocation result
 * @throws RemotingException if the invoker lookup fails
 */
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
    final Invocation invocation = (Invocation) message;
    // Resolve the server-side invoker for this invocation.
    final Invoker<?> target = getInvoker(channel, invocation);
    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
    // Perform the actual call.
    final Result outcome = target.invoke(invocation);
    // Hand the result future back to the caller.
    return outcome.completionFuture().thenApply(Function.identity());
}

DubboProtocol#getInvoker

/**
 * Finds the invoker of the exported service that an incoming invocation targets.
 *
 * <p>The service key is built from the local port plus the path, version and
 * group attachments of the invocation, then looked up in {@code exporterMap}.
 *
 * @param channel the channel the invocation arrived on
 * @param inv     the incoming invocation
 * @return the invoker of the matching exporter
 * @throws RemotingException if no exporter is registered under the computed service key
 */
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
    int port = channel.getLocalAddress().getPort();
    // Interface name (service path) carried in the invocation attachments.
    String path = inv.getAttachments().get(PATH_KEY);

    String serviceKey = serviceKey(port, path, inv.getAttachments().get(VERSION_KEY), inv.getAttachments().get(GROUP_KEY));
    DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
    if (exporter == null) {
        // Fail with a descriptive remoting error instead of a bare NullPointerException
        // when the requested service/version/group was never exported on this port.
        throw new RemotingException(channel, "Not found exported service: " + serviceKey
                + " in " + exporterMap.keySet()
                + ", may be version or group mismatch, channel: consumer: " + channel.getRemoteAddress()
                + " --> provider: " + channel.getLocalAddress());
    }
    return exporter.getInvoker();
}

AbstractProtocol#exporterMap

// Exporters keyed by service key: export() puts entries here and getInvoker() looks them up.
protected final Map<String, Exporter<?>> exporterMap;

/**
 * Exports a service: wraps the invoker in a {@code DubboExporter} and
 * registers it in {@code exporterMap} under its service key.
 *
 * @param invoker the invoker to export
 * @return the exporter registered for the invoker
 * @throws RpcException declared by the protocol contract
 */
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    final URL serviceUrl = invoker.getUrl();
    // The service key identifies the service ("service coordinates"): group, service name,
    // version and port, e.g. ${group}/com.my.practice.dubbo.ISayHelloService:${version}:20880
    final String exportKey = serviceKey(serviceUrl);
    final DubboExporter<T> registered = new DubboExporter<>(invoker, exportKey, exporterMap);
    exporterMap.put(exportKey, registered);
    return registered;
}

invoker.invoke(inv)

invoker=InvokerDelegate(DelegateProviderMetaDataInvoker(AbstractProxyInvoker()))

JavassistProxyFactory#getInvoker（返回AbstractProxyInvoker的匿名子类）

/**
 * Builds an invoker that dispatches calls to {@code proxy} through a
 * {@code Wrapper} instead of calling the target directly.
 *
 * @param proxy the object implementing the service interface
 * @param type  the service interface type
 * @param url   the service URL (per the original note, of the form registry://ip:port...)
 * @return an {@code AbstractProxyInvoker} whose {@code doInvoke} delegates to the wrapper
 */
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // Wrap the concrete class unless its name contains '$'; in that case wrap the interface type.
    final Class<?> wrappedClass;
    if (proxy.getClass().getName().indexOf('$') < 0) {
        wrappedClass = proxy.getClass();
    } else {
        wrappedClass = type;
    }
    final Wrapper methodDispatcher = Wrapper.getWrapper(wrappedClass);

    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            return methodDispatcher.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}
// Example of the invokeMethod source generated for a Wrapper around com.my.dubbo.PayServiceImpl.
// This is Javassist source-template text rather than plain Java: $1..$4 refer to the
// method parameters in declaration order (o, n, p, v), and ($w) is Javassist's wrapper cast.
public Object invokeMethod (Object o, String n, Class[]p, Object[]v) throws
java.lang.reflect.InvocationTargetException {
    com.my.dubbo.PayServiceImpl w;
    try {
        // Cast the target object ($1) to the concrete service implementation.
        w = ((com.my.dubbo.PayServiceImpl) $1);
    } catch (Throwable e) {
        throw new IllegalArgumentException(e);
    }
    try {
        // Dispatch on method name ($2) and parameter count ($3.length).
        if ("pay".equals($2) && $3.length == 1) {
            return ($w) w.pay((java.lang.String) $4[0]);
        }
    } catch (Throwable e) {
        // Anything thrown by the target method is wrapped in InvocationTargetException.
        throw new java.lang.reflect.InvocationTargetException(e);
    }
    // No branch matched the requested method.
    throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class com.my.dubbo.PayServiceImpl.");
}
上一篇下一篇

猜你喜欢

热点阅读