Golang Http RoundTrip解析

Golang Http RoundTrip解析,第1张

Golang Http RoundTrip解析 概要

RoundTrip是发送和接收一次http请求的底层实现逻辑

我们可以从中学习到

1、如何去实现一个长链接

2、如果去提升性能

3、如何考虑完备各种边界情况

4、http协议

请求和连接的关系

一个请求肯定会使用一个或者多个连接,通常是一个,但如果异常情况,如刚好连接在使用过程中失效了,则不排除使用多个连接

一个连接可以在一次请求后就丢弃(关闭),但是对于长链接的情况,连接可以被放回到闲置连接池中,这样后续的请求到来时,就可以不必再次创建新的连接,而是复用之前的连接。要是到tcp的三次握手和四次分手还是挺消耗时间的

虽然长链接带来了性能的提升,但无疑增加了代码开发的复杂度。比如你需要考虑一个连接闲置时间过长时,可能server端已经关闭的情况。另外一种极端情况就是,当我们使用闲置连接处理请求时,在发送请求那一刻,可能server端正好主动关闭该连接。这无疑加大了代码开发的难度。

代码分析

以下代码仅保留核心部分

核心类
type Transport struct {
	idleMu       sync.Mutex
	closeIdle    bool                                // 用户请求关闭所有的闲置连接
	idleConn     map[connectMethodKey][]*persistConn // 每个host对应的闲置连接列表
	idleConnWait map[connectMethodKey]wantConnQueue  // 每个host对应的等待闲置连接列表,在其它request将连接放回连接池前先看一下这个队列是否为空,不为空则直接将连接交由其中一个等待对象
	idleLRU      connLRU                             // 用来清理过期的连接

	reqMu       sync.Mutex
	reqCanceler map[*Request]func(error)

	connsPerHostMu   sync.Mutex
	connsPerHost     map[connectMethodKey]int           // 每个host对应的等待连接个数
	connsPerHostWait map[connectMethodKey]wantConnQueue // 每个host对应的等待连接列表

	// 用于指定创建未加密的TCP连接的dial功能,如果该函数为空,则使用net包下的dial函数
	DialContext func(ctx context.Context, network, addr string) (net.Conn, error)
	Dial        func(network, addr string) (net.Conn, error)
	// 以下两个函数处理https的请求
	DialTLSContext func(ctx context.Context, network, addr string) (net.Conn, error)
	DialTLS        func(network, addr string) (net.Conn, error)

	// 是否复用连接
	DisableKeepAlives bool

	// 是否压缩
	DisableCompression bool

	// 总的最大闲置连接的个数
	MaxIdleConns int

	// 每个host最大闲置连接的个数
	MaxIdleConnsPerHost int

	// 每个host的最大连接个数,如果已经达到该数字,dial连接会被block住
	MaxConnsPerHost int

	// 闲置连接的最大等待时间,一旦超过该时间,连接会被关闭
	IdleConnTimeout time.Duration

	// 读超时,从写完请求到接受到返回头的总时间
	ResponseHeaderTimeout time.Duration

	// Expect:100-continue两个请求间的超时时间
	ExpectContinueTimeout time.Duration

	// 返回中header的限制
	MaxResponseHeaderBytes int64

	// write buffer的使用量
	WriteBufferSize int

	// read buffer的使用量
	ReadBufferSize int
}
一次请求和接收的主函数roundTrip
// roundTrip implements a RoundTripper over HTTP.
func (t *Transport) roundTrip(req *Request) (*Response, error) {
	ctx := req.Context()                       // 通过context控制请求的生命周期
	trace := httptrace.ContextClientTrace(ctx) // 钩子函数,在请求的各个阶段可以指定回调函数

	if req.URL == nil { // 异常情况
		req.closeBody()
		return nil, errors.New("http: nil Request.URL")
	}
	if req.Header == nil { // 异常情况
		req.closeBody()
		return nil, errors.New("http: nil Request.Header")
	}
	scheme := req.URL.Scheme
	isHTTP := scheme == "http" || scheme == "https"
	if isHTTP {
		for k, vv := range req.Header {
			if !httpguts.ValidHeaderFieldName(k) { // header key校验
				req.closeBody()
				return nil, fmt.Errorf("net/http: invalid header field name %q", k)
			}
			for _, v := range vv {
				if !httpguts.ValidHeaderFieldValue(v) { // header values校验
					req.closeBody()
					return nil, fmt.Errorf("net/http: invalid header field value %q for key %v", v, k)
				}
			}
		}
	}

	if altRT := t.alternateRoundTripper(req); altRT != nil { // 自定义的请求执行函数
		if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
			return resp, err
		}
	}
	if !isHTTP { // 异常情况
		req.closeBody()
		return nil, &badStringError{"unsupported protocol scheme", scheme}
	}
	if req.Method != "" && !validMethod(req.Method) { // 异常情况
		req.closeBody()
		return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
	}
	if req.URL.Host == "" { // 异常情况
		req.closeBody()
		return nil, errors.New("http: no Host in request URL")
	}

	for { // 这一层循环主要是支持请求重试
		select {
		case <-ctx.Done(): // 上层取消这次请求,或超时
			req.closeBody()
			return nil, ctx.Err()
		default:
		}

		// treq gets modified by roundTrip, so we need to recreate for each retry.
		treq := &transportRequest{Request: req, trace: trace}
		cm, err := t.connectMethodForRequest(treq) // 给请求打标签,其值是shema + uri + httpversion
		if err != nil {
			req.closeBody()
			return nil, err
		}

		// 核心函数,获取一个请求链接,这个链接可能是缓存池中的,也可能是新建立的
		// 请求的发送与接受也是在这个函数内部实现的
		pconn, err := t.getConn(treq, cm)
		if err != nil {
			t.setReqCanceler(req, nil)
			req.closeBody()
			return nil, err
		}

		var resp *Response
		if pconn.alt != nil { // 自定义的情况
			// HTTP/2 path.
			t.setReqCanceler(req, nil) // not cancelable with CancelRequest
			resp, err = pconn.alt.RoundTrip(req)
		} else {
			resp, err = pconn.roundTrip(treq) // 这是另一个核心函数,用来控制请求的发送和接受
		}
		if err == nil {
			return resp, nil
		}

		if !pconn.shouldRetryRequest(req, err) { // 判断是否重新连接,通常对于新建的连接不支持重发。一种情况是server已经关闭该连接,但同时client使用老的连接发送了一个请求,这时就应该允许重发请求
			// Issue 16465: return underlying net.Conn.Read error from peek,
			// as we've historically done.
			if e, ok := err.(transportReadFromServerError); ok {
				err = e.err
			}
			return nil, err
		}
	}
}

这个函数里面最主要的逻辑是获取一个有效连接,由t.getConn函数实现;另一个是对于本次请求发送与接收的控制逻辑,由pconn.roundTrip。下面分别介绍这两个函数的具体实现。

persistConn类

该类封装了底层连接,作为长链接的形式供请求使用

// persistConn wraps a connection, usually a persistent one
// (but may be used for non-keep-alive requests as well)
type persistConn struct {
	t         *Transport
	cacheKey  connectMethodKey  // schema + host + uri
	conn      net.Conn          // 底层连接
	br        *bufio.Reader       // 连接的读buffer
	bw        *bufio.Writer       // 连接的写buffer
	nwrite    int64               
	reqch     chan requestAndChan // 作为persistConn.roundTrip和readLoop之间的同步,由roundTrip写,readLoop读
	writech   chan writeRequest   // 作为persistConn.roundTrip和writeLoop之间的同步,由roundTrip写,writeLoop读
	closech   chan struct{}       // 连接关闭的channel
	sawEOF    bool  // 是否读完整请求内容,由readLoop负责
	readLimit int64 // 读数据的最大值,由readLoop负责
	writeErrCh chan error // writeLoop和readLoop之间的同步,用以判断该连接是否可以复用
	writeLoopDone chan struct{} // writeLoop函数退出时关闭

	// Both guarded by Transport.idleMu:
	idleAt    time.Time   // 最后一次闲置的时间
	idleTimer *time.Timer // 计数器,用来到期清理本连接

	mu                   sync.Mutex // guards following fields
	numExpectedResponses int // 连接的请求次数,大于1表示该连接被复用
	closed               error // 设置连接错误的原因
	canceledErr          error // 设置连接被取消的原因
	broken               bool  // 在使用过程中被损坏
	reused               bool  // 连接是否被复用
}
persistConn roundTrip
// 一次请求的发送与接受的实现
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
	testHookEnterRoundTrip()
	if !pc.t.replaceReqCanceler(req.Request, pc.cancelRequest) {
		pc.t.putOrCloseIdleConn(pc)
		return nil, errRequestCanceled
	}
	pc.mu.Lock()
	pc.numExpectedResponses++ // 当前连接的请求次数加1
	headerFn := pc.mutateHeaderFunc
	pc.mu.Unlock()

	if headerFn != nil {
		headerFn(req.extraHeaders())
	}

	var continueCh chan struct{} // 后面有详细分析
	if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() {
		continueCh = make(chan struct{}, 1) // capacity一定要为1,否则如果readLoop因为什么原因先关闭的话,会照成goroutine泄漏
	}

	if pc.t.DisableKeepAlives && !req.wantsClose() {
		req.extraHeaders().Set("Connection", "close") // 通知server本次连接为短连接
	}

	gone := make(chan struct{}) // 本次roundTrip关闭的channel
	defer close(gone)

	defer func() {
		if err != nil {
			pc.t.setReqCanceler(req.Request, nil)
		}
	}()

	const debugRoundTrip = false

	startBytesWritten := pc.nwrite
	writeErrCh := make(chan error, 1)
	pc.writech <- writeRequest{req, writeErrCh, continueCh} // 通知writeLoop启动一次写操作

	resc := make(chan responseAndError)
	pc.reqch <- requestAndChan{ // 通知readLoop启动一次读操作
		req:        req.Request,
		ch:         resc,
		addedGzip:  requestedGzip,
		continueCh: continueCh,
		callerGone: gone,
	}

	var respHeaderTimer <-chan time.Time
	cancelChan := req.Request.Cancel
	ctxDoneChan := req.Context().Done()
	for {
		testHookWaitResLoop()
		select {
		case err := <-writeErrCh: // 完成写操作,和writeLoop同步
			if debugRoundTrip {
				req.logf("writeErrCh resv: %T/%#v", err, err)
			}
			if err != nil {
				pc.close(fmt.Errorf("write error: %v", err))
				return nil, pc.mapRoundTripError(req, startBytesWritten, err)
			}
			if d := pc.t.ResponseHeaderTimeout; d > 0 { // 给读response header设定时
				if debugRoundTrip {
					req.logf("starting timer for %v", d)
				}
				timer := time.NewTimer(d)
				defer timer.Stop() // prevent leaks
				respHeaderTimer = timer.C
			}
		case <-pc.closech: // 连接被关闭
			if debugRoundTrip {
				req.logf("closech recv: %T %#v", pc.closed, pc.closed)
			}
			return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
		case <-respHeaderTimer: // 读请求结果超时
			if debugRoundTrip {
				req.logf("timeout waiting for response headers.")
			}
			pc.close(errTimeout)
			return nil, errTimeout
		case re := <-resc: // readLoop返回的结果
			if (re.res == nil) == (re.err == nil) {
				panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
			}
			if debugRoundTrip {
				req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
			}
			if re.err != nil { // 有错误的情况
				return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
			}
			return re.res, nil // 正常情况
		case <-cancelChan: // 请求被取消
			pc.t.CancelRequest(req.Request)
			cancelChan = nil
		case <-ctxDoneChan: // 请求超时
			pc.t.cancelRequest(req.Request, req.Context().Err())
			cancelChan = nil
			ctxDoneChan = nil
		}
	}
}
getConn函数

这个函数的主要作用是获取一个有效连接

// 获取一个有效的连接
// 获取连接的方式有三种
// 1是从连接池中获取一个闲置连接
// 2是新创建一个连接
// 3是在等待新创建连接好期间,有别的请求释放了一个连接,则直接使用该连接
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
	req := treq.Request
	trace := treq.trace
	ctx := req.Context()
	if trace != nil && trace.GetConn != nil {
		trace.GetConn(cm.addr())
	}

	w := &wantConn{ // 底层连接可以复用,但wantConn一定是每个request对应一个,需要使用它进行同步
		cm:         cm,
		key:        cm.key(),
		ctx:        ctx,
		ready:      make(chan struct{}, 1),
		beforeDial: testHookPrePendingDial,
		afterDial:  testHookPostPendingDial,
	}
	defer func() {
		if err != nil {
			w.cancel(t, err)
		}
	}()

	// 核心函数,从缓存池中获取到闲置连接,如果获取到则直接使用;否则在等待连接队列中进行注册
	if delivered := t.queueForIdleConn(w); delivered {
		pc := w.pc
		// Trace only for HTTP/1.
		// HTTP/2 calls trace.GotConn itself.
		if pc.alt == nil && trace != nil && trace.GotConn != nil {
			trace.GotConn(pc.gotIdleConnTrace(pc.idleAt))
		}
		// set request canceler to some non-nil function so we
		// can detect whether it was cleared between now and when
		// we enter roundTrip
		t.setReqCanceler(req, func(error) {})
		return pc, nil
	}

	cancelc := make(chan error, 1)
	t.setReqCanceler(req, func(err error) { cancelc <- err })

	// 新创建一个连接,与此同时,它仍可能在等待其它正在使用的连接
	t.queueForDial(w)

	// Wait for completion or cancellation.
	select {
	case <-w.ready: // 连接已经创建好了,有以上三种情况
		// Trace success but only for HTTP/1.
		// HTTP/2 calls trace.GotConn itself.
		if w.pc != nil && w.pc.alt == nil && trace != nil && trace.GotConn != nil {
			trace.GotConn(httptrace.GotConnInfo{Conn: w.pc.conn, Reused: w.pc.isReused()})
		}
		if w.err != nil { // 有错误发生
			// If the request has been cancelled, that's probably
			// what caused w.err; if so, prefer to return the
			// cancellation error (see golang.org/issue/16049).
			select {
			case <-req.Cancel:
				return nil, errRequestCanceledConn
			case <-req.Context().Done():
				return nil, req.Context().Err()
			case err := <-cancelc:
				if err == errRequestCanceled {
					err = errRequestCanceledConn
				}
				return nil, err
			default:
				// return below
			}
		}
		return w.pc, w.err // 返回连接,此时连接是connect状态
	case <-req.Cancel:
		return nil, errRequestCanceledConn
	case <-req.Context().Done():
		return nil, req.Context().Err()
	case err := <-cancelc:
		if err == errRequestCanceled {
			err = errRequestCanceledConn
		}
		return nil, err
	}
}

要么从闲置连接池中获取,要么新创建一个连接。下面分别介绍其对应的实现

queueForIdleConn函数

该函数的主要作用是从闲置连接池获取空闲连接。

如果有空闲连接,delivered返回true;否则在等待连接队列里进行注册,这样如果之后有别的请求释放了一个连接,可以直接拿过来用

// 该函数的主要作用是从连接池中获取一个闲置连接
func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
	t.idleMu.Lock()
	defer t.idleMu.Unlock()

	// 设置闲置期最长的时间点
	var oldTime time.Time
	if t.IdleConnTimeout > 0 {
		oldTime = time.Now().Add(-t.IdleConnTimeout)
	}

	// 从最新的连接开始遍历
	if list, ok := t.idleConn[w.key]; ok {
		stop := false
		delivered := false
		for len(list) > 0 && !stop {
			pconn := list[len(list)-1] // 获取连接池中最新的连接

			// 下面这个函数设计得很巧妙,要知道我们在把一个连接放入连接池中时设置过过期timer,它会主动清理连接
			// 但在程序实际运行的过程中,很可能timer到时时,该协程并没有被cpu给调度上来,如果还继续使用的话,一旦清理协程被调度上来,会照成逻辑错误
			tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
			if tooOld {
				go pconn.closeConnIfStillIdle() // 异步关闭连接
			}
			if pconn.isBroken() || tooOld {
				// 如果连接损坏,则使用下一个连接
				list = list[:len(list)-1]
				continue
			}
			delivered = w.tryDeliver(pconn, nil)
			if delivered { // 该连接被使用,如果delivered为false,表示已经从别的渠道接受到连接
				if pconn.alt != nil {
					// HTTP/2: multiple clients can share pconn.
					// Leave it in the list.
				} else {
					// HTTP/1: only one client can use pconn.
					// Remove it from the list.
					t.idleLRU.remove(pconn)
					list = list[:len(list)-1]
				}
			}
			stop = true
		}
		if len(list) > 0 {
			t.idleConn[w.key] = list
		} else {
			delete(t.idleConn, w.key)
		}
		if stop { // 1从连接池中获取到连接,2是在运行的过程中,创建新的连接已完成,这是也不需要在等待连接了
			return delivered
		}
	}

	if t.idleConnWait == nil {
		t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
	}
	q := t.idleConnWait[w.key]
	q.cleanFront() // 清理过期连接
	q.pushBack(w) // 放入等待连接队列中
	t.idleConnWait[w.key] = q
	return false
}
queueForDial函数

这个函数的主要功能是创建一个新的连接

// 创建一个新的连接
func (t *Transport) queueForDial(w *wantConn) {
	w.beforeDial()
	if t.MaxConnsPerHost <= 0 { // 对每个host的连接数不做限制,直接创建
		go t.dialConnFor(w) // 注意这里启动了一个协程,它的好处是如果在新创建连接的过程中,有一个连接被释放,可以直接使用被释放的连接,而不用一直等待
		return
	}

	t.connsPerHostMu.Lock()
	defer t.connsPerHostMu.Unlock()

	if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost { // 不到最大连接数,直接创建新的连接
		if t.connsPerHost == nil {
			t.connsPerHost = make(map[connectMethodKey]int)
		}
		t.connsPerHost[w.key] = n + 1
		go t.dialConnFor(w)
		return
	}

    // 此时连接已经达到设置的最大值,放入等待队列中
	if t.connsPerHostWait == nil {
		t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)
	}
	q := t.connsPerHostWait[w.key]
	q.cleanFront()
	q.pushBack(w) // 放入等待列表中
	t.connsPerHostWait[w.key] = q
}
dialConnFor函数
// dialConnFor dials on behalf of w and delivers the result to w.
// dialConnFor has received permission to dial w.cm and is counted in t.connCount[w.cm.key()].
// If the dial is cancelled or unsuccessful, dialConnFor decrements t.connCount[w.cm.key()].
func (t *Transport) dialConnFor(w *wantConn) {
	defer w.afterDial()

	pc, err := t.dialConn(w.ctx, w.cm)               // 核心函数,创建连接
	delivered := w.tryDeliver(pc, err)               // 将新创建的连接赋值给w,如果返回false,表示已经从别的渠道获取到连接
	if err == nil && (!delivered || pc.alt != nil) { // 赋值不成功,说明w已经在这期间从等待列表中获取到有效的连接
		// pconn was not passed to w,
		// or it is HTTP/2 and can be shared.
		// Add to the idle connection pool.
		t.putOrCloseIdleConn(pc) // 将新创建的连接放入闲置连接池中,或者关闭掉
	}
	if err != nil {
		t.decConnsPerHost(w.key) // 创建失败,更新统计信息
	}
}
dialConn函数

这个函数主要有三个作用,1是创建底层的socket,2是将socket与read buffer和write buffer关联起来,3是启动readLoop和writeLoop,需要注意的是一旦socket创建成功,readLoop和writeLoop会循环运行,服务不同的请求,直到退出
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
	pconn = &persistConn{
		t:             t,
		cacheKey:      cm.key(),
		reqch:         make(chan requestAndChan, 1),
		writech:       make(chan writeRequest, 1),
		closech:       make(chan struct{}),
		writeErrCh:    make(chan error, 1),
		writeLoopDone: make(chan struct{}),
	}
	trace := httptrace.ContextClientTrace(ctx)
	wrapErr := func(err error) error {
		if cm.proxyURL != nil {
			// Return a typed error, per Issue 16997
			return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
		}
		return err
	}
	if cm.scheme() == "https" && t.hasCustomTLSDialer() {
        ...
	} else {
		conn, err := t.dial(ctx, "tcp", cm.addr()) // 创建socket
		if err != nil {
			return nil, wrapErr(err)
		}
		pconn.conn = conn
	}

	pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
	pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())

	go pconn.readLoop()
	go pconn.writeLoop()
	return pconn, nil
}
writeLoop函数

实际的写循环

func (pc *persistConn) writeLoop() {
	defer close(pc.writeLoopDone)
	for {
		select {
		case wr := <-pc.writech: // 收到roundTrip的写命令
			startBytesWritten := pc.nwrite
			err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh)) // 主函数,实际写http数据
			if bre, ok := err.(requestBodyReadError); ok {
				err = bre.error
				wr.req.setError(err)
			}
			if err == nil {
				err = pc.bw.Flush()
			}
			if err != nil {
				wr.req.Request.closeBody()
				if pc.nwrite == startBytesWritten {
					err = nothingWrittenError{err}
				}
			}
			pc.writeErrCh <- err // 通知readLoop,用于判断连接是否可以回收
			wr.ch <- err         // 通知persistConn.roundTrip,设定读response的timeout
			if err != nil { // 有错误发生时不再复用连接
				pc.close(err)
				return
			}
			// 处理下一个请求
		case <-pc.closech: // 连接被关闭
			return
		}
	}
}
waitForContinue函数

该函数用来支持Expect:100-continue功能,它返回一个闭包函数,它的作用是阻塞继续执行,只到以下三种情况发生。1是在readLoop接受到100-continue的返回结果,会向continueCh发送一个数据,2是在100-continue等待server返回结果超时的情况,这时会继续将body发送出去,3是连接关闭的情况
func (pc *persistConn) waitForContinue(continueCh <-chan struct{}) func() bool {
	if continueCh == nil {
		return nil
	}
	return func() bool {
		timer := time.NewTimer(pc.t.ExpectContinueTimeout) // 设定超时时间
		defer timer.Stop()

		select {
		case _, ok := <-continueCh: // 如果readLoop接受到的消息表示不支持Expect:100-continue类型的请求,则会关闭channel
			return ok
		case <-timer.C: // 超时
			return true
		case <-pc.closech: // 连接被关闭
			return false
		}
	}
}
write函数
 这个函数实现了发送一次请求的实际 *** 作
主要的看点是Expect:100-continue的处理方式
func (r *Request) write(w io.Writer, usingProxy bool, extraHeaders Header, waitForContinue func() bool) (err error) {
	trace := httptrace.ContextClientTrace(r.Context())
	if trace != nil && trace.WroteRequest != nil {
		defer func() {
			trace.WroteRequest(httptrace.WroteRequestInfo{ // 完成写请求的回调函数
				Err: err,
			})
		}()
	}

	// 确定host
	host := cleanHost(r.Host)
	if host == "" {
		if r.URL == nil {
			return errMissingHost
		}
		host = cleanHost(r.URL.Host)
	}

	host = removeZone(host)

	ruri := r.URL.RequestURI()
	if usingProxy && r.URL.Scheme != "" && r.URL.Opaque == "" {
		ruri = r.URL.Scheme + "://" + host + ruri
	} else if r.Method == "CONNECT" && r.URL.Path == "" {
		// ConNECT requests normally give just the host and port, not a full URL.
		ruri = host
		if r.URL.Opaque != "" {
			ruri = r.URL.Opaque
		}
	}
	if stringContainsCTLByte(ruri) {
		return errors.New("net/http: can't write control character in Request.URL")
	}

	var bw *bufio.Writer
	if _, ok := w.(io.ByteWriter); !ok {
		bw = bufio.NewWriter(w)
		w = bw
	}
    // 开始写数据
	_, err = fmt.Fprintf(w, "%s %s HTTP/1.1rn", valueOrDefault(r.Method, "GET"), ruri)
	if err != nil {
		return err
	}

	// Header lines
	_, err = fmt.Fprintf(w, "Host: %srn", host)
	if err != nil {
		return err
	}
	if trace != nil && trace.WroteHeaderField != nil {
		trace.WroteHeaderField("Host", []string{host})
	}

	userAgent := defaultUserAgent
	if r.Header.has("User-Agent") {
		userAgent = r.Header.Get("User-Agent")
	}
	if userAgent != "" {
		_, err = fmt.Fprintf(w, "User-Agent: %srn", userAgent)
		if err != nil {
			return err
		}
		if trace != nil && trace.WroteHeaderField != nil {
			trace.WroteHeaderField("User-Agent", []string{userAgent})
		}
	}

	// Process Body,ContentLength,Close,Trailer
	tw, err := newTransferWriter(r)
	if err != nil {
		return err
	}
	err = tw.writeHeader(w, trace)
	if err != nil {
		return err
	}

	err = r.Header.writeSubset(w, reqWriteExcludeHeader, trace)
	if err != nil {
		return err
	}

	if extraHeaders != nil {
		err = extraHeaders.write(w, trace)
		if err != nil {
			return err
		}
	}

	_, err = io.WriteString(w, "rn")
	if err != nil {
		return err
	}
    // 完成header
	if trace != nil && trace.WroteHeaders != nil {
		trace.WroteHeaders()
	}

	// Flush and wait for 100-continue if expected.
	if waitForContinue != nil {
		if bw, ok := w.(*bufio.Writer); ok {
			err = bw.Flush() // 先发送header请求
			if err != nil {
				return err
			}
		}
		if trace != nil && trace.Wait100Continue != nil {
			trace.Wait100Continue()
		}
		if !waitForContinue() { // 等待server的返回,在以下两种情况下继续发送body,1是server返回ok,2是超时
			r.closeBody()
			return nil
		}
	}

	if bw, ok := w.(*bufio.Writer); ok && tw.FlushHeaders {
		if err := bw.Flush(); err != nil {
			return err
		}
	}

	// 写body
	err = tw.writeBody(w)
	if err != nil {
		if tw.bodyReadError == err {
			err = requestBodyReadError{err}
		}
		return err
	}

	if bw != nil {
		return bw.Flush()
	}
	return nil
}
readLoop函数

连接读循环

func (pc *persistConn) readLoop() {
	closeErr := errReadLoopExiting // default value, if not changed below
	defer func() {
		pc.close(closeErr)
		pc.t.removeIdleConn(pc)
	}()

	tryPutIdleConn := func(trace *httptrace.ClientTrace) bool {
		if err := pc.t.tryPutIdleConn(pc); err != nil { // 将连接放回空闲连接池
			closeErr = err
			if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
				trace.PutIdleConn(err)
			}
			return false
		}
		if trace != nil && trace.PutIdleConn != nil {
			trace.PutIdleConn(nil)
		}
		return true
	}

	// eofc is used to block caller goroutines reading from Response.Body
	// at EOF until this goroutines has (potentially) added the connection
	// back to the idle pool.
	eofc := make(chan struct{})
	defer close(eofc) // unblock reader on errors

	// Read this once, before loop starts. (to avoid races in tests)
	testHookMu.Lock()
	testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
	testHookMu.Unlock()

	alive := true
	for alive {
		pc.readLimit = pc.maxHeaderResponseSize()
		_, err := pc.br.Peek(1)

		pc.mu.Lock()
		if pc.numExpectedResponses == 0 {  // 表示server端主动关闭连接
			pc.readLoopPeekFailLocked(err)
			pc.mu.Unlock()
			return
		}
		pc.mu.Unlock()

		rc := <-pc.reqch // 一次新的请求,和persistConn.roundTrip进行同步
		trace := httptrace.ContextClientTrace(rc.req.Context())

		var resp *Response
		if err == nil {
			resp, err = pc.readResponse(rc, trace) // 读请求的具体实现
		} else {
			err = transportReadFromServerError{err}
			closeErr = err
		}

		if err != nil {
			if pc.readLimit <= 0 {
				err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
			}

			select {
			case rc.ch <- responseAndError{err: err}:
			case <-rc.callerGone: // 调用方放弃请求,链接关闭。之所以不再复用该连接,可能是避免未知原因造成下次请求的异常
				return
			}
			return
		}
		pc.readLimit = maxInt64 // effectively no limit for response bodies

		pc.mu.Lock()
		pc.numExpectedResponses--
		pc.mu.Unlock()

		bodyWritable := resp.bodyIsWritable()
		hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0

		if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable {
			// Don't do keep-alive on error if either party requested a close
			// or we get an unexpected informational (1xx) response.
			// StatusCode 100 is already handled above.
			alive = false
		}

		if !hasBody || bodyWritable {
			pc.t.setReqCanceler(rc.req, nil)

			// Put the idle conn back into the pool before we send the response
			// so if they process it quickly and make another request, they'll
			// get this same conn. But we use the unbuffered channel 'rc'
			// to guarantee that persistConn.roundTrip got out of its select
			// potentially waiting for this persistConn to close.
			// but after
			alive = alive &&
				!pc.sawEOF &&
				pc.wroteRequest() && // 这个函数有两个作用,1是确保请求已经写完,因为不排除服务端还未完全接受到请求时就返回结果;2是判断此次请求在writeLoop中是否发生错误,如果有错误则连接失效
				tryPutIdleConn(trace) // 将该连接放入连接池中

			if bodyWritable {
				closeErr = errCallerOwnsConn
			}

			select {
			case rc.ch <- responseAndError{res: resp}: // 写response,会在persistConn.roundTrip函数中获取到该结果
			case <-rc.callerGone:
				return
			}
			continue // 处理下一个请求
		}

		waitForBodyRead := make(chan bool, 2)
		body := &bodyEOFSignal{
			body: resp.Body,
			earlyCloseFn: func() error {
				waitForBodyRead <- false
				<-eofc // will be closed by deferred call at the end of the function
				return nil

			},
			fn: func(err error) error {
				isEOF := err == io.EOF
				waitForBodyRead <- isEOF
				if isEOF {
					<-eofc // see comment above eofc declaration
				} else if err != nil {
					if cerr := pc.canceled(); cerr != nil {
						return cerr
					}
				}
				return err
			},
		}

		resp.Body = body
		if rc.addedGzip && strings.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
			resp.Body = &gzipReader{body: body}
			resp.Header.Del("Content-Encoding")
			resp.Header.Del("Content-Length")
			resp.ContentLength = -1
			resp.Uncompressed = true
		}

		select {
		case rc.ch <- responseAndError{res: resp}: // 写response,会在persistConn.roundTrip函数中获取到该结果
		case <-rc.callerGone:
			return
		}

		// Before looping back to the top of this function and peeking on
		// the bufio.Reader, wait for the caller goroutine to finish
		// reading the response body. (or for cancellation or death)
		select {
		case bodyEOF := <-waitForBodyRead:
			pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool
			alive = alive &&
				bodyEOF &&
				!pc.sawEOF &&
				pc.wroteRequest() &&
				tryPutIdleConn(trace) // 将该连接放入连接池中
			if bodyEOF {
				eofc <- struct{}{}
			}
		case <-rc.req.Cancel:
			alive = false
			pc.t.CancelRequest(rc.req)
		case <-rc.req.Context().Done():
			alive = false
			pc.t.cancelRequest(rc.req, rc.req.Context().Err())
		case <-pc.closech: // 连接被关闭,可能是lru被踢出出来了
			alive = false
		}

		testHookReadLoopBeforeNextRead()
	}
}
readResponse函数
读取一次http请求返回,如果是header带“Expect:100-continue”的请求,则在正常情况下会收到两次server的返回结果
关于Expect:100-continue的介绍可以参考https://www.jianshu.com/p/154c310748db
func (pc *persistConn) readResponse(rc requestAndChan, trace *httptrace.ClientTrace) (resp *Response, err error) {
	if trace != nil && trace.GotFirstResponseByte != nil {
		if peek, err := pc.br.Peek(1); err == nil && len(peek) == 1 {
			trace.GotFirstResponseByte()
		}
	}
	num1xx := 0               // number of informational 1xx headers received
	const max1xxResponses = 5 // arbitrary bound on number of informational responses

	continueCh := rc.continueCh
	for {
		resp, err = ReadResponse(pc.br, rc.req) // 读取server的返回请求结果
		if err != nil {
			return
		}
		resCode := resp.StatusCode
		if continueCh != nil { // 表示header中带“Expect:100-continue”的请求
			if resCode == 100 {
				if trace != nil && trace.Got100Continue != nil { // 回调函数
					trace.Got100Continue()
				}
				continueCh <- struct{}{} // 发送通知,让writeLoop可以继续发送请求的body
				continueCh = nil // 重置channel
			} else if resCode >= 200 { // 不支持Expect:100-continue,通知writeLoop取消发送
				close(continueCh)
				continueCh = nil // 重置channel
			}
		}
		break
	}
	return
}

高性能分析

1、使用闲置连接池,省去了创建连接和关闭连接的时间开销;

2、一个请求同时运行读和写,这样如果server在完成读请求前就返回的话,可以提升性能;

3、一个请求获取连接的方式有三种,1是从连接池中获取一个闲置连接,2是新创建一个连接,3是在等待新创建连接好期间,有别的请求释放了一个连接,则直接使用该连接。可见作者已经将性能推至极限;

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5685065.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存