Java 杂谈深入浅出Netty源码剖析

【第23篇】Netty的ReplayingDecoder源码分析

2019-06-03  本文已影响1人  爱学习的蹭蹭

1、 ReplayingDecoder

public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
ReplayingDecoder
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
       replayable.setCumulation(in);
       try {
           while (in.isReadable()) {
               int oldReaderIndex = checkpoint = in.readerIndex();
               int outSize = out.size();

               if (outSize > 0) {
                   fireChannelRead(ctx, out, outSize);
                   out.clear();//清除
                   //在继续解码之前,检查这个处理程序是否已被删除。
                   //如果它被移除,继续在缓冲区上操作是不安全的。
                   // See:
                   // - https://github.com/netty/netty/issues/4635
                   if (ctx.isRemoved()) {
                       break;
                   }
                   outSize = 0;
               }

               S oldState = state;
               int oldInputLength = in.readableBytes();
               try {
                   //解码移除再入保护
                   decodeRemovalReentryProtection(ctx, replayable, out);
                   //在继续循环之前,检查是否删除了这个处理程序。
                   //如果它被移除,继续在缓冲区上操作是不安全的。
                   // See https://github.com/netty/netty/issues/1664
                   if (ctx.isRemoved()) {
                       break;
                   }

                   if (outSize == out.size()) {
                       if (oldInputLength == in.readableBytes() && oldState == state) {
                           throw new DecoderException(
                                   StringUtil.simpleClassName(getClass()) + ".decode() must consume the inbound " +
                                   "data or change its state if it did not decode anything.");
                       } else {
                           //以前的数据已被丢弃或导致状态转换。
                           //也许它还在继续读。
                           continue;
                       }
                   }
               } catch (Signal replay) {
                   replay.expect(REPLAY);
                   //在继续循环之前,检查是否删除了此处理程序。
                   //如果它被移除,继续在缓冲区上操作是不安全的。
                   // See https://github.com/netty/netty/issues/1664
                   if (ctx.isRemoved()) {
                       break;
                   }

                   //返回到这个检查点(或是旧的位置)和重试
                   int checkpoint = this.checkpoint;
                   if (checkpoint >= 0) {
                       in.readerIndex(checkpoint);
                   } else {
                       
                   }
                   break;
               }

               if (oldReaderIndex == in.readerIndex() && oldState == state) {
                   throw new DecoderException(
                          StringUtil.simpleClassName(getClass()) + ".decode() method must consume the inbound data " +
                          "or change its state if it decoded something.");
               }
               if (isSingleDecode()) {
                   break;
               }
           }
       } catch (DecoderException e) {
           throw e;
       } catch (Throwable cause) {
           throw new DecoderException(cause);
       }
   }

2、ReplayingDecoder如何工作?

3、ReplayingDecoder如何提高性能

4、ReplayingDecoder使用枚举Enum调用 checkpoint(T)

 public enum MyDecoderState {
     READ_LENGTH,
     READ_CONTENT;
   }
   
  public class IntegerHeaderFrameDecoder extends ReplayingDecoder<MyDecoderState> {
  
     private int length;
  
     public IntegerHeaderFrameDecoder() {
       // Set the initial state.
       super(MyDecoderState.READ_LENGTH);
     }
  
     @Override
     protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
       switch (state()) {
        case READ_LENGTH:
            length = buf.readInt();
            checkpoint(MyDecoderState.READ_CONTENT);
        case READ_CONTENT:
            ByteBuf frame = buf.readBytes(length);
            checkpoint(MyDecoderState.READ_LENGTH);
            out.add(frame);
         break;
       default:
         throw new Error("Shouldn't reach here.");
       }
     }
   }

5、ReplayingDecoder调用没有参数的checkpoint()方法

  public class IntegerHeaderFrameDecoder extends ReplayingDecoder<Void> {}

6、 ReplayingDecoder用管道中的另一个解码器替换一个解码器

 public class FirstDecoder extends ReplayingDecoder<Void> {
  
        @Override
       protected void decode(ChannelHandlerContext ctx,  ByteBuf buf, List<Object> out) {
           ...
           // Decode the first message
           Object firstMessage = ...;
  
           // Add the second decoder
           ctx.pipeline().addLast("second", new SecondDecoder());
  
           if (buf.isReadable()) {
               // Hand off the remaining data to the second decoder
               out.add(firstMessage);
               out.add(buf.readBytes(super.actualReadableBytes()));
           } else {
               // Nothing to hand off
               out.add(firstMessage);
           }
           // Remove the first decoder (me)
           ctx.pipeline().remove(this);
       }
}       

7、开发过程中编写解码器与编码器的建议(注意点)

上一篇 下一篇

猜你喜欢

热点阅读