程序员

Kafka(2)SpringBoot2整合Kafka

2020-08-18  本文已影响0人  正义的杰克船长

1 前期准备

版本兼容表

2 创建项目

项目整体目录结构

3 添加项目依赖

项目依赖pom.xml文件如下:

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- kafka 依赖-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.5.0</version>
        </dependency>
        <!-- spring boot 依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- json 工具包 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

4 Kafka消息生产端

server:
  port: 8081

spring:
  kafka:
    producer:
      # 生产客户端id,默认值为""
      client-id: 1
      # 连接的broker地址,如有多个用逗号隔开
      bootstrap-servers: localhost:9092
      # key序列化类,可以自定义序列化(broker端接受的消息必须以字节数组的形式)
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # value序列化类,可以自定义序列化
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 重试次数(提高可靠性,会影响同步性能,需要等待上一条消息发送完成后才发送下一条)
      retries: 1
package com.johnny.september.kafka.producer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * 启动类
 * @author Johnny Lu
 */
@SpringBootApplication
public class KafkaProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApplication.class, args);
    }
}
package com.johnny.september.kafka.producer.model;

import java.io.Serializable;

/**
 * 消息实体
 * @author Johnny Lu
 */
public class Message implements Serializable {

    private static final long serialVersionUID = -118L;

    /** 内容 */
    private String content;

    public Message() {
    }

    public Message(String content) {
        this.content = content;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

}
package com.johnny.september.kafka.producer.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

/**
 * Kafka配置类
 * @author Johnny Lu
 */
@Configuration
public class KafkaConfig {

    /**
     * 创建topic,指定主题名称,分区数量,副本数量
     *
     * @return
     */
    @Bean
    public NewTopic topicTest() {
        return TopicBuilder.name("topic_test_1").partitions(3).replicas(1).build();
    }
}
package com.johnny.september.kafka.producer.controller;

import com.alibaba.fastjson.JSON;
import com.johnny.september.kafka.producer.model.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * 控制器
 * @author Johnny Lu
 */
@RestController
public class MessageController {
    private static final Logger logger = LoggerFactory.getLogger(MessageController.class);

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * 发送消息
     *
     * @param content
     */
    @RequestMapping(path = "/send/{content}")
    public void sendMessage(@PathVariable String content) {
        kafkaTemplate.send("topic_test_1", JSON.toJSONString(new Message(content)));
    }

    /**
     * 发送消息,且阻塞等待broker的响应,直到消息发送成功,设置超时时间,超时异常处理
     *
     * @param content
     * @return
     */
    @RequestMapping(path = "/sendWaitResult/{content}")
    public String sendMessageWaitResult(@PathVariable String content) {
        String result = "发送成功";
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate
                .send("topic_test_1", JSON.toJSONString(new Message(content)));
        try {
            future.get(3000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
            result = "发送失败";
        } catch (ExecutionException e) {
            e.printStackTrace();
            result = "执行失败";
        } catch (TimeoutException e) {
            e.printStackTrace();
            result = "发送超时";
        }
        logger.info("发送消息:{}, 结果:{}", content, result);
        return result;
    }
}

5 Kafka消息消费端

server:
  port: 8082

spring:
  kafka:
    consumer:
      # 消费客户端id,默认值为""
      client-id: 1
      # 连接的broker地址,如有多个用逗号隔开
      bootstrap-servers: localhost:9092
      # key反序列化类
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # value反序列化类
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
package com.johnny.september.kafka.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * 启动类
 * @author Johnny Lu
 */
@SpringBootApplication
public class KafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }
}
package com.johnny.september.kafka.consumer.listener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * 消息监听
 *
 * @author Johnny Lu
 */
@Component
public class MessageListener {

    private final Logger logger = LoggerFactory.getLogger(MessageListener.class);

    /**
     * 监听消息,接受消息后处理业务逻辑
     * 消费组:messageGroup
     *
     * @param message
     */
    @KafkaListener(id = "messageGroup", topics = "topic_test_1")
    public void listen(String message) {
        logger.info("接受消息: " + message);
    }
}

6 启动项目验证

7 自定义拦截器

Kafka共有两种拦截器:生产者拦截器和消费者拦截器。

生产者拦截器

package com.johnny.september.kafka.producer.config;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
 * 自定义生产端拦截器
 * @author Johnny Lu
 */
public class CustomProducerInterceptor implements ProducerInterceptor {

    private static final Logger logger = LoggerFactory.getLogger(CustomProducerInterceptor.class);

    /**
     * 发送前做一些处理
     *
     * @param record
     * @return
     */
    @Override
    public ProducerRecord onSend(ProducerRecord record) {
        logger.info("发送消息 :{}", record.toString());
        return record;
    }

    /**
     * 这个方法在应答前或消息发送失败时被调用
     *
     * @param metadata
     * @param exception
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    /**
     * 关闭这个拦截器时被调用
     */
    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
spring:
  kafka:
    producer:
      properties:
        interceptor.classes: com.johnny.september.kafka.producer.config.CustomProducerInterceptor

消费者拦截器

package com.johnny.september.kafka.consumer.config;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.util.Map;

/**
 * 自定义消费端拦截器
 * @author Johnny Lu
 */
public class CustomConsumerInterceptor implements ConsumerInterceptor {

    /**
     * 这个方法,在拉取到消息调用
     * @param records
     * @return
     */
    @Override
    public ConsumerRecords onConsume(ConsumerRecords records) {
        return records;
    }

    /**
     * 这个方法,在提交请求响应成功时被调用
     * @param offsets
     */
    @Override
    public void onCommit(Map offsets) {
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

spring:
  kafka:
    consumer:
      properties:
        # 支持多个拦截器,用逗号隔开,多个形成拦截链,按顺序一一调用
        interceptor.classes: com.johnny.september.kafka.consumer.config.CustomConsumerInterceptor

8 自定义分区器

package com.johnny.september.kafka.producer.config;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

/**
 * 自定义分区器
 * @author Johnny Lu
 */
public class CustomPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
            Cluster cluster) {
        Integer partitionNums = cluster.partitionCountForTopic(topic);
        if (keyBytes == null) {
            // 随机分区
            return Utils.toPositive(ThreadLocalRandom.current().nextInt()) % partitionNums;
        } else {
            // 保持和 DefaultPartitioner 一样采用murmur2算法分区
            return Utils.toPositive(Utils.murmur2(keyBytes)) % partitionNums;
        }
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

spring:
  kafka:
    producer:
      properties:
        partitioner.class: com.johnny.september.kafka.producer.config.CustomPartitioner

重新启动项目进行验证。

9 结语

到这里,Spring Boot2整合Kafka简易版完成了。以后会继续记录Kafka其他功能及用法。

上一篇下一篇

猜你喜欢

热点阅读