Apache Kafka

基于Splunk Connector for Kafka 实现S

2019-09-30  本文已影响0人  半夜菊花茶

Kafka是Apache的基于订阅发布模式的分布式消息队列,主要应用于数据管道和服务之间的流式数据,很多公司和组织都在利用Apache Kafka提供的能力构建下一代流式应用。本文介绍如何使用开源的Splunk Connector for Kafka消费Kafka数据并转发至Splunk,同时可以自由指定写入数据的sourcetype,便于后续数据解析处理。
Splunk Connect for Kafka是一种侵入式的客户端,基于Kafka Connect框架开发,用于将数据从Kafka的topic取出,并通过Splunk HTTP Event Collector(HEC)发送到Splunk

0x01 准备工作

0x02 配置Splunk

HTTP Event Collector(HEC)

配置Splunk接收从Kafka发送过来的消息需要配置HEC,通常有两种方式配置接收,即通过HEC接收后由Heavy Forwarder转发或者直接发送到Indexer节点,本文采用第二种方式,将消息直接发送到Indexer。
配置HEC,登录Splunk Web 页面,Setting > Data Inputs > HTTP Event Collector 点击Global Settings,确认All Tokens 为enabled状态,然后点击Save。这里要注意下面的HTTP Port Number,在后面的配置中会用到它。


image.png

然后点击New Token来创建一个新的HEC 令牌。
有关Splunk Acknowledgement 先不在本文讨论,感兴趣的同学请自行google


image.png

下一步会提示设置sourcetype,context和index等内容,按需配置即可。
对于大规模分布式的Splunk环境,生成token的方式请参考官方文档

Connector安装和配置

首先解压从GitHub上下载的Connector包,并将它复制到所有Kafka Connect节点上(默认情况下就是Kafka的安装节点)
修改配置文件,路径默认在Kafka安装目录下的config目录config/connect-distributed.properties

#These settings may already be configured if you have deployed a connector in your Kafka Connect Environment
bootstrap.servers=<BOOTSTRAP_SERVERS>
plugin.path=<PLUGIN_PATH>
#Required
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.flush.interval.ms=10000
#Recommended
group.id=kafka-connect-splunk-hec-sink

注意:

./bin/connect-distributed.sh config/connect-distributed.properties

执行以下命令验证插件是否启动

curl http://<KAFKA_CONNECT_HOST>:8083/connector-plugins

返回信息应该会是Connector的名称com.splunk.kafka.connect.SplunkSinkConnector

创建任务

通过上面的步骤我们启动了一个Splunk Connector for Kafka的服务,但是并没有指定要订阅哪个主题以及发送的目的端,接下来我们需要通过RESTful API创建任务。
在Kafka Connect节点上执行:

curl localhost:8083/connectors -X POST -H "Content-Type: application/json" -d'{
  "name": "splunk-prod-financial",
    "config": {
     "connector.class": "com.splunk.kafka.connect.SplunkSinkConnector",
     "tasks.max": "3",
     "topics": "NIP_SYSLOG_1515UDP2",
     "splunk.indexes": "main",
     "splunk.hec.uri":"https://192.168.134.56:8088",
     "splunk.hec.token": "4b6c844b-125c-4133-b23f-2f4de21b634a",
     "splunk.hec.ack.enabled" : "true",
     "splunk.hec.raw" : "false",
     "splunk.hec.json.event.enrichment" : "org=fin,bu=south-east-us",
     "splunk.hec.ssl.validate.certs": "false",
     "splunk.hec.track.data" : "true"
    }
}'

这里需要注意:

验证

要验证配置是否正确,需要往Kafka指定的主题里面写入数据,然后在Splunk上查看是否可以收到对应数据内容。写入数据有很多种方式,可以使用Splunk提供的数据生成工具Kafka data-gen-app或者kafka-console-producer,再或者使用kafka自带的客户端直接写消息进去,比如:
在kafka节点上执行:

bin/kafka-console-producer.sh --broker-list 192.168.134.57:9092 --topic NIP_SYSLOG_1515UDP2
image.png

然后在Splunk 搜索:


image.png

其他常用API

通过调用任务创建API,Splunk Connector开始订阅指定主题,此时我们可以通过另外几个API查询任务状态

描述 命令
列出有效connectors curl http://localhost:8083/connectors
获取指定Connector信息 curl http://localhost:8083/connectors/kafka-connect-splunk
获取指定Connector配置 curl http://localhost:8083/connectors/kafka-connect-splunk/config
删除指定Connector curl http://localhost:8083/connectors/kafka-connect-splunk -X DELETE
获取指定connector任务信息 curl http://localhost:8083/connectors/kafka-connect-splunk/tasks

参考链接

Install and Use Splunk Connect for Kafka
Splunk Connect for Kafka – Connecting Apache Kafka with Splunk
Github Kafa-connect-splunk

上一篇 下一篇

猜你喜欢

热点阅读