vertx 实现动态 RPC
需求:替换ali lightApi 动态rpc的实现,因为api为商业版,不是开源的,是基于pandora 的EDAS平台的。那么我们如何实现开源的动态RPC呢?
定义:动态RPC指的是,可以动态的让一个服务上线和下线,换句话说就是动态的从注册中心剔除服务,而不是停止这个服务,启动这个服务。这个需求是来自链路的原路返回
简介 vertx:
vertx 是一套全异步的基于netty通信的框架,Vertx中核心组建有 verticle, eventbus, circuit breaker,service discovery and register, Router等等。后期我会一一的去探索这里面的组件的原理。而本文介绍的vertx 的动态rpc,就是使用vertx proxy service 这个功能来实现的。另外我们得知道vertx 中的线程模型是基于netty的event loop,一个verticle 是一个微服务,而且具有HA的机制的微服务。服务之间的通信是根据event bus 的netty 通信机制。但是event bus 通信不是100%的可靠的(这点很要命,后期我会写博客探索的)。
此外vertx需要依赖 分布式缓存建立集群(hazelcast, ignite等),基于gossip协议的p2p网络。
探索过程:
如果要实现需求的话,首先第一想到的还是spring cloud,dubbo,thrift 等RPC框架,hsf 是阿里pandora rpc 框架,肯定不能用。但是经一番折腾之后,没有可以让我觉得可以在代码里面直接让服务上线下线,从注册中心剔除掉或者注册到注册中心的。那怎么办呢?曾经也想过会用rabbitmq,这样的MQ 去做动态的RPC,但是发现很复杂,关键很难做scalablity,如果有上千个链路,就需要上千个topic,或者tag 这类的标记。感觉很复杂,也很难维护。
solutions: vertx proxy service
这个方案上,我们不打算使用verticle的概念,仅仅使用vertx 代理服务的概念。首先我们要使用vertx 的服务发现和注册,其次我们要使用proxy service。
第一个坑:代理服务的自动生成。请在pom或者gradle里面加入 生成代理的依赖(对接口的代理)。然后创建 package-info.java,在root package。所谓的root package就是基package,web developer 开的springboot 应该都很清楚,需要将app.class 建在基package,以便扫描都可以扫描到
<groupId>io.vertx</groupId>
<artifactId>vertx-service-proxy</artifactId>
<classifier>processor</classifier>
<groupId>io.vertx</groupId>
<artifactId>vertx-codegen</artifactId>
<classifier>processor</classifier>
@ModuleGen(name ="ap-common-vertx", groupPackage ="com.xxxx.xx.xx.vertx")
package com.xxxx.xx.xx.vertx;
import io.vertx.codegen.annotations.ModuleGen;
其中groupPackage ="com.xxxx.xx.xx.vertx"), 是基package, 也可以是 你定义interface 所在的包。完成之后,使用maven clean install, 你就会发现在target 目录下面会有 ...Proxy 的class,这两个class 就是代理类,对于原理,后面的博客我会慢慢分析。
第二个坑是:服务发现publish 服务之后,需要注册服务代理,不然注册的服务,在event bus 上面是找不到的,看看代码怎么写吧:
Record record = EventBusService.createRecord(servicePublishRequestBean.getServiceName(), servicePublishRequestBean.getEventBusAddress(), servicePublishRequestBean.getClazz());
servicePublishRequestBean.getDiscovery().publish(record, ar -> {
if (ar.succeeded() && ar.result() !=null) {
publishedRecords.add(record);
recordMessageConsumerMap.putIfAbsent(record, ProxyServiceUtil.registerProxyService(servicePublishRequestBean.getEventBusAddress(), servicePublishRequestBean.getClazz(), servicePublishRequestBean.getService()));
}
handler.handle(ar.map(ar.result()));
});
servicePublishRequestBean就是我封装的bean,里面de属性有:
private StringserviceName;
private StringeventBusAddress;
private ServiceDiscoverydiscovery;
private Classclazz;
private T service;
此外我们一定要注册这样的proxy service才能够生效,不然是没有用的:
ProxyServiceUtil.registerProxyService(servicePublishRequestBean.getEventBusAddress(), servicePublishRequestBean.getClazz(), servicePublishRequestBean.getService()));
看看里面怎么写的
public static MessageConsumerregisterProxyService(String eventBusAddress, Class clazz, T service) {
return ServiceBinderUtil.getBinderInstance()
.setAddress(eventBusAddress)
.register(clazz, service);
}
第三个坑是: 和第二个坑一样的,需要注销服务,注销服务的话,我们也要调用proxyService 去把服务注销了,而不能单纯的调用 service discovery and register unpublish 方法,这个是很恶心的:看看我怎么写的把,一些很细节的东西需要自己探索,我不会贴出所有的东西的。比如下面的discover record 有许多的状态,这里一不小心就会找不到你所发布的服务,另外注销代理服务,需要个奇怪的参数,这个参数我存储在map里面:recordMessageConsumerMap, 在服务publish时候,就会生成的。
discovery.getRecord(info -> info.getName().equals(serviceName), res -> {
if (res.succeeded() && res.result() !=null && res.result().getStatus() == Status.UP) {
Record record = res.result();
discovery.unpublish(record.getRegistration(), ar -> {
if(ar.succeeded()) {
List records =recordMessageConsumerMap.keySet().stream().filter(map-> map.getName().equals(serviceName)).collect(Collectors.toList());
offlineRecord(records);
}
handler.handle(ar.map((Void)null));
});
}else {
handler.handle(res.map((Void)null));
}
}
);
OfflineRecord 主要是调用ProxyServiceUtil.unregisterProxyService(recordMessageConsumerMap.get(records.get(0)));,其中unregisterProxyService 方法的实现如下,是不是比较简单?
ServiceBinderUtil.getBinderInstance().unregister(consumer);
然后基本上做到这里,vertx 动态的rpc 就可以实现了,主要是用了vertx 服务发现组件的,publish, unpublish方式,和proxy service 的 register和unregister方法。但是里面的坑比较多。此外除了上面所描述的坑之外,我还想告诉小伙伴们,event bus 通信exception 会出现一些奇怪的错误,所有event bus 通信也是个坑,其实proxy sevice 本质就是 event bus,所以event bus 网络连接不通的话,确实很头疼,经过我这边的测试,不管ecs 集群的部署,还是ecs 和docker 的混合部署,网络都是可以通的(我使用的是ignite分布式缓存)。暂时我先贴出来关于网络的代码,如果你们遇到了问题,先暂时按照我这个来,不会让你很恼火。
@Bean
public IgniteConfigurationgetIgniteSelfConfiguration()throws Exception{
IgniteConfiguration igniteConfiguration =new IgniteConfiguration();
igniteConfiguration.setClientMode(false);
igniteConfiguration.setPeerClassLoadingEnabled(true);
igniteConfiguration.setDeploymentMode(DeploymentMode.CONTINUOUS);
igniteConfiguration.setPeerClassLoadingMissedResourcesCacheSize(0);
igniteConfiguration.setDiscoverySpi(getTcpDiscoverySpi());
igniteConfiguration.setCacheConfiguration(getCacheConfiguration());
// igniteConfiguration.setLocalHost(IPUtil.getLocalIp());
return igniteConfiguration;
}
@Bean
public TcpDiscoverySpigetTcpDiscoverySpi()throws Exception{
TcpDiscoverySpi tcpDiscoverySpi =new TcpDiscoverySpi();
tcpDiscoverySpi.setIpFinder(getTcpDiscoveryMulticastIpFinder());
tcpDiscoverySpi.setNetworkTimeout(10000);
System.out.println("setting success for host");
tcpDiscoverySpi.setLocalAddress(IPUtil.getLocalIp());
return tcpDiscoverySpi;
}
@Bean
public TcpDiscoveryMulticastIpFindergetTcpDiscoveryMulticastIpFinder(){
TcpDiscoveryMulticastIpFinder tcpDiscoveryMulticastIpFinder =new TcpDiscoveryMulticastIpFinder();
tcpDiscoveryMulticastIpFinder.setMulticastGroup("224.0.0.100");
return tcpDiscoveryMulticastIpFinder;
}
@Bean
public CacheConfigurationgetCacheConfiguration(){
CacheConfiguration cacheConfiguration =new CacheConfiguration();
cacheConfiguration.setName("myCache");
cacheConfiguration.setCacheMode(CacheMode.PARTITIONED);
cacheConfiguration.setBackups(1);
return cacheConfiguration;
}
@Bean
public VertxClusterStartervertxClusterStarter() {
VertxClusterStarter vertxClusterStarter =new VertxClusterStarter();
return vertxClusterStarter;
}
ClusterManager clusterManager =new IgniteClusterManager(igniteSelfConfiguration);
TcpDiscoverySpi discoverySpi = (TcpDiscoverySpi)igniteSelfConfiguration.getDiscoverySpi();
TcpDiscoveryMulticastIpFinder tcpDiscoveryMulticastIpFinder = (TcpDiscoveryMulticastIpFinder) discoverySpi.getIpFinder();
tcpDiscoveryMulticastIpFinder.setAddresses(Arrays.asList(propertiesHolderUtils.getVertxClusterIps().split(CommonConstants.CHARACTER_SEPARATOR_COMMA)));
VertxOptions vertxOptions =new VertxOptions().setClustered(true).setClusterHost(IPUtil.getLocalIp()).setClusterPort(Integer.valueOf(propertiesHolderUtils.getVertxClusterPorts().split(CommonConstants.CHARACTER_SEPARATOR_COMMA)[0])).setClusterManager(clusterManager);
ServiceDiscoveryOptions discoveryOptions =new ServiceDiscoveryOptions();
谢谢,希望对大家有所帮助,后面我会尝试探索event bus 通信的原理。