kafka connector分布式部署

2017-07-29  本文已影响439人  草丛螳螂

配置

[root@book2 schema-registry]# vi schema-registry.properties

listeners=http://book2:8081

kafkastore.connection.url=book2:2181/kafka

kafkastore.topic=_schemas

debug=false

[root@book2 schema-registry]# vi connect-avro-distributed.properties

bootstrap.servers=book2:9092,book3:9092

group.id=connect-cluster

key.converter=io.confluent.connect.avro.AvroConverter

key.converter.schema.registry.url=http://book2:8081

value.converter=io.confluent.connect.avro.AvroConverter

value.converter.schema.registry.url=http://book2:8081

internal.key.converter=org.apache.kafka.connect.json.JsonConverter

internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false

internal.value.converter.schemas.enable=false

config.storage.topic=connect-configs

offset.storage.topic=connect-offsets

status.storage.topic=connect-status

group.id (默认connect-cluster) - Connect cluster group使用唯一的名称;注意这不能和consumer group ID(消费者组)冲突。

config.storage.topic (默认connect-configs) - topic用于存储connector和任务配置;注意,这应该是一个单个的partition,多副本的topic。你需要手动创建这个topic,以 确保是单个partition(自动创建的可能会有多个partition)。

offset.storage.topic (默认 connect-offsets) - topic用于存储offsets;这个topic应该配置多个partition和副本。

status.storage.topic (默认 connect-status) - topic 用于存储状态;这个topic 可以有多个partitions和副本

启动

/home/confluent-3.2.2/bin/schema-registry-start /home/confluent-3.2.2/etc/schema-registry/schema-registry.properties &

/home/confluent-3.2.2/bin/connect-distributed /home/confluent-3.2.2/etc/schema-registry/connect-avro-distributed.properties &

在分布式模式中,connector(连接器)配置不能使用命令行。要使用下面介绍的REST API来创建,修改和销毁connector。 配置连接器(connector)

Connector的配置是简单的key-value映射。对于独立模式,这些都是在属性文件中定义,并通过在命令行上的Connect处理。在分布式模 式,JSON负载connector的创建(或修改)请求。大多数配置都是依赖的connector,有几个常见的选项:

name - 连接器唯一的名称,不能重复。

connector.class - 连接器的Java类,比如连接器是io.svectors.hbase.sink.HBaseSinkConnector。

tasks.max - 连接器创建任务的最大数。

topics - 作为连接器的输入的topic列表。

由于Kafka Connect的目的是作为一个服务运行,提供了一个用于管理connector的REST API。默认情况下,此服务的端口是8083。以下是当前支持的终端入口:

GET /connectors - 返回活跃的connector列表

POST /connectors - 创建一个新的connector;请求的主体是一个包含字符串name字段和对象config字段(connector的配置参数)的JSON对象。

GET /connectors/{name} - 获取指定connector的信息

GET /connectors/{name}/config - 获取指定connector的配置参数

PUT /connectors/{name}/config - 更新指定connector的配置参数

GET /connectors/{name}/status - 获取connector的当前状态,包括它是否正在运行,失败,暂停等。

GET /connectors/{name}/tasks - 获取当前正在运行的connector的任务列表。

GET /connectors/{name}/tasks/{taskid}/status - 获取任务的当前状态,包括是否是运行中的,失败的,暂停的等,

PUT /connectors/{name}/pause - 暂停连接器和它的任务,停止消息处理,直到connector恢复。

PUT /connectors/{name}/resume - 恢复暂停的connector(如果connector没有暂停,则什么都不做)

POST /connectors/{name}/restart - 重启connector(connector已故障)

参考链接:

github.com/mravi/kafka-connect-hbase

orchome.com/345

blog.csdn.net/chuanzhongdu1/article/details/51365535

blog.csdn.net/ludonqin/article/details/52387769

上一篇下一篇

猜你喜欢

热点阅读