码农的世界

手写RPC框架

2018-09-08  本文已影响0人  匠丶

在分析RMI原理一文中,我们知道RMI是通过底层封装TCP网络通信实现。
基于此思路本文从以下切入点实现一个简单的RPC框架,反之也促进了对RMI的理解,相辅相成。

服务端

服务端通过端口发布一个服务,监听端口请求,通过反射调用本地方法,并返回结果给调用方。
1、定义要发布的接口和接口实现类

public interface IHello {
    String say();
}

public class IHelloImpl implements IHello {
    @Override
    public String say() {
        return "Hello";
    }
}

public class ServerDemo {
    public static void main(String[] args) {
        IHello iHello = new IHelloImpl();
        RpcServer rpcServer = new RpcServer();
        rpcServer.publisher(iHello,8080);
    }
}

2、定义发布中心,通过线程池来监听请求

public class RpcServer {
    //创建线程池
    private static final ExecutorService executorService=Executors.newCachedThreadPool();

    public void publisher(Object service,Integer port){
        try {
            ServerSocket serverSocket = new ServerSocket(port);
            //循环监听
            while (true){
                Socket socket = serverSocket.accept();
                executorService.submit(new ProcessorHandler(socket,service));
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

3、处理请求,通过反射得到结果,并将结果传递给客户端

public class ProcessorHandler implements Runnable {
    private Socket socket;

    private Object service;

    public ProcessorHandler(Socket socket, Object service) {
        this.socket = socket;
        this.service = service;
    }

    @Override
    public void run(){
        try {
            ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
            RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
            Object result = invoke(rpcRequest);

            ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
            objectOutputStream.writeObject(result);
            objectOutputStream.flush();
            objectOutputStream.close();
            objectInputStream.close();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

 private Object invoke(RpcRequest rpcRequest) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
        Object[] args = rpcRequest.getParams();
        Class[] types = null;
        if(args != null){
            types = new Class[args.length];
            for(int i=0 ; i<args.length;i++){
                types[i] = args[i].getClass();
            }
        }

        Method method = service.getClass().getMethod(rpcRequest.getMethod(),types);
        return method.invoke(service,args);
    }
}

客户端

寻找服务,发送请求,得到结果。
1、定义代理,通过代理调用服务端接口

public class RpcClientProxy {

    public <T> T clientProxy(Class<T> target,String host,Integer port){
        return (T)Proxy.newProxyInstance(target.getClassLoader(),new Class[]{target},new RemoteInvocationHandler(host,port));
    }
}

public class RemoteInvocationHandler implements InvocationHandler {
    public String host;

    public Integer port;

    public RemoteInvocationHandler(String host, Integer port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
       RpcRequest rpcRequest = new RpcRequest();
       rpcRequest.setName(method.getDeclaringClass().getName());
       rpcRequest.setMethod(method.getName());
       rpcRequest.setParams(args);
       TCPTransport tcpTransport = new TCPTransport(host,port);
       return tcpTransport.send(rpcRequest);
    }

2、封装网络通信过程

public class TCPTransport {
    public String host;

    public Integer port;

    public TCPTransport(String host, Integer port) {
        this.host = host;
        this.port = port;
    }
    private Socket newSocket(){
        try {
            return new Socket(host,port);
        } catch (IOException e) {
            throw new RuntimeException("连接建立失败");
        }
    }

    public Object send(RpcRequest rpcRequest){
        Socket socket = newSocket();
        //发送请求到服务端
        try {
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
            objectOutputStream.writeObject(rpcRequest);
            objectOutputStream.flush();
            ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
            Object result = objectInputStream.readObject();
            objectInputStream.close();
            objectOutputStream.close();
            return result;

        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("发起远程调用异常",e);
        } finally {
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}


public class ClientDemo {
    public static void main(String[] args) {
        RpcClientProxy rpcClientProxy = new RpcClientProxy();
        IHello iHello =  rpcClientProxy.clientProxy(IHello.class,"localhost",8080);
        System.out.println(iHello.say());

    }
}

上一篇下一篇

猜你喜欢

热点阅读