记RocketMQ的一次使用
2018-10-26 本文已影响56人
瓜尔佳_半阙
Producer的代码基本都一样,详情参看官网样例即可。
这里记录一下Consumer的其中一种消费方式。
Consumer 配置类
package com.yeshen.server.initConfig;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConsumerConfig {
private static final Logger logger = LoggerFactory.getLogger(ConsumerConfig.class);
@Value("${rocketmq.namesrv.addr}")
private String namesrvAddr;
@Value("${rocketmq.consumer.topics}")
private String topics;
@Value("${rocketmq.consumer.threadMax}")
private String consumeThreadMax;
@Value("${rocketmq.consumer.threadMin}")
private String consumeThreadMin;
@Value("${rocketmq.consumer.groupName}")
private String consumerGroupName;
@Autowired
private MQMessageListenerProcessor mqMessageListenerProcessor;
@Bean
public DefaultMQPushConsumer getRocketMQConsumer() throws Exception {
if (StringUtils.isEmpty(consumerGroupName)) {
logger.info("consumer group name is null");
throw new Exception("consumer group name is null");
}
if (StringUtils.isEmpty(namesrvAddr)) {
logger.info("namesrv address is null");
throw new Exception("namesrv address is null");
}
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroupName);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(Integer.valueOf(consumeThreadMin));
consumer.setConsumeThreadMax(Integer.valueOf(consumeThreadMax));
consumer.subscribe("TopicTest", "TagA || TagB || TagC || TagD || TagE");
consumer.registerMessageListener(mqMessageListenerProcessor);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.start();
logger.info("--------- consumer start success. ------------");
return consumer;
}
}
MessageListener
package com.yeshen.server.initConfig;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yeshen.server.commons.commonVo.Query;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
@Component
public class MQMessageListenerProcessor implements MessageListenerConcurrently {
private static final Logger logger = LoggerFactory.getLogger(MQMessageListenerProcessor.class);
@Value("${rocketmq.consumer.topics}")
private String topics;
@Autowired
private UploadDataRepository uploadDataRepository;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
if (CollectionUtils.isEmpty(list)) {
logger.info("message list is null,return success");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
for (MessageExt msg : list) {
try {
String jsonStr = new String(msg.getBody());
JSONObject jsonObject = (JSONObject) JSONObject.parse(jsonStr);
JSONObject queryJson = (JSONObject) jsonObject.get("queryVo");
Query query = JSON.parseObject(queryJson.toJSONString(), Query.class);
List<String> packageList = jsonObject.getJSONArray("list").toJavaList(String.class);
Integer vcode = jsonObject.getInteger("vcode");
uploadDataRepository.uploadDataToOdps(query, packageList, vcode + "", 1);
} catch (Exception e) {
logger.error(e.getMessage(), e);
if (msg.getReconsumeTimes() == 3) {
String jsonStr = new String(msg.getBody());
JSONObject jsonObject = (JSONObject) JSONObject.parse(jsonStr);
JSONObject queryJson = (JSONObject) jsonObject.get("queryVo");
Query query = JSON.parseObject(queryJson.toJSONString(), Query.class);
List<String> packageList = jsonObject.getJSONArray("list").toJavaList(String.class);
Integer vcode = jsonObject.getInteger("vcode");
JSONObject logJson = new JSONObject();
logJson.put("country", query.getIsoCode().toLowerCase());
logJson.put("list", packageList);
logJson.put("ip", query.getIp());
logJson.put("vcode", vcode);
logger.info("not_exists_rule_applist" + logJson.toJSONString());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}