GO与微服务

手撸golang GO与微服务 Saga模式之7

2021-03-17  本文已影响0人  老罗话编程

缘起

最近阅读<<Go微服务实战>> (刘金亮, 2021.1)
本系列笔记拟采用golang练习之

Saga模式

Saga由一系列的子事务“Ti”组成,
每个Ti都有对应的补偿“Ci”,
当Ti出现问题时Ci用于处理Ti执行带来的问题。

可以通过下面的两个公式理解Saga模式。
T = T1 T2 … Tn
T = TCT

Saga模式的核心理念是避免使用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务,
而应该将事务切分为一组按序依次提交的短事务,
Saga模式满足ACD(原子性、一致性、持久性)特征。

摘自 <<Go微服务实战>> 刘金亮, 2021.1

目标

子目标(Day 7)

设计

ISaleOrderService.go

订单服务接口

package order

// ISaleOrderService to manage sale order creation and modification
type ISaleOrderService interface {
    // get order info
    Get(orderID string) *SaleOrder

    // create new order
    Create(it *SaleOrder) error

    // update order status
    Update(orderID string, oldStatusFlag int32, newStatusFlag int32) (error, *SaleOrder)
}

SaleOrder.go

销售订单实体

package order

type SaleOrder struct {
    OrderID string
    CustomerID string
    ProductID string
    Quantity int
    Price float64
    Amount float64
    CreateTime int64
    StatusFlag int32
}

const StatusNotDelivered int32 = 0
const StatusStockOutboundDone int32 = 1
const StatusStockOutboundFailed int32 = 2
const StatusMQServiceFailed int32 = 3

tSaleOrderService.go

模拟订单服务, 实现ISaleOrderService接口

package order

import (
    "bytes"
    "encoding/json"
    "errors"
    "io/ioutil"
    "learning/gooop/saga/mqs/logger"
    "learning/gooop/saga/mqs/models"
    "net/http"
    "sync"
    "sync/atomic"
    "time"
)

type tSaleOrderService struct {
    rwmutex *sync.RWMutex
    orders map[string]*SaleOrder
    bMQReady bool
    publishQueue chan *SaleOrder
}

func newSaleOrderService() ISaleOrderService {
    it := new(tSaleOrderService)
    it.init()
    return it
}

func (me *tSaleOrderService) init() {
    me.rwmutex = new(sync.RWMutex)
    me.orders = make(map[string]*SaleOrder)
    me.bMQReady = false
    me.publishQueue = make(chan *SaleOrder, gMQMaxQueuedMsg)

    go me.beginSubscribeMQ()
    go me.beginPublishMQ()
}

func (me *tSaleOrderService) beginSubscribeMQ() {
    expireDuration := int64(1 * time.Hour)
    subscribeDuration := 20 * time.Minute
    pauseDuration := 3*time.Second
    lastSubscribeTime := int64(0)

    for {
        now := time.Now().UnixNano()
        if now - lastSubscribeTime >= int64(subscribeDuration) {
            expireTime := now + expireDuration
            err := fnSubscribeMQ(expireTime)

            if err != nil {
                me.bMQReady = false
                logger.Logf("tSaleOrderService.beginSubscribeMQ, failed, err=%v", err)

            } else {
                lastSubscribeTime = now
                me.bMQReady = true
                logger.Logf("tSaleOrderService.beginSubscribeMQ, done")
            }
        }
        time.Sleep(pauseDuration)
    }
}

func fnSubscribeMQ(expireTime int64) error {
    msg := &models.SubscribeMsg{
        ClientID: gMQClientID,
        Topic: gMQSubscribeTopic,
        NotifyUrl: gMQServerURL + PathOfNotifyStockOutbound,
        ExpireTime: expireTime,
    }
    url := gMQServerURL + "/subscribe"
    return fnPost(msg, url)
}


func fnPost(msg interface{}, url string) error {
    body,_ := json.Marshal(msg)
    rsp, err := http.Post(url, "application/json;charset=utf-8", bytes.NewReader(body))
    if err != nil {
        return err
    }

    defer rsp.Body.Close()
    j, err := ioutil.ReadAll(rsp.Body)
    if err != nil {
        return err
    }
    ok := &models.OkMsg{}
    err = json.Unmarshal(j, ok)
    if err != nil {
        return err
    }

    if !ok.OK {
        return gMQReplyFalse
    }

    return nil
}


func (me *tSaleOrderService) beginPublishMQ() {
    for {
        select {
        case msg := <- me.publishQueue :
            me.publishMQ(msg)
            break
        }
    }
}


func (me *tSaleOrderService) Get(orderID string) *SaleOrder {
    me.rwmutex.RLock()
    defer me.rwmutex.RUnlock()

    it,ok := me.orders[orderID]
    if ok {
        return it
    } else {
        return nil
    }
}

func (me *tSaleOrderService) Create(it *SaleOrder) error {
    me.rwmutex.Lock()
    defer me.rwmutex.Unlock()

    if len(me.publishQueue) >= gMQMaxQueuedMsg {
        return gMQNotAvailableError
    }
    me.orders[it.OrderID] = it
    me.publishQueue <- it

    return nil
}


func (me *tSaleOrderService) publishMQ(it *SaleOrder) {
    url := gMQServerURL + "/publish"

    j,_ := json.Marshal(it)
    msg := &models.TxMsg{
        GlobalID: it.OrderID,
        SubID: it.OrderID,
        SenderID: gMQClientID,
        Topic: gMQPublishTopic,
        CreateTime: it.CreateTime,
        Content: string(j),
    }

    for i := 0;i < gMQMaxPublishRetry;i++ {
        err := fnPost(msg, url)
        if err != nil {
            logger.Logf("tSaleOrderService.publishMQ, failed, err=%v, order=%v", err, it)
            time.Sleep(gMQPublishInterval)

        } else {
            logger.Logf("tSaleOrderService.publishMQ, done, order=%v", it)
            return
        }
    }

    // publish failed
    logger.Logf("tSaleOrderService.publishMQ, failed max retries, order=%v", it)
    _,_ = me.Update(it.OrderID, StatusNotDelivered, StatusMQServiceFailed)
}


func (me *tSaleOrderService) Update(orderID string, oldStatusFlag int32, newStatusFlag int32) (error, *SaleOrder) {
    me.rwmutex.RLock()
    defer me.rwmutex.RUnlock()

    it, ok := me.orders[orderID]
    if !ok {
        return gNotFoundError, nil
    }

    if !atomic.CompareAndSwapInt32(&it.StatusFlag, oldStatusFlag, newStatusFlag) {
        return gStatusChangedError, it
    }

    it.StatusFlag = newStatusFlag
    return nil, it
}

var gMQReplyFalse = errors.New("mq reply false")
var gMQNotAvailableError = errors.New("mq not ready")
var gNotFoundError = errors.New("order not found")
var gStatusChangedError = errors.New("status changed")
var gMQMaxPublishRetry = 3
var gMQPublishInterval = 1*time.Second
var gMQSubscribeTopic = "sale-order.stock.outbound"
var gMQPublishTopic = "sale-order.created"
var gMQClientID = "sale-order-service"
var gMQServerURL = "http://localhost:333"
var gMQMaxQueuedMsg = 1024

var SaleOrderService = newSaleOrderService()

NotifyStockOutbound.go

接收库存服务的扣库结果消息

package order

import (
    "encoding/json"
    "github.com/gin-gonic/gin"
    "io/ioutil"
    "learning/gooop/saga/mqs/logger"
    "learning/gooop/saga/mqs/models"
    "net/http"
)

func NotifyStockOutbound(c *gin.Context) {
    body := c.Request.Body
    defer body.Close()

    j, e := ioutil.ReadAll(body)
    if e != nil {
        logger.Logf("order.NotifyStockOutbound, failed ioutil.ReadAll")
        c.JSON(http.StatusBadRequest, gin.H { "ok": false, "error": e.Error()})
        return
    }

    msg := &models.TxMsg{}
    e = json.Unmarshal(j, msg)
    if e != nil {
        logger.Logf("order.NotifyStockOutbound, failed json.Unmarshal")
        c.JSON(http.StatusBadRequest, gin.H { "ok": false, "error": e.Error()})
        return
    }

    orderID := msg.GlobalID
    succeeded := msg.Content == "1"
    logger.Logf("order.NotifyStockOutbound, orderID=%v, succeeded=%s", orderID, succeeded)

    var newStatusFlag int32
    if succeeded {
        newStatusFlag = StatusStockOutboundDone
    } else {
        newStatusFlag = StatusStockOutboundFailed
    }

    err, order := SaleOrderService.Update(orderID, StatusNotDelivered, newStatusFlag)
    if err != nil {
        logger.Logf("order.NotifyStockOutbound, failed SaleOrderService.Update, err=%v, order=%v", err, order)
    }

    c.JSON(http.StatusOK, gin.H{ "ok": true })
}

var PathOfNotifyStockOutbound = "/notify/sale-order.stock.outbound"

(未完待续)

上一篇下一篇

猜你喜欢

热点阅读