动态Flink CEP规则构想

2020-04-03  本文已影响0人  和平菌

场景
我们在写CEP规则的时候动态的逻辑大致可以分为三个部分:
1、通过条件过滤数据
2、事件链(Pattern)
3、每条事件匹配的条件(Condition)
所以可以构想开发一个通用的Job,然后通过一个配置文件来达到规则的触发。

设计
1、首先为了方便处理数据,我们先把数据都转成JSONObject,方便后续直接使用
2、所有涉及到条件判断的部分,我们都可以使用Jexl来实现(关于jexl的使用我已经有过介绍了)
3、整个事件链的描述需要定义一套DSL语言然后来解析(前端可以实现可视化的配置)

实现
1、条件的过滤

public class RuleFilter extends RichFilterFunction<JSONObject> {

    private Jexl jexl;

    /**
     * 规则表达式
     */
    private String express;

    public RuleFilter(String express) {
        this.express = express;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        jexl = new Jexl();
    }

    @Override
    public boolean filter(JSONObject jsonObj) throws Exception {
        if(jsonObj == null){
            return false;
        }
        return jexl.evaluateBoolean(express, jsonObj);
    }
}

这里的Jexl是我对jexl的封装

2、事件条件的判断

public class RuleCondition extends SimpleCondition<JSONObject> {

    private String express;

    private Jexl jexl;

    public RuleCondition(String express) {
        this.express = express;
        this.jexl = new Jexl();
    }

    @Override
    public boolean filter(JSONObject jsonObject) throws Exception {
        if(jsonObject == null){
            return false;
        }
        return jexl.evaluateBoolean(express, jsonObject);
    }
}

提示:
功能FLINK CEP SQL本事是实现的,这里主要是探讨Flink SQL大致实现的过程

上一篇下一篇

猜你喜欢

热点阅读