3、zk客户端curator使用(转)
一、curator简介
- Curator是Netflix公司开源的一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。Patrixck Hunt(Zookeeper)以一句“Guava is to Java that Curator to Zookeeper”给Curator予高度评价。
curator-framework:对zookeeper的底层api的一些封装
curator-client:提供一些客户端的操作,例如重试策略等
curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等
- 版本问题。使用curator的版本:2.12.0,对应Zookeeper的版本为:3.4.x,如果跨版本会有兼容性问题,很有可能导致节点操作失败
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
二、Curator-framework基本api使用
1、创建会话
(1)静态工厂方法创建会话
String connectionInfo = "127.0.0.1:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory
.newClient(connectionInfo, 5000, 3000, retryPolicy);
newClient静态工厂方法包含四个主要参数:
静态工厂参数
(2)使用fluent流式创建
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectionInfo)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
(3)创建包含命名空间的会话
- 为了实现不同的Zookeeper业务之间的隔离,需要为每个业务分配一个独立的命名空间(NameSpace),即指定一个Zookeeper的根路径(官方术语:为Zookeeper添加“Chroot”特性)。
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectionInfo)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
#创建命名空间为namespace的会话(该会话内的操作都是基于该目录进行的)
.namespace("namespace")
.build();
2、启动客户端
client.start();
3、创建数据节点
- Zookeeper的节点创建模式:
PERSISTENT:持久化
PERSISTENT_SEQUENTIAL:持久化并且带序列号
EPHEMERAL:临时
EPHEMERAL_SEQUENTIAL:临时并且带序列号
//创建一个节点,指定创建模式(临时节点),附带初始化内容,并且自动递归创建父节点
client.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath("path","init".getBytes());
4、删除节点
//删除一个节点,并且递归删除其所有的子节点
client.delete().deletingChildrenIfNeeded().forPath("path");
//删除一个节点,强制指定版本进行删除
client.delete().withVersion(10086).forPath("path");
//删除一个节点,强制保证删除,guaranteed()接口是一个保障措施,只要客户端会话有效,
//那么Curator会在后台持续进行删除操作,直到删除节点成功。
client.delete().guaranteed().forPath("path");
5、读取数据
//读取一个节点的数据内容,同时获取到该节点的stat
读取一个节点的数据内容,同时获取到该节点的stat
6、更新节点数据
//更新一个节点的数据内容,该接口会返回一个Stat实例
Stat path = client.setData().forPath("path", "data".getBytes());
//更新一个节点的数据内容,强制指定版本进行更新
client.setData().withVersion(10086).forPath("path","data".getBytes());
7、检测节点是否存在
Stat stat = client.checkExists().forPath("path");
8、获取某个节点的所有子节点路径
//该方法的返回值为List<String>,获得ZNode的子节点Path列表。 可以调用额外的方法(监控、后台处
//理或者获取状态watch, background or get stat) 并在最后调用forPath()指定要操作的父ZNode
client.getChildren().forPath("path");
9、事物
- CuratorFramework的实例包含inTransaction( )接口方法,调用此方法开启一个ZooKeeper事务. 可以复合create, setData, check, and/or delete 等操作然后调用commit()作为一个原子操作提交。一个例子如下:
client.inTransaction().check().forPath("path")
.and()
.create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
.and()
.setData().withVersion(10086).forPath("path","data2".getBytes())
.and()
.commit();
10、异步接口
- 上面提到的创建、删除、更新、读取等方法都是同步的,Curator提供异步接口,引入了BackgroundCallback接口用于处理异步接口调用之后服务端返回的结果信息。BackgroundCallback接口中一个重要的回调值为CuratorEvent,里面包含事件类型、响应吗和节点的详细信息。
-
CuratorEventType:
CuratorEventType对应的事件类型 -
响应码(#getResultCode()):
状态响应码
//异步的创建节点
Executor executor = Executors.newFixedThreadPool(2);
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.inBackground((curatorFramework, curatorEvent) -> {
System.out.println(String.format("eventType:%s,resultCode:%s",
curatorEvent.getType(), curatorEvent.getResultCode()));
}, executor)
.forPath("path");
二、Curator-recipes实现高级特征
1、缓存cache
- Zookeeper原生支持通过注册Watcher来进行事件监听,但是开发者需要反复注册(Watcher只能单次注册单次使用)。Cache是Curator中对事件监听的包装,可以看作是对事件监听的本地缓存视图,能够自动为开发者处理反复注册监听。Curator提供了三种Watcher(Cache)来监听结点的变化。
(1)Path Cache
- Path Cache用来监控一个ZNode的子节点. 当一个子节点增加, 更新,删除时, Path Cache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态,而状态的更变将通过PathChildrenCacheListener通知。涉及到的四个类:
PathChildrenCache、PathChildrenCacheEvent、PathChildrenCacheListener、ChildData
//1、构造函数
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)
//2、想使用cache,必须调用它的start方法,使用完后调用close方法。可以设置StartMode来实现启动的模式,NORMAL:正常初始化。BUILD_INITIAL_CACHE:在调用start()之前会调用rebuild()。POST_INITIALIZED_EVENT: 当Cache初始化数据后发送一个PathChildrenCacheEvent.Type#INITIALIZED事件
//3、可以增加listener监听缓存的变化。
public void addListener(PathChildrenCacheListener listener)
//4、遍历所有的子节点
getCurrentData()
private static final String PATH = "/example/pathCache";
public static void main(String[] args) throws Exception {
//1.创建client并启动
String connectionInfo = "127.0.0.1:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory
.newClient(connectionInfo, 5000, 3000, retryPolicy);
client.start();
//2.创建path cache并启动
PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
cache.start();
//3、进行事件监听
PathChildrenCacheListener cacheListener = (client1, event) -> {
System.out.println("事件类型:" + event.getType());
//如果new PathChildrenCache(client, PATH, true)中的参数cacheData值设置为false,则示例中的
// event.getData().getData()、data.getData()将返回null,cache将不会缓存节点数据。
if (null != event.getData()) {
System.out.println("节点数据:" + event.getData().getPath() + " = " + new String(event.getData().getData()));
}
};
cache.getListenable().addListener(cacheListener);
//4、进行响应的curd操作,看看事件回调结果
client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test01", "01".getBytes());
Thread.sleep(10);
client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test02", "02".getBytes());
Thread.sleep(10);
client.setData().forPath("/example/pathCache/test01", "01_V2".getBytes());
Thread.sleep(10);
for (ChildData data : cache.getCurrentData()) {
System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
}
client.delete().forPath("/example/pathCache/test01");
Thread.sleep(10);
client.delete().forPath("/example/pathCache/test02");
Thread.sleep(1000 * 5);
cache.close();
client.close();
System.out.println("OK!");
}
(2)Node Cache
- NodeCache只能监听一个节点的状态变化。
- Node Cache与Path Cache类似,Node Cache只是监听某一个特定的节点。它涉及到下面的三个类:
NodeCache - Node Cache实现类
NodeCacheListener - 节点监听器
ChildData - 节点数据
client.create().creatingParentsIfNeeded().forPath(PATH);
final NodeCache cache = new NodeCache(client, PATH);
NodeCacheListener listener = () -> {
ChildData data = cache.getCurrentData();
if (null != data) {
System.out.println("节点数据:" + new String(cache.getCurrentData().getData()));
} else {
System.out.println("节点被删除!");
}
};
cache.getListenable().addListener(listener);
cache.start();
client.setData().forPath(PATH, "01".getBytes());
Thread.sleep(100);
client.setData().forPath(PATH, "02".getBytes());
Thread.sleep(100);
client.delete().deletingChildrenIfNeeded().forPath(PATH);
Thread.sleep(1000 * 2);
cache.close();
client.close();
System.out.println("OK!");
(3)Tree Cache
- Tree Cache可以监控整个树上的所有节点,类似于PathCache和NodeCache的组合,主要涉及到下面四个类:
TreeCache - Tree Cache实现类
TreeCacheListener - 监听器类
TreeCacheEvent - 触发的事件类
ChildData - 节点数据
- 注意:TreeCache在初始化(调用start()方法)的时候会回调TreeCacheListener实例一个事TreeCacheEvent,而回调的TreeCacheEvent对象的Type为INITIALIZED,ChildData为null,此时event.getData().getPath()很有可能导致空指针异常,这里应该主动处理并避免这种情况。
client.create().creatingParentsIfNeeded().forPath(PATH);
TreeCache cache = new TreeCache(client, PATH);
TreeCacheListener listener = (client1, event) ->
System.out.println("事件类型:" + event.getType() +
" | 路径:" + (null != event.getData() ? event.getData().getPath() : null));
cache.getListenable().addListener(listener);
cache.start();
client.setData().forPath(PATH, "01".getBytes());
Thread.sleep(100);
client.setData().forPath(PATH, "02".getBytes());
Thread.sleep(100);
client.delete().deletingChildrenIfNeeded().forPath(PATH);
Thread.sleep(1000 * 2);
cache.close();
client.close();
System.out.println("OK!");
2、leader选举
- Curator 有两种leader选举的recipe,分别是LeaderSelector和LeaderLatch。前者是所有存活的客户端不间断的轮流做Leader,大同社会。后者是一旦选举出Leader,除非有客户端挂掉重新触发选举,否则不会交出领导权。
(1)LeaderLatch
public class LeaderLatchTest {
/**
* 1、一旦启动,LeaderLatch会和其它使用相同latch path的其它LeaderLatch交涉,然后其中一个最终会被选举为leader,
* 2、可以通过hasLeadership方法查看LeaderLatch实例是否leader:
* 3、 LeaderLatch在请求成为leadership会block(阻塞),一旦不使用LeaderLatch了,必须调用close方法。
* 如果它是leader,会释放leadership, 其它的参与者将会选举一个leader。
* 4、 错误处理:LeaderLatch实例可以增加ConnectionStateListener来监听网络连接问题。 当 SUSPENDED 或 LOST 时,
* leader不再认为自己还是leader。当LOST后连接重连后RECONNECTED,LeaderLatch会删除先前的ZNode然后重新
* 创建一个。LeaderLatch用户必须考虑导致leadership丢失的连接问题。
* 强烈推荐你使用ConnectionStateListener
*/
protected static String PATH = "/francis/leader";
private static final int CLIENT_QTY = 10;
public static void main(String[] args) throws Exception {
List<CuratorFramework> clients = Lists.newArrayList();
List<LeaderLatch> examples = Lists.newArrayList();
String connectionInfo = "127.0.0.1:2181";
try {
//1、先创建10个leaderLatch
for (int i = 0; i < CLIENT_QTY; i++) {
CuratorFramework client
= CuratorFrameworkFactory.newClient(connectionInfo, new ExponentialBackoffRetry(20000, 3));
clients.add(client);
LeaderLatch latch = new LeaderLatch(client, PATH, "Client #" + i);
latch.addListener(new LeaderLatchListener() {
@Override
public void isLeader() {
// TODO Auto-generated method stub
System.out.println("I am Leader");
}
@Override
public void notLeader() {
// TODO Auto-generated method stub
System.out.println("I am not Leader");
}
});
examples.add(latch);
//2、启动后,选中一个作为leader
client.start();
latch.start();
}
Thread.sleep(10000);
LeaderLatch currentLeader = null;
//3、通过hasLeadership查看自己是否是leader, 如果是的话返回true。
for (LeaderLatch latch : examples) {
if (latch.hasLeadership()) {
currentLeader = latch;
}
}
System.out.println("current leader is " + currentLeader.getId());
System.out.println("release the leader " + currentLeader.getId());
//4、close释放当前的领导权。
currentLeader.close();
Thread.sleep(5000);
for (LeaderLatch latch : examples) {
if (latch.hasLeadership()) {
currentLeader = latch;
}
}
System.out.println("current leader is " + currentLeader.getId());
System.out.println("release the leader " + currentLeader.getId());
} finally {
for (LeaderLatch latch : examples) {
if (null != latch.getState() && !latch.getState().equals(LeaderLatch.State.CLOSED))
latch.close();
}
for (CuratorFramework client : clients) {
client.close();
}
}
}
}
(2)LeaderSelector
- LeaderSelector使用的时候主要涉及下面几个类:
LeaderSelector
LeaderSelectorListener
LeaderSelectorListenerAdapter
CancelLeadershipException
- 类似LeaderLatch,LeaderSelector必须start: leaderSelector.start(); 一旦启动,当实例取得领导权时你的listener的takeLeadership()方法被调用。而takeLeadership()方法只有领导权被释放时才返回。 当你不再使用LeaderSelector实例时,应该调用它的close方法。
public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter implements Closeable {
private final String name;
private final LeaderSelector leaderSelector;
//使用AtomicInteger来记录此client获得领导权的次数, 它是”fair”, 每个client有平等的机会获得领导权。
private final AtomicInteger leaderCount = new AtomicInteger();
public LeaderSelectorAdapter(CuratorFramework client, String path, String name) {
this.name = name;
leaderSelector = new LeaderSelector(client, path, this);
//调用 leaderSelector.autoRequeue();保证在此实例释放领导权之后还可能获得领导权。
leaderSelector.autoRequeue();
}
public void start() throws IOException {
leaderSelector.start();
}
@Override
public void close() throws IOException {
leaderSelector.close();
}
/**
* leaderSelector.start(); 一旦启动,当实例取得领导权时你的listener的takeLeadership()方法被调用。
* 而takeLeadership()方法只有领导权被释放时才返回。
* 你可以在takeLeadership进行任务的分配等等,并且不要返回,
* 如果你想要要此实例一直是leader的话可以加一个死循环。
* @param client
* @throws Exception
*/
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
final int waitSeconds = (int) (5 * Math.random()) + 1;
System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
} catch (InterruptedException e) {
System.err.println(name + " was interrupted.");
Thread.currentThread().interrupt();
} finally {
System.out.println(name + " relinquishing leadership.\n");
}
}
}
public class LeaderSelectorDemo {
protected static String PATH = "/francis/leader";
private static final int CLIENT_QTY = 10;
public static void main(String[] args) throws Exception {
List<CuratorFramework> clients = Lists.newArrayList();
List<LeaderSelectorAdapter> examples = Lists.newArrayList();
String connectionInfo = "127.0.0.1:2181";
try {
//1、构建10个LeaderSelectorAdapter
for (int i = 0; i < CLIENT_QTY; i++) {
CuratorFramework client
= CuratorFrameworkFactory.newClient(connectionInfo, new ExponentialBackoffRetry(20000, 3));
clients.add(client);
LeaderSelectorAdapter selectorAdapter = new LeaderSelectorAdapter(client, PATH, "Client #" + i);
examples.add(selectorAdapter);
//2、启动,并开始选举
client.start();
selectorAdapter.start();
}
System.out.println("Press enter/return to quit\n");
new BufferedReader(new InputStreamReader(System.in)).readLine();
} finally {
System.out.println("Shutting down...");
for (LeaderSelectorAdapter exampleClient : examples) {
CloseableUtils.closeQuietly(exampleClient);
}
for (CuratorFramework client : clients) {
CloseableUtils.closeQuietly(client);
}
}
}
}
(3)小结
对比可知,LeaderLatch必须调用close()方法才会释放领导权,而对于LeaderSelector,通过LeaderSelectorListener可以对领导权进行控制, 在适当的时候释放领导权,这样每个节点都有可能获得领导权。从而,LeaderSelector具有更好的灵活性和可控性,建议有LeaderElection应用场景下优先使用LeaderSelector。
3、分布式锁
- 要点:
1.推荐使用ConnectionStateListener监控连接的状态,因为当连接LOST时你不再拥有锁
2.分布式的锁全局同步, 这意味着任何一个时间点不会有两个客户端都拥有相同的锁。
(1)可重入共享锁Shared Reentrant Lock
- Shared意味着锁是全局可见的, 客户端都可以请求锁。 Reentrant和JDK的ReentrantLock类似,即可重入, 意味着同一个客户端在拥有锁的同时,可以多次获取,不会被阻塞。 它是由类InterProcessMutex来实现。 它的构造函数为:
public InterProcessMutex(CuratorFramework client, String path)
- 通过acquire()获得锁,并提供超时机制;并通过release()释放锁
- 请求撤销当前的锁, 调用attemptRevoke()方法,注意锁释放时RevocationListener将会回调。
public class FakeLimitedResource {
private final AtomicBoolean inUse = new AtomicBoolean(false);
public void use() throws InterruptedException {
// 真实环境中我们会在这里访问/维护一个共享的资源
//这个例子在使用锁的情况下不会非法并发异常IllegalStateException
//但是在无锁的情况由于sleep了一段时间,很容易抛出异常
if (!inUse.compareAndSet(false, true)) {
throw new IllegalStateException("Needs to be used by one client at a time");
}
try {
Thread.sleep((long) (3 * Math.random()));
} finally {
inUse.set(false);
}
}
}
public class InterProcessMutexDemo {
private InterProcessMutex lock;
private final FakeLimitedResource resource;
private final String clientName;
public InterProcessMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
this.resource = resource;
this.clientName = clientName;
this.lock = new InterProcessMutex(client, lockPath);
}
public void doWork(long time, TimeUnit unit) throws Exception {
//加锁、其他被阻塞
if (!lock.acquire(time, unit)) {
throw new IllegalStateException(clientName + " could not acquire the lock");
}
try {
System.out.println(clientName + " get the lock");
resource.use(); //access resource exclusively
} finally {
System.out.println(clientName + " releasing the lock");
//解锁
lock.release(); // always release the lock in a finally block
}
}
private static final int QTY = 5;
private static final int REPETITIONS = QTY * 10;
private static final String PATH = "/examples/locks";
public static void main(String[] args) throws Exception {
final FakeLimitedResource resource = new FakeLimitedResource();
ExecutorService service = Executors.newFixedThreadPool(QTY);
String connectionInfo = "127.0.0.1:2181";
try {
for (int i = 0; i < QTY; ++i) {
final int index = i;
Callable<Void> task = new Callable<Void>() {
@Override
public Void call() throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient(connectionInfo, new ExponentialBackoffRetry(1000, 3));
try {
client.start();
final InterProcessMutexDemo example = new InterProcessMutexDemo(client, PATH, resource, "Client " + index);
for (int j = 0; j < REPETITIONS; ++j) {
example.doWork(10, TimeUnit.SECONDS);
}
} catch (Throwable e) {
e.printStackTrace();
} finally {
CloseableUtils.closeQuietly(client);
}
return null;
}
};
service.submit(task);
}
service.shutdown();
service.awaitTermination(10, TimeUnit.MINUTES);
} finally {
}
}
}
(2)不可重入共享锁—Shared Lock
- 这个锁和上面的InterProcessMutex相比,就是少了Reentrant的功能,也就意味着它不能在同一个线程中重入,具体实现类为InterProcessSemaphoreMutex
(3)可重入读写锁—Shared Reentrant Read Write Lock
- 类似JDK的ReentrantReadWriteLock。一个读写锁管理一对相关的锁。一个负责读操作,另外一个负责写操作。读操作在写锁没被使用时可同时由多个进程使用,而写锁在使用时不允许读(阻塞)。此锁是可重入的。一个拥有写锁的线程可重入读锁,但是读锁却不能进入写锁。这也意味着写锁可以降级成读锁, 比如请求写锁 --->请求读锁--->释放读锁 ---->释放写锁。从读锁升级成写锁是不行的。可重入读写锁主要由两个类实现:InterProcessReadWriteLock、InterProcessMutex。使用时首先创建一个InterProcessReadWriteLock实例,然后再根据你的需求得到读锁或者写锁,读写锁的类型是InterProcessMutex。
(4)信号量—Shared Semaphore - 一个计数的信号量类似JDK的Semaphore。 JDK中Semaphore维护的一组许可(permits),而Curator中称之为租约(Lease)。 有两种方式可以决定semaphore的最大租约数。第一种方式是用户给定path并且指定最大LeaseSize。第二种方式用户给定path并且使用SharedCountReader类。如果不使用SharedCountReader, 必须保证所有实例在多进程中使用相同的(最大)租约数量,否则有可能出现A进程中的实例持有最大租约数量为10,但是在B进程中持有的最大租约数量为20,此时租约的意义就失效了。
- 这次调用acquire()会返回一个租约对象。 客户端必须在finally中close这些租约对象,否则这些租约会丢失掉。 但是, 但是,如果客户端session由于某种原因比如crash丢掉, 那么这些客户端持有的租约会自动close, 这样其它客户端可以继续使用这些租约。
- 注意你可以一次性请求多个租约,如果Semaphore当前的租约不够,则请求线程会被阻塞。 同时还提供了超时的重载方法。
(5)多共享锁对象 —Multi Shared Lock - Multi Shared Lock是一个锁的容器。 当调用acquire(), 所有的锁都会被acquire(),如果请求失败,所有的锁都会被release。 同样调用release时所有的锁都被release(失败被忽略)。
4、分布式计数器
- 计数器是用来计数的, 利用ZooKeeper可以实现一个集群共享的计数器。 只要使用相同的path就可以得到最新的计数器值, 这是由ZooKeeper的一致性保证的。Curator有两个计数器, 一个是用int来计数(SharedCount),一个用long来计数(DistributedAtomicLong)。
(1)分布式int计数器—SharedCount
- 这个类使用int类型来计数。 主要涉及三个类。SharedCount代表计数器, 可以为它增加一个SharedCountListener,当计数器改变时此Listener可以监听到改变的事件,而SharedCountReader可以读取到最新的值, 包括字面值和带版本信息的值VersionedValue。
SharedCount
SharedCountReader
SharedCountListener
- 注意计数器必须start,使用完之后必须调用close关闭它。
(2)分布式long计数器—DistributedAtomicLong - 再看一个Long类型的计数器。 除了计数的范围比SharedCount大了之外, 它首先尝试使用乐观锁的方式设置计数器, 如果不成功(比如期间计数器已经被其它client更新了), 它使用InterProcessMutex方式来更新计数值。此计数器有一系列的操作:
get(): 获取当前值
increment(): 加一
decrement(): 减一
add(): 增加特定的值
subtract(): 减去特定的值
trySet(): 尝试设置计数值
forceSet(): 强制设置计数值
5、分布式队列
- 基本不会使用zk作为分布式队列使用
- Curator也提供ZK Recipe的分布式队列实现。 利用ZK的 PERSISTENTS_EQUENTIAL节点, 可以保证放入到队列中的项目是按照顺序排队的。 如果单一的消费者从队列中取数据, 那么它是先入先出的,这也是队列的特点。 如果你严格要求顺序,你就的使用单一的消费者,可以使用Leader选举只让Leader作为唯一的消费者。
6、分布式屏障—Barrier
- 分布式Barrier是这样一个类: 它会阻塞所有节点上的等待进程,直到某一个被满足, 然后所有的节点继续进行,先关实现类DistributedBarrier