Elastic job ShardingService 分片服务

2018-08-14  本文已影响26人  pcgreat

ShardingService 是es job 分片服务类 , 这里重点介绍该类一些重要方法

setReshardingFlag
...
/**
* 设置需要重新分片的标记.
*/
public void setReshardingFlag() {
jobNodeStorage.createJobNodeIfNeeded(ShardingNode.NECESSARY);
}
...
如果 job 存在 , 而leader/sharding/necessary 不存在 ,会创建永久节点 , 设置重分片flag (leader/sharding/necessary 节点)

setReshardingFlag 被下图中方法调用

image.png

ShardingListenerManager.ShardingTotalCountChangedJobListener 监听分片数量变更 , 如果变更了 ,则 调用 setReshardingFlag 设置重分片flag

ShardingListenerManager.ListenServersChangedJobListener 在job 没有shutdown情况下 , 监听 zk instances 下 节点 非 NODE_UPDATED 事件变更 ,以及 servers下 节点 事件变更 , 调用 setReshardingFlag 设置重分片flag

SchedulerFacade. registerStartUpInfo(boolean) 实例启动时 设置重分片

ReconcileService runOneIteration 方法 每分钟调度一次 , 判断自己是否leader , 判断 是否分片完成 ,判断分片 所在节点是否离线 (instances临时节点 和 分片节点(分片节点永久)对比),设置重分片

shardingIfNecessary 分片方法

 /**
     * 如果需要分片且当前节点为主节点, 则作业分片.
     * 
     * <p>
     * 如果当前无可用节点则不分片.
     * </p>
     */
    public void shardingIfNecessary() {
        List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
        if (!isNeedSharding() || availableJobInstances.isEmpty()) {
            return;
        }
        if (!leaderService.isLeaderUntilBlock()) {
            blockUntilShardingCompleted();
            return;
        }
        waitingOtherShardingItemCompleted();
        LiteJobConfiguration liteJobConfig = configService.load(false);
        int shardingTotalCount = liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
        log.debug("Job '{}' sharding begin.", jobName);
        jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
        resetShardingInfo(shardingTotalCount);
        JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass());
        jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
        log.debug("Job '{}' sharding complete.", jobName);
    }

在需要分片(leader/sharding/necessary存在 ),合适instances 不为空 , 非leader 节点 阻塞至分片完成然后返回 ,leader节点 会等待正在处理分片完成 。 会在leader/sharding/processing 创建正在处理节点 ,清理分片对应旧实例 ,有必要会创建新分片或者减少分片 。 根据分片策略,将分片分给实例 ,最后 去掉 leader/sharding/processing 和 leader/sharding/necessary ,唤醒其余实例

shardingIfNecessary被下图中方法调用


image.png

LiteJobFacade getShardingContexts() 方法

    public ShardingContexts getShardingContexts() {
        boolean isFailover = configService.load(true).isFailover();
        if (isFailover) {
            List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
            if (!failoverShardingItems.isEmpty()) {
                return executionContextService.getJobShardingContext(failoverShardingItems);
            }
        }
        shardingService.shardingIfNecessary();
        List<Integer> shardingItems = shardingService.getLocalShardingItems();
        if (isFailover) {
            shardingItems.removeAll(failoverService.getLocalTakeOffItems());
        }
        shardingItems.removeAll(executionService.getDisabledItems(shardingItems));
        return executionContextService.getJobShardingContext(shardingItems);
    }

isFailover 失败补偿 ,默认情况是false , 比如说某一实例 某些分片失败了 ,那么 isFailover 为true的情况下 ,会分配给其他存活实例 , 这种情况暂时不分析(线上并没有使用) ,getShardingContexts 会在job 执行时候被调用 , 不考虑 isFailover ,首先 shardingIfNecessary ,然后 remove 掉disable分片 ,返回分片上下文

如下图 192.168.200.151@-@10576 拥有 0 分片 , 新创建个实例,10576实例在执行job时候 会调用 getShardingContexts 方法 , 这时候 0分片 被分配给新的实例192.168.200.151@-@10634

[zk: 127.0.0.1:2181(CONNECTED) 18] get /elastic-job-example-lite-java/javaSimpleJob/sharding/0/instance
192.168.200.151@-@10576

[zk: 127.0.0.1:2181(CONNECTED) 19] get /elastic-job-example-lite-java/javaSimpleJob/sharding/0/instance
192.168.200.151@-@10634

上一篇下一篇

猜你喜欢

热点阅读