有些文章不一定是为了上首页投稿Java郭志敏的程序员书屋

轻量级RabbitMQ自动化配置及异常处理组件

2019-09-27  本文已影响0人  传达室马大爷

github源码地址

https://github.com/shawntime/shawn-basicmq

开发背景

功能描述

使用案例

实现了零配置,无需关心配置细节,只需要关系自己的业务逻辑即可

1)仅需要生产端

@Service("smsMsgQueueService")
public class SmsMsgQueueService extends AbstractMsgQueueService<SMSEntity> {

    @Override
    protected String getMessageDesc() {
        return "发送短信";
    }
}

// 调用方

@Resource(name = "smsMsgQueueService")
private IMsgQueueService smsMsgQueueService;

public void sendSms(SMSEntity smsEntity) {
    smsService.provide(JsonHelper.serialize(smsEntity));
}

2)生产端+消费端

实现protected void consumeMessage(SmsEntity smsEntity, MessageProperties messageProperties)即可

@Service("smsMsgQueueService")
public class SmsMsgQueueService extends AbstractMsgQueueService<SmsEntity> {
    
    @Override
    protected String getMessageDesc() {
        return "发送短信";
    }

    @Override
    protected void consumeMessage(SmsEntity smsEntity, MessageProperties messageProperties) throws Throwable {
        // 消息的具体处理类
    }
}

如何不需要消息头信息,则实现下面方法即可

protected void consumeMessage(SmsEntity smsEntity) throws Throwable() {}

使用方法

<dependency>
    <groupId>com.shanwtime</groupId>
    <artifactId>basicmq</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
static final String SCAN_PACKAGE = "com.shawntime.provider.dao.mapper." + DB_TGA + ",com.shawntime.basic.dao";
Resource[] locationResources = resolver.getResources(mapperLocations);
Resource[] mqResources = resolver.getResources("classpath:mapper/basicmq/MessageQueueErrorLogMapper.xml");
List<Resource> resources = new ArrayList<>(locationResources.length + mqResources.length);
resources.addAll(Arrays.asList(locationResources));
resources.addAll(Arrays.asList(mqResources));
sqlSessionFactoryBean.setMapperLocations(resources.stream().toArray(Resource[]::new));
@ComponentScan(basePackages = {"com.shawntime.basic")
spring:
  rabbitmq:
    base:
      host: 127.0.0.1
      port: 5672
      username: admin
      password: 123456
      vhost: test
      isOpenListener: true

isOpenListener: 是否开启消费端,默认关闭

openScheduledTask=true
旧项目的修改

已经引入basicmq项目的实现类,需要重写以下两个方法
isAutoRegistry:是否自动注册
isConfirmCallBack:是否确认交换机接收消息

@Service
public class DingTalkService extends AbstractMsgQueueService<DingMessage> {

    @Resource(name = "dingTalkTemplate")
    private AmqpTemplate dingTalkTemplate;

    @Value("${spring.rabbitmq.tmApi.queue.dingTalk}")
    private String dingTalkQueue;

    @Value(" ${spring.rabbitmq.tmApi.exchange.dingTalk}")
    private String dingTalkExchange;

    @Override
    protected String getMessageDesc() {
        return "钉钉消息";
    }

    @Override
    protected void provideMessage(String msgBodyJson) throws Throwable {
        dingTalkTemplate.convertAndSend(dingTalkQueue, msgBodyJson);
    }

    @Override
    protected boolean isAutoRegistry() {
        return false;
    }

    @Override
    protected boolean isConfirmCallBack() {
        return false;
    }
}

代码分析

生产端消息100%投递
@Override
public void provide(String msgBodyJson, boolean isAsync, Map headMap) {
    try {
        logger.info("provide -> {}", msgBodyJson);
        String correlationDataId = "";
        if (isConfirmCallBack()) {
            MessageData data = getMessageData(msgBodyJson);
            redisClient.hset(Constant.queue_key, data.getId(), JsonHelper.serialize(data), -1);
            correlationDataId = data.getId();
        }
        provideMessage(msgBodyJson, correlationDataId, headMap);
    } catch (Throwable e) {
        exceptionHandle(new MsgQueueBody(BasicOperatorEnum.PROVIDER, msgBodyJson), e, isAsync);
    }
}

private MessageData getMessageData(String msgBodyJson) {
    String id = UUID.randomUUID().toString();
    MessageData data = new MessageData();
    data.setCurrTime(System.currentTimeMillis());
    data.setId(id);
    data.setJsonData(msgBodyJson);
    data.setTypeDesc(getMessageDesc());
    data.setTypeId(getMessageType());
    data.setOriginalId(0);
    data.setBeanName(getSpringBeanName());
    return data;
}

protected void provideMessage(String msgBodyJson,
                                  String correlationDataId,
                                  Map<String, Object> headMap) throws Throwable {
    if (!isAutoRegistry()) {
        provideMessage(msgBodyJson, correlationDataId);
        return;
    }
    String exchangeName = getExchangeName();
    String queueName = getQueueName();
    MessagePostProcessor messagePostProcessor = message -> {
        MessageProperties messageProperties = message.getMessageProperties();
        if (MapExtensions.isNotEmpty(headMap)) {
            headMap.forEach((key, value) -> {
                messageProperties.setHeader(key, value);
            });
        }
        messageProperties.setCorrelationId(correlationDataId.getBytes());
        return message;
    };
    amqpProducer.publishMsg(exchangeName, queueName, msgBodyJson,
            new CorrelationData(correlationDataId), messagePostProcessor);
}
发送端消息确认
@Component("confirmCallBackListener")
public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback {

    @Resource
    private RedisClient redisClient;

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData.getId();
        if (StringUtils.isEmpty(id)) {
            return;
        }
        if (ack) {
            redisClient.hdel(Constant.queue_key, id);
        }
    }

}
发送失败的消息job异步重试
@Override
public void retry() {
    Map<String, String> keyMap = redisClient.hgetAll(Constant.queue_key);
    if (MapExtensions.isEmpty(keyMap)) {
        return;
    }
    keyMap.forEach((id, value) -> {
        MessageData data = JsonHelper.deSerialize(value, MessageData.class);
        if (System.currentTimeMillis() - data.getCurrTime() > 5 * 60 * 1000) {
            MessageQueueErrorRecord log = new MessageQueueErrorRecord();
            log.setBeanName(data.getBeanName());
            log.setErrorDesc("");
            log.setIsRePush(0);
            log.setMsgBody(data.getJsonData());
            log.setTypeDesc(data.getTypeDesc());
            log.setOperatorId(BasicOperatorEnum.PROVIDER.getCode());
            log.setTypeId(data.getTypeId());
            log.setOriginalId(data.getOriginalId());
            msgQueueErrorLogService.save(log);
            redisClient.hdel(Constant.queue_key, id);
        }
    });
}
消费端100%消费

处理失败消息统一入库

@Override
public void consume(String msgBodyJson, int originalId, MessageProperties messageProperties) {
    try {
        logger.info("consume -> {}", msgBodyJson);
        T obj = JsonHelper.deSerialize(msgBodyJson,
                (Class<T>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0]);
        if (messageProperties == null || messageProperties.getCorrelationId() == null) {
            consumeMessage(obj, messageProperties);
            return;
        }
        String correlationId = new String(messageProperties.getCorrelationId(), Charset.defaultCharset());
        if (StringUtils.isEmpty(correlationId)) {
            consumeMessage(obj, messageProperties);
            return;
        }
        if (exchangeType() == ExchangeType.TOPIC || exchangeType() == ExchangeType.FANOUT) {
            consumeMessage(obj, messageProperties);
            return;
        }
        String lockKey = "lock." + correlationId;
        boolean isLock = RedisLockUtil.lock(lockKey, correlationId, 60);
        if (isLock) {
            try {
                consumeMessage(obj, messageProperties);
            } finally {
                RedisLockUtil.unLock(lockKey, correlationId);
            }
        } else {
            new RuntimeException("重复消费");
        }
    } catch (Throwable e) {
        exceptionHandle(new MsgQueueBody(BasicOperatorEnum.CONSUMER, msgBodyJson), e, false, originalId);
    }
}

private void exceptionHandle(MsgQueueBody msg, Throwable throwable, boolean isAsync, int originalId) {
    logger.error(getMessageDesc() + "|" + msg.getMsgQueueBody(), throwable);
    MessageQueueErrorRecord log = new MessageQueueErrorRecord();
    log.setMsgBody(msg.getMsgQueueBody());
    log.setErrorDesc(DBStringUtil.subString(throwable.getMessage(), 1500));
    log.setOperatorId(msg.getBasicOperatorEnum().getCode());
    String beanName = getSpringBeanName();
    log.setBeanName(beanName);
    log.setTypeId(getMessageType());
    log.setTypeDesc(StringUtils.defaultString(getMessageDesc(), ""));
    log.setOriginalId(originalId);
    logger.info("dbLog -> {}", JsonHelper.serialize(log));
    if (isAsync) {
        rabbitProductExecutor.submit(() -> saveLog(log));
    } else {
        saveLog(log);
    }
}
异常消息重试
private void rePush(MessageQueueErrorRecord record) {
    int id = record.getOriginalId();
    if (!IntegerExtensions.isMoreThanZero(record.getOriginalId())) {
        id = record.getId();
    }
    if (!check(id)) {
        return;
    }
    AbstractMsgQueueService messageQueueService =
            (AbstractMsgQueueService) msgQueueFactory.getMsgQueueService(record);
    messageQueueService.consume(record.getMsgBody(), id);
    record.setIsRePush(1);
    msgQueueErrorLogService.update(record);
}

private boolean check(int id) {
    String key = getRedisKey(id);
    Long increment = redisClient.increment(key, 1L, 24 * 60 * 60);
    return increment == null || increment.longValue() <= 3;
}

private String getRedisKey(int id) {
    return "message.queue.consume.limit"
            + "." + LocalDateUtil.format(new Date(), "yyyyMMdd")
            + "#" + id;
}
配置自动注入监听
@PostConstruct
public void autoRegistry() {
    if (!isAutoRegistry()) {
        return;
    }
    DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) SpringUtils.getBeanFactory();
    ConnectionFactory connectionFactory = (ConnectionFactory) beanFactory.getBean("basicConnectionFactory");
    RabbitAdmin rabbitAdmin = (RabbitAdmin) beanFactory.getBean("basicRabbitAdmin");
    DynamicConsumer consumer = null;
    try {
        DynamicConsumerContainerFactory fac = DynamicConsumerContainerFactory.builder()
                .exchangeType(exchangeType())
                .exchangeName(getExchangeName())
                .queue(getQueueName())
                .autoDeleted(false)
                .autoAck(true)
                .durable(true)
                .routingKey(getRoutingKey())
                .rabbitAdmin(rabbitAdmin)
                .connectionFactory(connectionFactory).build();
        consumer = new DynamicConsumer(fac, this);
    } catch (Exception e) {
        logger.error("系统异常", e);
    }
    customizeDynamicConsumerContainer.put(getQueueName(), consumer);
    if (isOpenListener()) {
        consumer.start();
    }
}
AbstractMsgQueueService 抽象类使用
发送消息
/**
* msgBodyJson:json序列化的消息体
* isAsync:发送失败是否异步入库
* headMap:发送消息时携带的消息头
*/
public void provide(String msgBodyJson)
public void provide(String msgBodyJson, Map headMap)
public void provide(String msgBodyJson, boolean isAsync, Map headMap)
消费消息
/**
* msgBodyJson:接收到的消息体
* originalId:重试代入的原始消息主键id,用于控制消息每天重试次数
* messageProperties:消息头信息
*/
public void consume(String msgBodyJson)
public void consume(String msgBodyJson, int originalId)
public void consume(String msgBodyJson, MessageProperties messageProperties)
public void consume(String msgBodyJson, int originalId, MessageProperties messageProperties)
查找实体bean

如果定义了messageType则按照messageType查找,如果没有定义则按照service的bean名称查找

protected int getMessageType()
private String getSpringBeanName()
自定义exchange、queue、exchange type、routingKey

重写以下方法

protected String getQueueName()
protected String getDirectExchangeName()
protected String getTopicExchangeName()
protected String getFanoutExchangeName()
protected ExchangeType exchangeType()
protected String getRoutingKey()

api接口介绍

创建数据库
CREATE TABLE [dbo].[MessageQueueErrorRecord] (
  [id] int  IDENTITY(1,1) NOT NULL,
  [operator_id] tinyint  NOT NULL,
  [type_id] int  NOT NULL,
  [type_desc] varchar(50) COLLATE Chinese_PRC_CI_AS  NOT NULL,
  [bean_name] varchar(200) COLLATE Chinese_PRC_CI_AS  NOT NULL,
  [msg_body] varchar(8000) COLLATE Chinese_PRC_CI_AS  NOT NULL,
  [error_desc] varchar(2000) COLLATE Chinese_PRC_CI_AS  NOT NULL,
  [original_id] int DEFAULT ((0)) NOT NULL,
  [is_re_push] tinyint DEFAULT ((0)) NOT NULL,
  [created_stime] datetime DEFAULT (getdate()) NOT NULL,
  [modified_stime] datetime DEFAULT (getdate()) NOT NULL,
  [is_del] int DEFAULT ((0)) NOT NULL
)
GO

ALTER TABLE [dbo].[MessageQueueErrorRecord] SET (LOCK_ESCALATION = TABLE)
GO

EXEC sp_addextendedproperty
'MS_Description', N'主键id',
'SCHEMA', N'dbo',
'TABLE', N'MessageQueueErrorRecord',
'COLUMN', N'id'
GO

EXEC sp_addextendedproperty
'MS_Description', N'操作类型,1:消费端,2:生产端',
'SCHEMA', N'dbo',
'TABLE', N'MessageQueueErrorRecord',
'COLUMN', N'operator_id'
GO

EXEC sp_addextendedproperty
'MS_Description', N'消息类型id,1001-"消费端:原始订单入库",2001-"生产端:原始订单入库",1002-"消费端:百万代理人订单推送",2002-"生产端:百万代理人订单推送",1003-"消费端:发送短信",2003-"生产端:添加短信信息",1004-"消费端:订单导出",2004-"生产端:订单导出"',
'SCHEMA', N'dbo',
'TABLE', N'MessageQueueErrorRecord',
'COLUMN', N'type_id'
GO

EXEC sp_addextendedproperty
'MS_Description', N'消息类型描述',
'SCHEMA', N'dbo',
'TABLE', N'MessageQueueErrorRecord',
'COLUMN', N'type_desc'
GO

EXEC sp_addextendedproperty
'MS_Description', N'消息对应spring bean名称',
'SCHEMA', N'dbo',
'TABLE', N'MessageQueueErrorRecord',
'COLUMN', N'bean_name'
GO

EXEC sp_addextendedproperty
'MS_Description', N'消息体',
'SCHEMA', N'dbo',
'TABLE', N'MessageQueueErrorRecord',
'COLUMN', N'msg_body'
GO

EXEC sp_addextendedproperty
'MS_Description', N'错误描述',
'SCHEMA', N'dbo',
'TABLE', N'MessageQueueErrorRecord',
'COLUMN', N'error_desc'
GO

EXEC sp_addextendedproperty
'MS_Description', N'原始id',
'SCHEMA', N'dbo',
'TABLE', N'MessageQueueErrorRecord',
'COLUMN', N'original_id'
GO

EXEC sp_addextendedproperty
'MS_Description', N'是否重新推送,0:没有,1:有',
'SCHEMA', N'dbo',
'TABLE', N'MessageQueueErrorRecord',
'COLUMN', N'is_re_push'
GO

EXEC sp_addextendedproperty
'MS_Description', N'创建时间',
'SCHEMA', N'dbo',
'TABLE', N'MessageQueueErrorRecord',
'COLUMN', N'created_stime'
GO

EXEC sp_addextendedproperty
'MS_Description', N'修改时间',
'SCHEMA', N'dbo',
'TABLE', N'MessageQueueErrorRecord',
'COLUMN', N'modified_stime'
GO

EXEC sp_addextendedproperty
'MS_Description', N'是否删除',
'SCHEMA', N'dbo',
'TABLE', N'MessageQueueErrorRecord',
'COLUMN', N'is_del'
GO

EXEC sp_addextendedproperty
'MS_Description', N'消息队列错误记录表',
'SCHEMA', N'dbo',
'TABLE', N'MessageQueueErrorRecord'
GO


-- ----------------------------
-- Primary Key structure for table MessageQueueErrorRecord
-- ----------------------------
ALTER TABLE [dbo].[MessageQueueErrorRecord] ADD CONSTRAINT [PK_MessageQueueErrorRecord] PRIMARY KEY CLUSTERED ([id])
WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON)  
ON [DFG]
GO
    
参数资料

rabbitMQ动态队列实现参考:https://blog.csdn.net/kingvin_xm/article/details/86712613

上一篇下一篇

猜你喜欢

热点阅读