SOFA Bolt源码分析 1-简单的服务启动和服务端启动设计

2019-02-24  本文已影响0人  折浪君

一、启动方式

1.业务逻辑处理器定义

/**

* 自定义的业务逻辑用户处理器

* SyncUserProcessor属于同步,soaf bolt还提供了异步的方式AsyncUserProcessor

* 二者的区别在于,前者需要在当前处理线程以return返回值的形式返回处理结果;而后者,有一个 AsyncContext 客户端的调用,调用 sendResponse 方法,内部通过穿件RCMD对象,可以在当前线程,也可以在异步线程,返回处理结果

* 如果一个处理器需要对多种数据模型感兴趣,两种方式,一种是使用基类的方式处理,第二种方式通过MultiInterestUserProcessor

*/

public class TestServerUserProcessorextends SyncUserProcessor {

@Override

    public Object handleRequest(BizContext bizCtx, TestRequest request)throws Exception {

TestResponse response =new TestResponse();

if (request !=null) {

System.out.println(request);

response.setResp("from server -> " + request.getReq());

}

return response;

}

/**

* 业务处理器感兴趣的消息体,

*/

    @Override

    public String interest() {

return TestRequest.class.getName();

}

}

2.服务端启动

//创建RpcServer实例,指定监听ip和port,指定是否使用链接管理器

RpcServer server =new RpcServer("127.0.0.1",8888,false);

//注册业务逻辑处理器 UserProcessor

server.registerUserProcessor(new TestServerUserProcessor());

//启动服务端

server.start();

3.server.start()原理分析

server.start()主要有两步

=doInit();

==this.addressParser =new RpcAddressParser();//初始化地址解析器

==this.connectionEventHandler =new ConnectionEventHandler(switches());//初始化链接事件处理器,主要负责入站出站数据的处理

==this.connectionEventHandler.setConnectionManager(this.connectionManager);//添加初始化的连接管理器到链接事件处理器

==this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);//添加时间监听器

==initRpcRemoting();//初始化远程调用对象RpcServerRemoting

==this.bootstrap =new ServerBootstrap();

    this.bootstrap.group(bossGroup,workerGroup)

    .channel(NettyEventLoopUtil.getServerSocketChannelClass())

.   option(ChannelOption.SO_BACKLOG, ConfigManager.tcp_so_backlog())//tcp等待队列大小,默认1024

    .option(ChannelOption.SO_REUSEADDR, ConfigManager.tcp_so_reuseaddr())//端口快速释放,默认true

    .childOption(ChannelOption.TCP_NODELAY, ConfigManager.tcp_nodelay())

    .childOption(ChannelOption.SO_KEEPALIVE, ConfigManager.tcp_so_keepalive());

==initWriteBufferWaterMark();//设置出站的低水位和高水位

==if (ConfigManager.netty_buffer_pooled()) { this.bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); } else { this.bootstrap.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT); }//根据配置决定是否使用对象池化

==NettyEventLoopUtil.enableTriggeredMode(bootstrap)//设置selector的epoll模式是边缘触发还是水平触发,默认水平触发

==this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override     protected void initChannel(SocketChannel channel) { ChannelPipeline pipeline =     channel.pipeline(); pipeline.addLast("decoder", codec.newDecoder());     pipeline.addLast("encoder", codec.newEncoder());if (idleSwitch) {     pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, idleTime,     TimeUnit.MILLISECONDS));pipeline.addLast("serverIdleHandler", serverIdleHandler); } pipeline.addLast("connectionEventHandler", connectionEventHandler); pipeline.addLast("handler", rpcHandler); createConnection(channel); } /** * create connection operation<br> * <ul> * <li>If flag manageConnection be true, use {@link DefaultConnectionManager} to add a new connection, meanwhile bind it with the channel.</li> * <li>If flag manageConnection be false, just create a new connection and bind it with the channel.</li> * </ul> */ private void createConnection(SocketChannel channel) { Url url = addressParser.parse(RemotingUtil.parseRemoteAddress(channel)); if (switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) { connectionManager.add(new Connection(channel, url), url.getUniqueKey()); } else { new Connection(channel, url); } channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT); } });//添加加密、解密、心跳(原理后面讲)、远程请求(原理后面讲)处理器,并创建一个和监听端口绑定的链接(业务层面的绑定)

=doStart();

==this.channelFuture =this.bootstrap.bind(new InetSocketAddress(ip(), port())).sync();//绑定指定的端口

上一篇下一篇

猜你喜欢

热点阅读