golangGolang

golang源码分析-sql package

2016-04-09  本文已影响683人  分裂四人组

背景

1. 结构体

1.1 depSet和finalCloser

depSet : 记录db与conn之间的依赖关系,维持连接池以及关闭时使用
finalCloser: 一个接口,当对象的引用计数归零时会调用其finalClose方法做最终的关闭操作,供(*DB).addDep等依赖引用计数逻辑使用

 // depSet is the set of outstanding dependencies held on a finalCloser.
 // It is used purely as a set: the keys are the dependent values and every
 // stored value is true.
 type depSet map[interface{}]bool // set of true bools
// The finalCloser interface is used by (*DB).addDep and related
// dependency reference counting. An implementation is registered as the
// key of DB.dep and is finalized once its last dependency is removed.
type finalCloser interface {
    // finalClose is called when the reference count of an object
    // goes to zero. (*DB).mu is not held while calling it.
    finalClose() error
}

1.2 DB结构体

// DB is a database handle representing a pool of zero or more
// underlying connections. It's safe for concurrent use by multiple
// goroutines.
//
// The sql package creates and frees connections automatically; it
// also maintains a free pool of idle connections. If the database has
// a concept of per-connection state, such state can only be reliably
// observed within a transaction. Once DB.Begin is called, the
// returned Tx is bound to a single connection. Once Commit or
// Rollback is called on the transaction, that transaction's
// connection is returned to DB's idle connection pool. The pool size
// can be controlled with SetMaxIdleConns.
type DB struct {
    driver driver.Driver
    dsn    string
    // numClosed is an atomic counter which represents a total number of
    // closed connections. Stmt.openStmt checks it before cleaning closed
    // connections in Stmt.css.
    numClosed uint64
    mu           sync.Mutex // protects following fields
    // freeConn holds the idle connections currently sitting in the pool;
    // conn() hands out freeConn[0] when it is allowed to reuse a connection.
    // NOTE(review): the article states that connections in excess of maxIdle
    // are closed; that trimming code is not part of this excerpt - confirm.
    freeConn     []*driverConn
    // connRequests queues the channels of conn() callers that are waiting
    // for a connection because the maxOpen limit has been reached.
    connRequests []chan connRequest
    numOpen      int // number of opened and pending open connections
    // Used to signal the need for new connections
    // a goroutine running connectionOpener() reads on this chan and
    // maybeOpenNewConnections sends on the chan (one send per needed connection)
    // It is closed during db.Close(). The close tells the connectionOpener
    // goroutine to exit.
    openerCh    chan struct{}
    closed      bool
    // dep records the dependency relationships: for each finalCloser, the
    // set of values that still depend on it (see removeDepLocked).
    dep         map[finalCloser]depSet
    lastPut     map[*driverConn]string // stacktrace of last conn's put; debug only
    // How maxIdle and maxOpen relate:
    // maxIdle bounds the number of idle connections kept in the pool, while
    // maxOpen bounds the total number of open connections - conn() waits
    // instead of opening a new one once maxOpen > 0 and numOpen >= maxOpen.
    // NOTE(review): the original note claimed maxOpen only takes effect when
    // 0 < maxOpen < maxIdle; the visible conn() code applies the limit
    // whenever maxOpen > 0, so that claim looks wrong - verify against
    // SetMaxOpenConns/SetMaxIdleConns.
    maxIdle     int                    // zero means defaultMaxIdleConns; negative means 0
    maxOpen     int                    // <= 0 means unlimited
    // maxLifetime is the maximum lifetime of a single connection; conn()
    // passes it to driverConn.expired before handing a connection out.
    maxLifetime time.Duration          // maximum amount of time a connection may be reused
    cleanerCh   chan struct{}
}

// connReuseStrategy tells (*DB).conn whether it may hand back a pooled
// connection or must always dial a fresh one.
type connReuseStrategy uint8

const (
    // alwaysNewConn (value 0) makes conn open a brand-new connection to
    // the database, bypassing the free pool.
    alwaysNewConn connReuseStrategy = iota
    // cachedOrNewConn (value 1) makes conn return a cached connection when
    // one is available; otherwise it either waits for one to become
    // available (if MaxOpenConns has been reached) or creates a new
    // database connection.
    cachedOrNewConn
)

1.3 driverConn结构体
driverConn为单个连接的结构体,同时它有一个其依赖的db的指针(因为每个连接都是建立在db之上),在DB中可以处理单个连接的操作(比如Close()操作)

// driverConn wraps a driver.Conn with a mutex, to
// be held during all calls into the Conn. (including any calls onto
// interfaces returned via that Conn, such as calls on Tx, Stmt,
// Result, Rows)
//
// The fields fall into three groups: immutable back-references, state
// guarded by the embedded mutex, and state guarded by the owning DB's mu.
type driverConn struct {
    db        *DB       // the DB this connection belongs to
    createdAt time.Time // set from nowFunc() when conn() opens the connection

    sync.Mutex  // guards following
    ci          driver.Conn // the underlying driver connection
    closed      bool        // set by Close; a second Close reports an error
    finalClosed bool // ci.Close has been called
    openStmt    map[driver.Stmt]bool

    // guarded by db.mu
    inUse      bool     // set to true by conn() when the connection is handed out
    onPut      []func() // code (with db.mu held) run when conn is next returned
    dbmuClosed bool     // same as closed, but guarded by db.mu, for removeClosedStmtLocked
}

2. 连接池原理分析

2.1 单个请求关闭操作

关闭操作主要分为两个步骤:
a. 加互斥锁将该连接的closed字段置为true(若该连接已经关闭,则返回重复关闭的错误)
b. 加互斥锁解除其与所依赖db之间的依赖关系(调用removeDepLocked函数)

  // Close marks the connection as closed and drops the connection's
  // dependency on itself in the owning DB. It returns an error if the
  // connection was already closed; otherwise it returns whatever the
  // function handed back by removeDepLocked returns.
  func (dc *driverConn) Close() error {
    // Step 1: flip the closed flag under the connection's own mutex.
    dc.Lock()
    alreadyClosed := dc.closed
    if !alreadyClosed {
        dc.closed = true
    }
    // Unlocked explicitly rather than deferred: the finalClose call that
    // removeDep may hand back can need to take this lock itself.
    dc.Unlock()
    if alreadyClosed {
        return errors.New("sql: duplicate driverConn close")
    }

    // Step 2: the updates that must happen while holding db.mu.
    owner := dc.db
    owner.mu.Lock()
    dc.dbmuClosed = true
    finalize := owner.removeDepLocked(dc, dc)
    owner.mu.Unlock()

    // finalize runs after db.mu has been released.
    return finalize()
}

2.2 db操作去除依赖关系函数

注意,从2.1可知该函数的入参x finalCloser为单个连接的指针,在db中会建立一个map, map[finalCloser]depSet(db中的dep字段),如果该连接关闭掉,则会在关闭的时候在map中delete该依赖关系,同时还要确保该连接内Stmt全部关闭(当依赖全部移除后,返回finalClose函数交由调用方执行)。

 // removeDepLocked removes dep from the set of dependencies recorded for x
 // and returns the function the caller should invoke afterwards:
 // x.finalClose when that was the last dependency (x is then dropped from
 // db.dep), or a no-op returning nil when dependencies remain.
 //
 // It panics if x has no recorded dependencies or if dep was not one of them.
 // The "Locked" suffix and the call site in driverConn.Close indicate it is
 // meant to be called with db.mu held.
 func (db *DB) removeDepLocked(x finalCloser, dep interface{}) func() error {
    //println(fmt.Sprintf("removeDep(%T %p, %T %p)", x, x, dep, dep))

    deps, tracked := db.dep[x]
    if !tracked {
        panic(fmt.Sprintf("unpaired removeDep: no deps for %T", x))
    }

    before := len(deps)
    delete(deps, dep)
    remaining := len(deps)

    // The size did not change, so dep was never registered. Shouldn't happen.
    if remaining == before {
        panic(fmt.Sprintf("unpaired removeDep: no %T dep on %T", dep, x))
    }

    // Last dependency gone: forget x and let the caller finalize it.
    if remaining == 0 {
        delete(db.dep, x)
        return x.finalClose
    }

    // Other dependencies are still outstanding; nothing to finalize yet.
    return func() error { return nil }
}

2.3 db Open函数

a. 在sql包里有两个全局变量:一个读写锁,一个是driver map表;
b. 在Open函数中读取driver map时,为防止对应关系在读取期间被并发修改,加了一把读锁(RLock);
c. connectionRequestQueueSize变量的理解:请求连接队列大小要远大于db.maxOpen,这样做可以在仅打开<=maxOpen个连接的情况下不阻塞住请求数量,其实现原理是通过golang的channel去处理的;

// Package-level driver registry.
var (
    // driversMu guards drivers; Open takes the read lock while looking a
    // driver up by name.
    driversMu sync.RWMutex
    // drivers maps a driver name to its driver.Driver implementation.
    drivers = map[string]driver.Driver{}
)

 // connectionRequestQueueSize is the buffer capacity given to DB.openerCh,
// the channel on which the connectionOpener goroutine receives requests.
// Keep it larger than any typical db.maxOpen setting: when maxOpen greatly
// exceeds connectionRequestQueueSize, ALL calls into the *DB can block
// until the connectionOpener has worked through the backlog of requests.
var connectionRequestQueueSize = 1000000

// Open returns a *DB for the driver registered under driverName, configured
// with the driver-specific dataSourceName (usually at least a database name
// and connection information).
//
// Most users will open a database via a driver-specific connection
// helper function that returns a *DB. No database drivers are included
// in the Go standard library. See https://golang.org/s/sqldrivers for
// a list of third-party drivers.
//
// Open does not connect to the database itself: it only looks the driver
// up, builds the handle and starts the connectionOpener goroutine. To
// verify that the data source name is valid, call Ping.
//
// It returns an error when no driver is registered under driverName.
//
// The returned DB is safe for concurrent use by multiple goroutines
// and maintains its own pool of idle connections. Thus, the Open
// function should be called just once. It is rarely necessary to
// close a DB.
func Open(driverName, dataSourceName string) (*DB, error) {
    // Look the driver up under the read lock, holding it only for the read.
    driversMu.RLock()
    drv, registered := drivers[driverName]
    driversMu.RUnlock()

    if !registered {
        return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
    }

    handle := &DB{
        driver:   drv,
        dsn:      dataSourceName,
        openerCh: make(chan struct{}, connectionRequestQueueSize),
        lastPut:  make(map[*driverConn]string),
    }

    // Background goroutine that services openerCh.
    go handle.connectionOpener()

    return handle, nil
}

2.4 建立连接conn函数

在介绍conn函数之前要了解sql包中一个重要的枚举常量connReuseStrategy,其定义如下:

// connReuseStrategy tells (*DB).conn whether it may hand back a pooled
// connection or must always dial a fresh one.
type connReuseStrategy uint8

const (
    // alwaysNewConn (value 0) makes conn open a brand-new connection to
    // the database, bypassing the free pool.
    alwaysNewConn connReuseStrategy = iota
    // cachedOrNewConn (value 1) makes conn return a cached connection when
    // one is available; otherwise it either waits for one to become
    // available (if MaxOpenConns has been reached) or creates a new
    // database connection.
    cachedOrNewConn
)

其中:
a. alwaysNewConn常量定义了打开连接时的策略为每次建立一个新的连接;
b. cachedOrNewConn常量会返回一个cached连接;若没有可用的cached连接,则(在已达到MaxOpenConns时)等待一个可用连接,否则建立一个新的连接;

// connRequest represents one request for a new connection
// When there are no idle connections available, DB.conn will create
// a new connRequest and put it on the db.connRequests list.
//
// It is the value delivered over the per-request channel that DB.conn
// blocks on: either a usable connection or the error that prevented one.
type connRequest struct {
    conn *driverConn // the connection handed to the waiter; DB.conn only uses it when err is nil
    err  error       // returned to the caller of DB.conn alongside conn
}

下面为db中建立连接的过程,分为三个步骤:
a. 如果是基于cachedOrNewConn缓存策略,且有可用的连接,则从freeConn中取第一个连接返回;
b. 如果设置了maxOpen(>0)且当前连接数量已经>=该阈值,则创建一个请求放入connRequests队列,并从req channel中等待一个可用的连接返回;
c. 重新建立一个新的连接返回;

// conn returns a newly-opened or cached *driverConn.
//
// It tries three strategies in order:
//  1. reuse the first idle connection in db.freeConn (only when strategy is
//     cachedOrNewConn);
//  2. if the maxOpen limit is reached, queue a request and wait for a
//     connection to be delivered on it;
//  3. otherwise open a new connection through the driver.
//
// It returns errDBClosed when the DB is closed, driver.ErrBadConn when the
// connection obtained by strategy 1 or 2 has exceeded db.maxLifetime, and
// the driver's error when opening a new connection fails.
func (db *DB) conn(strategy connReuseStrategy) (*driverConn, error) {
    db.mu.Lock()
    if db.closed {
        db.mu.Unlock()
        return nil, errDBClosed
    }
    // Snapshot maxLifetime while db.mu is held; it is used after unlocking.
    lifetime := db.maxLifetime

    // Strategy 1: Prefer a free connection, if possible.
    numFree := len(db.freeConn)
    if strategy == cachedOrNewConn && numFree > 0 {
        // Take the first idle connection and shift the rest down by one.
        conn := db.freeConn[0]
        copy(db.freeConn, db.freeConn[1:])
        db.freeConn = db.freeConn[:numFree-1]
        conn.inUse = true
        db.mu.Unlock()
        // An expired connection is closed instead of being handed out; the
        // error returned by Close is not checked here.
        if conn.expired(lifetime) {
            conn.Close()
            return nil, driver.ErrBadConn
        }
        return conn, nil
    }

    // Strategy 2: Out of free connections or we were asked not to use one.  If we're not
    // allowed to open any more connections, make a request and wait.
    if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
        // Make the connRequest channel. It's buffered so that the
        // connectionOpener doesn't block while waiting for the req to be read.
        req := make(chan connRequest, 1)
        db.connRequests = append(db.connRequests, req)
        db.mu.Unlock()
        // Block (without holding db.mu) until a connection or an error is
        // delivered; a closed channel is reported as errDBClosed.
        ret, ok := <-req
        if !ok {
            return nil, errDBClosed
        }
        if ret.err == nil && ret.conn.expired(lifetime) {
            ret.conn.Close()
            return nil, driver.ErrBadConn
        }
        return ret.conn, ret.err
    }

    // Strategy 3: open a brand-new connection through the driver. numOpen is
    // bumped before the driver call, which runs without db.mu held.
    db.numOpen++ // optimistically
    db.mu.Unlock()
    ci, err := db.driver.Open(db.dsn)
    if err != nil {
        db.mu.Lock()
        db.numOpen-- // correct for earlier optimism
        db.maybeOpenNewConnections()
        db.mu.Unlock()
        return nil, err
    }
    db.mu.Lock()
    dc := &driverConn{
        db:        db,
        createdAt: nowFunc(),
        ci:        ci,
    }
    // Register the connection as a dependency of itself; driverConn.Close
    // removes this same (dc, dc) pair via removeDepLocked.
    db.addDepLocked(dc, dc)
    dc.inUse = true
    db.mu.Unlock()
    return dc, nil
}

3. sql查询原理分析

在介绍db类查询之前,首先介绍下一个主要的结构体和一个函数,
a. driverStmt为db连接处理sql时db与driver 声明之间的一个桥梁;
b. rowsiFromStatement函数为queryConn函数调用的一个基础(辅助)函数,其作用为调用driver的Query方法实现对driverStmt的一个查询;

// driverStmt associates a driver.Stmt with the
// *driverConn from which it came, so the driverConn's lock can be
// held during calls.
type driverStmt struct {
    // Locker is an interface declaring Lock and Unlock; inside driverStmt it
    // holds the pointer to the driver connection the statement came from
    // (queryConn builds it as driverStmt{dc, si}).
    sync.Locker // the *driverConn
    // si is the driver's Stmt interface value; rowsiFromStatement calls its
    // NumInput and Query methods.
    si          driver.Stmt
}

// rowsiFromStatement runs the prepared driver statement in ds with args and
// returns the driver-level rows. The statement's lock is held around each
// call into the driver (NumInput and Query), but not while the arguments
// are being converted.
//
// It returns an error when the argument count does not match what the
// statement expects, when driverArgs fails, or when the driver's Query fails.
func rowsiFromStatement(ds driverStmt, args ...interface{}) (driver.Rows, error) {
    ds.Lock()
    expected := ds.si.NumInput()
    ds.Unlock()

    // A count of -1 means the driver cannot tell how many placeholders the
    // statement has; in that case the check is skipped and the driver is
    // left to report any mismatch itself.
    if got := len(args); expected != -1 && got != expected {
        return nil, fmt.Errorf("sql: statement expects %d inputs; got %d", expected, got)
    }

    // driverArgs converts the caller's arguments into driver values.
    converted, err := driverArgs(&ds, args)
    if err != nil {
        return nil, err
    }

    // Run the query through the driver's Stmt while holding the lock.
    ds.Lock()
    rows, queryErr := ds.si.Query(converted)
    ds.Unlock()

    if queryErr != nil {
        return nil, queryErr
    }
    return rows, nil
}

下面介绍下db类查询中一个主要的函数queryConn,其中
a. dc为driverConn类型;
b. releaseConn为一个函数入参,其作用为将该连接释放至freeConn池;
c. query 和 args为输入sql语言分解后的语句和参数;

// queryConn executes query with args on the connection dc and wraps the
// driver's result in a *Rows.
//
// On every error path releaseConn is called with the error before
// returning. On success the connection is not released here: releaseConn is
// stored in the returned *Rows, which takes over responsibility for it.
func (db *DB) queryConn(dc *driverConn, releaseConn func(error), query string, args []interface{}) (*Rows, error) {
    // Direct path: the driver connection can run the query without an
    // explicit prepare step.
    if direct, supported := dc.ci.(driver.Queryer); supported {
        params, err := driverArgs(nil, args)
        if err != nil {
            releaseConn(err)
            return nil, err
        }

        dc.Lock()
        directRows, err := direct.Query(query, params)
        dc.Unlock()

        switch {
        case err == driver.ErrSkip:
            // The driver declined the direct path; continue with the
            // prepare-then-query path below.
        case err != nil:
            releaseConn(err)
            return nil, err
        default:
            // Note: ownership of dc passes to the *Rows, to be freed
            // with releaseConn.
            return &Rows{
                dc:          dc,
                releaseConn: releaseConn,
                rowsi:       directRows,
            }, nil
        }
    }

    // Prepared path: prepare the statement on this connection, then query it.
    dc.Lock()
    stmt, err := dc.ci.Prepare(query)
    dc.Unlock()
    if err != nil {
        releaseConn(err)
        return nil, err
    }

    stmtRows, err := rowsiFromStatement(driverStmt{dc, stmt}, args...)
    if err != nil {
        // The query failed, so close the statement that was just prepared
        // before giving the connection back.
        dc.Lock()
        stmt.Close()
        dc.Unlock()
        releaseConn(err)
        return nil, err
    }

    // Note: ownership of ci passes to the *Rows, to be freed
    // with releaseConn. The prepared statement travels with it as closeStmt.
    return &Rows{
        dc:          dc,
        releaseConn: releaseConn,
        rowsi:       stmtRows,
        closeStmt:   stmt,
    }, nil
}

3.1 db查询语言分析

3.2 Stmt结构体介绍

// connStmt is a prepared statement on a particular connection.
// It pairs the driver-level statement with the connection it belongs to.
type connStmt struct {
    dc *driverConn // the connection the statement was prepared on
    si driver.Stmt // the driver's prepared statement
}

// Stmt is a prepared statement.
// A Stmt is safe for concurrent use by multiple goroutines.
//
// Its fields are grouped by how they are protected: immutable fields,
// closemu, the transaction-bound fields, and the fields guarded by mu.
type Stmt struct {
    // Immutable:
    db        *DB    // where we came from
    query     string // that created the Stmt
    stickyErr error  // if non-nil, this error is returned for all operations

    closemu sync.RWMutex // held exclusively during close, for read otherwise.

    // If in a transaction, else both nil:
    tx   *Tx
    txsi *driverStmt

    mu     sync.Mutex // protects the rest of the fields
    closed bool

    // css is a list of underlying driver statement interfaces
    // that are valid on particular connections.  This is only
    // used if tx == nil and one is found that has idle
    // connections.  If tx != nil, txsi is always used.
    css []connStmt

    // lastNumClosed is copied from db.numClosed when Stmt is created
    // without tx and closed connections in css are removed.
    lastNumClosed uint64
} 
上一篇下一篇

猜你喜欢

热点阅读