RocketMQ

二、RocketMQ Simple Example 三种发送方式

2019-04-10  本文已影响9人  ASD_92f7

本篇主要通过JAVA代码来模拟Producer及Consumer,参考链接:
http://rocketmq.apache.org/docs/simple-example
https://www.jianshu.com/p/42330afbe53a

一、发送消息的三种方式

reliable synchronous - 同步可靠模式 reliable asynchronous - 异步可靠模式
one-way transmission - 单向模式

二、reliable synchronous - 同步可靠模式代码

package com.asd.rocket.controller.test;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * @author zhangluping@sinosoft.com.cn
 * @date 2019/4/10 10:08
 */
public class Producer {
    public static void main(String[] args) throws Exception{
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new
                DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("10.1.11.155:9876");
        producer.setSendMsgTimeout(13000);
        producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 1; i++) {
            /*
             * Create a message instance, specifying topic, tag and message body.
             * 三个参数
             * Topic、Tag、MessageBody
             */
            Message msg = new Message("qqq","TagA" , ("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
        System.out.println(1);
    }
}

三、reliable asynchronous - 异步可靠模式代码

这里有个大坑,异步模式,在消息还没有发的时候,producer.shutdown() 就执行了,然后又粗暴地报了一个错:No route info of this topic XXX,这里需要注意!

package com.asd.rocket.controller.test;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
 * @author zhangluping@sinosoft.com.cn
 * @date 2019/4/10 10:18
 */
public class AsyncProducer {
    public static void main(String[] args) throws  Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("qqq");
        // Specify name server addresses.
        producer.setNamesrvAddr("10.1.11.155:9876");
        //Launch the instance.
        producer.setSendMsgTimeout(13000);
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);

        for (int i = 0; i < 1; i++) {
            final int index = i;
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("qqq","TagA","OrderID188","Hello world1".getBytes(RemotingHelper.DEFAULT_CHARSET));
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("%-10d OK %s %n", index,sendResult.getMsgId());
                }
                @Override
                public void onException(Throwable e) {
                    System.out.printf("%-10d Exception %s %n", index, e);
                    e.printStackTrace();
                }
            });
        }
        //Shut down once the producer instance is not longer in use.
        Thread.sleep(3000);
        producer.shutdown();
    }
}

四、 one-way transmission - 单向模式

package com.asd.rocket.controller.test;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * @author zhangluping@sinosoft.com.cn
 * @date 2019/4/10 16:04
 */
public class OnewayProducer {
    public static void main(String[] args) throws Exception{
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("10.1.11.155:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("qqq" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " +
                            i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            producer.sendOneway(msg);

        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}

五、Comsumer 消费者

package com.asd.rocket.controller.test;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @author zhangluping@sinosoft.com.cn
 * @date 2019/4/10 10:08
 */
public class Consumer {
        public static void main(String[] args) throws Exception{
            // Instantiate with specified consumer group name.
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
            // Specify name server addresses.
            consumer.setNamesrvAddr("10.1.11.155:9876");
            // Subscribe one more more topics to consume.
            consumer.subscribe("qqq", "*");
            // Register callback to execute on arrival of messages fetched from brokers.
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                ConsumeConcurrentlyContext context) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            //Launch the consumer instance.
            consumer.start();
            System.out.printf("Consumer Started.%n");
        }
}
上一篇 下一篇

猜你喜欢

热点阅读