vertx 实现动态 RPC

2019-01-01  本文已影响0人  Double_winter

需求:替换ali lightApi 动态rpc的实现,因为api为商业版,不是开源的,是基于pandora 的EDAS平台的。那么我们如何实现开源的动态RPC呢?

定义:动态RPC指的是,可以动态的让一个服务上线和下线,换句话说就是动态的从注册中心剔除服务,而不是停止这个服务,启动这个服务。这个需求是来自链路的原路返回

简介 vertx:

vertx 是一套全异步的基于netty通信的框架,Vertx中核心组建有 verticle, eventbus, circuit breaker,service discovery and register, Router等等。后期我会一一的去探索这里面的组件的原理。而本文介绍的vertx 的动态rpc,就是使用vertx proxy service 这个功能来实现的。另外我们得知道vertx 中的线程模型是基于netty的event loop,一个verticle 是一个微服务,而且具有HA的机制的微服务。服务之间的通信是根据event bus 的netty 通信机制。但是event bus 通信不是100%的可靠的(这点很要命,后期我会写博客探索的)。

此外vertx需要依赖 分布式缓存建立集群(hazelcast, ignite等),基于gossip协议的p2p网络。

探索过程:

如果要实现需求的话,首先第一想到的还是spring cloud,dubbo,thrift 等RPC框架,hsf 是阿里pandora rpc 框架,肯定不能用。但是经一番折腾之后,没有可以让我觉得可以在代码里面直接让服务上线下线,从注册中心剔除掉或者注册到注册中心的。那怎么办呢?曾经也想过会用rabbitmq,这样的MQ 去做动态的RPC,但是发现很复杂,关键很难做scalablity,如果有上千个链路,就需要上千个topic,或者tag 这类的标记。感觉很复杂,也很难维护。

solutions: vertx proxy service 

这个方案上,我们不打算使用verticle的概念,仅仅使用vertx 代理服务的概念。首先我们要使用vertx 的服务发现和注册,其次我们要使用proxy service。

第一个坑:代理服务的自动生成。请在pom或者gradle里面加入 生成代理的依赖(对接口的代理)。然后创建 package-info.java,在root package。所谓的root package就是基package,web developer 开的springboot 应该都很清楚,需要将app.class 建在基package,以便扫描都可以扫描到

    <groupId>io.vertx</groupId>

    <artifactId>vertx-service-proxy</artifactId>

    <classifier>processor</classifier>

    <groupId>io.vertx</groupId>

    <artifactId>vertx-codegen</artifactId>

    <classifier>processor</classifier>

@ModuleGen(name ="ap-common-vertx", groupPackage ="com.xxxx.xx.xx.vertx")

package com.xxxx.xx.xx.vertx;

import io.vertx.codegen.annotations.ModuleGen;

其中groupPackage ="com.xxxx.xx.xx.vertx"), 是基package, 也可以是 你定义interface 所在的包。完成之后,使用maven clean install, 你就会发现在target 目录下面会有 ...Proxy 的class,这两个class 就是代理类,对于原理,后面的博客我会慢慢分析。

第二个坑是:服务发现publish 服务之后,需要注册服务代理,不然注册的服务,在event bus 上面是找不到的,看看代码怎么写吧:

Record record = EventBusService.createRecord(servicePublishRequestBean.getServiceName(), servicePublishRequestBean.getEventBusAddress(), servicePublishRequestBean.getClazz());

servicePublishRequestBean.getDiscovery().publish(record, ar -> {

if (ar.succeeded() && ar.result() !=null) {

publishedRecords.add(record);

        recordMessageConsumerMap.putIfAbsent(record, ProxyServiceUtil.registerProxyService(servicePublishRequestBean.getEventBusAddress(), servicePublishRequestBean.getClazz(), servicePublishRequestBean.getService()));

    }

handler.handle(ar.map(ar.result()));

});

servicePublishRequestBean就是我封装的bean,里面de属性有:

private StringserviceName;

private StringeventBusAddress;

private ServiceDiscoverydiscovery;

private Classclazz;

private T service;

此外我们一定要注册这样的proxy service才能够生效,不然是没有用的:

ProxyServiceUtil.registerProxyService(servicePublishRequestBean.getEventBusAddress(), servicePublishRequestBean.getClazz(), servicePublishRequestBean.getService()));

看看里面怎么写的

public static MessageConsumerregisterProxyService(String eventBusAddress, Class clazz, T service) {

return ServiceBinderUtil.getBinderInstance()

.setAddress(eventBusAddress)

.register(clazz, service);

}

第三个坑是: 和第二个坑一样的,需要注销服务,注销服务的话,我们也要调用proxyService 去把服务注销了,而不能单纯的调用 service discovery and register unpublish 方法,这个是很恶心的:看看我怎么写的把,一些很细节的东西需要自己探索,我不会贴出所有的东西的。比如下面的discover record 有许多的状态,这里一不小心就会找不到你所发布的服务,另外注销代理服务,需要个奇怪的参数,这个参数我存储在map里面:recordMessageConsumerMap, 在服务publish时候,就会生成的。

discovery.getRecord(info -> info.getName().equals(serviceName), res -> {

if (res.succeeded() && res.result() !=null && res.result().getStatus() == Status.UP) {

Record record = res.result();

                discovery.unpublish(record.getRegistration(), ar -> {

if(ar.succeeded()) {

List records =recordMessageConsumerMap.keySet().stream().filter(map-> map.getName().equals(serviceName)).collect(Collectors.toList());

                        offlineRecord(records);

                    }

handler.handle(ar.map((Void)null));

                });

            }else {

handler.handle(res.map((Void)null));

            }

}

);

OfflineRecord 主要是调用ProxyServiceUtil.unregisterProxyService(recordMessageConsumerMap.get(records.get(0)));,其中unregisterProxyService 方法的实现如下,是不是比较简单?

ServiceBinderUtil.getBinderInstance().unregister(consumer);

然后基本上做到这里,vertx 动态的rpc 就可以实现了,主要是用了vertx 服务发现组件的,publish, unpublish方式,和proxy service 的 register和unregister方法。但是里面的坑比较多。此外除了上面所描述的坑之外,我还想告诉小伙伴们,event bus 通信exception 会出现一些奇怪的错误,所有event bus 通信也是个坑,其实proxy sevice 本质就是 event bus,所以event bus 网络连接不通的话,确实很头疼,经过我这边的测试,不管ecs 集群的部署,还是ecs 和docker 的混合部署,网络都是可以通的(我使用的是ignite分布式缓存)。暂时我先贴出来关于网络的代码,如果你们遇到了问题,先暂时按照我这个来,不会让你很恼火。

@Bean

public IgniteConfigurationgetIgniteSelfConfiguration()throws Exception{

IgniteConfiguration igniteConfiguration =new IgniteConfiguration();

    igniteConfiguration.setClientMode(false);

    igniteConfiguration.setPeerClassLoadingEnabled(true);

    igniteConfiguration.setDeploymentMode(DeploymentMode.CONTINUOUS);

    igniteConfiguration.setPeerClassLoadingMissedResourcesCacheSize(0);

    igniteConfiguration.setDiscoverySpi(getTcpDiscoverySpi());

    igniteConfiguration.setCacheConfiguration(getCacheConfiguration());

  // igniteConfiguration.setLocalHost(IPUtil.getLocalIp());

    return igniteConfiguration;

}

@Bean

public TcpDiscoverySpigetTcpDiscoverySpi()throws Exception{

TcpDiscoverySpi tcpDiscoverySpi =new TcpDiscoverySpi();

    tcpDiscoverySpi.setIpFinder(getTcpDiscoveryMulticastIpFinder());

    tcpDiscoverySpi.setNetworkTimeout(10000);

    System.out.println("setting success for host");

    tcpDiscoverySpi.setLocalAddress(IPUtil.getLocalIp());

    return tcpDiscoverySpi;

}

@Bean

public TcpDiscoveryMulticastIpFindergetTcpDiscoveryMulticastIpFinder(){

TcpDiscoveryMulticastIpFinder tcpDiscoveryMulticastIpFinder =new TcpDiscoveryMulticastIpFinder();

    tcpDiscoveryMulticastIpFinder.setMulticastGroup("224.0.0.100");

    return tcpDiscoveryMulticastIpFinder;

}

@Bean

public CacheConfigurationgetCacheConfiguration(){

CacheConfiguration cacheConfiguration =new CacheConfiguration();

    cacheConfiguration.setName("myCache");

    cacheConfiguration.setCacheMode(CacheMode.PARTITIONED);

    cacheConfiguration.setBackups(1);

    return cacheConfiguration;

}

@Bean

public VertxClusterStartervertxClusterStarter() {

VertxClusterStarter vertxClusterStarter =new VertxClusterStarter();

    return vertxClusterStarter;

}

ClusterManager clusterManager =new IgniteClusterManager(igniteSelfConfiguration);

TcpDiscoverySpi discoverySpi = (TcpDiscoverySpi)igniteSelfConfiguration.getDiscoverySpi();

TcpDiscoveryMulticastIpFinder tcpDiscoveryMulticastIpFinder = (TcpDiscoveryMulticastIpFinder) discoverySpi.getIpFinder();

tcpDiscoveryMulticastIpFinder.setAddresses(Arrays.asList(propertiesHolderUtils.getVertxClusterIps().split(CommonConstants.CHARACTER_SEPARATOR_COMMA)));

VertxOptions vertxOptions =new VertxOptions().setClustered(true).setClusterHost(IPUtil.getLocalIp()).setClusterPort(Integer.valueOf(propertiesHolderUtils.getVertxClusterPorts().split(CommonConstants.CHARACTER_SEPARATOR_COMMA)[0])).setClusterManager(clusterManager);

ServiceDiscoveryOptions discoveryOptions =new ServiceDiscoveryOptions();

谢谢,希望对大家有所帮助,后面我会尝试探索event bus 通信的原理。

上一篇下一篇

猜你喜欢

热点阅读