SpringBoot 整合 Kafka 简明教程(Mac 下)

2018-12-28  本文已影响156人  panzhangbao

Mac 下安装启动 kafka

  1. 安装 kafka,安装过程会同时安装其依赖 zookeeper
brew install kafka

软件位置

/usr/local/Cellar/zookeeper

/usr/local/Cellar/kafka

配置文件位置

/usr/local/etc/kafka/zookeeper.properties 

/usr/local/etc/kafka/server.properties

备注:后续操作均需进入 /usr/local/Cellar/kafka/xxx/bin 目录下。

  1. 启动zookeeper
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &
  1. 启动kafka服务
kafka-server-start /usr/local/etc/kafka/server.properties & 
  1. 创建 topic ,命名为test1
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1
  1. 查看创建的topic
kafka-topics --list --zookeeper localhost:2181  
  1. 生产数据
kafka-console-producer --broker-list localhost:9092 --topic test1
  1. 消费数据
kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic test1 --from-beginning

备注:--from-beginning 表示从第一条消息开始接收

SpringBoot 整合 Kafka

<!-- Version pairing used here: Spring Boot 2.1.1.RELEASE -> spring-kafka 2.2.2.RELEASE -> Kafka 2.1.0 -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.2.2.RELEASE</version>
        </dependency>
spring:
  # Kafka
  kafka:
    # Kafka broker address (host:port)
    bootstrap-servers: 192.168.10.31:9092
    # Producer settings
    producer:
      retries: 0
      # Maximum size of a record batch, in bytes (Kafka's batch.size) - not a message count
      batch-size: 16384
      buffer-memory: 33554432
      # Serializer classes used for the message key and the message value
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    # Consumer settings
    consumer:
      group-id: consumer-group
      auto-offset-reset: earliest
      enable-auto-commit: true
      # NOTE(review): if this property is bound as a duration, "100s" means 100 seconds
      # between offset commits; confirm that 100 ms was not intended.
      auto-commit-interval: 100s
      # Deserializer classes used for the message key and the message value
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
package tc.smartlockapplet.utils.http.kafka;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import tc.smartlockapplet.utils.http.json.JSONUtils;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

/**
 * Kafka producer.
 *
 * <p>Demo component that publishes a fixed sample message to {@link #TOPIC} on a
 * schedule. Scheduling support is switched on by {@code @EnableScheduling} on this class.
 *
 * Created by panzhangbao on 2018-12-27 16:57:54
 * Copyright © 2018 panzhangbao. All rights reserved.
 */
@Component
@EnableScheduling
public class KafkaProducer {
    /** Topic the sample message is published to. */
    private static final String TOPIC = "personalInfoTopic";

    /** Record key attached to every sample message. */
    private static final String MESSAGE_KEY = "personalInfo";

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Scheduled test: builds a small sample map, prints it, and sends it to Kafka.
     *
     * <p>The cron expression {@code 0/3 * * * * *} (six fields, seconds first) fires
     * every three seconds. The map is converted with {@code JSONUtils.mapToJson}
     * (project helper; presumably produces a JSON string) before being sent.
     *
     * <p>The send is asynchronous; a failure callback is registered so that a
     * failed send is reported instead of being silently dropped.
     *
     * @date 2018/12/28 08:37
     * @author panzhangbao
     */
    @Scheduled(cron = "0/3 * * * * *")
    public void schedulingTest() {
        Map<String, String> messageMap = new HashMap<>();
        messageMap.put("name", "JasonPan");
        messageMap.put("motto", "If not now, when? if not me, who?");

        System.out.println("\n----------   producer   ----------\n");
        System.out.println("topic is " + TOPIC + "\nmessage is " + messageMap + "\n");

        // send() returns a future; without a callback an asynchronous failure
        // (e.g. broker unreachable) would go unnoticed.
        kafkaTemplate.send(TOPIC, MESSAGE_KEY, JSONUtils.mapToJson(messageMap))
                .addCallback(
                        result -> {
                            // Success needs no extra handling in this demo.
                        },
                        ex -> System.err.println(
                                "failed to send message to topic " + TOPIC + ": " + ex));
    }
}
package tc.smartlockapplet.utils.http.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * Kafka consumer.
 *
 * <p>Listens on the {@code personalInfoTopic} topic and prints every record
 * whose value is not {@code null}.
 *
 * Created by panzhangbao on 2018-12-27 19:03:19
 * Copyright © 2018 panzhangbao. All rights reserved.
 */
@Component
public class KafkaConsumer {
    /**
     * Handles one record delivered by the listener container.
     *
     * @param record the received record; records with a {@code null} value are ignored
     */
    @KafkaListener(topics = {"personalInfoTopic"})
    public void listen(ConsumerRecord<?, ?> record) {
        Object payload = record.value();
        if (payload == null) {
            // Nothing to print for a record without a value.
            return;
        }

        System.out.println("----------   consumer   ----------");
        System.out.println("record is " + record + "\nmessage is " + payload);
    }
}

参考

  1. DevinShuai 的博客:Mac 安装使用kafka
  2. tzs_ 的博客:SpringBoot Kafka 整合使用
上一篇 下一篇

猜你喜欢

热点阅读