打造自己的通信框架四——NettyServer搭建

2020-08-26  本文已影响0人  alonwang

前言

从客户端发出一条消息到服务端接收并处理这条消息,大概可以分成下面的流程

image.png

黄色部分为客户端逻辑,蓝色为网络传输,红色为服务端逻辑,本文关注的是服务端逻辑。

正文

将二进制解码为特定格式,将protobuf封装为自定义格式都是这个处理链的一个单元。
在Netty中,ChannelHandler充当了单元,ChannelPipeline充当处理链。处理链的构造如下

        ChannelPipeline pipeline = ch.pipeline();
        //闲置链接监听
        pipeline.addLast(new IdleStateHandler(60, 60, 0));
        //闲置链接处理单元
        pipeline.addLast(idleEventHandler);
        //1 protobuf解码单元
        pipeline.addLast(new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));
        pipeline.addLast(protobufDecoder);
        //2 protobuf封装为自定义协议Request单元
        pipeline.addLast(protobufRequestDecoder);
        //3 Request分发执行单元
        pipeline.addLast(requestDispatchHandler);
        //two protobuuf编码单元
        pipeline.addLast(new LengthFieldPrepender(4));
        pipeline.addLast(protobufEncoder);
        //one Response转换为protobuf单元
        pipeline.addLast(responseEncoder);

标号1,2,3为我们需要关注的逻辑: 1.将二进制数据解码为protobuf -> 2.将protobuf编码为Request -> 3.Request分发执行
在具体实现上对应下面三点

  1. 从二进制数据转换为proto生成的java类Base.Request
  2. 从Base.Request转换为AbstractRequest的具体实现类
  3. 根据AbstractRequest找到对应的方法,通过反射调用

下面我们详解这三个过程

将二进制数据解码为protobuf

在这一步我们将二进制数据转换成之前定义的Base.Request
这里使用的两个ChannelHandler都是Netty提供的,不过多叙述

将protobuf编码为Request

在这一步,MessageFactory.createRequest()根据moduleId和commandId找出具体的Request,并用proto对象去填充数据。再将创建的AbstractMessage传递给下一层
MessageRegistry稍后解释)。

public class ProtobufRequestDecoder extends MessageToMessageDecoder<Request> {


    @Override
    protected void decode(ChannelHandlerContext ctx, Request msg, List<Object> out) throws Exception {
        int moduleId = msg.getModuleId();
        int commandId = msg.getCommandId();
        AbstractRequest request = MessageFactory.createRequest(moduleId, commandId, msg.getData());
        out.add(request);
    }
}

public static <T extends AbstractRequest> T createRequest(int moduleId, int commandId, Object body) {
        Class<? extends AbstractMessage> messageClazz = Context.getMessageRegistry().getMessage(moduleId, commandId);
        Preconditions.checkNotNull(messageClazz, "moduleId({}),commandId({}) no relate Message", moduleId, commandId);
        Preconditions.checkArgument(AbstractRequest.class.isAssignableFrom(messageClazz));
        try {
            Constructor<? extends AbstractMessage> constructor = messageClazz.getConstructor();
            AbstractMessage abstractMessage = constructor.newInstance();
            AbstractRequest abstractRequest = (AbstractRequest) abstractMessage;
            MessageHeader header = new RequestHeader(moduleId, commandId);
            abstractRequest.setHeader(header);
            abstractRequest.setBody(body);
            abstractRequest.decode();
            return (T) abstractRequest;
        } catch (Exception e) {
            log.error("create request error", e);
        }
        return null;
    }

Request分发执行

这一步将Channel转换为User,将转发的职责交给RequestDispatchService
channel2SessionMap记录了Channel和User的对应关系。User是客户端的唯一标识,里面记录了客户端的信息,并承载了异步串行无锁化的功能,具体实现后文会详述。

public class RequestDispatchHandler extends SimpleChannelInboundHandler<AbstractRequest> {
    private static Map<Channel, User> channel2SessionMap = new ConcurrentHashMap<>();

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, AbstractRequest msg) throws Exception {
        User user = channel2SessionMap.compute(ctx.channel(), (c, s) -> {
            if (null == s) {
                User createUser = new User();
                createUser.setChannel(c);
                return createUser;
            } else {
                return s;
            }

        });

        Context.getRequestDispatchService().dispatch(user, msg);
    }
}

RequestDispatchService也很简单,找到AbstractRequest对应的方法,再通过反射调用去执行。
user.execute(...)将这条消息放到用户的消息队列去执行

@Component
public class RequestDispatchService {
    @Autowired
    private MessageRegistry messageRegistry;

    public void dispatch(User user, AbstractRequest request) {
        MethodWrapper wrapper = messageRegistry.getWrapper(request.header().getModuleId(), request.header().getCommandId());
        Preconditions.checkNotNull(wrapper);
        user.execute(new MessageTask(wrapper, request));

    }
}

至此,服务端逻辑结束,下面来看一下这个过程中最核心的MessageRegistry

MessageRegistry

MessageRegistry有两个功能

moduleId+commandId到AbstractMessage的映射

举个例子
所有AbstractMessage的具体实现,都会带有@MessageWrapper注解,以此将具体的Message和moduleId+commandId关联起来,例如

 @MessageWrapper(moduleId = 1, commandId = 1)
 public class HelloRequest extends AbstractRequest {
    
 }

MessageRegistry会扫描类信息,将其关系记录下来。

        Reflections reflections = new Reflections(TransportStarter.class.getPackage().getName());
        //记录所有的Message
        Map<String, Class<? extends AbstractMessage>> tempMessageMap = new HashMap<>();
        Set<Class<?>> wrappersClasses = reflections.getTypesAnnotatedWith(MessageWrapper.class);
        for (Class<?> wrapperClazz : wrappersClasses) {
            Preconditions.checkArgument(!Modifier.isAbstract(wrapperClazz.getModifiers()), "{} illegal,@MessageWrapper annotated class can't be abstract", wrapperClazz.getSimpleName());
            Preconditions.checkArgument(AbstractMessage.class.isAssignableFrom(wrapperClazz), "{} illegal,@MessageWrapper annotated class must be sub type of AbstractCSMessage", wrapperClazz.getSimpleName());
            MessageWrapper wrapperAnnotation = wrapperClazz.getAnnotation(MessageWrapper.class);
            Preconditions.checkArgument(wrapperAnnotation.moduleId() > 0 && wrapperAnnotation.commandId() > 0);
            String key = getKey(wrapperAnnotation.moduleId(), wrapperAnnotation.commandId());
            if (tempMessageMap.containsKey(key)) {
                Class<? extends AbstractMessage> old = tempMessageMap.get(key);
                throw new IllegalArgumentException(old.getSimpleName() + " and " + wrapperClazz.getSimpleName() + " conflict,please check @MessageWrapper's moduleId and commandId");
            }
            tempMessageMap.put(key, (Class<? extends AbstractMessage>) wrapperClazz);
        }
        messages = Collections.unmodifiableMap(tempMessageMap);

计算并存储AbstractMessage到具体方法的映射

一个消息必定有一个对应的方法,@MessageHandler注解标识某个接口中的某些方法是和消息相关联的

@MessageHandler
public interface HelloService {
    void hello(User user, HelloRequest request);

}

MessageRegistry再将这个方法封装一下,和消息关联起来

        //解析所有message对应的方法
        Map<Class<? extends AbstractMessage>, MethodWrapper> tempMethodWrappers = new HashMap<>();
        Set<Class<?>> handlerClasses = reflections.getTypesAnnotatedWith(MessageHandler.class, true);
        for (Class<?> handlerClazz : handlerClasses) {
            Preconditions.checkArgument(Modifier.isInterface(handlerClazz.getModifiers()), "{} illegal,@MessageHandler annotated class must be interface");
            Object bean = applicationContext.getBean(handlerClazz);
            Preconditions.checkNotNull(bean, "{} annotated with @MessageHandler but no instance");
            var methods = handlerClazz.getDeclaredMethods();
            for (Method method : methods) {
                var parameterTypes = method.getParameterTypes();
                List<Class<?>> satisfyParameters = Arrays.stream(parameterTypes).filter(AbstractMessage.class::isAssignableFrom).filter(type -> messages.containsValue(type)).collect(Collectors.toList());
                if (satisfyParameters.isEmpty()) {
                    continue;
                }
                Preconditions.checkArgument(satisfyParameters.size() == 1, "method {} signature illegal,parameters should only contain exactly one AbstractCSMessage", method);
                Class<? extends AbstractMessage> messageClazz = (Class<? extends AbstractMessage>) satisfyParameters.get(0);
                Preconditions.checkArgument(!tempMethodWrappers.containsKey(messageClazz), "parameter illegal,{} appear in different methods", messageClazz);
                MethodWrapper methodWrapper = new MethodWrapper(method, bean);
                tempMethodWrappers.put(messageClazz, methodWrapper);
            }
        }
        messageMethods = Collections.unmodifiableMap(tempMethodWrappers);

后记

通过这套机制,打通了从消息接收到执行的一整套逻辑,开发者不需要关注消息是如何处理的,只要定义好必备的东西就好。

上一篇 下一篇

猜你喜欢

热点阅读