DataAn

Flink CEP 新特性进展与在实时风控场景的落地

2023-02-13  本文已影响0人  Flink中文社区

摘要:本文整理自阿里云开发工程师耿飙&阿里云开发工程师胡俊涛,在 FFA 实时风控专场的分享。本篇内容主要分为四个部分:

  1. Flink CEP 介绍&新功能解读

  2. 动态多规则支持与 Demo

  3. Flink CEP SQL 语法增强

  4. 未来规划

■ 分享中的动态 CEP 和 CEP SQL 新功能目前已在阿里云实时计算 Flink 版上线支持。

点击查看直播回放 & 演讲PPT

一、Flink CEP 介绍&新功能解读

1.1 什么是 Flink CEP

1

CEP 是复杂事件处理 Complex Event Processing 的缩写,而 Flink CEP 则是基于 Flink 实现的复杂事件处理库,它可以识别出数据流中符合特定模式(Pattern)的事件序列,并允许用户作出针对性处理。

下面我们举个例子,如上图所示,假设我们对模式 A、B、B、C 感兴趣,它代表我们想要找到这样的事件序列:A 类事件发生后,发生了两次 B 类事件,又发生一次 C 类事件。注意,这里我们并不要求事件之间是严格连续的。

当我们使用 Flink CEP 开发了相关代码并跑起作业后,遇到 d1、a1、b1、b2、d2、c1 的事件流,Flink CEP 就能找到其中的 a1、b1、b2、c1 这一次匹配,之后用户就可以在作业中针对这次匹配做出处理,比如发送报警到下游系统等。

1.2 Flink CEP 应用场景

2

在实际场景中,Flink CEP 基于 Flink 的分布式特性、毫秒级处理延迟以及自身丰富的规则表达能力有非常多的应用。我们这里展示三个典型场景:

1.3 Flink CEP 在 1.16 的改进

3

在 1.16 版本中,Flink CEP 主要包含四个改进。

接下来举一个简单的例子来演示 1.16 的新特性给用户带来的好处。

4

在营销场景中,我们希望用户在领取品类优惠券,并添加对应品类商品后,马上下单付款。如果没有付款,我们会采取一些针对性的措施。把刚才的描述细化成一个具体的营销场景,也就是寻找大促当天在领取优惠券后的五分钟内,向购物车中添加了商品,但最终没有结账付款的用户。找到这些用户就可以让下游业务方进行用户分析,或者采取营销措施(例如实时发送提醒消息等)。

针对该场景,1.16 后我们就可以写出上图中的 Pattern。首先起始判断的条件是领取了优惠券,具体判断优惠券领取的逻辑,我们写在 StartCondition 对应的代码中。中间的 Pattern 是 addItem,它对应着添加商品到购物车,具体的判断逻辑我们写在 MiddleCondition 代码中。

注意,这里我们在相邻的子 Pattern 之间定义了 Within 时间窗口,类型为 REVIOUS_AND_CURRENT,它表示只有在领取优惠券事件发生后的五分钟内,发生的添加商品事件,才会被纳入这次模式匹配的考虑中。

最后以 notFollowedBy 结尾,后面是付款 Pattern,并且定义整个付款 Pattern 的时间窗口是一天。可以看到整个 Flink CEP 的 Pattern 写起来更轻松,表达能力也更强。

二、动态多规则的设计与云上实践

2.1 动态规则支持:背景

5

在介绍我们为什么需要动态规则更新前,先看一下右边的图,明确一下规则究竟包含哪些要素。我们认为 Flink CEP 中的规则(即 Pattern)是由阈值、条件、事实三部分组成的。下面我们以“五分钟内通过广告链接访问某商品超过五次,但最终没有购买”为例来介绍这三个要素。

阈值指的是超过五次中的“五”;事实指的是规则所针对的动作,比如通过广告链接访问某商品等;而条件则是用来描述如何过滤符合要求的动作。比如超过五次中的“超过”。

明晰规则的组成要素后,我们也更容易理解为什么需要支持规则动态更新。在实际生产中,频繁变化的现实场景要求我们对规则的内容,进行修改或者添加。

比如有一个 CEP 作业会在某个用户在一分钟内连续进行某操作超过 10 次后将其认为是风险用户。但在流量暴增或者举行某些活动的时候,这个阈值被改为 20 或者 30 次才更合适。现有的条件下想要更新规则,我们只能重新编写 Java 代码,再重启作业来使最新规则生效。

这样做时间成本高、延迟敏感的作业很难接受,除此之外,如果规则的时间窗口较长,状态又比较大的话,重启作业的代价会更高,因此我们需要支持动态规则更新。

要做到这一点,我们有两个关键问题需要解决。

第一,如何让 Flink 作业不停机加载新规则。第二,如何解决规则(Pattern)的序列化与反序列化。第二个问题本质上是由第一个问题衍生而来的。如果想让作业不停机加载,作业就必须从某个地方拿到我们传给它的 Pattern,并生成对应的 Pattern 对象在作业中使用。

针对上述两个问题,有一些现有的解决方案,比如通过修改 CepOperator 添加注入规则的接口,来实现不停机加载,以及基于 Groovy 引擎动态生成 Pattern 对象,解决序列化问题。但我们也发现,这两个方案其实都有一些不足。

比如第一个方案,通常情况下,规则都会存储在数据库中,而典型的对 CepOperator 的修改,则是让 CepOperator 直接和数据库交互,拉取最新规则。这样当 CEP 作业并发较多的时候,每个 sub_task 都要去连接数据库,这会给数据库带来额外的压力,并且更大的问题是,不同的 sub_task 拉取到的规则一致性难以保证。除此之外,这种修改通常只支持修改单条规则,不容易拓展到多规则的场景。

对于第二个方案,使用 Groovy 引擎动态生成 Pattern 对象也有自己的缺点。比如它的表达能力有限,一般只能结合 Aviator 动态修改阈值,很难改变规则整体的逻辑。并且 Groovy 是一个较重的引擎,它生成规则的时间也相对较长。

2.2 动态规则支持:设计

6

基于以上提到的背景和问题,阿里的同学在社区内提出了 FLIP-200,并在阿里内部按照 FLIP-200 实现了一版 CEP 中动态多规则的支持。下面我将详细介绍我们是如何实现的,以及如何解决刚才提到的那些问题。

首先我们新增了 PatternProcessor 接口,用于完整的定义 CEP 中的一条规则。PatternProcessor 包含 getId,getVersion 等用于获取该 Pattern 唯一标识符的方法;getTimestamp 等用于获得时间戳,进行调度的方法;getPattern 对象用于拿到 PatternProcessor 所内嵌的规则;PatternProcessorFunction 用于描述如何处理找到的匹配。除此之外,为了功能的完整性,我们还添加了 PatternProcessorDiscoverer 和 PatternProcessorManager,用于描述如何发现和管理 Processor。

7

下面介绍一下动态规则支持的整体架构。首先要提一下 Flink 的 OperatorCoordinator 机制,顾名思义它负责协调 Flink 作业中的各个 operator。OperatorCoordinator 自身运行在 JobManager 中,可以给 TM 的 Operator 发送事件,之前主要在 Source 和 Sink 中使用,用于发现和分配读写的 Workload。

在 CEP 中我们也复用了这一机制,实现了 DynamicCEPOperatorCoordinator,它是 JobManager 中运行的线程,负责调用 PatternProcessorDiscoverer 接口拿到最新的 Pattern。

上图左侧展示的是从数据库中读取序列化后的 PatternProcessor 的过程。可以看到 Operator 拿到 PatternProcessor 后,会发送给和它关联的 DynamicCEPOperator。DynamicCEPOperator 接收到发送的事件并进行解析与反序列化,最终生成要使用的 PatternProcessor 并构造对应的 NFA,用于处理上游发送的事件并输出到下游。

另外注意一下,这里允许一个 CepOperator 里有多个 NFA 对应多个 PatternProcessor,这样可以比较好的支持多规则。

基于这样的方式,我们就可以做到不停机的更新规则内容,且只有 OperatorCoordinator 会和外部规则数据库交互,可以有效减少对数据库的访问,并保证了各个下游 sub_task 中使用规则的一致性。

2.3 动态规则支持:(de)serialization

8

接下来我们来思考下 Pattern 的抽象。Pattern 本质上是描述了规则匹配时用到的 NFA 的状态转换图,即根据输入事件如何从一个状态转移到另一个状态,直到终态为止。

有了这样的观察后,我们就可以稍微做一些简化。比如将一个复合 Pattern 看成一个图,节点是每个子 Pattern,边则对应事件选择策略,即如何从一个子 Pattern 的匹配转移到另一个子 Pattern 的匹配。而每个图也可以看作是一个更大的图的子节点,这样我们就允许了模式的嵌套。

那么我们该如何描述这个图呢?我们定义的格式有如下几个设计原则。

根据这些原则,最终我们选择了基于 JSON 定义一套描述 Pattern 的规范。下面用一个简单的例子来展示我们定义的 JSON 格式。

9

上图左侧是我们用 Java API 定义的示例 Pattern,当满足 StartCondition 的事件出现大于等于三次之后,如果跟着一个满足 EndtCondition 的事件,那么我们就认为这是一个匹配。

可以注意到这里的 Pattern 包含两个子 Pattern,第一个 Pattern 对应 Star Pattern,第二个 Pattern 对应 End Pattern,逻辑上也存在两条边。由于 StartCondition 包含 timesOrMore 的声明,所以它有一条指向自己的边,另外也有一条从 StartCondition 指向 EndCondition 的边。

上图右侧就是用我们定义的 JSON 格式来描述 Java Pattern 得到的结果。我们注意这里有几个关键字段。

第一个是 node 字段,它是一个数组,包含每个子 Pattern 的完整描述,比如这里我们用 times 字段表示这个子 Pattern 对应的 Condition,要被满足大于等于三次。第二个是 edges 字段,它用于记录边的信息。

整个 JSON 格式完整的定义,可以参考阿里云的官方文档。

2.4 动态规则支持:拓展 Condition

10

接下来我们介绍一下我们是如何支持动态修改 Condition 中使用的阈值。和业界典型的实践一样,我们基于 Aviator 表达式引擎定义了 AviatorCondition。在 AviatorCondition 的构造函数中,根据输入的表达式字符串生成 AviatorExpression,然后在 filter 方法中通过反射来解析传入的事件字段和阈值,执行 AviatorExpression,最后返回 true or false 作为 filter 这个方法的返回结果。

举一个简单的例子,假设有一个叫 Event 的类,它有两个字段 price 和 action。那么我们就可以构造一个这样的 AviatorCondition,它的参数是一个表达式字符串,这个字符串里描述了对 Event 中事件字段的取值要求。比如我们要求 action==1&&price>20。如果我们想要更新阈值,就直接修改表达式,变成 action==0&&price>50。

注意这个字符串是传入的参数,它也可以在我们刚才介绍的 JSON 格式中定义和描述,所以我们也可以直接编辑数据库中的字段进行阈值的动态更新。

2.5 多规则支持

11

多规则是指在同一输入流上运用多条规则。按照开源 Flink CEP 的方案,我们要想在一个 Flink 作业中做到这点,就需要定义多个 Pattern Stream,对应也会生成多个 CepOperator 和 NFA,这也意味着上游数据要复制多次,这显然带来了很多额外的开销。

所以我们进行了优化,允许一个 DynamicCEPOperator 在它里面构建多个 NFA,这样上游的数据只需要传递一次,避免了额外的数据拷贝。

2.6 动态 CEP Demo

接下来我们以广告投放中的实时反作弊场景来演示动态 CEP Demo。

首先为大家介绍一下 demo 所针对的场景,我们知道在电商平台投放广告时,广告主通常有预算限制。例如对于按点击次数计算费用的广告而言,如果有黑灰产构造虎假流量,攻击广告主,就会很快消耗掉正常广告主的预算,使得广告内容被提前下架。

在这种情况下,广告主的利益受到了损害,容易导致后续的投诉与纠纷。为了应对上述作弊场景,我们需要快速辨识出恶意流量,采取针对性措施。例如限制恶意用户、向广告主发送告警等来保护用户权益。同时考虑到可能有意外因素,例如达人推荐、热点事件引流等导致某一商品的流量骤变,我们也需要动态调整用于识别恶意流量的规则,避免损害正常用户的利益。

本 Demo 将为大家演示如何使用 Fink 动态 CEP 解决上述问题。

1

我们假设客户的行为日志会被存放入消息队列 Kafka 中,Fink CEP 作业会消费 Kafka 数据,同时会去轮询 RDS 数据库中的规则表,拉取策略人员添加到数据库的最新规则,并用最新规则去做事件匹配。针对匹配到的事件,Flink CEP 作业会发出告警或将相关信息写入其他数据存储中,整体数据链路如上图所示。

在一会儿的演示中,我们对用户的日志做了一些简化,日志中的 action 字段,它的值如果为 0,就代表点击事件,为 1 代表购买事件,为 2 代表分享事件。接下来为大家演示具体的操作步骤。

首先需要创建好 Flink 全托管实例、RDS MySQL 实例、消息队列 Kafka 实例。然后准备好数据库相关内容,在 RDS 控制台创建 database。

2

注意在配置好之后,我们要在数据库连接中设置白名单,来保证我们的 Flink 全托管实例能访问 RDS 数据库。

然后打开 RDS 的 SQL 编辑页面创建一张数据表,命名为 RDS demo,四个字段 id、version、pattern、function。id 和 version 用于标名唯一的版本和 id 信息,pattern 代表了序列化后的 Pattern Stream,function 用于指代要用的 PatternProcessor 的函数名。然后编写 Flink DataStream 作业并打包提交到 Flink 全托管实例中运行。

3

接下来为大家介绍 main 函数的大致流程以及部分关键实现。首先读取一些必要的参数用于构造 KafkaSource 以及 RDS 数据库的一些连接信息。然后对 Source 基于用户和商品的 ID 做 keyBy,方便后续进行 CEP 的匹配。

4

接下来介绍一下在动态 CEP 中引入的新接口 DynamicPatterns。它有四个参数,第一个用来指定输入事件流,第二个参数 PatternProcessorDiscovererFoctory 用来构造 PatternProcessorDiscoverer;第三个参数 TimeBehaviour 用来指定是按照 even time 处理事件还是按照 processing time 处理事件;第四个用来描述输出流的类型信息。

另外注意这里用的是 JDBCPeriodPatternProcessorDiscovererFactory,它会周期性地扫描指定的数据库,检测到更新后,会对应地更新 Flink CEP 作业中使用的 PatternProcessor。

5

完成作业的打包后,我们接下来把作业上传到 Flink 全托管中,然后指定了一些必要的参数,比如 KafkaBrockers 以及 RDS 的一些连接信息,然后点击上线,进入运维,启动作业。

6

接下来我们在 RDS 数据库中插入插入规则 1: “连续 3 条 action 为 0 的事件发生后,下一条事件的 action 仍非 1”,其业务含义为连续 3 次访问该产品但最后没有购买。

7

它的 JSON 序列化表现如上图。

8

然后将该条 JSON 数据插入到数据库中。

9

接下来我们去作业中查看一下 TaskManager 日志,可以看到已经插入了最新规则。

10

接下来我们尝试往 Kafka 中发送几条消息来验证 CEP 的匹配逻辑,这里直接发四条一样的消息。

11

接下来检测一下,TaskManager 中是否有相应的输出。可以看到(id=1, version=1)的规则的最新匹配,匹配的事件序列就是刚才发送的那四条事件。

12

然后我们来验证动态修改这个规则并插入新的规则。这里将出现次数改为大于等于五次,StartCondition 的判断条件也改为 action==0 或者 action==2,然后执行插入。同时我们插入第二条规则,它的 ID 为 2,版本为 1,内容和规则 1 的第 1 个版本完全一致,主要用来辅助展示对多规则的支持。

13

接下来可以在作业日志中查看到我们刚刚插的两个规则,然后用 Kafka 发送三条 action 为 0 的消息,一条 action 为 2 的消息,并将之前的四条消息再发一遍。

图片

接着我们查看作业的匹配结果,可以看到针对(id=1, version=2)的规则,作业匹配到 1 次“5 个 action 为 0 或 1 的事件+1 个 action 非 1 的事件”的序列后输出了匹配结果,代表动态修改的规则成功生效;而对于(id=1, version=2)的规则,CEP 作业匹配到 2 次“3 个 action 为 0 的事件+1 个 action 非 1 的事件”的序列后输出结果,代表新增的规则也在作业中被采用。

三、Flink CEP SQL 语法增强

3.1 Flink CEP SQL 简介

12

Flink CEP SQL 主要基于 SQL2016 标准中的行模式识别语句,将 Flink 流表,例如上图中的 csv_source 作为 MATCH_RECOGNIZE 语句的输入,使用非确定有穷状态机对流表中的时序数据进行匹配,最终对识别出特定模式的数据序列进行计算后重新输出为 Flink 流表,从而无缝对接 Flink SQL 生态。

其 MATCH_RECOGNIZE 语句主要包含以下几个部分:

13

对于上图中的源表,示例中的 CEP SQL 语句会输出四条结果,其中 Alice 用户识别到序列为 AAAB,如红色箭头所指。由于默认使用的 AFTER MATCH 策略为 SKIP TO NEXT ROW,结果表中会包含 AAAB 序列的两个子序列 AAB、AB 对应的输出。对于 Bob 用户则匹配到 AB 序列,如蓝色箭头所指。

3.2 Flink CEP SQL 语法增强

14

目前 Flink CEP 的主要工作集中在 Java API 上,但基于 Flink SQL 和其他 SQL 类 ETL 软件庞大的用户群和成熟的生态考虑,我们也尽可能在保持对 SQL 标准兼容的同时,持续完善和改进 Flink CEP SQL 的功能和使用体验。

在最近的工作中,Flink CEP SQL 主要在语法层面对以下三个功能进行了支持:

■ 01 输出带时间约束模式的匹配超时序列

15

在目前版本的 Flink CEP SQL 中可以通过 WITHIN 语句对模式的整体匹配时间进行约束。例如一个常见的应用场景是用户行为模式识别,从流量入口到最终完成用户价值转化的一系列流程中,我们希望整体流程周期在十分钟之内的高潜用户,则可以像上图中在 PATTERN 后使用 WITHIN INTERVAL 加时间参数来进行约束。

16

例如对于上图中的源表,前面的 MATCH_RECOGNIZE 语句示例将会匹配到 Alice 用户在十分钟之内完成了 ABC 操作。而 Bob 用户由于 C 操作距离 A 操作已经过去了 13 分钟,将会匹配失败。

17

但可能也存在这样的需求,对于这些流程周期超过十分钟或流程中断的用户,我们也希望能够识别出来,进一步去分析其超时或中断的原因。那么我们可以如上图中示例,在 ONE ROW PER MATCH 之后使用 SHOW TIMEOUT MATCHES 来声明输出匹配超时的序列。

在 Java API 中,我们使用 Output Tag 来将超时序列输出到侧流处理,而在 SQL 中,匹配超时序列和匹配成功序列会在同一张流表中,但对超时序列未匹配到的事件,在 MEASURES 中计算将会得到空值。上图结果表中 Bob 用户的 C 操作超时,因此得到 C 的映射操作结果也为空值。通过这些空值,我们可以将这些匹配超时序列从流表中分离出来,并且判断是在哪个步骤超时的。

■ 02 定义事件之间的连续性

18

在使用 Flink CEP Java API 的时候,我们可以通过函数很方便地定义事件之间的连续性,例如用 next()指定严格连续,模式中相邻的事件在数据流中必须紧接着出现,使用 followedBy()则可以指定松散连续,模式中相邻事件匹配时可以忽略一些不匹配的事件。

之前的 Flink CEP SQL 中只支持声明严格连续,即表中第一行的语法,现在每一个 Java API 中的连续性函数在 SQL 中都有了对应的表达方式。

例如 followedBy()对应的 SQL 语法,在 A 和 B 之间使用 SQL EXCLUDE 的语法,即{- X*?-},其中使用了一个未在 DEFINE 中定义的变量 X 来表示任意匹配,并使用 X*?表示非贪婪地匹配 0 至任意多个任意事件,其效果是 exclude 部分会连续匹配任何非 B 的事件,等效于 followedBy()的语义。在 followedBy() SQL 语法的基础上去掉 X 上的非贪婪修饰即为 followedByAny()的语义。对于 notNext(),则使用[^B] 的表达形式,表示 A 事件之后紧接着不能出现 B 事件。对于 notFollowedBy(),只需要将 followedBy()和 notNext()的 SQL 语法结合使用即可。

■ 03 定义循环模式中的连续性和贪婪性

19

对于一个循环模式,例如上表中的 A+,在之前的 Flink CEP SQL 中已经支持了贪婪性的声明,不使用任何符号为贪婪匹配,使用一个问号则为非贪婪。两者的区别是,例如上图示例中当 a3 可以同时匹配 A 条件或 C 条件,贪婪匹配会选择更长的序列,而非贪婪则会选择更短的。

现在我们在原有贪婪性的声明上新增了对连续性的声明,使用??表示松散连续且贪婪,???表示松散连续非贪婪。循环模式的松散连续可以认为是在循环模式中的事件之间使用 followedyBy 关系,例如 a1、a2 之间有非匹配的 b1 事件,在严格连续的情况下,a1 会无法匹配到循环模式 A 中,如表中(A+ C)得到的 a2 a3 c1 序列,而松散连续的情况下则可以跳过 b1 事件而形成更长的匹配序列,例如(A+?? C)得到的 a1 a2 a3 c1 序列。

四、未来规划

20

Flink CEP 未来工作的重点还是在动态 CEP 和 CEP SQL 上:

点击查看直播回放 & 演讲PPT<

上一篇下一篇

猜你喜欢

热点阅读