go-zero serviceGroup + executor
2022-11-06 本文已影响0人
大黄蜂阿天
需求
把订阅的mqtt数据持久化到mysql
为什么
serviceGroup用来管理同一个进程内的多个常驻进程的服务
executor 用来批量提交,避免来一条数据插入一行数据
结构目录
image.png
具体代码
# serviceGroup 部分
/**
@author: Administrator
@since: 2022/9/21
@desc: 进程入口文件
**/
package main
import (
"flag"
"github.com/zeromicro/go-zero/core/conf"
"github.com/zeromicro/go-zero/core/service"
"iot/app/services/internal/config"
"iot/app/services/internal/logic"
)
// configFile holds the path of the config file, taken from the -f flag
// (defaults to "etc/services.yaml").
var configFile = flag.String("f", "etc/services.yaml", "Specify the config file")
// main is the process entry point: it loads the configuration named by the
// -f flag, runs the configuration's SetUp step, registers every service
// returned by logic.ServiceList in one service group and starts the group.
func main() {
	flag.Parse()

	var cfg config.Config
	conf.MustLoad(*configFile, &cfg)

	// Per the original note, SetUp covers log, prometheus, trace and metricsUrl.
	err := cfg.SetUp()
	if err != nil {
		panic(err)
	}

	group := service.NewServiceGroup()
	// Stop the group once Start has returned.
	defer group.Stop()

	services := logic.ServiceList(cfg)
	for i := range services {
		group.Add(services[i])
	}

	group.Start()
}
/**
@author: Administrator
@since: 2022/9/22
@desc: // 服务配置
**/
package logic
import (
"context"
"github.com/zeromicro/go-zero/core/service"
"iot/app/services/internal/config"
"iot/app/services/internal/logic/nodeLog"
"iot/app/services/internal/svc"
)
// ServiceList returns every service that should be added to the serviceGroup.
//
// A single service context is built from c and shared, together with a
// background context, by all service constructors.
//
// @param c the service configuration
// @return []service.Service the services to register
func ServiceList(c config.Config) []service.Service {
	sc := svc.NewServiceContext(c)
	bg := context.Background()

	return []service.Service{
		nodeLog.NewBatchStoreIntoMysql(bg, sc),
	}
}
/**
@author: Administrator
@since: 2022/9/22
@desc: 把从mqtt来的消息 批量存储到mysql,这个地方需要注意的是,如果部署多机的话,这样会让日志存在重复数据
**/
package nodeLog
import (
"context"
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/zeromicro/go-zero/core/executors"
"github.com/zeromicro/go-zero/core/threading"
"iot/app/services/internal/svc"
)
var (
	// lock serializes the batch inserts issued from the bulk executor callback.
	lock sync.Mutex

	// payload is the unbuffered channel on which subscribeHandler hands
	// received MQTT messages to the forwarding goroutine started by Start.
	payload = make(chan mqtt.Message)
)
// BatchStoreIntoMysql is the service that takes messages received over MQTT
// and hands them to a bulk executor, which stores them in batches.
type BatchStoreIntoMysql struct {
// ctx is the context given to NewBatchStoreIntoMysql.
ctx context.Context
// svcCtx is the shared service context given to NewBatchStoreIntoMysql.
svcCtx *svc.ServiceContext
// mqttClient is the client built from svcCtx.MqttClientOption; Start connects
// it and Stop disconnects it.
mqttClient mqtt.Client
// quit is registered with signal.Notify in Start; Start blocks until it
// receives a value or is closed by Stop.
quit chan os.Signal
// insertExecutor collects the forwarded messages and passes them in batches
// to the insert callback configured in NewBatchStoreIntoMysql.
insertExecutor *executors.BulkExecutor
}
// NewBatchStoreIntoMysql builds the MQTT-to-MySQL batching service.
//
// It installs connectHandler and connectLostHandler on the MQTT client options
// found in svcCtx, creates the MQTT client from them, and sets up a bulk
// executor whose callback passes each batch of items to
// svcCtx.NodeLogModel.BatchInsertByMqttPayload.
//
// ctx is handed to every batch insert; svcCtx supplies the MQTT options and
// the node-log model. The returned service is not connected until Start is
// called.
func NewBatchStoreIntoMysql(ctx context.Context, svcCtx *svc.ServiceContext) *BatchStoreIntoMysql {
	opts := svcCtx.MqttClientOption
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectLostHandler

	executor := executors.NewBulkExecutor(
		func(items []interface{}) {
			// The original author suspected this lock is unnecessary (single
			// process, no contention); it is kept to preserve behavior.
			lock.Lock()
			// Release via defer so that a panic inside the insert cannot leave
			// the package-level mutex locked and block every later batch.
			defer lock.Unlock()

			// NOTE(review): any result of BatchInsertByMqttPayload is discarded
			// here, as in the original — confirm whether it reports an error.
			svcCtx.NodeLogModel.BatchInsertByMqttPayload(ctx, items)
		},
		// Buffered tasks are flushed automatically every 30s.
		executors.WithBulkInterval(time.Second*30),
		executors.WithBulkTasks(1024),
	)

	return &BatchStoreIntoMysql{
		ctx:            ctx,
		svcCtx:         svcCtx,
		mqttClient:     mqtt.NewClient(opts),
		quit:           make(chan os.Signal, 1),
		insertExecutor: executor,
	}
}
// Start connects the MQTT client, launches the goroutine that moves messages
// from the payload channel into the bulk executor, and then blocks until the
// quit channel yields (an interrupt/SIGTERM arrives or Stop closes it).
//
// It panics if the MQTT connection cannot be established.
func (l *BatchStoreIntoMysql) Start() {
	signal.Notify(l.quit, os.Interrupt, syscall.SIGTERM)

	tok := l.mqttClient.Connect()
	if tok.Wait() && tok.Error() != nil {
		// This should be handled more gracefully: a panic here affects the
		// other services and makes the main process exit.
		panic(tok.Error())
	}

	// forward drains payload until the channel is closed.
	forward := func() {
		for m := range payload {
			l.insertExecutor.Add(m)
		}
	}
	threading.GoSafe(forward)

	<-l.quit
}
// Stop shuts the service down: it unblocks Start, disconnects the MQTT
// client, closes the payload channel so the forwarding goroutine ends, and
// flushes whatever the bulk executor still has buffered.
//
// Stop must be called at most once; it closes channels.
func (l *BatchStoreIntoMysql) Stop() {
	// Unregister quit before closing it: the signal package would otherwise
	// try to deliver a later signal on a closed channel, which panics.
	signal.Stop(l.quit)
	close(l.quit)

	// Disconnect before closing payload so the client stops delivering
	// messages first; closing payload while subscribeHandler can still send on
	// it would panic with "send on closed channel".
	l.mqttClient.Disconnect(250)
	close(payload)

	// Write out the tasks still buffered in the executor instead of dropping
	// them at shutdown. NOTE(review): a message being added by the forwarding
	// goroutine at this very moment may still miss this flush.
	l.insertExecutor.Flush()

	fmt.Println("BatchStoreIntoMysql stop")
}
// subscribeHandler is the message callback used for the subscription made in
// connectHandler. It prints the topic and payload of each message and then
// pushes the message onto the payload channel, blocking until it is received.
var subscribeHandler mqtt.MessageHandler = func(_ mqtt.Client, m mqtt.Message) {
	topic := m.Topic()
	fmt.Printf("TOPIC: %s\n", topic)

	body := m.Payload()
	fmt.Printf("MSG: %s\n", body)

	// Load the message into the channel.
	payload <- m
}
// connectHandler is installed as the client's OnConnect callback. It prints
// the client ID and subscribes to "xz/uplink" at QoS 0 with subscribeHandler.
//
// It panics if the subscription fails.
var connectHandler mqtt.OnConnectHandler = func(c mqtt.Client) {
	// Record the connection state of the current client.
	reader := c.OptionsReader()
	clientID := reader.ClientID()
	fmt.Printf("%s Connected\n", clientID)

	tok := c.Subscribe("xz/uplink", 0, subscribeHandler)
	if tok.Wait() && tok.Error() != nil {
		// This should be handled more gracefully: a panic here affects the
		// other services and makes the main process exit.
		panic(tok.Error())
	}
}
// connectLostHandler is installed as the client's OnConnectionLost callback.
// It only prints the client ID together with the error that was reported.
var connectLostHandler mqtt.ConnectionLostHandler = func(c mqtt.Client, cause error) {
	reader := c.OptionsReader()
	clientID := reader.ClientID()
	fmt.Printf("%s Connect lost: %v\n", clientID, cause)
}