PostgreSQLPostgreSQL

PipelineDB1.0,用写SQL的方式进行实时计算

2019-02-26  本文已影响211人  洛洛尘沙

简述

PipelineDB是一个高性能的PostgreSQL扩展,用于在时间序列数据上连续运行SQL查询。这些连续查询的输出存储在常规表中,可以像查询任何其他表或视图一样查询这些表。因此,连续查询可以看作是非常高的吞吐量、增量更新的物化视图。

简言之,与传统的实时计算引擎(storm、kafka streams等)相比,PipelineDB不需要写任何程序代码,只需要写SQL即可实现实时指标的聚合计算,统计实时指标开发耗时可以从原来的天级骤降到分钟级。

要点简概

简要数据流向图

上图为PipelineDB在实际应用中的简要数据流向图。

下面对图中涉及的特定类型的表进行解释:

  1. Stream:进行时序数据聚合计算的第一个必需基础表类型,外部数据会顺序流入Stream表。该表不实际储存数据,当一条数据被后续所有的必须View/Transform读取后即会被丢弃。Stream作为PipelineDB的第一道关口,承接从外部实时写入到PipelineDB内的数据。之后,可以以Stream为基础表,在其上创建若干个统计结果的View或者转换内容的Transform。
  2. Continuous View:PipelineDB的基本抽象,从Stream读取数据并将新数据按照SQL内条件(count、sum等)实时聚合,并将结果实时增量更新到View内。Continuous View是物化视图,实际内容会存储在以_mrel结尾的表中。
  3. Continuous Transform:进行实时转换数据,比如解析URL、关联维表等。Continuous Transform不支持聚合,不储存数据,转换后的数据可以实时写入到另一个Stream或者表。
动态展示图

上图源引自PipelineDB官网,直观的展示了它的实时计算过程。

用法实践

安装

从1.0版本开始,PipelineDB开始作为PostgreSQL的一个扩展存在,安装PipelineDB之前请先安装PostgreSQL。

具体安装方法请参见官方安装教程

连接PostgreSQL数据库

为了连接方便,可以在安装有PostgreSQL的服务器上直接敲"psql"进入交互式命令行。

当需要远程连接时,可以写一个名为“pg”的bash脚本并将路径export到PATH,内容:

#!/bin/bash

source /etc/profile

# 设置密码环境变量
export PGPASSWORD='PostgreSQL密码'

# 登录PostgreSQL
declare -A postgres
postgres=(
    [pg1911-(Postgres11)]="psql -h test_host -p 1911 -U PostgreSQL用户名 -d postgres"
)

dbs=($(printf '%s\n' "${!postgres[@]}" | sort))

echo
echo "+-------------------------------+"
echo "|Postgres/PipelineDB连接菜单工具|"
echo "+-------------------------------+"
echo "请选择数据库序号..."

select db in ${dbs[@]}; do
  break
done

echo "执行命令:${postgres[$db]} ..."
eval "${postgres[$db]}"

exit

运行pg脚本,选择对应的序号后即可连接到对应数据库。(注意:远程连接前,请先配置好pg_hba.conf)

查看已创建Schema

\dn

注意: PostgreSQL里的schema可以类比MySQL里的database,但是并不相同,PostgreSQL有自己的database属性。

查看创建Schema的语句

\h CREATE SCHEMA;

创建Schema

CREATE SCHEMA IF NOT EXISTS test;

设置表搜索路径

SET search_path TO test, public;

敲重点: 截止到目前,因为官方的bug,在创建view等前请确保目前的schema是public,或者使用上述方式把public加到搜索路径内,否则会报函数找不到等错误。

查看当前Schema下表

\d 或者 \d+

创建Stream

CREATE FOREIGN TABLE IF NOT EXISTS test.streams_page_log_kafka (
    data json
)
SERVER pipelinedb;
  
-- 特别情况下,可以对STREAM增加字段(只能增,不能删)
ALTER FOREIGN TABLE test.streams_page_log_kafka ADD COLUMN x integer;
-- 删除STREAM
DROP FOREIGN TABLE test.streams_page_log_kafka;

注意:stream默认会添加一个arrival_timestamp字段,用来记录每条日志的到达时间,该字段可以用在诸如滑动窗口的地方。

创建Continuous View

CREATE VIEW test.rt_view_stat_daily_page WITH (action=materialize) AS
SELECT
    to_date(data->>'log_datetime', 'YYYY-MM-DD HH24:MI:SS') AS f_ds,
    SUM(COALESCE(cast(data->>'unreal_key' AS numeric), 0)) AS "testName",  -- 使用SUM时,请务必使用COALESCE把NULL值替换掉,否则会导致严重的数据库后端进程崩溃重启!!!
    COUNT(*) AS pv,
    COUNT(DISTINCT data->>'visitid') AS uv
FROM test.streams_page_log_kafka
GROUP BY
    f_ds
;

-- 向stream插入三条测试数据
INSERT INTO test.streams_page_log_kafka (data) VALUES ('{"log_datetime":"2019-01-02 17:20:54","visitid":"f9v65a20-18b8-bc77"}'), ('{"log_datetime":"2019-01-02 18:12:21","visitid":"f9v65a20-18b8-bc77"}'), ('{"log_datetime":"2019-01-02 19:35:50","visitid":"z8a32b19-28z8-kl57"}');

-- 查看结果
postgres=# SELECT * FROM test.rt_view_stat_daily_page;
    f_ds    | testName | pv | uv
------------+----------+----+----
 2019-01-02 |        0 |  3 |  2


-- 清空VIEW内数据
SELECT pipelinedb.truncate_continuous_view('test.rt_view_stat_daily_page');
  
-- 删除VIEW
DROP VIEW test.rt_view_stat_daily_page;

创建TTL(Time-To-Live)View表

TTL表会根据指定字段,尽可能删除早于指定时间的数据。

-- TTL表可以尽量销毁早于指定时间的数据
CREATE VIEW test.rt_view_stat_daily_ttl_page WITH (action=materialize, ttl='1 month', ttl_column='ttl_ds') AS
SELECT
    to_date(data->>'log_datetime', 'YYYY-MM-DD HH24:MI:SS') AS f_ds,
    day(to_timestamp(data->>'log_datetime', 'YYYY-MM-DD HH24:MI:SS')) AS ttl_ds,
    COUNT(*) AS pv,
    COUNT(DISTINCT data->>'visitid') AS uv
FROM test.streams_page_log_kafka
GROUP BY
    f_ds,
    ttl_ds
;

可以通过pipelinedb.set_ttl函数对continuous view增加、修改、移除TTL。*

具体详见:http://docs.pipelinedb.com/continuous-views.html#modifying-ttls

创建TRANSFORM

CREATE VIEW test.rt_trans_add_message_name_page WITH (action=transform) AS
SELECT
    v1.data->>'company'      AS company,
    v1.data->>'appf'         AS appf,
    v1.data->>'sourcec'      AS sourcec,
    v1.data->>'message_type' AS message_type,
    v1.data->>'message'      AS message,
    v2.message_name,
    v1.data->>'visitid'      AS visitid,
    v1.data->>'log_datetime' AS log_datetime
FROM test.streams_page_log_kafka v1
LEFT JOIN dw_setting.dim_source v2 ON v1.data->>'message' = v2.source
WHERE v1.data->>'message_type' IN ('asta', 'moshi', 'kozi', 'semi')
;
  
-- 从TRANSFORM创建VIEW,通过output_of读取tansform后的数据
CREATE VIEW test.rt_view_source_stat_daily_utrack WITH (action=materialize) AS
SELECT
    to_date(log_datetime, 'YYYY-MM-DD HH24:MI:SS') AS f_ds,
    message_type,
    message,
    COUNT(*) AS pv
FROM output_of('test.rt_trans_add_message_name_page')
GROUP BY
    f_ds,
    message_type,
    message
;
  
-- TRANSFORM后的数据写入到STREAM
CREATE FOREIGN TABLE IF NOT EXISTS test.streams_track_page_from_trans_utrack (
    company      text,
    appf         text,
    sourcec      text,
    message_type text,
    message      text,
    message_name text,
    visitid      text,
    log_datetime text
)
SERVER pipelinedb;
CREATE VIEW test.rt_trans_add_message_name_to_stream_utrack WITH (action=transform, outputfunc=pipelinedb.insert_into_stream('test.streams_track_page_from_trans_utrack')) AS
SELECT
    v1.data->>'company'      AS company,
    v1.data->>'appf'         AS appf,
    v1.data->>'sourcec'      AS sourcec,
    v1.data->>'message_type' AS message_type,
    v1.data->>'message'      AS message,
    v2.message_name,
    v1.data->>'visitid'      AS visitid,
    v1.data->>'log_datetime' AS log_datetime
FROM test.streams_page_log_kafka v1
LEFT JOIN dw_setting.dim_source v2 ON v1.data->>'message' = v2.source
WHERE v1.data->>'message_type' IN ('asta', 'moshi', 'kozi', 'semi')
;

注意:因为已知bug,使用TRANSFORM时请不要使用自定义函数,否则会有异常报错!可以使用自带的pipelinedb.insert_into_stream将结果写入到另一个STREAM

利用combine统计更粗粒度的UV

combine:可以使用view表保存的更多信息计算出准确的聚合值(sum/avg...),而不是简单的把现在值累加起来。

-- 创建分小时统计的view
CREATE VIEW test.rt_view_stat_hourly_page AS
SELECT
    to_date(data->>'log_datetime', 'YYYY-MM-DD HH24:MI:SS') AS f_ds,
    date_part('hour', to_timestamp(data->>'log_datetime', 'YYYY-MM-DD HH24:MI:SS')) AS f_hour,
    COUNT(DISTINCT data->>'visitid') AS uv
FROM test.streams_page_log_kafka
GROUP BY
    f_ds,
    f_hour
;

-- 同时向stream插入三条测试数据
-- 两个用户在三个不同时间段的三条记录
INSERT INTO test.streams_page_log_kafka (data) VALUES ('{"log_datetime":"2019-01-02 17:20:54","visitid":"f9v65a20-18b8-bc77"}'), ('{"log_datetime":"2019-01-02 18:12:21","visitid":"f9v65a20-18b8-bc77"}'), ('{"log_datetime":"2019-01-02 19:35:50","visitid":"z8a32b19-28z8-kl57"}');

如上所示,我们创建了一个按照天和小时聚合统计uv的表。当有需求需要统计每天的uv数时,不必要重新创建以天统计的view,我们可以利用combine的特性聚合出实际的结果。

-- 简单sum的结果 (错误)
postgres=# select f_ds, sum(uv) from test.rt_view_stat_hourly_page group by f_ds;
    f_ds    | sum
------------+-----
 2019-01-02 |   3
(1 row)

-- 使用combine的结果 (正确)
postgres=# select f_ds, combine(uv) from test.rt_view_stat_hourly_page group by f_ds;
    f_ds    | combine
------------+---------
 2019-01-02 |       2

注意:Pipelinedb计算uv使用的算法是HLL(HyperLogLog),官方宣称误差约0.81%。经实际测试,100以内基本无误差,1000条丢失约8条UV数据,10000条丢失约13条数据,可用。

滑动窗口

在实际统计工作中,不可避免会有统计一段时间窗口内数据的需求,比如最近30分钟的在线用户数、最近1分钟的浏览量等等。PipelineDB利用arrival_timestamp和clock_timestamp(总是返回现在的时间)实现了滑动窗口的计算。在实际使用中通过WITH子句的sw指定窗口区间(sliding window)。

-- 统计最近1分钟的访客数
CREATE VIEW test.recent_user_count_page WITH (sw = '1 minute') AS
SELECT
    COUNT(DISTINCT data->>'visitid') AS uv
FROM test.streams_page_log_kafka
;

注意:有局限性,不支持自定义时间计算窗口数据

时间分桶指标计算

date_round(timestamp, resolution):该函数会把时间向下转换到最近的分桶时间。利用date_round可以非常方便的统计诸如每十分钟浏览量的实时指标数据。

-- 每10分钟浏览量
CREATE VIEW test.stat_pv_bucket_10_page AS
SELECT
  date_round(to_timestamp(data->>'log_datetime', 'YYYY-MM-DD HH24:MI:SS'), '10 minutes') AS bucket_10m,
  COUNT(*) AS pv
FROM test.streams_page_log_kafka
GROUP BY
    bucket_10m
;

其他命令或trick

-- psql命令帮助
\?

-- 激活/停止 VIEW/TRANSFORM
SELECT pipelinedb.activate('continuous_view_or_transform');
SELECT pipelinedb.deactivate('continuous_view_or_transform');
 
--回填数据到老的VIEW
SELECT pipelinedb.combine_table('continuous_view_3', 'continuous_view_mrel');
 
-- 获取所有的VIEW
SELECT * FROM pipelinedb.get_views();
 
-- 获取所有的TRANSFORM
SELECT * FROM pipelinedb.get_transforms();

-- 创建表,当字段是字符串类型时,除非你知道这个字段值永远不会超过某个长度,
-- 否则请使用text格式,从此再也不用担心长度问题!
  
-- 使用Postgres作为api的后端数据库时,可以在SQL前添加下面的语句,避免一些转义符引起的错误
SET standard_conforming_strings = off;

提个醒:


数据写入到Stream

  • 使用INSERT INTO,该方法主要体现在测试环节
  • 使用COPY命令,该方法适合从文件导入数据到数据库
  • 使用pipeline_kafka,该插件配置后可以实时从kafka读取数据并写入到stream
  • 自己实现从kafka等写入到stream的过程

测试写入

INSERT INTO test.streams_page_log_kafka (data) VALUES (
  '{"log_datetime":"2019-01-02 17:20:54","visitid":"f9v65a20-18b8-bc77","userid":"2345521","version":0,"company":"search","appf":"search","sourcec":"search","ch_source":"","message":"m.baidu.com","message_type":"domain","log_type":"page","terminal":"touch"}'
);

-- 查看实时统计结果
SELECT * FROM test.rt_view_stat_daily_page;
    f_ds    | testName | pv | uv
------------+----------+----+----
 2019-01-02 |        0 |  1 |  1

使用pipeline_kafka

官方插件,读取kafka的速度依赖kafka的分区数,在小量级实时计算时,可以采用。

具体使用方法不再赘述,详见http://docs.pipelinedb.com/integrations.html#apache-kafka

自己实现写入部分

当量级上升到pipeline_kafka无法及时处理或者有其他需求时,建议自己实现从kafka等到stream的写入部分。如下图,根据之前做的一次PipelineDB的0.95版本和1.0版本的性能测试,对只有一个分区的topic读取速度,pipeline_kafka只有约2w/s。而自己实现的写入可以达到约5w/s。

性能测试结果

参考

PipelineDB官方文档:http://docs.pipelinedb.com/index.html

PostgreSQL官方文档:https://www.postgresql.org/docs/11/index.html

上一篇 下一篇

猜你喜欢

热点阅读