Understanding CEP: greedy

2020-06-11  sensenyou

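Taken literally, greedy means that a matching event should be repeated as many times as possible. A verbal description is not very intuitive, so let's go straight to the code.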

package ceptest

import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

object TestSkipStrategy extends App {
  val env = StreamExecutionEnvironment.createLocalEnvironment()

  // Create a DataStream from a list of elements
  val myInts: DataStream[String] =
    env.fromElements("a", "b", "c1", "c2", "c3", "d", "c4", "e", "c5", "d")

  // "start": one or more events beginning with "c", matched greedily;
  // "middle": a later event beginning with "d".
  val pattern2 = Pattern
    .begin[String]("start", AfterMatchSkipStrategy.skipPastLastEvent())
    .where(_.startsWith("c")).oneOrMore.greedy
    .followedBy("middle").where(_.startsWith("d"))

  val patternStream = CEP.pattern(myInts, pattern2)

  // Print each match; `get` returns an Option holding the events matched under that name
  patternStream.select(patternSelectFun => {
    val start = patternSelectFun.get("start")
    val middle = patternSelectFun.get("middle")
    println(start.mkString("->") + "->" + middle.mkString("->"))
  })

  env.execute("test cep")
}

The code above produces the following output:

[Figure: result output]

As you can see, only the longest matches are output. How is that achieved?

By combining AfterMatchSkipStrategy.skipPastLastEvent() with greedy, which solves the problem neatly.
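To see what the combination contributes, it can help to run the same input against a variant that uses neither option. The following is a minimal sketch for comparison only: the object name GreedyComparison and its value names are made up for illustration, and it assumes the same flink-cep-scala dependency as the code above. No output is listed here, since this variant was not part of the original run.

```scala
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
import org.apache.flink.cep.scala.pattern.Pattern

// Hypothetical object, used only for this comparison sketch.
object GreedyComparison {

  // The pattern from the article: greedy loop plus skipPastLastEvent().
  val greedySkipPast: Pattern[String, String] =
    Pattern
      .begin[String]("start", AfterMatchSkipStrategy.skipPastLastEvent())
      .where(_.startsWith("c")).oneOrMore.greedy
      .followedBy("middle").where(_.startsWith("d"))

  // Same shape, but without the greedy quantifier and without
  // skipping any partial matches after a match is emitted.
  val plainNoSkip: Pattern[String, String] =
    Pattern
      .begin[String]("start", AfterMatchSkipStrategy.noSkip())
      .where(_.startsWith("c")).oneOrMore
      .followedBy("middle").where(_.startsWith("d"))
}
```

Passing GreedyComparison.plainNoSkip to CEP.pattern(myInts, ...) in place of pattern2 is expected to emit overlapping, shorter matches as well, which makes the effect of the greedy and skip settings visible.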

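The second match in the output also deserves attention. When no contiguity is specified, the default is relaxed contiguity, so non-matching events, such as "e" between "c4" and "c5", may sit between the events collected by the loop. The official documentation puts it this way: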

For looping patterns (e.g. oneOrMore() and times()) the default is relaxed contiguity
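If relaxed contiguity inside the loop is not what you want, the loop can be made strict with consecutive(). The sketch below is one possible variant of the pattern above, not something taken from the original run: the object name StrictLoop is hypothetical, and it assumes the same flink-cep-scala dependency.

```scala
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
import org.apache.flink.cep.scala.pattern.Pattern

// Hypothetical object, used only to illustrate strict contiguity in a loop.
object StrictLoop {

  // consecutive() requires the events collected by the oneOrMore loop to be
  // adjacent in the stream, so a non-matching event ends the run.
  val strictLoop: Pattern[String, String] =
    Pattern
      .begin[String]("start", AfterMatchSkipStrategy.skipPastLastEvent())
      .where(_.startsWith("c")).oneOrMore.consecutive().greedy
      .followedBy("middle").where(_.startsWith("d"))
}
```

With this variant, "c4" and "c5" should no longer be collected into the same "start" group, because "e" lies between them.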
