grpc使用自定义业务线程池

2021-05-21  本文已影响0人  江江的大猪
@Slf4j
@Component
public class GrpcServer {
    @Resource
    private Config config;
    @Resource
    private GrpcMetadataHandler metadataHandler;

    private ThreadPoolExecutor executorService;
    private Server server;

    // executorService拒绝任务时服务端会有错误日志,返回给客户端 http2.0 internal error,客户端会抛异常
    @PostConstruct
    private void init() {
        log.info("GrpcServer init");
        executorService = new ThreadPoolExecutor(
                config.getGrpcThreadCount(),
                config.getGrpcThreadCount(),
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(config.getGrpcThreadQueue()),
                new CustomizableThreadFactory("GrpcServer"));
        server = NettyServerBuilder.forPort(config.getGrpcPort())
//                .addService(metadataHandler)
                .executor(executorService)
                .maxConnectionIdle(10, TimeUnit.MINUTES)
                .build();
        GlobalEventExecutor.INSTANCE.scheduleWithFixedDelay(() -> log.info("GrpcServer queue count:{}", executorService.getQueue().size()), 0, 1, TimeUnit.SECONDS);
    }

    @PreDestroy
    private void destroy() throws InterruptedException {
        log.info("GrpcServer shutdown ...");
        // 会阻塞到所有任务执行完,除非和客户端的连接断了,断了的话不会执行线程池中未完成的任务
        server.shutdown().awaitTermination();
        executorService.shutdown();
        while (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
            log.error("GrpcServer shutdown wait queue:{}", executorService.getQueue().size());
        }
        log.info("GrpcServer shutdown finished");
    }

    public void start() throws IOException {
        server.start();
    }
}
2021-05-19 22:24:01.868  WARN 2595 --- [-worker-ELG-3-3] i.g.n.s.i.grpc.netty.NettyServerHandler  : Exception in onHeadersRead()

java.util.concurrent.RejectedExecutionException: Task io.grpc.internal.SerializingExecutor@7cfe63 rejected from java.util.concurrent.ThreadPoolExecutor@7f241d90[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 9]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at io.grpc.internal.SerializingExecutor.schedule(SerializingExecutor.java:93)
    at io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:86)
    at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.streamCreatedInternal(ServerImpl.java:585)
    at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.streamCreated(ServerImpl.java:475)
    at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler.onHeadersRead(NettyServerHandler.java:456)
    at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler.access$900(NettyServerHandler.java:101)
    at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler$FrameListener.onHeadersRead(NettyServerHandler.java:813)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead(DefaultHttp2ConnectionDecoder.java:373)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onHeadersRead(Http2InboundFrameLogger.java:65)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader$1.processFragment(DefaultHttp2FrameReader.java:457)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readHeadersFrame(DefaultHttp2FrameReader.java:464)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:254)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:160)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:174)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
    at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

2021-05-19 22:24:01.873  WARN 2595 --- [-worker-ELG-3-3] i.g.n.s.i.grpc.netty.NettyServerHandler  : Stream Error

io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception$StreamException: Task io.grpc.internal.SerializingExecutor@7cfe63 rejected from java.util.concurrent.ThreadPoolExecutor@7f241d90[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 9]
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception.streamError(Http2Exception.java:167)
    at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler.newStreamException(NettyServerHandler.java:774)
    at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler.onHeadersRead(NettyServerHandler.java:465)
    at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler.access$900(NettyServerHandler.java:101)
    at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler$FrameListener.onHeadersRead(NettyServerHandler.java:813)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead(DefaultHttp2ConnectionDecoder.java:373)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onHeadersRead(Http2InboundFrameLogger.java:65)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader$1.processFragment(DefaultHttp2FrameReader.java:457)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readHeadersFrame(DefaultHttp2FrameReader.java:464)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:254)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:160)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:174)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
    at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.RejectedExecutionException: Task io.grpc.internal.SerializingExecutor@7cfe63 rejected from java.util.concurrent.ThreadPoolExecutor@7f241d90[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 9]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at io.grpc.internal.SerializingExecutor.schedule(SerializingExecutor.java:93)
    at io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:86)
    at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.streamCreatedInternal(ServerImpl.java:585)
    at io.grpc.internal.ServerImpl$ServerTransportListenerImpl.streamCreated(ServerImpl.java:475)
    at io.grpc.netty.shaded.io.grpc.netty.NettyServerHandler.onHeadersRead(NettyServerHandler.java:456)
    ... 31 common frames omitted

2021-05-19 22:24:01.874 DEBUG 2595 --- [-worker-ELG-3-3] i.g.n.s.i.grpc.netty.NettyServerHandler  : [id: 0x257fbde9, L:/127.0.0.1:10051 - R:/127.0.0.1:54061] OUTBOUND RST_STREAM: streamId=13 errorCode=2
2021-05-19 22:24:01.875 DEBUG 2595 --- [-worker-ELG-3-2] i.g.n.s.i.grpc.netty.NettyClientHandler  : [id: 0xe5f50b5a, L:/127.0.0.1:54061 - R:/127.0.0.1:10051] INBOUND RST_STREAM: streamId=13 errorCode=2
2021-05-19 22:24:01.875 DEBUG 2595 --- [-worker-ELG-3-3] i.g.n.s.i.grpc.netty.NettyServerHandler  : [id: 0x257fbde9, L:/127.0.0.1:10051 - R:/127.0.0.1:54061] INBOUND DATA: streamId=13 padding=0 endStream=true length=50 bytes=000000002d0a0868615f6461696c7912214d515f494e53545f313937333238313236393636313136305f4742746439557878
2021-05-19 22:24:01.876  WARN 2595 --- [-worker-ELG-3-3] i.g.n.s.i.grpc.netty.NettyServerHandler  : Stream Error

io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception$StreamException: Received DATA frame for an unknown stream 13
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2Exception.streamError(Http2Exception.java:147)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.shouldIgnoreHeadersOrDataFrame(DefaultHttp2ConnectionDecoder.java:596)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onDataRead(DefaultHttp2ConnectionDecoder.java:239)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger$1.onDataRead(Http2InboundFrameLogger.java:48)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readDataFrame(DefaultHttp2FrameReader.java:422)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.processPayloadState(DefaultHttp2FrameReader.java:251)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2FrameReader.readFrame(DefaultHttp2FrameReader.java:160)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2InboundFrameLogger.readFrame(Http2InboundFrameLogger.java:41)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder.decodeFrame(DefaultHttp2ConnectionDecoder.java:174)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler$FrameDecoder.decode(Http2ConnectionHandler.java:378)
    at io.grpc.netty.shaded.io.netty.handler.codec.http2.Http2ConnectionHandler.decode(Http2ConnectionHandler.java:438)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
    at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
    at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
    at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
    at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

上一篇 下一篇

猜你喜欢

热点阅读