rpc理解

2022-04-17  本文已影响0人  君子兰琚琚

1. 什么是rpc?

  rpc,即 Remote Procedure Call,中文:远程过程调用。简单点说,就是跨进程、跨机器、基于网络来实现的方法调用,下面以代码来解释:

import org.junit.Test;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/**
 * @Author juwm
 * rpc调用方,可采用任意支持socket编程的语言实现
 */
public class RpcClient {

    @Test
    public void client() {
        try (Socket socket = new Socket("127.0.0.1", 6639);
             OutputStream out = socket.getOutputStream();
             InputStream in = socket.getInputStream()) {
            // 目标对象 、目标方法
            String protocol = "{\"method\":\"targetMethod\",\"classFullName\":\"a.b.TargetObject\"}]";
            out.write(protocol.getBytes(StandardCharsets.UTF_8));
            int len = 100;
            byte[] c = new byte[len];
            StringBuffer buffer = new StringBuffer(100);
            for (; ; ) {
                int read = in.read(c);
                buffer.append(new String(c));
                if (read != len) // 如果数据长度是len的倍数,这里不会跳出循环
                    break;
            }
            String result = buffer.substring(0, buffer.lastIndexOf("]"));
            System.out.println(result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
import com.alibaba.fastjson.JSON;
import org.junit.Test;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @Author juwm
 * rpc被调方,可采用任意支持socket编程的语言实现
 */
public class RpcServer {

    private static Queue<Socket> socketList = new ArrayBlockingQueue(10);

    private static ThreadPoolExecutor executor =
            new ThreadPoolExecutor(5,
                    5,
                    1000,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(10),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.DiscardPolicy());

    @Test
    public void server() throws IOException {
        // new a ServerSocket
        ServerSocket server = new ServerSocket(6639);
        while (true) {
            // listen connect
            Socket socket = server.accept();
            // connect line up
            socketList.offer(socket);
            // create a task to Thread Pool
            dealWithConnect();
        }
    }

    public static void dealWithConnect() {
        executor.execute(() -> {
            try (Socket socket = socketList.poll();
                 InputStream in = socket.getInputStream();
                 OutputStream out = socket.getOutputStream()) {
                int len = 10;
                byte[] c = new byte[len];
                StringBuffer buffer = new StringBuffer(100);
                for (; ; ) {
                    /*
                     * 当read()执行时如果没有数据让它读,它就会阻塞直到有数据到来,
                     * 这是Bio的特点。read()阻塞后它后边的代码在数据到来前都将不得执行。
                     * 所以如果有代码需要再read()阻塞前执行,就需要知道客户端发送的数据的长度。
                     * 计算出当次读取的最后一次read(),执行完最后一次read()后先执行想要执行的代码,
                     * 然后再read(),这样就可以在read()阻塞前把想执行的代码执行。
                     */
                    int read = in.read(c);
                    buffer.append(new String(c));
                    if(read != len) {
                        // 这里判断是否为最后一次读取的方案是:
                        // 读取出来的数据长度是否等于数组的长度。
                        // 如果读出来的数据不能填满数组,
                        // 说明读到头了,下一次读取read()将阻塞,
                        // 如果有代码需要在阻塞前执行,则此时执行,
                        // 这种设计的bug是如果数据的长度正好是10
                        // 或者10的倍数则不行
                        break;
                    }
                }
                String context = buffer.substring(0, buffer.lastIndexOf("]"));
                Map map = JSON.parseObject(context, Map.class);
                // client想调用的目标对象(全类名)
                Class<?> objClass = Class.forName((String) map.get("classFullName"));
                // client想调用的目标方法
                Method me = objClass.getMethod((String) map.get("method"), null);
                // 利用反射调用目标对象的方法,从而实现跨进程的方法调用:Remote Procedure Call
                String result = me.invoke(objClass.newInstance(), null) + "]";
                // 返回结果给client
                out.write(result.getBytes(StandardCharsets.UTF_8));
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
}
/**
 * @Author juwm
 * 目标对象
 */
public class TargetObject {
    // 目标方法
    public String targetMethod() {
        return "hello Remote Procedure Call!";
    }
}
image.png
image.png
上一篇下一篇

猜你喜欢

热点阅读