globalsign mgo 介绍

2019-03-11  本文已影响0人  xuyaozuo

背景:

gopkg.in/mgo.v2 停止维护了

github.com/globalsign/mgo 是目前社区中比较活跃的库(1437 stars),最后一次提交是 2018/10/15。

兼容:

兼容 gopkg.in/mgo.v2 接口,一般只需要改几个包名;相比 MongoDB 官方的 client,相对较好地做了向上兼容。

新功能增加

    // Example from the "new features" section: configure the session's safety
    // options through a mgo.Safe value.
    ss.SetSafe(&mgo.Safe{
        // NOTE(review): W and WMode are both set here; confirm which of the
        // two mgo actually applies when both are present.
        W: 1,
        WMode: "majority",
        RMode: "local",
        // time.Second / time.Millisecond evaluates to 1000, i.e. the timeout
        // is passed as a count of milliseconds.
        WTimeout: int(time.Second / time.Millisecond),
        // Presumably requests journal acknowledgement - confirm against the
        // mgo.Safe documentation.
        J: true,
    })
    //ss.DB("test").C("").FindId(1).Apply()

功能优化

// Before (bug): in a Go time layout a bare "Z" is a literal character, so this
// layout carries no time-zone element at all.
const jdateFormat = "2006-01-02T15:04:05.999Z"
// After (fix): "Z07:00" is the layout's zone element; it renders "Z" for UTC
// and a numeric offset (for example +08:00) otherwise.
const jdateFormat = "2006-01-02T15:04:05.999Z07:00"
// inlinePtrStruct embeds *MStruct with the bson ",inline" option, i.e. the
// inlined field is a pointer to a struct rather than a struct value.
// NOTE(review): presumably shown to illustrate inline support for pointer
// embedded structs - confirm against the globalsign/mgo release notes.
type inlinePtrStruct struct {
    A int
    *MStruct `bson:",inline"`
}
// Build a lookup by id 1 on collection "test_col" of database "test".
q := ss.DB("test").C("test_col").FindId(1)
// Give the query a one-second maximum time.
// NOTE(review): presumably maps to MongoDB's maxTimeMS - confirm.
q.SetMaxTime(time.Second)
// NOTE(review): the values returned by Count are discarded in this snippet;
// real code should capture and check them.
q.Count()
// NOTE(review): the result of DropAllIndexes is discarded here as well.
ss.DB("test").C("test_col").DropAllIndexes() 

connect 优化:

// CloseAfterIdle closes the socket immediately when its reference count is
// zero. Otherwise it sets closeAfterIdle on the socket and leaves it open;
// the flag is presumably honoured elsewhere once the socket becomes idle.
func (socket *mongoSocket) CloseAfterIdle() {
    // Decide under the lock, act after releasing it.
    socket.Lock()
    idle := socket.references == 0
    if !idle {
        socket.closeAfterIdle = true
    }
    socket.Unlock()

    if !idle {
        logf("Socket %p to %s: close after idle.", socket, socket.addr)
        return
    }

    // Close is only called once the socket lock has been released.
    socket.Close()
    logf("Socket %p to %s: idle and close.", socket, socket.addr)
}

mongoServer 如果不可用,则所有的 socket 会被强制 kill,导致所有正在使用的 session 试图重新执行 query,从而造成雪崩效应。

// poolShrinker wakes up once a minute and closes unused sockets whose
// lastTimeUsed is older than maxIdleTimeMS, always leaving at least
// minPoolSize sockets in unusedSockets. It returns, stopping its ticker, the
// first time it observes server.closed.
func (server *mongoServer) poolShrinker() {
    ticker := time.NewTicker(1 * time.Minute)
    defer ticker.Stop()

    for range ticker.C {
        server.Lock()
        // closed is read under the same lock that guards the rest of the
        // server state touched below, so the check cannot race with a writer
        // that updates it while holding the lock.
        if server.closed {
            server.Unlock()
            return
        }
        unused := len(server.unusedSockets)
        if unused < server.minPoolSize {
            server.Unlock()
            continue
        }

        maxIdle := time.Duration(server.maxIdleTimeMS) * time.Millisecond
        now := time.Now()
        end := 0
        reclaimMap := map[*mongoSocket]struct{}{}
        // Because the acquisition and recycle are done at the tail of array,
        // the head is always the oldest unused socket. Only the first
        // unused-minPoolSize entries are candidates, which keeps the minimum
        // pool size intact.
        for _, s := range server.unusedSockets[:unused-server.minPoolSize] {
            if s.lastTimeUsed.Add(maxIdle).After(now) {
                // Everything after this socket was used more recently.
                break
            }
            end++
            reclaimMap[s] = struct{}{}
        }

        // tbr ("to be reclaimed") aliases the old backing array; the pool is
        // switched to a fresh slice below, so it stays valid after Unlock.
        tbr := server.unusedSockets[:end]
        if end > 0 {
            next := make([]*mongoSocket, unused-end)
            copy(next, server.unusedSockets[end:])
            server.unusedSockets = next

            remainSockets := make([]*mongoSocket, 0, len(server.liveSockets))
            for _, s := range server.liveSockets {
                if _, ok := reclaimMap[s]; !ok {
                    remainSockets = append(remainSockets, s)
                }
            }
            server.liveSockets = remainSockets
            stats.conn(-1*end, server.info.Master)
        }
        server.Unlock()

        // Close the reclaimed sockets outside the server lock.
        for _, s := range tbr {
            s.Close()
        }
    }
}
func (socket *mongoSocket) Query(ops ...interface{}) (err error) {

    ......

    buf := bytesBufferPool.Get().([]byte)
    defer func() {
        bytesBufferPool.Put(buf[:0])
    }()

    ......

连接的实现逻辑:

mongo 调用逻辑.png

代码建议

使用session的方式


代码演示 (更多workshop使用方法可以参考)

    // Open the session used by the demo and make sure it is released.
    ss, err := mgo.Dial("")
    if err != nil {
        panic(err)
    }
    defer ss.Close()

    pool := workshop.NewPool(3)

    // retried limits the recovery handler to a single refresh-and-retry.
    var retried bool

    pms := workshop.NewPromise(pool, workshop.Process{
        Process: func(ctx context.Context, last interface{}) (interface{}, error) {
            id := "test-id-" + fmt.Sprint(1000)
            result := map[string]interface{}{}
            // Keep the error local to the closure: assigning to the outer err
            // from a pool worker would race with the pms.Get assignment below.
            err := ss.DB("test").C("concurrence").FindId(id).One(&result)
            return result, err
        },
    }).RecoverAndRetry(workshop.ExceptionProcess{
        Process: func(ctx context.Context, err error, last interface{}) (interface{}, error) {
            if retried {
                // Already retried once: give up and surface the error.
                return nil, err
            }
            if err == mgo.ErrNotFound {
                // A missing document is a normal result, not a reason to retry.
                return last, err
            }
            switch err.(type) {
            case *mgo.QueryError:
                // The server answered with a query error; do not retry.
                return last, err
            default:
                // Any other error: refresh the session and report no error.
                // NOTE(review): presumably a nil error makes workshop retry
                // the process - confirm against the workshop documentation.
                ss.Refresh()
                retried = true
                return last, nil
            }
        },
    })

    // Keep the cancel function and call it so the timeout's resources are
    // released as soon as this code is done with the context.
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    data, err := pms.Get(ctx)
    if err != nil {
        // handle err; data must not be used on this path.
        log.Println("get map data failed:", err)
    } else if m, ok := data.(map[string]interface{}); ok {
        log.Println("get map data:", m)
    } else {
        // Checked assertion: an unexpected result type is reported, not a panic.
        log.Printf("get map data: unexpected result type %T", data)
    }

上一篇 下一篇

猜你喜欢

热点阅读