在代码中实现生产者和消费者one2one

2021-09-01  本文已影响0人  你家门口的两朵云

Producer.java

package edu.hgnu.one2one;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.nio.charset.StandardCharsets;

public class Provider {
    public static void main(String[] args) throws Exception {
        //1.谁来发
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2.发给谁
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        //3.怎么发
        //4.发什么
        String msg = "  盼望着,盼望着,东风来了,春天的脚步近了。\n" +
                "  一切都像刚睡醒的样子,欣欣然张开了眼。山朗润起来了,水涨起来了,太阳的脸红起来了。\n" +
                "  小草偷偷地从土地里钻出来,嫩嫩的,绿绿的。园子里,田野里,瞧去,一大片一大片满是的。\n" +
                "   坐着,躺着,打两个滚,踢几脚球,赛几趟跑,捉几回迷藏。风轻俏俏的`,草软绵绵的。";
        Message message = new Message("topic1", "tag1", msg.getBytes(StandardCharsets.UTF_8));
        SendResult sendResult = producer.send(message);
        //5.发的结果是什么
        System.out.println(sendResult.toString());
        //6.打扫战场
        producer.shutdown();
    }
}

Consumer.java

package edu.hgnu.one2one;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @author admin
 */
public class Consumer {
    public static void main(String[] args) throws Exception {
        //1.创建一个接收消息的对象Consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.设定接收的命名服务器地址
        consumer.setNamesrvAddr("localhost:9876");
        //3.设置接收信息对应的topic,对应的sub标签为任意
        consumer.subscribe("topic1","*");
        //4.启动监听,接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg:list) {
                    System.out.println("msg=====>"+msg);
                    String s = new String(msg.getBody());
                    System.out.println(s);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //5.启动接收消息的服务
        consumer.start();
    }
}

1.run Provider.java

控制台:

"C:\Program Files\Java\jdk1.8.0_101\bin\java.exe" ...
SendResult [sendStatus=SEND_OK, msgId=7F0000013BCC18B4AAC20330096D0000, offsetMsgId=AC10328300002A9F00000000002977AE, messageQueue=MessageQueue [topic=topic1, brokerName=SK-20210719XOMZ, queueId=1], queueOffset=3]
14:51:19.807 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
14:51:19.820 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
14:51:19.821 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[172.16.50.131:10911] result: true

2.run Consumer.java

控制台:

msg=====>MessageExt [brokerName=SK-20210719XOMZ, queueId=1, storeSize=636, queueOffset=3, sysFlag=0, bornTimestamp=1630479079790, bornHost=/172.16.50.131:57667, storeTimestamp=1630479079795, storeHost=/172.16.50.131:10911, msgId=AC10328300002A9F00000000002977AE, commitLogOffset=2717614, bodyCRC=1403423844, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='topic1', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=4, CONSUME_START_TIME=1630479232738, UNIQ_KEY=7F0000013BCC18B4AAC20330096D0000, CLUSTER=DefaultCluster, WAIT=true, TAGS=tag1}, body=[-29, -128, -128, -29, -128, -128, -25, -101, -68, -26, -100, -101, -25, -99, -128, -17, -68, -116, -25, -101, -68,], transactionId='null'}]
  盼望着,盼望着,东风来了,春天的脚步近了。
  一切都像刚睡醒的样子,欣欣然张开了眼。山朗润起来了,水涨起来了,太阳的脸红起来了。
  小草偷偷地从土地里钻出来,嫩嫩的,绿绿的。园子里,田野里,瞧去,一大片一大片满是的。
   坐着,躺着,打两个滚,踢几脚球,赛几趟跑,捉几回迷藏。风轻俏俏的,草软绵绵的。

上一篇 下一篇

猜你喜欢

热点阅读