Implementing a Simple RPC Framework

2021-06-19  侧耳倾听y

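Only the most basic functionality is implemented.

How RPC works

Implementation steps

1. Local proxy stub

Use cglib to obtain a proxy for the Service.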

OrderService orderService = ServiceFactory.create(OrderService.class, "http://localhost:8080/");
Order order = orderService.findByOrderNo("110");
log.info("rpc result:{}", order);

public class ServiceFactory {

    static {
        // Whitelist the package so fastjson may instantiate the "@type" classes found in responses
        ParserConfig.getGlobalInstance().addAccept("org.example");
    }

    public static  <T> T create(Class<T> serviceClass, String host) {
        ServiceInterceptor interceptor = new ServiceInterceptor(serviceClass, host);
        Enhancer enhancer = new Enhancer();
        enhancer.setSuperclass(serviceClass);
        enhancer.setCallback(interceptor);
        return (T) enhancer.create();
    }
}

public class ServiceInterceptor implements MethodInterceptor {

    public static final MediaType JSON_TYPE = MediaType.get("application/json; charset=utf-8");

    private final Class<?> serviceClass;

    private final String host;

    public ServiceInterceptor(Class<?> serviceClass, String host) {
        this.serviceClass = serviceClass;
        this.host = host;
    }

    @Override
    public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable {
        // Describe the call: which service, which method, which arguments
        String serviceClassName = serviceClass.getName();
        String methodName = method.getName();
        RpcRequest rpcRequest = RpcRequest.builder().serviceClass(serviceClassName).methodName(methodName)
                .params(objects).build();
        String reqJson = JSON.toJSONString(rpcRequest);
        // A client per call keeps the demo short; OkHttp recommends sharing one instance
        OkHttpClient client = new OkHttpClient();
        final Request request = new Request.Builder()
                .url(host)
                .post(RequestBody.create(JSON_TYPE, reqJson))
                .build();
        String respJson = Objects.requireNonNull(client.newCall(request).execute().body()).string();
        RpcResponse rpcResponse = JSON.parseObject(respJson, RpcResponse.class);
        if (!rpcResponse.isSuccess()) {
            // Rethrow the server-side exception to the local caller
            throw rpcResponse.getException();
        }
        // data carries "@type", so fastjson can rebuild the concrete result type
        return JSON.parse(rpcResponse.getData().toString());
    }
}
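
For readers who want to try the stub idea without cglib, here is a minimal sketch that swaps in the JDK's built-in dynamic proxy (java.lang.reflect.Proxy), which works when the service is an interface. GreetingService, JdkProxyStubDemo, and describeCall are hypothetical names used only for illustration. The handler merely renders the intercepted call as text, where the real interceptor above would serialize it and send it to the server.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical service interface used only for this sketch
interface GreetingService {
    String greet(String name);
}

final class JdkProxyStubDemo {

    // Creates a stub whose every call is routed to the handler instead of a real implementation
    @SuppressWarnings("unchecked")
    static <T> T create(Class<T> serviceInterface, InvocationHandler handler) {
        return (T) Proxy.newProxyInstance(
                serviceInterface.getClassLoader(),
                new Class<?>[]{serviceInterface},
                handler);
    }

    // Renders the captured call as text; a real stub would serialize it and send it over the network
    static String describeCall(Class<?> serviceInterface, String methodName, Object[] callArgs) {
        return serviceInterface.getName() + "#" + methodName + Arrays.toString(callArgs);
    }

    public static void main(String[] args) {
        GreetingService stub = create(GreetingService.class,
                (proxy, method, callArgs) -> describeCall(GreetingService.class, method.getName(), callArgs));
        System.out.println(stub.greet("110"));
    }
}
```

The caller only ever sees the interface, which is the same property ServiceFactory.create provides through cglib.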

2. Local serialization and deserialization

Use JSON serialization.

RpcRequest rpcRequest = RpcRequest.builder().serviceClass(serviceClassName).methodName(methodName)
                .params(objects).build();
String reqJson = JSON.toJSONString(rpcRequest);
...
RpcResponse rpcResponse = JSON.parseObject(respJson, RpcResponse.class);

3. Network communication

Use the HTTP protocol.

OkHttpClient client = new OkHttpClient();
final Request request = new Request.Builder()
        .url(host)
        .post(RequestBody.create(JSON_TYPE, reqJson))
        .build();
String respJson = Objects.requireNonNull(client.newCall(request).execute().body()).string();
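
The same request/response exchange can be tried with nothing but the JDK (Java 11+). The sketch below is an assumption-laden stand-in, not the article's code: it uses java.net.http.HttpClient instead of OkHttp and a throwaway com.sun.net.httpserver.HttpServer instead of the Spring endpoint. HttpTransportDemo and its methods are hypothetical names, and the server only echoes the body back with an "echo:" prefix.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

final class HttpTransportDemo {

    // Starts a throwaway server on a free loopback port; it replies with "echo:" + request body
    static HttpServer startEchoServer() throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        server.createContext("/", exchange -> {
            byte[] in = exchange.getRequestBody().readAllBytes();
            byte[] out = ("echo:" + new String(in, StandardCharsets.UTF_8)).getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().add("Content-Type", "application/json; charset=utf-8");
            exchange.sendResponseHeaders(200, out.length);
            try (OutputStream body = exchange.getResponseBody()) {
                body.write(out);
            }
        });
        server.start();
        return server;
    }

    // POSTs a JSON string and returns the response body as text
    static String post(String url, String json) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/json; charset=utf-8")
                .POST(HttpRequest.BodyPublishers.ofString(json, StandardCharsets.UTF_8))
                .build();
        return client.send(request, HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8)).body();
    }

    // Starts the server, sends one request, and shuts the server down again
    static String roundTrip(String json) {
        try {
            HttpServer server = startEchoServer();
            try {
                return post("http://127.0.0.1:" + server.getAddress().getPort() + "/", json);
            } finally {
                server.stop(0);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("{\"methodName\":\"findByOrderNo\"}"));
    }
}
```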

4. Remote serialization and deserialization

JSON serialization is used here as well.

@PostMapping("/")
public RpcResponse invoke(@RequestBody RpcRequest request) {
    return invoker.invoke(request);
}

5. Remote service stub

Use Spring's bean factory to look up the Service implementation.

@Setter // Lombok generates setApplicationContext(...), which satisfies ApplicationContextAware
public class ServiceInvoker implements ApplicationContextAware {

    private ApplicationContext applicationContext;

    public RpcResponse invoke(RpcRequest rpcRequest) {
        try {
            Class<?> serviceClass = Class.forName(rpcRequest.getServiceClass());
            // Matches by name only, so overloaded methods are not distinguished
            Method method = Arrays.stream(serviceClass.getMethods()).filter(m -> m.getName()
                    .equals(rpcRequest.getMethodName())).findFirst().get();
            final Object service = applicationContext.getBean(serviceClass);
            Object data = method.invoke(service, rpcRequest.getParams());
            return RpcResponse.builder().data(JSON.toJSONString(data, SerializerFeature.WriteClassName)).success(true).build();
        } catch (Exception e) {
            // Mark the call as failed so that the client rethrows the exception
            return RpcResponse.builder().exception(e).success(false).build();
        }
    }
}

6. Invoking the actual business service

Invoke it via reflection.

Class<?> serviceClass = Class.forName(rpcRequest.getServiceClass());
Method method = Arrays.stream(serviceClass.getMethods()).filter(m -> m.getName()
                    .equals(rpcRequest.getMethodName())).findFirst().get();
final Object service = applicationContext.getBean(serviceClass);
Object data = method.invoke(service, rpcRequest.getParams());
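
The lookup above picks the first method with a matching name. As a hedged variation, the sketch below also compares the argument count and unwraps InvocationTargetException so that the business exception, not the reflection wrapper, reaches the caller. It assumes a plain Map as a stand-in for the Spring context. EchoOrderService and ReflectiveInvokerDemo are hypothetical names for illustration.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Map;

// Hypothetical business service used only for this sketch
final class EchoOrderService {
    public String findByOrderNo(String orderNo) {
        if (orderNo.isEmpty()) {
            throw new IllegalArgumentException("orderNo is empty");
        }
        return "order-" + orderNo;
    }
}

final class ReflectiveInvokerDemo {

    // Stand-in for the Spring lookup: maps a service name to the object that implements it
    private final Map<String, Object> registry;

    ReflectiveInvokerDemo(Map<String, Object> registry) {
        this.registry = registry;
    }

    // Finds a public method by name and argument count, then calls it on the registered service
    Object invoke(String serviceName, String methodName, Object[] params) {
        Object service = registry.get(serviceName);
        if (service == null) {
            throw new IllegalArgumentException("unknown service: " + serviceName);
        }
        for (Method method : service.getClass().getMethods()) {
            if (method.getName().equals(methodName) && method.getParameterCount() == params.length) {
                try {
                    return method.invoke(service, params);
                } catch (InvocationTargetException e) {
                    // Surface the business exception rather than the reflection wrapper
                    Throwable cause = e.getCause();
                    if (cause instanceof RuntimeException) {
                        throw (RuntimeException) cause;
                    }
                    throw new IllegalStateException(cause);
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        throw new IllegalArgumentException("no such method: " + serviceName + "#" + methodName);
    }

    public static void main(String[] args) {
        ReflectiveInvokerDemo invoker = new ReflectiveInvokerDemo(Map.of("orderService", new EchoOrderService()));
        System.out.println(invoker.invoke("orderService", "findByOrderNo", new Object[]{"110"}));
    }
}
```

Matching on argument count still cannot tell apart overloads with the same arity; sending parameter type names in the request would be one way to close that gap.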

7. Returning the service result along the same path

RpcResponse.builder().data(JSON.toJSONString(data, SerializerFeature.WriteClassName)).success(true).build();

8. Returning the result to the local caller
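
To see all eight steps in one place, the following sketch runs the whole round trip inside a single JVM using only the JDK. It is a simplification under stated assumptions, not the article's implementation: a JDK dynamic proxy replaces cglib, Java serialization replaces JSON, and a plain function replaces HTTP. PriceService, PriceServiceImpl, InProcessRpcDemo, and Call are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.function.UnaryOperator;

// Hypothetical service contract and implementation, used only for this sketch
interface PriceService {
    Integer priceOf(String orderNo);
}

final class PriceServiceImpl implements PriceService {
    public Integer priceOf(String orderNo) {
        return orderNo.length() * 10;
    }
}

final class InProcessRpcDemo {

    // Wire format of a call: method name plus arguments
    static final class Call implements Serializable {
        final String methodName;
        final Object[] params;

        Call(String methodName, Object[] params) {
            this.methodName = methodName;
            this.params = params;
        }
    }

    static byte[] toBytes(Object value) {
        try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Steps 4-7: decode the call, invoke the target by reflection, encode the result
    static byte[] serve(Object target, byte[] requestBytes) {
        Call call = (Call) fromBytes(requestBytes);
        try {
            for (Method method : target.getClass().getMethods()) {
                if (method.getName().equals(call.methodName)) {
                    return toBytes(method.invoke(target, call.params));
                }
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
        throw new IllegalArgumentException("no such method: " + call.methodName);
    }

    // Steps 1-3 and 8: intercept the call, encode it, hand it to the transport, decode the reply
    @SuppressWarnings("unchecked")
    static <T> T stub(Class<T> contract, UnaryOperator<byte[]> transport) {
        return (T) Proxy.newProxyInstance(contract.getClassLoader(), new Class<?>[]{contract},
                (proxy, method, args) -> fromBytes(transport.apply(toBytes(new Call(method.getName(), args)))));
    }

    public static void main(String[] args) {
        PriceService remote = new PriceServiceImpl();
        PriceService local = stub(PriceService.class, request -> serve(remote, request));
        System.out.println(local.priceOf("110")); // prints 30
    }
}
```

Because the transport is just a function from bytes to bytes, replacing it with an HTTP call changes nothing on either side of it.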

Thoughts

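The above implements only the basic functionality of RPC. On top of it, extensions such as load balancing, routing, and fault tolerance could be added. The basics also leave plenty of room for optimization, for example service registration and discovery, or more efficient network communication.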
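
As one illustration of such an extension, here is a minimal round-robin host selector. It is only a sketch of the simplest load-balancing policy and is not part of the linked code. RoundRobinHosts is a hypothetical name, and the idea would be to pass pick() to ServiceFactory.create in place of a fixed host.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinHosts {

    private final List<String> hosts;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinHosts(List<String> hosts) {
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("at least one host is required");
        }
        this.hosts = List.copyOf(hosts);
    }

    // Returns hosts in rotation; floorMod keeps the index valid after the counter overflows
    String pick() {
        return hosts.get(Math.floorMod(next.getAndIncrement(), hosts.size()));
    }

    public static void main(String[] args) {
        RoundRobinHosts hosts = new RoundRobinHosts(List.of("http://localhost:8080/", "http://localhost:8081/"));
        for (int i = 0; i < 3; i++) {
            System.out.println(hosts.pick());
        }
    }
}
```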

Code

https://gitee.com/morehard/demo/tree/master/simple-rpc
