KisFlow-基于Golang的流式计算框架实战@IT·互联网

Golang框架实战-KisFlow流式计算框架(7)-配置导入

2024-03-02  本文已影响0人  刘丹冰Aceld

Golang框架实战-KisFlow流式计算框架专栏

Golang框架实战-KisFlow流式计算框架(1)-概述
Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)
Golang框架实战-KisFlow流式计算框架(4)-数据流
Golang框架实战-KisFlow流式计算框架(5)-Function调度
Golang框架实战-KisFlow流式计算框架(6)-Connector
Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出


6.1 配置的导入

现在每次建立Flow和Function等,都需要一系列繁琐的添加,不是很方便,接下来,我们可以通过批量读写配置文件,构建KisFlow中的结构关系,并且也可以将KisFlow的结构导出到本地文件中。目前我们先用文件的形式做配置的持久化,开发者也可以今后做数据库或者远程配置的持久化均可。

6.1 配置的导入

6.1.1 创建配置文件

首先我们在kis-flow/test/load_conf/下创建需要加载的kisflow业务配置文件。
kis-flow/test/load_conf/下分别创建conn/flow/func/三个文件夹分别存放Connector、Flow、Funciton的配置信息。

├── conn
│   └── conn-ConnName1.yml
├── flow
│   └── flow-FlowName1.yml
└── func
    ├── func-FuncName1.yml
    ├── func-FuncName2.yml
    └── func-FuncName3.yml

分别创建一些yml文件。具体内容如下:

A.Function

kis-flow/test/load_conf/func/func-FuncNam1.yml

kistype: func
fname: funcName1
fmode: Verify
source:
  name: 公众号抖音商城户订单数据
  must:
    - order_id
    - user_id

kis-flow/test/load_conf/func/func-FuncNam2.yml

kistype: func
fname: funcName2
fmode: Save
source:
  name: 用户订单错误率
  must:
    - order_id
    - user_id
option:
  cname: ConnName1

kis-flow/test/load_conf/func/func-FuncNam2.yml

kistype: func
fname: funcName2
fmode: Save
source:
  name: 用户订单错误率
  must:
    - order_id
    - user_id
option:
  cname: ConnName1

kis-flow/test/load_conf/func/func-FuncNam3.yml

kistype: func
fname: funcName3
fmode: Calculate
source:
  name: 用户订单错误率
  must:
    - order_id
    - user_id

B.Connector

kis-flow/test/load_conf/func/func-ConnName1.yml

kistype: conn
cname: ConnName1
addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'
type: redis
key: redis-key
params:
  args1: value1
  args2: value2
load: null
save:
  - funcName2

C.Flow

kis-flow/test/load_conf/func/func-FlowName1.yml

kistype: flow
status: 1
flow_name: flowName1
flows:
  - fname: funcName1
  - fname: funcName2
  - fname: funcName3

6.1.2 配置解析

创建kis-flow/file/目录,且创建kis-flow/file/config_import.go文件。

首先定义一个可以存放全部配置的接口:

kis-flow/file/config_import.go

type allConfig struct {
    Flows map[string]*config.KisFlowConfig
    Funcs map[string]*config.KisFuncConfig
    Conns map[string]*config.KisConnConfig
}

key作为各个模块的Name名称字段。
然后分别定义解析Flow、Function、Connector配置的方法。yaml的第三方库,我们用"gopkg.in/yaml.v3"这个库。

kis-flow/go.mod

module kis-flow

go 1.18

require github.com/google/uuid v1.5.0
require gopkg.in/yaml.v3 v3.0.1 // indirect

A. Flow 配置解析

kis-flow/file/config_import.go

// kisTypeFlowConfigure 解析Flow配置文件,yaml格式
func kisTypeFlowConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error {
    flow := new(config.KisFlowConfig)
    if ok := yaml.Unmarshal(confData, flow); ok != nil {
        return errors.New(fmt.Sprintf("%s has wrong format kisType = %s", fileName, kisType))
    }

    // 如果FLow状态为关闭,则不做配置加载
    if common.KisOnOff(flow.Status) == common.FlowDisable {
        return nil
    }

    if _, ok := all.Flows[flow.FlowName]; ok {
        return errors.New(fmt.Sprintf("%s set repeat flow_id:%s", fileName, flow.FlowName))
    }

    // 加入配置集合中
    all.Flows[flow.FlowName] = flow

    return nil
}

kisTypeFlowConfigure 会将配置信息解析到allConfig的Flows成员中。
同理,Function和Connector的解析办法如下。

B. Functioin配置解析

kis-flow/file/config_import.go

// kisTypeFuncConfigure 解析Function配置文件,yaml格式
func kisTypeFuncConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error {
    function := new(config.KisFuncConfig)
    if ok := yaml.Unmarshal(confData, function); ok != nil {
        return errors.New(fmt.Sprintf("%s has wrong format kisType = %s", fileName, kisType))
    }
    if _, ok := all.Funcs[function.FName]; ok {
        return errors.New(fmt.Sprintf("%s set repeat function_id:%s", fileName, function.FName))
    }

    // 加入配置集合中
    all.Funcs[function.FName] = function

    return nil
}

C. Connector配置解析

kis-flow/file/config_import.go

// kisTypeConnConfigure 解析Connector配置文件,yaml格式
func kisTypeConnConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error {
    conn := new(config.KisConnConfig)
    if ok := yaml.Unmarshal(confData, conn); ok != nil {
        return errors.New(fmt.Sprintf("%s is wrong format nsType = %s", fileName, kisType))
    }

    if _, ok := all.Conns[conn.CName]; ok {
        return errors.New(fmt.Sprintf("%s set repeat conn_id:%s", fileName, conn.CName))
    }

    // 加入配置集合中
    all.Conns[conn.CName] = conn

    return nil
}

6.1.3 遍历文件

下面实现一个遍历一个路径loadPath下面所有的yml和yaml类型文件,按照kistype类别解析配置信息到allConfig中。

kis-flow/file/config_import.go

// parseConfigWalkYaml 全盘解析配置文件,yaml格式, 讲配置信息解析到allConfig中
func parseConfigWalkYaml(loadPath string) (*allConfig, error) {

    all := new(allConfig)

    all.Flows = make(map[string]*config.KisFlowConfig)
    all.Funcs = make(map[string]*config.KisFuncConfig)
    all.Conns = make(map[string]*config.KisConnConfig)

    err := filepath.Walk(loadPath, func(filePath string, info os.FileInfo, err error) error {
        // 校验文件后缀是否合法
        if suffix := path.Ext(filePath); suffix != ".yml" && suffix != ".yaml" {
            return nil
        }

        // 读取文件内容
        confData, err := ioutil.ReadFile(filePath)
        if err != nil {
            return err
        }

        confMap := make(map[string]interface{})

        // 校验yaml合法性
        if err := yaml.Unmarshal(confData, confMap); err != nil {
            return err
        }

        // 判断kisType是否存在
        if kisType, ok := confMap["kistype"]; !ok {
            return errors.New(fmt.Sprintf("yaml file %s has no file [kistype]!", filePath))
        } else {
            switch kisType {
            case common.KisIdTypeFlow:
                return kisTypeFlowConfigure(all, confData, filePath, kisType)

            case common.KisIdTypeFunction:
                return kisTypeFuncConfigure(all, confData, filePath, kisType)

            case common.KisIdTypeConnnector:
                return kisTypeConnConfigure(all, confData, filePath, kisType)

            default:
                return errors.New(fmt.Sprintf("%s set wrong kistype %s", filePath, kisType))
            }
        }
    })

    if err != nil {
        return nil, err
    }

    return all, nil
}

6.1.4 导入方法

下面提供一个对外的公开方法ConfigImportYaml,需要提供一个导入的文件根路径。

kis-flow/file/config_import.go

// ConfigImportYaml 全盘解析配置文件,yaml格式
func ConfigImportYaml(loadPath string) error {

    all, err := parseConfigWalkYaml(loadPath)
    if err != nil {
        return err
    }

    for flowName, flowConfig := range all.Flows {

        // 构建一个Flow
        newFlow := flow.NewKisFlow(flowConfig)

        for _, fp := range flowConfig.Flows {
            if err := buildFlow(all, fp, newFlow, flowName); err != nil {
                return err
            }
        }

        //将flow添加到FlowPool中
        kis.Pool().AddFlow(flowName, newFlow)
    }

    return nil
}

首先会调用parseConfigWalkYaml()将全部的配置信息加载到内存中。
其次,遍历所有的Flow,依次去构建Flow,最终将flow添加到Pool当中,具体的构建流程如下:

kis-flow/file/config_import.go

func buildFlow(all *allConfig, fp config.KisFlowFunctionParam, newFlow kis.Flow, flowName string) error {
    //加载当前Flow依赖的Function
    if funcConfig, ok := all.Funcs[fp.FuncName]; !ok {
        return errors.New(fmt.Sprintf("FlowName [%s] need FuncName [%s], But has No This FuncName Config", flowName, fp.FuncName))
    } else {
        //flow add connector
        if funcConfig.Option.CName != "" {
            // 加载当前Function依赖的Connector
            if connConf, ok := all.Conns[funcConfig.Option.CName]; !ok {
                return errors.New(fmt.Sprintf("FuncName [%s] need ConnName [%s], But has No This ConnName Config", fp.FuncName, funcConfig.Option.CName))
            } else {
                // Function Config 关联 Connector Config
                _ = funcConfig.AddConnConfig(connConf)
            }
        }

        //flow add function
        if err := newFlow.Link(funcConfig, fp.Params); err != nil {
            return err
        }
    }

    return nil
}

6.2 配置导入单元测试

创建单元测试文件 kis-flow/test/kis_config_import_test.go

kis-flow/test/kis_config_import_test.go

package test

import (
    "context"
    "kis-flow/common"
    "kis-flow/file"
    "kis-flow/kis"
    "kis-flow/test/caas"
    "kis-flow/test/faas"
    "testing"
)

func TestConfigImportYmal(t *testing.T) {
    ctx := context.Background()

    // 0. 注册Function 回调业务
    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
    kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
    kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

    // 0. 注册ConnectorInit 和 Connector 回调业务
    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

    // 1. 加载配置文件并构建Flow
    if err := file.ConfigImportYaml("/Users/gopath/src/kis-flow/test/load_conf/"); err != nil {
        panic(err)
    }

    // 2. 获取Flow
    flow1 := kis.Pool().GetFlow("flowName1")

    // 3. 提交原始数据
    _ = flow1.CommitRow("This is Data1 from Test")
    _ = flow1.CommitRow("This is Data2 from Test")
    _ = flow1.CommitRow("This is Data3 from Test")

    // 4. 执行flow1
    if err := flow1.Run(ctx); err != nil {
        panic(err)
    }
}

先注册业务方法。然后通过ConfigImportYaml加载配置,之后从Pool中得到flow实例,提交数据,运行。

注意,这里的配置文件路径,写的是绝对路径。

cd 到kis-flow/test/目录下,执行指令:

go test -test.v -test.paniconexit0 -test.run TestConfigImportYmal    

结果如下:

=== RUN   TestConfigImportYmal
Add KisPool FuncName=funcName1
Add KisPool FuncName=funcName2
Add KisPool FuncName=funcName3
Add KisPool CaaSInit CName=ConnName1
Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save
===> Call Connector InitDemo1
&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}
Add FlowRouter FlowName=flowName1

context.Background
====> After CommitSrcData, flow_name = flowName1, flow_id = flow-bcaaa02a8d4b4a80b2f2895d9cecf20b
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]

KisFunctionV, flow = &{Id:flow-bcaaa02a8d4b4a80b2f2895d9cecf20b Name:flowName1 Conf:0xc00014eb00 Funcs:map[funcName1:0xc000114960 funcName2:0xc0001149c0 funcName3:0xc000114a20] FlowHead:0xc000114960 FlowTail:0xc000114a20 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000114960 ThisFunctionId:func-37c7070f45144529891d433ae9c4ebfc PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-37c7070f45144529891d433ae9c4ebfc:map[] func-5315301ffbbb4ae4be021729ddff1569:map[] func-89a6a662729b4a0895e849c40bf29892:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]}

---> Call funcName1Handler ----
In FuncName = funcName1, FuncId = func-37c7070f45144529891d433ae9c4ebfc, row = This is Data1 from Test
In FuncName = funcName1, FuncId = func-37c7070f45144529891d433ae9c4ebfc, row = This is Data2 from Test
In FuncName = funcName1, FuncId = func-37c7070f45144529891d433ae9c4ebfc, row = This is Data3 from Test
context.Background
 ====> After commitCurData, flow_name = flowName1, flow_id = flow-bcaaa02a8d4b4a80b2f2895d9cecf20b
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]

KisFunctionS, flow = &{Id:flow-bcaaa02a8d4b4a80b2f2895d9cecf20b Name:flowName1 Conf:0xc00014eb00 Funcs:map[funcName1:0xc000114960 funcName2:0xc0001149c0 funcName3:0xc000114a20] FlowHead:0xc000114960 FlowTail:0xc000114a20 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0001149c0 ThisFunctionId:func-5315301ffbbb4ae4be021729ddff1569 PrevFunctionId:func-37c7070f45144529891d433ae9c4ebfc funcParams:map[func-37c7070f45144529891d433ae9c4ebfc:map[] func-5315301ffbbb4ae4be021729ddff1569:map[] func-89a6a662729b4a0895e849c40bf29892:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]}

---> Call funcName2Handler ----
In FuncName = funcName2, FuncId = func-5315301ffbbb4ae4be021729ddff1569, row = data from funcName[funcName1], index = 0
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 0
In FuncName = funcName2, FuncId = func-5315301ffbbb4ae4be021729ddff1569, row = data from funcName[funcName1], index = 1
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 1
In FuncName = funcName2, FuncId = func-5315301ffbbb4ae4be021729ddff1569, row = data from funcName[funcName1], index = 2
===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save
===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 2
context.Background
 ====> After commitCurData, flow_name = flowName1, flow_id = flow-bcaaa02a8d4b4a80b2f2895d9cecf20b
All Level Data =
 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-5315301ffbbb4ae4be021729ddff1569:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]]

KisFunctionC, flow = &{Id:flow-bcaaa02a8d4b4a80b2f2895d9cecf20b Name:flowName1 Conf:0xc00014eb00 Funcs:map[funcName1:0xc000114960 funcName2:0xc0001149c0 funcName3:0xc000114a20] FlowHead:0xc000114960 FlowTail:0xc000114a20 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000114a20 ThisFunctionId:func-89a6a662729b4a0895e849c40bf29892 PrevFunctionId:func-5315301ffbbb4ae4be021729ddff1569 funcParams:map[func-37c7070f45144529891d433ae9c4ebfc:map[] func-5315301ffbbb4ae4be021729ddff1569:map[] func-89a6a662729b4a0895e849c40bf29892:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-5315301ffbbb4ae4be021729ddff1569:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]] inPut:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]}

---> Call funcName3Handler ----
In FuncName = funcName3, FuncId = func-89a6a662729b4a0895e849c40bf29892, row = data from funcName[funcName2], index = 0
In FuncName = funcName3, FuncId = func-89a6a662729b4a0895e849c40bf29892, row = data from funcName[funcName2], index = 1
In FuncName = funcName3, FuncId = func-89a6a662729b4a0895e849c40bf29892, row = data from funcName[funcName2], index = 2
--- PASS: TestConfigImportYmal (0.01s)
PASS
ok      kis-flow/test   0.517s

预期的结果和我们一致,现在我们可以通过配置文件进行加载且构建KisFlow了。

6.3 配置的导出

6.3.1 导出实现

kis-flow/file/config_export.go

package file

import (
    "errors"
    "fmt"
    "gopkg.in/yaml.v3"
    "io/ioutil"
    "kis-flow/common"
    "kis-flow/kis"
)

// ConfigExportYaml 将flow配置输出,且存储本地
func ConfigExportYaml(flow kis.Flow, savaPath string) error {

    if data, err := yaml.Marshal(flow.GetConfig()); err != nil {
        return err
    } else {
        //flow
        err := ioutil.WriteFile(savaPath+common.KisIdTypeFlow+"-"+flow.GetName()+".yaml", data, 0644)
        if err != nil {
            return err
        }

        //function
        for _, fp := range flow.GetConfig().Flows {
            fConf := flow.GetFuncConfigByName(fp.FuncName)
            if fConf == nil {
                return errors.New(fmt.Sprintf("function name = %s config is nil ", fp.FuncName))
            }

            if fdata, err := yaml.Marshal(fConf); err != nil {
                return err
            } else {
                if err := ioutil.WriteFile(savaPath+common.KisIdTypeFunction+"-"+fp.FuncName+".yaml", fdata, 0644); err != nil {
                    return err
                }
            }

            // Connector
            if fConf.Option.CName != "" {
                cConf, err := fConf.GetConnConfig()
                if err != nil {
                    return err
                }
                if cdata, err := yaml.Marshal(cConf); err != nil {
                    return err
                } else {
                    if err := ioutil.WriteFile(savaPath+common.KisIdTypeConnnector+"-"+cConf.CName+".yaml", cdata, 0644); err != nil {
                        return err
                    }
                }
            }
        }
    }

    return nil
}

这里面需要补充一些接口,如下:

6.3.2 Flow新增接口

kis-flow/kis/flow.go

package kis

import (
    "context"
    "kis-flow/common"
    "kis-flow/config"
)

type Flow interface {
    // Run 调度Flow,依次调度Flow中的Function并且执行
    Run(ctx context.Context) error
    // Link 将Flow中的Function按照配置文件中的配置进行连接
    Link(fConf *config.KisFuncConfig, fParams config.FParam) error
    // CommitRow 提交Flow数据到即将执行的Function层
    CommitRow(row interface{}) error
    // Input 得到flow当前执行Function的输入源数据
    Input() common.KisRowArr
    // GetName 得到Flow的名称
    GetName() string
    // GetThisFunction 得到当前正在执行的Function
    GetThisFunction() Function
    // GetThisFuncConf 得到当前正在执行的Function的配置
    GetThisFuncConf() *config.KisFuncConfig
    // GetConnector 得到当前正在执行的Function的Connector
    GetConnector() (Connector, error)
    // GetConnConf 得到当前正在执行的Function的Connector的配置
    GetConnConf() (*config.KisConnConfig, error)
    
    // +++++++++++++++++++++++++++++++
    // GetConfig 得到当前Flow的配置
    GetConfig() *config.KisFlowConfig
    // GetFuncConfigByName 得到当前Flow的配置
    GetFuncConfigByName(funcName string) *config.KisFuncConfig
    // +++++++++++++++++++++++++++++++
}

flow新增的接口实现如下:

kis-flow/flow/kis_flow.go

func (flow *KisFlow) GetConfig() *config.KisFlowConfig {
    return flow.Conf
}

// GetFuncConfigByName 得到当前Flow的配置
func (flow *KisFlow) GetFuncConfigByName(funcName string) *config.KisFuncConfig {
    if f, ok := flow.Funcs[funcName]; ok {
        return f.GetConfig()
    } else {
        log.Logger().ErrorF("GetFuncConfigByName(): Function %s not found", funcName)
        return nil
    }
}

6.3.3 KisFlow中Funcs修复

这里面之前有个笔误。

kis-flow/flow/kis_flow.go

// KisFlow 用于贯穿整条流式计算的上下文环境
type KisFlow struct {
    // 基础信息
    Id   string                // Flow的分布式实例ID(用于KisFlow内部区分不同实例)
    Name string                // Flow的可读名称
    Conf *config.KisFlowConfig // Flow配置策略

    // Function列表
    Funcs          map[string]kis.Function // 当前flow拥有的全部管理的全部Function对象, key: FunctionName
    FlowHead       kis.Function            // 当前Flow所拥有的Function列表表头
    FlowTail       kis.Function            // 当前Flow所拥有的Function列表表尾
    flock          sync.RWMutex            // 管理链表插入读写的锁
    ThisFunction   kis.Function            // Flow当前正在执行的KisFunction对象
    ThisFunctionId string                  // 当前执行到的Function ID
    PrevFunctionId string                  // 当前执行到的Function 上一层FunctionID

    // Function列表参数
    funcParams map[string]config.FParam // flow在当前Function的自定义固定配置参数,Key:function的实例KisID, value:FParam
    fplock     sync.RWMutex             // 管理funcParams的读写锁

    // 数据
    buffer common.KisRowArr  // 用来临时存放输入字节数据的内部Buf, 一条数据为interface{}, 多条数据为[]interface{} 也就是KisBatch
    data   common.KisDataMap // 流式计算各个层级的数据源
    inPut  common.KisRowArr  // 当前Function的计算输入数据
}

这里的Funcs成员,其key的含义,之前我们定义的是KisID,现在要修正为key的含义是FunctionName。

下面想Funcs成员赋值的代码做一个简单的修正

// appendFunc 将Function添加到Flow中, 链表操作
func (flow *KisFlow) appendFunc(function kis.Function, fParam config.FParam) error {
    // ... ... 


    //将Function Name 详细Hash对应关系添加到flow对象中
    flow.Funcs[function.GetConfig().FName] = function

    // ... ... 
}

6.3.4 KisPool新增方法

kis-flow/kis/pool.go

// GetFlows 得到全部的Flow
func (pool *kisPool) GetFlows() []Flow {
    pool.flowLock.RLock() // 读锁
    defer pool.flowLock.RUnlock()

    var flows []Flow

    for _, flow := range pool.flowRouter {
        flows = append(flows, flow)
    }

    return flows
}

KisPool新增 获取全部Flow的方法,以支持导出模块使用。

6.4 配置导出单元测试

kis-flow/test/创建kis_config_export_test.go文件。

package test

import (
    "kis-flow/common"
    "kis-flow/file"
    "kis-flow/kis"
    "kis-flow/test/caas"
    "kis-flow/test/faas"
    "testing"
)

func TestConfigExportYmal(t *testing.T) {
    // 0. 注册Function 回调业务
    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)
    kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)
    kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)

    // 0. 注册ConnectorInit 和 Connector 回调业务
    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)
    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)

    // 1. 加载配置文件并构建Flow
    if err := file.ConfigImportYaml("/Users/gopath/src/kis-flow/test/load_conf/"); err != nil {
        panic(err)
    }

    // 2. 讲构建的内存KisFlow结构配置导出的文件当中
    flows := kis.Pool().GetFlows()
    for _, flow := range flows {
        if err := file.ConfigExportYaml(flow, "/Users/gopath/src/kis-flow/test/export_conf/"); err != nil {
            panic(err)
        }
    }
}

cd到kis-flow/test/下 执行:

go test -test.v -test.paniconexit0 -test.run  TestConfigExportYmal 

会在kis-flow/test/export_conf/下得到导出的配置。

├── export_conf
│   ├── conn-ConnName1.yaml
│   ├── flow-flowName1.yaml
│   ├── func-funcName1.yaml
│   ├── func-funcName2.yaml
│   └── func-funcName3.yaml

6.5 【V0.5】源代码

https://github.com/aceld/kis-flow/releases/tag/v0.5


作者:刘丹冰Aceld github: https://github.com/aceld
KisFlow开源项目地址:https://github.com/aceld/kis-flow


Golang框架实战-KisFlow流式计算框架专栏

Golang框架实战-KisFlow流式计算框架(1)-概述
Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)
Golang框架实战-KisFlow流式计算框架(4)-数据流
Golang框架实战-KisFlow流式计算框架(5)-Function调度
Golang框架实战-KisFlow流式计算框架(6)-Connector
Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出

上一篇下一篇

猜你喜欢

热点阅读