roketmq安装及demo
1.下载
目前roketmq最新版为4.4.0 , 之前装过roketmq 3.X版本 ,升级到4版本后roketmq 的配置目录及配置文件有些变化。4.x版本后也是重新启用了对事物消息的支持。目前更新也比较活跃。
这里下载的是已经编译后的版本
https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.4.0/rocketmq-all-4.4.0-bin-release.zip
2.解压 配置 启动mq
安装前保证你的机器已经装好jdk8
#安装到 /usr/local下
cd /usr/local
#这里直接下载编译后版本, 无需编译打包
unzip rocketmq-all-4.4.0-bin-release.zip
mv rocketmq-all-4.4.0-bin-release rocketmq-4.4.0
cd rocketmq-4.4.0
编译后的版本直接解压后就可以用了,在启动roketmq之前需要修改一下jvm参数 , 因为我们学习环境下的vmware安装的系统内存一般不会太高,而这里的默认jvm参数一般是生产环境下推荐的参数配置
cd rocketmq-4.4.0/bin
###修改name-service 的启动参数
vim runservice.sh
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
#修改成
JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn64m -XX:MetaspaceSize=56m -XX:MaxMetaspaceSize=128m"
#修改broker的启动参数
vim runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
修改成
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
启动name-service
#启动name-service
nohup sh bin/mqnamesrv &
#jps 查看启动进程
jps
2381 NamesrvStartup
#查看启动日志
tail -f ~/logs/rocketmqlogs/namesrv.log
#nameService log
2019-03-11 06:30:09 INFO main - tls.client.keyPath = null
2019-03-11 06:30:09 INFO main - tls.client.keyPassword = null
2019-03-11 06:30:09 INFO main - tls.client.certPath = null
2019-03-11 06:30:09 INFO main - tls.client.authServer = false
2019-03-11 06:30:09 INFO main - tls.client.trustCertPath = null
2019-03-11 06:30:09 INFO main - Using OpenSSL provider
2019-03-11 06:30:10 INFO main - SSLContext created for server
2019-03-11 06:30:10 INFO NettyEventExecutor - NettyEventExecutor service started
2019-03-11 06:30:10 INFO main - The Name Server boot success. serializeType=JSON
2019-03-11 06:30:10 INFO FileWatchService - FileWatchService service started
启动borket
nohup sh bin/mqbroker -n localhost:9876 &
jps
2418 BrokerStartup #borker 进程
2467 Jps
2381 NamesrvStartup #nameService进程
tail -f ~/logs/rocketmqlogs/broker.log
##############log################
[root@localhost rocketmq-4.4.0]# tail -f ~/logs/rocketmqlogs/broker.log
2019-03-11 06:33:26 INFO main - The broker dose not enable acl
2019-03-11 06:33:26 INFO FileWatchService - FileWatchService service started
2019-03-11 06:33:26 INFO PullRequestHoldService - PullRequestHoldService service started
2019-03-11 06:33:27 INFO brokerOutApi_thread_1 - register broker to name server localhost:9876 OK
2019-03-11 06:33:27 INFO main - Start transaction service!
2019-03-11 06:33:27 INFO main - The broker[localhost.localdomain, 192.168.31.202:10911] boot success. serializeType=JSON and name server is localhost:9876
2019-03-11 06:33:36 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2019-03-11 06:33:36 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes
2019-03-11 06:33:37 INFO brokerOutApi_thread_2 - register broker to name server localhost:9876 OK
2019-03-11 06:34:07 INFO brokerOutApi_thread_3 - register broker to name server localhost:9876 OK
测试发送接收消息
#使用自带的消息发送脚本
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
###发送成功日志
SendResult [sendStatus=SEND_OK, msgId=C0A81FCA09C37F31245A317D90E60088, offsetMsgId=C0A81FCA00002A9F0000000000005F32, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=2], queueOffset=34]
....
#启动消息消费端
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
### 打印消费日志 ...
至此单机版的roketmq已经部署成功 , 接下来我们一起把官网的栗子全部跑一遍
3.消息发送接收demo
1.普通消息
普通消息是实际应用中应用最多的场景,也是mq的基本功能, roketmq采用发布-订阅的消息模型。
首先在你的客户端程序中加入roket-client 的依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
同步发送生产者
: 发送消息后会等待mq的ack结果,在等待过程会产生线程阻塞,是应用中使用最广泛的一种消息发送模型。
public class SyncProducer {
public static void main(String[] args) throws Exception {
//给消息生产者分组命名
DefaultMQProducer producer = new
DefaultMQProducer("producer_demo");
// 设置nameService 的地址
producer.setNamesrvAddr("192.168.31.202:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message(
//topicName, 消息标签
"sync_topic_test","TagA",
//消息体
("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
//发动消息获取消息返回结果
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}
异步消息发送端
: 发送消息后线程不会阻塞,拥有两个回调函数,消息发送成功或者失败回调,同事异步发送还可以设置发送失败重试,对于一些需要快速响应用户并且无需依靠发送结果处理接下来的流程,可以选用异步发送。
public class AsyncProducer {
public static void main(String[] args) throws Exception {
//生产者分组
DefaultMQProducer producer = new DefaultMQProducer("producer_demo");
//nameServce地址
producer.setNamesrvAddr("192.168.31.202:9876");
//Launch the instance.
producer.start();
//异步发送失败重试次数 (发送重试)
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 100; i++) {
final int index = i;
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("async_topic_test",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
//发送成功的回调
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
//发送异常的回调
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}
单向发送
: 单向发送是一种不可靠的发送, 不会接受到mq的ack , 当然该发送方式也是异步的,意味着这种发送方式可以更高效的处理消费的并发 , 但是消息发送失败和丢失都是可能存在的风险,对一些可以接受消息丢失,发送失败并且有高并发的消息场景实用, 比如日志类型信息的收集。
public class OnewayProducer {
public static void main(String[] args) throws Exception{
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("localhost:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
producer.sendOneway(msg);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}
普通消费者
: 我们注意到消息的消费者同样需要设置消费者分组名, 这里不需要设置和发送端一样的组名, 消费分组意味着除广播消息, 一条消息只会在同组的一个实例中消费成功一次(异常情况除外),同组的消费端将由mq实现负负载均衡发送消息,消息订阅的topicName 需要和发送端的topicName 一致。当消息返回成功状态代表消息已经消费成功,返回其他结果或者是不返回,mq将进行消息重试。
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 消费分组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_test");
//NameService地址
consumer.setNamesrvAddr("192.168.31.202:9876");
// 订阅的topicName
consumer.subscribe("TopicTest", "*");
// 当接受到mq推送的消息时执行该回调方法
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
//返回成功状态到mq
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
2.顺序消息
顺序消息指发送有序和消费有序, 在发送有序的前提下消费是有序的,由于普通消息是多消费端并发消费, 并且发送的消息也存储在多个消息列队中, 所以即使先发送的消息也有可能后消费。
顺序具体实现原理后面再展开详细讨论
顺序消息发送者
: 顺序消息的发送者需保证同一订单(顺序消息)的发送是有序的, 并且发送到mq的同一个列队里。
public class OrderedProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
MQProducer producer = new DefaultMQProducer("ordered_producer_group ");
//Launch the instance.
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
int orderId = i % 10;
Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
/**
mqs : 消息列队的所有列队
msg : 消息体
orderId : 订单号(同一类消息唯一编号)
保证同一定单号的消息(需要顺序消费的消息发送到同一列队)
**/
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
//server shutdown
producer.shutdown();
}
}
顺序消息的消费
: 顺序消息的消费端需要用MessageListenerOrderly对象监听顺序消息,由于网络的不可达, roketmq并不能保证消息100%的顺序消费。
public class OrderedConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
context.setAutoCommit(false);
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
this.consumeTimes.incrementAndGet();
//本实例通过对部分消息消费失败来达到最终结果是顺序的体现顺序消息
if ((this.consumeTimes.get() % 2) == 0) {
return ConsumeOrderlyStatus.SUCCESS;
} else if ((this.consumeTimes.get() % 3) == 0) {
return ConsumeOrderlyStatus.ROLLBACK;
} else if ((this.consumeTimes.get() % 4) == 0) {
return ConsumeOrderlyStatus.COMMIT;
} else if ((this.consumeTimes.get() % 5) == 0) {
context.setSuspendCurrentQueueTimeMillis(3000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
3.广播消息
广播消息接受端
: 在广播消息的接受端设置客户端类型为广播类型,可以看出广播消息与普通消息的区别只是在消费端设置消息模型,设置为广播类型, mq将继续将此消息发送给其他消费端,达到all-consumer的效果。
public class BroadcastConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//设置广播消息模型
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Broadcast Consumer Started.%n");
}
}
4.延迟消息(定时消息)
消息发送到mq后并不会立刻消费, 而是在等待一段时间后才发送到消费端。延迟消息的消费端与普通消息无异 , 在发送消息时设置延迟等级 ,这里不支持设置任意延迟时间 。
每个等级对应的延迟时间为 : 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
// Instantiate a producer to send scheduled messages
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// Launch producer
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
//设置延迟等级为3 对应延迟10秒
message.setDelayTimeLevel(3);
// Send the message
producer.send(message);
}
// Shutdown producer after use.
producer.shutdown();
}
}
5.批量消息
roketmq 还支持消息的批量发送 , 当需要发送大量的消息时,批量发送会提高发送效率。批量消息必须要在同一个topic 和同一个发送等待模型(同步或异步单向等)并且是不支持定时消息的。
还有一个很重要的点, 就是批量消息的数据总量不能大于1M
批量消息的发送,把批量消息封装成list结构发送即可 , 对于批量发送的数据总量不能大于1M , 官网上给出一个分割消息的例子。
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
producer.send(messages);
} catch (Exception e) {
e.printStackTrace();
//handle the error
}
6.消息过滤
roketmq支持消息过滤 , 当多个消息端订阅了同一个topic ,可以通过消息过滤来让特定的消费端消费指定类型的消息。 roketmq 提供了多种消息过滤方法。
方案1.在消费端指定消费的TAG
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
//在消费指定消费TOPIC 中TAGA ,TAGB ,TAGC 标签
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
方案2.用SQL 表达式过滤消息
在消息发送端给消息添加一个自定义属性
//给消息添加一个属性a ,属性值123
msg.putUserProperty("a", "123");
SendResult sendResult = producer.send(msg);
在消息的接收端添加过滤规则
//添加一个过滤规则 a 在0-100 之间
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 100");
onsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
7.事务消息
事务消息在roketmq版本4后又重新回归(3.X进行闭源), 事务消息是roketmq一个比较强大的功能, 也是目前唯一支持事物消息的开源消息中间件,事物消息能解决在很多场景下的分布式事物问题, 达到最终一致性, 比起传统型的分布式事物具有更高的效率,当然只是适用于满足最终一致性的事物模型 。事物消息保证了本地数据库事物和消息发送能同时成功或者失败,实现原理可以看作就是一次两阶段提交。具体实现后面我们再展开讨论。
事物消息的发送端需设置一个线程池和绑定一个本地事物
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor
(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
//为事物消息的发送者绑定本地事物
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
本地事物
: 实现一个TransactionListener接口, 将本地数据库事物写在executeLocalTransaction方法中
将事物回调查询逻辑写在 checkLocalTransaction 方法中。
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
3.总结
从官网的这几个例子来看, roketmq 功能是非常强大的。接下来后面我们将展开roketmq的每种消息类型的具体实现进行详细研究, 另外对roketmq的分布式部署,消息存储结构也会一一分析。