从基础到进阶,一文详解RocketMQ事务消息,看完不会跪键盘

2020-02-24  本文已影响0人  Java_苏先生

事务消息是RocketMQ提供的非常重要的一个特性,在4.x版本之后开源,可以利用事务消息轻松地实现分布式事务。本文对RocketMQ的事务消息进行详细介绍,并给出了代码示例。

一. 相关概念

RocketMQ在其消息定义的基础上,对事务消息扩展了两个相关的概念:

二. 执行流程

上面是官网提供的事务消息执行流程图,下面对具体流程进行分析:

三. 代码实例

本节通过一个简单的场景模拟RocketMQ的事务消息:存在2个微服务,分别是订单服务和商品服务。订单服务进行下单处理,并发送消息给商品服务,对于下单成功的商品进行减库存。

首先是订单服务:

/**
 * @Auther: ZhangShenao
 * @Date: 2019/3/27 16:44
 * @Description:使用RocketMQ事务消息——订单服务发送事务消息,然后进行本地下单,并通知商品服务减库存
 */
public class OrderService {
  public static void main(String[] args) throws Exception {
    TransactionMQProducer producer = new TransactionMQProducer();
    producer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR);
    producer.setProducerGroup(RocketMQConstants.TRANSACTION_PRODUCER_GROUP);

    //自定义线程池,执行事务操作
    ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 50, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(20), (Runnable r) -> new Thread("Order Transaction Massage Thread"));
    producer.setExecutorService(executor);

    //设置事务消息监听器
    producer.setTransactionListener(new OrderTransactionListener());

    producer.start();

    System.err.println("OrderService Start");

    for (int i = 0;i < 10;i++){
      String orderId = UUID.randomUUID().toString();
      String payload = "下单,orderId: " + orderId;
      String tags = "Tag";
      Message message = new Message(RocketMQConstants.TRANSACTION_TOPIC_NAME, tags, orderId, payload.getBytes(RemotingHelper.DEFAULT_CHARSET));

      //发送事务消息
      TransactionSendResult result = producer.sendMessageInTransaction(message, orderId);
      System.err.println("发送事务消息,发送结果: " + result);
    }
  }
}

事务消息需要一个TransactionListener,主要进行本地事务的执行和事务回查,代码如下:

/**
 * @Auther: ZhangShenao
 * @Date: 2019/3/27 16:50
 * @Description:订单事务消息监听器
 */
public class OrderTransactionListener implements TransactionListener {
  private static final Map<String, Boolean> results = new ConcurrentHashMap<>();

  @Override
  public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    String orderId = (String) arg;

    //记录本地事务执行结果
    boolean success = persistTransactionResult(orderId);
    System.err.println("订单服务执行本地事务下单,orderId: " + orderId + ", result: " + success);
    return success ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
  }

  @Override
  public LocalTransactionState checkLocalTransaction(MessageExt msg) {
    String orderId = msg.getKeys();
    System.err.println("执行事务消息回查,orderId: " + orderId);
    return Boolean.TRUE.equals(results.get(orderId)) ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
  }

  private boolean persistTransactionResult(String orderId) {
    boolean success = Math.abs(Objects.hash(orderId)) % 2 == 0;
    results.put(orderId, success);
    return success;
  }
}

下面是商品服务及监听器:

/**
 * @Auther: ZhangShenao
 * @Date: 2019/3/27 17:09
 * @Description:使用RocketMQ事务消息——商品服务接收下单的事务消息,如果消息成功commit则本地减库存
 */
public class ProductService {
  public static void main(String[] args) throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
    consumer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR);
    consumer.setConsumerGroup(RocketMQConstants.TRANSACTION_CONSUMER_GROUP);
    consumer.subscribe(RocketMQConstants.TRANSACTION_TOPIC_NAME, "*");
    consumer.registerMessageListener(new ProductListener());
    consumer.start();
    System.err.println("ProductService Start");
  }
}
/**
 * @Auther: ZhangShenao
 * @Date: 2019/3/27 17:14
 * @Description:
 */
public class ProductListener implements MessageListenerConcurrently {
  @Override
  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    Optional.ofNullable(msgs).orElse(Collections.emptyList()).forEach(m -> {
      String orderId = m.getKeys();
      System.err.println("监听到下单消息,orderId: " + orderId + ", 商品服务减库存");
    });
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  }
}

分别运行OrderService和ProductService,可以看出只有事务执行成功的订单才会通知商品服务进行减库存。

监听到下单消息,orderId: f25a7127-307e-45ce-8f83-6e0a922ebb94, 商品服务减库存
监听到下单消息,orderId: d960171d-97c0-4e13-aa4a-c2b96102de4b, 商品服务减库存
监听到下单消息,orderId: 63aedaa2-ce74-4cb7-bf58-fb6a73082a73, 商品服务减库存
监听到下单消息,orderId: 25764461-70b2-44db-8296-960211179e6e, 商品服务减库存
监听到下单消息,orderId: fb319fe7-c8be-4edf-ae4e-6108898068ca, 商品服务减库存
监听到下单消息,orderId: 4f61a61a-7254-458a-bc10-9d4006a9f581, 商品服务减库存

出处

# 链接 Java程序员福利"常用资料分享"

上一篇 下一篇

猜你喜欢

热点阅读