记RocketMQ的一次使用

2018-10-26  本文已影响56人  瓜尔佳_半阙

Producer的代码基本都一样,详情参看官网样例即可。
这里记录一下Consumer的其中一种消费方式。

Consumer 配置类

package com.yeshen.server.initConfig;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConsumerConfig {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerConfig.class);

    @Value("${rocketmq.namesrv.addr}")
    private String namesrvAddr;

    @Value("${rocketmq.consumer.topics}")
    private String topics;

    @Value("${rocketmq.consumer.threadMax}")
    private String consumeThreadMax;

    @Value("${rocketmq.consumer.threadMin}")
    private String consumeThreadMin;

    @Value("${rocketmq.consumer.groupName}")
    private String consumerGroupName;

    @Autowired
    private MQMessageListenerProcessor mqMessageListenerProcessor;

    @Bean
    public DefaultMQPushConsumer getRocketMQConsumer() throws Exception {
        if (StringUtils.isEmpty(consumerGroupName)) {
            logger.info("consumer group name is null");
            throw new Exception("consumer group name is null");
        }
        if (StringUtils.isEmpty(namesrvAddr)) {
            logger.info("namesrv address is null");
            throw new Exception("namesrv address is null");
        }
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroupName);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeThreadMin(Integer.valueOf(consumeThreadMin));
        consumer.setConsumeThreadMax(Integer.valueOf(consumeThreadMax));
        consumer.subscribe("TopicTest", "TagA || TagB || TagC || TagD || TagE");
        consumer.registerMessageListener(mqMessageListenerProcessor);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.start();
        logger.info("--------- consumer start success. ------------");
        return consumer;
    }
}

MessageListener

package com.yeshen.server.initConfig;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yeshen.server.commons.commonVo.Query;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.List;

@Component
public class MQMessageListenerProcessor implements MessageListenerConcurrently {

    private static final Logger logger = LoggerFactory.getLogger(MQMessageListenerProcessor.class);

    @Value("${rocketmq.consumer.topics}")
    private String topics;

    @Autowired
    private UploadDataRepository uploadDataRepository;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (CollectionUtils.isEmpty(list)) {
            logger.info("message list is null,return success");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        for (MessageExt msg : list) {
            try {
                String jsonStr = new String(msg.getBody());
                JSONObject jsonObject = (JSONObject) JSONObject.parse(jsonStr);
                JSONObject queryJson = (JSONObject) jsonObject.get("queryVo");
                Query query = JSON.parseObject(queryJson.toJSONString(), Query.class);
                List<String> packageList = jsonObject.getJSONArray("list").toJavaList(String.class);
                Integer vcode = jsonObject.getInteger("vcode");
                uploadDataRepository.uploadDataToOdps(query, packageList, vcode + "", 1);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                if (msg.getReconsumeTimes() == 3) {
                    String jsonStr = new String(msg.getBody());
                    JSONObject jsonObject = (JSONObject) JSONObject.parse(jsonStr);
                    JSONObject queryJson = (JSONObject) jsonObject.get("queryVo");
                    Query query = JSON.parseObject(queryJson.toJSONString(), Query.class);
                    List<String> packageList = jsonObject.getJSONArray("list").toJavaList(String.class);
                    Integer vcode = jsonObject.getInteger("vcode");
                    JSONObject logJson = new JSONObject();
                    logJson.put("country", query.getIsoCode().toLowerCase());
                    logJson.put("list", packageList);
                    logJson.put("ip", query.getIp());
                    logJson.put("vcode", vcode);
                    logger.info("not_exists_rule_applist" + logJson.toJSONString());
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

上一篇下一篇

猜你喜欢

热点阅读