有待学习Kafkaspring框架

SpringBoot 整合 Kafka

2018-09-20  本文已影响167人  weisen

Kafka 概述

Apache Kafka 是一个分布式流处理平台,用于构建实时的数据管道和流式的应用.它可以让你发布和订阅流式的记录,可以储存流式的记录,并且有较好的容错性,可以在流式记录产生时就进行处理。

Apache Kafka是分布式发布-订阅消息系统,在 kafka官网上对 Kafka 的定义:一个分布式发布-订阅消息传递系统。

Kafka 特性

  1. 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作;
  2. 可扩展性:kafka集群支持热扩展;
  3. 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失;
  4. 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败);
  5. 高并发:支持数千个客户端同时读写;
  6. 支持实时在线处理和离线处理:可以使用Storm这种实时流处理系统对消息进行实时进行处理,同时还可以使用Hadoop这种批处理系统进行离线处理;

Kafka 使用场景

  1. 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如Hadoop、Hbase、Solr等;
  2. 消息系统:解耦和生产者和消费者、缓存消息等;
  3. 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到Hadoop、数据仓库中做离线分析和挖掘;
  4. 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告;
  5. 流式处理:比如spark streaming和storm;
  6. 事件源;

Spring Boot2.0 + Kafka

1,创建 Spring Boot 项目

注意版本:该项目使用Spring Boot 2.0 +,低版本可能不对

  1. pom.xml引用
               <dependency>
                   <groupId>org.springframework.kafka</groupId>
                   <artifactId>spring-kafka</artifactId>
               </dependency>           
  1. 配置文件 application.yml
spring:
  application:
    name: bd-job-executor-springboot
  kafka:
    bootstrap-servers: 192.168.235.6:9092,192.168.235.7:9092
    consumer:
      groupId: executor
      enable-auto-commit: true
      keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
      valueDserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      groupId: executor
      keyDeserializer: org.apache.kafka.common.serialization.StringSerializer
      valueDserializer: org.apache.kafka.common.serialization.StringSerializer
      batch-size: 16384
      buffer-memory: 33554432
  1. kafka文件的总体结构


    20180920114831.png
  2. 消息文件定义
public class Message {

    private Long id;    //id

    private int code;  //返回码

    private String msg; //消息

    private Date startTime;  //时间戳

    private Date sendTime;  //时间戳

    private String logPath; //日志地址
}
  1. 定义消息生产者
    直接使用 KafkaTemplate 发送消息 ,Spring Boot自动装配,不需要自己定义一个Kafka配置类,吐槽一下网站的文章,全都是互相抄,全都写一个 ProduceConfig Consumerconfig 类, Kafka 的参数配置 硬编码在代码中,简直无法直视。
    定义一个泛型类KafkaSender<T> T 就是你需要发送的消息 对象,序列化使用阿里的 fastjson消息发送后,可以在回调类里面处理自己的业务,ListenableFutureCallback 类有两个方法,分别是 onFailureononSuccess ,实际场景可以在这两个方法,处理自己的具体业务,这里不做实现。
 @Component
public class KafkaSender<T> {

    private static Logger log = LoggerFactory.getLogger(KafkaSender.class);

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    //发送消息方法
    public void send(T obj) {
        String jsonObj = JSON.toJSONString(obj);
        log.info("------------ message = {}", jsonObj);
        //发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TopicConst.EXECUTOR_TOPIC, jsonObj);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.info("Produce: The message failed to be sent:" + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                //TODO 业务处理
                log.info("Produce: The message was sent successfully:");
                log.info("Produce: _+_+_+_+_+_+_+ result: " + stringObjectSendResult.toString());
            }
        });
    }
}

  1. TopicConst是一个常量类的配置
public class TopicConst {
    public static final String EXECUTOR_TOPIC = "executorTopic";
}
  1. 定义消息消费者
    使用@KafkaListener 注解监听 topics 消息,此处的topics 必须和 send 函数中的 一致
    @Header(KafkaHeaders.RECEIVED_TOPI 直接获取 topic
@Component
public class KafkaReceiver {
    private static Logger log = LoggerFactory.getLogger(KafkaReceiver.class);

    @KafkaListener(topics = {TopicConst.EXECUTOR_TOPIC})
    public void listen(String message) {
        log.info("------------------接收消息 message =" + message);
        Message msg = JSON.parseObject(message, Message.class);
        log.info("MessageConsumer: onMessage: message is: [" + msg + "]");
        log.info("------------------ message =" + message);

    }
}
  1. 使用方法
    直接使用 @Autowired 对类 KafkaSender 自动装配,然后调用 send 方法发送消息即可,下面给出代码:
@RestController
@RequestMapping("/kafka")
public class KafkaController {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    private KafkaSender sender;

    @RequestMapping(value = "/send", method = RequestMethod.GET)
    public ReturnT<String> sendKafka(String message) {
        try {
            logger.info("kafka的消息={}", message);
            sender.send(message);
            logger.info("发送kafka成功.");
            return ReturnT.SUCCESS;
        } catch (Exception e) {

            logger.error("发送kafka失败", e);
            return ReturnT.FAIL;
        }
    }
}

注意事项:

kafka可视化客户端工具(Kafka Tool)的基本使用

1、下载

下载地址:http://www.kafkatool.com/download.html

2、安装

根据不同的系统下载对应的版本,我这里kafka版本是kafka_2.11-1.0.0,下载kafka tool 2.0.1。

image.png

双击下载完成的exe图标,傻瓜式完成安装。

3、简单使用

kafka环境搭建请参考:CentOS7.5搭建Kafka2.11-1.1.0集群

1)连接kafka

打开kafka tool安装目录,点击exe文件

image.png

提示设置kafka集群连接,设置

image

设置完了,点击Test测试是否能连接,连接通了,然后点击Add,添加完成设置。出现如下界面

image

2)简单使用

配置以字符串的形式显示kafka消息体

image image image

或者通过如下界面配置

image

注释:更改完Content Types,要点击Update和Refresh按钮

再次查看kafka的数据:

image.png
上一篇下一篇

猜你喜欢

热点阅读