rocketmq-producer
2018-08-13 本文已影响0人
划水者
rocketmq的producer发送消息,大致会分为如下几种消息
同步消息,发送者必须同步等待;
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 128; i++)
try {
{
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
异步消息,消息发送成功,异步通知;
DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
producer.setNamesrvAddr("localhost:9876");
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 10000000; i++) {
try {
final int index = i;
Message msg = new Message("Jodie_topic_1023",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
单向消息,消息直接发送,不管成功失败;
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 100; i++) {
Message msg = new Message("TopicTest" , "TagA", ("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
producer.sendOneway(msg);
}
producer.shutdown();
乱序消息,消费者不能保证按照生产者发送的顺序消费;在发送消息的时候不需要指定消息的keys
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 128; i++)
try {
{
Message msg = new Message("TopicTest",
"TagA",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
顺序消息,消费者按照生产者生产顺序消费;发送消息时,需要指定消息的发送队列,broker会需要保持有序的消息放到同一个队列中,消费者消费同一个队列中的消息可以保证有序,注意不同队列之间消息不能保证有序性
MQProducer producer = new DefaultMQProducer("example_group_name");
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
int orderId = i % 10;
Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
实时消息,发送者发送的消息,对消费者立刻可见;上述所写所有demo其实都是属于实时消息,只要发送,对消费者是实时可见的
延迟消息,发送者发送的消息,需要等待一段时间对消费者可见;
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 128; i++)
try {
{
//指定keys为OrderID188
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
//设置延迟消费级别
msg.setDelayTimeLevel(3);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
rocketmq的延迟消费,只支持特定时间的延迟,通过设置延迟时间的级别来确定延迟的时间
默认的延迟级别的配置如下,可进行修改
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
如果设置delayTimeLevel为3,表示延迟时间为10s
单个消息,发送者每次发送到服务端是一条消息;上述列举的demo都是属于单个消息发送
批量消息,发送者每次发送到服务端是多条消息;
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 128; i++)
try {
{
//指定keys为OrderID188
Message msg1 = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
Message msg2 = new Message("TopicTest",
"TagA",
"OrderID189",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
List<Message> msgs = new ArrayList<>();
msgs.add(msg1);
msgs.add(msg2);
SendResult sendResult = producer.send(msgs);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
发送批量消息主要目的是为了减少io交互