程序员@IT·互联网KisFlow-基于Golang的流式计算框架实战

KisFlow-Golang流式实时计算案例(二)-Flow并流

2024-04-22  本文已影响0人  刘丹冰Aceld

Golang框架实战-KisFlow流式计算框架专栏

Golang框架实战-KisFlow流式计算框架(1)-概述
Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)
Golang框架实战-KisFlow流式计算框架(4)-数据流
Golang框架实战-KisFlow流式计算框架(5)-Function调度
Golang框架实战-KisFlow流式计算框架(6)-Connector
Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出
Golang框架实战-KisFlow流式计算框架(8)-KisFlow Action
Golang框架实战-KisFlow流式计算框架(9)-Cache/Params 数据缓存与数据参数
Golang框架实战-KisFlow流式计算框架(10)-Flow多副本
Golang框架实战-KisFlow流式计算框架(11)-Prometheus Metrics统计
Golang框架实战-KisFlow流式计算框架(12)-基于反射自适应注册FaaS形参类型

案例:
KisFlow-Golang流式计算案例(一)快速开始QuickStart
KisFlow-Golang流式计算案例(二)-Flow并流操作
KisFlow-Golang流式计算案例(二)-KisFlow在多协程中的应用


DownLoad kis-flow source

$go get github.com/aceld/kis-flow

《KisFlow开发者文档》

案例源代码:
https://github.com/aceld/kis-flow-usage/tree/main/8-connector

KisFlow也可以通过Connector来实现两个Flow的拼接。
通过下面两个Flow的拼接,来介绍有关Connector的接口及使用方式。

数据流图

案例介绍

假设一个学生包括四个属性:

学生ID:stu_id
学分1:score_1
学分2: score_2
学分3: score_3

定义Flow1:CalStuAvgScore-1-2,计算一个学生的学分1(score_1)和学分2(score_2)的平均分(avg_score_1_2)。
定义Flow2:CalStuAvgScore-3,计算一个学生的学分3(score_3)、和avg_score_1_2的平均分,也就是学分1、学分2、学分3的平均分,其中学分1、学分2的平均由Flow1提供。

Flow1流

Flow1由4个Function组成,其中V(Function:VerifyStu)为校验StuId合法性,C(Function:AvgStuScore12)为计算学分1和学分2的平均分,S(Function:SaveScoreAvg12)为将avg_score_1_2存入Redis,E(Function: PrintStuAvgScore)为打印学分1和学分2平均分结果。

Flow2流

Flow2由4个Funciton组成,其中V(Function:VerifyStu)为校验StuId合法性,L(Function:LoadScoreAvg12)为读取Flow1的计算完的当前学生学分1和学分2的平均分avg_score_1_2,C(Function: AvgStuScore3)为通过学分3和学分1、2平均分再次计算三科平均分,E(Function:PrintStuAvgScore)为打印学分1、学分2、学分3的平均分。

conf/func/func-AvgStuScore-3.yml

kistype: func
fname: AvgStuScore3
fmode: Calculate
source:
    name: SourceStuScore
    must:
        - stu_id

conf/func/func-LoadScoreAvg-1-2.yml

kistype: func
fname: LoadScoreAvg12
fmode: Load
source:
    name: SourceStuScore
    must:
        - stu_id
option:
    cname: Score12Cache

基础数据协议

stu_proto.go

package main

type StuScore1_2 struct {
    StuId  int `json:"stu_id"`
    Score1 int `json:"score_1"`
    Score2 int `json:"score_2"`
}

type StuScoreAvg struct {
    StuId    int     `json:"stu_id"`
    AvgScore float64 `json:"avg_score"`
}

type StuScore3 struct {
    StuId      int     `json:"stu_id"`
    AvgScore12 float64 `json:"avg_score_1_2"` // score_1, score_2 avg
    Score3     int     `json:"score_3"`
}

Connector Init

本项目中定义的Connector:Score12Cache,是一个关联Redis的链接资源,该Connector需要提供一个初始化办法,用来启动KisFlow时作初始化连接使用。

conn_init.go

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/log"
    "github.com/go-redis/redis/v8"
)

// type ConnInit func(conn Connector) error

func InitScore12Cache(connector kis.Connector) error {
    fmt.Println("===> Call Connector InitScore12Cache")

    // init Redis Conn Client
    rdb := redis.NewClient(&redis.Options{
        Addr:     connector.GetConfig().AddrString, // Redis-Server address
        Password: "",                               // password
        DB:       0,                                // select db
    })

    // Ping test
    pong, err := rdb.Ping(context.Background()).Result()
    if err != nil {
        log.Logger().ErrorF("Failed to connect to Redis: %v", err)
        return err
    }
    fmt.Println("Connected to Redis:", pong)

    // set rdb to connector
    connector.SetMetaData("rdb", rdb)

    return nil
}

这里将Redis链接成功的实例存储在connector的缓存变量"rdb"中。

    // set rdb to connector
    connector.SetMetaData("rdb", rdb)

FaaS实现

Function(V): VerifyStu

faas_stu_verify.go

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"
)

type VerifyStuIn struct {
    serialize.DefaultSerialize
    StuId int `json:"stu_id"`
}

func VerifyStu(ctx context.Context, flow kis.Flow, rows []*VerifyStuIn) error {
    fmt.Printf("->Call Func VerifyStu\n")

    for _, stu := range rows {
        // 过滤掉不合法的数据
        if stu.StuId < 0 || stu.StuId > 999 {
            // 终止当前Flow流程,不会再继续执行当前Flow的后续Function
            return flow.Next(kis.ActionAbort)
        }
    }

    return flow.Next(kis.ActionDataReuse)
}

VerifyStu()用来对数据进行校验,如果不满足要求,则中止该本条数据流,最后通过flow.Next(kis.ActionDataReuse)数据重复使用传递给下一曾。

Function(C): AvgStuScore12

faas_save_score_avg_1_2.go

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"
)

type AvgStuScoreIn_1_2 struct {
    serialize.DefaultSerialize
    StuScore1_2
}

type AvgStuScoreOut_1_2 struct {
    serialize.DefaultSerialize
    StuScoreAvg
}

func AvgStuScore12(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn_1_2) error {
    fmt.Printf("->Call Func AvgStuScore12\n")

    for _, row := range rows {

        out := AvgStuScoreOut_1_2{
            StuScoreAvg: StuScoreAvg{
                StuId:    row.StuId,
                AvgScore: float64(row.Score1+row.Score2) / 2,
            },
        }

        // 提交结果数据
        _ = flow.CommitRow(out)
    }

    return flow.Next()
}

AvgStuScore12()为计算score_1和score_2的平均分,得到计算结果avg_score

Function(S): SaveScoreAvg12

faas_save_score_avg_1_2.go

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"
    "github.com/go-redis/redis/v8"
    "strconv"
)

type SaveStuScoreIn struct {
    serialize.DefaultSerialize
    StuScoreAvg
}

func BatchSetStuScores(ctx context.Context, conn kis.Connector, rows []*SaveStuScoreIn) error {

    var rdb *redis.Client

    // Get Redis Client
    rdb = conn.GetMetaData("rdb").(*redis.Client)

    // Set data to redis
    pipe := rdb.Pipeline()

    for _, score := range rows {
        // make key
        key := conn.GetConfig().Key + strconv.Itoa(score.StuId)

        pipe.HMSet(context.Background(), key, map[string]interface{}{
            "avg_score": score.AvgScore,
        })
    }

    _, err := pipe.Exec(ctx)
    if err != nil {
        return err
    }

    return nil
}

func SaveScoreAvg12(ctx context.Context, flow kis.Flow, rows []*SaveStuScoreIn) error {
    fmt.Printf("->Call Func SaveScoreScore12\n")

    conn, err := flow.GetConnector()
    if err != nil {
        fmt.Printf("SaveScoreScore12(): GetConnector err = %s\n", err.Error())
        return err
    }

    if BatchSetStuScores(ctx, conn, rows) != nil {
        fmt.Printf("SaveScoreScore12(): BatchSetStuScores err = %s\n", err.Error())
        return err
    }

    return flow.Next(kis.ActionDataReuse)
}

SaveScoreAvg12()将数据通过绑定的Connector,存入Redis中,使用的Key为Connector配置好的Key。最后将源数据透传给下一次Function。

Function(E): PrintStuAvgScore

faas_stu_score_avg_print.go

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"
)

type PrintStuAvgScoreIn struct {
    serialize.DefaultSerialize
    StuId    int     `json:"stu_id"`
    AvgScore float64 `json:"avg_score"`
}

func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
    fmt.Printf("->Call Func PrintStuAvgScore, in Flow[%s]\n", flow.GetName())

    for _, row := range rows {
        fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
    }

    return flow.Next()
}

PrintStuAvgScore()为打印当前学生的平均分值。

Function(L): LoadScoreAvg12

faas_load_score_avg_1_2.go

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"
    "github.com/go-redis/redis/v8"
    "strconv"
)

type LoadStuScoreIn struct {
    serialize.DefaultSerialize
    StuScore3
}

type LoadStuScoreOut struct {
    serialize.DefaultSerialize
    StuScore3
}

func GetStuScoresByStuId(ctx context.Context, conn kis.Connector, stuId int) (float64, error) {

    var rdb *redis.Client

    // Get Redis Client
    rdb = conn.GetMetaData("rdb").(*redis.Client)

    // make key
    key := conn.GetConfig().Key + strconv.Itoa(stuId)

    // get data from redis
    result, err := rdb.HGetAll(ctx, key).Result()
    if err != nil {
        return 0, err
    }

    // get value
    avgScoreStr, ok := result["avg_score"]
    if !ok {
        return 0, fmt.Errorf("avg_score not found for stuId: %d", stuId)
    }

    // parse to float64
    avgScore, err := strconv.ParseFloat(avgScoreStr, 64)
    if err != nil {
        return 0, err
    }

    return avgScore, nil
}

func LoadScoreAvg12(ctx context.Context, flow kis.Flow, rows []*LoadStuScoreIn) error {
    fmt.Printf("->Call Func LoadScoreAvg12\n")

    conn, err := flow.GetConnector()
    if err != nil {
        fmt.Printf("LoadScoreAvg12(): GetConnector err = %s\n", err.Error())
        return err
    }

    for _, row := range rows {
        stuScoreAvg1_2, err := GetStuScoresByStuId(ctx, conn, row.StuId)
        if err != nil {
            fmt.Printf("LoadScoreAvg12(): GetStuScoresByStuId err = %s\n", err.Error())
            return err
        }

        out := LoadStuScoreOut{
            StuScore3: StuScore3{
                StuId:      row.StuId,
                Score3:     row.Score3,
                AvgScore12: stuScoreAvg1_2, // avg score of score1 and score2 (load from redis)
            },
        }

        // commit result
        _ = flow.CommitRow(out)
    }

    return flow.Next()
}

LoadScoreAvg12()为从绑定的Connector中链接资源Redis,通过配置中的Key,读取score_1和score_2的平均分值,然后将上游的源数据加上新读取的score1和score2的平均分发送给下层。

Function(C): AvgStuScore3

faas_stu_score_avg_3.go

package main

import (
    "context"
    "fmt"
    "github.com/aceld/kis-flow/kis"
    "github.com/aceld/kis-flow/serialize"
)

type AvgStuScore3In struct {
    serialize.DefaultSerialize
    StuScore3
}

type AvgStuScore3Out struct {
    serialize.DefaultSerialize
    StuScoreAvg
}

func AvgStuScore3(ctx context.Context, flow kis.Flow, rows []*AvgStuScore3In) error {
    fmt.Printf("->Call Func AvgStuScore12\n")

    for _, row := range rows {

        out := AvgStuScore3Out{
            StuScoreAvg: StuScoreAvg{
                StuId:    row.StuId,
                AvgScore: (float64(row.Score3) + row.AvgScore12*2) / 3,
            },
        }

        // 提交结果数据
        _ = flow.CommitRow(out)
    }

    return flow.Next()
}

AvgStuScore3()将score_3和读取的score_1和score_2的平均分值,再从新计算三个学分的平均分值,得到最终的平均分值avg_score

Register FaaS & CaaSInit/CaaS(注册Function/Connector)

main.go

func init() {
    // Register functions
    kis.Pool().FaaS("VerifyStu", VerifyStu)
    kis.Pool().FaaS("AvgStuScore12", AvgStuScore12)
    kis.Pool().FaaS("SaveScoreAvg12", SaveScoreAvg12)
    kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
    kis.Pool().FaaS("LoadScoreAvg12", LoadScoreAvg12)
    kis.Pool().FaaS("AvgStuScore3", AvgStuScore3)

    // Register connectors
    kis.Pool().CaaSInit("Score12Cache", InitScore12Cache)
}

主流程

main.go

package main

import (
    "context"
    "github.com/aceld/kis-flow/file"
    "github.com/aceld/kis-flow/kis"
    "sync"
)

func RunFlowCalStuAvgScore12(ctx context.Context, flow kis.Flow) error {

    // Submit a string
    _ = flow.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90}`)
    _ = flow.CommitRow(`{"stu_id":102, "score_1":100, "score_2":80}`)

    // Run the flow
    if err := flow.Run(ctx); err != nil {
        return err
    }

    return nil
}

func RunFlowCalStuAvgScore3(ctx context.Context, flow kis.Flow) error {

    // Submit a string
    _ = flow.CommitRow(`{"stu_id":101, "score_3": 80}`)
    _ = flow.CommitRow(`{"stu_id":102, "score_3": 70}`)

    // Run the flow
    if err := flow.Run(ctx); err != nil {
        return err
    }

    return nil
}

func main() {
    ctx := context.Background()

    // Load Configuration from file
    if err := file.ConfigImportYaml("conf/"); err != nil {
        panic(err)
    }

    var wg sync.WaitGroup
    wg.Add(2)

    go func() {
        // run flow1
        defer wg.Done()
        // Get the flow
        flow1 := kis.Pool().GetFlow("CalStuAvgScore12")
        if flow1 == nil {
            panic("flow1 is nil")
        }

        if err := RunFlowCalStuAvgScore12(ctx, flow1); err != nil {
            panic(err)
        }
    }()

    go func() {
        defer wg.Done()
        // Get the flow
        flow2 := kis.Pool().GetFlow("CalStuAvgScore3")
        if flow2 == nil {
            panic("flow2 is nil")
        }

        // run flow2
        if err := RunFlowCalStuAvgScore3(ctx, flow2); err != nil {
            panic(err)
        }
    }()

    wg.Wait()

    return
}

开启两个Goroutine分别执行Flow1和Flow2,来计算学生101和学生102的最终平均分。

运行结果

===> Call Connector InitScore12Cache
Connected to Redis: PONG
Add FlowRouter FlowName=CalStuAvgScore12
===> Call Connector InitScore12Cache
Connected to Redis: PONG
Add FlowRouter FlowName=CalStuAvgScore3
->Call Func VerifyStu
->Call Func VerifyStu
->Call Func AvgStuScore12
->Call Func LoadScoreAvg12
->Call Func SaveScoreScore12
->Call Func PrintStuAvgScore, in Flow[CalStuAvgScore12]
stuid: [101], avg score: [95]
stuid: [102], avg score: [90]
->Call Func AvgStuScore12
->Call Func PrintStuAvgScore, in Flow[CalStuAvgScore3]
stuid: [101], avg score: [90]
stuid: [102], avg score: [83.33333333333333]

我们看到在Flow[CalStuAvgScore3]最终算出了学分1、学分2、学分3的平均分数值。


作者:刘丹冰Aceld github: https://github.com/aceld
KisFlow开源项目地址:https://github.com/aceld/kis-flow

Golang框架实战-KisFlow流式计算框架专栏

Golang框架实战-KisFlow流式计算框架(1)-概述
Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)
Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)
Golang框架实战-KisFlow流式计算框架(4)-数据流
Golang框架实战-KisFlow流式计算框架(5)-Function调度
Golang框架实战-KisFlow流式计算框架(6)-Connector
Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出
Golang框架实战-KisFlow流式计算框架(8)-KisFlow Action
Golang框架实战-KisFlow流式计算框架(9)-Cache/Params 数据缓存与数据参数
Golang框架实战-KisFlow流式计算框架(10)-Flow多副本
Golang框架实战-KisFlow流式计算框架(11)-Prometheus Metrics统计
Golang框架实战-KisFlow流式计算框架(12)-基于反射自适应注册FaaS形参类型

案例:
KisFlow-Golang流式计算案例(一)快速开始QuickStart
KisFlow-Golang流式计算案例(二)-Flow并流操作
KisFlow-Golang流式计算案例(二)-KisFlow在多协程中的应用

上一篇下一篇

猜你喜欢

热点阅读