rocketmq-producer

2018-08-13  本文已影响0人  划水者

rocketmq的producer发送消息,大致会分为如下几种消息

同步消息,发送者必须同步等待;

    DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

    producer.setNamesrvAddr("localhost:9876");

    producer.start();

    for (int i = 0; i < 128; i++)
        try {
            {
                Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    producer.shutdown();

异步消息,消息发送成功,异步通知;

    DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
    producer.setNamesrvAddr("localhost:9876");
    producer.start();
    producer.setRetryTimesWhenSendAsyncFailed(0);

    for (int i = 0; i < 10000000; i++) {
        try {
            final int index = i;
            Message msg = new Message("Jodie_topic_1023",
                "TagA",
                "OrderID188",
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                }

                @Override
                public void onException(Throwable e) {
                    System.out.printf("%-10d Exception %s %n", index, e);
                    e.printStackTrace();
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    producer.shutdown();

单向消息,消息直接发送,不管成功失败;

    DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
    producer.setNamesrvAddr("localhost:9876");
    producer.start();
    for (int i = 0; i < 100; i++) {
        Message msg = new Message("TopicTest" , "TagA", ("Hello RocketMQ " +
                        i).getBytes(RemotingHelper.DEFAULT_CHARSET)
        );
        producer.sendOneway(msg);

    }
    producer.shutdown();

乱序消息,消费者不能保证按照生产者发送的顺序消费;在发送消息的时候不需要指定消息的keys

    DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

    producer.setNamesrvAddr("localhost:9876");

    producer.start();

    for (int i = 0; i < 128; i++)
        try {
            {
                Message msg = new Message("TopicTest",
                    "TagA",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    producer.shutdown();

顺序消息,消费者按照生产者生产顺序消费;发送消息时,需要指定消息的发送队列,broker会需要保持有序的消息放到同一个队列中,消费者消费同一个队列中的消息可以保证有序,注意不同队列之间消息不能保证有序性

    MQProducer producer = new DefaultMQProducer("example_group_name");
    producer.start();
    String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
    for (int i = 0; i < 100; i++) {
        int orderId = i % 10;
        Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
        SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                Integer id = (Integer) arg;
                int index = id % mqs.size();
                return mqs.get(index);
            }
        }, orderId);

        System.out.printf("%s%n", sendResult);
    }
    producer.shutdown();

实时消息,发送者发送的消息,对消费者立刻可见;上述所写所有demo其实都是属于实时消息,只要发送,对消费者是实时可见的

延迟消息,发送者发送的消息,需要等待一段时间对消费者可见;

    DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

    producer.setNamesrvAddr("localhost:9876");

    producer.start();

    for (int i = 0; i < 128; i++)
        try {
            {
                //指定keys为OrderID188
                Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                //设置延迟消费级别
                msg.setDelayTimeLevel(3);
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    producer.shutdown();

rocketmq的延迟消费,只支持特定时间的延迟,通过设置延迟时间的级别来确定延迟的时间
默认的延迟级别的配置如下,可进行修改
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
如果设置delayTimeLevel为3,表示延迟时间为10s

单个消息,发送者每次发送到服务端是一条消息;上述列举的demo都是属于单个消息发送

批量消息,发送者每次发送到服务端是多条消息;

    DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

    producer.setNamesrvAddr("localhost:9876");

    producer.start();

    for (int i = 0; i < 128; i++)
        try {
            {
                //指定keys为OrderID188
                Message msg1 = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                Message msg2 = new Message("TopicTest",
                        "TagA",
                        "OrderID189",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                List<Message> msgs = new ArrayList<>();
                msgs.add(msg1);
                msgs.add(msg2);
                SendResult sendResult = producer.send(msgs);
                System.out.printf("%s%n", sendResult);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    producer.shutdown();

发送批量消息主要目的是为了减少io交互

上一篇下一篇

猜你喜欢

热点阅读