Java 杂谈

RPC学习

2019-08-05  本文已影响14人  大数据阶梯之路

一、什么是RPC

RPC(Remote Procudure Call),即远程过程调用。
简单来说,就比如有2个类,1个类A在服务端,1个类B在客户端,反正在不同的机器上,然后类A同样可以像本地调用一样地调用类B的方法,这就是RPC。

image.png

上图说明:做RPC我们大概需要3个角色:①客户端,②发布服务的接口,③服务的注册中心。
服务端通过字符串解析出该字符串代表的接口的一切信息,需要用到反射技术,而客户端和服务端之间进行通信调用,需要用到socket技术,服务端根据客户端不同请求返回不同的接口类型,此时客户端就要接收不同的接口类型,需要在客户端用到动态代理技术。

二、程序流程

三、代码Demo

HelloService.java

package rpc.server;

//hello接口
public interface HelloService {
    public String sayHello(String name);
}

HelloServiceImpl

package rpc.server;

public class HelloServiceImpl implements HelloService {
    @Override
    public String sayHello(String name) {
        return "hello"+name;
    }
}

Server.java

package rpc.server;

//服务中心接口
public interface Server {
    public void start();

    public void stop();

    public void register(Class service,Class serviceImpl);

}

ServerCenter.java

package rpc.server;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

//服务中心具体实现
public class ServerCenter implements Server {

    //map存放着服务端所有发布服务的接口,到注册到这个map中(key为接口名,value为接口值)
    private static HashMap<String,Class> serverRegister = new HashMap<>();
    private static int port;   //9999本地地址
    //连接池:存放多个连接对象,每个连接对象处理一个客户请求,Runtime.getRuntime().availableProcessors()是获取用户请求数
    private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    private static boolean isRunning = false;
    public ServerCenter(int port){
        this.port = port;
    }

    //开启服务端服务
    @Override
    public void start(){
        //start-->线程对象
        ServerSocket serverSocket = null;
        try{
            //运用socket技术在服务端暴露对外访问的接口
            serverSocket = new ServerSocket();
            serverSocket.bind(new InetSocketAddress(port));
            isRunning = true;   //服务已经启动
            //循环操作多个具体的服务内容:接受客户端请求,处理请求,返回结果
            while(true){
                System.out.println("服务器开启...");
                //客户端每请求一次连接(发出一次请求),服务端就从连接池中拿出一个线程对象来处理
                Socket socket = null;
                try {
                    socket = serverSocket.accept();   //等待客户端连接
                } catch (IOException e) {
                    e.printStackTrace();
                }
                executorService.execute(new ServiceTask(socket));    //启动线程处理用户请求
            }

        } catch (IOException e){
            e.printStackTrace();
        }
    }

    private static class ServiceTask implements Runnable{

        private Socket socket = null;
        public ServiceTask(){

        }
        public ServiceTask(Socket socket){
            this.socket = socket;
        }
        @Override
        public void run() {
            ObjectOutputStream objectOutputStream = null;
            ObjectInputStream objectInputStream = null;
            try{
                //接收到客户端连接及请求,处理该请求
                objectInputStream = new ObjectInputStream(socket.getInputStream()); //把字节流转换为序列化流(即对象流)
                //因为ObjectInputStream对发送的数据顺序有严格要求,因此需要参照发送的顺序逐个接收
                String serviceName = objectInputStream.readUTF();
                String methodName = objectInputStream.readUTF();
                Class[] parameterTypes = (Class[]) objectInputStream.readObject();
                Object[] arguments = (Object[]) objectInputStream.readObject();
                //根据客户端请求,到map中找到对应的具体接口
                Class serviceClass = serverRegister.get(serviceName);
                //在具体接口找到对应的方法
                Method method = serviceClass.getMethod(methodName,parameterTypes);
                //执行该方法,返回result结果
                Object result = method.invoke(serviceClass.newInstance(),arguments);
                //将执行结果result返回给客户端
                objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
                objectOutputStream.writeObject(result);
            } catch (IllegalAccessException e) {
                e.printStackTrace();
            } catch (InvocationTargetException e) {
                e.printStackTrace();
            } catch (InstantiationException e) {
                e.printStackTrace();
            } catch (ClassNotFoundException e){
                e.printStackTrace();
            } catch (NoSuchMethodException e){
                e.printStackTrace();
            } catch (IOException e){
                e.printStackTrace();
            } finally {
                try {
                    if(objectInputStream!=null)
                        objectInputStream.close();
                    if(objectOutputStream!=null)
                        objectOutputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    //停止服务端服务
    @Override
    public void stop() {
        isRunning = false;
        executorService.shutdown();
    }

    //注册服务接口
    @Override
    //Class类型是反射入口
    public void register(Class service,Class serviceImpl) {
        //运用了反射技术将服务的服务名拿到
        serverRegister.put(service.getName(),serviceImpl);
    }
}

Client.java

package rpc.client;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;

public class Client {

    //客户端获取远程服务端接口的动态代理对象
    /**
     * serviceInterface代表着请求的接口名
     * addr代表着IP:端口号  对象
     * 返回值是动态代理对象
     */
    public static <T> T getRemoteProxyObj(Class serviceInterface,InetSocketAddress addr){
        /**
         * Proxy.newProxyInstance(a,b,c)
         * a:类加载器,表示需要代理的哪个类。就按照语法把该类的加载器给传入
         * b:需要代理的对象,具备哪些方法。就把接口传入,因为方法在接口中都定义着,需要传入一个数组
         * c:请求的对象
         */
        return (T)Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface}, new InvocationHandler() {
            @Override
            /**proxy:代表代理的对象
             * method:代表哪个接口方法(参数列表)
             * arg:代表参数列表
             * 返回值就是远程调用服务器方法的返回结果
             */
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                ObjectOutputStream objectOutputStream = null;
                ObjectInputStream objectInputStream = null;
                try {
                    //客户端向服务端发送请求
                    Socket socket = new Socket();
                    socket.connect(addr);
                    objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); //把字节流转换为序列化流(即对象流)
                    //发送的接口名、方法名:是字符串所以用writeUTF()
                    objectOutputStream.writeUTF(serviceInterface.getName());
                    objectOutputStream.writeUTF(method.getName());
                    //方法参数的类型,方法参数 Object
                    objectOutputStream.writeObject(method.getParameterTypes());
                    objectOutputStream.writeObject(args);
                    //发送完毕等待服务端处理......

                    //接收服务端处理后的返回值
                    objectInputStream = new ObjectInputStream(socket.getInputStream());
                    return objectInputStream.readObject();
                } catch (Exception e){
                    e.printStackTrace();
                    return null;
                } finally {
                    try {
                        if(objectInputStream!=null)
                            objectInputStream.close();
                        if(objectOutputStream!=null)
                            objectOutputStream.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
    }
}

RPCServerTest.java

package rpc.test;

import rpc.server.HelloService;
import rpc.server.HelloServiceImpl;
import rpc.server.Server;
import rpc.server.ServerCenter;

public class RPCServerTest {
    public static void main(String[] args){
        new Thread(new Runnable() {
            @Override
            public void run() {
                //服务中心
                Server server = new ServerCenter(9999);
                //将类注册到服务中心
                server.register(HelloService.class, HelloServiceImpl.class);
                server.start();
            }
        }).start();
    }
}

RPCClientTest.java

package rpc.test;

import rpc.client.Client;
import rpc.server.HelloService;

import java.net.InetSocketAddress;

public class RPCClientTest{
    public static void main(String[] args) throws ClassNotFoundException{
            HelloService helloService = Client.getRemoteProxyObj(Class.forName("rpc.server.HelloService"),new InetSocketAddress("127.0.0.1",9999));
            System.out.println(helloService.sayHello("小江"));
    }
}
上一篇下一篇

猜你喜欢

热点阅读