开源定时任务power-job的告警实现
2022-12-15 本文已影响0人
天草二十六_简村人
一、报警接口Alarmable
package tech.powerjob.server.extension;
import tech.powerjob.server.persistence.remote.model.UserInfoDO;
import tech.powerjob.server.extension.defaultimpl.alarm.module.Alarm;
import java.util.List;
/**
 * Alarm interface: the extension point implemented by every alarm channel.
 *
 * @author tjq
 * @since 2020/4/19
 */
public interface Alarmable {
/**
 * Handles one alarm for the given users.
 *
 * @param alarm          the alarm to deliver
 * @param targetUserList the users the alarm is addressed to
 */
void onFailed(Alarm alarm, List<UserInfoDO> targetUserList);
}
二、告警实现类
多种实现,这里实现了钉钉消息、邮件和webhook三种方式。你可以继续实现企业微信、sms等其他方式的告警。
DingTalkAlarmService
依赖于DingTalkUtils,其详细实现我将在另外一篇文章里列出,希望能帮助到对接钉钉的同学。
package tech.powerjob.server.extension.defaultimpl.alarm.impl;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.common.utils.NetUtils;
import tech.powerjob.server.common.PowerJobServerConfigKey;
import tech.powerjob.server.common.SJ;
import tech.powerjob.server.persistence.remote.model.UserInfoDO;
import tech.powerjob.server.extension.defaultimpl.alarm.module.Alarm;
import tech.powerjob.server.extension.Alarmable;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.List;
import java.util.Set;
/**
* 钉钉告警服务
*
* @author tjq
* @since 2020/8/6
*/
@Slf4j
@Service
public class DingTalkAlarmService implements Alarmable {
@Resource
private Environment environment;
private Long agentId;
private DingTalkUtils dingTalkUtils;
private Cache<String, String> mobile2UserIdCache;
private static final int CACHE_SIZE = 8192;
// 防止缓存击穿
private static final String EMPTY_TAG = "EMPTY";
@Override
public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
if (dingTalkUtils == null) {
return;
}
Set<String> userIds = Sets.newHashSet();
targetUserList.forEach(user -> {
String phone = user.getPhone();
if (StringUtils.isEmpty(phone)) {
return;
}
try {
String userId = mobile2UserIdCache.get(phone, () -> {
try {
return dingTalkUtils.fetchUserIdByMobile(phone);
} catch (PowerJobException ignore) {
return EMPTY_TAG;
} catch (Exception ignore) {
return null;
}
});
if (!EMPTY_TAG.equals(userId)) {
userIds .add(userId);
}
}catch (Exception ignore) {
}
});
userIds.remove(null);
if (!userIds.isEmpty()) {
String userListStr = SJ.COMMA_JOINER.skipNulls().join(userIds);
List<DingTalkUtils.MarkdownEntity> markdownEntities = Lists.newLinkedList();
markdownEntities.add(new DingTalkUtils.MarkdownEntity("server", NetUtils.getLocalHost()));
String content = alarm.fetchContent().replaceAll(OmsConstant.LINE_SEPARATOR, OmsConstant.COMMA);
markdownEntities.add(new DingTalkUtils.MarkdownEntity("content", content));
try {
dingTalkUtils.sendMarkdownAsync(alarm.fetchTitle(), markdownEntities, userListStr, agentId);
}catch (Exception e) {
log.error("[DingTalkAlarmService] send ding message failed, reason is {}", e.getMessage());
}
}
}
@PostConstruct
public void init() {
String agentId = environment.getProperty(PowerJobServerConfigKey.DING_AGENT_ID);
String appKey = environment.getProperty(PowerJobServerConfigKey.DING_APP_KEY);
String appSecret = environment.getProperty(PowerJobServerConfigKey.DING_APP_SECRET);
log.info("[DingTalkAlarmService] init with appKey:{},appSecret:{},agentId:{}", appKey, appSecret, agentId);
if (StringUtils.isAnyBlank(agentId, appKey, appSecret)) {
log.warn("[DingTalkAlarmService] cannot get agentId, appKey, appSecret at the same time, this service is unavailable");
return;
}
if (!StringUtils.isNumeric(agentId)) {
log.warn("[DingTalkAlarmService] DingTalkAlarmService is unavailable due to invalid agentId: {}", agentId);
return;
}
this.agentId = Long.valueOf(agentId);
dingTalkUtils = new DingTalkUtils(appKey, appSecret);
mobile2UserIdCache = CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).build();
log.info("[DingTalkAlarmService] init DingTalkAlarmService successfully!");
}
}
MailAlarmService
邮件发送主要依赖javamail,没有太多可讲的。
package tech.powerjob.server.extension.defaultimpl.alarm.impl;
import tech.powerjob.server.persistence.remote.model.UserInfoDO;
import tech.powerjob.server.extension.defaultimpl.alarm.module.Alarm;
import tech.powerjob.server.extension.Alarmable;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.List;
import java.util.Objects;
/**
 * Mail notification service.
 *
 * <p>Sends the alarm as a plain-text mail to the e-mail addresses of the target users. Nothing is
 * sent when no {@link JavaMailSender} was injected, when the sender address
 * ({@code spring.mail.username}) is not configured, or when no target user has an e-mail address.
 *
 * @author tjq
 * @since 2020/4/30
 */
@Slf4j
@Service
public class MailAlarmService implements Alarmable {

    private static final String FROM_KEY = "spring.mail.username";

    @Resource
    private Environment environment;

    private JavaMailSender javaMailSender;
    private String from;

    /**
     * Mails the alarm to all target users that have a non-blank e-mail address.
     *
     * @param alarm          the alarm providing the mail subject and text
     * @param targetUserList the users to notify; a null or empty list is ignored
     */
    @Override
    public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
        initFrom();
        if (CollectionUtils.isEmpty(targetUserList) || javaMailSender == null || StringUtils.isEmpty(from)) {
            return;
        }
        try {
            // Drop blank addresses as well as nulls: a single empty recipient would make the
            // whole message fail for everybody.
            String[] recipients = targetUserList.stream()
                    .filter(Objects::nonNull)
                    .map(UserInfoDO::getEmail)
                    .filter(email -> StringUtils.hasText(email))
                    .toArray(String[]::new);
            if (recipients.length == 0) {
                return;
            }

            SimpleMailMessage sm = new SimpleMailMessage();
            sm.setFrom(from);
            sm.setTo(recipients);
            sm.setSubject(alarm.fetchTitle());
            sm.setText(alarm.fetchContent());
            javaMailSender.send(sm);
        } catch (Exception e) {
            // Pass the exception as the last argument so the stack trace is kept in the log.
            log.warn("[MailAlarmService] send mail failed, reason is {}", e.getMessage(), e);
        }
    }

    /**
     * Optional injection point: the sender stays null when no JavaMailSender bean exists.
     *
     * @param javaMailSender the mail sender to use
     */
    @Autowired(required = false)
    public void setJavaMailSender(JavaMailSender javaMailSender) {
        this.javaMailSender = javaMailSender;
    }

    /**
     * Lazily reads the sender address from the environment. {@code @Value} cannot be used
     * directly because it raises an error when the property does not exist.
     */
    private void initFrom() {
        if (StringUtils.isEmpty(from)) {
            from = environment.getProperty(FROM_KEY);
        }
    }
}
WebHookAlarmService
发送HTTP请求使用的是okhttp3框架,我也将在另外一篇文章里单独列出,希望可以帮助到相关同学。当然,发送HTTP请求的框架太多了,你可以任意选择。
package tech.powerjob.server.extension.defaultimpl.alarm.impl;
import com.alibaba.fastjson.JSONObject;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.utils.HttpUtils;
import tech.powerjob.server.persistence.remote.model.UserInfoDO;
import tech.powerjob.server.extension.defaultimpl.alarm.module.Alarm;
import tech.powerjob.server.extension.Alarmable;
import lombok.extern.slf4j.Slf4j;
import okhttp3.MediaType;
import okhttp3.RequestBody;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.util.List;
/**
 * HTTP callback alarm: posts the alarm as JSON to the webhook of every target user.
 *
 * @author tjq
 * @since 11/14/20
 */
@Slf4j
@Service
public class WebHookAlarmService implements Alarmable {

    private static final String HTTP_PROTOCOL_PREFIX = "http://";
    private static final String HTTPS_PROTOCOL_PREFIX = "https://";

    /**
     * Posts the JSON-serialized alarm to each user's webhook; users without a webhook are
     * skipped and a failed call is logged without affecting the remaining users.
     *
     * @param alarm          the alarm to serialize and post
     * @param targetUserList the users whose webhooks are called; a null or empty list is ignored
     */
    @Override
    public void onFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
        if (CollectionUtils.isEmpty(targetUserList)) {
            return;
        }
        for (UserInfoDO user : targetUserList) {
            String configured = user.getWebHook();
            if (StringUtils.isEmpty(configured)) {
                continue;
            }
            String url = withProtocol(configured);
            RequestBody payload = RequestBody.create(
                    MediaType.parse(OmsConstant.JSON_MEDIA_TYPE),
                    JSONObject.toJSONString(alarm));
            try {
                String response = HttpUtils.post(url, payload);
                log.info("[WebHookAlarmService] invoke webhook[url={}] successfully, response is {}", url, response);
            } catch (Exception e) {
                log.warn("[WebHookAlarmService] invoke webhook[url={}] failed!", url, e);
            }
        }
    }

    /**
     * Prepends the http protocol prefix when the address has neither an http nor an https prefix.
     *
     * @param webHook the configured, non-empty webhook address
     * @return the address including a protocol prefix
     */
    private static String withProtocol(String webHook) {
        boolean hasProtocol = webHook.startsWith(HTTP_PROTOCOL_PREFIX)
                || webHook.startsWith(HTTPS_PROTOCOL_PREFIX);
        return hasProtocol ? webHook : HTTP_PROTOCOL_PREFIX + webHook;
    }
}
三、消息基类Alarm
是一个接口,默认实现了拼接消息内容的方法。不同种类的消息,包含的字段不一样,但是都实现该接口。
package tech.powerjob.server.extension.defaultimpl.alarm.module;
import com.alibaba.fastjson.JSONObject;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.PowerSerializable;
import tech.powerjob.common.utils.CommonUtils;
import org.apache.commons.lang3.StringUtils;
/**
 * Alarm content.
 *
 * @author tjq
 * @since 2020/8/1
 */
public interface Alarm extends PowerSerializable {

    /**
     * @return the title of this alarm
     */
    String fetchTitle();

    /**
     * Renders this object's JSON properties as text, one {@code key: value} entry per property,
     * each terminated by {@link OmsConstant#LINE_SEPARATOR}. A {@code Long} value whose key ends
     * with "time" or "date" (case-insensitive) is formatted via {@link CommonUtils#formatTime};
     * if formatting fails the plain value is used.
     *
     * @return the rendered alarm content
     */
    default String fetchContent() {
        JSONObject fields = JSONObject.parseObject(JSONObject.toJSONString(this));
        StringBuilder text = new StringBuilder();
        for (java.util.Map.Entry<String, Object> field : fields.entrySet()) {
            String key = field.getKey();
            Object raw = field.getValue();
            String rendered = String.valueOf(raw);

            boolean timeLike = StringUtils.endsWithIgnoreCase(key, "time")
                    || StringUtils.endsWithIgnoreCase(key, "date");
            if (timeLike && raw instanceof Long) {
                try {
                    rendered = CommonUtils.formatTime((Long) raw);
                } catch (Exception ignore) {
                    // keep the plain string form of the value
                }
            }
            text.append(key).append(": ").append(rendered).append(OmsConstant.LINE_SEPARATOR);
        }
        return text.toString();
    }
}
四、告警服务
对外提供的功能入口,供其他代码调用,主要方法是alarmFailed()。
它的入参,第一个是消息内容,只要是实现了接口Alarm的对象即可;第二个是告警对象,即消息接收人。
package tech.powerjob.server.extension.defaultimpl.alarm;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import tech.powerjob.server.extension.defaultimpl.alarm.module.Alarm;
import tech.powerjob.server.extension.Alarmable;
import tech.powerjob.server.persistence.remote.model.UserInfoDO;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.*;
/**
 * Alarm service: the entry point other code calls to raise an alarm. Every registered
 * {@link Alarmable} is invoked on a dedicated thread pool.
 *
 * @author tjq
 * @since 2020/4/19
 */
@Slf4j
@Component
public class AlarmCenter {

    private final ExecutorService alarmPool;
    private final List<Alarmable> registeredAlarmables = Lists.newLinkedList();

    /**
     * Creates the alarm thread pool (one thread per available processor) and registers all
     * injected alarm channels.
     *
     * @param alarmables the Alarmable beans to register
     */
    @Autowired
    public AlarmCenter(List<Alarmable> alarmables) {
        int cores = Runtime.getRuntime().availableProcessors();
        ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("AlarmPool-%d").build();
        alarmPool = new ThreadPoolExecutor(cores, cores, 5, TimeUnit.MINUTES, Queues.newLinkedBlockingQueue(), factory);

        for (Alarmable alarmable : alarmables) {
            registeredAlarmables.add(alarmable);
            log.info("[AlarmCenter] bean(className={},obj={}) register to AlarmCenter successfully!", alarmable.getClass().getName(), alarmable);
        }
    }

    /**
     * Submits the alarm to the pool; the call returns without waiting for delivery.
     *
     * @param alarm          the alarm to deliver
     * @param targetUserList the users to notify
     */
    public void alarmFailed(Alarm alarm, List<UserInfoDO> targetUserList) {
        alarmPool.execute(() -> dispatch(alarm, targetUserList));
    }

    /**
     * Invokes every registered channel in turn; an exception from one channel is logged and does
     * not stop the remaining channels.
     */
    private void dispatch(Alarm alarm, List<UserInfoDO> targetUserList) {
        for (Alarmable alarmable : registeredAlarmables) {
            try {
                alarmable.onFailed(alarm, targetUserList);
            } catch (Exception e) {
                log.warn("[AlarmCenter] alarm failed.", e);
            }
        }
    }
}
告警对象
不同的告警方式,需要填写的字段不一,可能是手机号,可能是邮箱,也可以是webhook地址。
package tech.powerjob.server.persistence.remote.model;
import lombok.Data;
import org.hibernate.annotations.GenericGenerator;
import javax.persistence.*;
import java.util.Date;
/**
 * User information table (JPA entity). Lombok's {@code @Data} generates the accessors.
 *
 * @author tjq
 * @since 2020/4/12
 */
@Data
@Entity
@Table
public class UserInfoDO {
/**
 * Primary key, produced by the "native" generator declared below.
 */
@Id
@GeneratedValue(strategy = GenerationType.AUTO, generator = "native")
@GenericGenerator(name = "native", strategy = "native")
private Long id;
/**
 * User name.
 */
private String username;
/**
 * Password. NOTE(review): storage format (plain or hashed) is not visible here; confirm.
 */
private String password;
/**
 * Phone number.
 */
private String phone;
/**
 * E-mail address.
 */
private String email;
/**
 * WebHook address.
 */
private String webHook;
/**
 * Extension field.
 */
private String extra;
// presumably the creation and last-modification timestamps; confirm against the persistence layer
private Date gmtCreate;
private Date gmtModified;
}