GO与微服务

手撸golang GO与微服务 Saga模式之6

2021-03-16  本文已影响0人  老罗话编程

缘起

最近阅读<<Go微服务实战>> (刘金亮, 2021.1)
本系列笔记拟采用golang练习之

Saga模式

Saga由一系列的子事务“Ti”组成,
每个Ti都有对应的补偿“Ci”,
当Ti出现问题时Ci用于处理Ti执行带来的问题。

可以通过下面的两个公式理解Saga模式。
$T = T_1 T_2 \dots T_n$
$T = T_1 T_2 \dots T_j C_j \dots C_2 C_1 \quad (0 < j < n)$

Saga模式的核心理念是避免使用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务,
而应该将事务切分为一组按序依次提交的短事务,
Saga模式满足ACD(原子性、一致性、持久性)特征。

摘自 <<Go微服务实战>> 刘金亮, 2021.1

目标

子目标(Day 6)

单元测试

mqs_test.go, 依次清空数据库, 测试订阅接口, 发布接口和通知接口, 并诊断过程日志和数据库记录变化.

package saga

import (
    "bytes"
    "encoding/json"
    "errors"
    "fmt"
    "github.com/jmoiron/sqlx"
    "io/ioutil"
    "learning/gooop/saga/mqs/cmd"
    "learning/gooop/saga/mqs/database"
    "learning/gooop/saga/mqs/logger"
    "learning/gooop/saga/mqs/models"
    "net/http"
    "sync"
    "testing"
    "time"
)


// gRunOnce ensures the MQS bootstrap below runs at most once.
var gRunOnce sync.Once

// fnBootMQS starts MQS on a background goroutine the first time it is called
// and then sleeps for one second so the service has time to come up.
// Later calls return without doing anything.
func fnBootMQS() {
	bootAndWait := func() {
		// cmd.BootMQS gets its own goroutine so this call is not held up by it.
		go cmd.BootMQS()

		// fixed grace period before the caller starts sending requests
		time.Sleep(time.Second)
	}
	gRunOnce.Do(bootAndWait)
}

// fnAssertTrue stops the running test immediately with msg when b is false,
// and does nothing when b is true.
//
// It registers itself as a test helper so that a failure is attributed to the
// line of the calling assertion rather than to the t.Fatal call in here.
func fnAssertTrue(t *testing.T, b bool, msg string) {
	t.Helper()
	if !b {
		t.Fatal(msg)
	}
}

// Test_MQS is the end-to-end test entry point: it empties the tables, makes
// sure MQS is running, then runs the subscribe stage followed by the
// publish-and-notify stage, logging a "passed" line after each one.
func Test_MQS(t *testing.T) {
	// prepare mqs
	fnClearDB(t)
	fnBootMQS()

	// Stages run in order; the publish stage asserts on the "test-client"
	// subscriber that the subscribe stage registers.
	stages := []struct {
		name string
		run  func(t *testing.T)
	}{
		{name: "fnTestSubscribe", run: fnTestSubscribe},
		{name: "fnTestPublishAndNotify", run: fnTestPublishAndNotify},
	}
	for _, stage := range stages {
		stage.run(t)
		t.Log("passed " + stage.name)
	}
}


// fnTestPublishAndNotify posts one message to /publish and then verifies, via
// the collected log lines and the success_queue table, that it was published,
// handed to the "test-client" subscriber, notified, and recorded as delivered.
//
// It asserts on the client ID "test-client", which fnTestSubscribe registers,
// so it is expected to run after that stage.
func fnTestPublishAndNotify(t *testing.T) {
	t.Log("testing fnTestPublishAndNotify")

	msg := &models.TxMsg{
		GlobalID: "test-global-id",
		SubID:    "test-sub-id",
		SenderID: "test-sender-id",
		Topic:    "test-topic",
		Content:  "test-content",
	}
	fnPost(t, msg, "http://localhost:3333/publish")

	// check log
	fnAssertTrue(t, logger.Count("handlers.Publish, msg=test-global-id") == 1, "expecting log: handlers.Publish, msg=test-global-id")

	// check notify
	// NOTE(review): presumably delivery happens asynchronously after /publish
	// returns, hence the short pause before inspecting the logs — confirm.
	time.Sleep(100 * time.Millisecond)
	fnAssertTrue(t, logger.Count("tLiveMsgSource.handleMsgPublished, clientID=test-client, msg=test-global-id") == 1, "expecting log: tLiveMsgSource.handleMsgPublished")
	fnAssertTrue(t, logger.Count("tDeliveryWorker.afterDeliverySuccess, done, id=test-client, msg=test-global-id") == 1, "expecting log: tDeliveryWorker.afterDeliverySuccess")
	fnAssertTrue(t, logger.Count("handlers.Notify, msg=") == 1, "expecting log: handlers.Notify, msg=")

	// check success queue; the client ID is bound as a query parameter, the
	// same way fnTestSubscribe queries the subscriber table
	delivered := fnDBCount(t, "select count(1) from success_queue where ClientID=?", "test-client")
	fnAssertTrue(t, delivered == 1, "expecting db.success_queue.count == 1")
}


// fnClearDB deletes every row from the subscriber, tx_msg, delivery_queue and
// success_queue tables, in that order. Any failure aborts the test inside
// fnDBExec.
func fnClearDB(t *testing.T) {
	statements := []string{
		"delete from subscriber",
		"delete from tx_msg",
		"delete from delivery_queue",
		"delete from success_queue",
	}
	for _, stmt := range statements {
		fnDBExec(t, stmt)
	}
}


// fnDBCount runs a query that is expected to yield a single integer (for
// example "select count(1) ...") and returns that integer.
//
// sql is the statement text and args are its bind parameters. The test is
// aborted via t.Fatal if the query fails, returns no row, or the value cannot
// be scanned into an int.
func fnDBCount(t *testing.T, sql string, args ...interface{}) int {
	t.Helper()

	var count int
	err := database.DB(func(db *sqlx.DB) error {
		rows, e := db.Queryx(sql, args...)
		if e != nil {
			return e
		}
		defer rows.Close()

		if !rows.Next() {
			// Next also returns false when iteration itself failed, so
			// surface that error instead of reporting it as an empty result.
			if e := rows.Err(); e != nil {
				return e
			}
			return errors.New("empty rows")
		}

		return rows.Scan(&count)
	})
	if err != nil {
		t.Fatal(err)
	}

	return count
}


func fnDBExec(t *testing.T, sql string, args... interface{}) int {
    rows := []int64{ 0 }
    err := database.DB(func(db *sqlx.DB) error {
        r,e := db.Exec(sql, args...)
        if e != nil {
            return e
        }

        rows[0], e = r.RowsAffected()
        if e != nil {
            return e
        }

        return nil
    })

    if err != nil {
        t.Fatal(err)
    }
    return int(rows[0])
}

// fnTestSubscribe empties the subscriber table, posts a subscription for
// client "test-client" on topic "test-topic" to /subscribe, and then checks
// that the registration was logged exactly once and that exactly one matching
// row exists in the subscriber table.
func fnTestSubscribe(t *testing.T) {
	t.Log("testing fnTestSubscribe")

	// clear subscriber
	fnDBExec(t, "delete from subscriber")

	// the expiry is sent as nanoseconds since the Unix epoch, 30 seconds from now
	ttl := 30 * time.Second
	expireAt := time.Now().UnixNano() + ttl.Nanoseconds()

	msg := &models.SubscribeMsg{
		ClientID:   "test-client",
		Topic:      "test-topic",
		NotifyUrl:  "http://localhost:3333/notify",
		ExpireTime: expireAt,
	}
	fnPost(t, msg, "http://localhost:3333/subscribe")

	// check log
	registered := logger.Count("handlers.Subscribe, event=subscriber.registered")
	fnAssertTrue(t, registered == 1, "expecting event=subscriber.registered")

	// check db
	stored := fnDBCount(t, "select count(1) as n from subscriber where ClientID=? and topic=?", msg.ClientID, msg.Topic)
	fnAssertTrue(t, stored == 1, "expecting subscriber.count == 1")
}


// fnPost JSON-encodes msg, POSTs it to url, decodes the reply into a
// models.OkMsg and asserts that its OK field is true.
//
// The test is aborted via t.Fatal when msg cannot be encoded, the request
// fails or times out, the reply body cannot be read, or the reply is not
// valid JSON for models.OkMsg.
func fnPost(t *testing.T, msg interface{}, url string) {
	t.Helper()

	body, err := json.Marshal(msg)
	if err != nil {
		t.Fatal(err)
	}

	// A bounded client keeps a hung server from stalling the whole test run;
	// the default client used by http.Post has no timeout.
	client := &http.Client{Timeout: 5 * time.Second}
	rsp, err := client.Post(url, "application/json;charset=utf-8", bytes.NewReader(body))
	if err != nil {
		t.Fatal(err)
	}
	defer rsp.Body.Close()

	j, err := ioutil.ReadAll(rsp.Body)
	if err != nil {
		t.Fatal(err)
	}

	ok := &models.OkMsg{}
	if err = json.Unmarshal(j, ok); err != nil {
		// include status and raw body so a non-JSON error page is diagnosable
		t.Fatalf("decoding reply from %s (status %d, body %q): %v", url, rsp.StatusCode, j, err)
	}
	fnAssertTrue(t, ok.OK, fmt.Sprintf("expecting replying ok from %s", url))
}

测试输出

根据assert失败提示, 逐步排查诊断日志, 发现的问题主要是sqlx字段映射错误, 以及未及时调用rows.Close()导致的错误.

$ go test -v mqs_test.go 
=== RUN   Test_MQS
eventbus.Pub, event=system.boot, handler=gDeliveryService.handleBootEvent
[GIN-debug] [WARNING] Creating an Engine instance with the Logger and Recovery middleware already attached.

[GIN-debug] [WARNING] Running in "debug" mode. Switch to "release" mode in production.
 - using env:   export GIN_MODE=release
 - using code:  gin.SetMode(gin.ReleaseMode)

[GIN-debug] GET    /ping                     --> learning/gooop/saga/mqs/handlers.Ping (4 handlers)
[GIN-debug] POST   /subscribe                --> learning/gooop/saga/mqs/handlers.Subscribe (4 handlers)
[GIN-debug] POST   /publish                  --> learning/gooop/saga/mqs/handlers.Publish (4 handlers)
[GIN-debug] POST   /notify                   --> learning/gooop/saga/mqs/handlers.Notify (4 handlers)
[GIN-debug] Listening and serving HTTP on :3333
tDeliveryService.beginCreatingWorkers
tDeliveryService.beginCleanExpiredWorkers
    mqs_test.go:139: testing fnTestSubscribe
handlers.Subscribe, event=subscriber.registered, msg=&{test-client test-topic http://localhost:3333/notify 1615868155394998875}
eventbus.Pub, event=subscriber.registered, handler=gDeliveryService.handleSubscriberRegistered
[GIN] 2021/03/16 - 12:15:25 | 200 |    8.118873ms |             ::1 | POST     "/subscribe"
[GIN] 2021/03/16 - 12:15:25 | 200 |    8.214968ms |             ::1 | POST     "/subscribe"
tDeliveryWorker.afterInitialLoad, clientID=test-client, rows=0
database.DB, err=empty rows
    mqs_test.go:45: passed fnTestSubscribe
    mqs_test.go:54: testing fnTestPublishAndNotify
handlers.Publish, msg=test-global-id/test-sub-id/test-topic, msgId=15
[GIN] 2021/03/16 - 12:15:25 | 200 |   15.200109ms |             ::1 | POST     "/publish"
[GIN] 2021/03/16 - 12:15:25 | 200 |   15.216578ms |             ::1 | POST     "/publish"
handlers.Publish, pubLiveMsg 15
handlers.Publish, pubLiveMsg, msgId=15, rows=1
handlers.Publish, event=msg.published, clientID=test-client, msg=test-global-id/test-sub-id/http://localhost:3333/notify
eventbus.Pub, event=msg.published, handler=tLiveMsgSource.test-client
tLiveMsgSource.handleMsgPublished, clientID=test-client, msg=test-global-id/test-sub-id/test-topic
handlers.Notify, msg=&{test-global-id test-sub-id  0 test-topic test-content}
[GIN] 2021/03/16 - 12:15:25 | 200 |       48.38µs |             ::1 | POST     "/notify"
[GIN] 2021/03/16 - 12:15:25 | 200 |     110.659µs |             ::1 | POST     "/notify"
tDeliveryWorker.afterDeliverySuccess, done, id=test-client, msg=test-global-id/test-sub-id
    mqs_test.go:49: passed fnTestPublishAndNotify
--- PASS: Test_MQS (1.18s)
PASS
ok      command-line-arguments  1.187s

(未完待续)

上一篇 下一篇

猜你喜欢

热点阅读