打造自己的通信框架四——NettyServer搭建
前言
从客户端发出一条消息到服务端接收并处理这条消息,大概可以分成下面的流程
image.png黄色部分为客户端逻辑,蓝色为网络传输,红色为服务端逻辑,本文关注的是服务端逻辑。
正文
将二进制解码为特定格式,将protobuf封装为自定义格式都是这个处理链的一个单元。
在Netty中,ChannelHandler充当了单元,ChannelPipeline充当处理链。处理链的构造如下
ChannelPipeline pipeline = ch.pipeline();
//闲置链接监听
pipeline.addLast(new IdleStateHandler(60, 60, 0));
//闲置链接处理单元
pipeline.addLast(idleEventHandler);
//1 protobuf解码单元
pipeline.addLast(new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));
pipeline.addLast(protobufDecoder);
//2 protobuf封装为自定义协议Request单元
pipeline.addLast(protobufRequestDecoder);
//3 Request分发执行单元
pipeline.addLast(requestDispatchHandler);
//two protobuuf编码单元
pipeline.addLast(new LengthFieldPrepender(4));
pipeline.addLast(protobufEncoder);
//one Response转换为protobuf单元
pipeline.addLast(responseEncoder);
标号1,2,3为我们需要关注的逻辑: 1.将二进制数据解码为protobuf -> 2.将protobuf编码为Request -> 3.Request分发执行
在具体实现上对应下面三点
- 从二进制数据转换为proto生成的java类Base.Request
- 从Base.Request转换为AbstractRequest的具体实现类
- 根据AbstractRequest找到对应的方法,通过反射调用
下面我们详解这三个过程
将二进制数据解码为protobuf
在这一步我们将二进制数据转换成之前定义的Base.Request
这里使用的两个ChannelHandler都是Netty提供的,不过多叙述
- io.netty.handler.codec.LengthFieldBasedFrameDecoder
- io.netty.handler.codec.protobuf.ProtobufDecoder
将protobuf编码为Request
在这一步,MessageFactory.createRequest()
根据moduleId和commandId找出具体的Request,并用proto对象去填充数据。再将创建的AbstractMessage传递给下一层
(MessageRegistry稍后解释)。
public class ProtobufRequestDecoder extends MessageToMessageDecoder<Request> {
@Override
protected void decode(ChannelHandlerContext ctx, Request msg, List<Object> out) throws Exception {
int moduleId = msg.getModuleId();
int commandId = msg.getCommandId();
AbstractRequest request = MessageFactory.createRequest(moduleId, commandId, msg.getData());
out.add(request);
}
}
public static <T extends AbstractRequest> T createRequest(int moduleId, int commandId, Object body) {
Class<? extends AbstractMessage> messageClazz = Context.getMessageRegistry().getMessage(moduleId, commandId);
Preconditions.checkNotNull(messageClazz, "moduleId({}),commandId({}) no relate Message", moduleId, commandId);
Preconditions.checkArgument(AbstractRequest.class.isAssignableFrom(messageClazz));
try {
Constructor<? extends AbstractMessage> constructor = messageClazz.getConstructor();
AbstractMessage abstractMessage = constructor.newInstance();
AbstractRequest abstractRequest = (AbstractRequest) abstractMessage;
MessageHeader header = new RequestHeader(moduleId, commandId);
abstractRequest.setHeader(header);
abstractRequest.setBody(body);
abstractRequest.decode();
return (T) abstractRequest;
} catch (Exception e) {
log.error("create request error", e);
}
return null;
}
Request分发执行
这一步将Channel转换为User,将转发的职责交给RequestDispatchService
channel2SessionMap
记录了Channel和User的对应关系。User是客户端的唯一标识,里面记录了客户端的信息,并承载了异步串行无锁化的功能,具体实现后文会详述。
public class RequestDispatchHandler extends SimpleChannelInboundHandler<AbstractRequest> {
private static Map<Channel, User> channel2SessionMap = new ConcurrentHashMap<>();
@Override
protected void channelRead0(ChannelHandlerContext ctx, AbstractRequest msg) throws Exception {
User user = channel2SessionMap.compute(ctx.channel(), (c, s) -> {
if (null == s) {
User createUser = new User();
createUser.setChannel(c);
return createUser;
} else {
return s;
}
});
Context.getRequestDispatchService().dispatch(user, msg);
}
}
RequestDispatchService也很简单,找到AbstractRequest对应的方法,再通过反射调用去执行。
user.execute(...)
将这条消息放到用户的消息队列去执行
@Component
public class RequestDispatchService {
@Autowired
private MessageRegistry messageRegistry;
public void dispatch(User user, AbstractRequest request) {
MethodWrapper wrapper = messageRegistry.getWrapper(request.header().getModuleId(), request.header().getCommandId());
Preconditions.checkNotNull(wrapper);
user.execute(new MessageTask(wrapper, request));
}
}
至此,服务端逻辑结束,下面来看一下这个过程中最核心的MessageRegistry
MessageRegistry
MessageRegistry有两个功能
- 计算并存储moduleId+commandId到AbstractMessage的映射
- 计算并存储AbstractMessage到具体方法的映射
这样就从消息映射到了方法执行,处理消息 ~= 使用数据执行特定方法
moduleId+commandId到AbstractMessage的映射
举个例子
所有AbstractMessage的具体实现,都会带有@MessageWrapper
注解,以此将具体的Message和moduleId+commandId关联起来,例如
@MessageWrapper(moduleId = 1, commandId = 1)
public class HelloRequest extends AbstractRequest {
}
MessageRegistry会扫描类信息,将其关系记录下来。
Reflections reflections = new Reflections(TransportStarter.class.getPackage().getName());
//记录所有的Message
Map<String, Class<? extends AbstractMessage>> tempMessageMap = new HashMap<>();
Set<Class<?>> wrappersClasses = reflections.getTypesAnnotatedWith(MessageWrapper.class);
for (Class<?> wrapperClazz : wrappersClasses) {
Preconditions.checkArgument(!Modifier.isAbstract(wrapperClazz.getModifiers()), "{} illegal,@MessageWrapper annotated class can't be abstract", wrapperClazz.getSimpleName());
Preconditions.checkArgument(AbstractMessage.class.isAssignableFrom(wrapperClazz), "{} illegal,@MessageWrapper annotated class must be sub type of AbstractCSMessage", wrapperClazz.getSimpleName());
MessageWrapper wrapperAnnotation = wrapperClazz.getAnnotation(MessageWrapper.class);
Preconditions.checkArgument(wrapperAnnotation.moduleId() > 0 && wrapperAnnotation.commandId() > 0);
String key = getKey(wrapperAnnotation.moduleId(), wrapperAnnotation.commandId());
if (tempMessageMap.containsKey(key)) {
Class<? extends AbstractMessage> old = tempMessageMap.get(key);
throw new IllegalArgumentException(old.getSimpleName() + " and " + wrapperClazz.getSimpleName() + " conflict,please check @MessageWrapper's moduleId and commandId");
}
tempMessageMap.put(key, (Class<? extends AbstractMessage>) wrapperClazz);
}
messages = Collections.unmodifiableMap(tempMessageMap);
计算并存储AbstractMessage到具体方法的映射
一个消息必定有一个对应的方法,@MessageHandler
注解标识某个接口中的某些方法是和消息相关联的
@MessageHandler
public interface HelloService {
void hello(User user, HelloRequest request);
}
MessageRegistry再将这个方法封装一下,和消息关联起来
//解析所有message对应的方法
Map<Class<? extends AbstractMessage>, MethodWrapper> tempMethodWrappers = new HashMap<>();
Set<Class<?>> handlerClasses = reflections.getTypesAnnotatedWith(MessageHandler.class, true);
for (Class<?> handlerClazz : handlerClasses) {
Preconditions.checkArgument(Modifier.isInterface(handlerClazz.getModifiers()), "{} illegal,@MessageHandler annotated class must be interface");
Object bean = applicationContext.getBean(handlerClazz);
Preconditions.checkNotNull(bean, "{} annotated with @MessageHandler but no instance");
var methods = handlerClazz.getDeclaredMethods();
for (Method method : methods) {
var parameterTypes = method.getParameterTypes();
List<Class<?>> satisfyParameters = Arrays.stream(parameterTypes).filter(AbstractMessage.class::isAssignableFrom).filter(type -> messages.containsValue(type)).collect(Collectors.toList());
if (satisfyParameters.isEmpty()) {
continue;
}
Preconditions.checkArgument(satisfyParameters.size() == 1, "method {} signature illegal,parameters should only contain exactly one AbstractCSMessage", method);
Class<? extends AbstractMessage> messageClazz = (Class<? extends AbstractMessage>) satisfyParameters.get(0);
Preconditions.checkArgument(!tempMethodWrappers.containsKey(messageClazz), "parameter illegal,{} appear in different methods", messageClazz);
MethodWrapper methodWrapper = new MethodWrapper(method, bean);
tempMethodWrappers.put(messageClazz, methodWrapper);
}
}
messageMethods = Collections.unmodifiableMap(tempMethodWrappers);
后记
通过这套机制,打通了从消息接收到执行的一整套逻辑,开发者不需要关注消息是如何处理的,只要定义好必备的东西就好。