PipelineDB1.0,用写SQL的方式进行实时计算
简述
PipelineDB是一个高性能的PostgreSQL扩展,用于在时间序列数据上连续运行SQL查询。这些连续查询的输出存储在常规表中,可以像查询任何其他表或视图一样查询这些表。因此,连续查询可以看作是非常高的吞吐量、增量更新的物化视图。
简言之,与传统的实时计算引擎(storm、kafka streams等)相比,PipelineDB不需要写任何程序代码,只需要写SQL即可实现实时指标的聚合计算,统计实时指标开发耗时可以从原来的天级骤降到分钟级。
要点简概
简要数据流向图上图为PipelineDB在实际应用中的简要数据流向图。
下面对图中涉及的特定类型的表进行解释:
- Stream:进行时序数据聚合计算的第一个必需基础表类型,外部数据会顺序流入Stream表。该表不实际储存数据,当一条数据被后续所有的必须View/Transform读取后即会被丢弃。Stream作为PipelineDB的第一道关口,承接从外部实时写入到PipelineDB内的数据。之后,可以以Stream为基础表,在其上创建若干个统计结果的View或者转换内容的Transform。
- Continuous View:PipelineDB的基本抽象,从Stream读取数据并将新数据按照SQL内条件(count、sum等)实时聚合,并将结果实时增量更新到View内。Continuous View是物化视图,实际内容会存储在以_mrel结尾的表中。
- Continuous Transform:进行实时转换数据,比如解析URL、关联维表等。Continuous Transform不支持聚合,不储存数据,转换后的数据可以实时写入到另一个Stream或者表。
上图源引自PipelineDB官网,直观的展示了它的实时计算过程。
用法实践
安装
从1.0版本开始,PipelineDB开始作为PostgreSQL的一个扩展存在,安装PipelineDB之前请先安装PostgreSQL。
具体安装方法请参见官方安装教程
连接PostgreSQL数据库
为了连接方便,可以在安装有PostgreSQL的服务器上直接敲"psql"进入交互式命令行。
当需要远程连接时,可以写一个名为“pg”的bash脚本并将路径export到PATH,内容:
#!/bin/bash
source /etc/profile
# 设置密码环境变量
export PGPASSWORD='PostgreSQL密码'
# 登录PostgreSQL
declare -A postgres
postgres=(
[pg1911-(Postgres11)]="psql -h test_host -p 1911 -U PostgreSQL用户名 -d postgres"
)
dbs=($(printf '%s\n' "${!postgres[@]}" | sort))
echo
echo "+-------------------------------+"
echo "|Postgres/PipelineDB连接菜单工具|"
echo "+-------------------------------+"
echo "请选择数据库序号..."
select db in ${dbs[@]}; do
break
done
echo "执行命令:${postgres[$db]} ..."
eval "${postgres[$db]}"
exit
运行pg脚本,选择对应的序号后即可连接到对应数据库。(注意:远程连接前,请先配置好pg_hba.conf)
查看已创建Schema
\dn
注意: PostgreSQL里的schema可以类比MySQL里的database,但是并不相同,PostgreSQL有自己的database属性。
查看创建Schema的语句
\h CREATE SCHEMA;
创建Schema
CREATE SCHEMA IF NOT EXISTS test;
设置表搜索路径
SET search_path TO test, public;
敲重点: 截止到目前,因为官方的bug,在创建view等前请确保目前的schema是public,或者使用上述方式把public加到搜索路径内,否则会报函数找不到等错误。
查看当前Schema下表
\d 或者 \d+
创建Stream
CREATE FOREIGN TABLE IF NOT EXISTS test.streams_page_log_kafka (
data json
)
SERVER pipelinedb;
-- 特别情况下,可以对STREAM增加字段(只能增,不能删)
ALTER FOREIGN TABLE test.streams_page_log_kafka ADD COLUMN x integer;
-- 删除STREAM
DROP FOREIGN TABLE test.streams_page_log_kafka;
注意:stream默认会添加一个arrival_timestamp字段,用来记录每条日志的到达时间,该字段可以用在诸如滑动窗口的地方。
创建Continuous View
CREATE VIEW test.rt_view_stat_daily_page WITH (action=materialize) AS
SELECT
to_date(data->>'log_datetime', 'YYYY-MM-DD HH24:MI:SS') AS f_ds,
SUM(COALESCE(cast(data->>'unreal_key' AS numeric), 0)) AS "testName", -- 使用SUM时,请务必使用COALESCE把NULL值替换掉,否则会导致严重的数据库后端进程崩溃重启!!!
COUNT(*) AS pv,
COUNT(DISTINCT data->>'visitid') AS uv
FROM test.streams_page_log_kafka
GROUP BY
f_ds
;
-- 向stream插入三条测试数据
INSERT INTO test.streams_page_log_kafka (data) VALUES ('{"log_datetime":"2019-01-02 17:20:54","visitid":"f9v65a20-18b8-bc77"}'), ('{"log_datetime":"2019-01-02 18:12:21","visitid":"f9v65a20-18b8-bc77"}'), ('{"log_datetime":"2019-01-02 19:35:50","visitid":"z8a32b19-28z8-kl57"}');
-- 查看结果
postgres=# SELECT * FROM test.rt_view_stat_daily_page;
f_ds | testName | pv | uv
------------+----------+----+----
2019-01-02 | 0 | 3 | 2
-- 清空VIEW内数据
SELECT pipelinedb.truncate_continuous_view('test.rt_view_stat_daily_page');
-- 删除VIEW
DROP VIEW test.rt_view_stat_daily_page;
创建TTL(Time-To-Live)View表
TTL表会根据指定字段,尽可能删除早于指定时间的数据。
-- TTL表可以尽量销毁早于指定时间的数据
CREATE VIEW test.rt_view_stat_daily_ttl_page WITH (action=materialize, ttl='1 month', ttl_column='ttl_ds') AS
SELECT
to_date(data->>'log_datetime', 'YYYY-MM-DD HH24:MI:SS') AS f_ds,
day(to_timestamp(data->>'log_datetime', 'YYYY-MM-DD HH24:MI:SS')) AS ttl_ds,
COUNT(*) AS pv,
COUNT(DISTINCT data->>'visitid') AS uv
FROM test.streams_page_log_kafka
GROUP BY
f_ds,
ttl_ds
;
可以通过pipelinedb.set_ttl函数对continuous view增加、修改、移除TTL。*
具体详见:http://docs.pipelinedb.com/continuous-views.html#modifying-ttls
创建TRANSFORM
CREATE VIEW test.rt_trans_add_message_name_page WITH (action=transform) AS
SELECT
v1.data->>'company' AS company,
v1.data->>'appf' AS appf,
v1.data->>'sourcec' AS sourcec,
v1.data->>'message_type' AS message_type,
v1.data->>'message' AS message,
v2.message_name,
v1.data->>'visitid' AS visitid,
v1.data->>'log_datetime' AS log_datetime
FROM test.streams_page_log_kafka v1
LEFT JOIN dw_setting.dim_source v2 ON v1.data->>'message' = v2.source
WHERE v1.data->>'message_type' IN ('asta', 'moshi', 'kozi', 'semi')
;
-- 从TRANSFORM创建VIEW,通过output_of读取tansform后的数据
CREATE VIEW test.rt_view_source_stat_daily_utrack WITH (action=materialize) AS
SELECT
to_date(log_datetime, 'YYYY-MM-DD HH24:MI:SS') AS f_ds,
message_type,
message,
COUNT(*) AS pv
FROM output_of('test.rt_trans_add_message_name_page')
GROUP BY
f_ds,
message_type,
message
;
-- TRANSFORM后的数据写入到STREAM
CREATE FOREIGN TABLE IF NOT EXISTS test.streams_track_page_from_trans_utrack (
company text,
appf text,
sourcec text,
message_type text,
message text,
message_name text,
visitid text,
log_datetime text
)
SERVER pipelinedb;
CREATE VIEW test.rt_trans_add_message_name_to_stream_utrack WITH (action=transform, outputfunc=pipelinedb.insert_into_stream('test.streams_track_page_from_trans_utrack')) AS
SELECT
v1.data->>'company' AS company,
v1.data->>'appf' AS appf,
v1.data->>'sourcec' AS sourcec,
v1.data->>'message_type' AS message_type,
v1.data->>'message' AS message,
v2.message_name,
v1.data->>'visitid' AS visitid,
v1.data->>'log_datetime' AS log_datetime
FROM test.streams_page_log_kafka v1
LEFT JOIN dw_setting.dim_source v2 ON v1.data->>'message' = v2.source
WHERE v1.data->>'message_type' IN ('asta', 'moshi', 'kozi', 'semi')
;
注意:因为已知bug,使用TRANSFORM时请不要使用自定义函数,否则会有异常报错!可以使用自带的pipelinedb.insert_into_stream将结果写入到另一个STREAM
利用combine统计更粗粒度的UV
combine:可以使用view表保存的更多信息计算出准确的聚合值(sum/avg...),而不是简单的把现在值累加起来。
-- 创建分小时统计的view
CREATE VIEW test.rt_view_stat_hourly_page AS
SELECT
to_date(data->>'log_datetime', 'YYYY-MM-DD HH24:MI:SS') AS f_ds,
date_part('hour', to_timestamp(data->>'log_datetime', 'YYYY-MM-DD HH24:MI:SS')) AS f_hour,
COUNT(DISTINCT data->>'visitid') AS uv
FROM test.streams_page_log_kafka
GROUP BY
f_ds,
f_hour
;
-- 同时向stream插入三条测试数据
-- 两个用户在三个不同时间段的三条记录
INSERT INTO test.streams_page_log_kafka (data) VALUES ('{"log_datetime":"2019-01-02 17:20:54","visitid":"f9v65a20-18b8-bc77"}'), ('{"log_datetime":"2019-01-02 18:12:21","visitid":"f9v65a20-18b8-bc77"}'), ('{"log_datetime":"2019-01-02 19:35:50","visitid":"z8a32b19-28z8-kl57"}');
如上所示,我们创建了一个按照天和小时聚合统计uv的表。当有需求需要统计每天的uv数时,不必要重新创建以天统计的view,我们可以利用combine的特性聚合出实际的结果。
-- 简单sum的结果 (错误)
postgres=# select f_ds, sum(uv) from test.rt_view_stat_hourly_page group by f_ds;
f_ds | sum
------------+-----
2019-01-02 | 3
(1 row)
-- 使用combine的结果 (正确)
postgres=# select f_ds, combine(uv) from test.rt_view_stat_hourly_page group by f_ds;
f_ds | combine
------------+---------
2019-01-02 | 2
注意:Pipelinedb计算uv使用的算法是HLL(HyperLogLog),官方宣称误差约0.81%。经实际测试,100以内基本无误差,1000条丢失约8条UV数据,10000条丢失约13条数据,可用。
滑动窗口
在实际统计工作中,不可避免会有统计一段时间窗口内数据的需求,比如最近30分钟的在线用户数、最近1分钟的浏览量等等。PipelineDB利用arrival_timestamp和clock_timestamp(总是返回现在的时间)实现了滑动窗口的计算。在实际使用中通过WITH子句的sw指定窗口区间(sliding window)。
-- 统计最近1分钟的访客数
CREATE VIEW test.recent_user_count_page WITH (sw = '1 minute') AS
SELECT
COUNT(DISTINCT data->>'visitid') AS uv
FROM test.streams_page_log_kafka
;
注意:有局限性,不支持自定义时间计算窗口数据
时间分桶指标计算
date_round(timestamp, resolution):该函数会把时间向下转换到最近的分桶时间。利用date_round可以非常方便的统计诸如每十分钟浏览量的实时指标数据。
-- 每10分钟浏览量
CREATE VIEW test.stat_pv_bucket_10_page AS
SELECT
date_round(to_timestamp(data->>'log_datetime', 'YYYY-MM-DD HH24:MI:SS'), '10 minutes') AS bucket_10m,
COUNT(*) AS pv
FROM test.streams_page_log_kafka
GROUP BY
bucket_10m
;
其他命令或trick
-- psql命令帮助
\?
-- 激活/停止 VIEW/TRANSFORM
SELECT pipelinedb.activate('continuous_view_or_transform');
SELECT pipelinedb.deactivate('continuous_view_or_transform');
--回填数据到老的VIEW
SELECT pipelinedb.combine_table('continuous_view_3', 'continuous_view_mrel');
-- 获取所有的VIEW
SELECT * FROM pipelinedb.get_views();
-- 获取所有的TRANSFORM
SELECT * FROM pipelinedb.get_transforms();
-- 创建表,当字段是字符串类型时,除非你知道这个字段值永远不会超过某个长度,
-- 否则请使用text格式,从此再也不用担心长度问题!
-- 使用Postgres作为api的后端数据库时,可以在SQL前添加下面的语句,避免一些转义符引起的错误
SET standard_conforming_strings = off;
提个醒:
-
现在在PipelineDB内,一个简单的数据聚合统计模型已经构建完了。
-
因为还没有数据写入到stream,所以现在view内还没有统计结果。
数据写入到Stream
- 使用INSERT INTO,该方法主要体现在测试环节
- 使用COPY命令,该方法适合从文件导入数据到数据库
- 使用pipeline_kafka,该插件配置后可以实时从kafka读取数据并写入到stream
- 自己实现从kafka等写入到stream的过程
测试写入
INSERT INTO test.streams_page_log_kafka (data) VALUES (
'{"log_datetime":"2019-01-02 17:20:54","visitid":"f9v65a20-18b8-bc77","userid":"2345521","version":0,"company":"search","appf":"search","sourcec":"search","ch_source":"","message":"m.baidu.com","message_type":"domain","log_type":"page","terminal":"touch"}'
);
-- 查看实时统计结果
SELECT * FROM test.rt_view_stat_daily_page;
f_ds | testName | pv | uv
------------+----------+----+----
2019-01-02 | 0 | 1 | 1
使用pipeline_kafka
官方插件,读取kafka的速度依赖kafka的分区数,在小量级实时计算时,可以采用。
具体使用方法不再赘述,详见http://docs.pipelinedb.com/integrations.html#apache-kafka
自己实现写入部分
当量级上升到pipeline_kafka无法及时处理或者有其他需求时,建议自己实现从kafka等到stream的写入部分。如下图,根据之前做的一次PipelineDB的0.95版本和1.0版本的性能测试,对只有一个分区的topic读取速度,pipeline_kafka只有约2w/s。而自己实现的写入可以达到约5w/s。
性能测试结果参考
PipelineDB官方文档:http://docs.pipelinedb.com/index.html
PostgreSQL官方文档:https://www.postgresql.org/docs/11/index.html