Golang框架实战-KisFlow流式计算框架(10)-Flo
Golang框架实战-KisFlow流式计算框架专栏
Golang框架实战-KisFlow流式计算框架(1)-概述
Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)
Golang框架实战-KisFlow流式计算框架(4)-数据流
Golang框架实战-KisFlow流式计算框架(5)-Function调度
Golang框架实战-KisFlow流式计算框架(6)-Connector
Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出
Golang框架实战-KisFlow流式计算框架(8)-KisFlow Action
Golang框架实战-KisFlow流式计算框架(9)-Cache/Params 数据缓存与数据参数
Golang框架实战-KisFlow流式计算框架(10)-Flow多副本
9.1 多副本能力
KisFlow如果在执行流体中,需要被多个Goroutine来并发使用,可能需要同一个配置的创建多个Flow来匹配多个并发的计算流,所以Flow需要一个创建副本的能力。本章将实现这部分的能力。
9.1.1 Flow新增接口
首先,给Flow的抽象层新增一个接口Fork()
,原型如下:
kis-flow/kis/flow.go
type Flow interface {
// Run 调度Flow,依次调度Flow中的Function并且执行
Run(ctx context.Context) error
// Link 将Flow中的Function按照配置文件中的配置进行连接
Link(fConf *config.KisFuncConfig, fParams config.FParam) error
// CommitRow 提交Flow数据到即将执行的Function层
CommitRow(row interface{}) error
// Input 得到flow当前执行Function的输入源数据
Input() common.KisRowArr
// GetName 得到Flow的名称
GetName() string
// GetThisFunction 得到当前正在执行的Function
GetThisFunction() Function
// GetThisFuncConf 得到当前正在执行的Function的配置
GetThisFuncConf() *config.KisFuncConfig
// GetConnector 得到当前正在执行的Function的Connector
GetConnector() (Connector, error)
// GetConnConf 得到当前正在执行的Function的Connector的配置
GetConnConf() (*config.KisConnConfig, error)
// GetConfig 得到当前Flow的配置
GetConfig() *config.KisFlowConfig
// GetFuncConfigByName 得到当前Flow的配置
GetFuncConfigByName(funcName string) *config.KisFuncConfig
// Next 当前Flow执行到的Function进入下一层Function所携带的Action动作
Next(acts ...ActionFunc) error
// GetCacheData 得到当前Flow的缓存数据
GetCacheData(key string) interface{}
// SetCacheData 设置当前Flow的缓存数据
SetCacheData(key string, value interface{}, Exp time.Duration)
// GetMetaData 得到当前Flow的临时数据
GetMetaData(key string) interface{}
// SetMetaData 设置当前Flow的临时数据
SetMetaData(key string, value interface{})
// GetFuncParam 得到Flow的当前正在执行的Function的配置默认参数,取出一对key-value
GetFuncParam(key string) string
// GetFuncParamAll 得到Flow的当前正在执行的Function的配置默认参数,取出全部Key-Value
GetFuncParamAll() config.FParam
// +++++++++++++++++++++++++
// Fork 得到Flow的一个副本(深拷贝)
Fork(ctx context.Context) Flow
}
Fork()
会根据一个已有的KisFlow实例,完全克隆一个资源隔离的但是具有相同配置的KisFlow实例。
具体的实现方法如下:
kis-flow/flow/kis_flow.go
// Fork 得到Flow的一个副本(深拷贝)
func (flow *KisFlow) Fork(ctx context.Context) kis.Flow {
config := flow.Conf
// 通过之前的配置生成一个新的Flow
newFlow := NewKisFlow(config)
for _, fp := range flow.Conf.Flows {
if _, ok := flow.funcParams[flow.Funcs[fp.FuncName].GetId()]; !ok {
//当前function没有配置Params
newFlow.Link(flow.Funcs[fp.FuncName].GetConfig(), nil)
} else {
//当前function有配置Params
newFlow.Link(flow.Funcs[fp.FuncName].GetConfig(), fp.Params)
}
}
log.Logger().DebugFX(ctx, "=====>Flow Fork, oldFlow.funcParams = %+v\n", flow.funcParams)
log.Logger().DebugFX(ctx, "=====>Flow Fork, newFlow.funcParams = %+v\n", newFlow.GetFuncParamsAllFuncs())
return newFlow
}
在Fork()
中,首先会根据flow的配置信息,重新创建一个KisFlow实例,并且将flow所关联的Params等配置信息一同拷贝,最后通过Link()
将新建的Function和Flow连接起来。
上述代码为了调试,给Flow新增了一个打印全部FuncParams信息的接口GetFuncParamsAllFuncs()
,具体的实现方式如下:
kis-flow/kis/flow.go
type Flow interface {
// ... ...
// ... ...
// GetFuncParamsAllFuncs 得到Flow中所有Function的FuncParams,取出全部Key-Value
GetFuncParamsAllFuncs() map[string]config.FParam
// ... ...
}
kis-flow/flow/kis_flow_data.go
// GetFuncParamsAllFuncs 得到Flow中所有Function的FuncParams,取出全部Key-Value
func (flow *KisFlow) GetFuncParamsAllFuncs() map[string]config.FParam {
flow.fplock.RLock()
defer flow.fplock.RUnlock()
return flow.funcParams
}
9.2 单元测试
下面我们来测试一个Fork能力,单元测试代码如下:
kis-flow/test/kis_fork_test.go
func TestForkFlow(t *testing.T) {
ctx := context.Background()
// 0. 注册Function 回调业务
kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)
// 0. 注册ConnectorInit 和 Connector 回调业务
kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)
// 1. 加载配置文件并构建Flow
if err := file.ConfigImportYaml("/Users/tal/gopath/src/kis-flow/test/load_conf/"); err != nil {
panic(err)
}
// 2. 获取Flow
flow1 := kis.Pool().GetFlow("flowName1")
flow1Clone1 := flow1.Fork(ctx)
// 3. 提交原始数据
_ = flow1Clone1.CommitRow("This is Data1 from Test")
_ = flow1Clone1.CommitRow("This is Data2 from Test")
_ = flow1Clone1.CommitRow("This is Data3 from Test")
// 4. 执行flow1
if err := flow1Clone1.Run(ctx); err != nil {
panic(err)
}
}
首先我们先创建flowName1的flow实例,然后通过fork()
得到flowClone1
,然后执行flowClone1的调度流程。
cd到kis-flow/test/
下执行:
go test -test.v -test.paniconexit0 -test.run TestForkFlow
结果如下:
=== RUN TestForkFlow
Add KisPool FuncName=funcName1
Add KisPool FuncName=funcName2
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName1
Add FlowRouter FlowName=flowName2
Add FlowRouter FlowName=flowName3
Add FlowRouter FlowName=flowName4
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
Add FlowRouter FlowName=flowName5
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2 funcName2]}
context.Background
=====>Flow Fork, oldFlow.funcParams = map[func-6b00f430fe494302a384c2ae09eb019c:map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2] func-bf9df5fc16684200b78f32985d073012:map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2] func-c0f1ae9850174f81b994a2e98fb34109:map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2]]
context.Background
=====>Flow Fork, newFlow.funcParams = map[func-614511f5142e4023b80373517f3ea0a7:map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2] func-9406285e2fa94bd582dab4a875771a97:map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2] func-de7c4e4175b74a898cb43863e53b3215:map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2]]
context.Background
====> After CommitSrcData, flow_name = flowName1, flow_id = flow-38c362e52fee489db3af96ae7d83d56a
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]
KisFunctionV, flow = &{Id:flow-38c362e52fee489db3af96ae7d83d56a Name:flowName1 Conf:0xc000153f80 Funcs:map[funcName1:0xc00014d180 funcName2:0xc00014d200 funcName3:0xc00014d300] FlowHead:0xc00014d180 FlowTail:0xc00014d300 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00014d180 ThisFunctionId:func-9406285e2fa94bd582dab4a875771a97 PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-614511f5142e4023b80373517f3ea0a7:map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2] func-9406285e2fa94bd582dab4a875771a97:map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2] func-de7c4e4175b74a898cb43863e53b3215:map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false} cache:0xc00011aae0 metaData:map[] mLock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}
---> Call funcName1Handler ----
Params = map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2]
In FuncName = funcName1, FuncId = func-9406285e2fa94bd582dab4a875771a97, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-9406285e2fa94bd582dab4a875771a97, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-9406285e2fa94bd582dab4a875771a97, row = This is Data3 from Test
context.Background
====> After commitCurData, flow_name = flowName1, flow_id = flow-38c362e52fee489db3af96ae7d83d56a
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-9406285e2fa94bd582dab4a875771a97:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]
KisFunctionS, flow = &{Id:flow-38c362e52fee489db3af96ae7d83d56a Name:flowName1 Conf:0xc000153f80 Funcs:map[funcName1:0xc00014d180 funcName2:0xc00014d200 funcName3:0xc00014d300] FlowHead:0xc00014d180 FlowTail:0xc00014d300 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00014d200 ThisFunctionId:func-de7c4e4175b74a898cb43863e53b3215 PrevFunctionId:func-9406285e2fa94bd582dab4a875771a97 funcParams:map[func-614511f5142e4023b80373517f3ea0a7:map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2] func-9406285e2fa94bd582dab4a875771a97:map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2] func-de7c4e4175b74a898cb43863e53b3215:map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-9406285e2fa94bd582dab4a875771a97:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false} cache:0xc00011aae0 metaData:map[] mLock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}
---> Call funcName2Handler ----
Params = map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2]
In FuncName = funcName2, FuncId = func-de7c4e4175b74a898cb43863e53b3215, row = data from funcName[funcName1], index = 0
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
Params = map[args1:value1 args2:value2]
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 0
In FuncName = funcName2, FuncId = func-de7c4e4175b74a898cb43863e53b3215, row = data from funcName[funcName1], index = 1
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
Params = map[args1:value1 args2:value2]
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 1
In FuncName = funcName2, FuncId = func-de7c4e4175b74a898cb43863e53b3215, row = data from funcName[funcName1], index = 2
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
Params = map[args1:value1 args2:value2]
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 2
context.Background
====> After commitCurData, flow_name = flowName1, flow_id = flow-38c362e52fee489db3af96ae7d83d56a
All Level Data =
map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-9406285e2fa94bd582dab4a875771a97:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-de7c4e4175b74a898cb43863e53b3215:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]]
KisFunctionC, flow = &{Id:flow-38c362e52fee489db3af96ae7d83d56a Name:flowName1 Conf:0xc000153f80 Funcs:map[funcName1:0xc00014d180 funcName2:0xc00014d200 funcName3:0xc00014d300] FlowHead:0xc00014d180 FlowTail:0xc00014d300 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00014d300 ThisFunctionId:func-614511f5142e4023b80373517f3ea0a7 PrevFunctionId:func-de7c4e4175b74a898cb43863e53b3215 funcParams:map[func-614511f5142e4023b80373517f3ea0a7:map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2] func-9406285e2fa94bd582dab4a875771a97:map[default1:funcName1_param1 default2:funcName1_param2 myKey1:flowValue1-1 myKey2:flowValue1-2] func-de7c4e4175b74a898cb43863e53b3215:map[default1:funcName2_param1 default2:funcName2_param2 myKey1:flowValue2-1 myKey2:flowValue2-2]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-9406285e2fa94bd582dab4a875771a97:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-de7c4e4175b74a898cb43863e53b3215:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]] inPut:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false} cache:0xc00011aae0 metaData:map[] mLock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}
---> Call funcName3Handler ----
Params = map[default1:funcName3_param1 default2:funcName3_param2 myKey1:flowValue3-1 myKey2:flowValue3-2]
In FuncName = funcName3, FuncId = func-614511f5142e4023b80373517f3ea0a7, row = data from funcName[funcName2], index = 0
In FuncName = funcName3, FuncId = func-614511f5142e4023b80373517f3ea0a7, row = data from funcName[funcName2], index = 1
In FuncName = funcName3, FuncId = func-614511f5142e4023b80373517f3ea0a7, row = data from funcName[funcName2], index = 2
--- PASS: TestForkFlow (0.03s)
PASS
ok kis-flow/test 0.996s
通过结果可以看出,flowClone1和flowName1具有相同的配置信息。
9.3 【V0.8】源代码
https://github.com/aceld/kis-flow/releases/tag/v0.8
作者:刘丹冰Aceld github: https://github.com/aceld
KisFlow开源项目地址:https://github.com/aceld/kis-flow
Golang框架实战-KisFlow流式计算框架专栏
Golang框架实战-KisFlow流式计算框架(1)-概述
Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)
Golang框架实战-KisFlow流式计算框架(4)-数据流
Golang框架实战-KisFlow流式计算框架(5)-Function调度
Golang框架实战-KisFlow流式计算框架(6)-Connector
Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出
Golang框架实战-KisFlow流式计算框架(8)-KisFlow Action
Golang框架实战-KisFlow流式计算框架(10)-Flow多副本