RPC学习
2019-08-05 本文已影响14人
大数据阶梯之路
一、什么是RPC
RPC(Remote Procudure Call),即远程过程调用。
简单来说,就比如有2个类,1个类A在服务端,1个类B在客户端,反正在不同的机器上,然后类A同样可以像本地调用一样地调用类B的方法,这就是RPC。
上图说明:做RPC我们大概需要3个角色:①客户端,②发布服务的接口,③服务的注册中心。
服务端通过字符串解析出该字符串代表的接口的一切信息,需要用到反射技术,而客户端和服务端之间进行通信调用,需要用到socket技术,服务端根据客户端不同请求返回不同的接口类型,此时客户端就要接收不同的接口类型,需要在客户端用到动态代理技术。
二、程序流程
- 1、先在一台机器定义服务接口,再定义一个服务接口实现类来实现逻辑业务
- 2、再在这台机器上写一个服务中心把服务给注册到这里,这个服务中心就相当于一个对外暴露访问的入口。
- 3、再另一台机器写一个获取动态代理对象的方法。
- 4、双方两台机器通过socket技术实现通信。向服务端发送请求,服务端接收请求并处理请求,然后发送请求结果给客户端。
三、代码Demo
HelloService.java
package rpc.server;
//hello接口
public interface HelloService {
public String sayHello(String name);
}
HelloServiceImpl
package rpc.server;
public class HelloServiceImpl implements HelloService {
@Override
public String sayHello(String name) {
return "hello"+name;
}
}
Server.java
package rpc.server;
//服务中心接口
public interface Server {
public void start();
public void stop();
public void register(Class service,Class serviceImpl);
}
ServerCenter.java
package rpc.server;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
//服务中心具体实现
public class ServerCenter implements Server {
//map存放着服务端所有发布服务的接口,到注册到这个map中(key为接口名,value为接口值)
private static HashMap<String,Class> serverRegister = new HashMap<>();
private static int port; //9999本地地址
//连接池:存放多个连接对象,每个连接对象处理一个客户请求,Runtime.getRuntime().availableProcessors()是获取用户请求数
private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static boolean isRunning = false;
public ServerCenter(int port){
this.port = port;
}
//开启服务端服务
@Override
public void start(){
//start-->线程对象
ServerSocket serverSocket = null;
try{
//运用socket技术在服务端暴露对外访问的接口
serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress(port));
isRunning = true; //服务已经启动
//循环操作多个具体的服务内容:接受客户端请求,处理请求,返回结果
while(true){
System.out.println("服务器开启...");
//客户端每请求一次连接(发出一次请求),服务端就从连接池中拿出一个线程对象来处理
Socket socket = null;
try {
socket = serverSocket.accept(); //等待客户端连接
} catch (IOException e) {
e.printStackTrace();
}
executorService.execute(new ServiceTask(socket)); //启动线程处理用户请求
}
} catch (IOException e){
e.printStackTrace();
}
}
private static class ServiceTask implements Runnable{
private Socket socket = null;
public ServiceTask(){
}
public ServiceTask(Socket socket){
this.socket = socket;
}
@Override
public void run() {
ObjectOutputStream objectOutputStream = null;
ObjectInputStream objectInputStream = null;
try{
//接收到客户端连接及请求,处理该请求
objectInputStream = new ObjectInputStream(socket.getInputStream()); //把字节流转换为序列化流(即对象流)
//因为ObjectInputStream对发送的数据顺序有严格要求,因此需要参照发送的顺序逐个接收
String serviceName = objectInputStream.readUTF();
String methodName = objectInputStream.readUTF();
Class[] parameterTypes = (Class[]) objectInputStream.readObject();
Object[] arguments = (Object[]) objectInputStream.readObject();
//根据客户端请求,到map中找到对应的具体接口
Class serviceClass = serverRegister.get(serviceName);
//在具体接口找到对应的方法
Method method = serviceClass.getMethod(methodName,parameterTypes);
//执行该方法,返回result结果
Object result = method.invoke(serviceClass.newInstance(),arguments);
//将执行结果result返回给客户端
objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
objectOutputStream.writeObject(result);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
} catch (InstantiationException e) {
e.printStackTrace();
} catch (ClassNotFoundException e){
e.printStackTrace();
} catch (NoSuchMethodException e){
e.printStackTrace();
} catch (IOException e){
e.printStackTrace();
} finally {
try {
if(objectInputStream!=null)
objectInputStream.close();
if(objectOutputStream!=null)
objectOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
//停止服务端服务
@Override
public void stop() {
isRunning = false;
executorService.shutdown();
}
//注册服务接口
@Override
//Class类型是反射入口
public void register(Class service,Class serviceImpl) {
//运用了反射技术将服务的服务名拿到
serverRegister.put(service.getName(),serviceImpl);
}
}
Client.java
package rpc.client;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;
public class Client {
//客户端获取远程服务端接口的动态代理对象
/**
* serviceInterface代表着请求的接口名
* addr代表着IP:端口号 对象
* 返回值是动态代理对象
*/
public static <T> T getRemoteProxyObj(Class serviceInterface,InetSocketAddress addr){
/**
* Proxy.newProxyInstance(a,b,c)
* a:类加载器,表示需要代理的哪个类。就按照语法把该类的加载器给传入
* b:需要代理的对象,具备哪些方法。就把接口传入,因为方法在接口中都定义着,需要传入一个数组
* c:请求的对象
*/
return (T)Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface}, new InvocationHandler() {
@Override
/**proxy:代表代理的对象
* method:代表哪个接口方法(参数列表)
* arg:代表参数列表
* 返回值就是远程调用服务器方法的返回结果
*/
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
ObjectOutputStream objectOutputStream = null;
ObjectInputStream objectInputStream = null;
try {
//客户端向服务端发送请求
Socket socket = new Socket();
socket.connect(addr);
objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); //把字节流转换为序列化流(即对象流)
//发送的接口名、方法名:是字符串所以用writeUTF()
objectOutputStream.writeUTF(serviceInterface.getName());
objectOutputStream.writeUTF(method.getName());
//方法参数的类型,方法参数 Object
objectOutputStream.writeObject(method.getParameterTypes());
objectOutputStream.writeObject(args);
//发送完毕等待服务端处理......
//接收服务端处理后的返回值
objectInputStream = new ObjectInputStream(socket.getInputStream());
return objectInputStream.readObject();
} catch (Exception e){
e.printStackTrace();
return null;
} finally {
try {
if(objectInputStream!=null)
objectInputStream.close();
if(objectOutputStream!=null)
objectOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
});
}
}
RPCServerTest.java
package rpc.test;
import rpc.server.HelloService;
import rpc.server.HelloServiceImpl;
import rpc.server.Server;
import rpc.server.ServerCenter;
public class RPCServerTest {
public static void main(String[] args){
new Thread(new Runnable() {
@Override
public void run() {
//服务中心
Server server = new ServerCenter(9999);
//将类注册到服务中心
server.register(HelloService.class, HelloServiceImpl.class);
server.start();
}
}).start();
}
}
RPCClientTest.java
package rpc.test;
import rpc.client.Client;
import rpc.server.HelloService;
import java.net.InetSocketAddress;
public class RPCClientTest{
public static void main(String[] args) throws ClassNotFoundException{
HelloService helloService = Client.getRemoteProxyObj(Class.forName("rpc.server.HelloService"),new InetSocketAddress("127.0.0.1",9999));
System.out.println(helloService.sayHello("小江"));
}
}