ClickHouse: Consuming and Storing JSON Data from Kafka

2023-05-11 · 国服最坑开发

0x01 Overview

Verified on a single machine: consume JSON-formatted data from Kafka and persist it into ClickHouse.
This covers the key capability of parsing and extracting fields from multi-level (nested) JSON.
Goal: store the following JSON:

{
    "id": "123",
    "timestamp": 1234567,
    "payload": {
        "message": "test",
        "measure_string": "haha"
    }
}

0x02 Create the Kafka Topic

kafka-topics --bootstrap-server localhost:9092 --topic clickhouseTestJson --create --partitions 6 --replication-factor 1
kafka-topics --bootstrap-server localhost:9092 --describe --topic clickhouseTestJson
 
 Topic: clickhouseTestJson  TopicId: PBbkM2ZhQMarwFf4x5znNQ PartitionCount: 6   ReplicationFactor: 1    Configs: segment.bytes=1073741824
    Topic: clickhouseTestJson   Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    Topic: clickhouseTestJson   Partition: 1    Leader: 0   Replicas: 0 Isr: 0
    Topic: clickhouseTestJson   Partition: 2    Leader: 0   Replicas: 0 Isr: 0
    Topic: clickhouseTestJson   Partition: 3    Leader: 0   Replicas: 0 Isr: 0
    Topic: clickhouseTestJson   Partition: 4    Leader: 0   Replicas: 0 Isr: 0
    Topic: clickhouseTestJson   Partition: 5    Leader: 0   Replicas: 0 Isr: 0

0x03 Create the ClickHouse Tables

Key points:
Three objects are used in ClickHouse: a MergeTree storage table (event), a Kafka engine table acting as the consumer queue (json_queue2), and a materialized view (json_mv2) that transforms the JSON and writes it into the storage table.

-- Create the storage table
CREATE TABLE IF NOT EXISTS event
(
    timestamp UInt64 Codec(DoubleDelta, LZ4),
    id Int64 Codec(Gorilla, LZ4),
    message LowCardinality(String),
    measure_string String Codec(ZSTD),
    date Date DEFAULT toDate(timestamp, 'UTC') Codec(ZSTD),
    timestamp_1min UInt64 DEFAULT (floor(timestamp/60) * 60) Codec(DoubleDelta, LZ4)
) Engine = MergeTree
PARTITION BY toStartOfMonth(date)
ORDER BY (
    id,
    timestamp_1min
);
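
-- Optional sanity check (not part of the original setup): evaluate the two DEFAULT
-- expressions against the sample timestamp (1234567 seconds); the values match the
-- query result shown later.
SELECT
    toDate(1234567, 'UTC')                 AS date,            -- 1970-01-15
    toUInt64(floor(1234567 / 60) * 60)     AS timestamp_1min;  -- 1234560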

-- Create the queue (Kafka engine table)
CREATE TABLE IF NOT EXISTS json_queue2 (
  all String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
       kafka_topic_list = 'clickhouseTestJson',
       kafka_group_name = 'clickhouseTestJsonGroup2',
       kafka_format = 'JSONAsString',
       kafka_skip_broken_messages = 10000,
       kafka_max_block_size = 1048576;
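
-- Optional: with kafka_format = 'JSONAsString', each Kafka message arrives as one row
-- in the single String column `all`. A direct SELECT lets you peek at the raw messages,
-- but it consumes offsets for this consumer group, so treat it as a one-off check only.
SET stream_like_engine_allow_direct_select = 1;
SELECT all FROM json_queue2 LIMIT 1;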

-- Create the data-transformation materialized view
CREATE MATERIALIZED VIEW json_mv2 TO event AS
SELECT
    JSONExtract(all, 'id', 'Int64') AS id,
    JSONExtract(all, 'timestamp', 'Int64') AS timestamp,
    JSONExtractString(all, 'payload', 'message') AS message,
    JSONExtractString(all, 'payload', 'measure_string') AS measure_string
FROM json_queue2;

In the MATERIALIZED VIEW json_mv2 above, the JSONExtractString function (called with a chain of keys) flattens the multi-level JSON so it can be stored as plain columns.
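
The same multi-key extraction can be tried standalone on the sample message, e.g.:

SELECT JSONExtractString(
    '{"id":"123","timestamp":1234567,"payload":{"message":"test","measure_string":"haha"}}',
    'payload', 'message') AS message;   -- returns 'test'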

0x04 Submit Test Data

kafka-console-producer --broker-list localhost:9092 --topic clickhouseTestJson
{"id":"123","timestamp":1234567,"payload":{"message":"test","measure_string":"haha"}}

0x05 Inspect the Data in ClickHouse

clickhouse-client --stream_like_engine_allow_direct_select 1
use yourdatabase;
select * from event;

# Result:
SELECT *
FROM event

Query id: f91cb37d-a54a-49a4-800d-1c27e48a9018

┌─timestamp─┬─id─┬─message─┬─measure_string─┬───────date─┬─timestamp_1min─┐
│   1234567 │  0 │ test    │ haha           │ 1970-01-15 │        1234560 │
└───────────┴────┴─────────┴────────────────┴────────────┴────────────────┘

This completes the data-storage path; downstream merge/aggregation work can now be built on top of the event table. Note that id is 0 in the result above: the sample message sends "id" as a JSON string ("123"), and JSONExtract(all, 'id', 'Int64') does not convert it here; sending id as a JSON number stores 123 as expected.
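
As an illustrative starting point for that downstream work (a sketch, not part of the setup above), a per-minute rollup over event could look like:

SELECT
    id,
    timestamp_1min,
    count() AS events
FROM event
GROUP BY id, timestamp_1min
ORDER BY id, timestamp_1min;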

0x06 Adding Columns

A materialized view does not support adding columns; it has to be dropped and recreated.
Table columns can only be added, not dropped.

When adding columns, give numeric fields a numeric type up front; otherwise aggregate functions such as sum() will fail at query time (see the illustration below).
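
A minimal illustration of that last point, using the event table defined earlier:

-- measure_string is String, so sum() rejects it; timestamp is UInt64, so it works
SELECT sum(measure_string) FROM event;   -- error: sum() does not accept String
SELECT sum(timestamp) FROM event;        -- OK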

alter table app_event_raw
    add column cleanTimes Int32 after city;

drop view app_event_view;

CREATE MATERIALIZED VIEW app_event_view TO app_event_raw AS
SELECT JSONExtract(all, 'apptype_name', 'String')       AS apptype_name,
       JSONExtract(all, 'baseplate_version', 'String')  AS baseplate_version,
       JSONExtract(all, 'bd_code', 'String')            AS bd_code,
       JSONExtract(all, 'bd_name', 'String')            AS bd_name
       -- ... remaining columns (including the new cleanTimes) extracted the same way
FROM app_event_queue;