RocketMQ and Spring Cloud Stream
2019-08-20
just_like_you
Spring Cloud Stream
Using RocketMQ
Producer
- Add the configuration
rocketmq:
  name-server: 192.168.18.91:9876 # RocketMQ name server address
  producer:
    group: test-group # producer group name
- Send the message with RocketMQTemplate. This example uses the distributed transaction support (transactional messages) that RocketMQ provides.
this.rocketMQTemplate.sendMessageInTransaction(
        "tx-add-bonus",
        "add-bonus",
        MessageBuilder.withPayload(
                        UserAddBonusMsgDTO.builder()
                                .bonus(500)
                                .userId(share.getUserId())
                                .build()
                )
                .setHeader("transactionId", transactionId)
                .setHeader("shareId", id)
                .build(),
        // Argument handed to the local transaction listener
        auditDTO
);
- Listen for the producer's local transaction, and use a transactionId to help decide whether the local transaction was committed
// COMMIT and ROLLBACK are assumed to be static imports from RocketMQLocalTransactionState
@Component
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusMQTransactionListener implements RocketMQLocalTransactionListener {
    private final ShareServiceImpl shareService;
    private final RocketmqTxLogMapper rocketmqTxLogMapper;

    /**
     * Executes the local transaction.
     *
     * @param msg the message
     * @param arg the argument passed by the sender
     * @return {@link RocketMQLocalTransactionState}
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            MessageHeaders headers = msg.getHeaders();
            Integer shareId = Optional.ofNullable(headers.get("shareId"))
                    .map(String::valueOf)
                    .map(Integer::valueOf)
                    .orElseThrow(() -> new IllegalArgumentException("shareId must not be null"));
            String transactionId = (String) headers.get("transactionId");
            // The local method that runs as the local transaction
            this.shareService.updateStatusWithRocketTx(shareId, (AuditDTO) arg, transactionId);
            // On success, tell the broker to commit the transactional message
            return COMMIT;
        } catch (Exception e) {
            return ROLLBACK;
        }
    }

    /**
     * Check-back from the RocketMQ server (broker).
     *
     * @param msg the message whose transaction state is being checked
     * @return COMMIT if a transaction log row exists for the message, otherwise ROLLBACK
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String transactionId = (String) msg.getHeaders().get("transactionId");
        // Without an id the query would have no condition and count every row
        if (StringUtils.isBlank(transactionId)) {
            return ROLLBACK;
        }
        Integer count = rocketmqTxLogMapper.selectCount(
                Wrappers.<RocketmqTxLog>lambdaQuery()
                        .eq(RocketmqTxLog::getTransactionId, transactionId));
        if (count == 1) {
            return COMMIT;
        }
        return ROLLBACK;
    }
}
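The commit and check-back decisions made by the listener can be sketched without any RocketMQ dependency. This is a minimal in-memory illustration, not the RocketMQ API: the class `TxMessageSketch`, its `State` enum and the `txLog` set are hypothetical names, and the set stands in for the table behind `RocketmqTxLogMapper`. It also assumes that `updateStatusWithRocketTx` writes the transaction log row together with the business update, which the post does not show.

```java
import java.util.HashSet;
import java.util.Set;

/** In-memory sketch of the local-transaction and check-back decisions. */
class TxMessageSketch {
    enum State { COMMIT, ROLLBACK }

    // Stands in for the transaction log table: one entry per committed local transaction.
    private final Set<String> txLog = new HashSet<>();

    /** Runs the business action; the log entry is recorded only if the action succeeds. */
    State executeLocalTransaction(String transactionId, Runnable businessAction) {
        try {
            businessAction.run();
            // Assumption: a real service writes this row in the same DB transaction.
            txLog.add(transactionId);
            return State.COMMIT;
        } catch (RuntimeException e) {
            return State.ROLLBACK;
        }
    }

    /** Check-back: the broker asks again when it never received a final state. */
    State checkLocalTransaction(String transactionId) {
        if (transactionId == null || transactionId.trim().isEmpty()) {
            return State.ROLLBACK;
        }
        return txLog.contains(transactionId) ? State.COMMIT : State.ROLLBACK;
    }

    public static void main(String[] args) {
        TxMessageSketch sketch = new TxMessageSketch();
        State ok = sketch.executeLocalTransaction("tx-1", () -> { });
        State failed = sketch.executeLocalTransaction("tx-2", () -> {
            throw new IllegalStateException("audit failed");
        });
        System.out.println("tx-1 execute: " + ok);
        System.out.println("tx-2 execute: " + failed);
        System.out.println("tx-1 check:   " + sketch.checkLocalTransaction("tx-1"));
        System.out.println("tx-2 check:   " + sketch.checkLocalTransaction("tx-2"));
    }
}
```

The point of the log is that the check-back can answer from durable state even if the producer crashed after the local transaction finished but before the broker received the commit signal.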
Consumer
- Just add a listener
A plain consumer listener:
@Service
@RocketMQMessageListener(topic = "add-bonus", consumerGroup = "consumer-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusListener implements RocketMQListener<UserAddBonusMsgDTO> {
    private final UserMapper userMapper;
    private final BonusEventLogMapper bonusEventLogMapper;

    @Override
    public void onMessage(UserAddBonusMsgDTO message) {
        // Add the bonus to the user, if the user exists
        Optional.ofNullable(message.getUserId())
                .map(userMapper::selectById)
                .ifPresent(user -> {
                    user.setBonus(user.getBonus() + message.getBonus());
                    userMapper.updateById(user);
                });
        // Record the event; fail if the insert did not affect exactly one row
        Optional.of(bonusEventLogMapper.insert(
                BonusEventLog.builder()
                        .createTime(LocalDateTime.now())
                        .description("Add bonus points")
                        .event("add-bonus")
                        .userId(message.getUserId())
                        .value(message.getBonus())
                        .build()
        )).filter(i -> i == 1).orElseThrow(() -> new IllegalArgumentException("Failed to add bonus points"));
    }
}
Basic usage of Spring Cloud Stream
- Add the dependency
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
- Add the annotation and configuration
Add the annotation:
@EnableBinding({CustomerChannel.class}) // uses a custom send/receive interface
public class TestController
The custom interface:
public interface CustomerChannel {
    // These names correspond to spring.cloud.stream.bindings.<channelName>
    // and spring.cloud.stream.rocketmq.bindings.<channelName>
    String OUTPUT = "my-output";
    String INPUT = "my-input";

    @Output(CustomerChannel.OUTPUT)
    MessageChannel output();

    @Input(CustomerChannel.INPUT)
    SubscribableChannel input();
}
application.yml configuration:
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 192.168.18.91:9876
        bindings:
          my-input:
            consumer:
              tags: tag2 || tag1
      bindings:
        my-input:
          destination: my-stream-topic
          group: my-stream-group
        my-output:
          destination: my-stream-topic
- Write the code
// Use a controller endpoint to simulate sending a message; headers set with setHeader can be used for message filtering
@GetMapping("/test-stream-send-2")
public String testCustomInterfaceSendMsg() {
    this.customerChannel.output()
            .send(MessageBuilder.withPayload("send message")
                    .setHeader(RocketMQHeaders.TAGS, "tag3")
                    .build());
    return "success";
}
// Use @StreamListener to listen for messages
@StreamListener(value = CustomerChannel.INPUT)
public void testCustomListener(Message message) {
    System.out.println(message.getPayload().toString());
}
Message filtering in Spring Cloud Stream
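In the configuration above, my-input subscribes with tags: tag2 || tag1, while the test controller sends its message with the tag3 header, so that message should not be expected to reach the @StreamListener. The matching rule can be sketched in plain Java. This is only an illustration of how a tag expression of the form `a || b` is evaluated, with `*` meaning all tags; `TagFilterSketch` and its methods are hypothetical names, not part of RocketMQ or Spring Cloud Stream.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

/** Sketch of tag-expression matching such as "tag2 || tag1". */
class TagFilterSketch {

    /** Splits a subscription expression into its individual tags. */
    static Set<String> parse(String expression) {
        return Arrays.stream(expression.split("\\|\\|"))
                .map(String::trim)
                .filter(tag -> !tag.isEmpty())
                .collect(Collectors.toSet());
    }

    /** Returns true if a message carrying messageTag passes the subscription. */
    static boolean accepts(String expression, String messageTag) {
        // A missing expression or "*" is treated as "subscribe to every tag".
        if (expression == null || expression.trim().isEmpty() || "*".equals(expression.trim())) {
            return true;
        }
        return messageTag != null && parse(expression).contains(messageTag);
    }

    public static void main(String[] args) {
        String subscription = "tag2 || tag1";                   // value from application.yml
        System.out.println(accepts(subscription, "tag3"));      // false: the controller's tag
        System.out.println(accepts(subscription, "tag1"));      // true
        System.out.println(accepts("*", "tag3"));               // true
    }
}
```

To see the listener fire with the configuration shown earlier, the controller would need to send tag1 or tag2 instead of tag3.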
Spring Cloud Stream monitoring endpoints
- /actuator/bindings
- /actuator/channels
- /actuator/health
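A quick way to check these endpoints is to request each one and print the HTTP status. The sketch below uses the JDK's `java.net.http` client (Java 11+). The base URL `http://localhost:8080` and the class name `ActuatorProbe` are assumptions for illustration. It also assumes the actuator starter is on the classpath and that the bindings and channels endpoints have been exposed through `management.endpoints.web.exposure.include`, since Spring Boot 2 exposes only a small set of endpoints over HTTP by default.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;

/** Requests the actuator endpoints listed above and prints their status. */
class ActuatorProbe {
    static final List<String> PATHS =
            List.of("/actuator/bindings", "/actuator/channels", "/actuator/health");

    /** Joins the application base URL and an actuator path. */
    static URI endpoint(String baseUrl, String path) {
        String base = baseUrl.endsWith("/")
                ? baseUrl.substring(0, baseUrl.length() - 1)
                : baseUrl;
        return URI.create(base + path);
    }

    public static void main(String[] args) {
        // Assumption: the application listens on localhost:8080 unless a base URL is given.
        String baseUrl = args.length > 0 ? args[0] : "http://localhost:8080";
        HttpClient client = HttpClient.newHttpClient();
        for (String path : PATHS) {
            URI uri = endpoint(baseUrl, path);
            try {
                HttpResponse<String> response = client.send(
                        HttpRequest.newBuilder(uri).GET().build(),
                        HttpResponse.BodyHandlers.ofString());
                System.out.println(uri + " -> HTTP " + response.statusCode());
            } catch (Exception e) {
                // Connection refused, timeout, interruption and similar failures
                System.out.println(uri + " -> unreachable: " + e);
            }
        }
    }
}
```

A 404 on /actuator/bindings or /actuator/channels while /actuator/health answers usually points to the exposure setting rather than to the binder itself.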