KisFlow: Golang Streaming Real-Time Computing Case (2) - Merging Flows
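Golang Framework in Practice: KisFlow Streaming Computing Framework Series
Golang Framework in Practice - KisFlow Streaming Computing Framework (1) - Overview
Golang Framework in Practice - KisFlow Streaming Computing Framework (2) - Project Setup / Basic Modules (Part 1)
Golang Framework in Practice - KisFlow Streaming Computing Framework (3) - Project Setup / Basic Modules (Part 2)
Golang Framework in Practice - KisFlow Streaming Computing Framework (4) - Data Flow
Golang Framework in Practice - KisFlow Streaming Computing Framework (5) - Function Scheduling
Golang Framework in Practice - KisFlow Streaming Computing Framework (6) - Connector
Golang Framework in Practice - KisFlow Streaming Computing Framework (7) - Configuration Import and Export
Golang Framework in Practice - KisFlow Streaming Computing Framework (8) - KisFlow Action
Golang Framework in Practice - KisFlow Streaming Computing Framework (9) - Cache/Params: Data Caching and Data Parameters
Golang Framework in Practice - KisFlow Streaming Computing Framework (10) - Multiple Flow Copies
Golang Framework in Practice - KisFlow Streaming Computing Framework (11) - Prometheus Metrics Statistics
Golang Framework in Practice - KisFlow Streaming Computing Framework (12) - Reflection-Based Adaptive Registration of FaaS Parameter Types
Cases:
KisFlow-Golang Streaming Computing Case (1) - Quick Start
KisFlow-Golang Streaming Computing Case (2) - Merging Flows
KisFlow-Golang Streaming Computing Case (2) - Using KisFlow in Multiple Goroutines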
Download the kis-flow source:
$ go get github.com/aceld/kis-flow
KisFlow Developer Documentation
Case source code:
https://github.com/aceld/kis-flow-usage/tree/main/8-connector
KisFlow can also join two Flows through a Connector. This case joins the two Flows described below to introduce the Connector interface and how to use it.
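Data Flow Diagram
Case Introduction
Assume a student has four attributes:
Student ID: stu_id
Score 1: score_1
Score 2: score_2
Score 3: score_3
Define Flow1, CalStuAvgScore-1-2 (loaded in main.go under the name CalStuAvgScore12): it computes the average (avg_score_1_2) of a student's score 1 (score_1) and score 2 (score_2).
Define Flow2, CalStuAvgScore-3 (loaded in main.go under the name CalStuAvgScore3): it computes the average of a student's score 3 (score_3) and avg_score_1_2, which is the average of scores 1, 2, and 3. The average of scores 1 and 2 is provided by Flow1.
Flow1
Flow1 consists of four Functions. V (Function: VerifyStu) validates the StuId. C (Function: AvgStuScore12) computes the average of score 1 and score 2. S (Function: SaveScoreAvg12) stores avg_score_1_2 in Redis. E (Function: PrintStuAvgScore) prints the average of score 1 and score 2.
Flow2
Flow2 consists of four Functions. V (Function: VerifyStu) validates the StuId. L (Function: LoadScoreAvg12) reads the current student's average of score 1 and score 2 (avg_score_1_2), as computed by Flow1. C (Function: AvgStuScore3) computes the three-score average from score 3 and the average of scores 1 and 2. E (Function: PrintStuAvgScore) prints the average of scores 1, 2, and 3.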
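To make the hand-off between the two Flows concrete, here is a minimal sketch that models each Flow as an ordered list of stages sharing a store. It does not use KisFlow: the row and stage types, runFlow, and the in-memory map that stands in for Redis are all assumptions made for illustration, and the verify and print stages are left out.

```go
package main

import "fmt"

// row carries one student's data between stages (hypothetical type).
type row struct {
	StuID      int
	Score1     int
	Score2     int
	Score3     int
	AvgScore12 float64
	AvgScore   float64
}

// stage is one step of a flow; store stands in for Redis.
type stage func(rows []row, store map[int]float64) ([]row, error)

// runFlow applies the stages in order, feeding each stage's output to the next.
func runFlow(stages []stage, rows []row, store map[int]float64) ([]row, error) {
	var err error
	for _, s := range stages {
		if rows, err = s(rows, store); err != nil {
			return nil, err
		}
	}
	return rows, nil
}

// calc12 computes the average of score 1 and score 2.
func calc12(rows []row, _ map[int]float64) ([]row, error) {
	for i := range rows {
		rows[i].AvgScore12 = float64(rows[i].Score1+rows[i].Score2) / 2
	}
	return rows, nil
}

// save12 stores the two-score average, keyed by student ID.
func save12(rows []row, store map[int]float64) ([]row, error) {
	for _, r := range rows {
		store[r.StuID] = r.AvgScore12
	}
	return rows, nil
}

// load12 reads the stored average and fails if it is missing.
func load12(rows []row, store map[int]float64) ([]row, error) {
	for i := range rows {
		avg, ok := store[rows[i].StuID]
		if !ok {
			return nil, fmt.Errorf("no avg_score_1_2 for stu_id %d", rows[i].StuID)
		}
		rows[i].AvgScore12 = avg
	}
	return rows, nil
}

// calc3 combines score 3 with the loaded two-score average.
func calc3(rows []row, _ map[int]float64) ([]row, error) {
	for i := range rows {
		rows[i].AvgScore = (float64(rows[i].Score3) + rows[i].AvgScore12*2) / 3
	}
	return rows, nil
}

func main() {
	store := map[int]float64{}
	flow1 := []stage{calc12, save12}
	flow2 := []stage{load12, calc3}

	if _, err := runFlow(flow1, []row{{StuID: 101, Score1: 100, Score2: 90}}, store); err != nil {
		panic(err)
	}
	out, err := runFlow(flow2, []row{{StuID: 101, Score3: 80}}, store)
	if err != nil {
		panic(err)
	}
	fmt.Println(out[0].StuID, out[0].AvgScore) // prints: 101 90
}
```

In this sketch the two flows run one after the other, so load12 always finds the value that save12 wrote. The store is the only thing the two flows share, which is the role the Connector plays in the case.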
conf/func/func-AvgStuScore-3.yml
kistype: func
fname: AvgStuScore3
fmode: Calculate
source:
  name: SourceStuScore
  must:
    - stu_id
conf/func/func-LoadScoreAvg-1-2.yml
kistype: func
fname: LoadScoreAvg12
fmode: Load
source:
  name: SourceStuScore
  must:
    - stu_id
option:
  cname: Score12Cache
Basic Data Protocol
stu_proto.go
package main

type StuScore1_2 struct {
	StuId  int `json:"stu_id"`
	Score1 int `json:"score_1"`
	Score2 int `json:"score_2"`
}

type StuScoreAvg struct {
	StuId    int     `json:"stu_id"`
	AvgScore float64 `json:"avg_score"`
}

type StuScore3 struct {
	StuId      int     `json:"stu_id"`
	AvgScore12 float64 `json:"avg_score_1_2"` // average of score_1 and score_2
	Score3     int     `json:"score_3"`
}
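The FaaS input types later in this article embed these protocol structs, and the rows are submitted as JSON strings. The sketch below shows how the JSON tags of an embedded struct apply to the outer type. It calls encoding/json directly, which is an assumption: the article does not show how serialize.DefaultSerialize decodes rows. The wrappedIn type and decodeRow helper are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// StuScore1_2 mirrors the protocol struct from stu_proto.go.
type StuScore1_2 struct {
	StuId  int `json:"stu_id"`
	Score1 int `json:"score_1"`
	Score2 int `json:"score_2"`
}

// wrappedIn is a hypothetical input type that embeds the protocol struct,
// in the same way the FaaS input types do.
type wrappedIn struct {
	StuScore1_2
}

// decodeRow parses one JSON row into the embedding struct.
func decodeRow(raw string) (wrappedIn, error) {
	var in wrappedIn
	err := json.Unmarshal([]byte(raw), &in)
	return in, err
}

func main() {
	in, err := decodeRow(`{"stu_id":101, "score_1":100, "score_2":90}`)
	if err != nil {
		panic(err)
	}
	// Fields of the embedded struct are promoted to the outer type.
	fmt.Println(in.StuId, in.Score1, in.Score2) // prints: 101 100 90
}
```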
Connector Init
The Connector defined in this project, Score12Cache, is a connection resource associated with Redis. The Connector needs an initialization function, which is called when KisFlow starts in order to establish the connection.
conn_init.go
package main

import (
	"context"
	"fmt"

	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/log"
	"github.com/go-redis/redis/v8"
)

// type ConnInit func(conn Connector) error
func InitScore12Cache(connector kis.Connector) error {
	fmt.Println("===> Call Connector InitScore12Cache")

	// init Redis Conn Client
	rdb := redis.NewClient(&redis.Options{
		Addr:     connector.GetConfig().AddrString, // Redis-Server address
		Password: "",                               // password
		DB:       0,                                // select db
	})

	// Ping test
	pong, err := rdb.Ping(context.Background()).Result()
	if err != nil {
		log.Logger().ErrorF("Failed to connect to Redis: %v", err)
		return err
	}
	fmt.Println("Connected to Redis:", pong)

	// set rdb to connector
	connector.SetMetaData("rdb", rdb)
	return nil
}
Here the successfully connected Redis client is stored in the connector's metadata under the key "rdb".
	// set rdb to connector
	connector.SetMetaData("rdb", rdb)
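The Functions later in this article read the client back with a single-value type assertion, which panics if the key is missing or holds another type. The sketch below shows the comma-ok form as a more defensive alternative. The metaStore and fakeClient types are hypothetical stand-ins for the connector's metadata storage and for *redis.Client, and the address is made up.

```go
package main

import (
	"errors"
	"fmt"
)

// fakeClient stands in for *redis.Client (hypothetical type).
type fakeClient struct{ addr string }

// metaStore is a hypothetical stand-in for the connector's metadata storage.
type metaStore struct{ data map[string]interface{} }

// SetMetaData stores value under key, creating the map on first use.
func (m *metaStore) SetMetaData(key string, value interface{}) {
	if m.data == nil {
		m.data = make(map[string]interface{})
	}
	m.data[key] = value
}

// GetMetaData returns the stored value, or nil if the key is absent.
func (m *metaStore) GetMetaData(key string) interface{} { return m.data[key] }

// clientFrom returns the stored client, or an error instead of panicking
// when the key is missing or holds a different type.
func clientFrom(m *metaStore) (*fakeClient, error) {
	c, ok := m.GetMetaData("rdb").(*fakeClient)
	if !ok || c == nil {
		return nil, errors.New(`metadata "rdb" is not a *fakeClient`)
	}
	return c, nil
}

func main() {
	m := &metaStore{}
	if _, err := clientFrom(m); err != nil {
		fmt.Println("before init:", err)
	}

	// The address is an assumption used only for this example.
	m.SetMetaData("rdb", &fakeClient{addr: "127.0.0.1:6379"})
	c, err := clientFrom(m)
	if err != nil {
		panic(err)
	}
	fmt.Println("after init:", c.addr)
}
```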
FaaS Implementation
Function(V): VerifyStu
faas_stu_verify.go
package main

import (
	"context"
	"fmt"

	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type VerifyStuIn struct {
	serialize.DefaultSerialize
	StuId int `json:"stu_id"`
}

func VerifyStu(ctx context.Context, flow kis.Flow, rows []*VerifyStuIn) error {
	fmt.Printf("->Call Func VerifyStu\n")

	for _, stu := range rows {
		// Reject invalid data
		if stu.StuId < 0 || stu.StuId > 999 {
			// Abort the current Flow; its remaining Functions will not run
			return flow.Next(kis.ActionAbort)
		}
	}

	return flow.Next(kis.ActionDataReuse)
}
VerifyStu() validates the data. If a row does not meet the requirement, it aborts the current data flow. Otherwise it finishes with flow.Next(kis.ActionDataReuse), so the same data is reused and passed on to the next layer.
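One consequence of the loop in VerifyStu is that a single invalid row stops the whole batch, not just that row. The sketch below isolates that check without KisFlow. The helper names validStuID and verifyBatch are hypothetical, and the bounds are the ones used in VerifyStu.

```go
package main

import "fmt"

// validStuID reports whether id lies in the accepted range [0, 999],
// the same bounds VerifyStu checks.
func validStuID(id int) bool {
	return id >= 0 && id <= 999
}

// verifyBatch returns false as soon as one ID is invalid, mirroring how
// VerifyStu aborts the whole run rather than dropping a single row.
func verifyBatch(ids []int) bool {
	for _, id := range ids {
		if !validStuID(id) {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(verifyBatch([]int{101, 102}))  // true
	fmt.Println(verifyBatch([]int{101, 1000})) // false
}
```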
Function(C): AvgStuScore12
faas_stu_score_avg_1_2.go
package main

import (
	"context"
	"fmt"

	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type AvgStuScoreIn_1_2 struct {
	serialize.DefaultSerialize
	StuScore1_2
}

type AvgStuScoreOut_1_2 struct {
	serialize.DefaultSerialize
	StuScoreAvg
}

func AvgStuScore12(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn_1_2) error {
	fmt.Printf("->Call Func AvgStuScore12\n")

	for _, row := range rows {
		out := AvgStuScoreOut_1_2{
			StuScoreAvg: StuScoreAvg{
				StuId:    row.StuId,
				AvgScore: float64(row.Score1+row.Score2) / 2,
			},
		}

		// Commit the result row
		_ = flow.CommitRow(out)
	}

	return flow.Next()
}
AvgStuScore12() computes the average of score_1 and score_2 and commits the result as avg_score.
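The order of conversion and division in AvgStuScore12 matters because the scores are integers. The sketch below compares converting before dividing, as the Function does, with dividing first. The function names are hypothetical.

```go
package main

import "fmt"

// avgFloat converts the sum to float64 before dividing, as AvgStuScore12 does.
func avgFloat(a, b int) float64 {
	return float64(a+b) / 2
}

// avgTruncated divides as integers first, so the fractional part is lost.
func avgTruncated(a, b int) float64 {
	return float64((a + b) / 2)
}

func main() {
	fmt.Println(avgFloat(100, 95))     // 97.5
	fmt.Println(avgTruncated(100, 95)) // 97
}
```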
Function(S): SaveScoreAvg12
faas_save_score_avg_1_2.go
package main

import (
	"context"
	"fmt"
	"strconv"

	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
	"github.com/go-redis/redis/v8"
)

type SaveStuScoreIn struct {
	serialize.DefaultSerialize
	StuScoreAvg
}

func BatchSetStuScores(ctx context.Context, conn kis.Connector, rows []*SaveStuScoreIn) error {
	// Get the Redis client stored by InitScore12Cache
	rdb := conn.GetMetaData("rdb").(*redis.Client)

	// Queue all writes in one pipeline
	pipe := rdb.Pipeline()
	for _, score := range rows {
		// make key
		key := conn.GetConfig().Key + strconv.Itoa(score.StuId)
		pipe.HMSet(ctx, key, map[string]interface{}{
			"avg_score": score.AvgScore,
		})
	}

	// Send the queued commands to Redis
	_, err := pipe.Exec(ctx)
	return err
}

func SaveScoreAvg12(ctx context.Context, flow kis.Flow, rows []*SaveStuScoreIn) error {
	fmt.Printf("->Call Func SaveScoreScore12\n")

	conn, err := flow.GetConnector()
	if err != nil {
		fmt.Printf("SaveScoreScore12(): GetConnector err = %s\n", err.Error())
		return err
	}

	// Capture the error returned by BatchSetStuScores itself
	if err := BatchSetStuScores(ctx, conn, rows); err != nil {
		fmt.Printf("SaveScoreScore12(): BatchSetStuScores err = %s\n", err.Error())
		return err
	}

	return flow.Next(kis.ActionDataReuse)
}
SaveScoreAvg12() stores the data in Redis through the bound Connector, using the Key configured on the Connector as the key prefix. Finally it passes the source data through to the next Function.
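The key layout used when saving can be tried without a Redis server. In the sketch below a nested map stands in for Redis hashes. The hashStore type, the helper names, and the prefix "stu_score12_avg_" are assumptions, since the article does not show the Connector's configured Key.

```go
package main

import (
	"fmt"
	"strconv"
)

// hashStore is an in-memory stand-in for Redis hashes: key -> field -> value.
type hashStore map[string]map[string]string

// scoreKey joins the configured prefix and the student ID,
// the same way BatchSetStuScores builds its key.
func scoreKey(prefix string, stuID int) string {
	return prefix + strconv.Itoa(stuID)
}

// saveAvg writes avg under the "avg_score" field of the student's hash.
func saveAvg(s hashStore, prefix string, stuID int, avg float64) {
	key := scoreKey(prefix, stuID)
	if s[key] == nil {
		s[key] = make(map[string]string)
	}
	s[key]["avg_score"] = strconv.FormatFloat(avg, 'f', -1, 64)
}

func main() {
	s := hashStore{}
	// "stu_score12_avg_" is a made-up prefix for this example.
	saveAvg(s, "stu_score12_avg_", 101, 95)
	fmt.Println(s["stu_score12_avg_101"]["avg_score"]) // 95
}
```

Values are stored as strings here because that is how a reader gets them back from a hash lookup, which is why the loading side has to parse them.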
Function(E): PrintStuAvgScore
faas_stu_score_avg_print.go
package main

import (
	"context"
	"fmt"

	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type PrintStuAvgScoreIn struct {
	serialize.DefaultSerialize
	StuId    int     `json:"stu_id"`
	AvgScore float64 `json:"avg_score"`
}

func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {
	fmt.Printf("->Call Func PrintStuAvgScore, in Flow[%s]\n", flow.GetName())

	for _, row := range rows {
		fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
	}

	return flow.Next()
}
PrintStuAvgScore() prints the current student's average score.
Function(L): LoadScoreAvg12
faas_load_score_avg_1_2.go
package main

import (
	"context"
	"fmt"
	"strconv"

	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
	"github.com/go-redis/redis/v8"
)

type LoadStuScoreIn struct {
	serialize.DefaultSerialize
	StuScore3
}

type LoadStuScoreOut struct {
	serialize.DefaultSerialize
	StuScore3
}

func GetStuScoresByStuId(ctx context.Context, conn kis.Connector, stuId int) (float64, error) {
	// Get the Redis client stored by InitScore12Cache
	rdb := conn.GetMetaData("rdb").(*redis.Client)

	// make key
	key := conn.GetConfig().Key + strconv.Itoa(stuId)

	// get data from redis
	result, err := rdb.HGetAll(ctx, key).Result()
	if err != nil {
		return 0, err
	}

	// get value
	avgScoreStr, ok := result["avg_score"]
	if !ok {
		return 0, fmt.Errorf("avg_score not found for stuId: %d", stuId)
	}

	// parse to float64
	avgScore, err := strconv.ParseFloat(avgScoreStr, 64)
	if err != nil {
		return 0, err
	}

	return avgScore, nil
}

func LoadScoreAvg12(ctx context.Context, flow kis.Flow, rows []*LoadStuScoreIn) error {
	fmt.Printf("->Call Func LoadScoreAvg12\n")

	conn, err := flow.GetConnector()
	if err != nil {
		fmt.Printf("LoadScoreAvg12(): GetConnector err = %s\n", err.Error())
		return err
	}

	for _, row := range rows {
		stuScoreAvg1_2, err := GetStuScoresByStuId(ctx, conn, row.StuId)
		if err != nil {
			fmt.Printf("LoadScoreAvg12(): GetStuScoresByStuId err = %s\n", err.Error())
			return err
		}

		out := LoadStuScoreOut{
			StuScore3: StuScore3{
				StuId:      row.StuId,
				Score3:     row.Score3,
				AvgScore12: stuScoreAvg1_2, // avg score of score1 and score2 (load from redis)
			},
		}

		// commit result
		_ = flow.CommitRow(out)
	}

	return flow.Next()
}
LoadScoreAvg12() reads the average of score_1 and score_2 from Redis, the resource linked to the bound Connector, using the Key from the configuration. It then sends the upstream source data, together with the newly loaded average of score_1 and score_2, to the next layer.
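The parsing half of the load step can be exercised on its own. The sketch below takes the field map of one hash and handles the two failure cases: the field is absent, or its value is not a number. An HGETALL-style lookup on a key that does not exist returns an empty map rather than an error, which is why the presence check is needed. The parseAvg helper is hypothetical.

```go
package main

import (
	"fmt"
	"strconv"
)

// parseAvg extracts avg_score from the fields of one hash, as returned by
// an HGETALL-style lookup. A missing key yields an empty map.
func parseAvg(fields map[string]string, stuID int) (float64, error) {
	raw, ok := fields["avg_score"]
	if !ok {
		return 0, fmt.Errorf("avg_score not found for stuId: %d", stuID)
	}
	avg, err := strconv.ParseFloat(raw, 64)
	if err != nil {
		return 0, fmt.Errorf("avg_score %q for stuId %d is not a number: %w", raw, stuID, err)
	}
	return avg, nil
}

func main() {
	avg, err := parseAvg(map[string]string{"avg_score": "95"}, 101)
	fmt.Println(avg, err) // 95 <nil>

	_, err = parseAvg(map[string]string{}, 102)
	fmt.Println(err) // avg_score not found for stuId: 102
}
```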
Function(C): AvgStuScore3
faas_stu_score_avg_3.go
package main

import (
	"context"
	"fmt"

	"github.com/aceld/kis-flow/kis"
	"github.com/aceld/kis-flow/serialize"
)

type AvgStuScore3In struct {
	serialize.DefaultSerialize
	StuScore3
}

type AvgStuScore3Out struct {
	serialize.DefaultSerialize
	StuScoreAvg
}

func AvgStuScore3(ctx context.Context, flow kis.Flow, rows []*AvgStuScore3In) error {
	fmt.Printf("->Call Func AvgStuScore12\n")

	for _, row := range rows {
		out := AvgStuScore3Out{
			StuScoreAvg: StuScoreAvg{
				StuId:    row.StuId,
				AvgScore: (float64(row.Score3) + row.AvgScore12*2) / 3,
			},
		}

		// Commit the result row
		_ = flow.CommitRow(out)
	}

	return flow.Next()
}
AvgStuScore3() combines score_3 with the loaded average of score_1 and score_2 to recompute the average of all three scores, giving the final average avg_score.
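The factor of 2 in AvgStuScore3 turns the stored two-score average back into a sum before dividing by three. Writing $s_1, s_2, s_3$ for score_1, score_2, score_3 and $\bar{s}_{12}$ for avg_score_1_2 (symbols introduced here for illustration):

```latex
\[
\bar{s}_{12} = \frac{s_1 + s_2}{2},
\qquad
\frac{s_3 + 2\,\bar{s}_{12}}{3}
= \frac{s_3 + 2\cdot\frac{s_1 + s_2}{2}}{3}
= \frac{s_1 + s_2 + s_3}{3}.
\]
```

For the sample input of student 102 in main.go (100, 80, 70): $\bar{s}_{12} = 90$ and $(70 + 2 \cdot 90)/3 = 250/3 \approx 83.33$.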
Register FaaS & CaaSInit/CaaS (Registering Functions and Connectors)
main.go
func init() {
	// Register functions
	kis.Pool().FaaS("VerifyStu", VerifyStu)
	kis.Pool().FaaS("AvgStuScore12", AvgStuScore12)
	kis.Pool().FaaS("SaveScoreAvg12", SaveScoreAvg12)
	kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore)
	kis.Pool().FaaS("LoadScoreAvg12", LoadScoreAvg12)
	kis.Pool().FaaS("AvgStuScore3", AvgStuScore3)

	// Register connectors
	kis.Pool().CaaSInit("Score12Cache", InitScore12Cache)
}
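Registration ties the fname in each YAML file to a Go function by name. As a rough mental model only, the sketch below implements a name-to-function registry with a plain map. It is not KisFlow's pool: the registry type, the simplified handler signature, and the choice to reject duplicate names are all assumptions of this example.

```go
package main

import (
	"errors"
	"fmt"
)

// handler is a hypothetical, simplified function signature.
type handler func(rows []string) error

// registry maps function names to handlers (a simplified stand-in for a pool).
type registry struct{ funcs map[string]handler }

func newRegistry() *registry { return &registry{funcs: make(map[string]handler)} }

// Register binds name to h and rejects duplicate names.
func (r *registry) Register(name string, h handler) error {
	if _, exists := r.funcs[name]; exists {
		return fmt.Errorf("function %q already registered", name)
	}
	r.funcs[name] = h
	return nil
}

// Call looks up name and invokes the handler with rows.
func (r *registry) Call(name string, rows []string) error {
	h, ok := r.funcs[name]
	if !ok {
		return errors.New("function " + name + " not registered")
	}
	return h(rows)
}

func main() {
	r := newRegistry()
	_ = r.Register("VerifyStu", func(rows []string) error {
		fmt.Println("VerifyStu got", len(rows), "rows")
		return nil
	})

	fmt.Println(r.Call("VerifyStu", []string{"a", "b"}))
	fmt.Println(r.Call("Missing", nil))
}
```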
Main Flow
main.go
package main

import (
	"context"
	"sync"

	"github.com/aceld/kis-flow/file"
	"github.com/aceld/kis-flow/kis"
)

func RunFlowCalStuAvgScore12(ctx context.Context, flow kis.Flow) error {
	// Submit a string
	_ = flow.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90}`)
	_ = flow.CommitRow(`{"stu_id":102, "score_1":100, "score_2":80}`)

	// Run the flow
	if err := flow.Run(ctx); err != nil {
		return err
	}

	return nil
}

func RunFlowCalStuAvgScore3(ctx context.Context, flow kis.Flow) error {
	// Submit a string
	_ = flow.CommitRow(`{"stu_id":101, "score_3": 80}`)
	_ = flow.CommitRow(`{"stu_id":102, "score_3": 70}`)

	// Run the flow
	if err := flow.Run(ctx); err != nil {
		return err
	}

	return nil
}

func main() {
	ctx := context.Background()

	// Load Configuration from file
	if err := file.ConfigImportYaml("conf/"); err != nil {
		panic(err)
	}

	var wg sync.WaitGroup
	wg.Add(2)

	go func() {
		// run flow1
		defer wg.Done()

		// Get the flow
		flow1 := kis.Pool().GetFlow("CalStuAvgScore12")
		if flow1 == nil {
			panic("flow1 is nil")
		}

		if err := RunFlowCalStuAvgScore12(ctx, flow1); err != nil {
			panic(err)
		}
	}()

	go func() {
		defer wg.Done()

		// Get the flow
		flow2 := kis.Pool().GetFlow("CalStuAvgScore3")
		if flow2 == nil {
			panic("flow2 is nil")
		}

		// run flow2
		if err := RunFlowCalStuAvgScore3(ctx, flow2); err != nil {
			panic(err)
		}
	}()

	wg.Wait()
}
Two goroutines run Flow1 and Flow2 respectively to compute the final average scores of student 101 and student 102. The goroutines are not synchronized with each other, so Flow2 finds avg_score in Redis only if Flow1 has already saved it or a value from an earlier run is still present.
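The two goroutines in main() are not ordered relative to each other. If Flow2 must see the value Flow1 saved in the same run, one option is to signal completion explicitly. The sketch below does this with a channel, using only the standard library. The runBoth function is hypothetical, and a shared variable stands in for the value kept in Redis.

```go
package main

import (
	"fmt"
	"sync"
)

// runBoth runs a producer and a consumer goroutine. The consumer waits on
// the saved channel, so it never reads before the producer has written.
func runBoth(score1, score2, score3 int) float64 {
	var (
		wg     sync.WaitGroup
		avg12  float64 // stands in for the value stored in Redis
		result float64
	)
	saved := make(chan struct{})

	wg.Add(2)
	go func() { // plays the role of Flow1
		defer wg.Done()
		avg12 = float64(score1+score2) / 2
		close(saved) // signal: the average is now available
	}()
	go func() { // plays the role of Flow2
		defer wg.Done()
		<-saved // block until the producer has saved its result
		result = (float64(score3) + avg12*2) / 3
	}()

	wg.Wait()
	return result
}

func main() {
	fmt.Println(runBoth(100, 90, 80)) // 90
}
```

Closing the channel both orders the two goroutines and makes the write to avg12 visible to the reader, so no extra lock is needed in this sketch.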
Output
===> Call Connector InitScore12Cache
Connected to Redis: PONG
Add FlowRouter FlowName=CalStuAvgScore12
===> Call Connector InitScore12Cache
Connected to Redis: PONG
Add FlowRouter FlowName=CalStuAvgScore3
->Call Func VerifyStu
->Call Func VerifyStu
->Call Func AvgStuScore12
->Call Func LoadScoreAvg12
->Call Func SaveScoreScore12
->Call Func PrintStuAvgScore, in Flow[CalStuAvgScore12]
stuid: [101], avg score: [95]
stuid: [102], avg score: [90]
->Call Func AvgStuScore12
->Call Func PrintStuAvgScore, in Flow[CalStuAvgScore3]
stuid: [101], avg score: [90]
stuid: [102], avg score: [83.33333333333333]
We can see that Flow[CalStuAvgScore3] ends up with the average of scores 1, 2, and 3.
Author: 刘丹冰 (Aceld), GitHub: https://github.com/aceld
KisFlow open-source project: https://github.com/aceld/kis-flow