xxl-job架构分析

2021-04-18  本文已影响0人  丑人林宗己

近期花了一些时间翻了xxl-job的源码,稍作分析,希望能从如此成熟的框架中洞悉一些分布式任务调度的本质。

本文的行文包括如下几点:

核心设计图

image.png

服务端设计

XxlJobScheduler中可以窥视服务端设计的最核心的内容,包括如下:

触发器

设计上分为两个线程池,fastTriggerPoolslowTriggerPool,二者唯一的不同就是blockqueue队列的大小不一样,fastTriggerPool是1000,slowTriggerPool是2000

public void addTrigger(final int jobId,
                           final TriggerTypeEnum triggerType,
                           final int failRetryCount,
                           final String executorShardingParam,
                           final String executorParam,
                           final String addressList) {

    // choose thread pool
    ThreadPoolExecutor triggerPool_ = fastTriggerPool;
    AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
    if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
        triggerPool_ = slowTriggerPool;
    }
    // ....
}

注释中提示,当一分钟内任务执行发生timeout的次数超过10次,任务将会被投入慢触发器线程池,但是此处提及的timeout可不是指http执行的超时时间,指的是任务执行的时间超过500ms。

long minTim_now = System.currentTimeMillis()/60000;
if (minTim != minTim_now) {
    minTim = minTim_now;
    jobTimeoutCountMap.clear();
}

// incr timeout-count-map
long cost = System.currentTimeMillis()-start;
if (cost > 500) {       // ob-timeout threshold 500ms
    AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
    if (timeoutCount != null) {
        timeoutCount.incrementAndGet();
    }
}

每次任务执行完成,将会判断当前时间是否等于上一次执行的最小时间(通过当前时间戳除以60000来粗糙的表示1分钟的间距)
当任务执行时间超过500ms,则记录超时次数。(500ms还不是可以配置的参数,不得不说有点粗糙……)

服务注册与注销

registryOrRemoveThreadPool线程池是用来处理接收来自客户端的注册与注销的处理任务,比较特殊的是线程池的拒绝策略是r.run(),意味着将会由客户端请求的线程池来完成执行。(xxl-job相当多的线程池都是使用该策略)

registryMonitorThread是定时任务,执行时间间隔为30s,主要工作如下:

// auto registry group
List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
if (groupList!=null && !groupList.isEmpty()) {

  // remove dead address (admin/executor)
  List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
  if (ids!=null && ids.size()>0) {
    XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
  }

  // fresh online address (admin/executor)
  HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
  List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
  if (list != null) {
    //....
  }

  // fresh group address
  // ....
}

任务执行异常补偿机制

monitorThread是定时任务,执行时间间隔为10s,主要工作如下:

告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败

List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
if (failLogIds!=null && !failLogIds.isEmpty()) {
  for (long failLogId: failLogIds) {

    // lock log
    int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
    if (lockRet < 1) {
      continue;
    }
    XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
    XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());

    // 1、fail retry monitor
    if (log.getExecutorFailRetryCount() > 0) {
      JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
      String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
      log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
      XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
    }

    // 2、fail alarm monitor
    int newAlarmStatus = 0;   // 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败
    if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
      boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
      newAlarmStatus = alarmResult?2:3;
    } else {
      newAlarmStatus = 1;
    }

    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
  }
}

客户端异常下线处理、回调处理

callbackThreadPool线程池用来处理接收来自客户端的回调处理。拒绝策略也是r.run()

monitorThread,是定时任务,时间间隔为60s,主要工作如下:

// 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
Date losedTime = DateUtil.addMinutes(new Date(), -10);
List<Long> losedJobIds  = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);
if (losedJobIds!=null && losedJobIds.size()>0) {
  for (Long logId: losedJobIds) {
    XxlJobLog jobLog = new XxlJobLog();
    jobLog.setId(logId);
    jobLog.setHandleTime(new Date());
    jobLog.setHandleCode(ReturnT.FAIL_CODE);
    jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );

    XxlJobCompleter.updateHandleInfoAndFinish(jobLog);
  }
}

任务执行统计报表

统计报表的核心代码如下:

Date todayTo = itemDay.getTime();

// refresh log-report every minute
XxlJobLogReport xxlJobLogReport = new XxlJobLogReport();
xxlJobLogReport.setTriggerDay(todayFrom);
xxlJobLogReport.setRunningCount(0);
xxlJobLogReport.setSucCount(0);
xxlJobLogReport.setFailCount(0);

Map<String, Object> triggerCountMap = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLogReport(todayFrom, todayTo);
if (triggerCountMap!=null && triggerCountMap.size()>0) {
    int triggerDayCount = triggerCountMap.containsKey("triggerDayCount")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCount"))):0;
    int triggerDayCountRunning = triggerCountMap.containsKey("triggerDayCountRunning")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountRunning"))):0;
    int triggerDayCountSuc = triggerCountMap.containsKey("triggerDayCountSuc")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountSuc"))):0;
    int triggerDayCountFail = triggerDayCount - triggerDayCountRunning - triggerDayCountSuc;

    xxlJobLogReport.setRunningCount(triggerDayCountRunning);
    xxlJobLogReport.setSucCount(triggerDayCountSuc);
    xxlJobLogReport.setFailCount(triggerDayCountFail);
}

// do refresh
int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);
if (ret < 1) {
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport);
}

任务执行定时任务(生成任务下一次执行时间)

scheduleThread是个定时任务,时间间隔是动态计算的,这是服务端最核心的部分功能。

@Override
public void run() {

    try {
        TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 ); // 秒数为0, 5, 10.....
    } catch (InterruptedException e) {
        //....
    }
    logger.info(">>>>>>>>> init xxl-job admin scheduler success.");

    // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
    // 默认为(200 + 100) * 20 = 60000
    int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;

    while (!scheduleThreadToStop) {

        // Scan Job
        long start = System.currentTimeMillis();
        // 获取数据库资源
        boolean preReadSuc = true;
        try {

            // ...
            // 争取到锁权限(争取不到的节点,将会在这里阻塞,比如当部署多个节点时)
            preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
            preparedStatement.execute();

            // tx start

            // 1、pre read
            long nowTime = System.currentTimeMillis();
            // 查询下一次执行时间小于当前时间的5s后,并查默认6000条
            List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
            if (scheduleList!=null && scheduleList.size()>0) {
                // 2、push time-ring
                for (XxlJobInfo jobInfo: scheduleList) {

                    // time-ring jump
                    // 当前时间 > 下一次触发时间 + 5s
                    if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                        // 2.1、trigger-expire > 5s:pass && make next-trigger-time
                        logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

                        // 1、misfire match
                        // 过期调度策略
                        MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                        if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                            // FIRE_ONCE_NOW 》 trigger
                            JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                            logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                        }

                        // 2、fresh next
                        // 刷新下次执行的时间
                        // 如果计算不到下次执行时间,将会停止任务
                        refreshNextValidTime(jobInfo, new Date());

                    } else if (nowTime > jobInfo.getTriggerNextTime()) {
                        // 当前时间稍大于下一次执行时间(5s内)
                        // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time

                        // 1、trigger
                        JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                        logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );

                        // 2、fresh next
                        refreshNextValidTime(jobInfo, new Date());
                        // 如果下次触发的时间间隔在5s内,则写入ring线程,由时间轮线程处理
                        // next-trigger-time in 5s, pre-read again
                        if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
                            // 计算下一次触发事件将会掉落在时间轮的哪一格上
                            // 1、make ring second
                            int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);//

                            // 2、push time ring
                            pushTimeRing(ringSecond, jobInfo.getId());

                            // 3、fresh next
                            refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                        }

                    } else {
                        // 下次触发的时间在当前时间之后,计算本次触发的时间将会掉落在时间轮的哪一个格上
                        // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time

                        // 1、make ring second
                        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                        // 2、push time ring
                        pushTimeRing(ringSecond, jobInfo.getId());

                        // 3、fresh next
                        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                    }

                }

                // 3、update trigger info
                for (XxlJobInfo jobInfo: scheduleList) {
                    XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                }

            } else {
                preReadSuc = false; // 没有可执行任务
            }

            // tx stop


        } catch (Exception e) {
            // ....
        } finally {
            // 关闭资源
        }
        long cost = System.currentTimeMillis()-start;


        // Wait seconds, align second
        if (cost < 1000) {  // scan-overtime, not wait
            try {
                // pre-read period: success > scan each second; fail > skip this period;
                // 5s内没有可执行的任务,则沉睡到下一个零整5s,比如0,5,10....
                // 如果有则沉睡到下一个零整1s, 1,2,3,4....
                TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
            } catch (InterruptedException e) {
                if (!scheduleThreadToStop) {
                    logger.error(e.getMessage(), e);
                }
            }
        }

    }

    logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
}

ring thread是一个时间轮设计,最高为60格,内部是一个map,key为秒数,value为待执行任务ID列表。

public void run() {

    while (!ringThreadToStop) {

        // align second
        try {
            TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000); // 对齐到下一秒
        } catch (InterruptedException e) {
            if (!ringThreadToStop) {
                logger.error(e.getMessage(), e);
            }
        }

        try {
            // second data
            List<Integer> ringItemData = new ArrayList<>();
            int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
            for (int i = 0; i < 2; i++) {
                List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 ); // 将当前刻度+上一个刻度。距离当前是4s, i=0则为4, i1则为3
                if (tmpData != null) {
                    ringItemData.addAll(tmpData);
                }
            }

            // ring trigger
            logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
            if (ringItemData.size() > 0) {
                // do trigger
                for (int jobId: ringItemData) {
                    // do trigger
                    JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                }
                // clear
                ringItemData.clear();
            }
        } catch (Exception e) {
            if (!ringThreadToStop) {
                logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
            }
        }
    }
    logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
}

小结:
由此服务端的核心设计基本就算拆解完了,最核心的部分是任务执行定时任务,内部实现了秒级的时间轮算法(有点粗糙....)。总的架构设计并不算简单,由于没有外部框架依赖,需要自己实现服务注册/服务注销/服务掉线检查等等的功能,所以总体上又可以将架构分为:服务治理与服务调度两个大块去看。服务治理主要作用于维系服务端与客户端两个角色的连接状态。服务调度则是xxl-job的核心功能。

客户端设计

客户端的肯定不如服务端复杂,基本上就几个点:

其中重点关注的点在于客户端服务器,以及服务注册与注销

客户端服务器

EmbedServer是一个基于Netty构建的Http服务器,默认端口号为9000

// start server
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel channel) throws Exception {
                channel.pipeline()
                        .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                        .addLast(new HttpServerCodec())
                        .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                        .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));// http 处理器
            }
        })
        .childOption(ChannelOption.SO_KEEPALIVE, true);

// bind
ChannelFuture future = bootstrap.bind(port).sync();

logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

// start registry
startRegistry(appname, address); // 服务注册

// wait util stop
future.channel().closeFuture().sync();

EmbedHttpServerHandler处理来自服务端的Http请求,主要包括心跳空闲线程检查执行停止任务执行获取日志.

private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {

    // valid 仅支持post
    if (HttpMethod.POST != httpMethod) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
    }
    if (uri==null || uri.trim().length()==0) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
    }
    // 检验token 
    if (accessToken!=null
            && accessToken.trim().length()>0
            && !accessToken.equals(accessTokenReq)) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
    }

    // services mapping
    try {
        if ("/beat".equals(uri)) {
            return executorBiz.beat();
        } else if ("/idleBeat".equals(uri)) {
            IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
            return executorBiz.idleBeat(idleBeatParam);// 检查线程是否在运行,队列中是否有数据
        } else if ("/run".equals(uri)) {
            TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
            return executorBiz.run(triggerParam);
        } else if ("/kill".equals(uri)) {// 停止客户端线程
            KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
            return executorBiz.kill(killParam);
        } else if ("/log".equals(uri)) {
            LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
            return executorBiz.log(logParam);
        } else {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
        }
    } catch (Exception e) {
       // ....
    }
}

重点关注run事件,它主要是找到相应的IHandler,构建JobThread,推到线程队列中等线程处理。

@Override
public ReturnT<String> run(TriggerParam triggerParam) {
    // load old:jobHandler + jobThread
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
    String removeOldReason = null;

    // valid:jobHandler + jobThread
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    if (GlueTypeEnum.BEAN == glueTypeEnum) {

        // new jobhandler 
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

        // valid old jobThread
        if (jobThread!=null && jobHandler != newJobHandler) {
            // change handler, need kill old thread
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // valid handler
        if (jobHandler == null) {
            jobHandler = newJobHandler;
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }

    } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
    //...
    } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {
    // ....
    } else {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
    }

    // executor block strategy
    if (jobThread != null) { // 根据配置的阻塞策略来执行
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) { // 丢弃后续调度
            // discard when running
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) { // 覆盖之前调度
            // kill running jobThread
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                jobThread = null;
            }
        } else { // 单机串行
            // just queue trigger
        }
    }

    // replace thread (new or exists invalid)
    if (jobThread == null) {
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason); // 生成该handler的执行线程并注册到注册表中,启动该线程
    }

    // push data to queue
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam); // 放入阻塞队列,线程会对阻塞队列的数据进行处理
    return pushResult;
}

job thread的处理逻辑,具体如下:


// init
try {
    handler.init(); // ihandler有init方法支持
} catch (Throwable e) {
    logger.error(e.getMessage(), e);
}

// execute
while(!toStop){
    running = false;
    idleTimes++; // 空闲次数

    TriggerParam triggerParam = null;
    try {
        // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
        triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS); // 阻塞等待3s
        if (triggerParam!=null) {
            running = true;
            idleTimes = 0; // 空闲次数清零
            triggerLogIdSet.remove(triggerParam.getLogId());

            // log filename, like "logPath/yyyy-MM-dd/9999.log"
            String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
            // 上下文环境
            XxlJobContext xxlJobContext = new XxlJobContext(
                    triggerParam.getJobId(),
                    triggerParam.getExecutorParams(),
                    logFileName,
                    triggerParam.getBroadcastIndex(),
                    triggerParam.getBroadcastTotal());

            // init job context
            XxlJobContext.setXxlJobContext(xxlJobContext);

            // execute
            XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());
            // 设置了执行超时时间,则采用异步超时等待执行的方式
            if (triggerParam.getExecutorTimeout() > 0) {
                // limit timeout
                Thread futureThread = null;
                try {
                    FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
                        @Override
                        public Boolean call() throws Exception {

                            // init job context
                            XxlJobContext.setXxlJobContext(xxlJobContext);

                            handler.execute(); // 执行
                            return true;
                        }
                    });
                    futureThread = new Thread(futureTask);
                    futureThread.start();

                    Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);// 超时时间的单位为秒
                } catch (TimeoutException e) {
                    // ...
                    // handle result
                    XxlJobHelper.handleTimeout("job execute timeout "); // 处理超时
                } finally {
                    futureThread.interrupt();
                }
            } else {
                // just execute
                handler.execute(); // 直接执行
            }

            // ....
        } else {
            if (idleTimes > 30) { // 当空闲次数超过30次
                if(triggerQueue.size() == 0) {  // avoid concurrent trigger causes jobId-lost 等待执行队列中没有数据
                    XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit."); // 移除该线程
                }
            }
        }
    } catch (Throwable e) {
        // 处理异常
    } finally {
        if(triggerParam != null) {
            // callback handler info
            if (!toStop) {
                // commonm 写入回调队列,等待回调线程上报处理结果
                TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                        triggerParam.getLogId(),
                        triggerParam.getLogDateTime(),
                        XxlJobContext.getXxlJobContext().getHandleCode(),
                        XxlJobContext.getXxlJobContext().getHandleMsg() )
                );
            } else {
                // is killed 线程已暂停,可以kill,由服务端来kill
                TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                        triggerParam.getLogId(),
                        triggerParam.getLogDateTime(),
                        XxlJobContext.HANDLE_COCE_FAIL,
                        stopReason + " [job running, killed]" )
                );
            }
        }
    }
}

// 线程停止后,队列中有数据,写入回调队列,等待kill
// callback trigger request in queue
while(triggerQueue !=null && triggerQueue.size()>0){
    TriggerParam triggerParam = triggerQueue.poll();
    if (triggerParam!=null) {
        // is killed 
        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                triggerParam.getLogId(),
                triggerParam.getLogDateTime(),
                XxlJobContext.HANDLE_COCE_FAIL,
                stopReason + " [job not executed, in the job queue, killed.]")
        );
    }
}

// destroy
try {
    handler.destroy(); // 线程停止时销毁回调,ihandler有destroy支持
} catch (Throwable e) {
    logger.error(e.getMessage(), e);
}

小结:至此,客户端的执行逻辑基本分析完成了。其中设计点主要为:每一个@XXL的方法都将生成一个对应的线程来处理服务端的调度。并且当线程的空闲次数超过30次,每次3s,总共为90s,没有任务处理,将会关闭线程,下次调度时如果没有线程再重新生成,即同一个前后调度间隔超过90s的任务那不是要来回构建/删除线程吗?

服务注册与注销

EmbedServer构建Http服务器时会启动服务注册线程registryThreadregistryThread每30s上报一次心跳。如果registryThreadtoStop被更新为true,则进入服务注销流程。

public void run() {

    // registry
    while (!toStop) {
        try {
            RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
            for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                try {
                    ReturnT<String> registryResult = adminBiz.registry(registryParam); // 注册(上报心跳)
                    // ...
                } catch (Exception e) {}

            }
        } catch (Exception e) {}
        try {
            if (!toStop) {
                TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); // 30s
            }
        } catch (InterruptedException e) {}
    }

    // registry remove
    try {
        RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
            try {
                ReturnT<String> registryResult = adminBiz.registryRemove(registryParam); // 注销
                //....
            } catch (Exception e) {}

        }
    } catch (Exception e) {}

}

小结:客户端的设计言简意赅,配置多个admin adress就向多个服务端注册,这种方式也进一步说明服务端之间并未做状态同步(如果是同一个数据库即可以相互感知)。总而言之,客户端的设计主要是比较耗费线程资源,有些线程可能会出现不断构建、删除的情况。

上一篇 下一篇

猜你喜欢

热点阅读