微服务中,实现自定义的webhook

2023-01-08  本文已影响0人  天草二十六_简村人

一、背景

成熟的系统都会支持webhook回调,在本业务实体发生变更的时候,异步/同步触发回调订阅方。

具体的实现方案,首选http api接口,第二种方案是采用mq的方式,发布一个mq消息,由需要订阅该事件变更的服务去订阅消息。

本文主要是讲述前种方案。至于后者,本人不是很建议,它存在一定的技术依赖和网络要求。

之前我写过关于延时任务通知的文章,需要注意的是,本文是实时任务通知,并不实现延时的功能。

二、参考UI

这里我省去画UI的功夫,参考了禅道的界面,希望能帮助理解webhook的设计。

image.png image.png image.png

三、目标

四、数模设计

image.png
CREATE TABLE  `webhook_config` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `task_code` varchar(64) NOT NULL COMMENT '任务编号',
  `callback_url` varchar(255) NOT NULL COMMENT '回调地址',
  `created_date` datetime DEFAULT NULL COMMENT '创建时间',
  `modified_date` datetime DEFAULT NULL COMMENT '更新时间',
  `created_by` varchar(64) DEFAULT NULL COMMENT '创建人员',
  `modified_by` varchar(64) DEFAULT NULL COMMENT '更新人员',
  `remark` varchar(128) DEFAULT NULL COMMENT '备注',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='业务回调配置表';
 
 
CREATE TABLE `webhook_task` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `task_code` varchar(64) NOT NULL COMMENT '任务编号',
  `notify_params` varchar(4000) DEFAULT NULL COMMENT '通知内容Json',
  `notify_url` varchar(255) NOT NULL COMMENT '回调地址',
    
  `finish_time` datetime DEFAULT NULL COMMENT '完成时间',
  `is_finish` int(11) NOT NULL DEFAULT 0 COMMENT '是否完成',
  `last_time` datetime DEFAULT NULL COMMENT '最后重试时间',
  `next_time` datetime DEFAULT NULL COMMENT '下次尝试时间',
  `retry_times` int(11) DEFAULT 0 COMMENT '重试次数',
   
  `created_date` datetime DEFAULT NULL COMMENT '创建时间',
   
  PRIMARY KEY (`id`) USING BTREE,
  KEY `IDX_REF_TASK_nextTime` (`next_time`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='通知补偿任务表';
 
 -- 表webhook_tasks_history和表webhook_tasks的设计一致
CREATE TABLE `webhook_tasks_history` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `task_code` varchar(64) NOT NULL COMMENT '任务编号',
  `notify_params` varchar(4000) DEFAULT NULL COMMENT '通知内容Json',
  `notify_url` varchar(255) NOT NULL COMMENT '回调地址',
    
  `finish_time` datetime DEFAULT NULL COMMENT '完成时间',
  `is_finish` int(11) NOT NULL DEFAULT 0 COMMENT '是否完成',
  `last_time` datetime DEFAULT NULL COMMENT '最后重试时间',
  `next_time` datetime DEFAULT NULL COMMENT '下次尝试时间',
  `retry_times` int(11) DEFAULT 0 COMMENT '重试次数',
   
  `created_date` datetime DEFAULT NULL COMMENT '创建时间',
   
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='通知补偿任务归档表';

五、其他设计

还有一种的设计思路是,首次回调如果是成功,就不写入到webhoook_task表,只有当回调失败时,才持久化到任务表进行重试处理。

六、源码实现

image.png

6.1、核心类


import cn.hutool.http.ContentType;
import cn.hutool.http.HttpResponse;
import cn.hutool.http.HttpUtil;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.nio.charset.Charset;
import java.util.List;

/**
 * 通知任务
 *
 * @author xxx
 **/
@Slf4j
@Service
public class NotifyTasksService {

    private NotifyTaskScheduler notifyTaskScheduler;

    @Autowired
    private NotifyConfigRepository notifyConfigRepository;
    /**
     * 连接超时
     */
    private static final int connectionTimeout = 10000;

    /**
     * 读取超时
     */
    private static final int readTimeout = 5000;

    public void schedule(String taskCode, String notifyUrl, String remark) {
        notifyTaskScheduler.schedule(taskCode, notifyUrl, remark);
    }

    public NotifyTasksService(NotifyTaskScheduler notifyTaskScheduler) {
        this.notifyTaskScheduler = notifyTaskScheduler;
        notifyTaskScheduler.registerHandler(Constants.TaskCode.NOTIFY_POINTS_ACCOUNT_FLOW, new NotifyTaskScheduler.NotifyTaskHandler() {
            @Override
            public boolean run(NotifyTasks task) {
                boolean success = execute(Constants.TaskCode.NOTIFY_POINTS_ACCOUNT_FLOW, task.getNotifyUrl(), task.getNotifyParams(), true);
                return success;
            }
        });

        //TODO 注册其他任务
    }

// 暂时没用上,适用于在接口中指定了回调url。
    public void notify(String taskCode, String notifyUrl, String requestJson) {
        if (StringUtils.isEmpty(taskCode)) {
            return;
        }
        if (StringUtils.isNotEmpty(notifyUrl)) {
            execute(taskCode, notifyUrl, requestJson, false);
        }

        this.notify(taskCode, requestJson);
    }

    public void notify(String taskCode, String requestJson) {
        List<NotifyConfig> bizNotifyConfigs = notifyConfigRepository.findByTaskCode(taskCode);

        if (CollectionUtils.isEmpty(bizNotifyConfigs)) {
            return;
        }

        bizNotifyConfigs.forEach(bizNotifyConfig -> {
            // 通知
            String callbackUrl = bizNotifyConfig.getCallbackUrl();

            execute(taskCode, callbackUrl, requestJson, false);
        });
    }

// 具体指定回调,这里使用的是hutool框架,会打印调用日志。
    private boolean execute(String taskCode, String callbackUrl, String requestJson, boolean taskExecute) {
        try {
            HttpResponse httpResponse = HttpUtil.createPost(callbackUrl).body(requestJson, ContentType.build(ContentType.JSON.getValue(), Charset.defaultCharset()))
                    .setConnectionTimeout(connectionTimeout).setReadTimeout(readTimeout).execute();

            int status = httpResponse.getStatus();

            if (!httpResponse.isOk()) {
                if (log.isWarnEnabled()) {
                    log.warn("执行回调URL:{}失败, \n 请求入参是:{}, \n 返回状态码是:{}, \n 返回体是:{} \n taskCode={}",
                            callbackUrl, requestJson, status, httpResponse.body(), taskCode);
                }

                this.addTask2Schedule(taskExecute, taskCode, callbackUrl, requestJson);
            } else {
                if (log.isInfoEnabled()) {
                    log.info("执行回调URL:{}成功, \n 请求入参是:{}, \n 返回状态码是:{}, \n 返回体是:{} \n taskCode={}",
                            callbackUrl, requestJson, status, httpResponse.body(), taskCode);
                }
            }
            return httpResponse.isOk();
        } catch (Exception e) {
            log.error("执行回调URL出现异常, taskCode={}, 回调URL:{}, 请求入参是:{}", taskCode, callbackUrl, requestJson, e);

            this.addTask2Schedule(taskExecute, taskCode, callbackUrl, requestJson);
        }
        return false;
    }


    private void addTask2Schedule(boolean taskExecute, String taskCode, String callbackUrl, String requestJson) {
        if (!taskExecute) {
            this.schedule(taskCode, callbackUrl, requestJson);
        }
    }


}

/**
     * 通知任务代码
     **/
    public static class TaskCode {
        /**
         * 积分账户
         **/
        public final static String NOTIFY_POINTS_ACCOUNT_FLOW = "POINTS_ACCOUNT_FLOW";
    }

6.2、NotifyTaskScheduler


import com.google.common.collect.Maps;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;

import java.util.Date;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Description 通知任务调度
 *
 **/
@Component
public class NotifyTaskScheduler extends AbstractTaskScheduler<NotifyTasks> {

    @Autowired
    private NotifyTasksRepository notifyTasksRepository;

    @Autowired
    private NotifyTasksHistoryRepository notifyTasksHistoryRepository;

    private Map<String, NotifyTaskHandler> taskHandlers = Maps.newConcurrentMap();

    public NotifyTaskScheduler(PlatformTransactionManager platformTransactionManager) {
        super(platformTransactionManager);
    }

    public void registerHandler(String type, NotifyTaskHandler handler) {
        taskHandlers.put(type, handler);
    }

    public void schedule(String taskCode, String notifyUrl, String remark) {
        NotifyTasks task = new NotifyTasks(taskCode, notifyUrl, remark);
        notifyTasksRepository.save(task);
    }

    private NotifyTaskHandler getHandler(String type) {
        return this.taskHandlers.get(type);
    }

    @Override
    protected boolean runTask(NotifyTasks task) {
        NotifyTaskHandler handler = getHandler(task.getTaskCode());
        if (handler != null) {
            return handler.run(task);
        }
        return false;
    }

    @Override
    protected void onSuccess(NotifyTasks task) {
        NotifyTaskHandler handler = getHandler(task.getTaskCode());
        if (handler != null) {
            handler.onSuccess(task);
        }
    }

    @Override
    protected Set<NotifyTasks> getTasks(Date date) {
        Set<NotifyTasks> availableTasks = notifyTasksRepository.findTop10ByNextTimeBeforeOrderByNextTimeAsc(date);
        return availableTasks;
    }

    @Override
    protected void updateTasks(Set<NotifyTasks> tasks) {
        notifyTasksRepository.saveAll(tasks);
    }

    @Override
    protected void moveTaskToHistory(Set<NotifyTasks> tasks) {
        notifyTasksRepository.deleteAll(tasks);
        final Set<NotifyTasksHistory> historyOrderTasks = tasks.stream()
                .map(t -> new NotifyTasksHistory(t))
                .collect(Collectors.toSet());
        notifyTasksHistoryRepository.saveAll(historyOrderTasks);
    }

    /**
     * 任务处理
     */
    public interface NotifyTaskHandler {

        /**
         * 运行任务
         *
         * @param task
         * @return
         */
        boolean run(NotifyTasks task);

        /**
         * 任务处理成功
         * <p>
         * 处于事务中,请勿进行复杂操作
         *
         * @param task
         */
        default void onSuccess(NotifyTasks task) {
        }
    }


}

6.3、AbstractTaskScheduler


import cn.hutool.core.date.DateUtil;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.PlatformTransactionManager;

import java.util.Date;
import java.util.Set;

/**
 * 任务调度
 *
 */
public abstract class AbstractTaskScheduler<T extends AbstractTask> extends AbstractTransaction {

    private static final Logger logger = LoggerFactory.getLogger(AbstractTaskScheduler.class);

    public AbstractTaskScheduler(PlatformTransactionManager platformTransactionManager) {
        super(platformTransactionManager);
    }

    public void runSchedule() {
        // 从数据库获取任务
        runInTransaction(() -> {
            Set<T> availableTasks = this.getTasks(DateUtil.date());
            availableTasks.forEach(task -> {
                task.retry();
            });
            if (!availableTasks.isEmpty()) {
                this.updateTasks(availableTasks);
            }
            return availableTasks;
        }).forEach(task -> {
            try {
                this.run(task);
            } catch (Exception ex) {
                if (logger.isWarnEnabled()) {
                    logger.warn("执行任务失败: " + task, ex);
                }
            }
        });
    }

    private void run(T task) {
        if (logger.isInfoEnabled()) {
            logger.info("正在执行任务: " + task);
        }
        boolean success = this.runTask(task);
        if (success) {
            if (logger.isInfoEnabled()) {
                logger.info("执行任务成功,回调: " + task);
            }
            this.onSuccess(task);

            // 移到历史任务表,这里使用到了自定义事务
            runInTransaction(() -> {
                if (logger.isInfoEnabled()) {
                    logger.info("执行任务成功,更新任务状态: " + task);
                }
                task.finish();

                Set<T> tasks = Sets.newHashSet(task);
                this.moveTaskToHistory(tasks);

                return 0;
            });
        } else {
            if (logger.isInfoEnabled()) {
                logger.info("执行任务失败: " + task);
            }
        }
    }


    /**
     * 获取需要执行任务
     * <p>
     * <pre>需要加分布式锁,或者数据库锁</pre>
     * <pre>已处于事务中</pre>
     *
     * @param date
     * @return
     */
    protected abstract Set<T> getTasks(Date date);

    /**
     * 更新任务状态
     * <p>
     * <pre>已处于事务中</pre>
     *
     * @param tasks
     */
    protected abstract void updateTasks(Set<T> tasks);

    /**
     * 移到历史任务表
     *
     * @param tasks
     */
    protected abstract void moveTaskToHistory(Set<T> tasks);

    /**
     * 执行任务
     *
     * @param task
     * @return
     */
    protected abstract boolean runTask(T task);

    /**
     * 任务执行成功后处理
     *
     * @param task
     */
    protected void onSuccess(T task) {
        /* NOT-IMPL */
    }
}

6.4、AbstractTask

import javax.persistence.Column;
import javax.persistence.MappedSuperclass;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.Date;

/**
 * 定时处理任务
 *
 */
@MappedSuperclass
public abstract class AbstractTask {

    @Column(name = "next_time", columnDefinition = "DATETIME default NOW() COMMENT '下次尝试时间'")
    private Date nextTime;

    @Column(name = "retry_times", columnDefinition = "INT default 0 COMMENT '重试次数'")
    private int retryTimes;

    @Column(name = "last_time", columnDefinition = "DATETIME COMMENT '最后重试时间'")
    private Date lastTime;

    @Column(name = "is_finish", nullable = false, columnDefinition = "INT default 0 COMMENT '是否完成'")
    private boolean isFinish;

    @Column(name = "finish_time", columnDefinition = "DATETIME COMMENT '完成时间'")
    private Date finishTime;

    public AbstractTask() {
        this(new Date(), 0, null, false, null);
    }

    public AbstractTask(AbstractTask task) {
        this(task.nextTime, task.retryTimes, task.lastTime, task.isFinish, task.finishTime);
    }

    public AbstractTask(Date nextTime, int retryTimes, Date lastTime, boolean isFinish, Date finishTime) {
        this.nextTime = nextTime;
        this.retryTimes = retryTimes;
        this.lastTime = lastTime;
        this.isFinish = isFinish;
        this.finishTime = finishTime;
    }

    /**
     * 指数时间退避重试
     *
     * @return
     */
    public void retry() {
        this.retryTimes += 1;
        this.nextTime = calcNextRetryTime();
        this.lastTime = new Date();
    }

// 计算下一次重试时间
    private Date calcNextRetryTime() {
        final ZoneId zone = ZoneId.systemDefault();
        final LocalDateTime lastTime = LocalDateTime.ofInstant(Instant.now(), zone);
        final LocalDateTime nextTime = lastTime.plus(exp(this.retryTimes), ChronoUnit.MILLIS);

        final Instant instant = nextTime.atZone(zone).toInstant();
        return Date.from(instant);
    }

// 指数级重试
    public static long exp(int retryCount) {
        long waitTime = ((long) Math.pow(2, retryCount) * 1000L);
        return waitTime;
    }

    public void finish() {
        this.isFinish = true;
        this.finishTime = new Date();
    }

}


import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import rx.functions.Func0;

/**
 *
 **/
public abstract class AbstractTransaction {

    private PlatformTransactionManager platformTransactionManager;

    public AbstractTransaction(PlatformTransactionManager platformTransactionManager) {
        this.platformTransactionManager = platformTransactionManager;
    }

    public <R> R runInTransaction(Func0<R> func0) {
        R ret = null;
        TransactionDefinition definition = new DefaultTransactionDefinition(
                TransactionDefinition.PROPAGATION_REQUIRED);
        TransactionStatus status = platformTransactionManager.getTransaction(definition);
        try {
            ret = func0.call();

            platformTransactionManager.commit(status);
        } catch ( Exception e ) {
            platformTransactionManager.rollback(status);
        }
        return ret;
    }
}

6.5、xxl-job定时任务

用于补偿回调失败的任务

image.png
    private final NotifyTaskScheduler notifyTaskScheduler;

/**
     * 回调通知
     *
     * @param param
     * @return
     * @throws Exception
     */
    @XxlJob("notifyTasksHandler")
    public ReturnT<String> notifyTasksHandler(String param) throws Exception {
        notifyTaskScheduler.runSchedule();
        return ReturnT.SUCCESS;
    }
上一篇下一篇

猜你喜欢

热点阅读