golang源码分析-sql package
背景
1. 结构体
1.1 depSet和finalCloser
depSet : 记录db与conn之间的依赖关系,维持连接池以及关闭时使用
finalCloser: 引用计数相关的接口;当某个对象的引用计数归零时会调用其finalClose方法(调用时不持有(*DB).mu)
// depSet is a finalCloser's outstanding dependencies.
// It is used as a set: each dependency is a key whose value is true, and
// (*DB).removeDepLocked removes a dependency by deleting its key.
type depSet map[interface{}]bool // set of true bools
// The finalCloser interface is used by (*DB).addDep and related
// dependency reference counting.
//
// (*DB).removeDepLocked returns an implementation's finalClose method once
// the last dependency on it has been removed.
type finalCloser interface {
	// finalClose is called when the reference count of an object
	// goes to zero. (*DB).mu is not held while calling it.
	finalClose() error
}
1.2 DB结构体
// DB is a database handle representing a pool of zero or more
// underlying connections. It's safe for concurrent use by multiple
// goroutines.
//
// The sql package creates and frees connections automatically; it
// also maintains a free pool of idle connections. If the database has
// a concept of per-connection state, such state can only be reliably
// observed within a transaction. Once DB.Begin is called, the
// returned Tx is bound to a single connection. Once Commit or
// Rollback is called on the transaction, that transaction's
// connection is returned to DB's idle connection pool. The pool size
// can be controlled with SetMaxIdleConns.
type DB struct {
	driver driver.Driver
	dsn string
	// numClosed is an atomic counter which represents a total number of
	// closed connections. Stmt.openStmt checks it before cleaning closed
	// connections in Stmt.css.
	numClosed uint64
	mu sync.Mutex // protects following fields
	// freeConn holds the idle connections currently in the pool; (*DB).conn
	// takes the first element when a cached connection may be reused.
	// NOTE(review): the original annotation says connections beyond maxIdle
	// are closed; the code doing that is not part of this excerpt — confirm.
	freeConn []*driverConn
	// connRequests holds one channel per caller waiting for a connection;
	// (*DB).conn appends to it when maxOpen has been reached.
	connRequests []chan connRequest
	numOpen int // number of opened and pending open connections
	// Used to signal the need for new connections
	// a goroutine running connectionOpener() reads on this chan and
	// maybeOpenNewConnections sends on the chan (one send per needed connection)
	// It is closed during db.Close(). The close tells the connectionOpener
	// goroutine to exit.
	openerCh chan struct{}
	closed bool
	// dep records dependency relationships: for each finalCloser, the set of
	// things that still depend on it (see removeDepLocked).
	dep map[finalCloser]depSet
	lastPut map[*driverConn]string // stacktrace of last conn's put; debug only
	// How do maxIdle and maxOpen relate?
	// maxIdle is the maximum number of connections kept in the idle pool.
	// maxOpen is the maximum number of open connections to the database.
	// NOTE(review): the original annotation claims maxOpen only takes effect
	// when 0 < maxOpen < maxIdle; (*DB).conn checks only maxOpen > 0 before
	// enforcing it — confirm against SetMaxOpenConns/SetMaxIdleConns.
	maxIdle int // zero means defaultMaxIdleConns; negative means 0
	maxOpen int // <= 0 means unlimited
	// maxLifetime is the maximum lifetime of a single connection.
	maxLifetime time.Duration // maximum amount of time a connection may be reused
	cleanerCh chan struct{}
}
// connReuseStrategy tells (*DB).conn where the returned connection may
// come from.
type connReuseStrategy uint8

const (
	// alwaysNewConn: never reuse; a fresh database connection is forced.
	alwaysNewConn connReuseStrategy = iota

	// cachedOrNewConn: hand back a cached connection when one is available;
	// otherwise either wait for one to be freed (when MaxOpenConns has been
	// reached) or open a new database connection.
	cachedOrNewConn
)
1.3 driverConn结构体
driverConn为单个连接的结构体,同时它有一个其依赖的db的指针(因为每个连接都是建立在db之上),在DB中可以处理单个连接的操作(比如Close()操作)
// driverConn wraps a driver.Conn with a mutex, to
// be held during all calls into the Conn. (including any calls onto
// interfaces returned via that Conn, such as calls on Tx, Stmt,
// Result, Rows)
//
// Each driverConn keeps a pointer back to the DB it belongs to; Close uses
// it to take db.mu and remove the connection's dependency entry.
type driverConn struct {
	db *DB
	createdAt time.Time // set from nowFunc() when (*DB).conn opens the connection
	sync.Mutex // guards following
	ci driver.Conn
	closed bool // set by Close; a second Close returns a "duplicate driverConn close" error
	finalClosed bool // ci.Close has been called
	openStmt map[driver.Stmt]bool
	// guarded by db.mu
	inUse bool // set to true by (*DB).conn when the connection is handed to a caller
	onPut []func() // code (with db.mu held) run when conn is next returned
	dbmuClosed bool // same as closed, but guarded by db.mu, for removeClosedStmtLocked
}
2. 连接池原理分析
2.1 单个请求关闭操作
关闭操作主要分为两个步骤:
a. 加互斥锁(连接自身的锁),将该连接的closed标志置为true;
b. 加互斥锁(db.mu),解除该连接在其所依赖的db中的依赖关系(调用removeDepLocked函数),解锁后再执行其返回的函数
// Close marks the connection closed and drops its self-dependency in the
// owning DB, then runs whatever removeDepLocked returned.
//
// It returns an error if the connection was already closed; otherwise it
// returns the result of the function obtained from removeDepLocked.
func (dc *driverConn) Close() error {
	// Phase 1: flip the closed flag under the connection's own lock.
	dc.Lock()
	alreadyClosed := dc.closed
	dc.closed = true
	// Unlock explicitly instead of deferring: the finalClose reached through
	// removeDep may need to take this lock again.
	dc.Unlock()
	if alreadyClosed {
		return errors.New("sql: duplicate driverConn close")
	}

	// Phase 2: bookkeeping that must happen while db.mu is held.
	owner := dc.db
	owner.mu.Lock()
	dc.dbmuClosed = true
	release := owner.removeDepLocked(dc, dc)
	owner.mu.Unlock()

	// The returned function is invoked only after db.mu has been released.
	return release()
}
2.2 db操作去除依赖关系函数
注意,从2.1可知该函数的入参x finalCloser为单个连接的指针。db中维护了一个map[finalCloser]depSet(即db中的dep字段)来记录依赖关系:当连接关闭时,会从该连接对应的depSet中delete掉这条依赖;如果该depSet因此变为空,则把x从map中删除,并返回x.finalClose交给调用方执行(用于确保该连接内的Stmt全部关闭);如果仍有其他依赖,则返回一个空操作函数。
// removeDepLocked removes dep from the set of dependencies recorded for x
// and returns the function the caller should run afterwards.
//
// If dep was the last dependency, x is removed from db.dep and x.finalClose
// is returned; otherwise a no-op function is returned. It panics when x has
// no dependency set or when dep was not a member of it.
//
// NOTE(review): the "Locked" suffix and the caller in driverConn.Close
// suggest db.mu must be held by the caller — confirm.
func (db *DB) removeDepLocked(x finalCloser, dep interface{}) func() error {
	//println(fmt.Sprintf("removeDep(%T %p, %T %p)", x, x, dep, dep))
	deps, known := db.dep[x]
	if !known {
		panic(fmt.Sprintf("unpaired removeDep: no deps for %T", x))
	}

	before := len(deps)
	delete(deps, dep)
	after := len(deps)

	// Size unchanged means dep was never registered. This check comes first
	// so that an already-empty set also panics.
	if after == before {
		panic(fmt.Sprintf("unpaired removeDep: no %T dep on %T", dep, x))
	}

	// Last dependency gone: forget x and let the caller finalize it.
	if after == 0 {
		delete(db.dep, x)
		return x.finalClose
	}

	// Other dependencies are still outstanding; nothing to finalize yet.
	return func() error { return nil }
}
2.3 db Open函数
a. 在sql包里有两个全局变量:一个读写锁,一个是driver map表;
b. 在Open函数读取driver map的过程中,为防止对应关系被并发修改加了一把读锁;
c. connectionRequestQueueSize变量的理解:请求连接队列的大小应当大于db.maxOpen的常见最大取值,这样在仅打开<=maxOpen个连接的情况下也不会阻塞住请求,其实现原理是通过golang的带缓冲channel去处理的;
// driversMu guards access to drivers; Open takes it as a read lock while
// looking up a driver by name.
var driversMu sync.RWMutex

// drivers maps a driver name to its driver.Driver.
var drivers = map[string]driver.Driver{}

// connectionRequestQueueSize is the capacity of the connectionOpener request
// channel (DB.openerCh).
//
// It should exceed the largest value typically used for db.maxOpen. When
// maxOpen is significantly larger than connectionRequestQueueSize, ALL calls
// into the *DB may block until the connectionOpener has worked through the
// backlog of requests.
var connectionRequestQueueSize = 1000000
// Open opens a database identified by the name of its database driver and
// a driver-specific data source name, which usually carries at least a
// database name and connection information.
//
// Most users will open a database through a driver-specific connection
// helper that returns a *DB. The Go standard library ships no database
// drivers; see https://golang.org/s/sqldrivers for a list of third-party
// drivers.
//
// Open may only validate its arguments without actually connecting to the
// database. Call Ping to verify that the data source name is valid.
//
// The returned DB is safe for concurrent use by multiple goroutines and
// maintains its own pool of idle connections, so Open should be called
// just once. Closing a DB is rarely necessary.
//
// An error is returned when no driver is registered under driverName.
func Open(driverName, dataSourceName string) (*DB, error) {
	// Look the driver up under the read lock only; nothing else needs it.
	driversMu.RLock()
	drv, registered := drivers[driverName]
	driversMu.RUnlock()
	if !registered {
		return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
	}

	handle := new(DB)
	handle.driver = drv
	handle.dsn = dataSourceName
	handle.openerCh = make(chan struct{}, connectionRequestQueueSize)
	handle.lastPut = make(map[*driverConn]string)

	// Background goroutine that services requests for new connections.
	go handle.connectionOpener()
	return handle, nil
}
2.4 建立连接conn函数
在介绍conn函数之前,要先了解sql包中一个重要的枚举常量connReuseStrategy,其定义如下:
// connReuseStrategy selects how (*DB).conn obtains the database connection
// it returns.
type connReuseStrategy uint8

const (
	// alwaysNewConn always opens a brand-new connection to the database.
	alwaysNewConn connReuseStrategy = iota

	// cachedOrNewConn prefers a cached connection; if none is available it
	// waits for one to be released (if MaxOpenConns has been reached) or
	// creates a new database connection.
	cachedOrNewConn
)
其中:
a. alwaysNewConn常量定义了打开连接时的策略为每次建立一个新的连接;
b. cachedOrNewConn常量表示优先返回一个cached连接;若没有空闲连接,则(在已达到MaxOpenConns时)等待一个可用连接,或者建立一个新的连接;
// connRequest represents one request for a new connection
// When there are no idle connections available, DB.conn will create
// a new connRequest and put it on the db.connRequests list.
//
// (*DB).conn receives a connRequest from its waiting channel and returns
// the conn and err fields to the caller.
type connRequest struct {
	conn *driverConn // the connection delivered to the waiter
	err error // if non-nil, returned by (*DB).conn alongside conn
}
下面为db中建立连接的过程,分为三个步骤:
a. 如果是基于cachedOrNewConn缓存策略,且有可用的连接,则从freeConn中取第一个连接返回;
b. 如果设置了maxOpen(>0)且已打开的连接数量已经>=该阈值,则把请求放入connRequests队列,并在req channel上等待一个可用的连接返回;
c. 重新建立一个新的连接返回;
// conn returns a newly-opened or cached *driverConn.
//
// It tries three sources in order:
//  1. an idle connection from db.freeConn, when strategy is cachedOrNewConn;
//  2. a connection handed over through a queued connRequest, when the
//     maxOpen limit has been reached;
//  3. a brand-new connection opened through the driver.
//
// It returns errDBClosed if the DB is closed, and driver.ErrBadConn when a
// reused connection reports itself expired for db.maxLifetime.
func (db *DB) conn(strategy connReuseStrategy) (*driverConn, error) {
	db.mu.Lock()
	if db.closed {
		db.mu.Unlock()
		return nil, errDBClosed
	}
	maxAge := db.maxLifetime
	idle := len(db.freeConn)

	switch {
	case strategy == cachedOrNewConn && idle > 0:
		// Source 1: prefer a free connection. Pop the head of the idle list
		// and shift the remaining entries down.
		head := db.freeConn[0]
		copy(db.freeConn, db.freeConn[1:])
		db.freeConn = db.freeConn[:idle-1]
		head.inUse = true
		db.mu.Unlock()

		if head.expired(maxAge) {
			head.Close()
			return nil, driver.ErrBadConn
		}
		return head, nil

	case db.maxOpen > 0 && db.numOpen >= db.maxOpen:
		// Source 2: no free connection (or asked not to use one) and no more
		// connections may be opened, so queue a request and wait for it.
		// The channel is buffered so that the connectionOpener doesn't block
		// while waiting for the request to be read.
		waiter := make(chan connRequest, 1)
		db.connRequests = append(db.connRequests, waiter)
		db.mu.Unlock()

		granted, open := <-waiter
		if !open {
			return nil, errDBClosed
		}
		if granted.err == nil && granted.conn.expired(maxAge) {
			granted.conn.Close()
			return nil, driver.ErrBadConn
		}
		return granted.conn, granted.err
	}

	// Source 3: open a new connection. Count it optimistically before
	// releasing the lock for the driver call.
	db.numOpen++
	db.mu.Unlock()

	rawConn, err := db.driver.Open(db.dsn)
	if err != nil {
		db.mu.Lock()
		db.numOpen-- // undo the optimistic increment
		db.maybeOpenNewConnections()
		db.mu.Unlock()
		return nil, err
	}

	db.mu.Lock()
	fresh := &driverConn{
		db:        db,
		createdAt: nowFunc(),
		ci:        rawConn,
	}
	db.addDepLocked(fresh, fresh)
	fresh.inUse = true
	db.mu.Unlock()
	return fresh, nil
}
3. sql查询原理分析
在介绍db类查询之前,首先介绍下一个主要的结构体和一个函数,
a. driverStmt是db连接处理sql时,连接(driverConn)与driver的语句(driver.Stmt)之间的一个桥梁;
b. rowsiFromStatement函数是queryConn函数调用的一个基础函数,其作用为调用driver.Stmt的Query方法,实现对driverStmt的一次查询;
// driverStmt associates a driver.Stmt with the
// *driverConn from which it came, so the driverConn's lock can be
// held during calls.
type driverStmt struct {
	// Locker is the sync.Locker interface (Lock and Unlock methods); in a
	// driverStmt it holds the *driverConn the statement came from.
	sync.Locker // the *driverConn
	// si is the driver's Stmt interface value; rowsiFromStatement calls
	// NumInput and Query on it.
	si driver.Stmt
}
// rowsiFromStatement runs the prepared statement in ds with args and returns
// the driver-level rows.
//
// Every call into ds.si is made while holding ds's lock. An error is
// returned when the argument count does not match what the statement
// reports, when driverArgs fails, or when the driver's Query fails.
func rowsiFromStatement(ds driverStmt, args ...interface{}) (driver.Rows, error) {
	ds.Lock()
	expected := ds.si.NumInput()
	ds.Unlock()

	// A value of -1 means the driver cannot count the placeholders; in that
	// case skip the sanity check and let the driver report any mismatch.
	if got := len(args); expected != -1 && got != expected {
		return nil, fmt.Errorf("sql: statement expects %d inputs; got %d", expected, got)
	}

	// driverArgs (defined elsewhere) prepares the arguments for the driver.
	params, err := driverArgs(&ds, args)
	if err != nil {
		return nil, err
	}

	// Delegate the actual query to the driver's Stmt.Query.
	ds.Lock()
	result, err := ds.si.Query(params)
	ds.Unlock()
	if err != nil {
		return nil, err
	}
	return result, nil
}
下面介绍下db类查询中一个主要的函数queryConn,其中
a. dc为driverConn类型;
b. releaseConn为一个函数入参,其作用为将该连接释放回freeConn池;
c. query 和 args为输入sql语言分解后的语句和参数;
// queryConn executes a query on the given connection.
// The connection gets released by the releaseConn function.
//
// On every failure path releaseConn is called with the error before
// returning. On success the connection is handed to the returned *Rows
// together with releaseConn, and is released through it later.
func (db *DB) queryConn(dc *driverConn, releaseConn func(error), query string, args []interface{}) (*Rows, error) {
	// Direct path: the driver connection can run the query without an
	// explicit Prepare step.
	if direct, supported := dc.ci.(driver.Queryer); supported {
		values, err := driverArgs(nil, args)
		if err != nil {
			releaseConn(err)
			return nil, err
		}

		dc.Lock()
		result, err := direct.Query(query, values)
		dc.Unlock()

		switch err {
		case driver.ErrSkip:
			// Fall through to the prepared-statement path below.
		case nil:
			// Note: ownership of dc passes to the *Rows, to be freed
			// with releaseConn.
			return &Rows{
				dc:          dc,
				releaseConn: releaseConn,
				rowsi:       result,
			}, nil
		default:
			releaseConn(err)
			return nil, err
		}
	}

	// Prepared path: prepare the statement, then query through it.
	dc.Lock()
	stmt, err := dc.ci.Prepare(query)
	dc.Unlock()
	if err != nil {
		releaseConn(err)
		return nil, err
	}

	result, err := rowsiFromStatement(driverStmt{dc, stmt}, args...)
	if err != nil {
		// Close the statement that was just prepared before giving up.
		dc.Lock()
		stmt.Close()
		dc.Unlock()
		releaseConn(err)
		return nil, err
	}

	// Note: ownership of the connection passes to the *Rows, to be freed
	// with releaseConn; the prepared statement is stored in closeStmt.
	return &Rows{
		dc:          dc,
		releaseConn: releaseConn,
		rowsi:       result,
		closeStmt:   stmt,
	}, nil
}
3.1 db查询语言分析
3.2 Stmt结构体介绍
// connStmt is a prepared statement on a particular connection.
// It pairs the connection with the driver-level statement; Stmt.css holds a
// list of these.
type connStmt struct {
	dc *driverConn // the connection
	si driver.Stmt // the driver-level statement
}
// Stmt is a prepared statement.
// A Stmt is safe for concurrent use by multiple goroutines.
//
// The fields fall into three groups, as marked below: immutable fields,
// the transaction-related pair (tx, txsi), and the fields protected by mu.
type Stmt struct {
	// Immutable:
	db *DB // where we came from
	query string // that created the Stmt
	stickyErr error // if non-nil, this error is returned for all operations
	closemu sync.RWMutex // held exclusively during close, for read otherwise.
	// If in a transaction, else both nil:
	tx *Tx
	txsi *driverStmt
	mu sync.Mutex // protects the rest of the fields
	closed bool
	// css is a list of underlying driver statement interfaces
	// that are valid on particular connections. This is only
	// used if tx == nil and one is found that has idle
	// connections. If tx != nil, txsi is always used.
	css []connStmt
	// lastNumClosed is copied from db.numClosed when Stmt is created
	// without tx and closed connections in css are removed.
	lastNumClosed uint64
}