基于netty的rpc框架

2017-08-11  本文已影响0人  帅_zs

基于netty的rpc框架

[TOC]

如果你已经对以下东东有所了解,那么你就可以完成一个rpc框架了

神马是rpc?

实现思路

代码具体实现

  1. 首先我们需要为我们的网络请求分装两个JavaBean,分别为Request与Response.。

    //在Request中应有的属性
    private String requestId;
    private String className;
    private String methodName;
    private Class<?>[] parameterTypes;
    private Object[] parameters;
    //在response应该有的属性
    private String requestId;
    private Throwable error;
    private Object result;
    
  2. 创建RpcClient,封装我们的网络请求流程。其中最重要的是这个方法:

    public Response send(Request request) throws InterruptedException {
            ClientBootstrap bootstrap = new ClientBootstrap();
            ExecutorService boss = Executors.newCachedThreadPool();
            ExecutorService work = Executors.newCachedThreadPool();
    
            bootstrap.setFactory(new NioClientSocketChannelFactory(boss,work));
            bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
                @Override
                public ChannelPipeline getPipeline() throws Exception {
                    ChannelPipeline pipeline = Channels.pipeline();
                    pipeline.addLast("decoder",new ResponseDecoder());
                    pipeline.addLast("encoder",new RequestEncoder());
                    pipeline.addLast("handler",RpcClient.this);
                    return pipeline;
                }
            });
            ChannelFuture connect = bootstrap.connect(new InetSocketAddress(address, port)).sync();
            connect.getChannel().write(request).sync();
             //阻塞线程直到完成请求或者请求失败
            synchronized (obj){
                obj.wait();
            }
            connect.getChannel().close().sync();
    
            return this.response;
        }
    

    这里用netty3进行的网咯请求,这里ResponseDecoderRequestEncoder是对Response与Request进行的序列化与反序列化,采用的谷歌的Protostuff序列化框架实现(为啥不用java自带的序列化工具呢?因为java自定的序列化附带了很多其他信息,序列化的字节长度比谷歌的长好几倍,所以是为了节约带宽,同时Protostuff的序列化支持多种编程语言)

  3. 创建代理的工具类,返回代理对象。

    public <T>T proxy(Class<?> clazz){
            return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[] { clazz }, new InvocationHandler() {
                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                    Request request = new Request();
                    request.setClassName(method.getDeclaringClass().getName());
                    request.setMethodName(method.getName());
                    request.setParameters(args);
                    request.setRequestId(UUID.randomUUID().toString());
                    request.setParameterTypes(method.getParameterTypes());
                    RpcClient client =new RpcClient(address,port);
                     //通过封装的网络框架进行网络请求
                    Response response = client.send(request);
                    if (response.getError()!=null){
                        throw response.getError();
                    }
                    else{
                        return response;
                    }
                }
            });
        }
    
  4. 服务端在开启服务的时候就需要通过spring扫描所有的service实现类,将其装进spring的容器中。

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            Map<String, Object> beansWithAnnotation = applicationContext.getBeansWithAnnotation(RPCService.class);
            for(Map.Entry<String,Object> entry :beansWithAnnotation.entrySet()){
                String interfaceName = entry.getValue().getClass()
                        .getAnnotation(RPCService.class).value().getName();
                serviceMap.put(interfaceName,entry.getValue());
            }
            startServer();
        }
    

    需要发布的服务类都需要使用@RPCService注解,这是一个自定义的注解。

  5. 在服务端收到客户端的网络请求之后,我们就需要从spring容器中找到请求的服务类完成调用并返回执行结果。

    @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
            Request request = (Request) event.getMessage();
            Response response = new Response();
            //调用请求类的请求方法执行并返回执行结果
            Object invoke = null;
            try {
                Object requestBean = serviceMap.get(request.getClassName());
                Class<?> requestClazz = Class.forName(request.getClassName());
                Method method = requestClazz.getMethod(request.getMethodName(), request.getParameterTypes());
                invoke = method.invoke(requestBean, request.getParameters());
                response.setRequestId(UUID.randomUUID().toString());
                response.setResult(invoke);
            } catch (Exception e) {
                response.setError(e);
                response.setRequestId(UUID.randomUUID().toString());
            }
            System.out.println(request+""+response);
            //返回执行结果
            ctx.getChannel().write(response);
    
        }
    

总结

上一篇 下一篇

猜你喜欢

热点阅读