手写RPC框架

手写RPC框架(2) 引入zookeeper做服务治理

2019-07-07  本文已影响43人  jwfy

本人微信公众号(jwfy)欢迎关注

上一期完成了手写一个RPC框架,看看100个线程同时调用效果如何,但还是遗留了很多问题以及可以优化的点,这次就完全重写之前的代码,演进到v2版本,使得代码逻辑更加规范的同时,引入ZooKeeper辅助完成服务治理。

在代码展示之前还是先介绍一些基本的概念以及设计思路,ZooKeeper是什么,服务治理又是什么等,最后贴了部分关键代码以说明和v1版本的区别,有哪些点的改进措施。

最后还提了个问题:线程池打满了怎么办?,你有什么好的解决方案呢?

ZooKeeper

ZooKeeper(直译为动物管理员,简称zk)是一个分布式、开源的应用协调服务,利用和Paxos类似的ZAB选举算法实现分布式一致性服务。有类似于Unix文件目录的节点信息,同时可以针对节点的变更添加watcher监听以能够即使感知到节点信息变更。可提供的功能例如域名服务、配置维护、同步以及组服务等(此功能介绍来自官网描述:It exposes common services - such as naming, configuration management, synchronization, and group services - in a simple interface)。如下图就是DUBBO存储在ZooKeeper的节点数据情况。

image

在本地启动服务后通过zk客户端连接后也可通过命令查看节点信息,如下图所示。

image

ZooKeeper包含了4种不同含义的功能节点,在每次创建节点之前都需要明确声明节点类型。

类型 定义 描述
PERSISTENT 持久化目录节点 客户端与zookeeper断开连接后,该节点依旧存在
PERSISTENT_SEQUENTIAL 持久化顺序编号目录节点 客户端与zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号
EPHEMERAL 临时目录节点 客户端与zookeeper断开连接后,该节点被删除
EPHEMERAL_SEQUENTIAL 临时顺序编号目录节点 客户端与zookeeper断开连接后,该节点被删除,只是Zookeeper给该节点名称进行顺序编号

ZooKeeper使用之前需要先进行安装,后开启服务端的服务,我们的服务作为客户端连接ZooKeeper以便于后续的操作。具体可参考官网文档Zookeeper3.5.5 官方文档,在实际的java项目开发中也是可以通过maven引入ZkClient或者Curator开源的客户端,在本文学习笔记中是使用的Curator,因为其已经封装了原始的节点注册、数据获取、添加watcher等功能。具体maven引入的版本如下,

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.2.0</version>
</dependency>

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.2.0</version>
</dependency>

服务治理

服务治理也就是针对服务进行管理的措施,例如服务发现服务暴露负载均衡快速上下线等都是服务治理的具体体现。

数据处理流程

服务端:服务的提供方,接受网络传输的请求数据、通过网络把应答数据发送给客户端
客户端:服务的调用方,使用本地代理,通过网络把请求数据发送出去,接受服务端返回的应答数据

image

所有的数据传输都是按照上面图片说的流程来的,如果需要添加自定义的序列化工具,则需要在把数据提交到socket的输出流缓冲区之前按照序列化工具完成序列化操作,反序列化则进行反向操作即可。

RPC 实践 V2版本

文件夹目录如下图所示,其中:

image

由于代码太长,只贴部分重要的代码操作。

服务暴露 & 服务发现

public interface ServiceRegister {

    /**
     * 服务注册
     * @param config
     */
    void register(BasicConfig config);

    /**
     * 服务发现,从注册中心获取可用的服务提供方信息
     * @param request
     * @return
     */
    InetSocketAddress discovery(RpcRequest request, ServiceType nodeType);
}

默认使用了CuratorFramework客户端完成zk数据的操作

public class ZkServiceRegister implements ServiceRegister {

    private CuratorFramework client;

    private static final String ROOT_PATH = "jwfy/simple-rpc";

    private LoadBalance loadBalance = new DefaultLoadBalance();

    public ZkServiceRegister() {
        RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);

        this.client = CuratorFrameworkFactory
                .builder()
                .connectString("127.0.0.1:2182")
                .sessionTimeoutMs(50000)
                .retryPolicy(policy)
                .namespace(ROOT_PATH)
                .build();
        // 业务的根路径是 /jwfy/simple-rpc ,其他的都会默认挂载在这里

        this.client.start();
        System.out.println("zk启动正常");
    }

    @Override
    public void register(BasicConfig config) {
        String interfacePath = "/" + config.getInterfaceName();
        try {
            if (this.client.checkExists().forPath(interfacePath) == null) {
                // 创建 服务的永久节点
                this.client.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT)
                        .forPath(interfacePath);
            }

            config.getMethods().forEach(method -> {
                try {
                    String methodPath = null;
                    ServiceType serviceType = config.getType();
                    if (serviceType == ServiceType.PROVIDER) {
                        // 服务提供方,需要暴露自身的ip、port信息,而消费端则不需要
                        String address = getServiceAddress(config);
                        methodPath = String.format("%s/%s/%s/%s", interfacePath, serviceType.getType(), method.getMethodName(), address);
                    } else {
                        methodPath = String.format("%s/%s/%s", interfacePath, serviceType.getType(), method.getMethodName());
                    }
                    System.out.println("zk path: [" + methodPath + "]");
                    this.client.create()
                            .creatingParentsIfNeeded()
                            .withMode(CreateMode.EPHEMERAL)
                            .forPath(methodPath, "0".getBytes());
                    // 创建临时节点,节点包含了服务提供段的信息
                } catch (Exception e) {
                    e.getMessage();
                }
            });
        } catch (Exception e) {
            e.getMessage();
        }
    }

    @Override
    public InetSocketAddress discovery(RpcRequest request, ServiceType nodeType) {
        String path = String.format("/%s/%s/%s", request.getClassName(), nodeType.getType(), request.getMethodName());
        try {
            List<String> addressList = this.client.getChildren().forPath(path);
            // 采用负载均衡的方式获取服务提供方信息,不过并没有添加watcher监听模式
            String address = loadBalance.balance(addressList);
            if (address == null) {
                return null;
            }
            return parseAddress(address);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    private String getServiceAddress(BasicConfig config) {
        String hostInfo = new StringBuilder()
                .append(config.getHost())
                .append(":")
                .append(config.getPort())
                .toString();
        return hostInfo;
    }

    private InetSocketAddress parseAddress(String address) {
        String[] result = address.split(":");
        return new InetSocketAddress(result[0], Integer.valueOf(result[1]));
    }

    public void setLoadBalance(LoadBalance loadBalance) {
        // 可以重新设置负载均衡的策略
        this.loadBalance = loadBalance;
    }
}
image

服务启动后利用zkclient查询到在zk中包含的节点信息,其中默认的命名空间是jwfy/simple-rpc

负载均衡

public interface LoadBalance {
    String balance(List<String> addressList);
}
public abstract class AbstractLoadBalance implements LoadBalance {

    @Override
    public String balance(List<String> addressList) {
        if (addressList == null || addressList.isEmpty()) {
            return null;
        }
        if (addressList.size() == 1) {
            return addressList.get(0);
        }
        return doLoad(addressList);
    }

    abstract String doLoad(List<String> addressList);
}
public class DefaultLoadBalance extends AbstractLoadBalance {

    @Override
    String doLoad(List<String> addressList) {
        Random random = new Random();
        // 利用随机函数选择一个,其中random.nextIn生成的数据是在[0, size) 之间
        return addressList.get(random.nextInt(addressList.size()));
    }
}

上面的负载均衡代码其实很简单,就是从一个机器列表addressList中选择一个,如果列表为空或者不存在则直接返回null,如果机器只有1台则直接获取返回即可,当列表记录超过1条后利用随机函数生成一个列表偏移量以获取对应数据。也可以按照类似完成更多负载均衡的策略,然后调用setLoadBalance方法就可以了。

IO 处理

public interface MessageProtocol {
    /**
     * 服务端解析从网络传输的数据,转变成request
     * @param inputStream
     * @return
     */
    void serviceToRequest(RpcRequest request, InputStream inputStream);

    /**
     * 服务端把计算机的结果包装好,通过outputStream 返回给客户端
     * @param response
     * @param outputStream
     * @param <T>
     */
     <T> void serviceGetResponse(RpcResponse<T> response, OutputStream outputStream);

    /**
     * 客户端把请求拼接好,通过outputStream发送到服务端
     * @param request
     * @param outputStream
     */
     void clientToRequest(RpcRequest request, OutputStream outputStream);

    /**
     * 客户端接收到服务端响应的结果,转变成response
     * @param response
     * @param inputStream
     */
     void clientGetResponse(RpcResponse response, InputStream inputStream);
}

实现类DefaultMessageProtocol

public class DefaultMessageProtocol implements MessageProtocol {

    @Override
    public void serviceToRequest(RpcRequest request, InputStream inputStream) {
        try {
            ObjectInputStream input = new ObjectInputStream(inputStream);

            request.setClassName(input.readUTF());
            request.setMethodName(input.readUTF());
            request.setParameterTypes((Class<?>[])input.readObject());
            request.setArguments((Object[])input.readObject());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public <T> void serviceGetResponse(RpcResponse<T> response, OutputStream outputStream) {
        try {
            ObjectOutputStream output = new ObjectOutputStream(outputStream);

            output.writeBoolean(response.getError());
            output.writeObject(response.getResult());
            output.writeObject(response.getErrorMessage());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void clientToRequest(RpcRequest request, OutputStream outputStream) {
        try {
            ObjectOutputStream ouput = new ObjectOutputStream(outputStream);

            ouput.writeUTF(request.getClassName());
            ouput.writeUTF(request.getMethodName());
            ouput.writeObject(request.getParameterTypes());
            ouput.writeObject(request.getArguments());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void clientGetResponse(RpcResponse response, InputStream inputStream) {
        try {
            ObjectInputStream input = new ObjectInputStream(inputStream);

            response.setError(input.readBoolean());
            response.setResult(input.readObject());
            response.setErrorMessage((String) input.readObject());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

服务端请求处理

public class ServiceHandler {

    private ThreadPoolExecutor executor = null;
    private RpcService rpcService;
    private MessageProtocol messageProtocol;

    public ServiceHandler(RpcService rpcService) {
        this.rpcService = rpcService;

        ThreadFactory commonThreadName = new ThreadFactoryBuilder()
                .setNameFormat("Parse-Task-%d")
                .build();

        this.executor = new ThreadPoolExecutor(
                10,
                10,
                2,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(200),
                commonThreadName, new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                SocketTask socketTask = (SocketTask) r;
                Socket socket = socketTask.getSocket();
                if (socket != null) {
                    try {
                        socket.close();
                        System.out.println("reject socket:" + socketTask + ", and closed");
                        // 无法及时处理和响应的就快速拒绝掉
                    } catch (IOException e) {
                    }
                }
            }
        });
    }

    public RpcService getRpcService() {
        return rpcService;
    }

    public void setRpcService(RpcService rpcService) {
        this.rpcService = rpcService;
    }

    public MessageProtocol getMessageProtocol() {
        return messageProtocol;
    }

    public void setMessageProtocol(MessageProtocol messageProtocol) {
        this.messageProtocol = messageProtocol;
    }

    public void handler(Socket socket) {
        // 接收到新的套接字,包装成为一个runnable提交给线程去执行
        this.executor.execute(new SocketTask(socket));
    }

    class SocketTask implements Runnable {

        private Socket socket;

        public SocketTask(Socket socket) {
            this.socket = socket;
        }

        public Socket getSocket() {
            return socket;
        }

        @Override
        public void run() {
            try {
                InputStream inputStream = socket.getInputStream();
                OutputStream outputStream = socket.getOutputStream();

                RpcRequest request = new RpcRequest();
                messageProtocol.serviceToRequest(request, inputStream);
                // 获取客户端请求数据,统一包装成RpcRequest
                RpcResponse response = rpcService.invoke(request);
                // 反射调用,得到具体的返回值
                System.out.println("request:[" + request + "], response:[" + response + "]");
                messageProtocol.serviceGetResponse(response, outputStream);
                // 再返回给客户端
            } catch (Exception e) {
                // error
            } finally {
                if (socket != null) {
                    // SOCKET 关闭一定要加上,要不然会出各种事情
                    try {
                        socket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

客户端 代理对象

public class ProxyInstance implements InvocationHandler {

    private RpcClient rpcClient;
    private Class clazz;

    public ProxyInstance(RpcClient client, Class clazz) {
        this.rpcClient = client;
        this.clazz = clazz;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RpcRequest request = new RpcRequest();
        request.setClassName(clazz.getName());
        request.setMethodName(method.getName());
        request.setParameterTypes(method.getParameterTypes());
        request.setArguments(args);

        // 获取服务提供方信息,这里既是服务发现的入口,找到一个合适的可用的服务提供方信息
        InetSocketAddress address = rpcClient.discovery(request);
        System.out.println("[" + Thread.currentThread().getName() + "]discover service:" + address);

        // 发起网络请求,得到请求数据
        RpcResponse response = rpcClient.invoke(request, address);
        return response.getResult();
    }
}

上面的InetSocketAddress address = rpcClient.discovery(request)是相比v1多了一个最重要的东西,每次获取请求后都实时从zk中获取对应的服务提供方信息,这就是服务发现

实践

public class Client {

    public static void main(String[] args) {
        RpcClient rpcClient = new RpcClient();

        rpcClient.subscribe(Calculate.class);
        rpcClient.start();

        Calculate<Integer> calculateProxy = rpcClient.getInstance(Calculate.class);

        for(int i=0; i< 200; i++) {
            new Thread(() -> {
                long start = System.currentTimeMillis();
                int s1 = new Random().nextInt(100);
                int s2 = new Random().nextInt(100);
                int s3 = calculateProxy.add(s1, s2);
                System.out.println("[" + Thread.currentThread().getName() + "]a: " + s1 + ", b:" + s2 + ", c=" + s3 + ", 耗时:" + (System.currentTimeMillis() - start));
            }).start();
        }
    }
}

客户端开启200个线程后,执行结果是顺利执行的,在服务端开启的接受请求被添加到线程池中,而代码中线程池的任务队列长度是200,可以完全的存储200个线程,但是如果我们把客户端请求量从200个改成500个呢,又会出现什么情况?

服务端

image

客户端

image image

如上述的图片显示,当请求量打满线程池之后,线程池的拒绝策略就开始生效了,在本代码中是直接调用了close操作,而客户端感知到关闭后也会出现io错误,而正常的请求则顺利执行。其中还有输出discover服务发现了服务提供方的机器信息,这也是符合起初的想法的。

这里一定要加上一些策略以及时关闭无法处理的socket,否则就会出现服务提供方无任何可执行,但是服务调用方却还在等待中,因为socket并没有关闭,从而出现资源被占用了,还不执行相关任务。

提个问题:线程池打满了怎么办?

在本demo中采取了非常粗暴的策略,直接丢弃了无法处理的任务,在实际的线上业务中,可以先加机器以能再最短的时间内恢复线上情况,后期结合业务特点提出针对性的解决方案。如果业务接受一定的延迟,可以考虑接入kafka类似的消息队列(削峰是mq的一大特点);如果对时间要求很高,要么加机器,要么压榨机器性能,可能之前设置的线程池的数量太小,那就需要调节线程池的各个核心数据,修改线程池的任务队列类型也是可以考虑的;此外也有可能是业务耗时太多,无法及时处理完全造成请求堆积导致的,那么就需要考虑业务的同步改异步化。最后线上告警也需要及时完善。

没有绝对的解决方案,只有最合适当下场景的方案,没有银弹,任何不具体结合业务的方案都是扯淡。

总结思考

v2版本相比v1版本修改了整个代码结构,使得结构能够更加明确,引入zookeeper作为服务治理功能,大致介绍了zookeeper的特点以及功能,给服务注册、服务发现、序列化协议等均留下了口子,以便实现自定义的协议,v1的io模型是BIO,v2并没有变化,只是由单线程改造成多线程。

整体而言符合一个简单的rpc框架,依旧还是有很多点可以完善、优化的点,如:

本人微信公众号(搜索jwfy)欢迎关注

微信公众号
上一篇 下一篇

猜你喜欢

热点阅读