Pulsar Source 入门篇
阅读本文需要约 5 分钟。
- Apache Pulsar 是一个分布式发布订阅消息系统。
- Source 是 Pulsar 的一个组件,用来将其他系统的数据输入至 Pulsar。
摘要
本文介绍 Apache Pulsar Source 的基础知识,例如,Source 的常用命令、环境搭建以及使用示例。
Source 常用命令
create
创建 source。
参数 | 解释 |
---|---|
-a , --archive
|
指定 source 的 NAR 包 |
--classname |
指定 source 的类名称 |
--destination-topic-name |
指定目标 Topic 名称 |
--name |
指定 source 的名称 |
--namespace |
指定 source 的命名空间 |
--parallelism |
指定 source 的并发数 |
--source-config-file |
指定 source 使用的配置文件 |
--tenant |
指定 source 所属的租户 |
update
更新 source。
参数 | 解释 |
---|---|
-a , --archive
|
指定 source 的 NAR 包 |
--classname |
指定 source 的类名称 |
--destination-topic-name |
指定目标 Topic 名称 |
--name |
指定 source 的名称 |
--namespace |
指定 source 的命名空间 |
--parallelism |
指定 source 的并发数 |
--source-config-file |
指定 source 使用的配置文件 |
--tenant |
指定 source 所属的租户 |
delete
删除 source。
参数 | 解释 |
---|---|
--name |
指定 source 的名称 |
--namespace |
指定 source 的命名空间 |
--tenant |
指定 source 所属的租户 |
start
启动 source。
参数 | 解释 |
---|---|
--name |
指定 source 的名称 |
--namespace |
指定 source 的命名空间 |
--tenant |
指定 source 所属的租户 |
--instance-id |
指定 source 的 instance-id,如果未指定,将启动所有实例 |
stop
停止 source。
参数 | 解释 |
---|---|
--name |
指定 source 的名称 |
--namespace |
指定 source 的命名空间 |
--tenant |
指定 source 所属的租户 |
--instance-id |
指定 source 的 instance-id,如果未指定,将停止所有实例 |
get
获取 source 信息。
参数 | 解释 |
---|---|
--name |
指定 source 的名称 |
--namespace |
指定 source 的命名空间 |
--tenant |
指定 source 所属的租户 |
status
检查 source 状态。
参数 | 解释 |
---|---|
--name |
指定 source 的名称 |
--namespace |
指定 source 的命名空间 |
--tenant |
指定 source 所属的租户 |
--instance-id |
指定 source 的 instance-id,如果未指定,将获取所有实例状态 |
list
列出所有 source 信息。
参数 | 解释 |
---|---|
--namespace |
指定 source 的命名空间 |
--tenant |
指定 source 所属的租户 |
restart
重启 source。
参数 | 解释 |
---|---|
--name |
指定 source 的名称 |
--namespace |
指定 source 的命名空间 |
--tenant |
指定 source 所属的租户 |
--instance-id |
指定 source 的 instance-id,如果未指定,将重启所有实例 |
localrun
在本地运行 source,方便调试。
参数 | 解释 |
---|---|
-a , --archive
|
指定 source 的 NAR 包 |
--classname |
指定 source 的类名称 |
--destination-topic-name |
指定目标 Topic 名称 |
--name |
指定 source 的名称 |
--namespace |
指定 source 的命名空间 |
--parallelism |
指定 source 的并发数 |
--source-config-file |
指定 source 使用的配置文件 |
--tenant |
指定 source 所属的租户 |
环境搭建
本示例以 Kafka source 为例,实践这些命令。
- 下载所需文件。
wget http://central.maven.org/maven2/org/apache/kafka/kafka-clients/0.10.2.1/kafka-clients-0.10.2.1.jar
wget https://archive.apache.org/dist/pulsar/pulsar-2.4.0/connectors/pulsar-io-kafka-2.4.0.nar
- 创建网络。
实践发现,使用下述方式才能成功;使用 --link 的方式指定网络,会出现问题。
docker network create kafka-pulsar
- 拉取 ZooKeeper 镜像并启动 ZooKeeper 服务。
docker pull wurstmeister/zookeeper
docker run -d -it -p 2181:2181 --name pulsar-kafka-zookeeper --network kafka-pulsar wurstmeister/zookeeper
- 拉取 Kafka 镜像并启动 Kafka 服务。
docker pull wurstmeister/kafka:2.11-1.0.2
docker run -d -it --network kafka-pulsar -p 6667:6667 -p 9092:9092 -e KAFKA_ADVERTISED_HOST_NAME=pulsar-kafka -e KAFKA_ZOOKEEPER_CONNECT=pulsar-kafka-zookeeper:2181 --name pulsar-kafka wurstmeister/kafka:2.11-1.0.2
- 拉取 Pulsar 镜像并启动 Pulsar standalone 服务。
docker pull apachepulsar/pulsar:2.4.0
docker run -d -it --network kafka-pulsar -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-kafka-standalone apachepulsar/pulsar:2.4.0 bin/pulsar standalone
- 创建 source 配置文件 kafkaSourceConfig.yaml。
configs:
bootstrapServers: "pulsar-kafka:9092"
groupId: "test-pulsar-io"
topic: "my-topic"
sessionTimeoutMs: "10000"
autoCommitEnabled: "false"
- 创建生产者文件 kafka-producer.py。
kafka-producer.py
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='pulsar-kafka:9092')
future = producer.send('my-topic', b'hello world')
future.get()
- 创建消费者文件 pulsar-client.py。
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('my-topic', subscription_name='my-aa')
while True:
msg = consumer.receive()
print msg
print dir(msg)
print("Received message: '%s'" % msg.data())
consumer.acknowledge(msg)
client.close()
- 复制以下文件至 pulsar-kafka-standalone。
docker cp pulsar-io-kafka-2.4.0.nar pulsar-kafka-standalone:/pulsar
docker cp kafkaSourceConfig.yaml pulsar-kafka-standalone:/pulsar/conf
docker cp kafka-clients-0.10.2.1.jar pulsar-kafka-standalone:/pulsar/lib
docker cp pulsar-client.py pulsar-kafka-standalone:/pulsar/
docker cp kafka-producer.py pulsar-kafka-standalone:/pulsar/
- 开启新窗口,使用 localrun 运行 source。
docker exec -it pulsar-kafka-standalone /bin/bash
./bin/pulsar-admin source localrun --archive ./pulsar-io-kafka-2.4.0.nar --classname org.apache.pulsar.io.kafka.KafkaBytesSource --tenant public --namespace default --name kafka --destination-topic-name my-topic --source-config-file ./conf/kafkaSourceConfig.yaml --parallelism 1
- 开启新窗口,运行消费者。
docker exec -it pulsar-kafka-standalone /bin/bash
python pulsar-client.py
- 开启新窗口,运行生产者。
docker exec -it pulsar-kafka-standalone /bin/bash
pip install kafka-python
python3 kafka-producer.py
- 验证。
此时消费者窗口显示以下消息,说明环境搭建成功。
Received message: 'hello world'
使用示例
Localrun 该命令已在前文 #10 实现。前文 #9 已向 /pulsar/lib 文件夹中复制了一个 kafka 的 clients 库,因此需要首先重启 pulsar-kafka-standalone。
docker restart pulsar-kafka-standalone
create
在租户 public 和命名空间 default 下,创建名为 kafka 的 source。
./bin/pulsar-admin source create --archive ./pulsar-io-kafka-2.4.0.nar --classname org.apache.pulsar.io.kafka.KafkaBytesSource --tenant public --namespace default --name kafka --destination-topic-name my-topic --source-config-file ./conf/kafkaSourceConfig.yaml --parallelism 1
"Created successfully"
如果命令行窗口显示以上信息,说明创建成功。
list
显示租户为 public、命名空间为 default 的 source。
./bin/pulsar-admin source list --tenant public --namespace default
[
"kafka"
]
get
获取名称为 kafka 的 source 的信息。
./bin/pulsar-admin source get --tenant public --namespace default --name kafka
{
"tenant": "public",
"namespace": "default",
"name": "kafka",
"className": "org.apache.pulsar.io.kafka.KafkaBytesSource",
"topicName": "my-topic",
"configs": {
"bootstrapServers": "pulsar-kafka:9092",
"groupId": "test-pulsar-io1",
"topic": "my-topic",
"sessionTimeoutMs": "10000",
"autoCommitEnabled": "false"
},
"parallelism": 1,
"processingGuarantees": "ATLEAST_ONCE"
}
以上显示了刚才创建的 source 信息,包括租户、 namespace 、 名称、类名称、所在机器等。
status
获取名称为 kafka 的 source 的运行状态。
./bin/pulsar-admin source status --tenant public --namespace default --name kafka
{
"numInstances" : 1,
"numRunning" : 1,
"instances" : [ {
"instanceId" : 0,
"status" : {
"running" : true,
"error" : "",
"numRestarts" : 0,
"numReceivedFromSource" : 0,
"numSystemExceptions" : 0,
"latestSystemExceptions" : [ ],
"numSourceExceptions" : 0,
"latestSourceExceptions" : [ ],
"numWritten" : 0,
"lastReceivedTime" : 0,
"workerId" : "c-standalone-fw-7e0cf1b3bf9d-8080"
}
} ]
}
以上显示了 source 的实例信息,包括是否正在运行、实例 id、workId 等。
stop
停止租户 public 命名空间 default 下面名称为 kafka 的 source。
./bin/pulsar-admin source stop --tenant public --namespace default --name kafka --instance-id 0
Stopped successfully
start
启动租户 public 命名空间 default 下面名称为 kafka 的 source。
./bin/pulsar-admin source start --tenant public --namespace default --name kafka --instance-id 0
Started successfully
restart
重启租户 public 命名空间 default 下面名称为 kafka 的 source。
./bin/pulsar-admin source restart --tenant public --namespace default --name kafka --instance-id 0
Restarted successfully
update
更新租户 public 命名空间 default 下面名称为 kafka 的 source。
./bin/pulsar-admin source update --archive ./pulsar-io-kafka-2.4.0.nar --classname org.apache.pulsar.io.kafka.KafkaBytesSource --tenant public --namespace default --name kafka --destination-topic-name my-topic --source-config-file ./conf/kafkaSourceConfig.yaml --parallelism 1 --cpu 2
"Updated successfully"
./bin/pulsar-admin source get --tenant public --namespace default --name kafka
{
"tenant": "public",
"namespace": "default",
"name": "kafka",
"className": "org.apache.pulsar.io.kafka.KafkaBytesSource",
"topicName": "my-topic",
"configs": {
"bootstrapServers": "pulsar-kafka:9092",
"groupId": "test-pulsar-io1",
"topic": "my-topic",
"sessionTimeoutMs": "10000",
"autoCommitEnabled": "false"
},
"parallelism": 1,
"processingGuarantees": "ATLEAST_ONCE",
"resources": {
"cpu": 2.0,
"ram": 1073741824,
"disk": 10737418240
}
}
以上示例成功更新了CPU。
delete
删除租户 public 命名空间 default 下面名称为 kafka 的 source。
./bin/pulsar-admin source delete --tenant public --namespace default --name kafka
"Delete source successfully"
./bin/pulsar-admin source get --tenant public --namespace default --name kafka
HTTP 404 Not Found
Reason: Source kafka doesn't exist
以上示例成功删除了该 source。
总结
本文以 Kafka source 为例,介绍了 source 的常用命令、环境搭建和使用示例,之后会有更多文章深入介绍 source,敬请期待。