springBoot+rocketMQ做分布式消息队列
2020-04-18 本文已影响0人
虾餃
笔者所有文章第一时间发布于:
hhbbz的个人博客
项目地址
spring-boot-aliRocketMQ-starter
application.yml
mq-config:
producerId: PID_*
consumerId: CID_*
accessKey: *
secretKey: *
onsAddr: *
topic: *
添加启动类用于初始化消费者
@Component
public class RocketMQRunner implements CommandLineRunner {
private static final Logger logger = LoggerFactory.getLogger(RocketMQRunner.class);
@Autowired
private MQConfig mqConfig;
@Autowired
@Qualifier("orderConsumer")
private OrderConsumer orderConsumer;
/**消息监听器**/
@Autowired
private ConsumerHandler consumerHandler;
/**
* 初始化订阅者,生产者信息,启动
* @param args
* @throws Exception
*/
@Override
public void run(String... args) throws Exception {
orderConsumer.subscribe(mqConfig.getTopic(), mqConfig.getTag(), consumerHandler);
orderConsumer.start();
}
}
生产消息
@Service
public class SysMessageProducer {
private static final Logger logger = LoggerFactory.getLogger(SysMessageProducer.class);
@Autowired
private MQHelper<SysMessage> mqHelper;
@Autowired
private MQConfig mqConfig;
/**
* 此方法用于在消息中心创建消息后调用推送消息
*
* @param sysMessage
*/
public void PushMessageWhenCreate(SysMessage sysMessage) {
if (AssertValue.isNotNull(sysMessage)) {
//发送消息到队列中
try {
ProducerMessage<SysMessage> producerMessage = new ProducerMessage<SysMessage>()
.setTopic(mqConfig.getTopic())
.setTags("middle")
.setName(MQHandlerType.PUSH_APPMESSAGE_NEWS.getTypeName())
.setKey(MQHandlerType.PUSH_APPMESSAGE_NEWS.toString())
.setBody(sysMessage)
.setShardingKey(String.valueOf(sysMessage.getId()))
.setState("none");
//设置立刻发送消息
producerMessage.setType(RocketMQServiceConstant.SYNCHRONOUS_ORDER_MESSAGE);
producerMessage.setStartDeliveryTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
//创建消息
Message message = mqHelper
.generateMessage(producerMessage);
//发送消息
SendResult sendResult = mqHelper.sendMessage(message, producerMessage);
logger.info(new Date() + " 发送成功! Topic:" + mqConfig.getTopic() + " msgId: " + sendResult.getMessageId());
} catch (Exception e) {
e.printStackTrace();
logger.error("消息生产失败,将进行两次重试", e.getMessage());
logger.info("发送失败");
throw new RestInternalServerErrorException(ExceptionEnumeration.SYS_NEWS_PUSH_FAIL, "平台消息推送失败");
}
} else {
throw new RestInternalServerErrorException(ExceptionEnumeration.SYS_NEWS_SELECT_FAIL, "找不到该平台消息");
}
}
}
监听器消费消息
@Component
public class ConsumerHandler implements MessageOrderListener {
private static final Logger logger = LoggerFactory.getLogger(ConsumerHandler.class);
/**
* 消费消息 handler
*
* @param message
* @param context
* @return
*/
@Override
public OrderAction consume(Message message, ConsumeOrderContext context) {
}
}