使用非注解形式的javaConfig配置进行kafka消息监听

2018-09-20  本文已影响0人  神蛋_狄仁杰

最近在做平台 Kafka 消息监听的改造:以前用的是平台自己封装的 jar 包,现在统一改用 spring-kafka,这样做的好处是减少特殊处理,便于统一维护。
以下是配置:

import java.util.*;

/**
 * Spring Java configuration that wires a Kafka consumer without listener annotations.
 *
 * <p>Declares four beans: the consumer factory, the container properties, the
 * message listener, and the concurrent listener container that ties them together.
 * Connection settings come from the {@code kafka.*} properties injected below.
 *
 * @author LiuBing
 * Created 13:42 2018/9/12
 */
@Configuration
public class KafkaConfig {

    /** Concurrency passed to the listener container. */
    private static final int LISTENER_CONCURRENCY = 3;

    @Value("${kafka.bootstrap.servers}")
    private String bootstrapServers;

    @Value("${kafka.topic.name}")
    private String topicName;

    @Value("${kafka.consumer.group}")
    private String consumerGroup;

    /**
     * Builds the consumer factory with String key and value deserializers.
     *
     * @return a consumer factory pointing at the configured bootstrap servers
     */
    @Bean
    public DefaultKafkaConsumerFactory<String, String> defaultKafkaConsumerFactory() {
        Map<String, Object> consumerProperties = new HashMap<>();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Client-side auto-commit is switched off; the ack mode is set in containerProperties().
        consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(
                consumerProperties, new StringDeserializer(), new StringDeserializer());
    }

    /**
     * Builds the container properties for the configured topic.
     *
     * @param consumerMessageListener listener that receives each consumed record
     * @return container properties carrying the topic, group id, ack mode and listener
     */
    @Bean
    public ContainerProperties containerProperties(MessageListener<String, String> consumerMessageListener) {
        ContainerProperties properties = new ContainerProperties(topicName);
        properties.setGroupId(consumerGroup);
        properties.setAckMode(AbstractMessageListenerContainer.AckMode.RECORD);
        properties.setMessageListener(consumerMessageListener);
        return properties;
    }

    /**
     * Builds the listener container from the consumer factory and container properties.
     *
     * @param defaultKafkaConsumerFactory factory used to create the consumers
     * @param containerProperties         topic, group, ack mode and listener settings
     * @return a concurrent listener container with its concurrency set to 3
     */
    @Bean
    public ConcurrentMessageListenerContainer<String, String> kafkaMessageListenerContainer(
            ConsumerFactory<String, String> defaultKafkaConsumerFactory,
            ContainerProperties containerProperties) {
        ConcurrentMessageListenerContainer<String, String> container =
                new ConcurrentMessageListenerContainer<>(defaultKafkaConsumerFactory, containerProperties);
        container.setConcurrency(LISTENER_CONCURRENCY);
        return container;
    }

    /**
     * Creates the listener instance that is registered on the container properties.
     *
     * @return the record listener declared in this project
     */
    @Bean
    public MessageListener<String, String> consumerMessageListener() {
        // The listener class is declared with a lowercase leading 'k'; refer to it by that name.
        return new kafkaConsumeMessageListenerTwo();
    }

}

然后是一个监听类。注意:上边配置中的 consumerMessageListener() 需要返回这个监听类的实例,这样它才会被设置到 ContainerProperties 里。

import java.util.concurrent.ThreadPoolExecutor;

/**
 * @Description: 消费者监听
 * @Author: LiuBing
 * @Date: 14:46 2018/9/19
 */
@Slf4j
public class kafkaConsumeMessageListenerTwo implements MessageListener<String, String> {

    @Autowired
    private TInsuranceCommonArgumentDao insuranceCommonArgumentDao;

    @Autowired
    private RaiseFundServiceImpl raiseFundServiceImpl;

    @Autowired
    private TradeCenterServiceImpl tradeCenterService;

    @Autowired
    private ThreadPoolExecutor threadPoolExecutor;

    @Override
    public void onMessage(ConsumerRecord<String, String> consumerRecord) {

        log.info("kafkaConsumeMessageListenerTwo.threadPoolExecutor...hashcode:{}", threadPoolExecutor.hashCode());
        threadPoolExecutor.execute(() -> {
            try {
                String value = insuranceCommonArgumentDao.getValue(MarketActivityEnum.QFBF001_HB_SWITCH.getTemplateCoe());
                if ("0".equals(value)){
                    return;
                }
                try {
                    //do business
                    doBusiness(consumerRecord);
                } catch (Exception e) {
                    log.error("消息接收异常:{}", e.getMessage());
                }
            } catch (Exception e) {
                log.info("错误信息:{}", e);
            }
        });
    }


    public void doBusiness(ConsumerRecord<String, String> consumerRecord) throws Exception{
        // 将json格式的消息转换为bean对象
        log.info("消息体串:{}", consumerRecord);
        //  topic匹配
        switch (consumerRecord.topic()){
            case CommonConstant.FUND_ORDER_TOPIC:
                log.info("偏移量:{},topic:{}", consumerRecord.offset(),consumerRecord.topic());
                TradeCenterFundMessageDTO fundMessageDTO = JsonUtil.fromJson(consumerRecord.value(),TradeCenterFundMessageDTO.class);
                raiseFundServiceImpl.capitalDomain(fundMessageDTO, CommonConstant.PREMIUM_ACCOUNT_NO,CommonConstant.QFBF_ACTIVITY_NO);
                break;
            case CommonConstant.ACQ_ORDER_PAY_TOPIC:
                log.info("偏移量:{},topic:{}", consumerRecord.offset(),consumerRecord.topic());
                TradeCenterAcqMessageDTO acqMessageDTO = JsonUtil.fromJson(consumerRecord.value(),TradeCenterAcqMessageDTO.class);
                tradeCenterService.tradeCenterRaise(acqMessageDTO, CommonConstant.PREMIUM_ACCOUNT_NO,CommonConstant.QFBF_ACTIVITY_NO);
                String button = insuranceCommonArgumentDao.getValue("SHQFBF_BUTTON");
                if("true".equals(button)) {
                    tradeCenterService.tradeCenterRaise(acqMessageDTO, CommonConstant.SHPREMIUM_ACCOUNT_NO,CommonConstant.SHQFBF_ACTIVITY_NO);
                }
                break;
            default:
                break;
        }

    }
}

至此便可以监听到消息进行业务处理了。

上一篇下一篇

猜你喜欢

热点阅读