
Integrating Spring Boot with Kafka

2018-08-10  by 后知后觉_ceba

As Spring Boot has grown in popularity, more and more developers are adopting it as the base framework for their Java projects. Configuring middleware in Spring Boot differs from Spring's XML-based approach, so this article walks through an example of a Spring Boot application that sends messages to, and consumes messages from, multiple Kafka clusters. I hope it is helpful.

Environment

Spring Boot: 2.0.4
Kafka: 1.1.0

Example code

1:pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wl</groupId>
    <artifactId>spring-boot-kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>spring-boot-kafka</name>
    <description>Demo project for Spring Boot Integrate kafka</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.4.RELEASE</version>
        <relativePath/>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- Kafka dependencies start -->
        <!--
            spring-kafka is used here, and it pulls in kafka-clients 1.0.2. Since the broker
            runs 1.1.0, the transitive kafka-clients dependency is excluded and kafka-clients
            1.1.0 is declared explicitly. If you do not need spring-kafka, you can use the
            native kafka-clients directly.
         -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>
        <!-- Kafka dependencies end -->
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
2:application.properties
#============== kafka cluster1===================
kafka.cluster1.consumer.zookeeper.connect=127.0.0.1:2181
kafka.cluster1.consumer.servers=127.0.0.1:9092

kafka.cluster1.producer.servers=127.0.0.1:9092
#============== kafka cluster2===================
kafka.cluster2.consumer.zookeeper.connect=127.0.0.1:2182
kafka.cluster2.consumer.servers=127.0.0.1:9093

kafka.cluster2.producer.servers=127.0.0.1:9093
#============== kafka common config ===================
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.group.id=test
kafka.consumer.concurrency=10

kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960
3:SpringKafkaProducerConfig.java

This class configures two KafkaTemplate beans, one for kafka cluster1 and one for kafka cluster2.

package com.wl.kafka;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import java.util.HashMap;
import java.util.Map;

/**
 * @Author:
 * @Description: Kafka producer configuration; an example of multiple producer templates sending messages to multiple Kafka clusters
 * @Date: Created at 16:48 on 2018/8/9
 * @Modified By:
 */
@Configuration
@EnableKafka
public class SpringKafkaProducerConfig {
    /**
     * Bootstrap servers for kafka cluster1
     */
    @Value("${kafka.cluster1.producer.servers}")
    private String cluster1Servers;

    /**
     * Bootstrap servers for kafka cluster2
     */
    @Value("${kafka.cluster2.producer.servers}")
    private String cluster2Servers;

    @Value("${kafka.producer.retries}")
    private int retries;
    @Value("${kafka.producer.batch.size}")
    private int batchSize;
    @Value("${kafka.producer.linger}")
    private int linger;
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;

    /**
     * KafkaTemplate for kafka cluster1
     * @return
     */
    @Bean
    public KafkaTemplate<String, String> kafkaCluster1Template() {
        Map<String, Object> configProps = producerConfigProps();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster1Servers);
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(configProps));
    }

    /**
     * KafkaTemplate for kafka cluster2
     * @return
     */
    @Bean
    public KafkaTemplate<String, String> kafkaCluster2Template() {
        Map<String, Object> configProps = producerConfigProps();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster2Servers);
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(configProps));
    }

    /**
     * Common producer configuration
     * @return
     */
    private Map<String, Object> producerConfigProps(){
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        configProps.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        configProps.put(ProducerConfig.RETRIES_CONFIG, retries);
        configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        return configProps;
    }
}
4:SpringKafkaConsumerConfig.java

This class configures two ConcurrentKafkaListenerContainerFactory beans, one for kafka cluster1 and one for kafka cluster2.

package com.wl.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;

/**
 * @Author:
 * @Description: Kafka consumer configuration; an example of multiple consumer factories consuming messages from different Kafka clusters
 * @Date: Created at 15:20 on 2018/8/8
 * @Modified By:
 */
@Configuration
@EnableKafka
public class SpringKafkaConsumerConfig {

    /**
     * Bootstrap servers for kafka cluster1
     */
    @Value("${kafka.cluster1.consumer.servers}")
    private String kafkaCluster1BootstrapServers;

    /**
     * Bootstrap servers for kafka cluster2
     */
    @Value("${kafka.cluster2.consumer.servers}")
    private String kafkaCluster2BootstrapServers;

    /**
     * Common consumer settings
     */
    @Value("${kafka.consumer.session.timeout}")
    private Integer sessionTimeoutMs;
    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.auto.commit.interval}")
    private Integer autoCommitIntervalMs;
    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.group.id}")
    private String groupId;


    /**
     * Listener container factory for kafka cluster1
     * @return
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> cluster1KafkaListenerContainerFactory() {
        Map<String, Object> configProps = configProps();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster1BootstrapServers);
        return getKafkaListenerContainerFactory(configProps);
    }

    /**
     * Listener container factory for kafka cluster2
     * @return
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> cluster2KafkaListenerContainerFactory() {
        Map<String, Object> configProps = configProps();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster2BootstrapServers);
        return getKafkaListenerContainerFactory(configProps);
    }

    /**
     * Builds a ConcurrentKafkaListenerContainerFactory from the given consumer properties
     * @param configProps
     * @return
     */
    private ConcurrentKafkaListenerContainerFactory<String, String> getKafkaListenerContainerFactory(Map<String, Object> configProps){
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        ConsumerFactory<String, String> basicConsumerFactory = new DefaultKafkaConsumerFactory<>(configProps);
        factory.setConsumerFactory(basicConsumerFactory);
        // Enable batch consumption: listener methods receive a List of records
        factory.setBatchListener(true);
        return factory;
    }

    /**
     * Common consumer configuration
     * @return
     */
    private Map<String, Object> configProps(){
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        configProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        configProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
        configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        return configProps;
    }

}

5:MyProducer.java

This is the producer test class; it sends messages to kafka cluster1 and kafka cluster2 on a schedule.

package com.wl.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * @Author:
 * @Description: Periodically sends messages to kafka cluster1 and kafka cluster2
 * @Date: Created at 21:33 on 2018/8/9
 * @Modified By:
 */
@Component
@EnableScheduling
public class MyProducer {

    @Autowired
    @Qualifier("kafkaCluster1Template")
    private KafkaTemplate<String, String> kafkaCluster1Template;

    @Autowired
    @Qualifier("kafkaCluster2Template")
    private KafkaTemplate<String, String> kafkaCluster2Template;

    /**
     * Every 5 seconds, send a message to the kafkacluster1test topic on kafka cluster1
     */
    @Scheduled(cron = "*/5 * * * * ?")
    public void produceMsgToKafkaCluster1(){
        System.out.println("向kafka cluster1 发送消息");
        kafkaCluster1Template.send("kafkacluster1test", "hello cluster1");
    }

    /**
     * Every 10 seconds, send a message to the kafkacluster2test topic on kafka cluster2
     */
    @Scheduled(cron = "*/10 * * * * ?")
    public void produceMsgToKafkaCluster2(){
        System.out.println("向kafka cluster2 发送消息");
        kafkaCluster2Template.send("kafkacluster2test", "hello cluster2");
    }

}
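
The send calls above are fire-and-forget. If you want to confirm delivery, KafkaTemplate.send(...) returns a ListenableFuture<SendResult<K, V>> in the spring-kafka version used here, so a callback can be attached. Below is a minimal sketch of such a method (the method name is made up for illustration; it assumes the kafkaCluster1Template field above and the additional imports org.springframework.kafka.support.SendResult and org.springframework.util.concurrent.ListenableFuture in MyProducer):

    /**
     * Optional: attach a callback to check whether a send succeeded.
     * Illustrative method, not part of the original example.
     */
    public void produceWithCallback() {
        ListenableFuture<SendResult<String, String>> future =
                kafkaCluster1Template.send("kafkacluster1test", "hello cluster1");
        future.addCallback(
                // Success: log the partition and offset the record was written to
                result -> System.out.println("Sent to partition " + result.getRecordMetadata().partition()
                        + ", offset " + result.getRecordMetadata().offset()),
                // Failure: log the exception
                ex -> System.err.println("Send to kafka cluster1 failed: " + ex.getMessage()));
    }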

6:MyConsumer.java

This is the consumer test class; it contains two listener methods that consume messages from kafka cluster1 and kafka cluster2 respectively.

package com.wl.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @Author:
 * @Description: Consumes messages from the kafkacluster1test topic on kafka cluster1 and the kafkacluster2test topic on kafka cluster2
 * @Date: Created at 21:46 on 2018/8/9
 * @Modified By:
 */
@Component
public class MyConsumer {

    /**
     * Consume messages from the kafkacluster1test topic on kafka cluster1
     * @param records
     */
    @KafkaListener(topics = "kafkacluster1test", containerFactory = "cluster1KafkaListenerContainerFactory")
    public void kafkacluster1testConsumer(List<ConsumerRecord<String, String>> records) {
        System.out.println("Consumed from kafkacluster1test: " + records.size() + " >>> " + records.toString());
    }

    /**
     * Consume messages from the kafkacluster2test topic on kafka cluster2
     * @param records
     */
    @KafkaListener(topics = "kafkacluster2test", containerFactory = "cluster2KafkaListenerContainerFactory")
    public void kafkacluster2testConsumer(List<ConsumerRecord<String, String>> records) {
        System.out.println("Consumed from kafkacluster2test: " + records.size() + " >>> " + records.toString());
    }
}

7:Running the test

Start the Spring Boot application class and watch the log output.
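
The application class itself is not shown in the listings above; a minimal sketch (the class name is assumed) looks like this:

package com.wl.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Minimal startup class (name assumed; not part of the original listings).
 * Component scanning picks up the @Configuration and @Component classes above.
 */
@SpringBootApplication
public class SpringBootKafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootKafkaApplication.class, args);
    }
}

Once it is running, the scheduled producer methods fire every 5 and 10 seconds and the two listeners print the batches they consume.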


(Image: kafka-spring-boot.png)
8:GitHub repository

GitHub repository.
