自己动手实现RPC框架

2019-05-31  本文已影响0人  sunpy

什么是RPC服务

就是机器A调用机器B上的程序,而机器A上的进程挂起,B机器在执行完返回后,将执行结果返回给A机器的进程,而A机器的进程获取执行结果继续执行下去的过程叫做RPC调用。通俗讲就是远程调用接口像在调用本地接口一样,方便,透明。

实现RPC框架

  1. 定义接口和方法实现
public interface IHelloRpc {
    
    public String helloRpc(String content);
    
}

public class HelloRpcImpl implements IHelloRpc{

    @Override
    public String helloRpc(String content) {
        System.out.println("Hello RPC");
        return content;
    }
}
  1. 设计提供者
    (1)监听指定ip和端口号的Socket,如果没有连接那么将一直阻塞。
    (2)使用每线程每连接模型,只是采用线程池进行线程的回收和创建。
    (3)将线程和任务分离。
public class HelloProvider {

    private static final Executor executor = Executors.newCachedThreadPool();
    
    public static void provide(String host, int port) {
        ServerSocket ss = null;
        
        try {
            ss = new ServerSocket();
            ss.bind(new InetSocketAddress(host, port));
            
            while(true) {
                executor.execute(new HelloTask(ss.accept()));
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (ss != null) {
                    ss.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
  1. 定义任务
    (1)通过Socket套接字获取输入流,将输入流包装成对象输入流。
    (2)使用对象输入流获取接口类、参数类型数组、参数值数组。
    (3)使用反射方式执行指定接口的方法。
    (4)将执行结果写入到对象输出流。
public class HelloTask implements Runnable {

    Socket socket = null;
    
    public HelloTask(Socket socket) {
        this.socket = socket;
    }
    
    @Override
    public void run() {
        ObjectInputStream ois = null;
        ObjectOutputStream oos = null;
        
        try {
            // 获取套接字输入流,包装成对象输入流
            ois = new ObjectInputStream(socket.getInputStream());
            // 获取接口名称
            String interfaceName = ois.readUTF();
            // 获取接口类
            Class<?> service = Class.forName(interfaceName);
            // 获取方法名称
            String methodName = ois.readUTF();
            // 获取参数类型数组
            Class<?>[] parameterTypes = (Class<?>[]) ois.readObject();
            // 获取参数值数组
            Object[] args = (Object[]) ois.readObject();
            // 获取方法类
            Method method = service.getMethod(methodName, parameterTypes);
            // 执行对象的方法
            Object result = method.invoke(service.newInstance(), args);
            // 获取套接字输出流,包装成对象输出流
            oos = new ObjectOutputStream(socket.getOutputStream());
            // 将写出到套接字输出流
            oos.writeObject(result);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (oos != null) {
                    oos.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            
            try {
                if (ois != null) {
                    ois.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            
            try {
                if (socket != null) {
                    socket.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
  1. 设计消费者
    (1)使用JDK代理,连接服务端,向socket中写入类、方法名称、参数类型数组、方法参数值。
    (2)对象输入流获取一个对象。
public class HelloConsumer<T> {
    
    @SuppressWarnings("unchecked")
    public T consume(Class<?> serviceClass, String host, int port) {
        return (T) Proxy.newProxyInstance(serviceClass.getClassLoader(), 
                new Class<?>[] {serviceClass.getInterfaces()[0]}, 
                new InvocationHandler() {
                    
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        Socket socket = null;
                        ObjectOutputStream oos = null;
                        ObjectInputStream ois = null;
                        try {
                            socket = new Socket();
                            socket.connect(new InetSocketAddress(host, port));
                            
                            oos = new ObjectOutputStream(socket.getOutputStream());
                            // 往对象输出流写入类
                            oos.writeUTF(serviceClass.getName());
                            // 往对象输出流写入方法名称
                            oos.writeUTF(method.getName());
                            // 往对象输出流写入参数类型数组
                            oos.writeObject(method.getParameterTypes());
                            // 往对象输出流写入方法参数值args
                            oos.writeObject(args);
                            
                            ois = new ObjectInputStream(socket.getInputStream());
                            return ois.readObject();
                        } finally {
                            if (socket != null) {
                                socket.close();
                            }
                            
                            if (oos != null) {
                                oos.close();
                            }
                            
                            if (ois != null) {
                                ois.close();
                            }
                        }
                    }
                });
    }
}   
  1. 测试类
public class HelloTest {

    public static void main(String[] args) {
        new Thread(new Runnable() {

            @Override
            public void run() {
                HelloProvider.provide("localhost", 9999);
            }
        }).start();
        
        HelloConsumer<IHelloRpc> hc = new HelloConsumer<IHelloRpc>();
        IHelloRpc hr = hc.consume(HelloRpcImpl.class, "localhost", 9999);
        String content = hr.helloRpc("Hello World");
        System.out.println(content);
    }
}

结果:

Hello RPC
Hello World
上一篇 下一篇

猜你喜欢

热点阅读