RocketMQ知识(及开发实战)

2018-09-28  本文已影响0人  Huang远

MQ基础概念:

MQ生产者者实例:

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        //声明并初始化一个producer
        //需要一个producer group名字作为构造方法的参数,这里为producer1
        DefaultMQProducer producer = new DefaultMQProducer("producer1");
        
        //设置NameServer地址,此处应改为实际NameServer地址,多个地址之间用;分隔
        //NameServer的地址必须有,但是也可以通过环境变量的方式设置,不一定非得写死在代码里
        producer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");
        
        //调用start()方法启动一个producer实例
        producer.start();

        //发送10条消息到Topic为TopicTest,tag为TagA,消息内容为“Hello RocketMQ”拼接上i的值
        for (int i = 0; i < 10; i++) {
            try {
                Message msg = new Message("TopicTest",// topic
                        "TagA",// tag
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body
                );
                
                //调用producer的send()方法发送消息
                //这里调用的是同步的方式,所以会有返回结果
                SendResult sendResult = producer.send(msg);
                
                //打印返回结果,可以看到消息发送的状态以及一些相关信息
                System.out.println(sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        //发送完消息之后,调用shutdown()方法关闭producer
        producer.shutdown();
    }
}

MQ消费者实例:

在开发过程中,如果想测试生产者是否发出了mq,可以编写一个消费者进行测试

@Test
public void testMqConsumer() throws Exception {
    String rocketmqAddress="10.113.41.2:9876;10.113.41.4:9876";

    int threadNum = 5;
    String topics = "WechatUnionCoreTemplateNotifyTopic";
    String instanceName = "TemplateComsumer";
    String groupName = "wechatUnionTemplateNotifyConsumer";
    DefaultMQPushConsumer consumer = null;

    consumer = new DefaultMQPushConsumer(groupName);
    consumer.setNamesrvAddr(rocketmqAddress);//MQ地址
    consumer.setClientCallbackExecutorThreads(threadNum);//消费现场数量
    consumer.setInstanceName(instanceName);//实例名称
    consumer.subscribe(topics, "*");


    //注册监听
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(
                List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
            for (int i = 0; i < msgs.size(); i++) {
                MessageExt msgExt =  msgs.get(i);
                String msgId = msgExt.getMsgId();
                Integer flag = msgExt.getFlag();
                TemplateNotifyItem templateNotifyItem = ProtoBufSerialize.fromProto(msgExt.getBody(), TemplateNotifyItem.class);
                logger.info("receive new Msg:    " + "  msgId=" + msgId + "   flag=" + flag + "  templateNotifyItem=" + templateNotifyItem);
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    logger.info("监听执行中");



    Thread.sleep(1000000);
}

参考:
http://blog.csdn.net/manzhizhen/article/details/52606733
https://www.jianshu.com/p/824066d70da8
架构师之路-mq系列

上一篇 下一篇

猜你喜欢

热点阅读