阿里云-流计算典型示例场景

2020-03-12  本文已影响0人  fat32jin

示例原地址:https://help.aliyun.com/document_detail/65839.html?spm=a2c4g.11186623.6.788.619a7fd2VLJkh4

一、电商实时PVUV

- 源表结构

表 1. 日志源表

字段名 数据类型 详情
account_id VARCHAR 用户ID
client_ip VARCHAR 客户端IP
client_info VACHAR 设备机型信息
platform VARHCAR 系统版本信息
imei VARCHAR 设备唯一标识
version BIGINT 版本号
action BIGINT 页面跳转描述
gpm VARCHAR 埋点链路
c_time VARCHAR 请求时间
target_type VARCHAR 目标类型
target_id VARCHAR 目标ID
udata VARCHAR 扩展信息
session_id VARHCAR 会话ID
product_id_chain VARHCAR 商品ID串
cart_product_id_chain VARCHAR 加购商品ID
tag VARCHAR 特殊标记
position VARCHAR 位置信息
network VARCHAR 网络使用情况
p_dt VARCHAR 时间分区
p_platform VARCHAR 系统版本信息

表 2. 数据库RDS结果表

字段名 数据类型 详情
summary_date BIGINT 统计日期
summary_min VARCHAR 统计分钟
pv BIGINT 单击量
uv BIGINT 访客量 说明: 一天内同个访客多次访问仅计算一个UV。
currenttime TIMESTAMP 当前时间
--数据的订单源表

CREATE TABLE source_ods_fact_log_track_action (
    account_id                        VARCHAR,--用户ID
    client_ip                         VARCHAR,--客户端IP
    client_info                       VARCHAR,--设备机型信息
    platform                          VARCHAR,--系统版本信息
    imei                              VARCHAR,--设备唯一标识
    `version`                         VARCHAR,--版本号
    `action`                          VARCHAR,--页面跳转描述
    gpm                               VARCHAR,--埋点链路
    c_time                            VARCHAR,--请求时间
    target_type                       VARCHAR,--目标类型
    target_id                         VARCHAR,--目标ID
    udata                             VARCHAR,--扩展信息,JSON格式
    session_id                        VARCHAR,--会话ID
    product_id_chain                  VARCHAR,--商品ID串
    cart_product_id_chain             VARCHAR,--加购商品ID
    tag                               VARCHAR,--特殊标记
    `position`                        VARCHAR,--位置信息
    network                           VARCHAR,--网络使用情况
    p_dt                              VARCHAR,--时间分区天
    p_platform                        VARCHAR--系统版本信息


) WITH (
    type='datahub',
      endPoint='yourEndpointURL',
    project='yourProjectName',
    topic='yourTopicName',
    accessId='yourAccessId',
    accessKey='yourAccessSecret',
    batchReadSize='1000'
);

CREATE TABLE result_cps_total_summary_pvuv_min (
    summary_date              bigint,--统计日期
    summary_min               varchar,--统计分钟
    pv                        bigint,--单击量
    uv                        bigint,--一天内同个访客多次访问仅计算一个UV
    currenttime               timestamp,--当前时间
    primary key (summary_date,summary_min)
) WITH (
    type= 'rds',
    url = 'yourRDSDatabaseURL',
    userName = 'yourDatabaseUserName',
    password = 'yourDatabasePassword',
    tableName = 'yourTableName'
);

难点解析

为了方便理解结构化代码和代码维护,我们推荐使用View(数据视图概念)把业务逻辑差分成二个模块。

p_dt pv uv max(c_time)
2017-12-12 1000 100 2017-12-12 9:00:00
2017-12-12 1500 120 2017-12-12 9:01:00
2017-12-12 2200 200 2017-12-12 9:02:00
2017-12-12 3300 320 2017-12-12 9:03:00

模块二

INSERT into  result_cps_total_summary_pvuv_min
select 
a.summary_date,--时间分区,天为单位
cast(DATE_FORMAT(c_time,'HH:mm')  as varchar) as summary_min,
--取出小时分钟级别的时间
a.pv,
a.uv,
CURRENT_TIMESTAMP  as currenttime--当前时间
from result_cps_total_summary_pvuv_min_01 AS a                          

把模块一的数据精确到小时分钟级别取出,最后根据数据得出PV、UV的增长曲线。如图所示。

image.png

二、电商订单地域统计

image.png

SQL语句:

订单

CREATE TABLE source_order (
    id VARCHAR,-- 订单ID
    seller_id VARCHAR, --卖家ID
    account_id VARCHAR,--买家ID
    receive_address_id VARCHAR,--收货地址ID
    total_price VARCHAR,--订单金额
    pay_time VARCHAR --订单支付时间
) WITH (
    type='datahub',
    endPoint='http://dh-cn-hangzhou.aliyun-inc.com',
    project='yourProjectName',--您的project
    topic='yourTopicName',--您的topic
    roleArn='yourRoleArn',--您的roleArn
    batchReadSize='500'
);
订单地址
CREATE TABLE source_order_receive_address ( 
     id VARCHAR,--收货地址ID 
     full_name VARCHAR,--收货人全名 
     mobile_number VARCHAR,--收货人手机号 
     detail_address VARCHAR,--收货详细地址 
     province VARCHAR,--收货省份 
     city_id VARCHAR,--收货城市 
     create_time VARCHAR --创建时间 
 ) WITH ( 
     type='datahub', 
     endPoint='http://dh-cn-hangzhou.aliyun-inc.com', 
     project='yourProjectName',--您的project 
     topic='yourTopicName',--您的topic 
     roleArn='yourRoleArn',--您的roleArn 
     batchReadSize='500' 
 );
                                
城市表
CREATE TABLE dim_city ( 
     city_id varchar, 
     city_name varchar,--城市名 
     province_id varchar,--所属省份ID 
     zip_code varchar,--邮编 
     lng varchar,--经度 
     lat varchar,--纬度 
  PRIMARY KEY (city_id), 
  PERIOD FOR SYSTEM_TIME --定义为维表 
 ) WITH ( 
     type= 'rds', 
     url = 'yourDatabaseURL',--您的数据库url 
     tableName = 'yourTableName',--您的表名 
     userName = 'yourDatabaseName',--您的用户名 
     password = 'yourDatabasePassword'--您的密码 
 );
按日统计不同地域订单(总销售额)的分布。
CREATE TABLE result_order_city_distribution ( 
     summary_date varchar,--统计日期 
     city_id bigint,--城市ID 
     city_name varchar,--城市名 
     province_id bigint,--所属省份ID 
     gmv double,--总销售额 
     lng varchar,--经度 
     lat varchar,--纬度 
     primary key (summary_date,city_id) 
    ) WITH ( 
        type= 'rds', 
        url = 'yourDatabaseURL',--您的数据库url 
        tableName = 'yourTableName',--您的表名 
        userName = 'yourDatabaseName',--您的用户名 
        password = 'yourDatabasePassword'--您的密码 
    );
编辑业务逻辑
 insert into result_order_city_distribution 
 select 
 d.summary_date 
 ,cast(d.city_id as BIGINT) 
 ,e.city_name 
 ,cast(e.province_id as BIGINT) 
 ,d.gmv 
 ,e.lng 
 ,e.lat 
 from 
 ( 
         select 
         DISTINCT 
         DATE_FORMAT(a.pay_time,'yyyyMMdd') as summary_date 
         ,b.city_id as city_id 
         ,round(sum(cast(a.total_price as double)),2) as gmv 
         from source_order as a 
         join source_order_receive_address as b on a.receive_address_id =b.id 
         group by DATE_FORMAT(a.pay_time,'yyyyMMdd'),b.city_id 
         --双流join,并根据日期和城市ID得到销售额分布 
 )d join dim_city FOR SYSTEM_TIME AS OF PROCTIME() as e on d.city_id = e.city_id 
 -- join维表,补齐城市地理信息,得到最终结果 

三、工业互联网传感器统计

业务描述
该工业客户拥有1千多台设备,分布在不同城市的多个厂区,每个设备上有10个不同种类传感器,这些传感器,大概每5秒采集并上传一份数据到日志服务(Log/SLS),每个采集点格式如下。

s_id s_value s_ts
传感器ID 传感器当前值 发送时间
同时,上述传感器分布在多个设备、多个厂区,用户在RDS还记录如下传感器、设备、厂区的分布维表,如下:

s_id s_type device_id factory_id
传感器ID 传感器监控类型 设备ID 厂区ID

上述信息存放在RDS上,用户希望传感器上传的数据能够和上述数据关联,并将传感器数据按照设备归类每1分钟打平为一张宽表,如下:

ts device_id factory_id device_temp device_pres
时间 设备ID 工厂ID 设备温度 设备压力

为了简化不必要的逻辑,我们假定仅有两种类型的监控传感器,即温度和压力,以方便后续的计算,后续计算逻辑如下:

筛选指定温度大于80的设备,并向下游触发告警。用户选择使用MQ作为消息触发源,也就是实时计算将温度大于80的设备过滤并投递给MQ,触发下游的用户定义的告警系统。

将数据写出到在线OLAP系统中,这里用户选择了阿里云hybriddb for mysql (原petadata)。下游用户开发一整套BI系统对接PetaData进行多维度展示。

经过传感器上传的数据进入Log,当行数据格式如下:

{
"sid": "t_xxsfdsad",
"s_value": "85.5",
"s_ts": "1515228763"
}
定义Log源表为s_sensor_data,结构如下:

CREATE TABLE s_sensor_data (
    s_id    VARCHAR,
    s_value VARCHAR,
    s_ts    VARCHAR,
    ts      AS CAST(FROM_UNIXTIME(CAST(s_ts AS BIGINT)) AS TIMESTAMP),
    WATERMARK FOR ts AS withOffset(ts, 10000)
) WITH (
    TYPE='sls',
    endPoint ='http://cn-hangzhou-corp.sls.aliyuncs.com',
    accessId ='xxxxxxxxxxx',
    accessKey ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    project ='ali-cloud-streamtest',
    logStore ='stream-test',
);

定义传感器和设备关联RDS维表为d_sensor_device_data,结构如下:
``` sql
CREATE TABLE d_sensor_device_data (
    s_id    VARCHAR,
    s_type  VARCHAR,
    device_id BIGINT,
    factory_id BIGINT,
    PERIOD FOR SYSTEM_TIME,
    PRIMARY KEY(s_id)
) WITH (
    TYPE='RDS',
    url='',
    tableName='test4',
    userName='test',
    password='XXXXXX'
);

定义触发告警逻辑MQ表为r_monitor_data, 结构如下:

CREATE TABLE r_monitor_data (
    ts  VARCHAR,
    device_id   BIGINT,
    factory_id  BIGINT,
    device_TEMP DOUBLE,
    device_PRES DOUBLE
) WITH (
    TYPE='MQ'
);

定义存储结果数据的HybridDB表定义为r_device_data,结构如下:

CREATE TABLE r_device_data (
    ts  VARCHAR,
    device_id BIGINT,
    factory_id BIGINT,
    device_temp DOUBLE,
    device_pres DOUBLE,
    PRIMARY KEY(ts, device_id)
) WITH (
    TYPE='HybridDB'
);

先考虑将传感器数据按分钟级别进行汇总,打平为一个宽表。为了更加结构化代码方便后续代码维护,使用View:

--先获取每个传感器对应的设备、厂区
CREATE VIEW v_sensor_device_data
AS
SELECT
    s.ts,
    s.s_id,
    s.s_value,
    d.s_type,
    d.device_id,
    d.factory_id
FROM
    s_sensor_data s
JOIN
    d_sensor_device_data FOR SYSTEM_TIME AS OF PROCTIME() as d
ON
    s.s_id = d.s_id;

--打平为一张宽表。
CREATE VIEW v_device_data
AS
SELECT
    --使用滚窗的起始时间作为该条记录的时间
    CAST(TUMBLE_START(v.ts, INTERVAL '1' MINUTE) AS VARCHAR) as ts,
    v.device_id,
    v.factory_id,
    CAST(SUM(IF(v.s_type = 'TEMP', v.s_value, 0)) AS DOUBLE)/CAST(SUM(IF(v.s_type = 'TEMP', 1, 0)) AS DOUBLE) device_temp, --这里用于计算这一分钟的温度平均值
    CAST(SUM(IF(v.s_type = 'PRES', v.s_value, 0)) AS DOUBLE)/CAST(SUM(IF(v.s_type = 'PRES', 1, 0)) AS DOUBLE) device_pres --这里用于计算这一分钟的压力平均值
FROM
    v_sensor_device_data v
GROUP BY
    TUMBLE(v.ts, INTERVAL '1' MINUTE), v.device_id, v.factory_id;
上述是核心计算逻辑,将这一分钟内分别统计关于温度和压力的平均值,作为这一分钟的温度值、压力值。由于使用的是Tumbling Window,也就意味着数据将在每分钟结束时候产出一份。接下来就将数据过滤写出到MQ和HybridDB,如下:

--过滤温度大于80摄氏度的传感器,并写出到MQ触发告警。
INSERT INTO r_monitor_data
SELECT
    ts,
    device_id,
    factory_id,
    device_temp,
    device_pres
FROM
    v_device_data
WHERE
    device_temp > 80.0;
    
--将数据写出到HybridDB,用于后续的分析。
INSERT INTO r_device_data
SELECT
    ts,
    device_id,
    factory_id,
    device_temp,
    device_pres
FROM
    v_device_data;  
上一篇 下一篇

猜你喜欢

热点阅读