kafkaspringbootClickHouse学习笔记

ClickHouse数据导入

2020-12-31  本文已影响0人  gaofubao

一 概述

目前Kafka数据导入ClickHouse的常用方案有两种,一种是通过ClickHouse内置的Kafka表引擎实现,另一种是借助数据流组件,如Logstash。

以下会分别介绍这两种方案。

二 数据导入方案

方案一 Kafka表引擎

方案介绍

Kafka表引擎基于librdkafka库实现与Kafka的通信,但它只充当一个数据管道的角色,负责拉取Kafka中的数据;所以还需要一张物化视图将Kafka引擎表中的数据实时同步到本地MergeTree系列表中。

为了提高性能,接受的消息被分组为 maxinsertblocksize 大小(由kafkamax_block_size参数空值,默认值为65536)的块。如果未在 streamflushinterval_ms 毫秒(默认500 ms)内形成块,则不关心块的完整性,都会将数据刷新到表中。


方案一.png

相关配置参数:
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

<kafka>
  <debug>cgrp</debug>
  <auto_offset_reset>smallest</auto_offset_reset>
</kafka>
 
<kafka_logs>
  <retry_backoff_ms>250</retry_backoff_ms>
  <fetch_min_bytes>100000</fetch_min_bytes>
</kafka_logs>

使用示例

1)部署Kafka

# 部署ZK(略)

# 部署Kafka
docker run -d --name=kafka21 --hostname=node21 --network=host -e KAFKA_ADVERTISED_HOST_NAME=node21 -e KAFKA_ZOOKEEPER_CONNECT=node21:2181,node22:2181,node23:2181 wurstmeister/kafka
 
# 创建Topic
./bin/kafka-topics.sh --zookeeper node21:2181 --create --topic test --partitions 1 --replication-factor 1
 
# 测试
./bin/kafka-console-producer.sh --broker-list node21:9092 --topic test
>{"id":1,"name":"tom"}
>{"id":2,"name":"jerry"}
 
./bin/kafka-console-consumer.sh --bootstrap-server node21:9092 --topic test --from-beginning
{"id":1,"name":"tom"}
{"id":2,"name":"jerry"}

2)创建Kafka引擎表

CREATE TABLE kafka_queue
(
  warehouse_id Int64,
  product_id  Int64,
  product_name String
)
ENGINE = Kafka()
SETTINGS
  kafka_broker_list='node21:9092',
  kafka_topic_list='test',
  kafka_group_name='test',
  kafka_format='JSONEachRow',
  kafka_skip_broken_messages=100

必选参数:

可选参数:

3)创建数据表
使用已有的数据表,以下只给出了分布表的创建语句。

CREATE TABLE warehouse_dist ON CLUSTER cluster_1
(
    warehouse_id Int64,
    product_id Int64,
    product_name String
)
ENGINE = Distributed(cluster_1, default, warehouse_local, warehouse_id)

4)创建物化视图

CREATE MATERIALIZED VIEW kafka_view TO warehouse_dist AS SELECT * FROM kafka_queue

方案二 Logstash

方案介绍

与Elasticsearch写入类似,通过Logstash的ClickHouse插件,订阅Kafka中的数据并写入CH中。其中,ClickHouse插件调用HTTP接口完成数据写入。


方案二.png

使用示例

1)部署Logstash
部署Logstash,并安装ClickHouse插件:

bin/logstash-plugin install logstash-output-clickhouse

2)创建Logstash配置文件

input {
  stdin {
      codec => "json"
  }
}
 
output {
  stdout {
    codec => "json"
  }
  clickhouse {
    http_hosts => ["http://192.168.167.21:8123"]
    table => "warehouse_dist"
    request_tolerance => 1
    flush_size => 1000
    pool_max => 1000
  }
}
 
filter {
  mutate {
    remove_field => [ "@timestamp", "host", "@version" ]
  }
}

相关参数:

3)启动Logstash

./bin/logstash -f config/logstash.conf

三 总结

Kafka引擎表和Logstash都是常见的数据导入方式,

参考:《ClickHouse原理解析与应用实践》

上一篇下一篇

猜你喜欢

热点阅读