RoundTrip是发送和接收一次http请求的底层实现逻辑
我们可以从中学习到
1、如何去实现一个长链接
2、如果去提升性能
3、如何考虑完备各种边界情况
4、http协议
请求和连接的关系一个请求肯定会使用一个或者多个连接,通常是一个,但如果异常情况,如刚好连接在使用过程中失效了,则不排除使用多个连接
一个连接可以在一次请求后就丢弃(关闭),但是对于长链接的情况,连接可以被放回到闲置连接池中,这样后续的请求到来时,就可以不必再次创建新的连接,而是复用之前的连接。要是到tcp的三次握手和四次分手还是挺消耗时间的
虽然长链接带来了性能的提升,但无疑增加了代码开发的复杂度。比如你需要考虑一个连接闲置时间过长时,可能server端已经关闭的情况。另外一种极端情况就是,当我们使用闲置连接处理请求时,在发送请求那一刻,可能server端正好主动关闭该连接。这无疑加大了代码开发的难度。
代码分析以下代码仅保留核心部分
核心类type Transport struct { idleMu sync.Mutex closeIdle bool // 用户请求关闭所有的闲置连接 idleConn map[connectMethodKey][]*persistConn // 每个host对应的闲置连接列表 idleConnWait map[connectMethodKey]wantConnQueue // 每个host对应的等待闲置连接列表,在其它request将连接放回连接池前先看一下这个队列是否为空,不为空则直接将连接交由其中一个等待对象 idleLRU connLRU // 用来清理过期的连接 reqMu sync.Mutex reqCanceler map[*Request]func(error) connsPerHostMu sync.Mutex connsPerHost map[connectMethodKey]int // 每个host对应的等待连接个数 connsPerHostWait map[connectMethodKey]wantConnQueue // 每个host对应的等待连接列表 // 用于指定创建未加密的TCP连接的dial功能,如果该函数为空,则使用net包下的dial函数 DialContext func(ctx context.Context, network, addr string) (net.Conn, error) Dial func(network, addr string) (net.Conn, error) // 以下两个函数处理https的请求 DialTLSContext func(ctx context.Context, network, addr string) (net.Conn, error) DialTLS func(network, addr string) (net.Conn, error) // 是否复用连接 DisableKeepAlives bool // 是否压缩 DisableCompression bool // 总的最大闲置连接的个数 MaxIdleConns int // 每个host最大闲置连接的个数 MaxIdleConnsPerHost int // 每个host的最大连接个数,如果已经达到该数字,dial连接会被block住 MaxConnsPerHost int // 闲置连接的最大等待时间,一旦超过该时间,连接会被关闭 IdleConnTimeout time.Duration // 读超时,从写完请求到接受到返回头的总时间 ResponseHeaderTimeout time.Duration // Expect:100-continue两个请求间的超时时间 ExpectContinueTimeout time.Duration // 返回中header的限制 MaxResponseHeaderBytes int64 // write buffer的使用量 WriteBufferSize int // read buffer的使用量 ReadBufferSize int }一次请求和接收的主函数roundTrip
// roundTrip implements a RoundTripper over HTTP. func (t *Transport) roundTrip(req *Request) (*Response, error) { ctx := req.Context() // 通过context控制请求的生命周期 trace := httptrace.ContextClientTrace(ctx) // 钩子函数,在请求的各个阶段可以指定回调函数 if req.URL == nil { // 异常情况 req.closeBody() return nil, errors.New("http: nil Request.URL") } if req.Header == nil { // 异常情况 req.closeBody() return nil, errors.New("http: nil Request.Header") } scheme := req.URL.Scheme isHTTP := scheme == "http" || scheme == "https" if isHTTP { for k, vv := range req.Header { if !httpguts.ValidHeaderFieldName(k) { // header key校验 req.closeBody() return nil, fmt.Errorf("net/http: invalid header field name %q", k) } for _, v := range vv { if !httpguts.ValidHeaderFieldValue(v) { // header values校验 req.closeBody() return nil, fmt.Errorf("net/http: invalid header field value %q for key %v", v, k) } } } } if altRT := t.alternateRoundTripper(req); altRT != nil { // 自定义的请求执行函数 if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol { return resp, err } } if !isHTTP { // 异常情况 req.closeBody() return nil, &badStringError{"unsupported protocol scheme", scheme} } if req.Method != "" && !validMethod(req.Method) { // 异常情况 req.closeBody() return nil, fmt.Errorf("net/http: invalid method %q", req.Method) } if req.URL.Host == "" { // 异常情况 req.closeBody() return nil, errors.New("http: no Host in request URL") } for { // 这一层循环主要是支持请求重试 select { case <-ctx.Done(): // 上层取消这次请求,或超时 req.closeBody() return nil, ctx.Err() default: } // treq gets modified by roundTrip, so we need to recreate for each retry. treq := &transportRequest{Request: req, trace: trace} cm, err := t.connectMethodForRequest(treq) // 给请求打标签,其值是shema + uri + httpversion if err != nil { req.closeBody() return nil, err } // 核心函数,获取一个请求链接,这个链接可能是缓存池中的,也可能是新建立的 // 请求的发送与接受也是在这个函数内部实现的 pconn, err := t.getConn(treq, cm) if err != nil { t.setReqCanceler(req, nil) req.closeBody() return nil, err } var resp *Response if pconn.alt != nil { // 自定义的情况 // HTTP/2 path. t.setReqCanceler(req, nil) // not cancelable with CancelRequest resp, err = pconn.alt.RoundTrip(req) } else { resp, err = pconn.roundTrip(treq) // 这是另一个核心函数,用来控制请求的发送和接受 } if err == nil { return resp, nil } if !pconn.shouldRetryRequest(req, err) { // 判断是否重新连接,通常对于新建的连接不支持重发。一种情况是server已经关闭该连接,但同时client使用老的连接发送了一个请求,这时就应该允许重发请求 // Issue 16465: return underlying net.Conn.Read error from peek, // as we've historically done. if e, ok := err.(transportReadFromServerError); ok { err = e.err } return nil, err } } }
这个函数里面最主要的逻辑是获取一个有效连接,由t.getConn函数实现;另一个是对于本次请求发送与接收的控制逻辑,由pconn.roundTrip。下面分别介绍这两个函数的具体实现。
persistConn类该类封装了底层连接,作为长链接的形式供请求使用
// persistConn wraps a connection, usually a persistent one // (but may be used for non-keep-alive requests as well) type persistConn struct { t *Transport cacheKey connectMethodKey // schema + host + uri conn net.Conn // 底层连接 br *bufio.Reader // 连接的读buffer bw *bufio.Writer // 连接的写buffer nwrite int64 reqch chan requestAndChan // 作为persistConn.roundTrip和readLoop之间的同步,由roundTrip写,readLoop读 writech chan writeRequest // 作为persistConn.roundTrip和writeLoop之间的同步,由roundTrip写,writeLoop读 closech chan struct{} // 连接关闭的channel sawEOF bool // 是否读完整请求内容,由readLoop负责 readLimit int64 // 读数据的最大值,由readLoop负责 writeErrCh chan error // writeLoop和readLoop之间的同步,用以判断该连接是否可以复用 writeLoopDone chan struct{} // writeLoop函数退出时关闭 // Both guarded by Transport.idleMu: idleAt time.Time // 最后一次闲置的时间 idleTimer *time.Timer // 计数器,用来到期清理本连接 mu sync.Mutex // guards following fields numExpectedResponses int // 连接的请求次数,大于1表示该连接被复用 closed error // 设置连接错误的原因 canceledErr error // 设置连接被取消的原因 broken bool // 在使用过程中被损坏 reused bool // 连接是否被复用 }persistConn roundTrip
// 一次请求的发送与接受的实现 func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) { testHookEnterRoundTrip() if !pc.t.replaceReqCanceler(req.Request, pc.cancelRequest) { pc.t.putOrCloseIdleConn(pc) return nil, errRequestCanceled } pc.mu.Lock() pc.numExpectedResponses++ // 当前连接的请求次数加1 headerFn := pc.mutateHeaderFunc pc.mu.Unlock() if headerFn != nil { headerFn(req.extraHeaders()) } var continueCh chan struct{} // 后面有详细分析 if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() { continueCh = make(chan struct{}, 1) // capacity一定要为1,否则如果readLoop因为什么原因先关闭的话,会照成goroutine泄漏 } if pc.t.DisableKeepAlives && !req.wantsClose() { req.extraHeaders().Set("Connection", "close") // 通知server本次连接为短连接 } gone := make(chan struct{}) // 本次roundTrip关闭的channel defer close(gone) defer func() { if err != nil { pc.t.setReqCanceler(req.Request, nil) } }() const debugRoundTrip = false startBytesWritten := pc.nwrite writeErrCh := make(chan error, 1) pc.writech <- writeRequest{req, writeErrCh, continueCh} // 通知writeLoop启动一次写操作 resc := make(chan responseAndError) pc.reqch <- requestAndChan{ // 通知readLoop启动一次读操作 req: req.Request, ch: resc, addedGzip: requestedGzip, continueCh: continueCh, callerGone: gone, } var respHeaderTimer <-chan time.Time cancelChan := req.Request.Cancel ctxDoneChan := req.Context().Done() for { testHookWaitResLoop() select { case err := <-writeErrCh: // 完成写操作,和writeLoop同步 if debugRoundTrip { req.logf("writeErrCh resv: %T/%#v", err, err) } if err != nil { pc.close(fmt.Errorf("write error: %v", err)) return nil, pc.mapRoundTripError(req, startBytesWritten, err) } if d := pc.t.ResponseHeaderTimeout; d > 0 { // 给读response header设定时 if debugRoundTrip { req.logf("starting timer for %v", d) } timer := time.NewTimer(d) defer timer.Stop() // prevent leaks respHeaderTimer = timer.C } case <-pc.closech: // 连接被关闭 if debugRoundTrip { req.logf("closech recv: %T %#v", pc.closed, pc.closed) } return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed) case <-respHeaderTimer: // 读请求结果超时 if debugRoundTrip { req.logf("timeout waiting for response headers.") } pc.close(errTimeout) return nil, errTimeout case re := <-resc: // readLoop返回的结果 if (re.res == nil) == (re.err == nil) { panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil)) } if debugRoundTrip { req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err) } if re.err != nil { // 有错误的情况 return nil, pc.mapRoundTripError(req, startBytesWritten, re.err) } return re.res, nil // 正常情况 case <-cancelChan: // 请求被取消 pc.t.CancelRequest(req.Request) cancelChan = nil case <-ctxDoneChan: // 请求超时 pc.t.cancelRequest(req.Request, req.Context().Err()) cancelChan = nil ctxDoneChan = nil } } }getConn函数
这个函数的主要作用是获取一个有效连接
// 获取一个有效的连接 // 获取连接的方式有三种 // 1是从连接池中获取一个闲置连接 // 2是新创建一个连接 // 3是在等待新创建连接好期间,有别的请求释放了一个连接,则直接使用该连接 func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) { req := treq.Request trace := treq.trace ctx := req.Context() if trace != nil && trace.GetConn != nil { trace.GetConn(cm.addr()) } w := &wantConn{ // 底层连接可以复用,但wantConn一定是每个request对应一个,需要使用它进行同步 cm: cm, key: cm.key(), ctx: ctx, ready: make(chan struct{}, 1), beforeDial: testHookPrePendingDial, afterDial: testHookPostPendingDial, } defer func() { if err != nil { w.cancel(t, err) } }() // 核心函数,从缓存池中获取到闲置连接,如果获取到则直接使用;否则在等待连接队列中进行注册 if delivered := t.queueForIdleConn(w); delivered { pc := w.pc // Trace only for HTTP/1. // HTTP/2 calls trace.GotConn itself. if pc.alt == nil && trace != nil && trace.GotConn != nil { trace.GotConn(pc.gotIdleConnTrace(pc.idleAt)) } // set request canceler to some non-nil function so we // can detect whether it was cleared between now and when // we enter roundTrip t.setReqCanceler(req, func(error) {}) return pc, nil } cancelc := make(chan error, 1) t.setReqCanceler(req, func(err error) { cancelc <- err }) // 新创建一个连接,与此同时,它仍可能在等待其它正在使用的连接 t.queueForDial(w) // Wait for completion or cancellation. select { case <-w.ready: // 连接已经创建好了,有以上三种情况 // Trace success but only for HTTP/1. // HTTP/2 calls trace.GotConn itself. if w.pc != nil && w.pc.alt == nil && trace != nil && trace.GotConn != nil { trace.GotConn(httptrace.GotConnInfo{Conn: w.pc.conn, Reused: w.pc.isReused()}) } if w.err != nil { // 有错误发生 // If the request has been cancelled, that's probably // what caused w.err; if so, prefer to return the // cancellation error (see golang.org/issue/16049). select { case <-req.Cancel: return nil, errRequestCanceledConn case <-req.Context().Done(): return nil, req.Context().Err() case err := <-cancelc: if err == errRequestCanceled { err = errRequestCanceledConn } return nil, err default: // return below } } return w.pc, w.err // 返回连接,此时连接是connect状态 case <-req.Cancel: return nil, errRequestCanceledConn case <-req.Context().Done(): return nil, req.Context().Err() case err := <-cancelc: if err == errRequestCanceled { err = errRequestCanceledConn } return nil, err } }
要么从闲置连接池中获取,要么新创建一个连接。下面分别介绍其对应的实现
queueForIdleConn函数该函数的主要作用是从闲置连接池获取空闲连接。
如果有空闲连接,delivered返回true;否则在等待连接队列里进行注册,这样如果之后有别的请求释放了一个连接,可以直接拿过来用
// 该函数的主要作用是从连接池中获取一个闲置连接 func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) { t.idleMu.Lock() defer t.idleMu.Unlock() // 设置闲置期最长的时间点 var oldTime time.Time if t.IdleConnTimeout > 0 { oldTime = time.Now().Add(-t.IdleConnTimeout) } // 从最新的连接开始遍历 if list, ok := t.idleConn[w.key]; ok { stop := false delivered := false for len(list) > 0 && !stop { pconn := list[len(list)-1] // 获取连接池中最新的连接 // 下面这个函数设计得很巧妙,要知道我们在把一个连接放入连接池中时设置过过期timer,它会主动清理连接 // 但在程序实际运行的过程中,很可能timer到时时,该协程并没有被cpu给调度上来,如果还继续使用的话,一旦清理协程被调度上来,会照成逻辑错误 tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime) if tooOld { go pconn.closeConnIfStillIdle() // 异步关闭连接 } if pconn.isBroken() || tooOld { // 如果连接损坏,则使用下一个连接 list = list[:len(list)-1] continue } delivered = w.tryDeliver(pconn, nil) if delivered { // 该连接被使用,如果delivered为false,表示已经从别的渠道接受到连接 if pconn.alt != nil { // HTTP/2: multiple clients can share pconn. // Leave it in the list. } else { // HTTP/1: only one client can use pconn. // Remove it from the list. t.idleLRU.remove(pconn) list = list[:len(list)-1] } } stop = true } if len(list) > 0 { t.idleConn[w.key] = list } else { delete(t.idleConn, w.key) } if stop { // 1从连接池中获取到连接,2是在运行的过程中,创建新的连接已完成,这是也不需要在等待连接了 return delivered } } if t.idleConnWait == nil { t.idleConnWait = make(map[connectMethodKey]wantConnQueue) } q := t.idleConnWait[w.key] q.cleanFront() // 清理过期连接 q.pushBack(w) // 放入等待连接队列中 t.idleConnWait[w.key] = q return false }queueForDial函数
这个函数的主要功能是创建一个新的连接
// 创建一个新的连接 func (t *Transport) queueForDial(w *wantConn) { w.beforeDial() if t.MaxConnsPerHost <= 0 { // 对每个host的连接数不做限制,直接创建 go t.dialConnFor(w) // 注意这里启动了一个协程,它的好处是如果在新创建连接的过程中,有一个连接被释放,可以直接使用被释放的连接,而不用一直等待 return } t.connsPerHostMu.Lock() defer t.connsPerHostMu.Unlock() if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost { // 不到最大连接数,直接创建新的连接 if t.connsPerHost == nil { t.connsPerHost = make(map[connectMethodKey]int) } t.connsPerHost[w.key] = n + 1 go t.dialConnFor(w) return } // 此时连接已经达到设置的最大值,放入等待队列中 if t.connsPerHostWait == nil { t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue) } q := t.connsPerHostWait[w.key] q.cleanFront() q.pushBack(w) // 放入等待列表中 t.connsPerHostWait[w.key] = q }dialConnFor函数
// dialConnFor dials on behalf of w and delivers the result to w. // dialConnFor has received permission to dial w.cm and is counted in t.connCount[w.cm.key()]. // If the dial is cancelled or unsuccessful, dialConnFor decrements t.connCount[w.cm.key()]. func (t *Transport) dialConnFor(w *wantConn) { defer w.afterDial() pc, err := t.dialConn(w.ctx, w.cm) // 核心函数,创建连接 delivered := w.tryDeliver(pc, err) // 将新创建的连接赋值给w,如果返回false,表示已经从别的渠道获取到连接 if err == nil && (!delivered || pc.alt != nil) { // 赋值不成功,说明w已经在这期间从等待列表中获取到有效的连接 // pconn was not passed to w, // or it is HTTP/2 and can be shared. // Add to the idle connection pool. t.putOrCloseIdleConn(pc) // 将新创建的连接放入闲置连接池中,或者关闭掉 } if err != nil { t.decConnsPerHost(w.key) // 创建失败,更新统计信息 } }
dialConn函数 这个函数主要有三个作用,1是创建底层的socket,2是将socket与read buffer和write buffer关联起来,3是启动readLoop和writeLoop,需要注意的是一旦socket创建成功,readLoop和writeLoop会循环运行,服务不同的请求,直到退出
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) { pconn = &persistConn{ t: t, cacheKey: cm.key(), reqch: make(chan requestAndChan, 1), writech: make(chan writeRequest, 1), closech: make(chan struct{}), writeErrCh: make(chan error, 1), writeLoopDone: make(chan struct{}), } trace := httptrace.ContextClientTrace(ctx) wrapErr := func(err error) error { if cm.proxyURL != nil { // Return a typed error, per Issue 16997 return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err} } return err } if cm.scheme() == "https" && t.hasCustomTLSDialer() { ... } else { conn, err := t.dial(ctx, "tcp", cm.addr()) // 创建socket if err != nil { return nil, wrapErr(err) } pconn.conn = conn } pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize()) pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize()) go pconn.readLoop() go pconn.writeLoop() return pconn, nil }writeLoop函数
实际的写循环
func (pc *persistConn) writeLoop() { defer close(pc.writeLoopDone) for { select { case wr := <-pc.writech: // 收到roundTrip的写命令 startBytesWritten := pc.nwrite err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh)) // 主函数,实际写http数据 if bre, ok := err.(requestBodyReadError); ok { err = bre.error wr.req.setError(err) } if err == nil { err = pc.bw.Flush() } if err != nil { wr.req.Request.closeBody() if pc.nwrite == startBytesWritten { err = nothingWrittenError{err} } } pc.writeErrCh <- err // 通知readLoop,用于判断连接是否可以回收 wr.ch <- err // 通知persistConn.roundTrip,设定读response的timeout if err != nil { // 有错误发生时不再复用连接 pc.close(err) return } // 处理下一个请求 case <-pc.closech: // 连接被关闭 return } } }
waitForContinue函数 该函数用来支持Expect:100-continue功能,它返回一个闭包函数,它的作用是阻塞继续执行,只到以下三种情况发生。1是在readLoop接受到100-continue的返回结果,会向continueCh发送一个数据,2是在100-continue等待server返回结果超时的情况,这时会继续将body发送出去,3是连接关闭的情况
func (pc *persistConn) waitForContinue(continueCh <-chan struct{}) func() bool { if continueCh == nil { return nil } return func() bool { timer := time.NewTimer(pc.t.ExpectContinueTimeout) // 设定超时时间 defer timer.Stop() select { case _, ok := <-continueCh: // 如果readLoop接受到的消息表示不支持Expect:100-continue类型的请求,则会关闭channel return ok case <-timer.C: // 超时 return true case <-pc.closech: // 连接被关闭 return false } } }write函数
这个函数实现了发送一次请求的实际 *** 作
主要的看点是Expect:100-continue的处理方式
func (r *Request) write(w io.Writer, usingProxy bool, extraHeaders Header, waitForContinue func() bool) (err error) { trace := httptrace.ContextClientTrace(r.Context()) if trace != nil && trace.WroteRequest != nil { defer func() { trace.WroteRequest(httptrace.WroteRequestInfo{ // 完成写请求的回调函数 Err: err, }) }() } // 确定host host := cleanHost(r.Host) if host == "" { if r.URL == nil { return errMissingHost } host = cleanHost(r.URL.Host) } host = removeZone(host) ruri := r.URL.RequestURI() if usingProxy && r.URL.Scheme != "" && r.URL.Opaque == "" { ruri = r.URL.Scheme + "://" + host + ruri } else if r.Method == "CONNECT" && r.URL.Path == "" { // ConNECT requests normally give just the host and port, not a full URL. ruri = host if r.URL.Opaque != "" { ruri = r.URL.Opaque } } if stringContainsCTLByte(ruri) { return errors.New("net/http: can't write control character in Request.URL") } var bw *bufio.Writer if _, ok := w.(io.ByteWriter); !ok { bw = bufio.NewWriter(w) w = bw } // 开始写数据 _, err = fmt.Fprintf(w, "%s %s HTTP/1.1rn", valueOrDefault(r.Method, "GET"), ruri) if err != nil { return err } // Header lines _, err = fmt.Fprintf(w, "Host: %srn", host) if err != nil { return err } if trace != nil && trace.WroteHeaderField != nil { trace.WroteHeaderField("Host", []string{host}) } userAgent := defaultUserAgent if r.Header.has("User-Agent") { userAgent = r.Header.Get("User-Agent") } if userAgent != "" { _, err = fmt.Fprintf(w, "User-Agent: %srn", userAgent) if err != nil { return err } if trace != nil && trace.WroteHeaderField != nil { trace.WroteHeaderField("User-Agent", []string{userAgent}) } } // Process Body,ContentLength,Close,Trailer tw, err := newTransferWriter(r) if err != nil { return err } err = tw.writeHeader(w, trace) if err != nil { return err } err = r.Header.writeSubset(w, reqWriteExcludeHeader, trace) if err != nil { return err } if extraHeaders != nil { err = extraHeaders.write(w, trace) if err != nil { return err } } _, err = io.WriteString(w, "rn") if err != nil { return err } // 完成header if trace != nil && trace.WroteHeaders != nil { trace.WroteHeaders() } // Flush and wait for 100-continue if expected. if waitForContinue != nil { if bw, ok := w.(*bufio.Writer); ok { err = bw.Flush() // 先发送header请求 if err != nil { return err } } if trace != nil && trace.Wait100Continue != nil { trace.Wait100Continue() } if !waitForContinue() { // 等待server的返回,在以下两种情况下继续发送body,1是server返回ok,2是超时 r.closeBody() return nil } } if bw, ok := w.(*bufio.Writer); ok && tw.FlushHeaders { if err := bw.Flush(); err != nil { return err } } // 写body err = tw.writeBody(w) if err != nil { if tw.bodyReadError == err { err = requestBodyReadError{err} } return err } if bw != nil { return bw.Flush() } return nil }readLoop函数
连接读循环
func (pc *persistConn) readLoop() { closeErr := errReadLoopExiting // default value, if not changed below defer func() { pc.close(closeErr) pc.t.removeIdleConn(pc) }() tryPutIdleConn := func(trace *httptrace.ClientTrace) bool { if err := pc.t.tryPutIdleConn(pc); err != nil { // 将连接放回空闲连接池 closeErr = err if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled { trace.PutIdleConn(err) } return false } if trace != nil && trace.PutIdleConn != nil { trace.PutIdleConn(nil) } return true } // eofc is used to block caller goroutines reading from Response.Body // at EOF until this goroutines has (potentially) added the connection // back to the idle pool. eofc := make(chan struct{}) defer close(eofc) // unblock reader on errors // Read this once, before loop starts. (to avoid races in tests) testHookMu.Lock() testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead testHookMu.Unlock() alive := true for alive { pc.readLimit = pc.maxHeaderResponseSize() _, err := pc.br.Peek(1) pc.mu.Lock() if pc.numExpectedResponses == 0 { // 表示server端主动关闭连接 pc.readLoopPeekFailLocked(err) pc.mu.Unlock() return } pc.mu.Unlock() rc := <-pc.reqch // 一次新的请求,和persistConn.roundTrip进行同步 trace := httptrace.ContextClientTrace(rc.req.Context()) var resp *Response if err == nil { resp, err = pc.readResponse(rc, trace) // 读请求的具体实现 } else { err = transportReadFromServerError{err} closeErr = err } if err != nil { if pc.readLimit <= 0 { err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize()) } select { case rc.ch <- responseAndError{err: err}: case <-rc.callerGone: // 调用方放弃请求,链接关闭。之所以不再复用该连接,可能是避免未知原因造成下次请求的异常 return } return } pc.readLimit = maxInt64 // effectively no limit for response bodies pc.mu.Lock() pc.numExpectedResponses-- pc.mu.Unlock() bodyWritable := resp.bodyIsWritable() hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0 if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable { // Don't do keep-alive on error if either party requested a close // or we get an unexpected informational (1xx) response. // StatusCode 100 is already handled above. alive = false } if !hasBody || bodyWritable { pc.t.setReqCanceler(rc.req, nil) // Put the idle conn back into the pool before we send the response // so if they process it quickly and make another request, they'll // get this same conn. But we use the unbuffered channel 'rc' // to guarantee that persistConn.roundTrip got out of its select // potentially waiting for this persistConn to close. // but after alive = alive && !pc.sawEOF && pc.wroteRequest() && // 这个函数有两个作用,1是确保请求已经写完,因为不排除服务端还未完全接受到请求时就返回结果;2是判断此次请求在writeLoop中是否发生错误,如果有错误则连接失效 tryPutIdleConn(trace) // 将该连接放入连接池中 if bodyWritable { closeErr = errCallerOwnsConn } select { case rc.ch <- responseAndError{res: resp}: // 写response,会在persistConn.roundTrip函数中获取到该结果 case <-rc.callerGone: return } continue // 处理下一个请求 } waitForBodyRead := make(chan bool, 2) body := &bodyEOFSignal{ body: resp.Body, earlyCloseFn: func() error { waitForBodyRead <- false <-eofc // will be closed by deferred call at the end of the function return nil }, fn: func(err error) error { isEOF := err == io.EOF waitForBodyRead <- isEOF if isEOF { <-eofc // see comment above eofc declaration } else if err != nil { if cerr := pc.canceled(); cerr != nil { return cerr } } return err }, } resp.Body = body if rc.addedGzip && strings.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") { resp.Body = &gzipReader{body: body} resp.Header.Del("Content-Encoding") resp.Header.Del("Content-Length") resp.ContentLength = -1 resp.Uncompressed = true } select { case rc.ch <- responseAndError{res: resp}: // 写response,会在persistConn.roundTrip函数中获取到该结果 case <-rc.callerGone: return } // Before looping back to the top of this function and peeking on // the bufio.Reader, wait for the caller goroutine to finish // reading the response body. (or for cancellation or death) select { case bodyEOF := <-waitForBodyRead: pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool alive = alive && bodyEOF && !pc.sawEOF && pc.wroteRequest() && tryPutIdleConn(trace) // 将该连接放入连接池中 if bodyEOF { eofc <- struct{}{} } case <-rc.req.Cancel: alive = false pc.t.CancelRequest(rc.req) case <-rc.req.Context().Done(): alive = false pc.t.cancelRequest(rc.req, rc.req.Context().Err()) case <-pc.closech: // 连接被关闭,可能是lru被踢出出来了 alive = false } testHookReadLoopBeforeNextRead() } }readResponse函数
读取一次http请求返回,如果是header带“Expect:100-continue”的请求,则在正常情况下会收到两次server的返回结果 关于Expect:100-continue的介绍可以参考https://www.jianshu.com/p/154c310748db
func (pc *persistConn) readResponse(rc requestAndChan, trace *httptrace.ClientTrace) (resp *Response, err error) { if trace != nil && trace.GotFirstResponseByte != nil { if peek, err := pc.br.Peek(1); err == nil && len(peek) == 1 { trace.GotFirstResponseByte() } } num1xx := 0 // number of informational 1xx headers received const max1xxResponses = 5 // arbitrary bound on number of informational responses continueCh := rc.continueCh for { resp, err = ReadResponse(pc.br, rc.req) // 读取server的返回请求结果 if err != nil { return } resCode := resp.StatusCode if continueCh != nil { // 表示header中带“Expect:100-continue”的请求 if resCode == 100 { if trace != nil && trace.Got100Continue != nil { // 回调函数 trace.Got100Continue() } continueCh <- struct{}{} // 发送通知,让writeLoop可以继续发送请求的body continueCh = nil // 重置channel } else if resCode >= 200 { // 不支持Expect:100-continue,通知writeLoop取消发送 close(continueCh) continueCh = nil // 重置channel } } break } return }高性能分析
1、使用闲置连接池,省去了创建连接和关闭连接的时间开销;
2、一个请求同时运行读和写,这样如果server在完成读请求前就返回的话,可以提升性能;
3、一个请求获取连接的方式有三种,1是从连接池中获取一个闲置连接,2是新创建一个连接,3是在等待新创建连接好期间,有别的请求释放了一个连接,则直接使用该连接。可见作者已经将性能推至极限;
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)