springboot-rocketmq整合

2019-02-25  本文已影响0人  zxy_3197

1、application.properties
spring.application.name = demoTest

mybatis

spring.profiles.active=dev
spring.datasource.url=jdbc:mysql://localhost:3306/saasboard
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource

region redis

spring.redis.host=192.168.207.18
spring.redis.port=6379

spring.redis.password=group@123

spring.redis.database=0
spring.redis.pool.max-active=150
spring.redis.pool.max-idle=30
spring.redis.pool.max-wait=3000
spring.redis.pool.min-idle=10

producer

该应用是否启用生产者

rocketmq.producer.isOnOff=on

发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示

rocketmq.producer.groupName=${spring.application.name}

mq的nameserver地址

rocketmq.producer.namesrvAddr=192.168.205.196:9876

消息最大长度 默认1024*4(4M)

rocketmq.producer.maxMessageSize=4096

发送消息超时时间,默认3000

rocketmq.producer.sendMsgTimeout=3000

发送消息失败重试次数,默认2

rocketmq.producer.retryTimesWhenSendFailed=2

consumer

该应用是否启用消费者

rocketmq.consumer.isOnOff=on
rocketmq.consumer.groupName=${spring.application.name}

mq的nameserver地址

rocketmq.consumer.namesrvAddr=192.168.205.196:9876

该消费者订阅的主题和tags(""号表示订阅该主题下所有的tags),格式:topictag1||tag2||tag3;topic2;

rocketmq.consumer.topics=DemoTopic~*;
rocketmq.consumer.consumeThreadMin=20
rocketmq.consumer.consumeThreadMax=64

设置一次消费消息的条数,默认为1条

rocketmq.consumer.consumeMessageBatchMaxSize=1

2、pom.xml文件
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.2.6</version>
</dependency>
3、消费端配置
package com.example.demo.rocketmq.consumer;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.example.demo.rocketmq.constants.RocketMQErrorEnum;
import com.example.demo.rocketmq.consumer.processor.MQConsumeMsgListenerProcessor;
import com.example.demo.rocketmq.exception.RocketMQException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.util.StringUtils;

/**

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.example.demo.rocketmq.constants.RocketMQErrorEnum;
import com.example.demo.rocketmq.exception.RocketMQException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.util.StringUtils;

/**

5.监听配置
package com.example.demo.rocketmq.consumer.processor;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.List;

/**

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class DemoApplicationTests {

private static final Logger logger = LoggerFactory.getLogger(DemoApplicationTests.class);

/**使用RocketMq的生产者*/
@Autowired
private DefaultMQProducer defaultMQProducer;

/**
 * 发送消息
 *
 * 2018年3月3日 zhaowg
 * @throws InterruptedException
 * @throws MQBrokerException
 * @throws RemotingException
 * @throws MQClientException
 */
@Test
public void send() throws MQClientException, RemotingException, MQBrokerException, InterruptedException{
    String msg = "demo msg test";
    logger.info("开始发送消息:"+msg);
    Message sendMsg = new Message("DemoTopic","DemoTag",msg.getBytes());
    //默认3秒超时
    SendResult sendResult = defaultMQProducer.send(sendMsg);
    logger.info("消息发送响应信息:"+sendResult.toString());
}

}

上一篇下一篇

猜你喜欢

热点阅读