一起学RPC(六)
上一篇文章中讲到了服务的本地注册。这篇文章接着继续讲服务的“远程注册”。所谓的“远程注册”实际上就是将需要暴露的服务相关信息上传到一个服务器上,这个服务器会保存这些信息,并且供consumer来查阅。简单地可以将注册中心类比为DNS。
而今天不是讨论这个DNS,而是探讨provider的注册器,也就是发起注册的客户端程序。
Jupiter中将这个“注册器”抽象为RegistryService
。而这个是一个抽象类型,不用想也知道一定是个抽象类型的接口,因为方便拓展。其顶级接口为Registry
,而这个顶级接口中只定义一个方法connectToRegistryServer
。抽象层次十分清晰。
而RegistryService
不仅仅只能由provider使用,consumer也能使用到。其中包含的基本操作也可想而知,如register
和unregister
就是供provider使用的,而subscribe
则给consumer调用。
先看看顶级接口的connectToRegistryServer
的实现细节:
private final ConcurrentMap<UnresolvedAddress, DefaultRegistry> clients = Maps.newConcurrentMap();
public void connectToRegistryServer(String connectString) {
checkNotNull(connectString, "connectString");
String[] array = Strings.split(connectString, ',');
for (String s : array) {
String[] addressStr = Strings.split(s, ':');
String host = addressStr[0];
int port = Integer.parseInt(addressStr[1]);
UnresolvedAddress address = new UnresolvedSocketAddress(host, port);
DefaultRegistry client = clients.get(address);
if (client == null) {
DefaultRegistry newClient = new DefaultRegistry(this);
client = clients.putIfAbsent(address, newClient);
if (client == null) {
client = newClient;
JConnection connection = client.connect(address);
client.connectionManager().manage(connection);
} else {
newClient.shutdownGracefully();
}
}
}
}
这段代码的主要目的是将要连接的注册中心地址和对应的“客户端”做一个映射,同时连接到注册中心,维持这个“连接”。这里的“客户端”指的是DefaultRegistry
实例。这里的clients
不是一个普通的map,而是并发包下的。这么做的目的很明显是为了并发安全。但是我没想到具体的场景。这段判断逻辑看似简单,但是值得推敲。address和client能不能映射起来,能就返回一个空,然后做连接操作,将返回的连接给保存起来;不能映射的话就说明这个address已经被别的client映射到了,就返回对应的value,那么之前创建的client就销毁掉,就当从来没有来过一样。
而register
的实现就非常简单了:
private final LinkedBlockingQueue<RegisterMeta> queue = new LinkedBlockingQueue<>();
public void register(RegisterMeta meta) {
queue.add(meta);
}
他是在抽象类中实现的,很直白--将注册信息放到阻塞队列中。实际上注册的逻辑没有那么简单,一行代码肯定完成不了。在抽象类的构造器中隐藏着大玄机:
private final ScheduledExecutorService registerScheduledExecutor =
Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("register.schedule.executor", true));
private final ExecutorService registerExecutor =
Executors.newSingleThreadExecutor(new NamedThreadFactory("register.executor", true));
private final ExecutorService localRegisterWatchExecutor =
Executors.newSingleThreadExecutor(new NamedThreadFactory("local.register.watch.executor", true));
public AbstractRegistryService() {
registerExecutor.execute(new Runnable() {
@Override
public void run() {
while (!shutdown.get()) {
RegisterMeta meta = null;
try {
meta = queue.take();
registerMetaMap.put(meta, RegisterState.PREPARE);
doRegister(meta);
} catch (InterruptedException e) {
logger.warn("[register.executor] interrupted.");
} catch (Throwable t) {
if (meta != null) {
logger.error("Register [{}] fail: {}, will try again...", meta.getServiceMeta(), stackTrace(t));
// 间隔一段时间再重新入队, 让出cpu
final RegisterMeta finalMeta = meta;
registerScheduledExecutor.schedule(new Runnable() {
@Override
public void run() {
queue.add(finalMeta);
}
}, 1, TimeUnit.SECONDS);
}
}
}
}
});
localRegisterWatchExecutor.execute(new Runnable() {
@Override
public void run() {
while (!shutdown.get()) {
try {
Thread.sleep(3000);
doCheckRegisterNodeStatus();
} catch (InterruptedException e) {
logger.warn("[local.register.watch.executor] interrupted.");
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Check register node status fail: {}, will try again...", stackTrace(t));
}
}
}
}
});
}
在这个注册器实例化的时候就会新建两个线程池--registerExecutor
和localRegisterWatchExecutor
。前者是负责注册的。从代码逻辑中可以看到里面跑着一个循环,这个循环正常情况下是一直跑的(死循环)。而循环里取队列中的注册信息,将其作为key存到registerMetaMap
中,value为这个注册信息的注册状态。这里的状态为PREPARE,也就是准备中,还没到注册完成。接下来就是具体的动作了,注册信息有了,得把这些数据发送到注册中心就得发起一个tcp请求。这个方法是一个抽象方法,具体的实现由子类完成。我们这里只看默认实现:
protected void doRegister(RegisterMeta meta) {
Collection<DefaultRegistry> allClients = clients.values();
checkArgument(!allClients.isEmpty(), "init needed");
logger.info("Register: {}.", meta);
for (DefaultRegistry c : allClients) {
c.doRegister(meta);
}
getRegisterMetaMap().put(meta, RegisterState.DONE);
}
因为注册中心可能存在很多,每个注册中心都维持着一个连接,这里将所有的连接拿到,然后将注册信息全部推送到注册中心,完事后将这个状态置为DONE,也就是注册完成了。然而这里的具体的注册过程依旧不是自己完成的,而是交给DefaultRegistry
去做的,这个对象简单理解为就是一个tcp client。这里不展开讨论。
当这段死循环(准确来说不叫死循环,因为take会阻塞)中出现异常了怎么办?这里又准备了一个线程池,是带定时器功能的线程池。目的是等一段时间再将注册信息添加到队列中。总不能出现了异常就把注册信息给丢了吧。
而localRegisterWatchExecutor
是一个检查官,每隔一段时间去检查注册中心的节点状态,如果发现注册信息不见了,那么重新注册一次。当然在默认实现中是没有实现的,但是zk的实现是有对应的操作的,不然我也不能瞎编吧。我猜是因为自己实现的注册中心没有那种节点变化后能直接通知客户端的机制,因此没有去实现23333.
注册的逻辑就这么些,接下来看看取消注册是如何完成的:
public void unregister(RegisterMeta meta) {
if (!queue.remove(meta)) {
registerMetaMap.remove(meta);
doUnregister(meta);
}
}
没有任何疑问的操作一定是从队列中移除这个注册信息,这里有个先后顺序,如果移除成功了说明这个元素是在队列中也就是说registerMetaMap
肯定是没有这个元素的。如果不在队列里了,说明已经将这个东西取出来了,放到了registerMetaMap
中。这时候得将这个元素也移除掉,同时注册中心上的这个元素也得移除掉。
protected void doUnregister(RegisterMeta meta) {
Collection<DefaultRegistry> allClients = clients.values();
checkArgument(!allClients.isEmpty(), "init needed");
logger.info("Unregister: {}.", meta);
for (DefaultRegistry c : allClients) {
c.doUnregister(meta);
}
}
关于provider的“注册”基本上就这些内容,关于“远程注册”的注册机client的实现会在下一篇中继续。