@IT·互联网KisFlow-基于Golang的流式计算框架实战

Golang框架实战-KisFlow流式计算框架(8)-KisF

2024-03-04  本文已影响0人  刘丹冰Aceld

Golang框架实战-KisFlow流式计算框架专栏

Golang框架实战-KisFlow流式计算框架(1)-概述
Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)
Golang框架实战-KisFlow流式计算框架(4)-数据流
Golang框架实战-KisFlow流式计算框架(5)-Function调度
Golang框架实战-KisFlow流式计算框架(6)-Connector
Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出


7.1 Action Abort(终止流程)

KisFlow Action 是指在执行Function的时候,同时可以控制Flow的调度逻辑,KisFlow提供一些Action动作让开发者做选择,本节先介绍最简单的Action动作,Abort(终止当前Flow)。

我们最终的Abort的使用形式如下:

func AbortFuncHandler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call AbortFuncHandler ----")

    for _, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)
    }

    return flow.Next(kis.ActionAbort)  // 终止Flow
}

AbortFuncHandler()是一个Function 的业务回调方法,是由开发者自定义的,在执行完当前Funciton之后,正常的情况是继续执行下一个Funciton,但是如果传递flow.Next(kis.ActionAbort) 作为当前Funciton的返回值,那么则不会执行到下一个Funciton,而是直接终止当前Flow的调度计算流。

下面我们先来实现KisFlow的 Abort Action动作模式。

7.1.1 Abort接口定义

首先,先对Flow的Abort()接口做定义。

kis-flow/kis/flow.go

type Flow interface {
    // Run 调度Flow,依次调度Flow中的Function并且执行
    Run(ctx context.Context) error
    // Link 将Flow中的Function按照配置文件中的配置进行连接
    Link(fConf *config.KisFuncConfig, fParams config.FParam) error
    // CommitRow 提交Flow数据到即将执行的Function层
    CommitRow(row interface{}) error
    // Input 得到flow当前执行Function的输入源数据
    Input() common.KisRowArr
    // GetName 得到Flow的名称
    GetName() string
    // GetThisFunction 得到当前正在执行的Function
    GetThisFunction() Function
    // GetThisFuncConf 得到当前正在执行的Function的配置
    GetThisFuncConf() *config.KisFuncConfig
    // GetConnector 得到当前正在执行的Function的Connector
    GetConnector() (Connector, error)
    // GetConnConf 得到当前正在执行的Function的Connector的配置
    GetConnConf() (*config.KisConnConfig, error)
    // GetConfig 得到当前Flow的配置
    GetConfig() *config.KisFlowConfig
    // GetFuncConfigByName 得到当前Flow的配置
    GetFuncConfigByName(funcName string) *config.KisFuncConfig

    //  --- KisFlow Action ---
    // Next 当前Flow执行到的Function进入下一层Function所携带的Action动作
    Next(acts ...ActionFunc) error
}

这里面提供一个接口Next(acts ...ActionFunc) error,其中参数是一个可变参数,类型为ActionFunc,这个是我们给KisFlow定义的Action相关的方法。有关Action的定义模块如下:

7.1.2 Action模块定义

Action是用来在Flow执行过程中,通过Function来控制Flow执行特殊动作的行为配置模块,包括上面的Abort行为,Abort也属于其中一个Action。Action的模块定义如下,在kis-flow/kis/下创建action.go文件,实现:

kis-flow/kis/action.go

package kis

// Action KisFlow执行流程Actions
type Action struct {
    // Abort 终止Flow的执行
    Abort bool
}

// ActionFunc KisFlow Functional Option 类型
type ActionFunc func(ops *Action)

// LoadActions 加载Actions,依次执行ActionFunc操作函数
func LoadActions(acts []ActionFunc) Action {
    action := Action{}

    if acts == nil {
        return action
    }

    for _, act := range acts {
        act(&action)
    }

    return action
}

// ActionAbort 终止Flow的执行
func ActionAbort(action *Action) {
    action.Abort = true
}

首先,现在Action只有Abort一个行为,我们用bool类型来表示Abort是否为终止,true则为需要终止flow的调用。
其次,type ActionFunc func(ops *Action)这个函数原型为一个函数类型,函数的形参是传递进来一个Action{} 指针,而 func ActionAbort(action *Action)则是它的一个具体的函数,ActionAbort()的方法的目的就是将Action的Abort成员设置为true。

最后看func LoadActions(acts []ActionFunc) Action方法。这个形参是一个ActionFunc函数数组,LoadActions()则是创建一个新的Action{} ,然后依次执行[]ActionFunc的函数来改变Aciton{}的成员,最终将新的Action{}返回上层。

7.1.3 Next方法实现

接下来,我们需要给KisFlow模块实现这个接口,首先需要给KisFlow添加一个Action{}成员,表示每次执行完Function之后所携带的动作。

kis-flow/flow/kis_flow.go

// KisFlow 用于贯穿整条流式计算的上下文环境
type KisFlow struct {
    // 基础信息
    Id   string                // Flow的分布式实例ID(用于KisFlow内部区分不同实例)
    Name string                // Flow的可读名称
    Conf *config.KisFlowConfig // Flow配置策略

    // Function列表
    Funcs          map[string]kis.Function // 当前flow拥有的全部管理的全部Function对象, key: FunctionName
    FlowHead       kis.Function            // 当前Flow所拥有的Function列表表头
    FlowTail       kis.Function            // 当前Flow所拥有的Function列表表尾
    flock          sync.RWMutex            // 管理链表插入读写的锁
    ThisFunction   kis.Function            // Flow当前正在执行的KisFunction对象
    ThisFunctionId string                  // 当前执行到的Function ID
    PrevFunctionId string                  // 当前执行到的Function 上一层FunctionID

    // Function列表参数
    funcParams map[string]config.FParam // flow在当前Function的自定义固定配置参数,Key:function的实例KisID, value:FParam
    fplock     sync.RWMutex             // 管理funcParams的读写锁

    // 数据
    buffer common.KisRowArr  // 用来临时存放输入字节数据的内部Buf, 一条数据为interface{}, 多条数据为[]interface{} 也就是KisBatch
    data   common.KisDataMap // 流式计算各个层级的数据源
    inPut  common.KisRowArr  // 当前Function的计算输入数据

    // +++++++++++++++++++++
    
    // KisFlow Action
    action kis.Action        // 当前Flow所携带的Action动作
}

然后实现KisFlow的Next()接口,如下:

kis-flow/flow/kis_flow.go

// Next 当前Flow执行到的Function进入下一层Function所携带的Action动作
func (flow *KisFlow) Next(acts ...kis.ActionFunc) error {

    // 加载Function FaaS 传递的 Action动作
    flow.action = kis.LoadActions(acts)

    return nil
}

每次开发者在执行Function的自定义业务回调中,最后会调用flow.Next()来传递Action,所以Next(acts ...kis.ActionFunc) error就是讲传递的Action属性加载进来并且在flow.action保存。

7.1.4 Abort控制Flow流程

现在有个Abort来控制Flow流,那么我们需要给KisFlow添加一个成员来表示这个状态

kis-flow/flow/kis_flow.go

// KisFlow 用于贯穿整条流式计算的上下文环境
type KisFlow struct {
    // 基础信息
    Id   string                // Flow的分布式实例ID(用于KisFlow内部区分不同实例)
    Name string                // Flow的可读名称
    Conf *config.KisFlowConfig // Flow配置策略

    // Function列表
    Funcs          map[string]kis.Function // 当前flow拥有的全部管理的全部Function对象, key: FunctionName
    FlowHead       kis.Function            // 当前Flow所拥有的Function列表表头
    FlowTail       kis.Function            // 当前Flow所拥有的Function列表表尾
    flock          sync.RWMutex            // 管理链表插入读写的锁
    ThisFunction   kis.Function            // Flow当前正在执行的KisFunction对象
    ThisFunctionId string                  // 当前执行到的Function ID
    PrevFunctionId string                  // 当前执行到的Function 上一层FunctionID

    // Function列表参数
    funcParams map[string]config.FParam // flow在当前Function的自定义固定配置参数,Key:function的实例KisID, value:FParam
    fplock     sync.RWMutex             // 管理funcParams的读写锁

    // 数据
    buffer common.KisRowArr  // 用来临时存放输入字节数据的内部Buf, 一条数据为interface{}, 多条数据为[]interface{} 也就是KisBatch
    data   common.KisDataMap // 流式计算各个层级的数据源
    inPut  common.KisRowArr  // 当前Function的计算输入数据
    action kis.Action        // 当前Flow所携带的Action动作

    // +++++++++
    abort  bool              // 是否中断Flow
}

在每次执行到flow.Run()方法时,需要重置abort变量,并且在循环调度的时候加上对flow.abort的判断。

kis-flow/flow/kis_flow.go

// Run 启动KisFlow的流式计算, 从起始Function开始执行流
func (flow *KisFlow) Run(ctx context.Context) error {

    // +++++++++
    // 重置 abort
    flow.abort = false  //  每次进入调度,要重置abort状态

    // ... ...

    // ... ...

    //流式链式调用
    for fn != nil && flow.abort != true { // ++++ 如果设置abort则不进入下次循环调度

        // ... ...
        // ... ...
        
        if err := fn.Call(ctx, flow); err != nil {
            //Error
            return err
        } else {
            //Success

            // ... ...

            fn = fn.Next()
        }
    }

    return nil

这样在每次Call()调度到Funciton的自定方法时,如果return flow.Next(ActionAbort)就会对flow的Action状态进行改变,从而就控制了flow的流程终止。最后就是将Action的Abort状态传递给KisFlow的Abort状态。

既然有了Abort状态,那么我们可以通过给Flow执行过程中添加一个设定,如果当前的Function没有提交本层的结果数据,也就是flow.buffer为空,那么将不会进入下一层,在本层直接结束退出Flow的Run()调用。

kis-flow/flow/kis_flow_data.go

//commitCurData 提交Flow当前执行Function的结果数据
func (flow *KisFlow) commitCurData(ctx context.Context) error {

    // 判断本层计算是否有结果数据,如果没有则退出本次Flow Run循环
    if len(flow.buffer) == 0 {
        // ++++++++++++
        flow.abort = true
        return nil
    }

    // ... ...
    // ... ...

    return nil

7.1.5 Action捕获及处理

接下来来实现一个专门处理Action动作的方法,定义在kis-flow/flow/kis_flow_action.go文件中,如下:

kis-flow/flow/kis_flow_action.go

package flow

import (
    "context"
    "errors"
    "fmt"
    "kis-flow/kis"
)

// dealAction  处理Action,决定接下来Flow的流程走向
func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {

    if err := flow.commitCurData(ctx); err != nil {
        return nil, err
    }

    // 更新上一层 FuncitonId 游标
    flow.PrevFunctionId = flow.ThisFunctionId
    fn = fn.Next()

    // Abort Action 强制终止
    if flow.action.Abort {
        flow.abort = true
    }

    // 清空Action
    flow.action = kis.Action{}

    return fn, nil
}

然后稍微改进下KisFlow的Run() 流程,将dealAction()方法嵌入进去。

kis-flow/flow/kis_flow.go

// Run 启动KisFlow的流式计算, 从起始Function开始执行流
func (flow *KisFlow) Run(ctx context.Context) error {

    var fn kis.Function

    fn = flow.FlowHead
    flow.abort = false

    if flow.Conf.Status == int(common.FlowDisable) {
        //flow被配置关闭
        return nil
    }

    // 因为此时还没有执行任何Function, 所以PrevFunctionId为FirstVirtual 因为没有上一层Function
    flow.PrevFunctionId = common.FunctionIdFirstVirtual

    // 提交数据流原始数据
    if err := flow.commitSrcData(ctx); err != nil {
        return err
    }

    //流式链式调用
    for fn != nil && flow.abort == false {

        // flow记录当前执行到的Function 标记
        fid := fn.GetId()
        flow.ThisFunction = fn
        flow.ThisFunctionId = fid

        // 得到当前Function要处理与的源数据
        if inputData, err := flow.getCurData(); err != nil {
            log.Logger().ErrorFX(ctx, "flow.Run(): getCurData err = %s\n", err.Error())
            return err
        } else {
            flow.inPut = inputData
        }

        if err := fn.Call(ctx, flow); err != nil {
            //Error
            return err
        } else {
            //Success
            
            // +++++++++++++++++++++++++++++++
            fn, err = flow.dealAction(ctx, fn)
            if err != nil {
                return err
            }
            // +++++++++++++++++++++++++++++++
        }
    }

    return nil
}

7.1.6 Action Abort单元测试

首先我们新建一个Function业务,配置文件如下:

kis-flow/test/load_conf/func/func-AbortFunc.yml

kistype: func
fname: abortFunc
fmode: Calculate
source:
  name: 用户订单错误率
  must:
    - order_id
    - user_id

当前的Funciton的名称为abortFunc,然后实现其FaaS函数,如下:

kis-flow/test/faas/faas_abort.go

package faas

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

// type FaaS func(context.Context, Flow) error

func AbortFuncHandler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call AbortFuncHandler ----")

    for _, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)
    }

    return flow.Next(kis.ActionAbort)
}

这个Function就会最终调用flow.Next(kis.ActionAbort)来终止Flow,接下来我们新建一个Flow,将上面的Function作为中间的Function,看检测是否会终止之后的Function被执行。
新建的flow的配置如下:

kis-flow/test/load_conf/flow/flow-FlowName2.yml

kistype: flow
status: 1
flow_name: flowName2
flows:
  - fname: funcName1
  - fname: abortFunc
  - fname: funcName3

当前Flow的名称为flowName2,当前的Flow有三个Function,其中funcNam1 和 funcName2我们之前都已经定义好了,abortFunc是我们新建的,并且在中间。如果abort功能满足,则funcName3将不会被调度。

接下来实现单元测试用例。

kis-flow/test/kis_action_test.go

package test

import (
    "context"
    "kis-flow/common"
    "kis-flow/file"
    "kis-flow/kis"
    "kis-flow/test/caas"
    "kis-flow/test/faas"
    "testing"
)

func TestActionAbort(t *testing.T) {
    ctx := context.Background()

    // 0. 注册Function 回调业务
    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
    kis.Pool().FaaS("abortFunc", faas.AbortFuncHandler) // 添加abortFunc 业务
    kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

    // 0. 注册ConnectorInit 和 Connector 回调业务
    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

    // 1. 加载配置文件并构建Flow
    if err := file.ConfigImportYaml("/Users/gopath/src/kis-flow/test/load_conf/"); err != nil {
        panic(err)
    }

    // 2. 获取Flow
    flow1 := kis.Pool().GetFlow("flowName2")

    // 3. 提交原始数据
    _ = flow1.CommitRow("This is Data1 from Test")
    _ = flow1.CommitRow("This is Data2 from Test")
    _ = flow1.CommitRow("This is Data3 from Test")

    // 4. 执行flow1
    if err := flow1.Run(ctx); err != nil {
        panic(err)
    }
}

其中下面的代码是初始化注册的代码,大家也可以写在其他文件中,这样就不需要每次都携带这部分代码了。

    // 0. 注册Function 回调业务
    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
    kis.Pool().FaaS("abortFunc", faas.AbortFuncHandler) // 添加abortFunc 业务
    kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

    // 0. 注册ConnectorInit 和 Connector 回调业务
    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

cd 到kis-flow/test/目录下执行如下指令:

go test -test.v -test.paniconexit0 -test.run  TestActionAbort

结果如下:

=== RUN   TestActionAbort
Add KisPool FuncName=funcName1
Add KisPool FuncName=abortFunc
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName1

Add FlowRouter FlowName=flowName2

context.Background
====> After CommitSrcData, flow_name = flowName2, flow_id = flow-b6b90eb4b7d7457fbf85b3299b625513
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]

KisFunctionV, flow = &{Id:flow-b6b90eb4b7d7457fbf85b3299b625513 Name:flowName2 Conf:0xc000092cc0 Funcs:map[abortFunc:0xc000094d20 funcName1:0xc000094cc0 funcName3:0xc000094d80] FlowHead:0xc000094cc0 FlowTail:0xc000094d80 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000094cc0 ThisFunctionId:func-c435cf9f8e3346a1851f8c76375fce0f PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-7f5af1521fd64d08839d5bdd26de5254:map[] func-c435cf9f8e3346a1851f8c76375fce0f:map[] func-f0b80593fe2e4018a878f155b9c543b4:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] jumpFunc:NoJump abort:false nextOpt:<nil>}

---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-c435cf9f8e3346a1851f8c76375fce0f, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-c435cf9f8e3346a1851f8c76375fce0f, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-c435cf9f8e3346a1851f8c76375fce0f, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName2, flow_id = flow-b6b90eb4b7d7457fbf85b3299b625513
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c435cf9f8e3346a1851f8c76375fce0f:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionC, flow = &{Id:flow-b6b90eb4b7d7457fbf85b3299b625513 Name:flowName2 Conf:0xc000092cc0 Funcs:map[abortFunc:0xc000094d20 funcName1:0xc000094cc0 funcName3:0xc000094d80] FlowHead:0xc000094cc0 FlowTail:0xc000094d80 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000094d20 ThisFunctionId:func-7f5af1521fd64d08839d5bdd26de5254 PrevFunctionId:func-c435cf9f8e3346a1851f8c76375fce0f funcParams:map[func-7f5af1521fd64d08839d5bdd26de5254:map[] func-c435cf9f8e3346a1851f8c76375fce0f:map[] func-f0b80593fe2e4018a878f155b9c543b4:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c435cf9f8e3346a1851f8c76375fce0f:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] jumpFunc:NoJump abort:false nextOpt:<nil>}

---> Call AbortFuncHandler ----
In FuncName = abortFunc, FuncId = func-7f5af1521fd64d08839d5bdd26de5254, row = data from funcName[funcName1], index = 0
In FuncName = abortFunc, FuncId = func-7f5af1521fd64d08839d5bdd26de5254, row = data from funcName[funcName1], index = 1
In FuncName = abortFunc, FuncId = func-7f5af1521fd64d08839d5bdd26de5254, row = data from funcName[funcName1], index = 2
--- PASS: TestActionAbort (0.00s)
PASS
ok      kis-flow/test   0.487s

通过结果可以看到,在执行完 AbortFuncHandler 后,没有继续执行,而是退出了Flow的Run()方法。

7.2 Action DataReuse(复用上层数据)

Action DataReuse 为服用上层数据,含义为,当前的执行Function提交到下一层的结果将不被使用,而是直接将当前Function的上一层结果数据,复用到下一层,作为下一层Funciton的数据源。

下面来实现Action DataReuse功能。

7.2.1 DataReuse Action添加

在Action中添加DataReuse成员,是一个bool类型。

kis-flow/kis/action.go

// Action KisFlow执行流程Actions
type Action struct {
    // +++++++++++++
    // DataReuse 是否复用上层Function数据
    DataReuse bool

    // Abort 终止Flow的执行
    Abort bool
}


// ActionDataReuse Next复用上层Function数据Option
func ActionDataReuse(act *Action) {
    act.DataReuse = true
}

然后提供一个ActionFunc,命名为:ActionDataReuse,实现中为改变DataReuse状态为true。

7.2.2 复用上层数据到下层

这里需要再实现一个提交数据的方法,为如何提交复用数据,具体逻辑如下:

kis-flow/flow/kis_flow_data.go

// commitReuseData
func (flow *KisFlow) commitReuseData(ctx context.Context) error {

    // 判断上层是否有结果数据, 如果没有则退出本次Flow Run循环
    if len(flow.data[flow.PrevFunctionId]) == 0 {
        flow.abort = true
        return nil
    }

    // 本层结果数据等于上层结果数据(复用上层结果数据到本层)
    flow.data[flow.ThisFunctionId] = flow.data[flow.PrevFunctionId]

    // 清空缓冲Buf (如果是ReuseData选项,那么提交的全部数据,都将不会携带到下一层)
    flow.buffer = flow.buffer[0:0]

    log.Logger().DebugFX(ctx, " ====> After commitReuseData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)

    return nil
}

逻辑很简单,与commitCurData()不同的是,commitCurData()为将flow.buffer的数据提交到flow.data[flow.ThisFunctionId]中,而commitReuseData()为将上一层的结果数据提交到flow.data[flow.ThisFunctionId]中。

7.2.3 处理DataReuse Action动作

然后在dealAction()方法中添加对Action DataReuse的动作捕获,如下:

kis-flow/flow/kis_flow_action.go

// dealAction  处理Action,决定接下来Flow的流程走向
func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {

    // ++++++++++++++++
    // DataReuse Action
    if flow.action.DataReuse {
        if err := flow.commitReuseData(ctx); err != nil {
            return nil, err
        }
    } else {
        if err := flow.commitCurData(ctx); err != nil {
            return nil, err
        }
    }
    

    // 更新上一层 FuncitonId 游标
    flow.PrevFunctionId = flow.ThisFunctionId
    fn = fn.Next()

    // Abort Action 强制终止
    if flow.action.Abort {
        flow.abort = true
    }

    // 清空Action
    flow.action = kis.Action{}

    return fn, nil
}

7.2.4 单元测试

下面来针对DataReuse做单元测试,首先创建一个名字为dataReuseFunc 的Funciton,先创建配置文件。

kis-flow/test/load_conf/func/func-dataReuseFunc.yml

kistype: func
fname: dataReuseFunc
fmode: Calculate
source:
  name: 用户订单错误率
  must:
    - order_id
    - user_id

同时新建一个Flow流,名称为flowName3,配置如下:

kis-flow/test/load_conf/flow/func-FlowName3.yml

kistype: flow
status: 1
flow_name: flowName3
flows:
  - fname: funcName1
  - fname: dataReuseFunc
  - fname: funcName3

针对dataReuseFunc的Function的逻辑业务,如下:

kis-flow/test/faas/faas_data_reuse.go

package faas

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

// type FaaS func(context.Context, Flow) error

func DataReuseFuncHandler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call DataReuseFuncHandler ----")

    for index, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)

        // 计算结果数据
        resultStr := fmt.Sprintf("data from funcName[%s], index = %d", flow.GetThisFuncConf().FName, index)

        // 提交结果数据
        _ = flow.CommitRow(resultStr)
    }

    return flow.Next(kis.ActionDataReuse)
}

最后实现测试用例,如下:

kis-flow/test/kis_action_test.go

func TestActionDataReuse(t *testing.T) {
    ctx := context.Background()

    // 0. 注册Function 回调业务
    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
    kis.Pool().FaaS("dataReuseFunc", faas.DataReuseFuncHandler) // 添加dataReuesFunc 业务
    kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

    // 0. 注册ConnectorInit 和 Connector 回调业务
    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

    // 1. 加载配置文件并构建Flow
    if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
        panic(err)
    }

    // 2. 获取Flow
    flow1 := kis.Pool().GetFlow("flowName3")

    // 3. 提交原始数据
    _ = flow1.CommitRow("This is Data1 from Test")
    _ = flow1.CommitRow("This is Data2 from Test")
    _ = flow1.CommitRow("This is Data3 from Test")

    // 4. 执行flow1
    if err := flow1.Run(ctx); err != nil {
        panic(err)
    }
}

cd 到 kis-flow/test/下执行:

go test -test.v -test.paniconexit0 -test.run  TestActionDataReuse

结果是:

=== RUN   TestActionDataReuse
Add KisPool FuncName=funcName1
Add KisPool FuncName=dataReuseFunc
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName5
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
Add FlowRouter FlowName=flowName1
Add FlowRouter FlowName=flowName2
Add FlowRouter FlowName=flowName3
Add FlowRouter FlowName=flowName4
context.Background
====> After CommitSrcData, flow_name = flowName3, flow_id = flow-2c1a23d9587842bebaeee490319de81f
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]

KisFunctionV, flow = &{Id:flow-2c1a23d9587842bebaeee490319de81f Name:flowName3 Conf:0xc000092dc0 Funcs:map[dataReuseFunc:0xc000095620 funcName1:0xc0000955c0 funcName3:0xc000095680] FlowHead:0xc0000955c0 FlowTail:0xc000095680 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000955c0 ThisFunctionId:func-7886178381634f05b302841141382e59 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-7886178381634f05b302841141382e59:map[] func-cfe66e39aba54ff989d6764cc4edda20:map[] func-ef567879d0dd45b287ed709e549e9d32:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-7886178381634f05b302841141382e59, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-7886178381634f05b302841141382e59, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-7886178381634f05b302841141382e59, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName3, flow_id = flow-2c1a23d9587842bebaeee490319de81f
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionC, flow = &{Id:flow-2c1a23d9587842bebaeee490319de81f Name:flowName3 Conf:0xc000092dc0 Funcs:map[dataReuseFunc:0xc000095620 funcName1:0xc0000955c0 funcName3:0xc000095680] FlowHead:0xc0000955c0 FlowTail:0xc000095680 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000095620 ThisFunctionId:func-ef567879d0dd45b287ed709e549e9d32 PrevFunctionId:func-7886178381634f05b302841141382e59 funcParams:map[func-7886178381634f05b302841141382e59:map[] func-cfe66e39aba54ff989d6764cc4edda20:map[] func-ef567879d0dd45b287ed709e549e9d32:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call DataReuseFuncHandler ----
In FuncName = dataReuseFunc, FuncId = func-ef567879d0dd45b287ed709e549e9d32, row = data from funcName[funcName1], index = 0
In FuncName = dataReuseFunc, FuncId = func-ef567879d0dd45b287ed709e549e9d32, row = data from funcName[funcName1], index = 1
In FuncName = dataReuseFunc, FuncId = func-ef567879d0dd45b287ed709e549e9d32, row = data from funcName[funcName1], index = 2
context.Background
 ====> After commitReuseData, flow_name = flowName3, flow_id = flow-2c1a23d9587842bebaeee490319de81f
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-ef567879d0dd45b287ed709e549e9d32:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionC, flow = &{Id:flow-2c1a23d9587842bebaeee490319de81f Name:flowName3 Conf:0xc000092dc0 Funcs:map[dataReuseFunc:0xc000095620 funcName1:0xc0000955c0 funcName3:0xc000095680] FlowHead:0xc0000955c0 FlowTail:0xc000095680 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000095680 ThisFunctionId:func-cfe66e39aba54ff989d6764cc4edda20 PrevFunctionId:func-ef567879d0dd45b287ed709e549e9d32 funcParams:map[func-7886178381634f05b302841141382e59:map[] func-cfe66e39aba54ff989d6764cc4edda20:map[] func-ef567879d0dd45b287ed709e549e9d32:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-7886178381634f05b302841141382e59:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-ef567879d0dd45b287ed709e549e9d32:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call funcName3Handler ----
In FuncName = funcName3, FuncId = func-cfe66e39aba54ff989d6764cc4edda20, row = data from funcName[funcName1], index = 0
In FuncName = funcName3, FuncId = func-cfe66e39aba54ff989d6764cc4edda20, row = data from funcName[funcName1], index = 1
In FuncName = funcName3, FuncId = func-cfe66e39aba54ff989d6764cc4edda20, row = data from funcName[funcName1], index = 2
--- PASS: TestActionDataReuse (0.02s)
PASS
ok      kis-flow/test   0.523s

通过结果可以看出,在最后的funcName3Handler中得到的数据是funcName1传递下来的数据,中间的ReuseFunction将上层的数据复用到了下一层,变成了FuncName3的数据源。

7.3 Action ForceEntryNext(强制进入下一层)

7.3.1 ForceEntryNext Action属性

目前的KisFlow为,如果当前的Function没有commit数据(本层的结果数据),那么当前的Function结束后,将不会继续调度下一层Function。 但是有的Flow的流式计算可能需要继续向下执行,哪怕没有数据,所以这里可以通过ForceEntryNext这个动作来触发。
首先我们在Action中新增一个ForceEntryNext 属性。

kis-flow/kis/action.go

// Action KisFlow执行流程Actions
type Action struct {
    // DataReuse 是否复用上层Function数据
    DataReuse bool

    // 默认Next()为如果本层Function计算结果为0条数据,之后Function将不会继续执行
    // ForceEntryNext 为忽略上述默认规则,没有数据强制进入下一层Function
    ForceEntryNext bool

    // Abort 终止Flow的执行
    Abort bool
}

// ActionForceEntryNext 强制进入下一层
func ActionForceEntryNext(act *Action) {
    act.ForceEntryNext = true
}

且提供配置函数ActionForceEntryNext()来修改这个属性状态。

7.3.2 捕获Action

在捕获Action的dealAction()方法中,加上对这个状态的判断,如果被设置,则需要将flow.Abort状态改成false,flow将继续执行下一层。

kis-flow/flow/kis_flow_action.go

// dealAction  处理Action,决定接下来Flow的流程走向
func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {

    // DataReuse Action
    if flow.action.DataReuse {
        if err := flow.commitReuseData(ctx); err != nil {
            return nil, err
        }
    } else {
        if err := flow.commitCurData(ctx); err != nil {
            return nil, err
        }
    }

    // ++++++++++++++++++++++++++++
    // ForceEntryNext Action
    if flow.action.ForceEntryNext {
        if err := flow.commitVoidData(ctx); err != nil {
            return nil, err
        }
        flow.abort = false
    }

    // 更新上一层 FuncitonId 游标
    flow.PrevFunctionId = flow.ThisFunctionId
    fn = fn.Next()

    // Abort Action 强制终止
    if flow.action.Abort {
        flow.abort = true
    }

    // 清空Action
    flow.action = kis.Action{}

    return fn, nil
}

这里有一个细节,我们需要调用一个方法commitVoidData(),即提交空数据,原因是,如果不提交空数据,那么flow.buffer依然为空,那么不会执行数据的提交动作,那么会导致flow.data[flow.ThisFunctionId]这条不存在,也就是key不存在,那么再执行到flow.getCurData()会出现找不到key的异常而panic。所以这里需要提交一个空的数据到flow.data[flow.ThisFunctionId]中。
具体的commitVoidData()实现如下:

kis-flow/flow/kis_flow_data.go

func (flow *KisFlow) commitVoidData(ctx context.Context) error {
    if len(flow.buffer) != 0 {
        return nil
    }

    // 制作空数据
    batch := make(common.KisRowArr, 0)

    // 将本层计算的缓冲数据提交到本层结果数据中
    flow.data[flow.ThisFunctionId] = batch

    log.Logger().DebugFX(ctx, " ====> After commitVoidData, flow_name = %s, flow_id = %s\nAll Level Data =\n %+v\n", flow.Name, flow.Id, flow.data)

    return nil
}

7.3.3 单元测试,不设置ForceEntryNext

首先,创建一个noResultFunc的Function配置,且实现相关的回调业务函数。

kis-flow/test/load_conf/func/func-NoResultFunc.yml

kistype: func
fname: noResultFunc
fmode: Calculate
source:
  name: 用户订单错误率
  must:
    - order_id
    - user_id

kis-flow/test/faas/faas_no_result.go

package faas

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

// type FaaS func(context.Context, Flow) error

func NoResultFuncHandler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call NoResultFuncHandler ----")

    for _, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)
    }

    return flow.Next()
}

这里面在Function的最后,只调用flow.Next() 不传递任何Action动作。
然后新建一个FlowName4,配置如下:

kis-flow/test/load_conf/flow-FlowName4.yml

kistype: flow
status: 1
flow_name: flowName4
flows:
  - fname: funcName1
  - fname: noResultFunc
  - fname: funcName3

最后我们编写单元测试用例代码,将noResultFunc放在中间的部分。

kis-flow/test/kis_action_test.go

func TestActionForceEntry(t *testing.T) {
    ctx := context.Background()

    // 0. 注册Function 回调业务
    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
    kis.Pool().FaaS("noResultFunc", faas.NoResultFuncHandler) // 添加noResultFunc 业务
    kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

    // 0. 注册ConnectorInit 和 Connector 回调业务
    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

    // 1. 加载配置文件并构建Flow
    if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
        panic(err)
    }

    // 2. 获取Flow
    flow1 := kis.Pool().GetFlow("flowName4")

    // 3. 提交原始数据
    _ = flow1.CommitRow("This is Data1 from Test")
    _ = flow1.CommitRow("This is Data2 from Test")
    _ = flow1.CommitRow("This is Data3 from Test")

    // 4. 执行flow1
    if err := flow1.Run(ctx); err != nil {
        panic(err)
    }
}

cd到kis-flow/test/ 下执行:

go test -test.v -test.paniconexit0 -test.run  TestActionForceEntry

结果如下:

=== RUN   TestActionForceEntry
Add KisPool FuncName=funcName1
Add KisPool FuncName=noResultFunc
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName1
Add FlowRouter FlowName=flowName2
Add FlowRouter FlowName=flowName3
Add FlowRouter FlowName=flowName4
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
Add FlowRouter FlowName=flowName5
context.Background
====> After CommitSrcData, flow_name = flowName4, flow_id = flow-a496d02c79204e9a803fb5e1307523c9
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]

KisFunctionV, flow = &{Id:flow-a496d02c79204e9a803fb5e1307523c9 Name:flowName4 Conf:0xc000152e40 Funcs:map[funcName1:0xc00011d560 funcName3:0xc00011d620 noResultFunc:0xc00011d5c0] FlowHead:0xc00011d560 FlowTail:0xc00011d620 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00011d560 ThisFunctionId:func-4d113d6a8e744d30a906db310f2d7818 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-47cb6f9ae464484aa779c18284035705:map[] func-4d113d6a8e744d30a906db310f2d7818:map[] func-70011c7ccecf46be91c6993d143639bb:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-4d113d6a8e744d30a906db310f2d7818, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-4d113d6a8e744d30a906db310f2d7818, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-4d113d6a8e744d30a906db310f2d7818, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName4, flow_id = flow-a496d02c79204e9a803fb5e1307523c9
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-4d113d6a8e744d30a906db310f2d7818:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionC, flow = &{Id:flow-a496d02c79204e9a803fb5e1307523c9 Name:flowName4 Conf:0xc000152e40 Funcs:map[funcName1:0xc00011d560 funcName3:0xc00011d620 noResultFunc:0xc00011d5c0] FlowHead:0xc00011d560 FlowTail:0xc00011d620 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00011d5c0 ThisFunctionId:func-47cb6f9ae464484aa779c18284035705 PrevFunctionId:func-4d113d6a8e744d30a906db310f2d7818 funcParams:map[func-47cb6f9ae464484aa779c18284035705:map[] func-4d113d6a8e744d30a906db310f2d7818:map[] func-70011c7ccecf46be91c6993d143639bb:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-4d113d6a8e744d30a906db310f2d7818:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call NoResultFuncHandler ----
In FuncName = noResultFunc, FuncId = func-47cb6f9ae464484aa779c18284035705, row = data from funcName[funcName1], index = 0
In FuncName = noResultFunc, FuncId = func-47cb6f9ae464484aa779c18284035705, row = data from funcName[funcName1], index = 1
In FuncName = noResultFunc, FuncId = func-47cb6f9ae464484aa779c18284035705, row = data from funcName[funcName1], index = 2
--- PASS: TestActionForceEntry (0.02s)
PASS
ok      kis-flow/test   0.958s

因为noResultFunc不会生成任何的结果数据,所以下一层Function将不会被执行,最后只执行到

---> Call NoResultFuncHandler ----

7.3.4 单元测试,设置ForceEntryNext

下面我们将Action为ForceEntryNext加上,在NoResultFuncHandler() 中,加上flow.Next(kis.ActionForceEntryNext),如下:

kis-flow/test/faas/faas_no_result.go

package faas

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

// type FaaS func(context.Context, Flow) error

func NoResultFuncHandler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call NoResultFuncHandler ----")

    for _, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)
    }

    return flow.Next(kis.ActionForceEntryNext)
}

cd到kis-flow/test/ 下执行:

go test -test.v -test.paniconexit0 -test.run  TestActionForceEntry

结果如下:

=== RUN   TestActionForceEntry
Add KisPool FuncName=funcName1
Add KisPool FuncName=noResultFunc
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName5
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
Add FlowRouter FlowName=flowName1
Add FlowRouter FlowName=flowName2
Add FlowRouter FlowName=flowName3
Add FlowRouter FlowName=flowName4
context.Background
====> After CommitSrcData, flow_name = flowName4, flow_id = flow-7fb47f227c9f4b9d8fa69c28177fc7bb
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]

KisFunctionV, flow = &{Id:flow-7fb47f227c9f4b9d8fa69c28177fc7bb Name:flowName4 Conf:0xc000028e80 Funcs:map[funcName1:0xc0000136e0 funcName3:0xc0000137a0 noResultFunc:0xc000013740] FlowHead:0xc0000136e0 FlowTail:0xc0000137a0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000136e0 ThisFunctionId:func-ecddaee7d7d447a9852d07088732f509 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-5729600ae6ea4d6f879eb5832c638e1a:map[] func-c9817c7993894919b8463dea1757544e:map[] func-ecddaee7d7d447a9852d07088732f509:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-ecddaee7d7d447a9852d07088732f509, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-ecddaee7d7d447a9852d07088732f509, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-ecddaee7d7d447a9852d07088732f509, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName4, flow_id = flow-7fb47f227c9f4b9d8fa69c28177fc7bb
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionC, flow = &{Id:flow-7fb47f227c9f4b9d8fa69c28177fc7bb Name:flowName4 Conf:0xc000028e80 Funcs:map[funcName1:0xc0000136e0 funcName3:0xc0000137a0 noResultFunc:0xc000013740] FlowHead:0xc0000136e0 FlowTail:0xc0000137a0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000013740 ThisFunctionId:func-c9817c7993894919b8463dea1757544e PrevFunctionId:func-ecddaee7d7d447a9852d07088732f509 funcParams:map[func-5729600ae6ea4d6f879eb5832c638e1a:map[] func-c9817c7993894919b8463dea1757544e:map[] func-ecddaee7d7d447a9852d07088732f509:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call NoResultFuncHandler ----
In FuncName = noResultFunc, FuncId = func-c9817c7993894919b8463dea1757544e, row = data from funcName[funcName1], index = 0
In FuncName = noResultFunc, FuncId = func-c9817c7993894919b8463dea1757544e, row = data from funcName[funcName1], index = 1
In FuncName = noResultFunc, FuncId = func-c9817c7993894919b8463dea1757544e, row = data from funcName[funcName1], index = 2
context.Background
 ====> After commitVoidData, flow_name = flowName4, flow_id = flow-7fb47f227c9f4b9d8fa69c28177fc7bb
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c9817c7993894919b8463dea1757544e:[] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionC, flow = &{Id:flow-7fb47f227c9f4b9d8fa69c28177fc7bb Name:flowName4 Conf:0xc000028e80 Funcs:map[funcName1:0xc0000136e0 funcName3:0xc0000137a0 noResultFunc:0xc000013740] FlowHead:0xc0000136e0 FlowTail:0xc0000137a0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000137a0 ThisFunctionId:func-5729600ae6ea4d6f879eb5832c638e1a PrevFunctionId:func-c9817c7993894919b8463dea1757544e funcParams:map[func-5729600ae6ea4d6f879eb5832c638e1a:map[] func-c9817c7993894919b8463dea1757544e:map[] func-ecddaee7d7d447a9852d07088732f509:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-c9817c7993894919b8463dea1757544e:[] func-ecddaee7d7d447a9852d07088732f509:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call funcName3Handler ----
--- PASS: TestActionForceEntry (0.01s)
PASS
ok      kis-flow/test   0.348s

会发现,Function第三层funcName3Handler 被执行到,但是没有任何的数据。

7.4 Action JumpFunc(流程跳转)

接下来,来实现JumpFunc Action,JumpFunc是可以在当前Flow中任意跳转到指定的FuncName继续执行(前提是跳转的FuncName当当前Flow中存在)

注意:JumpFunc容易出现无限循环流,所以在业务的设计要慎用。

7.4.1 Action添加JumpFunc

首先在Action添加一个JumpFunc属性,注意,JunpFunc不是一个bool状态,而是一个string字符串,表示具体要跳转的FunctionName名称。

kis-flow/kis/action.go

// Action KisFlow执行流程Actions
type Action struct {
    // DataReuse 是否复用上层Function数据
    DataReuse bool

    // 默认Next()为如果本层Function计算结果为0条数据,之后Function将不会继续执行
    // ForceEntryNext 为忽略上述默认规则,没有数据强制进入下一层Function
    ForceEntryNext bool

    // ++++++++++
    // JumpFunc 跳转到指定Function继续执行
    JumpFunc string

    // Abort 终止Flow的执行
    Abort bool
}


// ActionJumpFunc 会返回一个ActionFunc函数,并且会将funcName赋值给Action.JumpFunc
// (注意:容易出现Flow循环调用,导致死循环)
func ActionJumpFunc(funcName string) ActionFunc {
    return func(act *Action) {
        act.JumpFunc = funcName
    }
}

然后提供一个修改JumpFunc的配置方法ActionJumpFunc(),注意这个方法和之前的方法写法有一些不同,主要是返回一个匿名函数并且执行,目的则是修改Action中的JumpFunc属性。

7.4.2 捕获Action

接下来,我们来捕获JumpFunc的Action动作,判断JumpFunc是否为空字符串即可。

kis-flow/flow/kis_flow_action.go

// dealAction  处理Action,决定接下来Flow的流程走向
func (flow *KisFlow) dealAction(ctx context.Context, fn kis.Function) (kis.Function, error) {

    // DataReuse Action
    if flow.action.DataReuse {
        if err := flow.commitReuseData(ctx); err != nil {
            return nil, err
        }
    } else {
        if err := flow.commitCurData(ctx); err != nil {
            return nil, err
        }
    }

    // ForceEntryNext Action
    if flow.action.ForceEntryNext {
        if err := flow.commitVoidData(ctx); err != nil {
            return nil, err
        }
        flow.abort = false
    }

    // ++++++++++++++++++++++++++++++++
    // JumpFunc Action
    if flow.action.JumpFunc != "" {
        if _, ok := flow.Funcs[flow.action.JumpFunc]; !ok {
            //当前JumpFunc不在flow中
            return nil, errors.New(fmt.Sprintf("Flow Jump -> %s is not in Flow", flow.action.JumpFunc))
        }

        jumpFunction := flow.Funcs[flow.action.JumpFunc]
        // 更新上层Function
        flow.PrevFunctionId = jumpFunction.GetPrevId()
        fn = jumpFunction

        // 如果设置跳跃,强制跳跃
        flow.abort = false
    // ++++++++++++++++++++++++++++++++

    } else {

        // 更新上一层 FuncitonId 游标
        flow.PrevFunctionId = flow.ThisFunctionId
        fn = fn.Next()
    }

    // Abort Action 强制终止
    if flow.action.Abort {
        flow.abort = true
    }

    // 清空Action
    flow.action = kis.Action{}

    return fn, nil
}

如果设置JumpFunc,则需要修改下次执行的fn指针,否则则正常寻址fn.Next()

7.4.3 单元测试

接下来来定义一个跳转Action的Function,配置,如下:

kis-flow/test/load_conf/func/func-jumpFunc.yml

kistype: func
fname: jumpFunc
fmode: Calculate
source:
  name: 用户订单错误率
  must:
    - order_id
    - user_id

并且实现相关的Funciton业务逻辑,如下:

kis-flow/test/faas/faas_jump.go

package faas

import (
    "context"
    "fmt"
    "kis-flow/kis"
)

// type FaaS func(context.Context, Flow) error

func JumpFuncHandler(ctx context.Context, flow kis.Flow) error {
    fmt.Println("---> Call JumpFuncHandler ----")

    for _, row := range flow.Input() {
        str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
        fmt.Println(str)
    }

    return flow.Next(kis.ActionJumpFunc("funcName1"))
}

这里,最后通过flow.Next(kis.ActionJumpFunc("funcName1"))来指定跳转到funcName1的Function。

新建一个Flow,为FlowName5,配置如下:

kis-flow/test/load_conf/flow/flow-FlowName5.yml

kistype: flow
status: 1
flow_name: flowName5
flows:
  - fname: funcName1
  - fname: funcName2
  - fname: jumpFunc

之后,来实现单元测试用例代码,如下:

kis-flow/test/kis_action_test.go

func TestActionJumpFunc(t *testing.T) {
    ctx := context.Background()

    // 0. 注册Function 回调业务
    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
    kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
    kis.Pool().FaaS("jumpFunc", faas.JumpFuncHandler) // 添加jumpFunc 业务

    // 0. 注册ConnectorInit 和 Connector 回调业务
    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

    // 1. 加载配置文件并构建Flow
    if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
        panic(err)
    }

    // 2. 获取Flow
    flow1 := kis.Pool().GetFlow("flowName5")

    // 3. 提交原始数据
    _ = flow1.CommitRow("This is Data1 from Test")
    _ = flow1.CommitRow("This is Data2 from Test")
    _ = flow1.CommitRow("This is Data3 from Test")

    // 4. 执行flow1
    if err := flow1.Run(ctx); err != nil {
        panic(err)
    }
}

cd到kis-flow/test/执行:

go test -test.v -test.paniconexit0 -test.run  TestActionJumpFunc

结果如下:

... 
...

---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-f6ca8010d66744429bf6069c9897a928, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-f6ca8010d66744429bf6069c9897a928, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-f6ca8010d66744429bf6069c9897a928, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName5, flow_id = flow-5da80af989dc49648a001762fa08b866
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionS, flow = &{Id:flow-5da80af989dc49648a001762fa08b866 Name:flowName5 Conf:0xc000028f80 Funcs:map[funcName1:0xc000013620 funcName2:0xc000013680 jumpFunc:0xc0000136e0] FlowHead:0xc000013620 FlowTail:0xc0000136e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000013680 ThisFunctionId:func-5800567c4cd842b6b377c2b0c0fd81c2 PrevFunctionId:func-f6ca8010d66744429bf6069c9897a928 funcParams:map[func-4faf8f019f4a4a48b84ef27abfad53d1:map[] func-5800567c4cd842b6b377c2b0c0fd81c2:map[] func-f6ca8010d66744429bf6069c9897a928:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call funcName2Handler ----
In FuncName = funcName2, FuncId = func-5800567c4cd842b6b377c2b0c0fd81c2, row = data from funcName[funcName1], index = 0
===> In CaasDemoHanler1: flowName: flowName5, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 0
In FuncName = funcName2, FuncId = func-5800567c4cd842b6b377c2b0c0fd81c2, row = data from funcName[funcName1], index = 1
===> In CaasDemoHanler1: flowName: flowName5, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 1
In FuncName = funcName2, FuncId = func-5800567c4cd842b6b377c2b0c0fd81c2, row = data from funcName[funcName1], index = 2
===> In CaasDemoHanler1: flowName: flowName5, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 2
context.Background
 ====> After commitCurData, flow_name = flowName5, flow_id = flow-5da80af989dc49648a001762fa08b866
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionC, flow = &{Id:flow-5da80af989dc49648a001762fa08b866 Name:flowName5 Conf:0xc000028f80 Funcs:map[funcName1:0xc000013620 funcName2:0xc000013680 jumpFunc:0xc0000136e0] FlowHead:0xc000013620 FlowTail:0xc0000136e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0000136e0 ThisFunctionId:func-4faf8f019f4a4a48b84ef27abfad53d1 PrevFunctionId:func-5800567c4cd842b6b377c2b0c0fd81c2 funcParams:map[func-4faf8f019f4a4a48b84ef27abfad53d1:map[] func-5800567c4cd842b6b377c2b0c0fd81c2:map[] func-f6ca8010d66744429bf6069c9897a928:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call JumpFuncHandler ----
In FuncName = jumpFunc, FuncId = func-4faf8f019f4a4a48b84ef27abfad53d1, row = data from funcName[funcName2], index = 0
In FuncName = jumpFunc, FuncId = func-4faf8f019f4a4a48b84ef27abfad53d1, row = data from funcName[funcName2], index = 1
In FuncName = jumpFunc, FuncId = func-4faf8f019f4a4a48b84ef27abfad53d1, row = data from funcName[funcName2], index = 2
KisFunctionV, flow = &{Id:flow-5da80af989dc49648a001762fa08b866 Name:flowName5 Conf:0xc000028f80 Funcs:map[funcName1:0xc000013620 funcName2:0xc000013680 jumpFunc:0xc0000136e0] FlowHead:0xc000013620 FlowTail:0xc0000136e0 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000013620 ThisFunctionId:func-f6ca8010d66744429bf6069c9897a928 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-4faf8f019f4a4a48b84ef27abfad53d1:map[] func-5800567c4cd842b6b377c2b0c0fd81c2:map[] func-f6ca8010d66744429bf6069c9897a928:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-5800567c4cd842b6b377c2b0c0fd81c2:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] func-f6ca8010d66744429bf6069c9897a928:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false}}

---> Call funcName1Handler ----

 ... 
 ...

发现我们会无限循环的调度Flow,这样说明我们的JumpFunc Action已经生效。

7.5【V0.6】源代码

https://github.com/aceld/kis-flow/releases/tag/v0.6


作者:刘丹冰Aceld github: https://github.com/aceld
KisFlow开源项目地址:https://github.com/aceld/kis-flow

Golang框架实战-KisFlow流式计算框架专栏

Golang框架实战-KisFlow流式计算框架(1)-概述
Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)
Golang框架实战-KisFlow流式计算框架(4)-数据流
Golang框架实战-KisFlow流式计算框架(5)-Function调度
Golang框架实战-KisFlow流式计算框架(6)-Connector
Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出

上一篇下一篇

猜你喜欢

热点阅读