dubbo

Dubbo 2.7.5 消费线程模型优化

2020-02-16  本文已影响0人  晴天哥_王志

开篇

Dubbo 2.7.5之前线程模型

Dubbo 2.7.5 之前线程模型

Dubbo2.7.5 线程模型

Dubbo2.7.5线程模型

Consumer 请求流程

AsyncToSyncInvoker

public class AsyncToSyncInvoker<T> implements Invoker<T> {

    private Invoker<T> invoker;

    public AsyncToSyncInvoker(Invoker<T> invoker) {
        this.invoker = invoker;
    }

    @Override
    public Class<T> getInterface() {
        return invoker.getInterface();
    }

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        // DubboInvoker的invoke()方法并返回AsyncRpcResult对象
        Result asyncResult = invoker.invoke(invocation);

        try {
            if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
                asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            // 省略代码
        } catch (ExecutionException e) {
            // 省略代码
        } catch (Throwable e) {
            // 省略代码
        }

        return asyncResult;
    }
}

DubboInvoker

/**
 * Invoker that sends an invocation over one of its {@link ExchangeClient}s.
 *
 * <p>One-way calls are sent without waiting for a response; two-way calls return an
 * {@link AsyncRpcResult} backed by the future of the pending request.
 *
 * @param <T> the service interface type
 */
public class DubboInvoker<T> extends AbstractInvoker<T> {

    private final ExchangeClient[] clients;
    private final AtomicPositiveInteger index = new AtomicPositiveInteger();
    private final String version;
    private final ReentrantLock destroyLock = new ReentrantLock();
    private final Set<Invoker<?>> invokers;

    /**
     * Sends the invocation to the provider.
     *
     * @param invocation the call being made; must be an {@code RpcInvocation}
     * @return an asynchronous result; already complete for one-way calls
     * @throws RpcException if the request timed out or failed at the remoting layer
     * @throws Throwable    any other failure raised while sending
     */
    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);

        // Pick a client: the only one, or the next one in rotation.
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                // Key step: getCallbackExecutor supplies the executor that will run the
                // response callback (a ThreadlessExecutor for synchronous calls).
                ExecutorService executor = getCallbackExecutor(getUrl(), inv);

                CompletableFuture<AppResponse> appResponseFuture =
                        currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);

                // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
                FutureContext.getContext().setCompatibleFuture(appResponseFuture);

                // Build the result object handed back to the caller.
                AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);

                // Attach the callback executor so the result can drain it while waiting.
                result.setExecutor(executor);

                return result;
            }
        } catch (TimeoutException e) {
            // Surface timeouts with a dedicated code instead of swallowing them.
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: "
                    + invocation.getMethodName() + ", provider: " + getUrl()
                    + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            // Any other remoting failure is reported as a network error, cause preserved.
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: "
                    + invocation.getMethodName() + ", provider: " + getUrl()
                    + ", cause: " + e.getMessage(), e);
        }
    }
}



public abstract class AbstractInvoker<T> implements Invoker<T> {

    /**
     * Chooses the executor on which the response callback of an invocation runs.
     *
     * @param url the URL used to look up the shared executor
     * @param inv the invocation whose invoke mode decides the executor
     * @return a new {@link ThreadlessExecutor} wrapping the shared executor for
     *         {@code SYNC} invocations, otherwise the shared executor itself
     */
    protected ExecutorService getCallbackExecutor(URL url, Invocation inv) {
        final ExecutorService shared = ExtensionLoader.getExtensionLoader(ExecutorRepository.class)
                .getDefaultExtension()
                .getExecutor(url);
        // The invoke mode is resolved against this invoker's own URL, not the url argument.
        final boolean synchronous = RpcUtils.getInvokeMode(getUrl(), inv) == InvokeMode.SYNC;
        return synchronous ? new ThreadlessExecutor(shared) : shared;
    }

    /**
     * Performs the actual invocation; implemented by concrete invokers.
     *
     * @param invocation the call being made
     * @return the invocation result
     * @throws Throwable any failure raised by the implementation
     */
    protected abstract Result doInvoke(Invocation invocation) throws Throwable;
}

HeaderExchangeChannel

final class HeaderExchangeChannel implements ExchangeChannel {

    /**
     * Sends a two-way request over the underlying channel.
     *
     * @param request  the payload to send
     * @param timeout  timeout passed to the pending future
     * @param executor executor associated with the pending future
     * @return the future tracking the pending request
     * @throws RemotingException if this channel is closed or sending fails
     */
    public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }

        final Request outbound = newTwoWayRequest(request);
        // DefaultFuture.newFuture creates the future for this request and registers it
        // in the global futures map before the request goes out.
        final DefaultFuture pending = DefaultFuture.newFuture(channel, outbound, timeout, executor);
        try {
            channel.send(outbound);
        } catch (RemotingException sendFailure) {
            // Sending failed: cancel the pending future before propagating.
            pending.cancel();
            throw sendFailure;
        }
        return pending;
    }

    /** Builds a two-way request carrying the current protocol version and the given payload. */
    private static Request newTwoWayRequest(Object payload) {
        final Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(payload);
        return req;
    }
}

DefaultFuture

/**
 * Future for a pending request. Every instance registers itself, keyed by the
 * request id, in static maps on construction.
 */
public class DefaultFuture extends CompletableFuture<Object> {

    // Request id -> channel the request was created for.
    private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<>();
    // Request id -> pending future.
    private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
    // NOTE(review): the 30 / MILLISECONDS arguments are presumably the wheel's tick
    // duration - confirm against HashedWheelTimer's constructor.
    public static final Timer TIME_OUT_TIMER = new HashedWheelTimer(
            new NamedThreadFactory("dubbo-future-timeout", true),
            30,
            TimeUnit.MILLISECONDS);

    // Id of the request this future belongs to (taken from request.getId()).
    private final Long id;
    private final Channel channel;
    private final Request request;
    // Effective timeout; see the constructor for the fallback rule.
    private final int timeout;
    // Wall-clock time at which this future was created.
    private final long start = System.currentTimeMillis();
    private volatile long sent;
    private Timeout timeoutCheckTask;

    // Executor associated with this future; assigned in newFuture.
    private ExecutorService executor;

    /** @return the executor associated with this future, possibly {@code null} */
    public ExecutorService getExecutor() {
        return executor;
    }

    /** @param executor the executor to associate with this future */
    public void setExecutor(ExecutorService executor) {
        this.executor = executor;
    }

    /**
     * Creates the future and registers it, and its channel, under the request id.
     *
     * @param channel the channel the request is sent on
     * @param request the request being tracked
     * @param timeout timeout to use; when not positive, the positive TIMEOUT_KEY
     *                parameter of the channel URL (default DEFAULT_TIMEOUT) is used
     */
    private DefaultFuture(Channel channel, Request request, int timeout) {
        this.channel = channel;
        this.request = request;
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
        // Register in the static maps so the future can be found by request id.
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }

    /**
     * Creates and registers a future for the request, binds the executor to it and
     * starts the timeout check.
     *
     * @param channel  the channel the request is sent on
     * @param request  the request being tracked
     * @param timeout  timeout for the request
     * @param executor executor to associate with the future
     * @return the newly created future
     */
    public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
        final DefaultFuture future = new DefaultFuture(channel, request, timeout);
        future.setExecutor(executor);
        // ThreadlessExecutor needs to hold the waiting future in case of circuit return.
        if (executor instanceof ThreadlessExecutor) {
            ((ThreadlessExecutor) executor).setWaitingFuture(future);
        }
        // Start the timeout check for this future (timeoutCheck is defined outside this excerpt).
        timeoutCheck(future);
        return future;
    }
}

Consumer 响应流程

AsyncRpcResult

/**
 * Result backed by a {@link CompletableFuture} of the application response.
 *
 * <p>When a {@link ThreadlessExecutor} has been attached through
 * {@link #setExecutor(Executor)}, {@link #get(long, TimeUnit)} first drains that
 * executor on the calling thread before reading the future.
 */
public class AsyncRpcResult implements Result {

    private RpcContext storedContext;
    private RpcContext storedServerContext;
    private Executor executor;
    private Invocation invocation;
    private CompletableFuture<AppResponse> responseFuture;

    /**
     * Creates a result for the given future and captures the current RPC contexts.
     *
     * @param future     future completed with the application response
     * @param invocation the invocation this result belongs to
     */
    public AsyncRpcResult(CompletableFuture<AppResponse> future, Invocation invocation) {
        this.responseFuture = future;
        this.invocation = invocation;
        this.storedContext = RpcContext.getContext();
        this.storedServerContext = RpcContext.getServerContext();
    }

    /** @return the executor attached to this result, or {@code null} if none was set */
    public Executor getExecutor() {
        return executor;
    }

    /**
     * Attaches the executor that runs the response callback. Without it,
     * {@link #get(long, TimeUnit)} never drains a ThreadlessExecutor.
     *
     * @param executor the callback executor; may be {@code null}
     */
    public void setExecutor(Executor executor) {
        this.executor = executor;
    }

    /**
     * Waits for the response.
     *
     * @param timeout maximum time to wait on the response future
     * @param unit    unit of {@code timeout}
     * @return the value the response future completes with
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException   if the response future completed exceptionally
     * @throws TimeoutException     if the response future did not complete in time
     */
    @Override
    public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        // instanceof is already false for null, so no separate null check is needed.
        if (executor instanceof ThreadlessExecutor) {
            ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
            // The business thread blocks here and runs the response task itself.
            // Note that this wait is not bounded by the timeout argument.
            threadlessExecutor.waitAndDrain();
        }
        return responseFuture.get(timeout, unit);
    }
}

ThreadlessExecutor

/**
 * Executor that queues submitted tasks while a caller is waiting, so the waiting
 * thread can run them itself in {@link #waitAndDrain()}. Once the waiter has picked
 * up its first task, later submissions are forwarded to the shared executor.
 */
public class ThreadlessExecutor extends AbstractExecutorService {
    // Tasks (response callbacks) waiting to be run by the thread in waitAndDrain().
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private ExecutorService sharedExecutor;
    private CompletableFuture<?> waitingFuture;
    // True until waitAndDrain() has taken its first task.
    private volatile boolean waiting = true;
    // Set once waitAndDrain() has taken a task; later calls then return immediately.
    private volatile boolean finished = false;
    private final Object lock = new Object();

    public ThreadlessExecutor(ExecutorService sharedExecutor) {
        this.sharedExecutor = sharedExecutor;
    }

    /**
     * Records the future the waiting thread is blocked on.
     *
     * @param waitingFuture the pending future associated with this executor
     */
    public void setWaitingFuture(CompletableFuture<?> waitingFuture) {
        this.waitingFuture = waitingFuture;
    }

    /**
     * Blocks until a task is submitted, runs it on the calling thread, then runs any
     * further tasks already queued.
     *
     * <p>After the first task has been taken, {@link #execute(Runnable)} no longer
     * queues anything, so a second blocking wait could never be satisfied. Calls
     * made after that point therefore return immediately.
     *
     * @throws InterruptedException if interrupted while waiting for the first task
     */
    public void waitAndDrain() throws InterruptedException {
        if (finished) {
            return;
        }

        // Block until the response task arrives.
        Runnable runnable = queue.take();

        try {
            synchronized (lock) {
                waiting = false;
                // Run the response task on the waiting thread.
                runnable.run();
            }

            runnable = queue.poll();
            while (runnable != null) {
                try {
                    runnable.run();
                } catch (Throwable t) {
                    logger.info(t);

                }
                runnable = queue.poll();
            }
        } finally {
            // Mark as finished even if a task threw: waiting is already false, so a
            // later call would otherwise block forever on an empty queue.
            finished = true;
        }
    }

    /**
     * Queues the task for the waiting thread, or forwards it to the shared executor
     * once the waiter has stopped waiting.
     *
     * @param runnable the task to run
     */
    @Override
    public void execute(Runnable runnable) {
        synchronized (lock) {
            if (!waiting) {
                // Nobody is waiting any more: hand over to the shared thread pool.
                sharedExecutor.execute(runnable);
            } else {
                // The caller is still waiting: queue the task for it.
                queue.add(runnable);
            }
        }
    }
}

AllChannelHandler

public class AllChannelHandler extends WrappedChannelHandler {
    @Override
    public void received(Channel channel, Object message) throws RemotingException {

        ExecutorService executor = getPreferredExecutorService(message);
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if(message instanceof Request && t instanceof RejectedExecutionException){
                sendFeedback(channel, (Request) message, t);
                return;
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
}


public class WrappedChannelHandler implements ChannelHandlerDelegate {

    /**
     * Selects the executor that should process a received message.
     *
     * @param msg the received message
     * @return for a {@code Response} whose pending future is still registered, that
     *         future's executor if it is non-null and not shut down; in every other
     *         case the shared executor
     */
    public ExecutorService getPreferredExecutorService(Object msg) {
        if (!(msg instanceof Response)) {
            return getSharedExecutorService();
        }

        final DefaultFuture pending = DefaultFuture.getFuture(((Response) msg).getId());
        // a typical scenario is the response returned after timeout, the timeout response may has completed the future
        if (pending == null) {
            return getSharedExecutorService();
        }

        final ExecutorService bound = pending.getExecutor();
        final boolean usable = bound != null && !bound.isShutdown();
        return usable ? bound : getSharedExecutorService();
    }
}

参考文章

上一篇下一篇

猜你喜欢

热点阅读