自己动手实现RPC框架
2019-05-31 本文已影响0人
sunpy
什么是RPC服务
就是机器A调用机器B上的程序,而机器A上的进程挂起,B机器在执行完返回后,将执行结果返回给A机器的进程,而A机器的进程获取执行结果继续执行下去的过程叫做RPC调用。通俗讲就是远程调用接口像在调用本地接口一样,方便,透明。
实现RPC框架
- 定义接口和方法实现
public interface IHelloRpc {
public String helloRpc(String content);
}
public class HelloRpcImpl implements IHelloRpc{
@Override
public String helloRpc(String content) {
System.out.println("Hello RPC");
return content;
}
}
- 设计提供者
(1)监听指定ip和端口号的Socket,如果没有连接那么将一直阻塞。
(2)使用每线程每连接模型,只是采用线程池进行线程的回收和创建。
(3)将线程和任务分离。
public class HelloProvider {
private static final Executor executor = Executors.newCachedThreadPool();
public static void provide(String host, int port) {
ServerSocket ss = null;
try {
ss = new ServerSocket();
ss.bind(new InetSocketAddress(host, port));
while(true) {
executor.execute(new HelloTask(ss.accept()));
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (ss != null) {
ss.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
- 定义任务
(1)通过Socket套接字获取输入流,将输入流包装成对象输入流。
(2)使用对象输入流获取接口类、参数类型数组、参数值数组。
(3)使用反射方式执行指定接口的方法。
(4)将执行结果写入到对象输出流。
public class HelloTask implements Runnable {
Socket socket = null;
public HelloTask(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
ObjectInputStream ois = null;
ObjectOutputStream oos = null;
try {
// 获取套接字输入流,包装成对象输入流
ois = new ObjectInputStream(socket.getInputStream());
// 获取接口名称
String interfaceName = ois.readUTF();
// 获取接口类
Class<?> service = Class.forName(interfaceName);
// 获取方法名称
String methodName = ois.readUTF();
// 获取参数类型数组
Class<?>[] parameterTypes = (Class<?>[]) ois.readObject();
// 获取参数值数组
Object[] args = (Object[]) ois.readObject();
// 获取方法类
Method method = service.getMethod(methodName, parameterTypes);
// 执行对象的方法
Object result = method.invoke(service.newInstance(), args);
// 获取套接字输出流,包装成对象输出流
oos = new ObjectOutputStream(socket.getOutputStream());
// 将写出到套接字输出流
oos.writeObject(result);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (oos != null) {
oos.close();
}
} catch (IOException e) {
e.printStackTrace();
}
try {
if (ois != null) {
ois.close();
}
} catch (IOException e) {
e.printStackTrace();
}
try {
if (socket != null) {
socket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
- 设计消费者
(1)使用JDK代理,连接服务端,向socket中写入类、方法名称、参数类型数组、方法参数值。
(2)对象输入流获取一个对象。
public class HelloConsumer<T> {
@SuppressWarnings("unchecked")
public T consume(Class<?> serviceClass, String host, int port) {
return (T) Proxy.newProxyInstance(serviceClass.getClassLoader(),
new Class<?>[] {serviceClass.getInterfaces()[0]},
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Socket socket = null;
ObjectOutputStream oos = null;
ObjectInputStream ois = null;
try {
socket = new Socket();
socket.connect(new InetSocketAddress(host, port));
oos = new ObjectOutputStream(socket.getOutputStream());
// 往对象输出流写入类
oos.writeUTF(serviceClass.getName());
// 往对象输出流写入方法名称
oos.writeUTF(method.getName());
// 往对象输出流写入参数类型数组
oos.writeObject(method.getParameterTypes());
// 往对象输出流写入方法参数值args
oos.writeObject(args);
ois = new ObjectInputStream(socket.getInputStream());
return ois.readObject();
} finally {
if (socket != null) {
socket.close();
}
if (oos != null) {
oos.close();
}
if (ois != null) {
ois.close();
}
}
}
});
}
}
- 测试类
public class HelloTest {
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
HelloProvider.provide("localhost", 9999);
}
}).start();
HelloConsumer<IHelloRpc> hc = new HelloConsumer<IHelloRpc>();
IHelloRpc hr = hc.consume(HelloRpcImpl.class, "localhost", 9999);
String content = hr.helloRpc("Hello World");
System.out.println(content);
}
}
结果:
Hello RPC
Hello World