Kafka in Practice: Producing Messages with Spring Boot 2

2021-01-25  大雄喵

1. Environment
JDK: 1.8
Spring Boot: 2.3.8.RELEASE
Kafka: 2.7, single-node deployment

2. Create the Project
Create a new Spring Boot project in IDEA.

Select the dependencies to include.

The final pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.8.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.study</groupId>
    <artifactId>kafka-study</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-study</name>
    <description>Learning Kafka</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

3. Kafka Configuration Class
First, add a new property to application.properties:

kafka.broker.list=192.168.30.128:9092

Then create a KafkaConfig configuration class:

package com.study.kafka.config;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Properties;

/**
 * Kafka producer configuration.
 */
@Configuration
public class KafkaConfig {

    @Value("${kafka.broker.list}")
    public String brokerList;

    public static final String TOPIC = "syslogs";

    public Properties producerConfigs() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 20000000);// ~20 MB buffer for unsent messages
        // How long send() may block when the producer's buffer is full (default 60 s); set to 6 s here
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
        // Number of producer retries
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        // Reusable ProducerBatch size (allocated from the BufferPool in the record accumulator)
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // The producer sends when a ProducerBatch is full or has waited longer than LINGER_MS_CONFIG
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
        return props;
    }

    @Bean
    public Producer<String, String> getKafkaProducer() {
        // KafkaProducer is thread-safe; a single instance can be shared across threads.
        // Key/value types match the StringSerializer configured above.
        return new KafkaProducer<>(producerConfigs());
    }

}
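
The producer writes to the syslogs topic (the TOPIC constant above). If the broker does not auto-create topics, the topic has to exist before the first send. Below is a minimal sketch of creating it programmatically with Kafka's AdminClient; the TopicInitializer class, the single partition, and the replication factor of 1 are illustrative assumptions for the single-node broker, not part of the original post.

package com.study.kafka.config;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class TopicInitializer {

    /**
     * Create the syslogs topic if it does not exist yet.
     * One partition and replication factor 1 match a single-node broker.
     */
    public static void createTopic(String brokerList) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        try (AdminClient admin = AdminClient.create(props)) {
            if (!admin.listTopics().names().get().contains(KafkaConfig.TOPIC)) {
                NewTopic topic = new NewTopic(KafkaConfig.TOPIC, 1, (short) 1);
                admin.createTopics(Collections.singleton(topic)).all().get();
            }
        }
    }
}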

4. Send Messages from a Test Class
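
The test serializes a LogInfo object from com.study.kafka.domain, which the original post does not show; here is a minimal sketch, assuming a plain Lombok @Data class with exactly the fields used below. (The kafka.utils.Json helper used in the test comes from the Kafka core jar that spring-kafka-test puts on the test classpath.)

package com.study.kafka.domain;

import lombok.Data;

import java.util.Date;

/**
 * Simple log entry used as the message payload.
 */
@Data
public class LogInfo {
    private Long id;
    private String ip;
    private String deviceType;
    private String osVersion;
    private Date createDate;
}

The test class itself: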

package com.study.kafka;

import com.study.kafka.config.KafkaConfig;
import com.study.kafka.domain.LogInfo;
import kafka.utils.Json;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.Date;

@Slf4j
@SpringBootTest
class KafkaStudyApplicationTests {

    @Autowired
    Producer<String, String> producer;

    @Test
    void sendKafkaMsg() {
        LogInfo logInfo = new LogInfo();
        logInfo.setId(10000L);
        logInfo.setIp("11.22.33.44");
        logInfo.setDeviceType("Huawei Mate40");
        logInfo.setOsVersion("11.0");
        logInfo.setCreateDate(new Date());

        // JSON message payload
        String msg = Json.encodeAsString(logInfo);

        ProducerRecord<String, String> record = new ProducerRecord<>(KafkaConfig.TOPIC, msg);
        try {
            // Send the message asynchronously; the callback fires once the broker responds
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e == null) {
                        log.info("Send succeeded, partition: {}, offset: {}", metadata.partition(), metadata.offset());
                    } else {
                        log.error("Send failed: {}", e.getMessage(), e);
                    }
                }
            });
        } catch (Exception ex) {
            log.error("Failed to send message", ex);
        }
    }

}
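
Note that send() is asynchronous: it returns immediately and the callback runs once the broker responds. In this test the buffered record is still delivered because Spring closes the producer bean when the context shuts down and KafkaProducer.close() waits for outstanding records. If you want to block until the broker acknowledges the record, here is a sketch of a synchronous variant that could be added to the same test class; the method name and the literal payload are placeholders, not from the original post.

    @Test
    void sendKafkaMsgSync() throws Exception {
        ProducerRecord<String, String> record =
                new ProducerRecord<>(KafkaConfig.TOPIC, "{\"id\":10000}");
        // send() returns a Future<RecordMetadata>; get() blocks until the broker
        // acknowledges the record, or throws if delivery fails
        RecordMetadata metadata = producer.send(record).get();
        log.info("Sent to partition {} at offset {}", metadata.partition(), metadata.offset());
        // flush() pushes out anything still buffered; useful right before shutdown
        producer.flush();
    }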

5. Consume Messages with kafka-console-consumer.sh
Run the following command in a console to listen for messages:

[root@master kafka_2.12-2.7.0]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.30.128:9092 --topic syslogs --from-beginning

The output is as follows:

{"id":10000,"ip":"11.22.33.44","deviceType":"Huawei Mate40","osVersion":"11.0","createDate":1611583407099}
{"id":10000,"ip":"11.22.33.44","deviceType":"Huawei Mate40","osVersion":"11.0","createDate":1611583457711}
{"id":10000,"ip":"11.22.33.44","deviceType":"Huawei Mate40","osVersion":"11.0","createDate":1611583493982}
