SimpleRPC待完善

2021-03-19  本文已影响0人  养一只tom猫

common:

/**
 * Service contract shared by the RPC provider and the RPC consumer.
 */
public interface HelloService {
    /**
     * Handles a greeting message.
     *
     * @param message the text passed by the caller
     * @return the reply produced by the implementation
     */
    String hello(String message);
}

provider:

/**
 * Provider-side implementation of {@link HelloService}.
 */
public class HelloServiceImpl implements HelloService {

    /**
     * Prints the received message to standard output and returns a fixed acknowledgement.
     *
     * @param message the text sent by the caller
     * @return the constant acknowledgement string
     */
    @Override
    public String hello(String message) {
        final String acknowledgement = "调用成功";
        System.out.println(message);
        return acknowledgement;
    }
}

server:

/**
 * Entry point of the demo RPC server.
 */
public class ServerBootstrap {

    /**
     * Creates a {@code NettyServer} and starts it on 127.0.0.1:7000.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        final String host = "127.0.0.1";
        final int port = 7000;
        NettyServer server = new NettyServer();
        server.start(host, port);
    }
}
/**
 * Netty-based server side of the demo RPC: accepts connections and exchanges plain strings
 * with clients through {@code StringDecoder}/{@code StringEncoder}.
 */
public class NettyServer {

    /**
     * Starts the server; delegates to {@link #startServer0(String, int)}.
     *
     * @param host the address to bind to
     * @param port the port to bind to
     */
    public void start(String host, int port) {
        startServer0(host, port);
    }

    /**
     * Binds the server to {@code host:port} and waits until the server channel is closed.
     * Both event loop groups are shut down before this method returns, whether the server
     * stopped normally or failed.
     *
     * @param host the address to bind to
     * @param port the port to bind to
     */
    public static void startServer0(String host, int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // Strings in, strings out, then the RPC dispatch handler.
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new StringEncoder());
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });

            ChannelFuture channelFuture = serverBootstrap.bind(host, port).sync();
            // Wait here until the listening channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            // sync() rethrows the failure cause (e.g. a bind error) as-is, so keep a broad catch.
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

}
/**
 * Server-side inbound handler that dispatches string requests of the form
 * {@code HelloService#hello#<argument>} to {@link HelloServiceImpl} and writes the result back.
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /** Request prefix that selects {@code HelloService.hello}; the argument is everything after it. */
    private static final String HELLO_PREFIX = "HelloService#hello#";

    /**
     * Logs the incoming message and, when it starts with the known prefix, invokes the service
     * and sends the returned string to the peer. Messages with any other prefix are only logged.
     *
     * @param ctx the channel context used to write the reply
     * @param msg the decoded inbound message
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("服务端接收到" + msg);
        String request = msg.toString();
        if (request.startsWith(HELLO_PREFIX)) {
            // Strip the prefix by its length rather than searching for the last '#',
            // so that an argument which itself contains '#' is passed through intact.
            String argument = request.substring(HELLO_PREFIX.length());
            String result = new HelloServiceImpl().hello(argument);
            ctx.writeAndFlush(result);
        }
    }

    /**
     * Reports the failure and closes the channel.
     *
     * @param ctx   the channel context to close
     * @param cause the exception raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Surface the cause instead of dropping it silently before closing the connection.
        cause.printStackTrace();
        ctx.close();
    }
}

client:

/**
 * Entry point of the demo RPC consumer.
 */
public class ClientBootstrap {

    /**
     * Obtains a {@link HelloService} proxy from {@code NettyClient}, calls it once and prints the reply.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        final String providerName = "HelloService#hello#";
        Object proxy = new NettyClient().getBean(HelloService.class, providerName);
        HelloService helloService = (HelloService) proxy;
        String reply = helloService.hello("1233211234567");
        System.out.println("result  = " + reply);
    }
}
/**
 * Client side of the demo RPC: hands out dynamic proxies whose method calls are sent as
 * strings to the server at 127.0.0.1:7000 through a shared {@link NettyClientHandler}.
 */
public class NettyClient {

    private static final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    /** Shared handler; assigned only after the connection has been established. */
    private static NettyClientHandler client;

    /**
     * Creates a proxy for {@code clazz}. Every method call on the proxy sends
     * {@code providerName + firstArgument} to the server and returns the server's reply.
     *
     * @param clazz        the service interface to proxy
     * @param providerName the request prefix placed in front of the first argument
     * @return a proxy instance implementing {@code clazz}
     */
    public Object getBean(final Class<?> clazz, final String providerName) {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{clazz}, ((proxy, method, args) -> {
            if (client == null) {
                initClient();
            }
            // The proxy passes args == null for zero-argument methods; send only the prefix then.
            Object firstArgument = (args == null || args.length == 0) ? "" : args[0];
            // Set the payload to send to the server.
            client.setParam(providerName + firstArgument);

            return executor.submit(client).get();
        }));
    }

    /**
     * Connects to the server and publishes the handler in {@link #client} on success.
     *
     * @throws IllegalStateException if the connection cannot be established or the
     *                               connecting thread is interrupted
     */
    private static void initClient() {
        // Use a local so the pipeline never sees a null handler and a failed attempt
        // does not leave a half-initialized handler in the static field.
        final NettyClientHandler handler = new NettyClientHandler();
        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new StringEncoder());
                        ch.pipeline().addLast(handler);
                    }
                });
        try {
            bootstrap.connect("127.0.0.1", 7000).sync();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            group.shutdownGracefully();
            throw new IllegalStateException("Interrupted while connecting to 127.0.0.1:7000", e);
        } catch (Exception e) {
            // Release the event loop threads and fail loudly instead of continuing
            // with a handler that has no channel.
            group.shutdownGracefully();
            throw new IllegalStateException("Failed to connect to 127.0.0.1:7000", e);
        }
        client = handler;
    }
}
/**
 * Client-side handler that doubles as a {@link Callable}: {@link #call()} sends the current
 * parameter to the server and blocks until {@link #channelRead} delivers the reply.
 *
 * <p>Call flow:
 * <ol>
 *   <li>{@code channelActive} stores the channel context.</li>
 *   <li>{@code setParam} sets the payload.</li>
 *   <li>{@code call} sends the payload through the context and waits.</li>
 *   <li>{@code channelRead} stores the server's reply in {@code result} and wakes the waiter.</li>
 *   <li>{@code call} wakes up and returns {@code result}.</li>
 * </ol>
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable<Object> {

    /** Upper bound, in milliseconds, on how long {@link #call()} waits for the channel to become active. */
    private static final long ACTIVATION_TIMEOUT_MILLIS = 5000L;

    private String result;

    private String param;

    private ChannelHandlerContext context;

    /**
     * Sends {@code param} to the server and waits for the reply.
     *
     * @return the reply text received from the server
     * @throws IllegalStateException if the channel does not become active in time
     * @throws InterruptedException  if the waiting thread is interrupted
     */
    @Override
    public synchronized Object call() throws Exception {
        System.out.println("call方法被调用");
        // The connect future can complete before channelActive has run, so the context
        // may not be set yet; wait for it (bounded) instead of dereferencing null.
        long deadline = System.currentTimeMillis() + ACTIVATION_TIMEOUT_MILLIS;
        while (context == null) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                throw new IllegalStateException(
                        "Channel did not become active within " + ACTIVATION_TIMEOUT_MILLIS + " ms");
            }
            wait(remaining);
        }
        // Clear any reply left over from a previous call so it cannot be returned again.
        result = null;
        context.writeAndFlush(param);
        // Loop on the condition: wait() may return without a matching notification.
        while (result == null) {
            wait();
        }
        return result;
    }

    /**
     * Stores the channel context and wakes any caller waiting for the channel to be ready.
     */
    @Override
    public synchronized void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive");
        context = ctx;
        notifyAll();
    }

    /**
     * Stores the server's reply and wakes the thread blocked in {@link #call()}.
     */
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("channelRead");
        result = msg.toString();
        notifyAll();
    }

    /**
     * Reports the failure and closes the channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Surface the cause instead of dropping it silently before closing the connection.
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Sets the payload that the next {@link #call()} sends to the server.
     *
     * @param param the request text
     */
    public void setParam(String param) {
        this.param = param;
    }
}
上一篇 下一篇

猜你喜欢

热点阅读