Dubbo 2.7.5 Consumer Thread Model Optimization
2020-02-16
晴天哥_王志
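Introduction
- Before reading this article, it is worth reading the earlier article on the Dubbo Consumer response process, which gives a better picture of how the Consumer handles a response.
- Consumer thread model before Dubbo 2.7.5: each Connection maintains its own thread pool. Every response is processed by that pool, which then notifies the waiting business thread.
- Consumer thread model in Dubbo 2.7.5: a Connection no longer maintains a thread pool. Responses are processed by the business thread itself.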
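Thread model before Dubbo 2.7.5
[Figure: thread model before Dubbo 2.7.5]
- In the thread model before Dubbo 2.7.5, every response is processed by the Consumer-side (client-side) thread pool.
- Once the Consumer-side thread pool has processed a response, it stores the result in the future that belongs to the original request and wakes up the waiting business thread, which then finishes the call.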
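The hand-off described above can be sketched with plain JDK classes. This is only an illustration under assumptions, not Dubbo code: the class name `PooledResponseModel`, the thread name `consumer-pool-1`, and the `decoded:pong` payload are hypothetical. A pool thread does the processing and completes the future, while the business thread only waits.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PooledResponseModel {
    // Returns {decoded response, name of the thread that decoded it}.
    static String[] demo() {
        // Hypothetical stand-in for the per-connection consumer thread pool.
        ExecutorService consumerPool = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "consumer-pool-1");
            t.setDaemon(true);
            return t;
        });
        CompletableFuture<String[]> future = new CompletableFuture<>();
        try {
            // Simulated arrival of a response: a pool thread processes it
            // and completes the future that belongs to the request.
            consumerPool.execute(() -> future.complete(
                    new String[] {"decoded:pong", Thread.currentThread().getName()}));
            // The business thread only waits; it wakes up once the future completes.
            return future.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("no response", e);
        } finally {
            consumerPool.shutdown();
        }
    }
}
```

In this sketch the second array element is the pool thread's name, which shows that the work did not happen on the caller's thread.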
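Dubbo 2.7.5 thread model
[Figure: Dubbo 2.7.5 thread model]
- In the Dubbo 2.7.5 thread model, every response is processed by the business thread itself and no longer by the Consumer thread pool.
- The Dubbo 2.7.5 client no longer maintains a separate thread pool object for each connection.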
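As a rough sketch of this idea, again with plain JDK classes and hypothetical names (`BusinessThreadModel`, `io-thread`), the IO side only enqueues the processing task and the business thread takes it from the queue and runs it. It is a simplification under assumptions, not the Dubbo implementation.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BusinessThreadModel {
    // Returns {decoded response, name of the thread that decoded it}.
    static String[] demo() {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        String[] out = new String[2];
        // Simulated IO thread: it only enqueues the task, it does not run it.
        Thread io = new Thread(() -> queue.add(() -> {
            out[0] = "decoded:pong";
            out[1] = Thread.currentThread().getName();
        }), "io-thread");
        io.setDaemon(true);
        io.start();
        try {
            // The business thread blocks until a task arrives, then runs it itself.
            queue.take().run();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return out;
    }
}
```

Here the recorded thread name is the caller's own, so no extra pool thread is involved in processing the response.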
Consumer request flow
- A request passes through AsyncToSyncInvoker => DubboInvoker => HeaderExchangeChannel, in that order.
AsyncToSyncInvoker
public class AsyncToSyncInvoker<T> implements Invoker<T> {
    private Invoker<T> invoker;

    public AsyncToSyncInvoker(Invoker<T> invoker) {
        this.invoker = invoker;
    }

    @Override
    public Class<T> getInterface() {
        return invoker.getInterface();
    }

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        // Call DubboInvoker's invoke(), which returns an AsyncRpcResult
        Result asyncResult = invoker.invoke(invocation);
        try {
            // Only synchronous invocations block here until the result is available
            if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
                asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            // code omitted
        } catch (ExecutionException e) {
            // code omitted
        } catch (Throwable e) {
            // code omitted
        }
        return asyncResult;
    }
}
- AsyncToSyncInvoker calls invoker.invoke(), where invoker is a DubboInvoker.
- That call returns an AsyncRpcResult.
- For SYNC invocations, asyncResult.get() then blocks until the result comes back.
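The async-to-sync conversion above can be illustrated in isolation. The following is a minimal sketch under assumptions: `Mode` is a hypothetical stand-in for Dubbo's InvokeMode, and `invokeAsync` is a made-up asynchronous call. The wrapper always returns the future, but in SYNC mode it blocks until the future is complete before returning.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class AsyncToSyncSketch {
    enum Mode { SYNC, ASYNC } // hypothetical stand-in for Dubbo's InvokeMode

    // Hypothetical asynchronous call: the future is completed by another thread.
    static CompletableFuture<String> invokeAsync(String arg) {
        return CompletableFuture.supplyAsync(() -> "hello " + arg);
    }

    static CompletableFuture<String> invoke(String arg, Mode mode) {
        CompletableFuture<String> result = invokeAsync(arg);
        if (mode == Mode.SYNC) {
            try {
                // Block the caller until the asynchronous result is available.
                result.get(5, TimeUnit.SECONDS);
            } catch (Exception e) {
                throw new IllegalStateException("call failed", e);
            }
        }
        // Either way the caller receives the same future-like result object.
        return result;
    }
}
```

A SYNC caller can rely on the returned future already being complete, while an ASYNC caller would attach callbacks or wait on it later.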
DubboInvoker
public class DubboInvoker<T> extends AbstractInvoker<T> {
    private final ExchangeClient[] clients;
    private final AtomicPositiveInteger index = new AtomicPositiveInteger();
    private final String version;
    private final ReentrantLock destroyLock = new ReentrantLock();
    private final Set<Invoker<?>> invokers;

    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);
        // Pick a client; with several clients they are used in round-robin order
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                // Key step: for a synchronous call, getCallbackExecutor() returns a ThreadlessExecutor
                ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                CompletableFuture<AppResponse> appResponseFuture =
                        currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
                // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
                FutureContext.getContext().setCompatibleFuture(appResponseFuture);
                // Create the result object
                AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
                // Store the executor (the ThreadlessExecutor for a synchronous call) in the AsyncRpcResult
                result.setExecutor(executor);
                return result;
            }
        } catch (TimeoutException e) {
            // code omitted
        } catch (RemotingException e) {
            // code omitted
        }
    }
}

public abstract class AbstractInvoker<T> implements Invoker<T> {
    protected ExecutorService getCallbackExecutor(URL url, Invocation inv) {
        ExecutorService sharedExecutor = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getExecutor(url);
        // Synchronous calls get a ThreadlessExecutor wrapping the shared executor; other calls use the shared executor directly
        if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) {
            return new ThreadlessExecutor(sharedExecutor);
        } else {
            return sharedExecutor;
        }
    }

    protected abstract Result doInvoke(Invocation invocation) throws Throwable;
}
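- DubboInvoker performs the call in doInvoke().
- getCallbackExecutor() returns a ThreadlessExecutor for synchronous calls and the shared executor otherwise.
- currentClient.request() ends up in HeaderExchangeChannel's request() method, which returns a CompletableFuture.
- An AsyncRpcResult is then built around that CompletableFuture, so the result object holds the request's future.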
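The client selection in doInvoke() is a round-robin over the client array. A small sketch of that selection follows, under assumptions: the article does not show AtomicPositiveInteger, so this version uses the JDK's AtomicInteger and masks the sign bit to keep the index non-negative after overflow, and the string array stands in for the ExchangeClient array.

```java
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinSketch {
    private final String[] clients; // hypothetical stand-in for ExchangeClient[]
    private final AtomicInteger index = new AtomicInteger();

    RoundRobinSketch(String... clients) {
        this.clients = clients;
    }

    String next() {
        if (clients.length == 1) {
            return clients[0];
        }
        // Mask the sign bit so the index stays non-negative once the counter overflows.
        int i = (index.getAndIncrement() & Integer.MAX_VALUE) % clients.length;
        return clients[i];
    }
}
```

With three clients, successive calls to next() cycle through them in order and then wrap around to the first one.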
HeaderExchangeChannel
final class HeaderExchangeChannel implements ExchangeChannel {
    public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        // DefaultFuture.newFuture() creates the future for this request and registers it in the global FUTURES map
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }
}
- HeaderExchangeChannel's request() method creates the DefaultFuture object, sends the request, and returns the future.
DefaultFuture
public class DefaultFuture extends CompletableFuture<Object> {
    private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<>();
    private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
    public static final Timer TIME_OUT_TIMER = new HashedWheelTimer(
            new NamedThreadFactory("dubbo-future-timeout", true),
            30,
            TimeUnit.MILLISECONDS);
    // invoke id.
    private final Long id;
    private final Channel channel;
    private final Request request;
    private final int timeout;
    private final long start = System.currentTimeMillis();
    private volatile long sent;
    private Timeout timeoutCheckTask;
    private ExecutorService executor;

    public ExecutorService getExecutor() {
        return executor;
    }

    public void setExecutor(ExecutorService executor) {
        this.executor = executor;
    }

    private DefaultFuture(Channel channel, Request request, int timeout) {
        this.channel = channel;
        this.request = request;
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
        // put into waiting map.
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }

    public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
        final DefaultFuture future = new DefaultFuture(channel, request, timeout);
        future.setExecutor(executor);
        // ThreadlessExecutor needs to hold the waiting future in case of circuit return.
        if (executor instanceof ThreadlessExecutor) {
            ((ThreadlessExecutor) executor).setWaitingFuture(future);
        }
        // timeout check
        timeoutCheck(future);
        return future;
    }
}
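- Creating a DefaultFuture registers it in the global FUTURES map, keyed by the request id, and the executor passed in is stored on the future.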
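The role of the FUTURES map can be shown with a small request/response correlation sketch. It rests on assumptions: the class `PendingRequests` and its methods are hypothetical, and removing the entry when the response arrives is an assumption about behavior that the excerpt above does not show.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class PendingRequests {
    // Global map from request id to the future waiting for that request's response.
    private static final Map<Long, CompletableFuture<String>> FUTURES = new ConcurrentHashMap<>();
    private static final AtomicLong NEXT_ID = new AtomicLong();

    // Called when a request is sent: registers a future under a fresh id.
    static long register() {
        long id = NEXT_ID.getAndIncrement();
        FUTURES.put(id, new CompletableFuture<>());
        return id;
    }

    static CompletableFuture<String> get(long id) {
        return FUTURES.get(id);
    }

    // Called when a response arrives: looks up the future by id and completes it.
    // Returns false when no future is waiting, e.g. the request already timed out.
    static boolean received(long id, String body) {
        CompletableFuture<String> future = FUTURES.remove(id); // assumption: entry removed here
        if (future == null) {
            return false;
        }
        future.complete(body);
        return true;
    }
}
```

Because the response carries the same id as the request, the receiving side can find the waiting future without knowing which thread issued the call.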
Consumer response flow
AsyncRpcResult
public class AsyncRpcResult implements Result {
    private RpcContext storedContext;
    private RpcContext storedServerContext;
    private Executor executor;
    private Invocation invocation;
    private CompletableFuture<AppResponse> responseFuture;

    public AsyncRpcResult(CompletableFuture<AppResponse> future, Invocation invocation) {
        this.responseFuture = future;
        this.invocation = invocation;
        this.storedContext = RpcContext.getContext();
        this.storedServerContext = RpcContext.getServerContext();
    }

    @Override
    public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (executor != null && executor instanceof ThreadlessExecutor) {
            ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
            // The business thread blocks here until a response task arrives, then runs it
            threadlessExecutor.waitAndDrain();
        }
        return responseFuture.get(timeout, unit);
    }
}
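- AsyncRpcResult's get() calls threadlessExecutor.waitAndDrain(), which blocks the business thread until the response task arrives and then runs it on that thread.
- responseFuture.get() then returns the result.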
ThreadlessExecutor
public class ThreadlessExecutor extends AbstractExecutorService {
    // Queue of response tasks waiting for the business thread
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private ExecutorService sharedExecutor;
    private CompletableFuture<?> waitingFuture;
    private volatile boolean waiting = true;
    private final Object lock = new Object();

    public ThreadlessExecutor(ExecutorService sharedExecutor) {
        this.sharedExecutor = sharedExecutor;
    }

    public void waitAndDrain() throws InterruptedException {
        // Block until a response task arrives
        Runnable runnable = queue.take();
        synchronized (lock) {
            waiting = false;
            // Run the response task on the calling (business) thread
            runnable.run();
        }
        // Run any tasks that were queued in the meantime
        runnable = queue.poll();
        while (runnable != null) {
            try {
                runnable.run();
            } catch (Throwable t) {
                logger.info(t);
            }
            runnable = queue.poll();
        }
    }

    @Override
    public void execute(Runnable runnable) {
        // Hand the response task to the business thread's queue
        synchronized (lock) {
            // If the business thread is no longer waiting, use the shared executor
            if (!waiting) {
                sharedExecutor.execute(runnable);
            } else {
                // If it is still waiting, add the task to the queue
                queue.add(runnable);
            }
        }
    }
}
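- ThreadlessExecutor's waitAndDrain() blocks until a response task arrives and then runs it on the calling thread.
- ThreadlessExecutor's execute() adds the task to the queue while the business thread is still waiting. Once the business thread has stopped waiting, the task goes to the shared executor instead.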
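The effect of the waiting flag can be traced with a reduced, single-threaded sketch. It is written under assumptions: `MiniThreadlessExecutor` is a hypothetical simplification that implements only `Executor`, and the fallback executor here runs tasks inline and logs that it was used, in place of a real shared pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

class MiniThreadlessExecutor implements Executor {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Executor fallback;
    private final Object lock = new Object();
    private boolean waiting = true; // guarded by lock

    MiniThreadlessExecutor(Executor fallback) {
        this.fallback = fallback;
    }

    @Override
    public void execute(Runnable task) {
        synchronized (lock) {
            if (waiting) {
                queue.add(task);        // the business thread will pick it up
            } else {
                fallback.execute(task); // nobody is waiting any more
            }
        }
    }

    // Called by the business thread: block for the first task, then drain the rest.
    void waitAndDrain() {
        Runnable first;
        try {
            first = queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        synchronized (lock) {
            waiting = false;
            first.run();
        }
        for (Runnable r = queue.poll(); r != null; r = queue.poll()) {
            r.run();
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniThreadlessExecutor ex = new MiniThreadlessExecutor(task -> {
            log.add("fallback");
            task.run();
        });
        ex.execute(() -> log.add("response-1")); // queued: nothing has drained yet
        log.add("queued=" + ex.queue.size());
        ex.waitAndDrain();                       // runs response-1 on the calling thread
        ex.execute(() -> log.add("response-2")); // waiting is false: goes to the fallback
        return log;
    }
}
```

In demo(), the first task sits in the queue until waitAndDrain() is called, and the second task, submitted after the drain, is routed to the fallback executor.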
AllChannelHandler
public class AllChannelHandler extends WrappedChannelHandler {
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            if (message instanceof Request && t instanceof RejectedExecutionException) {
                sendFeedback(channel, (Request) message, t);
                return;
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }
}

public class WrappedChannelHandler implements ChannelHandlerDelegate {
    public ExecutorService getPreferredExecutorService(Object msg) {
        if (msg instanceof Response) {
            Response response = (Response) msg;
            DefaultFuture responseFuture = DefaultFuture.getFuture(response.getId());
            // a typical scenario is the response returned after timeout, the timeout response may has completed the future
            if (responseFuture == null) {
                return getSharedExecutorService();
            } else {
                ExecutorService executor = responseFuture.getExecutor();
                if (executor == null || executor.isShutdown()) {
                    executor = getSharedExecutorService();
                }
                return executor;
            }
        } else {
            return getSharedExecutorService();
        }
    }
}
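- AllChannelHandler's received() method obtains an executor and submits the task to it with execute().
- getPreferredExecutorService(), which AllChannelHandler inherits from WrappedChannelHandler, returns the executor stored in the request's future. For a synchronous call this is the ThreadlessExecutor. If no future or no usable executor is found, it falls back to the shared executor.
- For a synchronous call, executor.execute() in AllChannelHandler is therefore ThreadlessExecutor's execute(), which puts the task on the ThreadlessExecutor's queue for the business thread to run.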
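The selection rule above, reduced to its core, looks roughly like the following sketch. The names are hypothetical (`ExecutorSelectionSketch`, `register`, `preferred`), and a plain map from request id to executor stands in for the lookup through DefaultFuture.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

class ExecutorSelectionSketch {
    // Hypothetical stand-in for "the executor stored on the request's future".
    private final Map<Long, ExecutorService> executorsByRequestId = new ConcurrentHashMap<>();
    private final ExecutorService shared;

    ExecutorSelectionSketch(ExecutorService shared) {
        this.shared = shared;
    }

    void register(long requestId, ExecutorService executor) {
        executorsByRequestId.put(requestId, executor);
    }

    // Prefer the executor registered for this response's request id;
    // fall back to the shared executor if none is found or it is shut down.
    ExecutorService preferred(long responseId) {
        ExecutorService executor = executorsByRequestId.get(responseId);
        if (executor == null || executor.isShutdown()) {
            return shared;
        }
        return executor;
    }
}
```

The fallback matters for late responses: when the request has already timed out, nothing is waiting for it, so the task has to run somewhere other than the caller's own executor.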