RocketMQ 快速开始 quickstart && broker

2019-04-08  本文已影响0人  严重思想跑偏患者

producer

/**
 * Quick-start producer: sends ten messages to the topic {@code TopicTest3}
 * through a producer created with the ACL hook from {@code ACLClient}.
 */
public class Producer1 {

    /**
     * Starts the producer, sends ten messages synchronously, prints every
     * send result, and shuts the producer down.
     *
     * @param agrs command-line arguments (unused)
     * @throws MQClientException            if the client fails to start or send
     * @throws UnsupportedEncodingException if the body charset is not supported
     * @throws RemotingException            if the send fails at the remoting layer
     * @throws InterruptedException         if the sending thread is interrupted
     * @throws MQBrokerException            if the broker rejects a message
     */
    public static void main(String[] agrs) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {

        DefaultMQProducer producer = new
                DefaultMQProducer("TopicTest3-produceGroup1", ACLClient.getAclRPCHook());
        // Specify name server addresses.
        producer.setNamesrvAddr("10.1.54.46:9876");
        producer.setInstanceName("producer 1");
        // Launch the instance.
        producer.start();

        try {
            for (int i = 0; i < 10; i++) {
                // Create a message instance, specifying topic, tag, key and message body.
                Message msg = new Message("TopicTest3" /* Topic */,
                        "abc" /* Tag */, "OrderID188" /* Key */,
                        ("Hello RocketMQ " +
                                i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                // User property that the consumer's SQL filter ("coal between 2 and 7") selects on.
                msg.putUserProperty("coal", String.valueOf(i));
                // Call send message to deliver message to one of brokers.
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }
        } finally {
            // Shut down even when a send fails, so the started client is
            // not left running after an exception.
            producer.shutdown();
        }

    }
}

consumer

/**
 * Quick-start push consumer: subscribes to {@code TopicTest3} with an SQL
 * property filter, using the ACL hook from {@code ACLClient}.
 */
public class AclConsumer {

    /**
     * Configures and starts the push consumer.
     *
     * @param agrs command-line arguments (unused)
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public static void main(String[] agrs) throws MQClientException {
        // Single source for the instance name so the start-up log line below
        // always reports the instance that was actually configured.
        final String instanceName = "consumer 5";

        // Specify the consumer group and the ACL hook.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup5", ACLClient.getAclRPCHook(), new AllocateMessageQueueAveragely());

        consumer.setNamesrvAddr("localhost:9876");
        consumer.setInstanceName(instanceName);

        // Cluster subscription:   MessageModel.CLUSTERING
        // Broadcast subscription: MessageModel.BROADCASTING
        consumer.setMessageModel(MessageModel.BROADCASTING);

        // CONSUME_FROM_LAST_OFFSET  default policy: start at the tail of the queue, skipping historical messages
        // CONSUME_FROM_FIRST_OFFSET start at the head of the queue, consuming every historical message still stored on the broker
        // CONSUME_FROM_TIMESTAMP    start at a point in time; used together with setConsumeTimestamp(), default is half an hour ago
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // Number of messages handed to the listener per call; the default is one at a time.
        consumer.setConsumeMessageBatchMaxSize(10);

        // Subscribe to the topic. This example filters with an SQL expression on the
        // "coal" user property; MessageSelector.byTag filters by tag instead ("*" means every tag).
        consumer.subscribe("TopicTest3", MessageSelector.bySql("coal between 2 and 7"));

        // Register the consumption listener.
        consumer.registerMessageListener(MessageListener.getInstance());

        consumer.start();
        System.out.println(instanceName + " is started");

    }
}

ACLClient

/**
 * Factory for the ACL RPC hooks used by the sample producer and consumer.
 * Each accessor wraps one hard-coded access-key / secret-key pair.
 */
public class ACLClient {

    private static final String ACL_ACCESS_KEY = "RocketMQ2";
    private static final String ACL_SECRET_KEY = "12345678";

    private static final String TEST_ACL_ACCESS_KEY = "testKey";
    private static final String TEST_ACL_SECRET_KEY = "12345678";

    // NOTE(review): presumably a pair that does not match the broker's ACL
    // configuration, for exercising the rejection path; confirm.
    private static final String ERROR_ACL_ACCESS_KEY = "RocketMQ333";
    private static final String ERROR_ACL_SECRET_KEY = "12345673333";

    /** Returns a hook built from the {@code ACL_*} credential pair. */
    static RPCHook getAclRPCHook() {
        return hookFor(ACL_ACCESS_KEY, ACL_SECRET_KEY);
    }

    /** Returns a hook built from the {@code TEST_ACL_*} credential pair. */
    static RPCHook getTestAclRPCHook() {
        return hookFor(TEST_ACL_ACCESS_KEY, TEST_ACL_SECRET_KEY);
    }

    /** Returns a hook built from the {@code ERROR_ACL_*} credential pair. */
    static RPCHook getErrorAclRPCHook() {
        return hookFor(ERROR_ACL_ACCESS_KEY, ERROR_ACL_SECRET_KEY);
    }

    /** Wraps the given key pair in session credentials and an ACL client hook. */
    private static RPCHook hookFor(String accessKey, String secretKey) {
        SessionCredentials credentials = new SessionCredentials(accessKey, secretKey);
        return new AclClientRPCHook(credentials);
    }
}

Consumer 的 MessageListener

/**
 * Concurrent message listener that prints the body of every received
 * message together with the current local time. Exposed as a singleton.
 */
public class MessageListener implements MessageListenerConcurrently {

    // Eagerly created and final: class initialisation publishes the instance
    // safely, which the previous double-checked locking on a non-volatile
    // field did not guarantee.
    private static final MessageListener INSTANCE = new MessageListener();

    private MessageListener() {
    }

    /**
     * Returns the shared listener instance.
     *
     * @return the single {@code MessageListener}
     */
    public static MessageListener getInstance() {
        return INSTANCE;
    }

    /**
     * Prints each message body, prefixed with a timestamp, and acknowledges the batch.
     *
     * @param msgs                       the batch of messages to consume
     * @param consumeConcurrentlyContext the consume context (unused)
     * @return always {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS}
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        // Local to the call: SimpleDateFormat is not thread-safe and this
        // method may run concurrently, so the formatter is never shared.
        SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        for (MessageExt messageExt : msgs) {
            // Decode explicitly as UTF-8 instead of the platform default charset;
            // the producer encodes bodies with RemotingHelper.DEFAULT_CHARSET (UTF-8).
            String messageBody = new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            // Print the message content.
            System.out.println(timestampFormat.format(
                    new Date()) + "消费响应: msgBody : " + messageBody);
        }
        // ACK
        // CONSUME_SUCCESS: consumed successfully
        // RECONSUME_LATER: consumption failed, redeliver later
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

Broker 配置文件介绍

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

#  brokerClusterName = DefaultCluster
#  brokerName = broker-a
#  brokerId = 0
#  deleteWhen = 04
#  fileReservedTime = 48
#  brokerRole = ASYNC_MASTER
#  flushDiskType = ASYNC_FLUSH

# NOTE: this assumes the broker reads the file as Java properties
# (java.util.Properties). In that format a backslash is an escape character and
# trailing whitespace is kept as part of the value, so Windows paths below use
# doubled backslashes and no value carries trailing spaces.

# Name of the cluster this broker belongs to
brokerClusterName = default-rocketmq-cluster
# Set to true to enable SQL92 filtering
enablePropertyFilter = true
# Broker name; note that it differs between the config files of different brokers
brokerName = broker-a
# 0 means Master, >0 means Slave
brokerId = 0
# IP address of this broker
brokerIP1 = 192.168.6.46
# Name server address(es), separated by semicolons
namesrvAddr = 192.168.6.46:9876
# Default number of queues for a topic that is auto-created when a message is sent to a topic the server does not have
defaultTopicQueueNums = 4
# Whether the broker may auto-create topics; recommended on for development, off for production
autoCreateTopicEnable = true
# Whether the broker may auto-create subscription groups; recommended on for development, off for production
autoCreateSubscriptionGroup = true
# Port the broker listens on for external service
listenPort = 10911
# Time of day at which files are deleted, default 4 AM
deleteWhen = 04
# File retention time, default 48 hours
fileReservedTime = 120
# Size of each commitLog file, default 1G
mapedFileSizeCommitLog = 1073741824
# Each ConsumeQueue file stores 300,000 entries by default; adjust to the workload
mapedFileSizeConsumeQueue = 300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
# Disk space check for the physical files
diskMaxUsedSpaceRatio = 88
# Storage root path
storePathRootDir = D:\\RocketMQ\\target
# commitLog storage path
storePathCommitLog = D:\\RocketMQ\\target\\commitLog
# Consume queue storage path
storePathConsumeQueue = D:\\RocketMQ\\target\\consumequeue
# Message index storage path
storePathIndex = D:\\RocketMQ\\target\\index
# checkpoint file storage path
storeCheckpoint = D:\\RocketMQ\\target\\checkpoint
# Broker role
# - ASYNC_MASTER  master with asynchronous replication
# - SYNC_MASTER   master with synchronous double write
# - SLAVE
brokerRole = ASYNC_MASTER
# Disk flush mode
# - ASYNC_FLUSH  asynchronous flush
# - SYNC_FLUSH   synchronous flush
flushDiskType = ASYNC_FLUSH
#checkTransactionMessageEnable=false
# abort file storage path
abortFile = D:\\RocketMQ\\target\\abort
# Message size limit
maxMessageSize = 65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
# ACL control
aclEnable=true
上一篇下一篇

猜你喜欢

热点阅读