kafka connect 介绍
概述
Kafka Connect 是一款可扩展并且可靠地在 Apache Kafka 和其他系统之间进行数据传输的工具。 可以很简单的快速定义 connectors 将大量数据从 Kafka 移入和移出. Kafka Connect 可以摄取数据库数据或者收集应用程序的 metrics 存储到 Kafka topics,使得数据可以用于低延迟的流处理。 一个导出的 job 可以将来自 Kafka topic 的数据传输到二级存储,用于系统查询或者批量进行离线分析。
Kafka Connect 功能包括:
- Kafka connectors 通用框架: - Kafka Connect 将其他数据系统和Kafka集成标准化,简化了 connector 的开发,部署和管理
- 分布式和单机模式 - 可以扩展成一个集中式的管理服务,也可以单机方便的开发,测试和生产环境小型的部署。
- REST 接口 - 通过易于使用的REST API提交和管理connectors到您的Kafka Connect集群
- offset 自动管理 - 只需要connectors 的一些信息,Kafka Connect 可以自动管理offset 提交的过程,因此开发人员无需担心开发中offset提交出错的这部分。
- 分布式的并且可扩展 - Kafka Connect 构建在现有的 group 管理协议上。Kafka Connect 集群可以扩展添加更多的workers。
- 整合流处理/批处理 - 利用 Kafka 已有的功能,Kafka Connect 是一个桥接stream 和批处理系统理想的方式。
搭建kafka connect分布式集群
Kafka Connect 当前支持两种执行方式: 单机 (单个进程) 和 分布式.
分布式模式下会自动进行负载均衡,允许动态的扩缩容,并提供对 active task,以及这个任务对应的配置和offset提交记录的容错。
#分布式
bin/connect-distributed.sh config/connect-distributed.properties
connect-distributed.properties文件配置参数可以查看官方文档
配置connector
Connector 配置是简单的key-value 映射的格式。在分布式模式中,它们将被包含在创建(或修改)connector 的请求的JSON格式串中。
rest api
由于Kafka Connect 旨在作为服务运行,它还提供了一个用于管理 connectors 的REST API。默认情况下,此服务在端口8083上运行。以下是当前支持的功能:
- GET /connectors - 返回一个活动的连接器的列表
- POST /connectors - 创建一个新的连接器; 请求主体应为JSON对象,其中包含name字段和带有连接器配置参数的对象config字段
- GET /connectors/{name} - 获取有关特定连接器的信息
- GET /connectors/{name}/config
- PUT /connectors/{name}/config - 更新连接器的参数
- GET /connectors/{name}/status - 获取连接器的当前状态,包括连接器是否正在运行,发生故障,已暂停等,将其分配给哪个工作器,如果连接器发生故障,则显示错误信息,以及其所有任务的状态
- GET /connectors/{name}/tasks
- GET /connectors/{name}/tasks/{taskid}/status
- PUT /connectors/{name}/pause - 暂停连接器及其任务,这将停止消息处理,直到恢复连接器为止
- PUT /connectors/{name}/resume - 恢复已暂停的连接器(如果连接器未暂停,则不执行任何操作)
- POST /connectors/{name}/restart - 重新启动连接器(通常是因为它失败了)
- POST /connectors/{name}/tasks/{taskId}/restart
- DELETE /connectors/{name} - 删除连接器,暂停所有任务并删除其配置
Kafka Connect还提供用于获取有关 connector plugin sss信息的REST API:
- GET /connector-plugins- 返回安装在Kafka Connect集群中的连接器插件的列表。
- PUT /connector-plugins/{connector-type}/config/validate - 根据配置定义验证提供的配置值。
自定义connector
要在Kafka和另一个系统之间复制数据,用户会为想要 pull 数据或者 push 数据的系统创建一个connector。 connector 有两类:SourceConnectors 从其他系统导入数据(e.g.JDBCSourceConnector 会将关系型数据库导入到Kafka中)和SinkConnectors导出数据(e.g. HDFSSinkConnector会将Kafka topic 的内容导出到 HDFS 文件)
Connectors 自身不执行任何数据复制:Connector的配置描述要复制的数据,并且Connector 负责负责将 job 分解为可分发给 worker 的一组 Tasks。这些Tasks也分为两类: SourceTask 和 SinkTask。
通过分配,每个Task 必须将数据的一部分子集复制到Kafka或者从Kafka复制。在 Kafka Connect中,应该始终可以将这些分配的数据框架化为一组输入和输出流,这些流由具有一致结构的记录组成。
开发一个 connector 只需要实现两个接口, Connector 和 Task接口. 一个简单的例子的源码在Kafkafile package中。 connector 用于单机模式,并拥有 SourceConnector 和SourceTask实现来读取一个文件的每行记录,并将其作为记录发发送,SinkConnector的SinkTask将记录写入到文件。