rocketmq部署、入门

2020-12-20  本文已影响0人  kafeimao

参考:https://www.cnblogs.com/kiwifly/p/11546008.html
前提:安装了docker

启动nameserver

创建nameserver的日志和数据存放路径

/home/rocketmq/namesrv/logs
/home/rocketmq/namesrv/store

启动命令

docker run -d --name rmqnamesrv -p 9876:9876 
-v /home/rocketmq/namesrv/logs:/opt/logs 
-v /home/rocketmq/namesrv/store:/opt/store
 rocketmqinc/rocketmq 
sh mqnamesrv

启动broker

创建broker的日志和数据存放的路径以及配置

/home/rocketmq/broker/logs
/home/rocketmq/broker/store
/home/rocketmq/broker/conf/broker.conf

在broker.conf中写入配置

terName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
brokerIP1= 我的服务器外网IP
namesrvAddr=我的服务器外网IP:9876

启动命令

docker run -d --name rmqbroker  -p 10911:10911 -p 10909:10909
 -v  /home/rocketmq/broker/logs:/root/logs 
-v  /home/rocketmq/broker/store:/root/store 
-v /home/rocketmq/broker/conf/broker.conf:/opt/rocketmq/conf/broker.conf 
--link rmqnamesrv:namesrv 
-e "NAMESRV_ADDR=namesrv:9876" 
rocketmqinc/rocketmq 
sh mqbroker -c /opt/rocketmq/conf/broker.conf

启动rocketmq的控制台

docker run -d -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=我的服务器IP:9876 -Drocketmq.config.isVIPChannel=false" -p 8082:8080 -t styletang/rocketmq-console-ng

打开浏览器看看


image.png

Java入门测试

maven依赖

<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.1.0-incubating</version>
        </dependency>

生产者

public class Producer {
    public static void main(String[] args) {
        DefaultMQProducer producer = new DefaultMQProducer("Producer");
        producer.setNamesrvAddr("我的服务器ip:9876");
        try {
            producer.start();
            Message msg = new Message("test-topic","tag","1"."Just for test.".getBytes());
            SendResult result = producer.send(msg);
            System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus());
        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            producer.shutdown();
        }
    }
}

消费者

public class Consumer {
    public static void main(String[] args) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer");
        consumer.setNamesrvAddr("我的服务器ip:9876");
        try {
            consumer.subscribe("test-topic", "tag");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                                                 @Override
                                                 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext Context) {
                                                     Message msg = list.get(0);
                                                     String topic = msg.getTopic();
                                                     System.out.println("topic = " + topic);
                                                     byte[] body = msg.getBody();
                                                     System.out.println("body:  " + new String(body));
                                                     String keys = msg.getKeys();
                                                     System.out.println("keys = " + keys);
                                                     String tags = msg.getTags();
                                                     System.out.println("tags = " + tags);
                                                     System.out.println("-----------------------------------------------");
                                                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                                                 }
                                             }
            );
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

整合springboot

上一篇 下一篇

猜你喜欢

热点阅读