RocketMQ 和Spring Cloud Stream

2019-08-20  本文已影响0人  just_like_you

Spring Cloud Stream

RocketMQ的使用

生产者

rocketmq:
  name-server: 192.168.18.91:9876 #rocketmq server地址
  producer:
    group: test-group #生产者组名
this.rocketMQTemplate.sendMessageInTransaction(
                    "tx-add-bonus",
                    "add-bonus",
                    MessageBuilder.withPayload(
                            UserAddBonusMsgDTO.builder()
                                    .bonus(500)
                                    .userId(share.getUserId())
                                    .build()
                    )
                            .setHeader("transactionId", transactionId)
                            .setHeader("shareId", id)
                            .build(),
                    //传递参数
                    auditDTO
            );
@Component
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusMQTransactionListener implements RocketMQLocalTransactionListener {

    private final ShareServiceImpl shareService;
    private final RocketmqTxLogMapper rocketmqTxLogMapper;

    /**
     * 监听执行本地方法事务
     *
     * @param msg 消息体
     * @param arg 参数
     * @return {@link RocketMQLocalTransactionState}
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            MessageHeaders headers = msg.getHeaders();
            Integer shareId = Optional.ofNullable(headers.get("shareId"))
                    .map(String::valueOf)
                    .map(Integer::valueOf).orElseThrow(() -> new IllegalArgumentException("shareId不能为空"));
            String transactionId = (String) msg.getHeaders().get("transactionId");
            //要监听的方法
            this.shareService.updateStatusWithRocketTx(shareId, (AuditDTO) arg,transactionId);
            //执行成功则给broker发送提交事务信号
            return COMMIT;
        } catch (Exception e) {
            return ROLLBACK;
        }
    }
    /**
     * rocketMQ Server 回查
     *
     * @param msg
     * @return
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        String transactionId = (String) msg.getHeaders().get("transactionId");
        if (rocketmqTxLogMapper.selectCount(Wrappers.<RocketmqTxLog>lambdaQuery().eq(StringUtils.isNotBlank(transactionId), RocketmqTxLog::getTransactionId, transactionId)) == 1) {
            return COMMIT;
        }
        return ROLLBACK;
    }
}

消费者

普通消费监听器
@Service
@RocketMQMessageListener(topic = "add-bonus",consumerGroup = "consumer-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusListener implements RocketMQListener<UserAddBonusMsgDTO> {
    private final UserMapper userMapper;
    private final BonusEventLogMapper bonusEventLogMapper;

    @Override
    public void onMessage(UserAddBonusMsgDTO message) {
        Optional.ofNullable(message.getUserId())
                .map(userMapper::selectById)
                .ifPresent(user -> {
                    user.setBonus(user.getBonus() + message.getBonus());
                    userMapper.updateById(user);
                });

        Optional.of(bonusEventLogMapper.insert(
                BonusEventLog.builder()
                        .createTime(LocalDateTime.now())
                        .description("添加积分")
                        .event("add-bonus")
                        .userId(message.getUserId())
                        .value(message.getBonus())
                        .build()
        )).filter(i -> i == 1).orElseThrow(() -> new IllegalArgumentException("添加积分失败"));
    }
}

Spring Cloud Stream的简单使用

        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        </dependency>

添加注解:

@EnableBinding({CustomerChannel.class}) //这里采用自定义的收发接口
public class TestController

自定义接口:

public interface CustomerChannel {

    String OUTPUT = "my-output";   //这里的名称对应了spring.cloud.stream.rocketmq.bindings.<channelName>
    String INPUT = "my-input";

    @Output(CustomerChannel.OUTPUT)
    MessageChannel output();


    @Input(CustomerChannel.INPUT)
    SubscribableChannel input();

}

application.yml配置:

    stream:
      rocketmq:
        binder:
          name-server: 192.168.18.91:9876
        bindings:
          my-input:
            consumer:
              tags: tag2 || tag1
      bindings:
        my-input:
          destination: my-stream-topic
          group: my-stream-group
        my-output:
          destination: my-stream-topic
//使用一个controller断点模拟发送消息,可以在setHeader方法中设置header来实现消息过滤
@GetMapping("/test-stream-send-2")
    public String testCustomInterfaceSendMsg() {
        this.customerChannel.output()
                .send(MessageBuilder.withPayload("send message").setHeader(RocketMQHeaders.TAGS, "tag3").build());
        return "success";
    }
    //使用@StreamListener来监听消息
    @StreamListener(value = CustomerChannel.INPUT)
    public void testCustomListener(Message message) {
        System.out.println(message.getPayload().toString());
    }

Spring Cloud Stream的过滤消息

大佬博客手记

Spring Cloud Stream的监控断点

上一篇下一篇

猜你喜欢

热点阅读