【Hazelcast系列十一】分布式计算
11. 分布式计算
在本章中我们重点介绍Hazelcast的四种服务:executor service,durable executor service,scheduled executor service 和 entry processor
11.1. Executor Service
Executor框架是Java中一个非常酷的特性,可以让我们轻松的异步执行任务,比如数据库查询,复杂计算和图像渲染等。ThreadPoolExecutor
是Executor框架的默认实现,它主要为在单JVM内运行而设计。在分布式系统中,ThreadPoolExecutor
可能不是期望的Executor实现,因为提交任务和执行任务的JVM是不同的。Hazelcast提供了IExecutorService
用于在分布式环境中执行异步任务,并且实现了java.util.concurrent.ExecutorService
接口,为需要计算和数据处理能力的应用程序提供服务。
使用IExecutorService
,可以异步执行任务,也可以取消耗时超过预期的任务。因为任务都被分布式执行,因此任务必须支持序列化以支持在不同JVM之间通过网络传输。
Callable 和 Runnable 是Java Executor框架实现任务的两种方式:
- Callable: 有返回值
- Runnable: 无返回值
注意,分布式executor service(IExecutorService
)倾向在存储数据的JVM中执行。一般来说,Java的Runnable或者Callable不能够在客户端执行,因为客户端不一定都是Java应用。同样,客户端不存储任何数据,因此如果需要客户端必须从服务器获取数据。如果期望连接到服务器的部分或者全部客户端可以运行某些程序,可以通过订阅-推送机制实现:包含运行必须的参数的数据发送到一个ITopic
,客户端从ITopic
获取数据并执行处理逻辑。
11.1.1. 实现Callable Task
在Hazelcast中,实现一个类似java.util.concurrent.Callable
的任务需要实现两个接口:Callable和Serializable。
下面是一个计算IList
全部元素之和的任务:
class ListSumTask : Callable<Long>, Serializable, HazelcastInstanceAware {
@Transient
private var instance: HazelcastInstance? = null
override fun setHazelcastInstance(hazelcastInstance: HazelcastInstance) {
this.instance = hazelcastInstance
}
@Throws(Exception::class)
override fun call(): Long? {
val list = instance?.getList<Int>("list")
var result = 0L
list?.forEach(Consumer {
result += it
})
return result
}
}
下面是一个简单的Echo 任务,用来检测集群中的哪个成员执行了任务:
class EchoTask(val input: String) : Callable<String>, Serializable, HazelcastInstanceAware {
@Transient
private var hazelcastInstance: HazelcastInstance? = null
override fun setHazelcastInstance(hazelcastInstance: HazelcastInstance) {
this.hazelcastInstance = hazelcastInstance
}
override fun call(): String {
return hazelcastInstance!!.cluster.localMember.toString() + ":" + input
}
}
执行 Callable Task
在Hazelcast中执行一个 callable task 的执行流程如下:
- 从
HazelcastInstance
获取 Executor。 - 提交task并拿到返回值
Future
- 当任务执行完成,使用
Future
对象获取结果;如果任务未执行完成,执行get()
方法会阻塞当前线程。
任务执行过程中不影响程序执行其他任务或逻辑,程序无需显式等待任务执行完成。
下面是执行EchoTask的代码
val instance = Hazelcast.newHazelcastInstance()
val executor = instance.getExecutorService("executor")
val result = executor.submit(EchoTask("echo input"))
println(result.get())
11.1.2. 实现 Runnable Task
和Callable Task类似,Hazelcast中的 Runnable Task 需要实现两个接口:Runnable和Serializable。
一个每秒打印当前时间的Runnable Task:
class TickerTask : Runnable, Serializable {
override fun run() {
while (true) {
println(LocalDateTime.now())
TimeUnit.SECONDS.sleep(1)
}
}
}
执行 Runnable Task
在Hazelcast中执行Runnable Task只需要以下两步:
- 从
HazelcastInstance
获取 Executor - 将task提交到Executor
fun main() = runBlocking<Unit> {
val instance = Hazelcast.newHazelcastInstance()
val executor = instance.getExecutorService("executor")
executor.submit(TickerTask())
}
11.1.3. 扩容 Executor Service
Executor service支持水平和垂直扩容。
垂直扩容:
- pool-size
- CPU
- 内存
水平扩容:
- 增加Hazelcast的数量。
11.1.4. Executing Code in the Cluster
Hazelcast的 executor service 是 java.util.concurrent.ExecutorService
的一个分布式实现,允许在Hazelcast集群执行代码。下面通过代码样例来分别说明四种场景:在指定的member执行;在拥有key的member执行;随机选择一个member执行;在所有member执行。假设我们的Hazelcast集群由两个member组成:
1、在指定member执行:
fun main() = runBlocking<Unit> {
val instance = Hazelcast.newHazelcastInstance()
val executor = instance.getExecutorService("default")
var result: Future<String>? = null
instance.cluster.members.forEach {
if (it != instance.cluster.localMember) {
println("begin to submit task")
result = executor.submitToMember(EchoTask("test"), it)
}
}
println(result?.get())
}
2、在拥有指定key的member执行:
fun main() = runBlocking<Unit> {
val instance = Hazelcast.newHazelcastInstance()
val executor = instance.getExecutorService("default")
executor.submitToKeyOwner(EchoTask("token"),"token")
}
3、随机选择一个member执行
fun main() = runBlocking<Unit> {
val instance = Hazelcast.newHazelcastInstance()
val executor = instance.getExecutorService("default")
executor.submit(EchoTask("token"))
}
4、在所有member执行
fun main() = runBlocking<Unit> {
val instance = Hazelcast.newHazelcastInstance()
val executor = instance.getExecutorService("default")
executor.submitToAllMembers(EchoTask("token"))
}
11.1.5. 取消任务
如果一个任务的执行时间超过预期,可以通过停止或取消任务来释放资源。在Hazelcast中可以使用标准的Java API cancel()
来停止或取消任务。
class TimeCostTask : Callable<String>, Serializable {
override fun call(): String {
TimeUnit.SECONDS.sleep(2)
return "I'm return now"
}
}
fun main() = runBlocking<Unit> {
val instance = Hazelcast.newHazelcastInstance()
val executor = instance.getExecutorService("default")
val result = executor.submit(TimeCostTask())
delay(1000L)
result.cancel(true)
println(result.isCancelled) // 打印true
}
11.1.6. 任务回调
可以使用Hazelcast提供的ExecutionCallback
实现在任务完成时的异步回调。任务完成且没有错误时回调,需要实现onResponse
方法,任务完成但是有错误回调,需要实现onFailure
方法。
class CallbackTask : Callable<String>, Serializable {
override fun call(): String {
TimeUnit.SECONDS.sleep(10)
return "callback"
}
}
class TaskCallback : ExecutionCallback<String> {
override fun onFailure(t: Throwable?) {
println("failure ${t?.printStackTrace()}")
}
override fun onResponse(response: String?) {
println("response: $response")
}
}
fun main() = runBlocking<Unit> {
val instance = Hazelcast.newHazelcastInstance()
val executor = instance.getExecutorService("default")
executor.submit(CallbackTask(), TaskCallback())
}
11.1.7. 为Task选择members
如果想控制执行任务的member,可以使用MemberSelector
接口,该接口的select(Member)
方法会在每一个member上执行已确认当前member是否可以执行任务。
class SelfMemberSelector : MemberSelector {
override fun select(member: Member?): Boolean {
return true == member?.getBooleanAttribute("executor")
}
}
Hazelcast提供了四个默认的MemberSelector
实例:DATA_MEMBER_SELECTOR
,LITE_MEMBER_SELECTOR
,LOCAL_MEMBER_SELECTOR
和 NON_LOCAL_MEMBER_SELECTOR
。
11.1.8. 配置 Executor Service
Hazelcast支持对线程池大小,任务队列大小,是否开启统计和脑裂保护进行配置,下面是几种不同的配置方法:
声明式配置:
<hazelcast>
...
<executor-service name="exec">
<pool-size>1</pool-size>
<queue-capacity>10</queue-capacity>
<statistics-enabled>true</statistics-enabled>
<split-brain-protection-ref>splitbrainprotection-name</split-brain-protection-ref>
</executor-service>
...
</hazelcast>
代码配置:
Config config = new Config();
ExecutorConfig executorConfig = config.getExecutorConfig("exec");
executorConfig.setPoolSize( 1 ).setQueueCapacity( 10 )
.setStatisticsEnabled( true )
.setSplitBrainProtectionName( "splitbrainprotectionname" );
11.1.9. IExecutorService 脑裂保护
IExecutorService
可以配置在执行操作前检查集群最小可用的member数量,避免在网络分区条件下操作在集群的多数成员成功(实际上数据不可用)。
下面是受脑裂保护的操作:
execute
executeOnAllMembers
executeOnKeyOwner
executeOnMember
executeOnMembers
shutdown
shutdownNow
submit
submitToAllMembers
submitToKeyOwner
submitToMember
submitToMembers
11.2. Durable Executor Service
durable executor service是一种将任务存储在执行member及备份member,保证在member或任务提交者故障的场景中任务不丢失的数据结构。
使用durable executor service 执行任务的步骤:
- 将任务发送到主分区member及备份member,然后执行任务。
- 获取任务执行结果。
下面是durable executor service的执行示意图
Hazelcast的executor service返回客户端一个代表任务的Future
,durable executor service 执行任务的两个步骤保证了在返回Future
之前任务已经被执行,并且可以使用一个唯一的ID跟踪已提交任务的结果。Hazelcast将任务存储在主备member之后然后执行任务。
第一次调用,Hazelcast使用Ringbuffer存储任务并生成一个任务对应的序列返回调用端。通过这种方式,任务可以应对成员失败的情况,并且可以使用ID跟踪任务。第一次调用完成以后,任务的序列就会返回给客户端,第二次调用是从序列中获取任务的执行结果。如果结果已经可用,则将立即返回,否则将一直等待直到收到队列通知。任务执行完成,Ringbuffer会用任务的结果替换序列中的任务,并通知等待在序列上的操作。
11.2.1. 配置Durable Executor Service
声明式配置:
<hazelcast>
...
<durable-executor-service name="myDurableExecSvc">
<pool-size>8</pool-size>
<durability>1</durability>
<capacity>1</capacity>
<split-brain-protection-ref>splitbrainprotection-name</split-brain-protection-ref>
</durable-executor-service>
...
</hazelcast>
代码配置:
val config = Config()
val durableExecutorConfig = config.getDurableExecutorConfig("durable")
durableExecutorConfig.poolSize = 10
durableExecutorConfig.capacity = 1024
durableExecutorConfig.durability = 1
durableExecutorConfig.splitBrainProtectionName = "brain"
config.durableExecutorConfigs = hashMapOf("durable" to durableExecutorConfig)
val instance = Hazelcast.newHazelcastInstance(config);
val scheduler = instance.getDurableExecutorService("durable")
配置各参数的含义如下:
-
name
:executor的名字。 -
pool-size
:executor线程池大小。 -
durability
:任务备份数,默认值1。 -
capacity
:Executor 的队列大小 -
split-brain-protection-ref
:脑裂保护配置的名字
11.2.2. 脑裂保护
支持脑裂保护的操作:
- WRITE, READ_WRITE:
disposeResult
execute
executeOnKeyOwner
retrieveAndDisposeResult
shutdown
shutdownNow
submit
submitToKeyOwner
- READ, READ_WRITE:
retrieveResult
11.3. Scheduled Executor Service
Hazelcast的scheduled executor service(IScheduledExecutorService
)是部分实现了java.util.concurrent.ScheduledExecutorService
接口的数据结构。部分实现指的是IScheduledExecutorService
仅支持以固定速率执行任务而不支持以固定间隔执行任务。
IScheduledExecutorService
除去支持通用的调度方法,还支持以下额外的方法:
-
scheduleOnMember
: 在一个特定的member执行。 -
scheduleOnKeyOwner
: 拥有key的member执行。 -
scheduleOnAllMembers
: 在集群所有member执行。 -
scheduleOnAllMembers
: 在所有给定的member上执行。
有两种可用模式:
-
可用分区调度。任务存储在主分区和N个备份分区(备份分区数量可配置)。在member故障场景,会有一个或多个备份接管任务。在一个member故障的情况下,任务会被重新分配到主分区的owner,这可能会导致任务延迟。
-
可用member调度,任务只存储在member上,如果member故障任务会丢失。
为每个任务设置唯一标识符或名字是实现任务可靠性的必要条件,首先名字或者ID可以避免任务重复,其次在发起任务的机器或者客户端故障的条件下依然可以通过名字或ID获取任务。实现NamedTask
可以自定义任务名,如果未定义则使用一个随机的UUID作为名字。通过返回的IScheduleFuture
可以获取任务的handler和任务的运行时统计信息。
每一个任务都有一个Future与之关联,任务完成可以调用dispose()
方法释放任务占用的资源。如果任务以固定的速率执行,dispose()
方法可以取消即将执行的任务。任务handler是一个用来存储scheduled future信息的描述类,使用这些信息可以定位集群中的任务。handler包括任务的名字,任务的所有者和scheduler的名字。
任务被调度执行以后handler就一直处于可用状态,并可以以字符串的形式存储(ScheduledTaskHandler.toUrn()
)并可以从字符串重建handler(ScheduledTaskHandler.of()
)。如果handler丢失,可以使用方法获取所有的任务。
和executor service 一样,scheduled executor service支持有状态任务的调度。有状态任务是在运行时需要任何状态的任务,考虑到分区有可能丢失,任务状态必须与任务一起持久化。可以通过实现StatefulTask
接口来创建一个有状态任务,实现还需要提供存储和加载状态的方法。如果分区丢失,任务被重新调度并在执行之前需要重新加载之前存储的状态。
11.3.1. 配置 Scheduled Executor Service
声明式配置:
<hazelcast>
...
<scheduled-executor-service name="myScheduledExecSvc">
<pool-size>16</pool-size>
<durability>1</durability>
<capacity>100</capacity>
<split-brain-protection-ref>splitbrainprotection-name</split-brain-protection-ref>
</scheduled-executor-service>
...
</hazelcast>
配置各参数的含义如下:
-
name
:executor的名字。 -
pool-size
:executor线程池大小。 -
durability
:executor的备份数。 -
capacity
:Executor 的队列大小 -
split-brain-protection-ref
:脑裂保护配置的名字
11.3.2. 脑裂保护
IScheduledExecutorService 下面的方法支持脑裂保护:
- WRITE, READ_WRITE:
schedule
scheduleAtFixedRate
scheduleOnAllMembers
scheduleOnAllMembersAtFixedRate
scheduleOnKeyOwner
scheduleOnKeyOwnerAtFixedRate
scheduleOnMember
scheduleOnMemberAtFixedRate
scheduleOnMembers
scheduleOnMembersAtFixedRate
shutdown
- READ, READ_WRITE:
getAllScheduledFutures
11.4. Entry Processor
Hazelcast支持entry处理。entry processor是在map entry上原子执行函数的一种方法。
entry processor是在IMap
上执行批量处理的一种好的选择。通常的处理方法是使用IMap.get(key)
获取值修改后再调用IMap.put(key,value)
写回IMap
。如果上述的执行过程在客户端或在没有key的member中,需要两次网络请求:获取值和修改值并写回IMap
。
如果你正在执行上面的逻辑,那么你应该考虑一下使用entry processor。entry processor在存储数据的member上执行数据的读写操作,这避免了上面不必要的两次网络请求。
11.4.1. 内存中更快的Map操作
entry processor可以内存中执行更快的map操作而无需担心并发问题。entry processor可以处理单个entry也可以处理所有的entry。entry processor选择entry时支持使用谓词。受益于hazelcast隔离的线程模型(使用分区线程执行处理),使用entry processor无需现实的锁操作,Hazelcast将entry processor分发到各个成员执行,因此增加成员会提供处理效率。
使用索引
entry processor可以和谓词一起使用,谓词帮助processor选取数据的一个子集进行处理,选取子集可以通过全表扫描也可以通过索引来完成。为了加速算则的过程,可以考虑使用索引,如果map已经创建索引,entry processor 会自动使用索引。
使用OBJECT内存格式
如果entry processor是map的主要操作而且map包含复杂对象,应该考虑使用OJBECT的内存格式来减少序列化开销。map默认使用BINARY格式存储值。如果以OBJECT格式存储,entry processor可以直接处理,这种场景下没有序列化和反序列化开销。如果有Entry监听器,entry的值还是需要序列化后发送给事件发布服务。
处理Entries
IMap
接口提供了以下方法处理entry:
executeOnKey
executeOnKeys
submitToKey
executeOnEntries
executeOnEntries
考虑单key锁
entry processor在处理单个key时会使用锁,下面两个方法是用于处理单个key的方法:
<R> R executeOnKey(K key, EntryProcessor<K, V, R> entryProcessor);
<R> CompletionStage<R> submitToKey(K key, EntryProcessor<K, V, R> entryProcessor);
处理备份Entries
如果代码修改了数据,备份数据也需要修改。这可以减少主备数据不一致。通常情况下这是非常简单的,entry processor会在主备节点执行。如果希望在备执行不同的处理逻辑,需要覆写getBackupProcessor
方法。方法返回一个在备节点执行的EntryProcessor
实例。如果只是读去entry,方法可以返回null,以表示在备节点不执行任何操作。
11.4.2. 创建Entry Processor
创建一个Entry Processor需要实现EntryProcessor
接口,主备entry都会执行process()
class SimpleEntryProcessor : EntryProcessor<String, Int, Int> {
override fun process(p0: MutableMap.MutableEntry<String, Int>?): Int? {
println("old value is ${p0?.value}")
p0?.setValue(p0.value + 1)
return p0?.value
}
}
使用示例:
fun main() = runBlocking<Unit> {
val config = Config()
val mapConfig = config.getMapConfig("processor")
mapConfig.backupCount = 1;
val instance = Hazelcast.newHazelcastInstance(config)
val map = instance.getMap<String, Int>("processor")
for (i in 1..10) {
map["key$i"] = i
}
map.executeOnEntries(SimpleEntryProcessor())
}
11.4.3. Entry Processor 性能优化
默认一个分区线程负责执行entry processor,一个分区线程负责处理一个或多个分区。entry processor是以假定用户的代码可以在process()
方法内快速执行为前提进行设计。异常情况下process()
方法内的代码非常重执行耗时长,这可能成为处理entry的瓶颈。
Hazelcast提供了慢代码检测器,并可以根据下面的配置记录告警日志:
-
hazelcast.slow.operation.detector.enabled
(默认值true) -
hazelcast.slow.operation.detector.threshold.millis
(默认值: 10000)
Hazelcast默认只检测执行极其缓慢的processor,但是在开发过程中应该将值设置的更低以检测在生成环境中可能成为瓶颈的processor。下面是Hazelcast提供的优化建议:
-
Offloadable
使用executor线程而不是分区线程执行。 -
ReadOnly
避免获取key的锁
对于Hazelcast IMDG 3.9版本来说,上面的优化仅适用于IMap
的以下方法:
executeOnKey(Object, EntryProcessor)
submitToKey(Object, EntryProcessor)
submitToKey(Object, EntryProcessor, ExecutionCallback)
Offloadable Entry Processor
如果一个entry processor实现了Offloadable()
接口,process()
方法的执行将有getExecutorName()
获取的executor执行。
Offloading不阻塞分区线程,并可以让用户从更高的吞吐量受益。在处理期间key将被锁住以避免写冲突。在这种情况下,线程之间的合作关系如下:
- partition thread 获取entry并锁住key。
- execution thread 执行processor的
process()
方法。 - partition thread 更新值并解锁ey,如果值没有更新则只解锁key。
Hazelcast已经提供了Offloadable
接口的两个实现:
- NO_OFFLOADING: 和没有实现
Offloadable
接口一样。 - OFFLOADABLE_EXECUTOR: 使用默认的
ExecutionService.OFFLOADABLE_EXECUTOR
。
如果 getExecutorName()
无法找到executor,将会使用默认的executor。下面是default executor的配置信息:
<hazelcast>
...
<executor-service name="default">
<pool-size>16</pool-size>
<queue-capacity>0</queue-capacity>
</executor-service>
...
</hazelcast>
一个名为OffloadedProcessor
的配置如下:
<hazelcast>
...
<executor-service name="OffloadedInventoryEntryProcessor”>
<pool-size>30</pool-size>
<queue-capacity>0</queue-capacity>
</executor-service>
...
</hazelcast>
只读 Entry Processor
默认如果一个key上有锁entry processor则不会执行,直到key上的锁被释放为止。如果一个entry processor实现了Readonly
接口但是并未实现Offloadable
接口,entry的处理不会委托给其他executor。但是processor也不会感知key上是否有锁,也不会尝试获取key的锁,因为entry processor不会尝试做任何修改。
如果一个entry processor实现了ReadOnly
接口并尝试修改entry,会抛出 UnsupportedOperationException
异常。
ReadOnly and Offloadable Entry Processor
像上面的优化建议那样,如果一个entry processor实现了ReadOnly
和Offloadable
两个接口,processor的process()
方法会在自定义的executor中执行,而且不感知key上的锁也不尝试获取锁。
这种情况线程之间的合作关系如下:
- partition thread 获取entry
- execution thread 处理entry
这种情况下EntryProcessor.getBackupProcessor()
比如返回null否则 IllegalArgumentException
异常将会被抛出。修改entry会抛出 UnsupportedOperationException
异常。
class SimpleEntryProcessor : EntryProcessor<String, String, String>, ReadOnly, Offloadable {
override fun process(p0: MutableMap.MutableEntry<String, String>?): String {
return p0?.key + p0?.value
}
override fun getExecutorName(): String {
return Offloadable.OFFLOADABLE_EXECUTOR
}
override fun getBackupProcessor(): EntryProcessor<String, String, String>? {
return null
}
}