Author: Xargin. Personal blog: https://xargin.com/.
The http standard library
Server side
Request handling
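package main
import (
	"io"
	"log"
	"net/http"
)
// sayhello writes a plain-text greeting.
func sayhello(wr http.ResponseWriter, r *http.Request) {
	// Header().Set canonicalizes the key; the body is plain text, not JSON.
	wr.Header().Set("Content-Type", "text/plain; charset=utf-8")
	io.WriteString(wr, "hello")
}
func main() {
	http.HandleFunc("/", sayhello)
	// ListenAndServe only returns on failure, so log the error and exit.
	log.Fatal(http.ListenAndServe(":9090", nil))
}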

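Each connection is served by its own goroutine. Once a request has been read, the handler supplied by the user is called (the default one is used if none was supplied), and the response is written on the same connection. Overall this is a request/response loop model.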
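The loop described above can be sketched without the real HTTP machinery. This is a minimal illustration, not the net/http implementation: `handler`, `serveConn` and `demo` are hypothetical names, the "protocol" is one text line per request, and `net.Pipe` stands in for an accepted TCP connection.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
	"strings"
)

// handler is a hypothetical stand-in for http.Handler: one request line in,
// one response line out.
type handler func(req string) string

// serveConn is the per-connection loop: read a request, call the handler,
// write the response on the same connection, and repeat until the peer
// closes the connection.
func serveConn(c net.Conn, h handler) {
	defer c.Close()
	r := bufio.NewReader(c)
	for {
		line, err := r.ReadString('\n')
		if err != nil {
			return // peer closed the connection or the read failed
		}
		fmt.Fprintf(c, "%s\n", h(strings.TrimSpace(line)))
	}
}

// demo sends two requests over one in-memory connection and returns the
// responses in order.
func demo() []string {
	client, server := net.Pipe()
	defer client.Close()
	// One goroutine per connection, as a server would start after Accept.
	go serveConn(server, func(req string) string { return "hello " + req })

	r := bufio.NewReader(client)
	var out []string
	for _, req := range []string{"a", "b"} {
		fmt.Fprintf(client, "%s\n", req)
		resp, err := r.ReadString('\n')
		if err != nil {
			break
		}
		out = append(out, strings.TrimSpace(resp))
	}
	return out
}

func main() {
	fmt.Println(demo())
}
```

Both requests travel over the same connection and are answered by the same goroutine, which is the point of the loop model.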
Client side
Connection pool
type Transport struct {
idleMu sync.Mutex
closeIdle bool // user has requested to close all idle conns
idleConn map[connectMethodKey][]*persistConn // most recently used at end
idleConnWait map[connectMethodKey]wantConnQueue // waiting getConns
idleLRU connLRU
connsPerHostMu sync.Mutex
connsPerHost map[connectMethodKey]int
connsPerHostWait map[connectMethodKey]wantConnQueue // waiting getConns
// MaxIdleConns controls the maximum number of idle (keep-alive)
// connections across all hosts. Zero means no limit.
MaxIdleConns int
// MaxIdleConnsPerHost, if non-zero, controls the maximum idle
// (keep-alive) connections to keep per-host. If zero,
// DefaultMaxIdleConnsPerHost is used.
MaxIdleConnsPerHost int
// MaxConnsPerHost optionally limits the total number of
// connections per host, including connections in the dialing,
// active, and idle states. On limit violation, dials will block.
//
// Zero means no limit.
MaxConnsPerHost int
// IdleConnTimeout is the maximum amount of time an idle
// (keep-alive) connection will remain idle before closing
// itself.
// Zero means no limit.
IdleConnTimeout time.Duration
}
A transport usually corresponds one-to-one with a client, and each transport has its own connpool. idleConn has the type map[connectMethodKey][]*persistConn, and the key of this map is a struct:
// connectMethodKey is the map key version of connectMethod, with a
// stringified proxy URL (or the empty string) instead of a pointer to
// a URL.
type connectMethodKey struct {
proxy, scheme, addr string
onlyH1 bool
}
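The proxy address, the scheme, the address, and whether only http1 is supported together form the key of this map. The proxy field is the full proxy address; for example, with export HTTP_PROXY=localhost:1081 it is the string form of the value the user supplied. scheme is normally a string such as http or https (without the ://), and addr contains the full domain name (or IP) plus the port.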
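Because every field of the key is comparable, the struct can be used directly as a map key. The sketch below shows how such a key splits connections into separate buckets. `poolKey`, `conn` and `idlePool` are simplified, hypothetical copies for illustration, not the standard library's types.

```go
package main

import "fmt"

// poolKey mirrors the shape of connectMethodKey; it is a simplified,
// hypothetical copy for illustration only.
type poolKey struct {
	proxy, scheme, addr string
	onlyH1              bool
}

// conn is a placeholder for a pooled connection.
type conn struct{ id int }

// idlePool groups idle connections by key, most recently used at the end.
type idlePool map[poolKey][]*conn

// put stores c in the bucket that belongs to k.
func (p idlePool) put(k poolKey, c *conn) { p[k] = append(p[k], c) }

// count reports how many idle connections are stored under k.
func (p idlePool) count(k poolKey) int { return len(p[k]) }

func main() {
	p := idlePool{}
	plain := poolKey{scheme: "http", addr: "example.com:80"}
	secure := poolKey{scheme: "https", addr: "example.com:443"}
	viaProxy := poolKey{proxy: "http://localhost:1081", scheme: "http", addr: "example.com:80"}

	p.put(plain, &conn{id: 1})
	p.put(plain, &conn{id: 2})
	p.put(secure, &conn{id: 3})
	p.put(viaProxy, &conn{id: 4})

	// Same host, but a different scheme or proxy means a different bucket.
	fmt.Println(p.count(plain), p.count(secure), p.count(viaProxy))
}
```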
getConn:

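In http2 the same connection can be used by several requests at once, so in the http2 logic the connection stays in the connection pool after it has been handed out. Whether a connection can be shared this way is decided by pconn.alt.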
tryPutIdleConn
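If there is a goroutine waiting for a connection, the connection is delivered to that goroutine. This triggers the corresponding ready operation, which wakes the blocked goroutine so that it can continue handling its request.
Otherwise the connection is put back into the Transport's idleConn and idleLRU.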
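The "deliver to a waiter, otherwise store as idle" decision can be sketched as follows. This is a simplified illustration under assumptions, not the Transport code: `waiter` is a hypothetical stand-in for wantConn, and its buffered `ready` channel plays the role of the ready operation.

```go
package main

import (
	"fmt"
	"sync"
)

type conn struct{ id int }

// waiter is a hypothetical stand-in for wantConn: a goroutine that could not
// get a connection parks one of these in the wait queue and blocks on ready.
type waiter struct{ ready chan *conn }

type pool struct {
	mu      sync.Mutex
	idle    []*conn   // idle connections, most recently used at the end
	waiters []*waiter // goroutines waiting for a connection, oldest first
}

// put hands c to the oldest waiter if there is one; otherwise it stores c in
// the idle list. It reports whether c was delivered to a waiter.
func (p *pool) put(c *conn) (delivered bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if len(p.waiters) > 0 {
		w := p.waiters[0]
		p.waiters = p.waiters[1:]
		w.ready <- c // ready has capacity 1, so this never blocks
		return true
	}
	p.idle = append(p.idle, c)
	return false
}

func main() {
	p := &pool{}
	w := &waiter{ready: make(chan *conn, 1)}
	p.waiters = append(p.waiters, w)

	fmt.Println(p.put(&conn{id: 1})) // delivered to the waiter
	fmt.Println((<-w.ready).id)      // the waiter wakes up with conn 1
	fmt.Println(p.put(&conn{id: 2})) // nobody waiting: goes to the idle list
	fmt.Println(len(p.idle))
}
```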
readloop and writeloop
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
go pconn.readLoop()
go pconn.writeLoop()
return pconn, nil
}
So every conn has its own readloop and writeloop, which means every connection costs at least two goroutines.
When a user goroutine sends a request with http.Client, the call goes all the way down to http.Transport.roundTrip -> http.persistConn.roundTrip:
pc.writech <- writeRequest{req, writeErrCh, continueCh}
resc := make(chan responseAndError)
pc.reqch <- requestAndChan{
req: req.Request,
ch: resc,
addedGzip: requestedGzip,
continueCh: continueCh,
callerGone: gone,
}
In this function, the request and the channel that will receive the response are sent to reqch, and the writeRequest is sent to writech.
- writeloop receives the write request from writech and writes its content to the conn, which sends the request to the server.
- readloop receives the requestAndChan. Since writeloop has in effect already sent the request data to the server, readloop can now read the response the server sends back on the conn. Its main job is therefore ReadResponse, after which it sends the response to requestAndChan.ch.
- The calling goroutine only has to listen on requestAndChan.ch to receive the response (using select to listen on the err, connection-closed, and other channels at the same time).
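The three roles above can be sketched with a toy line-based protocol. This is a heavily simplified illustration, not the standard library's code: the `persistConn` below is a hypothetical miniature, `upperServer` is an invented peer, and `net.Pipe` replaces a real socket.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
	"strings"
)

// response carries either a body or an error back to the caller.
type response struct {
	body string
	err  error
}

// persistConn is a hypothetical miniature of the type with the same name.
type persistConn struct {
	conn    net.Conn
	writech chan string        // roundTrip -> writeLoop
	reqch   chan chan response // roundTrip -> readLoop
}

func newPersistConn(c net.Conn) *persistConn {
	pc := &persistConn{conn: c, writech: make(chan string), reqch: make(chan chan response)}
	go pc.readLoop()
	go pc.writeLoop()
	return pc
}

// writeLoop writes each queued request to the connection.
func (pc *persistConn) writeLoop() {
	for req := range pc.writech {
		fmt.Fprintf(pc.conn, "%s\n", req)
	}
}

// readLoop waits until a request is pending, reads one response from the
// connection, and delivers it on that request's channel.
func (pc *persistConn) readLoop() {
	r := bufio.NewReader(pc.conn)
	for resc := range pc.reqch {
		line, err := r.ReadString('\n')
		resc <- response{body: strings.TrimSpace(line), err: err}
	}
}

// roundTrip runs on the caller's goroutine: queue the write, register a
// response channel with readLoop, then wait for the result.
func (pc *persistConn) roundTrip(req string) (string, error) {
	pc.writech <- req
	resc := make(chan response, 1)
	pc.reqch <- resc
	re := <-resc
	return re.body, re.err
}

// close stops both loops and closes the connection.
func (pc *persistConn) close() {
	close(pc.writech)
	close(pc.reqch)
	pc.conn.Close()
}

// upperServer is a toy peer that answers each line with its upper-case form.
func upperServer(c net.Conn) {
	defer c.Close()
	r := bufio.NewReader(c)
	for {
		line, err := r.ReadString('\n')
		if err != nil {
			return
		}
		fmt.Fprint(c, strings.ToUpper(line))
	}
}

// demo performs one round trip against the toy server.
func demo() (string, error) {
	client, server := net.Pipe()
	go upperServer(server)
	pc := newPersistConn(client)
	defer pc.close()
	return pc.roundTrip("hello")
}

func main() {
	fmt.Println(demo())
}
```

The real roundTrip waits with a select over several channels instead of a single receive. The sketch keeps only the happy path so the division of work between the two loops stays visible.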
The approach the http standard library takes here is worth borrowing: receiving the data and handling the related errors are all concentrated in one place:
for {
testHookWaitResLoop()
select {
case err := <-writeErrCh: // writing the request to the server failed (or finished)
if debugRoundTrip {
req.logf("writeErrCh resv: %T/%#v", err, err)
}
if err != nil {
pc.close(fmt.Errorf("write error: %v", err))
return nil, pc.mapRoundTripError(req, startBytesWritten, err)
}
if d := pc.t.ResponseHeaderTimeout; d > 0 {
if debugRoundTrip {
req.logf("starting timer for %v", d)
}
timer := time.NewTimer(d)
defer timer.Stop() // prevent leaks
respHeaderTimer = timer.C
}
case <-pc.closech: // the connection was closed unexpectedly
if debugRoundTrip {
req.logf("closech recv: %T %#v", pc.closed, pc.closed)
}
return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
case <-respHeaderTimer: // timed out waiting for the response headers
if debugRoundTrip {
req.logf("timeout waiting for response headers.")
}
pc.close(errTimeout)
return nil, errTimeout
case re := <-resc: // a response (or a read error) arrived on the response channel
if (re.res == nil) == (re.err == nil) {
panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
}
if debugRoundTrip {
req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
}
if re.err != nil {
return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
}
return re.res, nil
case <-cancelChan: // the caller cancelled through the Request.Cancel channel
pc.t.CancelRequest(req.Request)
cancelChan = nil
case <-ctxDoneChan: // the request's context was cancelled or timed out
pc.t.cancelRequest(req.Request, req.Context().Err())
cancelChan = nil
ctxDoneChan = nil
}
}
http2
https://tools.ietf.org/html/rfc7540 https://github.com/bagder/http2-explained

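The http2 protocol associates requests with responses through the stream id in each frame.
http2 does not have to wait for the previous response before sending the next request, so multiplexing is possible on a single connection. The standard library reuses the http1 connection pool logic for http2 connections; the only difference is that taking a connection from the pool does not really remove it from the pool. The connection that was handed out stays in the connpool.
Apart from that, the h2 connpool is no different from the h1 one.
When an idle connection is taken from the idleConn slice: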
func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
if delivered {
if pconn.alt != nil {
// HTTP/2: multiple clients can share pconn.
// Leave it in the list.
} else {
// HTTP/1: only one client can use pconn.
// Remove it from the list.
t.idleLRU.remove(pconn)
list = list[:len(list)-1]
}
}
When a used connection is put back into the connection pool:
// HTTP/2 (pconn.alt != nil) connections do not come out of the idle list,
// because multiple goroutines can use them simultaneously.
// If this is an HTTP/2 connection being “returned,” we're done.
if pconn.alt != nil && t.idleLRU.m[pconn] != nil {
return nil
}
if pconn.alt == nil {
// HTTP/1.
// Loop over the waiting list until we find a w that isn't done already, and hand it pconn.
for q.len() > 0 {
w := q.popFront()
if w.tryDeliver(pconn, nil) {
done = true
break
}
}
} else {
// HTTP/2.
// Can hand the same pconn to everyone in the waiting list,
// and we still won't be done: we want to put it in the idle
// list unconditionally, for any future clients too.
for q.len() > 0 {
w := q.popFront()
w.tryDeliver(pconn, nil)
}
}
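- If the connection is already in the idle LRU (t.idleLRU.m[pconn] != nil), it never left the pool, because getting an http2 connection does not really take it out of the pool. Returning directly is enough.
- If the connection is not in the LRU, it may be a newly created one. The wait queue has to be popped until it is empty, and the connection is then put into the pool.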
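The difference between the two kinds of connection can be sketched with a single flag. This is an illustration under assumptions: the `shared` field plays the role that `pconn.alt != nil` plays in the Transport, and `pool`, `get` and `put` are hypothetical names.

```go
package main

import "fmt"

// conn is a placeholder connection. shared marks one that can carry many
// concurrent requests, as an HTTP/2 connection can.
type conn struct {
	id     int
	shared bool
}

type pool struct{ idle []*conn }

// get returns the most recently used idle connection, or nil if there is
// none. Exclusive connections are removed from the list; shared ones stay
// so that other callers can get them too.
func (p *pool) get() *conn {
	if len(p.idle) == 0 {
		return nil
	}
	c := p.idle[len(p.idle)-1]
	if !c.shared {
		p.idle = p.idle[:len(p.idle)-1]
	}
	return c
}

// put returns a connection after use. A shared connection that is already
// in the list needs no work.
func (p *pool) put(c *conn) {
	if c.shared {
		for _, ic := range p.idle {
			if ic == c {
				return
			}
		}
	}
	p.idle = append(p.idle, c)
}

func main() {
	h1 := &pool{idle: []*conn{&conn{id: 1}}}
	h2 := &pool{idle: []*conn{&conn{id: 2, shared: true}}}

	a, b := h1.get(), h1.get()
	fmt.Println(a != nil, b == nil) // the exclusive conn can be taken once

	c, d := h2.get(), h2.get()
	fmt.Println(c == d, len(h2.idle)) // both callers share the same conn
	h2.put(c)
	fmt.Println(len(h2.idle)) // still a single entry
}
```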
fasthttp
Server side
Request handling
The fasthttp server uses a worker pool to reuse goroutines, so it does not keep creating new g's.
workerPool.workerFunc is the main loop of each worker:
func (wp *workerPool) workerFunc(ch *workerChan) {
var c net.Conn
for c = range ch.ch {
if c == nil {
break
}
wp.WorkerFunc(c)
}
wp.lock.Lock()
wp.workersCount--
wp.lock.Unlock()
}
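Each time a new conn is served:
- A worker is taken from the workerpool. If none is available, a new one is created and its workerFunc main loop is started, listening on the worker channel.
- The new connection being served is sent to the worker channel.
- workerFunc receives the new conn and starts the request handling flow by running fasthttp.Server.serveConn.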
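The steps above can be sketched as a small worker pool. This is a simplified illustration, not fasthttp's implementation: `job` replaces net.Conn, and the `stopped` flag, `stop` and `demo` are assumptions added so the example can shut down cleanly.

```go
package main

import (
	"fmt"
	"sync"
)

// job is a placeholder for net.Conn in this sketch.
type job int

type workerChan struct{ ch chan job }

type workerPool struct {
	mu      sync.Mutex
	ready   []*workerChan // idle workers, most recently used at the end
	started int           // number of worker goroutines ever started
	stopped bool
	handle  func(job)
	wg      sync.WaitGroup
}

// getCh returns an idle worker's channel, starting a new worker if none is idle.
func (wp *workerPool) getCh() *workerChan {
	wp.mu.Lock()
	defer wp.mu.Unlock()
	if n := len(wp.ready); n > 0 {
		ch := wp.ready[n-1]
		wp.ready = wp.ready[:n-1]
		return ch
	}
	ch := &workerChan{ch: make(chan job, 1)}
	wp.started++
	wp.wg.Add(1)
	go wp.workerFunc(ch)
	return ch
}

// workerFunc is the worker's main loop: handle a job, then become idle again.
func (wp *workerPool) workerFunc(ch *workerChan) {
	defer wp.wg.Done()
	for j := range ch.ch {
		wp.handle(j)
		wp.mu.Lock()
		if wp.stopped {
			wp.mu.Unlock()
			return
		}
		wp.ready = append(wp.ready, ch)
		wp.mu.Unlock()
	}
}

// serve hands one job to a worker.
func (wp *workerPool) serve(j job) { wp.getCh().ch <- j }

// stop wakes all idle workers so they exit, then waits for every worker.
func (wp *workerPool) stop() {
	wp.mu.Lock()
	wp.stopped = true
	for _, ch := range wp.ready {
		close(ch.ch)
	}
	wp.ready = nil
	wp.mu.Unlock()
	wp.wg.Wait()
}

// demo serves n jobs and reports how many were handled and how many worker
// goroutines were started to do it.
func demo(n int) (handled, started int) {
	var mu sync.Mutex
	var jobs sync.WaitGroup
	wp := &workerPool{}
	wp.handle = func(job) {
		mu.Lock()
		handled++
		mu.Unlock()
		jobs.Done()
	}
	for i := 0; i < n; i++ {
		jobs.Add(1)
		wp.serve(job(i))
	}
	jobs.Wait()
	wp.stop()
	return handled, wp.started
}

func main() {
	handled, started := demo(20)
	fmt.Println("handled:", handled, "workers started:", started)
}
```

How many workers get started depends on scheduling, but it never exceeds the number of jobs, and an idle worker is always preferred over a new goroutine.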
Client side
Connection pool
type HostClient struct {
// Maximum number of connections which may be established to all hosts
// listed in Addr.
//
// You can change this value while the HostClient is being used
// using HostClient.SetMaxConns(value)
//
// DefaultMaxConnsPerHost is used if not set.
MaxConns int
// Keep-alive connections are closed after this duration.
//
// By default connection duration is unlimited.
MaxConnDuration time.Duration
// Idle keep-alive connections are closed after this duration.
//
// By default idle connections are closed
// after DefaultMaxIdleConnDuration.
MaxIdleConnDuration time.Duration
// Maximum number of attempts for idempotent calls
//
// DefaultMaxIdemponentCallAttempts is used if not set.
MaxIdemponentCallAttempts int
// Maximum duration for waiting for a free connection.
//
// By default will not waiting, return ErrNoFreeConns immediately
MaxConnWaitTimeout time.Duration
clientName atomic.Value
lastUseTime uint32
connsLock sync.Mutex
connsCount int
conns []*clientConn
connsWait *wantConnQueue
}
acquireConn

The flow is fairly simple: if len(client.conns) > 0 there is an idle connection, so the last element is taken directly. That element is usually the connection that was put back most recently.
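Taking the last element makes the idle slice behave like a stack. The sketch below shows this under assumptions: `hostClient` is a hypothetical miniature, and the `maxConns` limit and the error value are simplifications added to make the example complete.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errNoFreeConns = errors.New("no free connections available to host")

type clientConn struct{ id int }

// hostClient is a hypothetical miniature of a per-host client.
type hostClient struct {
	mu         sync.Mutex
	conns      []*clientConn // idle connections, most recently released at the end
	connsCount int           // connections created so far
	maxConns   int
}

// acquireConn prefers the most recently released idle connection and only
// creates a new one while the limit has not been reached.
func (c *hostClient) acquireConn() (*clientConn, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if n := len(c.conns); n > 0 {
		cc := c.conns[n-1]
		c.conns[n-1] = nil // drop the reference held by the backing array
		c.conns = c.conns[:n-1]
		return cc, nil
	}
	if c.connsCount >= c.maxConns {
		return nil, errNoFreeConns
	}
	c.connsCount++
	return &clientConn{id: c.connsCount}, nil
}

// releaseConn puts the connection back at the end of the idle slice.
func (c *hostClient) releaseConn(cc *clientConn) {
	c.mu.Lock()
	c.conns = append(c.conns, cc)
	c.mu.Unlock()
}

func main() {
	c := &hostClient{maxConns: 2}
	a, _ := c.acquireConn()
	b, _ := c.acquireConn()
	_, err := c.acquireConn()
	fmt.Println(err) // limit reached, nothing idle

	c.releaseConn(a)
	c.releaseConn(b)
	got, _ := c.acquireConn()
	fmt.Println(got == b) // the most recently released connection comes back first
}
```

Reusing the most recent connection keeps a small set of connections warm and lets the older ones age out through the idle timeout.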
releaseConn
func (c *HostClient) releaseConn(cc *clientConn) {
cc.lastUseTime = time.Now()
if c.MaxConnWaitTimeout <= 0 {
c.connsLock.Lock()
c.conns = append(c.conns, cc)
c.connsLock.Unlock()
return
}
// try to deliver an idle connection to a *wantConn
c.connsLock.Lock()
defer c.connsLock.Unlock()
delivered := false
if q := c.connsWait; q != nil && q.len() > 0 {
for q.len() > 0 {
w := q.popFront()
if w.waiting() {
delivered = w.tryDeliver(cc, nil)
break
}
}
}
if !delivered {
c.conns = append(c.conns, cc)
}
}
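releaseConn first tries to hand the current connection to a request that is waiting for one (wantConn): it pops the first element of the wait queue (connsWait) and transfers the connection to that request. If that request is no longer in the waiting state, it keeps popping until it finds a suitable taker or the wait queue is empty.
If the connection could not be handed over, it is appended to the idle connection slice (c.conns).
Note that in fasthttp conns is the connection pool, while clientConnPool is an object pool for clientConn objects.
Unlike the client in the standard library, fasthttp has no read/write loops, so each request is completed on the calling goroutine:
- Write the request header and body to the conn.
- Read the response from the conn.
- Release the connection and cache the struct objects created along the way.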
gRPC
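Server side
gRPC is built on http2, so the interaction is based on http2 streams, and the overall server-side flow is no different from http2.
Client side
In gRPC the client does not use a connection pool; it uses the http2 connection directly: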
Invoke
-> invoke
-> newClientStream
-> newAttemptLocked
-> getTransport
-> blockingpicker.pick
-> getReadyTransport
-> addrConn.connect
-> go ac.resetTransport()
From there it goes all the way to creating the http2Client.
(dlv) bt
0 0x00000000013e2539 in google.golang.org/grpc/internal/transport.newHTTP2Client
at /Users/xargin/go/src/google.golang.org/grpc/internal/transport/http2_client.go:167
1 0x000000000145a5ca in google.golang.org/grpc/internal/transport.NewClientTransport
at /Users/xargin/go/src/google.golang.org/grpc/internal/transport/transport.go:575
2 0x000000000145a5ca in google.golang.org/grpc.(*addrConn).createTransport
at /Users/xargin/go/src/google.golang.org/grpc/clientconn.go:1275
3 0x0000000001459e25 in google.golang.org/grpc.(*addrConn).tryAllAddrs
at /Users/xargin/go/src/google.golang.org/grpc/clientconn.go:1205
4 0x00000000014593b7 in google.golang.org/grpc.(*addrConn).resetTransport
at /Users/xargin/go/src/google.golang.org/grpc/clientconn.go:1120
5 0x000000000105b811 in runtime.goexit
at /usr/local/go/src/runtime/asm_amd64.s:1357
thrift
The official thrift library has no connection pool. The seqid generated in the client is only used to match the rseqid returned by the server.
func (p *TStandardClient) Recv(iprot TProtocol, seqId int32, method string, result TStruct) error {
rMethod, rTypeId, rSeqId, err := iprot.ReadMessageBegin()
if err != nil {
return err
}
if method != rMethod {
return NewTApplicationException(WRONG_METHOD_NAME, fmt.Sprintf("%s: wrong method name", method))
} else if seqId != rSeqId {
return NewTApplicationException(BAD_SEQUENCE_ID, fmt.Sprintf("%s: out of order sequence response", method))
} else if rTypeId == EXCEPTION {
var exception tApplicationException
if err := exception.Read(iprot); err != nil {
return err
}
if err := iprot.ReadMessageEnd(); err != nil {
return err
}
return &exception
} else if rTypeId != REPLY {
return NewTApplicationException(INVALID_MESSAGE_TYPE_EXCEPTION, fmt.Sprintf("%s: invalid message type", method))
}
if err := result.Read(iprot); err != nil {
return err
}
return iprot.ReadMessageEnd()
}
Each thrift client object wraps a transport:
...
useTransport, err := transportFactory.GetTransport(transport)
client := NewEchoClientFactory(useTransport, protocolFactory)
if err := transport.Open(); err != nil {
fmt.Fprintln(os.Stderr, "Error opening socket to 127.0.0.1:9898", " ", err)
os.Exit(1)
}
defer transport.Close()
req := &EchoReq{Msg: "You are welcome."}
res, err := client.Echo(context.TODO(), req)
...
type EchoClient struct {
c thrift.TClient
}
func NewEchoClientFactory(t thrift.TTransport, f thrift.TProtocolFactory) *EchoClient {
return &EchoClient{
c: thrift.NewTStandardClient(f.GetProtocol(t), f.GetProtocol(t)),
}
}
The wrapped transport is a single tcp connection; there is no connection pool.
redigo
redigo is a client library and has no server side:
type Pool struct {
// Dial is an application supplied function for creating and configuring a
// connection.
//
// The connection returned from Dial must not be in a special state
// (subscribed to pubsub channel, transaction started, ...).
Dial func() (Conn, error)
// DialContext is an application supplied function for creating and configuring a
// connection with the given context.
//
// The connection returned from Dial must not be in a special state
// (subscribed to pubsub channel, transaction started, ...).
DialContext func(ctx context.Context) (Conn, error)
// TestOnBorrow is an optional application supplied function for checking
// the health of an idle connection before the connection is used again by
// the application. Argument t is the time that the connection was returned
// to the pool. If the function returns an error, then the connection is
// closed.
TestOnBorrow func(c Conn, t time.Time) error
// Maximum number of idle connections in the pool.
MaxIdle int
// Maximum number of connections allocated by the pool at a given time.
// When zero, there is no limit on the number of connections in the pool.
MaxActive int
// Close connections after remaining idle for this duration. If the value
// is zero, then idle connections are not closed. Applications should set
// the timeout to a value less than the server's timeout.
IdleTimeout time.Duration
// If Wait is true and the pool is at the MaxActive limit, then Get() waits
// for a connection to be returned to the pool before returning.
Wait bool
// Close connections older than this duration. If the value is zero, then
// the pool does not close connections based on age.
MaxConnLifetime time.Duration
chInitialized uint32 // set to 1 when field ch is initialized
mu sync.Mutex // mu protects the following fields
closed bool // set to true when the pool is closed.
active int // the number of open connections in the pool
ch chan struct{} // limits open connections when p.Wait is true
idle idleList // idle connections
waitCount int64 // total number of connections waited for.
waitDuration time.Duration // total time waited for new connections.
}
Client side:
A redigo client has to declare and initialize its internal pool explicitly:
func newPool(addr string) *redis.Pool {
return &redis.Pool{
MaxIdle: 3,
IdleTimeout: 240 * time.Second,
// Dial or DialContext must be set. When both are set, DialContext takes precedence over Dial.
Dial: func () (redis.Conn, error) { return redis.Dial("tcp", addr) },
}
}
The behavior of TestOnBorrow can be supplied at initialization:
pool := &redis.Pool{
// Other pool configuration not shown in this example.
TestOnBorrow: func(c redis.Conn, t time.Time) error {
if time.Since(t) < time.Minute {
return nil
}
_, err := c.Do("PING")
return err
},
}
When using it, the user also has to defer Close explicitly:
func serveHome(w http.ResponseWriter, r *http.Request) {
conn := pool.Get()
defer conn.Close()
...
}
pool.Get

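The user sets pool.Wait to choose whether to wait. If Wait is true, the call blocks and waits when no connection is available. If Wait is false and the number of connections has reached the pool.MaxActive threshold, the error ErrPoolExhausted is returned directly.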
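The two behaviors can be sketched with a buffered channel used as a counting semaphore, in the spirit of the pool's ch field. This is a simplified illustration, not redigo's code: `pool`, `newPool`, `get`, `put` and `demo` are hypothetical, and the sketch assumes maxActive is greater than zero.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errPoolExhausted = errors.New("connection pool exhausted")

// pool limits the number of connections in use. slots holds one token per
// connection that may still be handed out.
type pool struct {
	wait  bool
	slots chan struct{}
}

// newPool creates a pool that allows maxActive connections (assumed > 0).
func newPool(maxActive int, wait bool) *pool {
	p := &pool{wait: wait, slots: make(chan struct{}, maxActive)}
	for i := 0; i < maxActive; i++ {
		p.slots <- struct{}{}
	}
	return p
}

// get takes a slot. With wait set it blocks until a slot is returned or ctx
// is done; without it, an exhausted pool fails immediately.
func (p *pool) get(ctx context.Context) error {
	if p.wait {
		select {
		case <-p.slots:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	select {
	case <-p.slots:
		return nil
	default:
		return errPoolExhausted
	}
}

// put returns a slot to the pool.
func (p *pool) put() { p.slots <- struct{}{} }

// demo exhausts a non-waiting and a waiting pool and returns what the second
// get on each of them yields.
func demo() (noWait, withWait error) {
	ctx := context.Background()

	p := newPool(1, false)
	_ = p.get(ctx)      // takes the only slot
	noWait = p.get(ctx) // fails immediately

	q := newPool(1, true)
	_ = q.get(ctx)
	go func() {
		time.Sleep(10 * time.Millisecond)
		q.put() // another user returns its connection
	}()
	withWait = q.get(ctx) // blocks until put runs
	return noWait, withWait
}

func main() {
	noWait, withWait := demo()
	fmt.Println(noWait)
	fmt.Println(withWait)
}
```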
activeConn.Close
func (ac *activeConn) Close() error {
pc := ac.pc
if pc == nil {
return nil
}
ac.pc = nil
if ac.state&connectionMultiState != 0 {
pc.c.Send("DISCARD")
ac.state &^= (connectionMultiState | connectionWatchState)
} else if ac.state&connectionWatchState != 0 {
pc.c.Send("UNWATCH")
ac.state &^= connectionWatchState
}
if ac.state&connectionSubscribeState != 0 {
pc.c.Send("UNSUBSCRIBE")
pc.c.Send("PUNSUBSCRIBE")
// To detect the end of the message stream, ask the server to echo
// a sentinel value and read until we see that value.
sentinelOnce.Do(initSentinel)
pc.c.Send("ECHO", sentinel)
pc.c.Flush()
for {
p, err := pc.c.Receive()
if err != nil {
break
}
if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) {
ac.state &^= connectionSubscribeState
break
}
}
}
pc.c.Do("")
ac.p.put(pc, ac.state != 0 || pc.c.Err() != nil)
return nil
}
On Close, the connection held by this activeConn is put back into the connection pool.
go-redis/redis
https://github.com/go-redis/redis
This redis library hides the connection pool logic, so users mostly do not need to care about connections. The pool-related configuration is passed in at initialization:
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379", // use default Addr
Password: "", // no password set
DB: 0, // use default DB
})
func NewClient(opt *Options) *Client {
opt.init()
c := Client{
baseClient: newBaseClient(opt, newConnPool(opt)),
ctx: context.Background(),
}
c.cmdable = c.Process
return &c
}
func newConnPool(opt *Options) *pool.ConnPool {
return pool.NewConnPool(&pool.Options{
Dialer: func(ctx context.Context) (net.Conn, error) {
return opt.Dialer(ctx, opt.Network, opt.Addr)
},
PoolSize: opt.PoolSize,
MinIdleConns: opt.MinIdleConns,
MaxConnAge: opt.MaxConnAge,
PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: opt.IdleCheckFrequency,
})
}
func (c *baseClient) _process(ctx context.Context, cmd Cmder) error {
var lastErr error
for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
if attempt > 0 {
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
return err
}
}
retryTimeout := true
lastErr = c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
return writeCmd(wr, cmd)
})
if err != nil {
return err
}
err = cn.WithReader(ctx, c.cmdTimeout(cmd), cmd.readReply)
if err != nil {
retryTimeout = cmd.readTimeout() == nil
return err
}
return nil
})
if lastErr == nil || !isRetryableError(lastErr, retryTimeout) {
return lastErr
}
}
return lastErr
}
func (c *baseClient) withConn(
ctx context.Context, fn func(context.Context, *pool.Conn) error,
) error {
cn, err := c.getConn(ctx)
if err != nil {
return err
}
defer func() {
c.releaseConn(cn, err)
}()
err = fn(ctx, cn)
return err
}
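The pool maintenance logic is much the same as in other libraries. What differs is that this library keeps the number of idle conns at or above the configured MinIdleConns and tops it up in the background when it falls short: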
func (p *ConnPool) checkMinIdleConns() {
if p.opt.MinIdleConns == 0 {
return
}
for p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns {
p.poolSize++
p.idleConnsLen++
go func() {
err := p.addIdleConn()
if err != nil {
p.connsMu.Lock()
p.poolSize--
p.idleConnsLen--
p.connsMu.Unlock()
}
}()
}
}
database/sql
The connection pool here differs slightly from the RPC ones: it takes the first element of freeConn, and there is a copy step that may be relatively inefficient:
// Prefer a free connection, if possible.
numFree := len(db.freeConn)
if strategy == cachedOrNewConn && numFree > 0 {
conn := db.freeConn[0]
copy(db.freeConn, db.freeConn[1:])
db.freeConn = db.freeConn[:numFree-1]
conn.inUse = true
db.mu.Unlock()
if conn.expired(lifetime) {
conn.Close()
return nil, driver.ErrBadConn
}
// Lock around reading lastErr to ensure the session resetter finished.
conn.Lock()
err := conn.lastErr
conn.Unlock()
if err == driver.ErrBadConn {
conn.Close()
return nil, driver.ErrBadConn
}
return conn, nil
There is nothing else special about it.
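The cost difference between taking the first and the last element of a slice can be made concrete with a small sketch. The helper names `takeFirst` and `takeLast` are hypothetical, and plain ints stand in for connections.

```go
package main

import "fmt"

// takeFirst removes and returns the oldest element. Every remaining element
// is shifted down by one, so the cost grows with the length of the slice.
func takeFirst(free []int) (int, []int) {
	c := free[0]
	copy(free, free[1:])
	return c, free[:len(free)-1]
}

// takeLast removes and returns the newest element without moving anything.
func takeLast(free []int) (int, []int) {
	n := len(free)
	return free[n-1], free[:n-1]
}

func main() {
	first, rest := takeFirst([]int{1, 2, 3})
	fmt.Println(first, rest) // 1 [2 3]

	last, rest := takeLast([]int{1, 2, 3})
	fmt.Println(last, rest) // 3 [1 2]
}
```

Taking the first element gives FIFO reuse, which spreads use evenly across connections at the price of the copy. Taking the last gives LIFO reuse with no copy.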