基于Netty 手写 Dubbo 框架

2019-05-07  本文已影响0人  July_whj

基于Netty 手写 Dubbo 框架

1、Dubbo是什么,解决什么样的问题?

为了解决模块拆分后,彼此远程调用的问题。

RPC -> Remote Procedure Call 远程调用,常见的RPC框架有:

阿里的:dubbo。

当当的:dubbox。

谷歌的:grpc。

SpringCloud(一站式开发)等。

2、实现方案

查看官网dubbo结构图

image-20190504082007794.png

1、首先通过register将服务提供者的url注册到Registry注册中心中。

2、客户端Consumer从注册中心获取被调用服务端注册信息,如:接口名称,URL地址等信息。

3、将获取的url地址返回到Consumer客户端,客户端通过获取的URL地址支持invoke反射机制获取服务的实现。

3、整体项目结构信息

|-- netty-to-dubbo
    |-- netty-dubbo-api
        |-- cn.org.july.netty.dubbo.api
            |-- Iservice : 对外服务暴露接口
            |-- RpcRequest :服务请求对象Bean
    |-- netty-dubbo-common
        |-- cn.org.july.netty.dubbo.annotation
            |-- RpcAnnotation : 定义一个接口标识注解
    |-- netty-dubbo-server
        |-- cn.org.july.netty.dubbo.registry
            |-- IRegisterCenter :服务注册接口
            |-- RegisterCenterImpl:服务注册实现类
            |-- ZkConfig:ZK配置文件
        |-- cn.org.july.netty.dubbo.rpc
            |-- NettyRpcServer:基于netty实现的Rpc通讯服务
            |-- RpcServerHandler:Rpc服务处理流程
        |-- cn.org.july.netty.dubbo.service
            |-- ServiceImpl:对外接口IService接口实现类
    |-- netty-dubbo-client
        |-- cn.org.july.netty.dubbo.loadbalance
            |-- LoadBalance :负载均衡实现接口
            |-- RandomLoadBalance:负载均衡实现类随机获取服务提供者
        |-- cn.org.july.netty.dubbo.proxy
            |-- RpcClientProxy:netty客户端通讯组件
            |-- RpcProxyHandler:netty与服务端通讯消息处理组件
        |-- cn.org.july.netty.dubbo.registry
            |-- IServiceDiscover:从注册中心获取注册的服务接口
            |-- ServiceDiscoverImpl:接口IServiceDiscover的实现类
            |-- ZkConfig:zk配置文件。

4、服务提供者Provider

4.1、实现Iservice接口

首先我们看下Iservice接口的内容:

package cn.org.july.netty.dubbo.api;

/**
 * @author july_whj
 */
public interface IService {
    /**
     * 计算加法
     */
    int add(int a, int b);
    /**
     * @param msg
     */
    String sayHello(String msg);
}

我们编写ServiceImpl实现以上两个接口类。

package cn.org.july.netty.dubbo.service;

import cn.org.july.netty.dubbo.annotation.RpcAnnotation;
import cn.org.july.netty.dubbo.api.IService;

/**
 * @author july_whj
 */
@RpcAnnotation(IService.class)
public class ServiceImpl implements IService {
    @Override
    public int add(int a, int b) {
        return a + b;
    }
    @Override
    public String sayHello(String msg) {
        System.out.println("rpc say :" + msg);
        return "rpc say: " + msg;
    }
}

该类实现比较简单,不做多处理,下面分析服务注册。

4.2、服务注册到ZK

​ 首先我们定义一个接口类IRegisterCenter,里面定义一个registry方法,该方法实现服务注册。服务注册需要将服务的名称、服务的地址注册到注册中心中,我们定义接口如下:

package cn.org.july.netty.dubbo.registry;

/**
 * @author july_whj
 */
public interface IRegisterCenter {
    /**
     * 服务注册
     * @param serverName 服务名称(实现方法路径)
     * @param serviceAddress 服务地址
     */
    void registry(String serverName,String serviceAddress);
}

​ 第二,我们使用zookeerper作为服务注册中心,在netty-dubbo-server模块中引入zk的客户端操作类,pom文件如下:

<dependency>
     <groupId>org.apache.curator</groupId>
     <artifactId>curator-recipes</artifactId>
     <version>2.5.0</version>
</dependency>
<dependency>
     <groupId>org.apache.curator</groupId>
     <artifactId>curator-framework</artifactId>
     <version>2.5.0</version>
</dependency>

注意:这里版本使用的2.5.0,我使用的zk版,

​ 第三,实现该接口编写接口实现类RegisterCenterImpl

package cn.org.july.netty.dubbo.registry;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

/**
 * @author july_whj
 */
public class RegisterCenterImpl implements IRegisterCenter {

    private CuratorFramework curatorFramework;
    {
        curatorFramework = CuratorFrameworkFactory.builder()
                .connectString(ZkConfig.addr).sessionTimeoutMs(4000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 10)).build();
        curatorFramework.start();
    }
    @Override
    public void registry(String serverName, String serviceAddress) {
        String serverPath = ZkConfig.ZK_REGISTER_PATH.concat("/").concat(serverName);
        try {
            if (curatorFramework.checkExists().forPath(serverPath) == null) {
                curatorFramework.create().creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT).forPath(serverPath, "0".getBytes());
            }
            String addr = serverPath.concat("/").concat(serviceAddress);
            String rsNode = curatorFramework.create().withMode(CreateMode.EPHEMERAL)
                    .forPath(addr, "0".getBytes());
            System.out.println("服务注册成功," + rsNode);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

我们分析下以上代码:

​ 定义一个CuratorFramework对象,通过代码块来实例化该对象,并通过curatorFramework.start();来连接ZKConfig中配置好的地址连接ZK。

​ 使用zk作为注册中心,我们了解下ZK的存储结构。zookeeper的命名空间的结构和文件系统很像。一个名字和文件一样使用/的路径表现,zookeeper的每个节点都是被路径唯一标识的。

image-20190504113736353.png

​ 分析一下registry方法,首先从ZkConfig中获取要注册数据的根节点信息,并将该信息和服务名称进行拼接,判断该路径是否存在,如果不存在使用PERSISTENT方式创建该服务名称路径信息。PERSISTENT方式为持久方式,我们使用这种方式创建因为服务名称不是动态变化的,不用每次去监听它的变化。而我们服务的地址是有可能存在多个,并且有可能发生变化,我们使用EPHEMERAL方式来创建服务的实现地址。

​ 我们将ServiceImpl服务注册到zk上,我们首先获取这个服务的服务名称,和服务实现的地址,将该服务的服务名称和服务地址注册到zk上,下面看下我们的注册服务的测试类RegTest

import cn.org.july.netty.dubbo.registry.IRegisterCenter;
import cn.org.july.netty.dubbo.registry.RegisterCenterImpl;

import java.io.IOException;

public class RegTest {
    public static void main(String[] args) throws IOException {
        IRegisterCenter registerCenter = new RegisterCenterImpl();
        registerCenter.registry("cn.org.july.test", "127.0.0.1:9090");
        System.in.read();
    }
}

​ 我们将cn.org.july.test服务,和服务实现的地址127.0.0.1:9090注册到zk中。

看下服务执行效果:

image-20190504114457883.png

服务端显示注册成功,我们看以下zk服务中有没有该数据,

zk.gif

最后,我们可以看到数据注册成功。

4.3、实现NettyRpcServer

​ 我们要将ServiceImpl服务发布到zk上,并通过netty监听某个端口信息。

​ 我们先看下

package cn.org.july.netty.dubbo.rpc;

import cn.org.july.netty.dubbo.annotation.RpcAnnotation;
import cn.org.july.netty.dubbo.registry.IRegisterCenter;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

import java.util.HashMap;
import java.util.Map;

/**
 * @author july_whj
 */
public class NettyRpcServer {

    private IRegisterCenter registerCenter;
    private String serviceAddress;
    private Map<String, Object> handlerMap = new HashMap<>(16);

    public NettyRpcServer(IRegisterCenter registerCenter, String serviceAddress) {
        this.registerCenter = registerCenter;
        this.serviceAddress = serviceAddress;
    }

    /**
     * 发布服务
     */
    public void publisher() {
        for (String serviceName : handlerMap.keySet()) {
            registerCenter.registry(serviceName, serviceAddress);
        }
        try {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            //启动netty服务
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class);
            bootstrap.childHandler(new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel channel) throws Exception {
                    ChannelPipeline channelPipeline = channel.pipeline();
                    channelPipeline.addLast(new ObjectDecoder(1024 * 1024, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                    channelPipeline.addLast(new ObjectEncoder());
                    channelPipeline.addLast(new RpcServerHandler(handlerMap));
                }
            }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
            String[] addr = serviceAddress.split(":");
            String ip = addr[0];
            int port = Integer.valueOf(addr[1]);
            ChannelFuture future = bootstrap.bind(ip, port).sync();
            System.out.println("服务启动,成功。");
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 子对象的实现
     *
     * @param services 对象实现类
     */
    public void bind(Object... services) {
        //将实现类通过注解获取实现类的名称、实现类的实现放入map集合中。
        for (Object service : services) {
            RpcAnnotation annotation = service.getClass().getAnnotation(RpcAnnotation.class);
            String serviceName = annotation.value().getName();
            handlerMap.put(serviceName, service);
        }
    }
}

分析下以上代码:

​ 通过bind方法,将服务提供者通过RpcAnnotation注解获取服务名称,并将服务名称,服务实现类放入handlerMap 中。

​ 通过publisher方法,获取handlerMap 中的服务实现,将这些服务实现通过registerCenter.registry(serviceName, serviceAddress)将这些服务注册到zk注册中心中,完成服务的注册。下面代码是netty的基础代码,创建两个工作线程池,启动netty服务,通过channelPipeline定义序列化对象和RpcServerHandler实现。这里不做过多解析。

​ 我们看下RpcServerHandler的代码实现。

package cn.org.july.netty.dubbo.rpc;

import cn.org.july.netty.dubbo.api.RpcRequest;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.io.UnsupportedEncodingException;
import java.lang.reflect.Method;
import java.nio.Buffer;
import java.util.HashMap;
import java.util.Map;

public class RpcServerHandler extends ChannelInboundHandlerAdapter {
    private Map<String, Object> handlerMap = new HashMap<>();

    public RpcServerHandler(Map<String, Object> handlerMap) {
        this.handlerMap = handlerMap;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws UnsupportedEncodingException {
        System.out.println("channelActive:" + ctx.channel().remoteAddress());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("服务端接收到消息:" + msg);
        RpcRequest rpcRequest = (RpcRequest) msg;
        Object result = new Object();
        if (handlerMap.containsKey(rpcRequest.getClassName())) {
            Object clazz = handlerMap.get(rpcRequest.getClassName());
            Method method = clazz.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getTypes());
            result = method.invoke(clazz, rpcRequest.getParams());
        }
        ctx.write(result);
        ctx.flush();
        ctx.close();
    }
}

​ 这里复写了channelRead方法,接收客户端传递的RpcRequest对象信息。下面判断handlerMap中是否存在客户端调用的实现类,如果存在通过反射机制获取服务端实现类,通过invoke方法调用方法实现,并将执行结果result对象通过ctx.write(result);将执行结果返回客户端。

4.4、编写服务启动类ServerTest

import cn.org.july.netty.dubbo.api.IService;
import cn.org.july.netty.dubbo.registry.IRegisterCenter;
import cn.org.july.netty.dubbo.registry.RegisterCenterImpl;
import cn.org.july.netty.dubbo.rpc.NettyRpcServer;
import cn.org.july.netty.dubbo.service.ServiceImpl;

/**
 * Created with IntelliJ IDEA.
 * User:  wanghongjie
 * Date:  2019/5/3 - 23:03
 * <p>
 * Description:
 */
public class ServerTest {
    public static void main(String[] args) {
        IService service = new ServiceImpl();
        IRegisterCenter registerCenter = new RegisterCenterImpl();
        NettyRpcServer rpcServer = new NettyRpcServer(registerCenter, "127.0.0.1:8080");
        rpcServer.bind(service);
        rpcServer.publisher();
    }
}

启动netty服务,将服务实现类service通过bind方法绑定到handlerMap中,通过publisher方法,将service、服务实现地址发布到zk,并启动netty服务,监听8080端口。

5、实现服务消费者

​ 做为服务消费者,我们首先要连接zk注册中心,获取服务实现的地址,并实时监听获取最新的地址信息。通过远程调用实现该服务。如果服务实现是多个我们需实现客户端负载,选取我们的服务地址。

5.1、负载均衡实现

​ 定义loadbalance接口.

package cn.org.july.netty.dubbo.loadbalance;

import java.util.List;

public interface LoadBalance {
    String select(List<String> repos);
}

​ 定义select选择方法。

通过RandomLoadBalance 实现loadbalance接口,从实现名称可以看到Random随机获取。

package cn.org.july.netty.dubbo.loadbalance;

import java.util.List;
import java.util.Random;

public class RandomLoadBalance implements LoadBalance {
    @Override
    public String select(List<String> repos) {
        int len = repos.size();
        if (len == 0)
            throw new RuntimeException("未发现注册的服务。");
        Random random = new Random();
        return repos.get(random.nextInt(len));
    }
}

5.2、获取注册中心服务注册信息

​ 定义IServiceDiscover接口,定义discover方法,进行服务发现。

package cn.org.july.netty.dubbo.registry;

public interface IServiceDiscover {
    String discover(String serviceName);
}

通过ServiceDiscoverImpl实现IServiceDiscover接口。

package cn.org.july.netty.dubbo.registry;

import cn.org.july.netty.dubbo.loadbalance.LoadBalance;
import cn.org.july.netty.dubbo.loadbalance.RandomLoadBalance;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.ArrayList;
import java.util.List;

/**
 * @author july_whj
 */
public class ServiceDiscoverImpl implements IServiceDiscover {

    List<String> repos = new ArrayList<String>();
    private CuratorFramework curatorFramework;

    public ServiceDiscoverImpl() {
        curatorFramework = CuratorFrameworkFactory.builder()
                .connectString(ZkConfig.addr).sessionTimeoutMs(4000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 10))
                .build();
        curatorFramework.start();
    }

    @Override
    public String discover(String serviceName) {
        String path = ZkConfig.ZK_REGISTER_PATH.concat("/").concat(serviceName);
        try {
            repos = curatorFramework.getChildren().forPath(path);
        } catch (Exception e) {
            e.printStackTrace();
        }
        registerWatch(path);
        LoadBalance loadBalance = new RandomLoadBalance();
        return loadBalance.select(repos);
    }

    /**
     * 监听ZK节点内容刷新
     *
     * @param path 路径
     */
    private void registerWatch(final String path) {
        PathChildrenCache childrenCache = new PathChildrenCache(curatorFramework, path, true);
        PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                repos = curatorFramework.getChildren().forPath(path);
            }
        };
        childrenCache.getListenable().addListener(childrenCacheListener);
        try {
            childrenCache.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


}

​ 和服务注册同样定义CuratorFramework对象,并通过curatorFramework.start();连接ZK。

连接成功后通过zk注册的根节点加服务名称,获取该服务的服务地址。

​ 获取的服务地址有可能不是最新的服务地址,我们需要监听zk节点的内容刷新,通过调用registerWatch方法,监听该节点的数据变化。

​ 最后,将获取到的地址集合,通过LoadBalance随机选出一个地址,实现该服务。

5.3、客户端netty实现RPC远程调用

定义客户端实现类RpcClientProxy.

package cn.org.july.netty.dubbo.proxy;

import cn.org.july.netty.dubbo.api.RpcRequest;
import cn.org.july.netty.dubbo.registry.IServiceDiscover;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/**
 * Created with IntelliJ IDEA.
 * User:  wanghongjie
 * Date:  2019/5/3 - 23:08
 * <p>
 * Description:
 */
public class RpcClientProxy {
    private IServiceDiscover serviceDiscover;

    public RpcClientProxy(IServiceDiscover serviceDiscover) {
        this.serviceDiscover = serviceDiscover;
    }

    public <T> T create(final Class<T> interfaceClass) {
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass}, new InvocationHandler() {
                    //封装RpcRequest请求对象,然后通过netty发送给服务等
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        RpcRequest rpcRequest = new RpcRequest();
                        rpcRequest.setClassName(method.getDeclaringClass().getName());
                        rpcRequest.setMethodName(method.getName());
                        rpcRequest.setTypes(method.getParameterTypes());
                        rpcRequest.setParams(args);
                        //服务发现,zk进行通讯
                        String serviceName = interfaceClass.getName();
                        //获取服务实现url地址
                        String serviceAddress = serviceDiscover.discover(serviceName);
                        //解析ip和port
                        System.out.println("服务端实现地址:" + serviceAddress);
                        String[] arrs = serviceAddress.split(":");
                        String host = arrs[0];
                        int port = Integer.parseInt(arrs[1]);
                        System.out.println("服务实现ip:" + host);
                        System.out.println("服务实现port:" + port);
                        final RpcProxyHandler rpcProxyHandler = new RpcProxyHandler();
                        //通过netty方式进行连接发送数据
                        EventLoopGroup group = new NioEventLoopGroup();
                        try {
                            Bootstrap bootstrap = new Bootstrap();
                            bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
                                    .handler(new ChannelInitializer<SocketChannel>() {
                                        @Override
                                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                                            ChannelPipeline channelPipeline = socketChannel.pipeline();
                                            channelPipeline.addLast(new ObjectDecoder(1024 * 1024, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                                            channelPipeline.addLast(new ObjectEncoder());
                                            //netty实现代码
                                            channelPipeline.addLast(rpcProxyHandler);
                                        }
                                    });
                            ChannelFuture future = bootstrap.connect(host, port).sync();
                            //将封装好的对象写入
                            future.channel().writeAndFlush(rpcRequest);
                            future.channel().closeFuture().sync();
                        } catch (Exception e) {

                        } finally {
                            group.shutdownGracefully();
                        }
                        return rpcProxyHandler.getResponse();
                    }
                });
    }
}

​ 我们看下create方法,通过动态代理newProxyInstance方法,传入待调用的接口对象,获取getClassLoader后,实现invoke方法。定义RpcRequest对象,封装请求参数。通过interfaceClass对象获取服务实现名称,调用discover方法获取服务提供者的地址信息,netty通过该信息连接服务,并将RpcRequest对象发送到服务端,服务端解析对象,获取接口请求参数等信息,执行方法,并将结果返回到客户端RpcProxyHandler对象接收返回结果。RpcProxyHandler代码实现:

package cn.org.july.netty.dubbo.proxy;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Created with IntelliJ IDEA.
 * User:  wanghongjie
 * Date:  2019/5/3 - 23:21
 * <p>
 * Description:
 */
public class RpcProxyHandler extends ChannelInboundHandlerAdapter {
    private Object response;

    public Object getResponse() {
        return response;
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //将服务端返回的内容返回
        response = msg;
    }
}

​ 我们复写channelRead方法,获取服务端返回的结果信息msg,并将msg赋值给response,通过getResponse获取返回信息。

5.4、客户单调用测试

import cn.org.july.netty.dubbo.api.IService;
import cn.org.july.netty.dubbo.proxy.RpcClientProxy;
import cn.org.july.netty.dubbo.registry.IServiceDiscover;
import cn.org.july.netty.dubbo.registry.ServiceDiscoverImpl;

/**
 * Created with IntelliJ IDEA.
 * User:  wanghongjie
 * Date:  2019/5/3 - 23:06
 * <p>
 * Description:
 */
public class ClientTest {
    public static void main(String[] args) {
        IServiceDiscover serviceDiscover = new ServiceDiscoverImpl();
        RpcClientProxy rpcClientProxy = new RpcClientProxy(serviceDiscover);
        IService iService = rpcClientProxy.create(IService.class);
        System.out.println(iService.sayHello("netty-to-dubbo"));
        System.out.println(iService.sayHello("你好"));
        System.out.println(iService.sayHello("成功咯,很高兴"));
        System.out.println(iService.add(10, 4));
    }
}

我们看下执行效果。​

服务端启动:

image-20190504142549948.png

客户单调用:

666.gif

远程调用完成。






上一篇下一篇

猜你喜欢

热点阅读