go mysql:多协程实现mysql事务的并发操作

2022-01-26  本文已影响0人  彳亍口巴

背景

在项目开发过程中,往往会涉及到同时插入或修改多条数据,并且操作是需要保证事务原子性的,要么全部成功,要么全部失败,此时最好的办法是一次请求完成全部的数据操作,即将所有的数据拼接成一条SQL语句,但如果我们需要对数据修改前后进行保存作为记录日志的话,将数据拼接成一条SQL语句就行不通了,此时可以通过开启多协程去并发执行修改操作

datas := buildData()
    //db.SetConnMaxLifetime(time.Second * 30)
    tx, err := db.Begin()
    if err != nil {
        fmt.Println("tx err:%+v", err)
        return
    }
    var ids []string
    var wg sync.WaitGroup
    var txErr error
    for i := range datas {
        ids = append(ids, datas[i].ID)
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            //r, getErr := tx.Query("select * from t_crud")
            //if getErr != nil {
            //  fmt.Println("geterr:", getErr)
            //  return
            //}
            //fmt.Println(r.Columns())
            //r.Close()
            sql := fmt.Sprintf("insert into t_crud( `id`,`name`,`addr` ) values('%s','%s','%s')", datas[i].ID, datas[i].Name, datas[i].Addr)
            fmt.Println(sql)
            res, err2 := tx.Exec(sql)
            if err2 != nil {
                fmt.Println("exec err:%+v", err2)
                txErr = err2
                return
            }
            fmt.Println(res.RowsAffected())

            //rows, getErr2 := tx.QueryContext(context.Background(), "select * from t_crud where id=?", "id_99")
            //if getErr2 != nil {
            //  fmt.Println("geterr2:", getErr2)
            //  return
            //}
            //
            //fmt.Println(rows.Columns())
            //rows.Close()
        }(i)
    }
    wg.Wait()
    if txErr != nil {
        fmt.Println("has err roolback:", txErr)
        tx.Rollback()
        return
    }
     tx.Commit()

对数据遍历,每条数据开启一个协程,最后再汇总错误,当错误不为nil时回滚,否则提交,这样的缺点是有多少条数据就开启多少个协程,协程不好管理,可以通过channel对每次请求的协程数进行控制。

func buildData3() {
    tx, err := db.Begin()
    if err != nil {
        fmt.Println("tx err:%+v", err)
        return
    }
    var txErr error
    datas := buildData()
    execFun := func(index int) {
        sql := fmt.Sprintf("insert into t_crud( `id`,`name`,`addr` ) values('%s','%s','%s')", datas[index].ID, datas[index].Name, datas[index].Addr)
        fmt.Println(sql)
        res, err2 := tx.Exec(sql)
        if err2 != nil {
            fmt.Println("exec err:%+v", err2)
            txErr = err2
            return
        }
        fmt.Println(res.RowsAffected())
    }
    // 同时最多有10个协程在跑
    DoBatch2(len(datas), 10, execFun)
    if txErr != nil {
        tx.Rollback()
        panic(txErr)
        return
    }
    tx.Commit()
}

// DoBatch 开启指定协程数批量执行
func DoBatch(max int, goroutineNum int, execFun func(index int)) {
    var (
        wg          sync.WaitGroup
        workLimiter = make(chan struct{}, goroutineNum)
    )
    for i := 0; i < max; i++ {
        wg.Add(1)
        select {
        case workLimiter <- struct{}{}:
            go func(i int) {
                defer func() {
                    <-workLimiter
                    wg.Done()
                }()
                execFun(i)
            }(i)
        }

    }
    wg.Wait()
}

通过channel对当前的协程数量进行控制,同时只允许goroutineNum个协程在跑,否则其他任务应该阻塞

driver:bad connection

值得一提的是,同一事务开启多协程的同时如果有并发读,那可能会出现driver:bad connection错误,原因是同一事务同一时间只能有一个可以进行读操作,读完之后需要将查询得到的Rows关闭

  r, getErr := tx.Query("select * from t_crud")
if getErr != nil {
fmt.Println("geterr:", getErr)
    return
}
fmt.Println(r.Columns())
r.Close()
上一篇下一篇

猜你喜欢

热点阅读