transport.go 95 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762777277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044304530463047304830493050305130523053305430553056305730583059306030613062306330643065306630673068306930703071307230733074307530763077307830793080308130823083308430853086308730883089309030913092309330943095309630973098309931003101310231033104310531063107310831093110311131123113311431153116311731183119312031213122312331243125312631273128312931303131313231333134313531363137313831393140314131423143314431453146314731483149315031513152315331543155315631573158315931603161316231633164316531663167316831693170317131723173317431753176317731783179318031813182318331843185318631873188318931903191319231933194319531963197319831993200320132023203320432053206320732083209321032113212321332143215321632173218321932203221322232233224322532263227322832293230323132323233323432353236323732383239324032413242324332443245324632473248324932503251325232533254325532563257325832593260326132623263326432653266326732683269327032713272327332743275327632773278327932803281328232833284328532863287
  1. // Copyright 2015 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Transport code.
  5. package http2
  6. import (
  7. "bufio"
  8. "bytes"
  9. "compress/gzip"
  10. "context"
  11. "crypto/rand"
  12. "crypto/tls"
  13. "errors"
  14. "fmt"
  15. "io"
  16. "io/fs"
  17. "log"
  18. "math"
  19. "math/bits"
  20. mathrand "math/rand"
  21. "net"
  22. "net/http"
  23. "net/http/httptrace"
  24. "net/textproto"
  25. "strconv"
  26. "strings"
  27. "sync"
  28. "sync/atomic"
  29. "time"
  30. "golang.org/x/net/http/httpguts"
  31. "golang.org/x/net/http2/hpack"
  32. "golang.org/x/net/idna"
  33. "golang.org/x/net/internal/httpcommon"
  34. )
  35. const (
  36. // transportDefaultConnFlow is how many connection-level flow control
  37. // tokens we give the server at start-up, past the default 64k.
  38. transportDefaultConnFlow = 1 << 30
  39. // transportDefaultStreamFlow is how many stream-level flow
  40. // control tokens we announce to the peer, and how many bytes
  41. // we buffer per stream.
  42. transportDefaultStreamFlow = 4 << 20
  43. defaultUserAgent = "Go-http-client/2.0"
  44. // initialMaxConcurrentStreams is a connections maxConcurrentStreams until
  45. // it's received servers initial SETTINGS frame, which corresponds with the
  46. // spec's minimum recommended value.
  47. initialMaxConcurrentStreams = 100
  48. // defaultMaxConcurrentStreams is a connections default maxConcurrentStreams
  49. // if the server doesn't include one in its initial SETTINGS frame.
  50. defaultMaxConcurrentStreams = 1000
  51. )
  52. // Transport is an HTTP/2 Transport.
  53. //
  54. // A Transport internally caches connections to servers. It is safe
  55. // for concurrent use by multiple goroutines.
  56. type Transport struct {
  57. // DialTLSContext specifies an optional dial function with context for
  58. // creating TLS connections for requests.
  59. //
  60. // If DialTLSContext and DialTLS is nil, tls.Dial is used.
  61. //
  62. // If the returned net.Conn has a ConnectionState method like tls.Conn,
  63. // it will be used to set http.Response.TLS.
  64. DialTLSContext func(ctx context.Context, network, addr string, cfg *tls.Config) (net.Conn, error)
  65. // DialTLS specifies an optional dial function for creating
  66. // TLS connections for requests.
  67. //
  68. // If DialTLSContext and DialTLS is nil, tls.Dial is used.
  69. //
  70. // Deprecated: Use DialTLSContext instead, which allows the transport
  71. // to cancel dials as soon as they are no longer needed.
  72. // If both are set, DialTLSContext takes priority.
  73. DialTLS func(network, addr string, cfg *tls.Config) (net.Conn, error)
  74. // TLSClientConfig specifies the TLS configuration to use with
  75. // tls.Client. If nil, the default configuration is used.
  76. TLSClientConfig *tls.Config
  77. // ConnPool optionally specifies an alternate connection pool to use.
  78. // If nil, the default is used.
  79. ConnPool ClientConnPool
  80. // DisableCompression, if true, prevents the Transport from
  81. // requesting compression with an "Accept-Encoding: gzip"
  82. // request header when the Request contains no existing
  83. // Accept-Encoding value. If the Transport requests gzip on
  84. // its own and gets a gzipped response, it's transparently
  85. // decoded in the Response.Body. However, if the user
  86. // explicitly requested gzip it is not automatically
  87. // uncompressed.
  88. DisableCompression bool
  89. // AllowHTTP, if true, permits HTTP/2 requests using the insecure,
  90. // plain-text "http" scheme. Note that this does not enable h2c support.
  91. AllowHTTP bool
  92. // MaxHeaderListSize is the http2 SETTINGS_MAX_HEADER_LIST_SIZE to
  93. // send in the initial settings frame. It is how many bytes
  94. // of response headers are allowed. Unlike the http2 spec, zero here
  95. // means to use a default limit (currently 10MB). If you actually
  96. // want to advertise an unlimited value to the peer, Transport
  97. // interprets the highest possible value here (0xffffffff or 1<<32-1)
  98. // to mean no limit.
  99. MaxHeaderListSize uint32
  100. // MaxReadFrameSize is the http2 SETTINGS_MAX_FRAME_SIZE to send in the
  101. // initial settings frame. It is the size in bytes of the largest frame
  102. // payload that the sender is willing to receive. If 0, no setting is
  103. // sent, and the value is provided by the peer, which should be 16384
  104. // according to the spec:
  105. // https://datatracker.ietf.org/doc/html/rfc7540#section-6.5.2.
  106. // Values are bounded in the range 16k to 16M.
  107. MaxReadFrameSize uint32
  108. // MaxDecoderHeaderTableSize optionally specifies the http2
  109. // SETTINGS_HEADER_TABLE_SIZE to send in the initial settings frame. It
  110. // informs the remote endpoint of the maximum size of the header compression
  111. // table used to decode header blocks, in octets. If zero, the default value
  112. // of 4096 is used.
  113. MaxDecoderHeaderTableSize uint32
  114. // MaxEncoderHeaderTableSize optionally specifies an upper limit for the
  115. // header compression table used for encoding request headers. Received
  116. // SETTINGS_HEADER_TABLE_SIZE settings are capped at this limit. If zero,
  117. // the default value of 4096 is used.
  118. MaxEncoderHeaderTableSize uint32
  119. // StrictMaxConcurrentStreams controls whether the server's
  120. // SETTINGS_MAX_CONCURRENT_STREAMS should be respected
  121. // globally. If false, new TCP connections are created to the
  122. // server as needed to keep each under the per-connection
  123. // SETTINGS_MAX_CONCURRENT_STREAMS limit. If true, the
  124. // server's SETTINGS_MAX_CONCURRENT_STREAMS is interpreted as
  125. // a global limit and callers of RoundTrip block when needed,
  126. // waiting for their turn.
  127. StrictMaxConcurrentStreams bool
  128. // IdleConnTimeout is the maximum amount of time an idle
  129. // (keep-alive) connection will remain idle before closing
  130. // itself.
  131. // Zero means no limit.
  132. IdleConnTimeout time.Duration
  133. // ReadIdleTimeout is the timeout after which a health check using ping
  134. // frame will be carried out if no frame is received on the connection.
  135. // Note that a ping response will is considered a received frame, so if
  136. // there is no other traffic on the connection, the health check will
  137. // be performed every ReadIdleTimeout interval.
  138. // If zero, no health check is performed.
  139. ReadIdleTimeout time.Duration
  140. // PingTimeout is the timeout after which the connection will be closed
  141. // if a response to Ping is not received.
  142. // Defaults to 15s.
  143. PingTimeout time.Duration
  144. // WriteByteTimeout is the timeout after which the connection will be
  145. // closed no data can be written to it. The timeout begins when data is
  146. // available to write, and is extended whenever any bytes are written.
  147. WriteByteTimeout time.Duration
  148. // CountError, if non-nil, is called on HTTP/2 transport errors.
  149. // It's intended to increment a metric for monitoring, such
  150. // as an expvar or Prometheus metric.
  151. // The errType consists of only ASCII word characters.
  152. CountError func(errType string)
  153. // t1, if non-nil, is the standard library Transport using
  154. // this transport. Its settings are used (but not its
  155. // RoundTrip method, etc).
  156. t1 *http.Transport
  157. connPoolOnce sync.Once
  158. connPoolOrDef ClientConnPool // non-nil version of ConnPool
  159. *transportTestHooks
  160. }
  161. // Hook points used for testing.
  162. // Outside of tests, t.transportTestHooks is nil and these all have minimal implementations.
  163. // Inside tests, see the testSyncHooks function docs.
  164. type transportTestHooks struct {
  165. newclientconn func(*ClientConn)
  166. group synctestGroupInterface
  167. }
  168. func (t *Transport) markNewGoroutine() {
  169. if t != nil && t.transportTestHooks != nil {
  170. t.transportTestHooks.group.Join()
  171. }
  172. }
  173. func (t *Transport) now() time.Time {
  174. if t != nil && t.transportTestHooks != nil {
  175. return t.transportTestHooks.group.Now()
  176. }
  177. return time.Now()
  178. }
  179. func (t *Transport) timeSince(when time.Time) time.Duration {
  180. if t != nil && t.transportTestHooks != nil {
  181. return t.now().Sub(when)
  182. }
  183. return time.Since(when)
  184. }
  185. // newTimer creates a new time.Timer, or a synthetic timer in tests.
  186. func (t *Transport) newTimer(d time.Duration) timer {
  187. if t.transportTestHooks != nil {
  188. return t.transportTestHooks.group.NewTimer(d)
  189. }
  190. return timeTimer{time.NewTimer(d)}
  191. }
  192. // afterFunc creates a new time.AfterFunc timer, or a synthetic timer in tests.
  193. func (t *Transport) afterFunc(d time.Duration, f func()) timer {
  194. if t.transportTestHooks != nil {
  195. return t.transportTestHooks.group.AfterFunc(d, f)
  196. }
  197. return timeTimer{time.AfterFunc(d, f)}
  198. }
  199. func (t *Transport) contextWithTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc) {
  200. if t.transportTestHooks != nil {
  201. return t.transportTestHooks.group.ContextWithTimeout(ctx, d)
  202. }
  203. return context.WithTimeout(ctx, d)
  204. }
  205. func (t *Transport) maxHeaderListSize() uint32 {
  206. n := int64(t.MaxHeaderListSize)
  207. if t.t1 != nil && t.t1.MaxResponseHeaderBytes != 0 {
  208. n = t.t1.MaxResponseHeaderBytes
  209. if n > 0 {
  210. n = adjustHTTP1MaxHeaderSize(n)
  211. }
  212. }
  213. if n <= 0 {
  214. return 10 << 20
  215. }
  216. if n >= 0xffffffff {
  217. return 0
  218. }
  219. return uint32(n)
  220. }
  221. func (t *Transport) disableCompression() bool {
  222. return t.DisableCompression || (t.t1 != nil && t.t1.DisableCompression)
  223. }
  224. // ConfigureTransport configures a net/http HTTP/1 Transport to use HTTP/2.
  225. // It returns an error if t1 has already been HTTP/2-enabled.
  226. //
  227. // Use ConfigureTransports instead to configure the HTTP/2 Transport.
  228. func ConfigureTransport(t1 *http.Transport) error {
  229. _, err := ConfigureTransports(t1)
  230. return err
  231. }
  232. // ConfigureTransports configures a net/http HTTP/1 Transport to use HTTP/2.
  233. // It returns a new HTTP/2 Transport for further configuration.
  234. // It returns an error if t1 has already been HTTP/2-enabled.
  235. func ConfigureTransports(t1 *http.Transport) (*Transport, error) {
  236. return configureTransports(t1)
  237. }
  238. func configureTransports(t1 *http.Transport) (*Transport, error) {
  239. connPool := new(clientConnPool)
  240. t2 := &Transport{
  241. ConnPool: noDialClientConnPool{connPool},
  242. t1: t1,
  243. }
  244. connPool.t = t2
  245. if err := registerHTTPSProtocol(t1, noDialH2RoundTripper{t2}); err != nil {
  246. return nil, err
  247. }
  248. if t1.TLSClientConfig == nil {
  249. t1.TLSClientConfig = new(tls.Config)
  250. }
  251. if !strSliceContains(t1.TLSClientConfig.NextProtos, "h2") {
  252. t1.TLSClientConfig.NextProtos = append([]string{"h2"}, t1.TLSClientConfig.NextProtos...)
  253. }
  254. if !strSliceContains(t1.TLSClientConfig.NextProtos, "http/1.1") {
  255. t1.TLSClientConfig.NextProtos = append(t1.TLSClientConfig.NextProtos, "http/1.1")
  256. }
  257. upgradeFn := func(scheme, authority string, c net.Conn) http.RoundTripper {
  258. addr := authorityAddr(scheme, authority)
  259. if used, err := connPool.addConnIfNeeded(addr, t2, c); err != nil {
  260. go c.Close()
  261. return erringRoundTripper{err}
  262. } else if !used {
  263. // Turns out we don't need this c.
  264. // For example, two goroutines made requests to the same host
  265. // at the same time, both kicking off TCP dials. (since protocol
  266. // was unknown)
  267. go c.Close()
  268. }
  269. if scheme == "http" {
  270. return (*unencryptedTransport)(t2)
  271. }
  272. return t2
  273. }
  274. if t1.TLSNextProto == nil {
  275. t1.TLSNextProto = make(map[string]func(string, *tls.Conn) http.RoundTripper)
  276. }
  277. t1.TLSNextProto[NextProtoTLS] = func(authority string, c *tls.Conn) http.RoundTripper {
  278. return upgradeFn("https", authority, c)
  279. }
  280. // The "unencrypted_http2" TLSNextProto key is used to pass off non-TLS HTTP/2 conns.
  281. t1.TLSNextProto[nextProtoUnencryptedHTTP2] = func(authority string, c *tls.Conn) http.RoundTripper {
  282. nc, err := unencryptedNetConnFromTLSConn(c)
  283. if err != nil {
  284. go c.Close()
  285. return erringRoundTripper{err}
  286. }
  287. return upgradeFn("http", authority, nc)
  288. }
  289. return t2, nil
  290. }
  291. // unencryptedTransport is a Transport with a RoundTrip method that
  292. // always permits http:// URLs.
  293. type unencryptedTransport Transport
  294. func (t *unencryptedTransport) RoundTrip(req *http.Request) (*http.Response, error) {
  295. return (*Transport)(t).RoundTripOpt(req, RoundTripOpt{allowHTTP: true})
  296. }
  297. func (t *Transport) connPool() ClientConnPool {
  298. t.connPoolOnce.Do(t.initConnPool)
  299. return t.connPoolOrDef
  300. }
  301. func (t *Transport) initConnPool() {
  302. if t.ConnPool != nil {
  303. t.connPoolOrDef = t.ConnPool
  304. } else {
  305. t.connPoolOrDef = &clientConnPool{t: t}
  306. }
  307. }
  308. // ClientConn is the state of a single HTTP/2 client connection to an
  309. // HTTP/2 server.
  310. type ClientConn struct {
  311. t *Transport
  312. tconn net.Conn // usually *tls.Conn, except specialized impls
  313. tlsState *tls.ConnectionState // nil only for specialized impls
  314. atomicReused uint32 // whether conn is being reused; atomic
  315. singleUse bool // whether being used for a single http.Request
  316. getConnCalled bool // used by clientConnPool
  317. // readLoop goroutine fields:
  318. readerDone chan struct{} // closed on error
  319. readerErr error // set before readerDone is closed
  320. idleTimeout time.Duration // or 0 for never
  321. idleTimer timer
  322. mu sync.Mutex // guards following
  323. cond *sync.Cond // hold mu; broadcast on flow/closed changes
  324. flow outflow // our conn-level flow control quota (cs.outflow is per stream)
  325. inflow inflow // peer's conn-level flow control
  326. doNotReuse bool // whether conn is marked to not be reused for any future requests
  327. closing bool
  328. closed bool
  329. closedOnIdle bool // true if conn was closed for idleness
  330. seenSettings bool // true if we've seen a settings frame, false otherwise
  331. seenSettingsChan chan struct{} // closed when seenSettings is true or frame reading fails
  332. wantSettingsAck bool // we sent a SETTINGS frame and haven't heard back
  333. goAway *GoAwayFrame // if non-nil, the GoAwayFrame we received
  334. goAwayDebug string // goAway frame's debug data, retained as a string
  335. streams map[uint32]*clientStream // client-initiated
  336. streamsReserved int // incr by ReserveNewRequest; decr on RoundTrip
  337. nextStreamID uint32
  338. pendingRequests int // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams
  339. pings map[[8]byte]chan struct{} // in flight ping data to notification channel
  340. br *bufio.Reader
  341. lastActive time.Time
  342. lastIdle time.Time // time last idle
  343. // Settings from peer: (also guarded by wmu)
  344. maxFrameSize uint32
  345. maxConcurrentStreams uint32
  346. peerMaxHeaderListSize uint64
  347. peerMaxHeaderTableSize uint32
  348. initialWindowSize uint32
  349. initialStreamRecvWindowSize int32
  350. readIdleTimeout time.Duration
  351. pingTimeout time.Duration
  352. extendedConnectAllowed bool
  353. // rstStreamPingsBlocked works around an unfortunate gRPC behavior.
  354. // gRPC strictly limits the number of PING frames that it will receive.
  355. // The default is two pings per two hours, but the limit resets every time
  356. // the gRPC endpoint sends a HEADERS or DATA frame. See golang/go#70575.
  357. //
  358. // rstStreamPingsBlocked is set after receiving a response to a PING frame
  359. // bundled with an RST_STREAM (see pendingResets below), and cleared after
  360. // receiving a HEADERS or DATA frame.
  361. rstStreamPingsBlocked bool
  362. // pendingResets is the number of RST_STREAM frames we have sent to the peer,
  363. // without confirming that the peer has received them. When we send a RST_STREAM,
  364. // we bundle it with a PING frame, unless a PING is already in flight. We count
  365. // the reset stream against the connection's concurrency limit until we get
  366. // a PING response. This limits the number of requests we'll try to send to a
  367. // completely unresponsive connection.
  368. pendingResets int
  369. // reqHeaderMu is a 1-element semaphore channel controlling access to sending new requests.
  370. // Write to reqHeaderMu to lock it, read from it to unlock.
  371. // Lock reqmu BEFORE mu or wmu.
  372. reqHeaderMu chan struct{}
  373. // wmu is held while writing.
  374. // Acquire BEFORE mu when holding both, to avoid blocking mu on network writes.
  375. // Only acquire both at the same time when changing peer settings.
  376. wmu sync.Mutex
  377. bw *bufio.Writer
  378. fr *Framer
  379. werr error // first write error that has occurred
  380. hbuf bytes.Buffer // HPACK encoder writes into this
  381. henc *hpack.Encoder
  382. }
  383. // clientStream is the state for a single HTTP/2 stream. One of these
  384. // is created for each Transport.RoundTrip call.
  385. type clientStream struct {
  386. cc *ClientConn
  387. // Fields of Request that we may access even after the response body is closed.
  388. ctx context.Context
  389. reqCancel <-chan struct{}
  390. trace *httptrace.ClientTrace // or nil
  391. ID uint32
  392. bufPipe pipe // buffered pipe with the flow-controlled response payload
  393. requestedGzip bool
  394. isHead bool
  395. abortOnce sync.Once
  396. abort chan struct{} // closed to signal stream should end immediately
  397. abortErr error // set if abort is closed
  398. peerClosed chan struct{} // closed when the peer sends an END_STREAM flag
  399. donec chan struct{} // closed after the stream is in the closed state
  400. on100 chan struct{} // buffered; written to if a 100 is received
  401. respHeaderRecv chan struct{} // closed when headers are received
  402. res *http.Response // set if respHeaderRecv is closed
  403. flow outflow // guarded by cc.mu
  404. inflow inflow // guarded by cc.mu
  405. bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read
  406. readErr error // sticky read error; owned by transportResponseBody.Read
  407. reqBody io.ReadCloser
  408. reqBodyContentLength int64 // -1 means unknown
  409. reqBodyClosed chan struct{} // guarded by cc.mu; non-nil on Close, closed when done
  410. // owned by writeRequest:
  411. sentEndStream bool // sent an END_STREAM flag to the peer
  412. sentHeaders bool
  413. // owned by clientConnReadLoop:
  414. firstByte bool // got the first response byte
  415. pastHeaders bool // got first MetaHeadersFrame (actual headers)
  416. pastTrailers bool // got optional second MetaHeadersFrame (trailers)
  417. readClosed bool // peer sent an END_STREAM flag
  418. readAborted bool // read loop reset the stream
  419. totalHeaderSize int64 // total size of 1xx headers seen
  420. trailer http.Header // accumulated trailers
  421. resTrailer *http.Header // client's Response.Trailer
  422. }
  423. var got1xxFuncForTests func(int, textproto.MIMEHeader) error
  424. // get1xxTraceFunc returns the value of request's httptrace.ClientTrace.Got1xxResponse func,
  425. // if any. It returns nil if not set or if the Go version is too old.
  426. func (cs *clientStream) get1xxTraceFunc() func(int, textproto.MIMEHeader) error {
  427. if fn := got1xxFuncForTests; fn != nil {
  428. return fn
  429. }
  430. return traceGot1xxResponseFunc(cs.trace)
  431. }
  432. func (cs *clientStream) abortStream(err error) {
  433. cs.cc.mu.Lock()
  434. defer cs.cc.mu.Unlock()
  435. cs.abortStreamLocked(err)
  436. }
  437. func (cs *clientStream) abortStreamLocked(err error) {
  438. cs.abortOnce.Do(func() {
  439. cs.abortErr = err
  440. close(cs.abort)
  441. })
  442. if cs.reqBody != nil {
  443. cs.closeReqBodyLocked()
  444. }
  445. // TODO(dneil): Clean up tests where cs.cc.cond is nil.
  446. if cs.cc.cond != nil {
  447. // Wake up writeRequestBody if it is waiting on flow control.
  448. cs.cc.cond.Broadcast()
  449. }
  450. }
  451. func (cs *clientStream) abortRequestBodyWrite() {
  452. cc := cs.cc
  453. cc.mu.Lock()
  454. defer cc.mu.Unlock()
  455. if cs.reqBody != nil && cs.reqBodyClosed == nil {
  456. cs.closeReqBodyLocked()
  457. cc.cond.Broadcast()
  458. }
  459. }
  460. func (cs *clientStream) closeReqBodyLocked() {
  461. if cs.reqBodyClosed != nil {
  462. return
  463. }
  464. cs.reqBodyClosed = make(chan struct{})
  465. reqBodyClosed := cs.reqBodyClosed
  466. go func() {
  467. cs.cc.t.markNewGoroutine()
  468. cs.reqBody.Close()
  469. close(reqBodyClosed)
  470. }()
  471. }
  472. type stickyErrWriter struct {
  473. group synctestGroupInterface
  474. conn net.Conn
  475. timeout time.Duration
  476. err *error
  477. }
  478. func (sew stickyErrWriter) Write(p []byte) (n int, err error) {
  479. if *sew.err != nil {
  480. return 0, *sew.err
  481. }
  482. n, err = writeWithByteTimeout(sew.group, sew.conn, sew.timeout, p)
  483. *sew.err = err
  484. return n, err
  485. }
  486. // noCachedConnError is the concrete type of ErrNoCachedConn, which
  487. // needs to be detected by net/http regardless of whether it's its
  488. // bundled version (in h2_bundle.go with a rewritten type name) or
  489. // from a user's x/net/http2. As such, as it has a unique method name
  490. // (IsHTTP2NoCachedConnError) that net/http sniffs for via func
  491. // isNoCachedConnError.
  492. type noCachedConnError struct{}
  493. func (noCachedConnError) IsHTTP2NoCachedConnError() {}
  494. func (noCachedConnError) Error() string { return "http2: no cached connection was available" }
  495. // isNoCachedConnError reports whether err is of type noCachedConnError
  496. // or its equivalent renamed type in net/http2's h2_bundle.go. Both types
  497. // may coexist in the same running program.
  498. func isNoCachedConnError(err error) bool {
  499. _, ok := err.(interface{ IsHTTP2NoCachedConnError() })
  500. return ok
  501. }
  502. var ErrNoCachedConn error = noCachedConnError{}
  503. // RoundTripOpt are options for the Transport.RoundTripOpt method.
  504. type RoundTripOpt struct {
  505. // OnlyCachedConn controls whether RoundTripOpt may
  506. // create a new TCP connection. If set true and
  507. // no cached connection is available, RoundTripOpt
  508. // will return ErrNoCachedConn.
  509. OnlyCachedConn bool
  510. allowHTTP bool // allow http:// URLs
  511. }
  512. func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) {
  513. return t.RoundTripOpt(req, RoundTripOpt{})
  514. }
  515. // authorityAddr returns a given authority (a host/IP, or host:port / ip:port)
  516. // and returns a host:port. The port 443 is added if needed.
  517. func authorityAddr(scheme string, authority string) (addr string) {
  518. host, port, err := net.SplitHostPort(authority)
  519. if err != nil { // authority didn't have a port
  520. host = authority
  521. port = ""
  522. }
  523. if port == "" { // authority's port was empty
  524. port = "443"
  525. if scheme == "http" {
  526. port = "80"
  527. }
  528. }
  529. if a, err := idna.ToASCII(host); err == nil {
  530. host = a
  531. }
  532. // IPv6 address literal, without a port:
  533. if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") {
  534. return host + ":" + port
  535. }
  536. return net.JoinHostPort(host, port)
  537. }
  538. // RoundTripOpt is like RoundTrip, but takes options.
  539. func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Response, error) {
  540. switch req.URL.Scheme {
  541. case "https":
  542. // Always okay.
  543. case "http":
  544. if !t.AllowHTTP && !opt.allowHTTP {
  545. return nil, errors.New("http2: unencrypted HTTP/2 not enabled")
  546. }
  547. default:
  548. return nil, errors.New("http2: unsupported scheme")
  549. }
  550. addr := authorityAddr(req.URL.Scheme, req.URL.Host)
  551. for retry := 0; ; retry++ {
  552. cc, err := t.connPool().GetClientConn(req, addr)
  553. if err != nil {
  554. t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err)
  555. return nil, err
  556. }
  557. reused := !atomic.CompareAndSwapUint32(&cc.atomicReused, 0, 1)
  558. traceGotConn(req, cc, reused)
  559. res, err := cc.RoundTrip(req)
  560. if err != nil && retry <= 6 {
  561. roundTripErr := err
  562. if req, err = shouldRetryRequest(req, err); err == nil {
  563. // After the first retry, do exponential backoff with 10% jitter.
  564. if retry == 0 {
  565. t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
  566. continue
  567. }
  568. backoff := float64(uint(1) << (uint(retry) - 1))
  569. backoff += backoff * (0.1 * mathrand.Float64())
  570. d := time.Second * time.Duration(backoff)
  571. tm := t.newTimer(d)
  572. select {
  573. case <-tm.C():
  574. t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
  575. continue
  576. case <-req.Context().Done():
  577. tm.Stop()
  578. err = req.Context().Err()
  579. }
  580. }
  581. }
  582. if err == errClientConnNotEstablished {
  583. // This ClientConn was created recently,
  584. // this is the first request to use it,
  585. // and the connection is closed and not usable.
  586. //
  587. // In this state, cc.idleTimer will remove the conn from the pool
  588. // when it fires. Stop the timer and remove it here so future requests
  589. // won't try to use this connection.
  590. //
  591. // If the timer has already fired and we're racing it, the redundant
  592. // call to MarkDead is harmless.
  593. if cc.idleTimer != nil {
  594. cc.idleTimer.Stop()
  595. }
  596. t.connPool().MarkDead(cc)
  597. }
  598. if err != nil {
  599. t.vlogf("RoundTrip failure: %v", err)
  600. return nil, err
  601. }
  602. return res, nil
  603. }
  604. }
  605. // CloseIdleConnections closes any connections which were previously
  606. // connected from previous requests but are now sitting idle.
  607. // It does not interrupt any connections currently in use.
  608. func (t *Transport) CloseIdleConnections() {
  609. if cp, ok := t.connPool().(clientConnPoolIdleCloser); ok {
  610. cp.closeIdleConnections()
  611. }
  612. }
  613. var (
  614. errClientConnClosed = errors.New("http2: client conn is closed")
  615. errClientConnUnusable = errors.New("http2: client conn not usable")
  616. errClientConnNotEstablished = errors.New("http2: client conn could not be established")
  617. errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
  618. )
  619. // shouldRetryRequest is called by RoundTrip when a request fails to get
  620. // response headers. It is always called with a non-nil error.
  621. // It returns either a request to retry (either the same request, or a
  622. // modified clone), or an error if the request can't be replayed.
  623. func shouldRetryRequest(req *http.Request, err error) (*http.Request, error) {
  624. if !canRetryError(err) {
  625. return nil, err
  626. }
  627. // If the Body is nil (or http.NoBody), it's safe to reuse
  628. // this request and its Body.
  629. if req.Body == nil || req.Body == http.NoBody {
  630. return req, nil
  631. }
  632. // If the request body can be reset back to its original
  633. // state via the optional req.GetBody, do that.
  634. if req.GetBody != nil {
  635. body, err := req.GetBody()
  636. if err != nil {
  637. return nil, err
  638. }
  639. newReq := *req
  640. newReq.Body = body
  641. return &newReq, nil
  642. }
  643. // The Request.Body can't reset back to the beginning, but we
  644. // don't seem to have started to read from it yet, so reuse
  645. // the request directly.
  646. if err == errClientConnUnusable {
  647. return req, nil
  648. }
  649. return nil, fmt.Errorf("http2: Transport: cannot retry err [%v] after Request.Body was written; define Request.GetBody to avoid this error", err)
  650. }
  651. func canRetryError(err error) bool {
  652. if err == errClientConnUnusable || err == errClientConnGotGoAway {
  653. return true
  654. }
  655. if se, ok := err.(StreamError); ok {
  656. if se.Code == ErrCodeProtocol && se.Cause == errFromPeer {
  657. // See golang/go#47635, golang/go#42777
  658. return true
  659. }
  660. return se.Code == ErrCodeRefusedStream
  661. }
  662. return false
  663. }
  664. func (t *Transport) dialClientConn(ctx context.Context, addr string, singleUse bool) (*ClientConn, error) {
  665. if t.transportTestHooks != nil {
  666. return t.newClientConn(nil, singleUse)
  667. }
  668. host, _, err := net.SplitHostPort(addr)
  669. if err != nil {
  670. return nil, err
  671. }
  672. tconn, err := t.dialTLS(ctx, "tcp", addr, t.newTLSConfig(host))
  673. if err != nil {
  674. return nil, err
  675. }
  676. return t.newClientConn(tconn, singleUse)
  677. }
  678. func (t *Transport) newTLSConfig(host string) *tls.Config {
  679. cfg := new(tls.Config)
  680. if t.TLSClientConfig != nil {
  681. *cfg = *t.TLSClientConfig.Clone()
  682. }
  683. if !strSliceContains(cfg.NextProtos, NextProtoTLS) {
  684. cfg.NextProtos = append([]string{NextProtoTLS}, cfg.NextProtos...)
  685. }
  686. if cfg.ServerName == "" {
  687. cfg.ServerName = host
  688. }
  689. return cfg
  690. }
  691. func (t *Transport) dialTLS(ctx context.Context, network, addr string, tlsCfg *tls.Config) (net.Conn, error) {
  692. if t.DialTLSContext != nil {
  693. return t.DialTLSContext(ctx, network, addr, tlsCfg)
  694. } else if t.DialTLS != nil {
  695. return t.DialTLS(network, addr, tlsCfg)
  696. }
  697. tlsCn, err := t.dialTLSWithContext(ctx, network, addr, tlsCfg)
  698. if err != nil {
  699. return nil, err
  700. }
  701. state := tlsCn.ConnectionState()
  702. if p := state.NegotiatedProtocol; p != NextProtoTLS {
  703. return nil, fmt.Errorf("http2: unexpected ALPN protocol %q; want %q", p, NextProtoTLS)
  704. }
  705. if !state.NegotiatedProtocolIsMutual {
  706. return nil, errors.New("http2: could not negotiate protocol mutually")
  707. }
  708. return tlsCn, nil
  709. }
  710. // disableKeepAlives reports whether connections should be closed as
  711. // soon as possible after handling the first request.
  712. func (t *Transport) disableKeepAlives() bool {
  713. return t.t1 != nil && t.t1.DisableKeepAlives
  714. }
  715. func (t *Transport) expectContinueTimeout() time.Duration {
  716. if t.t1 == nil {
  717. return 0
  718. }
  719. return t.t1.ExpectContinueTimeout
  720. }
  721. func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) {
  722. return t.newClientConn(c, t.disableKeepAlives())
  723. }
  724. func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, error) {
  725. conf := configFromTransport(t)
  726. cc := &ClientConn{
  727. t: t,
  728. tconn: c,
  729. readerDone: make(chan struct{}),
  730. nextStreamID: 1,
  731. maxFrameSize: 16 << 10, // spec default
  732. initialWindowSize: 65535, // spec default
  733. initialStreamRecvWindowSize: conf.MaxUploadBufferPerStream,
  734. maxConcurrentStreams: initialMaxConcurrentStreams, // "infinite", per spec. Use a smaller value until we have received server settings.
  735. peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
  736. streams: make(map[uint32]*clientStream),
  737. singleUse: singleUse,
  738. seenSettingsChan: make(chan struct{}),
  739. wantSettingsAck: true,
  740. readIdleTimeout: conf.SendPingTimeout,
  741. pingTimeout: conf.PingTimeout,
  742. pings: make(map[[8]byte]chan struct{}),
  743. reqHeaderMu: make(chan struct{}, 1),
  744. lastActive: t.now(),
  745. }
  746. var group synctestGroupInterface
  747. if t.transportTestHooks != nil {
  748. t.markNewGoroutine()
  749. t.transportTestHooks.newclientconn(cc)
  750. c = cc.tconn
  751. group = t.group
  752. }
  753. if VerboseLogs {
  754. t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
  755. }
  756. cc.cond = sync.NewCond(&cc.mu)
  757. cc.flow.add(int32(initialWindowSize))
  758. // TODO: adjust this writer size to account for frame size +
  759. // MTU + crypto/tls record padding.
  760. cc.bw = bufio.NewWriter(stickyErrWriter{
  761. group: group,
  762. conn: c,
  763. timeout: conf.WriteByteTimeout,
  764. err: &cc.werr,
  765. })
  766. cc.br = bufio.NewReader(c)
  767. cc.fr = NewFramer(cc.bw, cc.br)
  768. cc.fr.SetMaxReadFrameSize(conf.MaxReadFrameSize)
  769. if t.CountError != nil {
  770. cc.fr.countError = t.CountError
  771. }
  772. maxHeaderTableSize := conf.MaxDecoderHeaderTableSize
  773. cc.fr.ReadMetaHeaders = hpack.NewDecoder(maxHeaderTableSize, nil)
  774. cc.fr.MaxHeaderListSize = t.maxHeaderListSize()
  775. cc.henc = hpack.NewEncoder(&cc.hbuf)
  776. cc.henc.SetMaxDynamicTableSizeLimit(conf.MaxEncoderHeaderTableSize)
  777. cc.peerMaxHeaderTableSize = initialHeaderTableSize
  778. if cs, ok := c.(connectionStater); ok {
  779. state := cs.ConnectionState()
  780. cc.tlsState = &state
  781. }
  782. initialSettings := []Setting{
  783. {ID: SettingEnablePush, Val: 0},
  784. {ID: SettingInitialWindowSize, Val: uint32(cc.initialStreamRecvWindowSize)},
  785. }
  786. initialSettings = append(initialSettings, Setting{ID: SettingMaxFrameSize, Val: conf.MaxReadFrameSize})
  787. if max := t.maxHeaderListSize(); max != 0 {
  788. initialSettings = append(initialSettings, Setting{ID: SettingMaxHeaderListSize, Val: max})
  789. }
  790. if maxHeaderTableSize != initialHeaderTableSize {
  791. initialSettings = append(initialSettings, Setting{ID: SettingHeaderTableSize, Val: maxHeaderTableSize})
  792. }
  793. cc.bw.Write(clientPreface)
  794. cc.fr.WriteSettings(initialSettings...)
  795. cc.fr.WriteWindowUpdate(0, uint32(conf.MaxUploadBufferPerConnection))
  796. cc.inflow.init(conf.MaxUploadBufferPerConnection + initialWindowSize)
  797. cc.bw.Flush()
  798. if cc.werr != nil {
  799. cc.Close()
  800. return nil, cc.werr
  801. }
  802. // Start the idle timer after the connection is fully initialized.
  803. if d := t.idleConnTimeout(); d != 0 {
  804. cc.idleTimeout = d
  805. cc.idleTimer = t.afterFunc(d, cc.onIdleTimeout)
  806. }
  807. go cc.readLoop()
  808. return cc, nil
  809. }
  810. func (cc *ClientConn) healthCheck() {
  811. pingTimeout := cc.pingTimeout
  812. // We don't need to periodically ping in the health check, because the readLoop of ClientConn will
  813. // trigger the healthCheck again if there is no frame received.
  814. ctx, cancel := cc.t.contextWithTimeout(context.Background(), pingTimeout)
  815. defer cancel()
  816. cc.vlogf("http2: Transport sending health check")
  817. err := cc.Ping(ctx)
  818. if err != nil {
  819. cc.vlogf("http2: Transport health check failure: %v", err)
  820. cc.closeForLostPing()
  821. } else {
  822. cc.vlogf("http2: Transport health check success")
  823. }
  824. }
  825. // SetDoNotReuse marks cc as not reusable for future HTTP requests.
  826. func (cc *ClientConn) SetDoNotReuse() {
  827. cc.mu.Lock()
  828. defer cc.mu.Unlock()
  829. cc.doNotReuse = true
  830. }
  831. func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
  832. cc.mu.Lock()
  833. defer cc.mu.Unlock()
  834. old := cc.goAway
  835. cc.goAway = f
  836. // Merge the previous and current GoAway error frames.
  837. if cc.goAwayDebug == "" {
  838. cc.goAwayDebug = string(f.DebugData())
  839. }
  840. if old != nil && old.ErrCode != ErrCodeNo {
  841. cc.goAway.ErrCode = old.ErrCode
  842. }
  843. last := f.LastStreamID
  844. for streamID, cs := range cc.streams {
  845. if streamID <= last {
  846. // The server's GOAWAY indicates that it received this stream.
  847. // It will either finish processing it, or close the connection
  848. // without doing so. Either way, leave the stream alone for now.
  849. continue
  850. }
  851. if streamID == 1 && cc.goAway.ErrCode != ErrCodeNo {
  852. // Don't retry the first stream on a connection if we get a non-NO error.
  853. // If the server is sending an error on a new connection,
  854. // retrying the request on a new one probably isn't going to work.
  855. cs.abortStreamLocked(fmt.Errorf("http2: Transport received GOAWAY from server ErrCode:%v", cc.goAway.ErrCode))
  856. } else {
  857. // Aborting the stream with errClentConnGotGoAway indicates that
  858. // the request should be retried on a new connection.
  859. cs.abortStreamLocked(errClientConnGotGoAway)
  860. }
  861. }
  862. }
  863. // CanTakeNewRequest reports whether the connection can take a new request,
  864. // meaning it has not been closed or received or sent a GOAWAY.
  865. //
  866. // If the caller is going to immediately make a new request on this
  867. // connection, use ReserveNewRequest instead.
  868. func (cc *ClientConn) CanTakeNewRequest() bool {
  869. cc.mu.Lock()
  870. defer cc.mu.Unlock()
  871. return cc.canTakeNewRequestLocked()
  872. }
  873. // ReserveNewRequest is like CanTakeNewRequest but also reserves a
  874. // concurrent stream in cc. The reservation is decremented on the
  875. // next call to RoundTrip.
  876. func (cc *ClientConn) ReserveNewRequest() bool {
  877. cc.mu.Lock()
  878. defer cc.mu.Unlock()
  879. if st := cc.idleStateLocked(); !st.canTakeNewRequest {
  880. return false
  881. }
  882. cc.streamsReserved++
  883. return true
  884. }
  885. // ClientConnState describes the state of a ClientConn.
  886. type ClientConnState struct {
  887. // Closed is whether the connection is closed.
  888. Closed bool
  889. // Closing is whether the connection is in the process of
  890. // closing. It may be closing due to shutdown, being a
  891. // single-use connection, being marked as DoNotReuse, or
  892. // having received a GOAWAY frame.
  893. Closing bool
  894. // StreamsActive is how many streams are active.
  895. StreamsActive int
  896. // StreamsReserved is how many streams have been reserved via
  897. // ClientConn.ReserveNewRequest.
  898. StreamsReserved int
  899. // StreamsPending is how many requests have been sent in excess
  900. // of the peer's advertised MaxConcurrentStreams setting and
  901. // are waiting for other streams to complete.
  902. StreamsPending int
  903. // MaxConcurrentStreams is how many concurrent streams the
  904. // peer advertised as acceptable. Zero means no SETTINGS
  905. // frame has been received yet.
  906. MaxConcurrentStreams uint32
  907. // LastIdle, if non-zero, is when the connection last
  908. // transitioned to idle state.
  909. LastIdle time.Time
  910. }
  911. // State returns a snapshot of cc's state.
  912. func (cc *ClientConn) State() ClientConnState {
  913. cc.wmu.Lock()
  914. maxConcurrent := cc.maxConcurrentStreams
  915. if !cc.seenSettings {
  916. maxConcurrent = 0
  917. }
  918. cc.wmu.Unlock()
  919. cc.mu.Lock()
  920. defer cc.mu.Unlock()
  921. return ClientConnState{
  922. Closed: cc.closed,
  923. Closing: cc.closing || cc.singleUse || cc.doNotReuse || cc.goAway != nil,
  924. StreamsActive: len(cc.streams) + cc.pendingResets,
  925. StreamsReserved: cc.streamsReserved,
  926. StreamsPending: cc.pendingRequests,
  927. LastIdle: cc.lastIdle,
  928. MaxConcurrentStreams: maxConcurrent,
  929. }
  930. }
  931. // clientConnIdleState describes the suitability of a client
  932. // connection to initiate a new RoundTrip request.
  933. type clientConnIdleState struct {
  934. canTakeNewRequest bool
  935. }
  936. func (cc *ClientConn) idleState() clientConnIdleState {
  937. cc.mu.Lock()
  938. defer cc.mu.Unlock()
  939. return cc.idleStateLocked()
  940. }
  941. func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
  942. if cc.singleUse && cc.nextStreamID > 1 {
  943. return
  944. }
  945. var maxConcurrentOkay bool
  946. if cc.t.StrictMaxConcurrentStreams {
  947. // We'll tell the caller we can take a new request to
  948. // prevent the caller from dialing a new TCP
  949. // connection, but then we'll block later before
  950. // writing it.
  951. maxConcurrentOkay = true
  952. } else {
  953. // We can take a new request if the total of
  954. // - active streams;
  955. // - reservation slots for new streams; and
  956. // - streams for which we have sent a RST_STREAM and a PING,
  957. // but received no subsequent frame
  958. // is less than the concurrency limit.
  959. maxConcurrentOkay = cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams)
  960. }
  961. st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay &&
  962. !cc.doNotReuse &&
  963. int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
  964. !cc.tooIdleLocked()
  965. // If this connection has never been used for a request and is closed,
  966. // then let it take a request (which will fail).
  967. // If the conn was closed for idleness, we're racing the idle timer;
  968. // don't try to use the conn. (Issue #70515.)
  969. //
  970. // This avoids a situation where an error early in a connection's lifetime
  971. // goes unreported.
  972. if cc.nextStreamID == 1 && cc.streamsReserved == 0 && cc.closed && !cc.closedOnIdle {
  973. st.canTakeNewRequest = true
  974. }
  975. return
  976. }
  977. // currentRequestCountLocked reports the number of concurrency slots currently in use,
  978. // including active streams, reserved slots, and reset streams waiting for acknowledgement.
  979. func (cc *ClientConn) currentRequestCountLocked() int {
  980. return len(cc.streams) + cc.streamsReserved + cc.pendingResets
  981. }
  982. func (cc *ClientConn) canTakeNewRequestLocked() bool {
  983. st := cc.idleStateLocked()
  984. return st.canTakeNewRequest
  985. }
  986. // tooIdleLocked reports whether this connection has been been sitting idle
  987. // for too much wall time.
  988. func (cc *ClientConn) tooIdleLocked() bool {
  989. // The Round(0) strips the monontonic clock reading so the
  990. // times are compared based on their wall time. We don't want
  991. // to reuse a connection that's been sitting idle during
  992. // VM/laptop suspend if monotonic time was also frozen.
  993. return cc.idleTimeout != 0 && !cc.lastIdle.IsZero() && cc.t.timeSince(cc.lastIdle.Round(0)) > cc.idleTimeout
  994. }
  995. // onIdleTimeout is called from a time.AfterFunc goroutine. It will
  996. // only be called when we're idle, but because we're coming from a new
  997. // goroutine, there could be a new request coming in at the same time,
  998. // so this simply calls the synchronized closeIfIdle to shut down this
  999. // connection. The timer could just call closeIfIdle, but this is more
  1000. // clear.
  1001. func (cc *ClientConn) onIdleTimeout() {
  1002. cc.closeIfIdle()
  1003. }
  1004. func (cc *ClientConn) closeConn() {
  1005. t := time.AfterFunc(250*time.Millisecond, cc.forceCloseConn)
  1006. defer t.Stop()
  1007. cc.tconn.Close()
  1008. }
  1009. // A tls.Conn.Close can hang for a long time if the peer is unresponsive.
  1010. // Try to shut it down more aggressively.
  1011. func (cc *ClientConn) forceCloseConn() {
  1012. tc, ok := cc.tconn.(*tls.Conn)
  1013. if !ok {
  1014. return
  1015. }
  1016. if nc := tc.NetConn(); nc != nil {
  1017. nc.Close()
  1018. }
  1019. }
  1020. func (cc *ClientConn) closeIfIdle() {
  1021. cc.mu.Lock()
  1022. if len(cc.streams) > 0 || cc.streamsReserved > 0 {
  1023. cc.mu.Unlock()
  1024. return
  1025. }
  1026. cc.closed = true
  1027. cc.closedOnIdle = true
  1028. nextID := cc.nextStreamID
  1029. // TODO: do clients send GOAWAY too? maybe? Just Close:
  1030. cc.mu.Unlock()
  1031. if VerboseLogs {
  1032. cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, nextID-2)
  1033. }
  1034. cc.closeConn()
  1035. }
  1036. func (cc *ClientConn) isDoNotReuseAndIdle() bool {
  1037. cc.mu.Lock()
  1038. defer cc.mu.Unlock()
  1039. return cc.doNotReuse && len(cc.streams) == 0
  1040. }
  1041. var shutdownEnterWaitStateHook = func() {}
  1042. // Shutdown gracefully closes the client connection, waiting for running streams to complete.
  1043. func (cc *ClientConn) Shutdown(ctx context.Context) error {
  1044. if err := cc.sendGoAway(); err != nil {
  1045. return err
  1046. }
  1047. // Wait for all in-flight streams to complete or connection to close
  1048. done := make(chan struct{})
  1049. cancelled := false // guarded by cc.mu
  1050. go func() {
  1051. cc.t.markNewGoroutine()
  1052. cc.mu.Lock()
  1053. defer cc.mu.Unlock()
  1054. for {
  1055. if len(cc.streams) == 0 || cc.closed {
  1056. cc.closed = true
  1057. close(done)
  1058. break
  1059. }
  1060. if cancelled {
  1061. break
  1062. }
  1063. cc.cond.Wait()
  1064. }
  1065. }()
  1066. shutdownEnterWaitStateHook()
  1067. select {
  1068. case <-done:
  1069. cc.closeConn()
  1070. return nil
  1071. case <-ctx.Done():
  1072. cc.mu.Lock()
  1073. // Free the goroutine above
  1074. cancelled = true
  1075. cc.cond.Broadcast()
  1076. cc.mu.Unlock()
  1077. return ctx.Err()
  1078. }
  1079. }
  1080. func (cc *ClientConn) sendGoAway() error {
  1081. cc.mu.Lock()
  1082. closing := cc.closing
  1083. cc.closing = true
  1084. maxStreamID := cc.nextStreamID
  1085. cc.mu.Unlock()
  1086. if closing {
  1087. // GOAWAY sent already
  1088. return nil
  1089. }
  1090. cc.wmu.Lock()
  1091. defer cc.wmu.Unlock()
  1092. // Send a graceful shutdown frame to server
  1093. if err := cc.fr.WriteGoAway(maxStreamID, ErrCodeNo, nil); err != nil {
  1094. return err
  1095. }
  1096. if err := cc.bw.Flush(); err != nil {
  1097. return err
  1098. }
  1099. // Prevent new requests
  1100. return nil
  1101. }
  1102. // closes the client connection immediately. In-flight requests are interrupted.
  1103. // err is sent to streams.
  1104. func (cc *ClientConn) closeForError(err error) {
  1105. cc.mu.Lock()
  1106. cc.closed = true
  1107. for _, cs := range cc.streams {
  1108. cs.abortStreamLocked(err)
  1109. }
  1110. cc.cond.Broadcast()
  1111. cc.mu.Unlock()
  1112. cc.closeConn()
  1113. }
  1114. // Close closes the client connection immediately.
  1115. //
  1116. // In-flight requests are interrupted. For a graceful shutdown, use Shutdown instead.
  1117. func (cc *ClientConn) Close() error {
  1118. err := errors.New("http2: client connection force closed via ClientConn.Close")
  1119. cc.closeForError(err)
  1120. return nil
  1121. }
  1122. // closes the client connection immediately. In-flight requests are interrupted.
  1123. func (cc *ClientConn) closeForLostPing() {
  1124. err := errors.New("http2: client connection lost")
  1125. if f := cc.t.CountError; f != nil {
  1126. f("conn_close_lost_ping")
  1127. }
  1128. cc.closeForError(err)
  1129. }
  1130. // errRequestCanceled is a copy of net/http's errRequestCanceled because it's not
  1131. // exported. At least they'll be DeepEqual for h1-vs-h2 comparisons tests.
  1132. var errRequestCanceled = errors.New("net/http: request canceled")
  1133. func (cc *ClientConn) responseHeaderTimeout() time.Duration {
  1134. if cc.t.t1 != nil {
  1135. return cc.t.t1.ResponseHeaderTimeout
  1136. }
  1137. // No way to do this (yet?) with just an http2.Transport. Probably
  1138. // no need. Request.Cancel this is the new way. We only need to support
  1139. // this for compatibility with the old http.Transport fields when
  1140. // we're doing transparent http2.
  1141. return 0
  1142. }
  1143. // actualContentLength returns a sanitized version of
  1144. // req.ContentLength, where 0 actually means zero (not unknown) and -1
  1145. // means unknown.
  1146. func actualContentLength(req *http.Request) int64 {
  1147. if req.Body == nil || req.Body == http.NoBody {
  1148. return 0
  1149. }
  1150. if req.ContentLength != 0 {
  1151. return req.ContentLength
  1152. }
  1153. return -1
  1154. }
  1155. func (cc *ClientConn) decrStreamReservations() {
  1156. cc.mu.Lock()
  1157. defer cc.mu.Unlock()
  1158. cc.decrStreamReservationsLocked()
  1159. }
  1160. func (cc *ClientConn) decrStreamReservationsLocked() {
  1161. if cc.streamsReserved > 0 {
  1162. cc.streamsReserved--
  1163. }
  1164. }
  1165. func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
  1166. return cc.roundTrip(req, nil)
  1167. }
  1168. func (cc *ClientConn) roundTrip(req *http.Request, streamf func(*clientStream)) (*http.Response, error) {
  1169. ctx := req.Context()
  1170. cs := &clientStream{
  1171. cc: cc,
  1172. ctx: ctx,
  1173. reqCancel: req.Cancel,
  1174. isHead: req.Method == "HEAD",
  1175. reqBody: req.Body,
  1176. reqBodyContentLength: actualContentLength(req),
  1177. trace: httptrace.ContextClientTrace(ctx),
  1178. peerClosed: make(chan struct{}),
  1179. abort: make(chan struct{}),
  1180. respHeaderRecv: make(chan struct{}),
  1181. donec: make(chan struct{}),
  1182. }
  1183. cs.requestedGzip = httpcommon.IsRequestGzip(req.Method, req.Header, cc.t.disableCompression())
  1184. go cs.doRequest(req, streamf)
  1185. waitDone := func() error {
  1186. select {
  1187. case <-cs.donec:
  1188. return nil
  1189. case <-ctx.Done():
  1190. return ctx.Err()
  1191. case <-cs.reqCancel:
  1192. return errRequestCanceled
  1193. }
  1194. }
  1195. handleResponseHeaders := func() (*http.Response, error) {
  1196. res := cs.res
  1197. if res.StatusCode > 299 {
  1198. // On error or status code 3xx, 4xx, 5xx, etc abort any
  1199. // ongoing write, assuming that the server doesn't care
  1200. // about our request body. If the server replied with 1xx or
  1201. // 2xx, however, then assume the server DOES potentially
  1202. // want our body (e.g. full-duplex streaming:
  1203. // golang.org/issue/13444). If it turns out the server
  1204. // doesn't, they'll RST_STREAM us soon enough. This is a
  1205. // heuristic to avoid adding knobs to Transport. Hopefully
  1206. // we can keep it.
  1207. cs.abortRequestBodyWrite()
  1208. }
  1209. res.Request = req
  1210. res.TLS = cc.tlsState
  1211. if res.Body == noBody && actualContentLength(req) == 0 {
  1212. // If there isn't a request or response body still being
  1213. // written, then wait for the stream to be closed before
  1214. // RoundTrip returns.
  1215. if err := waitDone(); err != nil {
  1216. return nil, err
  1217. }
  1218. }
  1219. return res, nil
  1220. }
  1221. cancelRequest := func(cs *clientStream, err error) error {
  1222. cs.cc.mu.Lock()
  1223. bodyClosed := cs.reqBodyClosed
  1224. cs.cc.mu.Unlock()
  1225. // Wait for the request body to be closed.
  1226. //
  1227. // If nothing closed the body before now, abortStreamLocked
  1228. // will have started a goroutine to close it.
  1229. //
  1230. // Closing the body before returning avoids a race condition
  1231. // with net/http checking its readTrackingBody to see if the
  1232. // body was read from or closed. See golang/go#60041.
  1233. //
  1234. // The body is closed in a separate goroutine without the
  1235. // connection mutex held, but dropping the mutex before waiting
  1236. // will keep us from holding it indefinitely if the body
  1237. // close is slow for some reason.
  1238. if bodyClosed != nil {
  1239. <-bodyClosed
  1240. }
  1241. return err
  1242. }
  1243. for {
  1244. select {
  1245. case <-cs.respHeaderRecv:
  1246. return handleResponseHeaders()
  1247. case <-cs.abort:
  1248. select {
  1249. case <-cs.respHeaderRecv:
  1250. // If both cs.respHeaderRecv and cs.abort are signaling,
  1251. // pick respHeaderRecv. The server probably wrote the
  1252. // response and immediately reset the stream.
  1253. // golang.org/issue/49645
  1254. return handleResponseHeaders()
  1255. default:
  1256. waitDone()
  1257. return nil, cs.abortErr
  1258. }
  1259. case <-ctx.Done():
  1260. err := ctx.Err()
  1261. cs.abortStream(err)
  1262. return nil, cancelRequest(cs, err)
  1263. case <-cs.reqCancel:
  1264. cs.abortStream(errRequestCanceled)
  1265. return nil, cancelRequest(cs, errRequestCanceled)
  1266. }
  1267. }
  1268. }
  1269. // doRequest runs for the duration of the request lifetime.
  1270. //
  1271. // It sends the request and performs post-request cleanup (closing Request.Body, etc.).
  1272. func (cs *clientStream) doRequest(req *http.Request, streamf func(*clientStream)) {
  1273. cs.cc.t.markNewGoroutine()
  1274. err := cs.writeRequest(req, streamf)
  1275. cs.cleanupWriteRequest(err)
  1276. }
  1277. var errExtendedConnectNotSupported = errors.New("net/http: extended connect not supported by peer")
  1278. // writeRequest sends a request.
  1279. //
  1280. // It returns nil after the request is written, the response read,
  1281. // and the request stream is half-closed by the peer.
  1282. //
  1283. // It returns non-nil if the request ends otherwise.
  1284. // If the returned error is StreamError, the error Code may be used in resetting the stream.
  1285. func (cs *clientStream) writeRequest(req *http.Request, streamf func(*clientStream)) (err error) {
  1286. cc := cs.cc
  1287. ctx := cs.ctx
  1288. // wait for setting frames to be received, a server can change this value later,
  1289. // but we just wait for the first settings frame
  1290. var isExtendedConnect bool
  1291. if req.Method == "CONNECT" && req.Header.Get(":protocol") != "" {
  1292. isExtendedConnect = true
  1293. }
  1294. // Acquire the new-request lock by writing to reqHeaderMu.
  1295. // This lock guards the critical section covering allocating a new stream ID
  1296. // (requires mu) and creating the stream (requires wmu).
  1297. if cc.reqHeaderMu == nil {
  1298. panic("RoundTrip on uninitialized ClientConn") // for tests
  1299. }
  1300. if isExtendedConnect {
  1301. select {
  1302. case <-cs.reqCancel:
  1303. return errRequestCanceled
  1304. case <-ctx.Done():
  1305. return ctx.Err()
  1306. case <-cc.seenSettingsChan:
  1307. if !cc.extendedConnectAllowed {
  1308. return errExtendedConnectNotSupported
  1309. }
  1310. }
  1311. }
  1312. select {
  1313. case cc.reqHeaderMu <- struct{}{}:
  1314. case <-cs.reqCancel:
  1315. return errRequestCanceled
  1316. case <-ctx.Done():
  1317. return ctx.Err()
  1318. }
  1319. cc.mu.Lock()
  1320. if cc.idleTimer != nil {
  1321. cc.idleTimer.Stop()
  1322. }
  1323. cc.decrStreamReservationsLocked()
  1324. if err := cc.awaitOpenSlotForStreamLocked(cs); err != nil {
  1325. cc.mu.Unlock()
  1326. <-cc.reqHeaderMu
  1327. return err
  1328. }
  1329. cc.addStreamLocked(cs) // assigns stream ID
  1330. if isConnectionCloseRequest(req) {
  1331. cc.doNotReuse = true
  1332. }
  1333. cc.mu.Unlock()
  1334. if streamf != nil {
  1335. streamf(cs)
  1336. }
  1337. continueTimeout := cc.t.expectContinueTimeout()
  1338. if continueTimeout != 0 {
  1339. if !httpguts.HeaderValuesContainsToken(req.Header["Expect"], "100-continue") {
  1340. continueTimeout = 0
  1341. } else {
  1342. cs.on100 = make(chan struct{}, 1)
  1343. }
  1344. }
  1345. // Past this point (where we send request headers), it is possible for
  1346. // RoundTrip to return successfully. Since the RoundTrip contract permits
  1347. // the caller to "mutate or reuse" the Request after closing the Response's Body,
  1348. // we must take care when referencing the Request from here on.
  1349. err = cs.encodeAndWriteHeaders(req)
  1350. <-cc.reqHeaderMu
  1351. if err != nil {
  1352. return err
  1353. }
  1354. hasBody := cs.reqBodyContentLength != 0
  1355. if !hasBody {
  1356. cs.sentEndStream = true
  1357. } else {
  1358. if continueTimeout != 0 {
  1359. traceWait100Continue(cs.trace)
  1360. timer := time.NewTimer(continueTimeout)
  1361. select {
  1362. case <-timer.C:
  1363. err = nil
  1364. case <-cs.on100:
  1365. err = nil
  1366. case <-cs.abort:
  1367. err = cs.abortErr
  1368. case <-ctx.Done():
  1369. err = ctx.Err()
  1370. case <-cs.reqCancel:
  1371. err = errRequestCanceled
  1372. }
  1373. timer.Stop()
  1374. if err != nil {
  1375. traceWroteRequest(cs.trace, err)
  1376. return err
  1377. }
  1378. }
  1379. if err = cs.writeRequestBody(req); err != nil {
  1380. if err != errStopReqBodyWrite {
  1381. traceWroteRequest(cs.trace, err)
  1382. return err
  1383. }
  1384. } else {
  1385. cs.sentEndStream = true
  1386. }
  1387. }
  1388. traceWroteRequest(cs.trace, err)
  1389. var respHeaderTimer <-chan time.Time
  1390. var respHeaderRecv chan struct{}
  1391. if d := cc.responseHeaderTimeout(); d != 0 {
  1392. timer := cc.t.newTimer(d)
  1393. defer timer.Stop()
  1394. respHeaderTimer = timer.C()
  1395. respHeaderRecv = cs.respHeaderRecv
  1396. }
  1397. // Wait until the peer half-closes its end of the stream,
  1398. // or until the request is aborted (via context, error, or otherwise),
  1399. // whichever comes first.
  1400. for {
  1401. select {
  1402. case <-cs.peerClosed:
  1403. return nil
  1404. case <-respHeaderTimer:
  1405. return errTimeout
  1406. case <-respHeaderRecv:
  1407. respHeaderRecv = nil
  1408. respHeaderTimer = nil // keep waiting for END_STREAM
  1409. case <-cs.abort:
  1410. return cs.abortErr
  1411. case <-ctx.Done():
  1412. return ctx.Err()
  1413. case <-cs.reqCancel:
  1414. return errRequestCanceled
  1415. }
  1416. }
  1417. }
  1418. func (cs *clientStream) encodeAndWriteHeaders(req *http.Request) error {
  1419. cc := cs.cc
  1420. ctx := cs.ctx
  1421. cc.wmu.Lock()
  1422. defer cc.wmu.Unlock()
  1423. // If the request was canceled while waiting for cc.mu, just quit.
  1424. select {
  1425. case <-cs.abort:
  1426. return cs.abortErr
  1427. case <-ctx.Done():
  1428. return ctx.Err()
  1429. case <-cs.reqCancel:
  1430. return errRequestCanceled
  1431. default:
  1432. }
  1433. // Encode headers.
  1434. //
  1435. // we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
  1436. // sent by writeRequestBody below, along with any Trailers,
  1437. // again in form HEADERS{1}, CONTINUATION{0,})
  1438. cc.hbuf.Reset()
  1439. res, err := encodeRequestHeaders(req, cs.requestedGzip, cc.peerMaxHeaderListSize, func(name, value string) {
  1440. cc.writeHeader(name, value)
  1441. })
  1442. if err != nil {
  1443. return fmt.Errorf("http2: %w", err)
  1444. }
  1445. hdrs := cc.hbuf.Bytes()
  1446. // Write the request.
  1447. endStream := !res.HasBody && !res.HasTrailers
  1448. cs.sentHeaders = true
  1449. err = cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)
  1450. traceWroteHeaders(cs.trace)
  1451. return err
  1452. }
  1453. func encodeRequestHeaders(req *http.Request, addGzipHeader bool, peerMaxHeaderListSize uint64, headerf func(name, value string)) (httpcommon.EncodeHeadersResult, error) {
  1454. return httpcommon.EncodeHeaders(req.Context(), httpcommon.EncodeHeadersParam{
  1455. Request: httpcommon.Request{
  1456. Header: req.Header,
  1457. Trailer: req.Trailer,
  1458. URL: req.URL,
  1459. Host: req.Host,
  1460. Method: req.Method,
  1461. ActualContentLength: actualContentLength(req),
  1462. },
  1463. AddGzipHeader: addGzipHeader,
  1464. PeerMaxHeaderListSize: peerMaxHeaderListSize,
  1465. DefaultUserAgent: defaultUserAgent,
  1466. }, headerf)
  1467. }
  1468. // cleanupWriteRequest performs post-request tasks.
  1469. //
  1470. // If err (the result of writeRequest) is non-nil and the stream is not closed,
  1471. // cleanupWriteRequest will send a reset to the peer.
  1472. func (cs *clientStream) cleanupWriteRequest(err error) {
  1473. cc := cs.cc
  1474. if cs.ID == 0 {
  1475. // We were canceled before creating the stream, so return our reservation.
  1476. cc.decrStreamReservations()
  1477. }
  1478. // TODO: write h12Compare test showing whether
  1479. // Request.Body is closed by the Transport,
  1480. // and in multiple cases: server replies <=299 and >299
  1481. // while still writing request body
  1482. cc.mu.Lock()
  1483. mustCloseBody := false
  1484. if cs.reqBody != nil && cs.reqBodyClosed == nil {
  1485. mustCloseBody = true
  1486. cs.reqBodyClosed = make(chan struct{})
  1487. }
  1488. bodyClosed := cs.reqBodyClosed
  1489. closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
  1490. cc.mu.Unlock()
  1491. if mustCloseBody {
  1492. cs.reqBody.Close()
  1493. close(bodyClosed)
  1494. }
  1495. if bodyClosed != nil {
  1496. <-bodyClosed
  1497. }
  1498. if err != nil && cs.sentEndStream {
  1499. // If the connection is closed immediately after the response is read,
  1500. // we may be aborted before finishing up here. If the stream was closed
  1501. // cleanly on both sides, there is no error.
  1502. select {
  1503. case <-cs.peerClosed:
  1504. err = nil
  1505. default:
  1506. }
  1507. }
  1508. if err != nil {
  1509. cs.abortStream(err) // possibly redundant, but harmless
  1510. if cs.sentHeaders {
  1511. if se, ok := err.(StreamError); ok {
  1512. if se.Cause != errFromPeer {
  1513. cc.writeStreamReset(cs.ID, se.Code, false, err)
  1514. }
  1515. } else {
  1516. // We're cancelling an in-flight request.
  1517. //
  1518. // This could be due to the server becoming unresponsive.
  1519. // To avoid sending too many requests on a dead connection,
  1520. // we let the request continue to consume a concurrency slot
  1521. // until we can confirm the server is still responding.
  1522. // We do this by sending a PING frame along with the RST_STREAM
  1523. // (unless a ping is already in flight).
  1524. //
  1525. // For simplicity, we don't bother tracking the PING payload:
  1526. // We reset cc.pendingResets any time we receive a PING ACK.
  1527. //
  1528. // We skip this if the conn is going to be closed on idle,
  1529. // because it's short lived and will probably be closed before
  1530. // we get the ping response.
  1531. ping := false
  1532. if !closeOnIdle {
  1533. cc.mu.Lock()
  1534. // rstStreamPingsBlocked works around a gRPC behavior:
  1535. // see comment on the field for details.
  1536. if !cc.rstStreamPingsBlocked {
  1537. if cc.pendingResets == 0 {
  1538. ping = true
  1539. }
  1540. cc.pendingResets++
  1541. }
  1542. cc.mu.Unlock()
  1543. }
  1544. cc.writeStreamReset(cs.ID, ErrCodeCancel, ping, err)
  1545. }
  1546. }
  1547. cs.bufPipe.CloseWithError(err) // no-op if already closed
  1548. } else {
  1549. if cs.sentHeaders && !cs.sentEndStream {
  1550. cc.writeStreamReset(cs.ID, ErrCodeNo, false, nil)
  1551. }
  1552. cs.bufPipe.CloseWithError(errRequestCanceled)
  1553. }
  1554. if cs.ID != 0 {
  1555. cc.forgetStreamID(cs.ID)
  1556. }
  1557. cc.wmu.Lock()
  1558. werr := cc.werr
  1559. cc.wmu.Unlock()
  1560. if werr != nil {
  1561. cc.Close()
  1562. }
  1563. close(cs.donec)
  1564. }
  1565. // awaitOpenSlotForStreamLocked waits until len(streams) < maxConcurrentStreams.
  1566. // Must hold cc.mu.
  1567. func (cc *ClientConn) awaitOpenSlotForStreamLocked(cs *clientStream) error {
  1568. for {
  1569. if cc.closed && cc.nextStreamID == 1 && cc.streamsReserved == 0 {
  1570. // This is the very first request sent to this connection.
  1571. // Return a fatal error which aborts the retry loop.
  1572. return errClientConnNotEstablished
  1573. }
  1574. cc.lastActive = cc.t.now()
  1575. if cc.closed || !cc.canTakeNewRequestLocked() {
  1576. return errClientConnUnusable
  1577. }
  1578. cc.lastIdle = time.Time{}
  1579. if cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams) {
  1580. return nil
  1581. }
  1582. cc.pendingRequests++
  1583. cc.cond.Wait()
  1584. cc.pendingRequests--
  1585. select {
  1586. case <-cs.abort:
  1587. return cs.abortErr
  1588. default:
  1589. }
  1590. }
  1591. }
  1592. // requires cc.wmu be held
  1593. func (cc *ClientConn) writeHeaders(streamID uint32, endStream bool, maxFrameSize int, hdrs []byte) error {
  1594. first := true // first frame written (HEADERS is first, then CONTINUATION)
  1595. for len(hdrs) > 0 && cc.werr == nil {
  1596. chunk := hdrs
  1597. if len(chunk) > maxFrameSize {
  1598. chunk = chunk[:maxFrameSize]
  1599. }
  1600. hdrs = hdrs[len(chunk):]
  1601. endHeaders := len(hdrs) == 0
  1602. if first {
  1603. cc.fr.WriteHeaders(HeadersFrameParam{
  1604. StreamID: streamID,
  1605. BlockFragment: chunk,
  1606. EndStream: endStream,
  1607. EndHeaders: endHeaders,
  1608. })
  1609. first = false
  1610. } else {
  1611. cc.fr.WriteContinuation(streamID, endHeaders, chunk)
  1612. }
  1613. }
  1614. cc.bw.Flush()
  1615. return cc.werr
  1616. }
  1617. // internal error values; they don't escape to callers
  1618. var (
  1619. // abort request body write; don't send cancel
  1620. errStopReqBodyWrite = errors.New("http2: aborting request body write")
  1621. // abort request body write, but send stream reset of cancel.
  1622. errStopReqBodyWriteAndCancel = errors.New("http2: canceling request")
  1623. errReqBodyTooLong = errors.New("http2: request body larger than specified content length")
  1624. )
  1625. // frameScratchBufferLen returns the length of a buffer to use for
  1626. // outgoing request bodies to read/write to/from.
  1627. //
  1628. // It returns max(1, min(peer's advertised max frame size,
  1629. // Request.ContentLength+1, 512KB)).
  1630. func (cs *clientStream) frameScratchBufferLen(maxFrameSize int) int {
  1631. const max = 512 << 10
  1632. n := int64(maxFrameSize)
  1633. if n > max {
  1634. n = max
  1635. }
  1636. if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
  1637. // Add an extra byte past the declared content-length to
  1638. // give the caller's Request.Body io.Reader a chance to
  1639. // give us more bytes than they declared, so we can catch it
  1640. // early.
  1641. n = cl + 1
  1642. }
  1643. if n < 1 {
  1644. return 1
  1645. }
  1646. return int(n) // doesn't truncate; max is 512K
  1647. }
  1648. // Seven bufPools manage different frame sizes. This helps to avoid scenarios where long-running
  1649. // streaming requests using small frame sizes occupy large buffers initially allocated for prior
  1650. // requests needing big buffers. The size ranges are as follows:
  1651. // {0 KB, 16 KB], {16 KB, 32 KB], {32 KB, 64 KB], {64 KB, 128 KB], {128 KB, 256 KB],
  1652. // {256 KB, 512 KB], {512 KB, infinity}
  1653. // In practice, the maximum scratch buffer size should not exceed 512 KB due to
  1654. // frameScratchBufferLen(maxFrameSize), thus the "infinity pool" should never be used.
  1655. // It exists mainly as a safety measure, for potential future increases in max buffer size.
  1656. var bufPools [7]sync.Pool // of *[]byte
  1657. func bufPoolIndex(size int) int {
  1658. if size <= 16384 {
  1659. return 0
  1660. }
  1661. size -= 1
  1662. bits := bits.Len(uint(size))
  1663. index := bits - 14
  1664. if index >= len(bufPools) {
  1665. return len(bufPools) - 1
  1666. }
  1667. return index
  1668. }
  1669. func (cs *clientStream) writeRequestBody(req *http.Request) (err error) {
  1670. cc := cs.cc
  1671. body := cs.reqBody
  1672. sentEnd := false // whether we sent the final DATA frame w/ END_STREAM
  1673. hasTrailers := req.Trailer != nil
  1674. remainLen := cs.reqBodyContentLength
  1675. hasContentLen := remainLen != -1
  1676. cc.mu.Lock()
  1677. maxFrameSize := int(cc.maxFrameSize)
  1678. cc.mu.Unlock()
  1679. // Scratch buffer for reading into & writing from.
  1680. scratchLen := cs.frameScratchBufferLen(maxFrameSize)
  1681. var buf []byte
  1682. index := bufPoolIndex(scratchLen)
  1683. if bp, ok := bufPools[index].Get().(*[]byte); ok && len(*bp) >= scratchLen {
  1684. defer bufPools[index].Put(bp)
  1685. buf = *bp
  1686. } else {
  1687. buf = make([]byte, scratchLen)
  1688. defer bufPools[index].Put(&buf)
  1689. }
  1690. var sawEOF bool
  1691. for !sawEOF {
  1692. n, err := body.Read(buf)
  1693. if hasContentLen {
  1694. remainLen -= int64(n)
  1695. if remainLen == 0 && err == nil {
  1696. // The request body's Content-Length was predeclared and
  1697. // we just finished reading it all, but the underlying io.Reader
  1698. // returned the final chunk with a nil error (which is one of
  1699. // the two valid things a Reader can do at EOF). Because we'd prefer
  1700. // to send the END_STREAM bit early, double-check that we're actually
  1701. // at EOF. Subsequent reads should return (0, EOF) at this point.
  1702. // If either value is different, we return an error in one of two ways below.
  1703. var scratch [1]byte
  1704. var n1 int
  1705. n1, err = body.Read(scratch[:])
  1706. remainLen -= int64(n1)
  1707. }
  1708. if remainLen < 0 {
  1709. err = errReqBodyTooLong
  1710. return err
  1711. }
  1712. }
  1713. if err != nil {
  1714. cc.mu.Lock()
  1715. bodyClosed := cs.reqBodyClosed != nil
  1716. cc.mu.Unlock()
  1717. switch {
  1718. case bodyClosed:
  1719. return errStopReqBodyWrite
  1720. case err == io.EOF:
  1721. sawEOF = true
  1722. err = nil
  1723. default:
  1724. return err
  1725. }
  1726. }
  1727. remain := buf[:n]
  1728. for len(remain) > 0 && err == nil {
  1729. var allowed int32
  1730. allowed, err = cs.awaitFlowControl(len(remain))
  1731. if err != nil {
  1732. return err
  1733. }
  1734. cc.wmu.Lock()
  1735. data := remain[:allowed]
  1736. remain = remain[allowed:]
  1737. sentEnd = sawEOF && len(remain) == 0 && !hasTrailers
  1738. err = cc.fr.WriteData(cs.ID, sentEnd, data)
  1739. if err == nil {
  1740. // TODO(bradfitz): this flush is for latency, not bandwidth.
  1741. // Most requests won't need this. Make this opt-in or
  1742. // opt-out? Use some heuristic on the body type? Nagel-like
  1743. // timers? Based on 'n'? Only last chunk of this for loop,
  1744. // unless flow control tokens are low? For now, always.
  1745. // If we change this, see comment below.
  1746. err = cc.bw.Flush()
  1747. }
  1748. cc.wmu.Unlock()
  1749. }
  1750. if err != nil {
  1751. return err
  1752. }
  1753. }
  1754. if sentEnd {
  1755. // Already sent END_STREAM (which implies we have no
  1756. // trailers) and flushed, because currently all
  1757. // WriteData frames above get a flush. So we're done.
  1758. return nil
  1759. }
  1760. // Since the RoundTrip contract permits the caller to "mutate or reuse"
  1761. // a request after the Response's Body is closed, verify that this hasn't
  1762. // happened before accessing the trailers.
  1763. cc.mu.Lock()
  1764. trailer := req.Trailer
  1765. err = cs.abortErr
  1766. cc.mu.Unlock()
  1767. if err != nil {
  1768. return err
  1769. }
  1770. cc.wmu.Lock()
  1771. defer cc.wmu.Unlock()
  1772. var trls []byte
  1773. if len(trailer) > 0 {
  1774. trls, err = cc.encodeTrailers(trailer)
  1775. if err != nil {
  1776. return err
  1777. }
  1778. }
  1779. // Two ways to send END_STREAM: either with trailers, or
  1780. // with an empty DATA frame.
  1781. if len(trls) > 0 {
  1782. err = cc.writeHeaders(cs.ID, true, maxFrameSize, trls)
  1783. } else {
  1784. err = cc.fr.WriteData(cs.ID, true, nil)
  1785. }
  1786. if ferr := cc.bw.Flush(); ferr != nil && err == nil {
  1787. err = ferr
  1788. }
  1789. return err
  1790. }
  1791. // awaitFlowControl waits for [1, min(maxBytes, cc.cs.maxFrameSize)] flow
  1792. // control tokens from the server.
  1793. // It returns either the non-zero number of tokens taken or an error
  1794. // if the stream is dead.
  1795. func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
  1796. cc := cs.cc
  1797. ctx := cs.ctx
  1798. cc.mu.Lock()
  1799. defer cc.mu.Unlock()
  1800. for {
  1801. if cc.closed {
  1802. return 0, errClientConnClosed
  1803. }
  1804. if cs.reqBodyClosed != nil {
  1805. return 0, errStopReqBodyWrite
  1806. }
  1807. select {
  1808. case <-cs.abort:
  1809. return 0, cs.abortErr
  1810. case <-ctx.Done():
  1811. return 0, ctx.Err()
  1812. case <-cs.reqCancel:
  1813. return 0, errRequestCanceled
  1814. default:
  1815. }
  1816. if a := cs.flow.available(); a > 0 {
  1817. take := a
  1818. if int(take) > maxBytes {
  1819. take = int32(maxBytes) // can't truncate int; take is int32
  1820. }
  1821. if take > int32(cc.maxFrameSize) {
  1822. take = int32(cc.maxFrameSize)
  1823. }
  1824. cs.flow.take(take)
  1825. return take, nil
  1826. }
  1827. cc.cond.Wait()
  1828. }
  1829. }
  1830. // requires cc.wmu be held.
  1831. func (cc *ClientConn) encodeTrailers(trailer http.Header) ([]byte, error) {
  1832. cc.hbuf.Reset()
  1833. hlSize := uint64(0)
  1834. for k, vv := range trailer {
  1835. for _, v := range vv {
  1836. hf := hpack.HeaderField{Name: k, Value: v}
  1837. hlSize += uint64(hf.Size())
  1838. }
  1839. }
  1840. if hlSize > cc.peerMaxHeaderListSize {
  1841. return nil, errRequestHeaderListSize
  1842. }
  1843. for k, vv := range trailer {
  1844. lowKey, ascii := httpcommon.LowerHeader(k)
  1845. if !ascii {
  1846. // Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header
  1847. // field names have to be ASCII characters (just as in HTTP/1.x).
  1848. continue
  1849. }
  1850. // Transfer-Encoding, etc.. have already been filtered at the
  1851. // start of RoundTrip
  1852. for _, v := range vv {
  1853. cc.writeHeader(lowKey, v)
  1854. }
  1855. }
  1856. return cc.hbuf.Bytes(), nil
  1857. }
  1858. func (cc *ClientConn) writeHeader(name, value string) {
  1859. if VerboseLogs {
  1860. log.Printf("http2: Transport encoding header %q = %q", name, value)
  1861. }
  1862. cc.henc.WriteField(hpack.HeaderField{Name: name, Value: value})
  1863. }
  1864. type resAndError struct {
  1865. _ incomparable
  1866. res *http.Response
  1867. err error
  1868. }
  1869. // requires cc.mu be held.
  1870. func (cc *ClientConn) addStreamLocked(cs *clientStream) {
  1871. cs.flow.add(int32(cc.initialWindowSize))
  1872. cs.flow.setConnFlow(&cc.flow)
  1873. cs.inflow.init(cc.initialStreamRecvWindowSize)
  1874. cs.ID = cc.nextStreamID
  1875. cc.nextStreamID += 2
  1876. cc.streams[cs.ID] = cs
  1877. if cs.ID == 0 {
  1878. panic("assigned stream ID 0")
  1879. }
  1880. }
  1881. func (cc *ClientConn) forgetStreamID(id uint32) {
  1882. cc.mu.Lock()
  1883. slen := len(cc.streams)
  1884. delete(cc.streams, id)
  1885. if len(cc.streams) != slen-1 {
  1886. panic("forgetting unknown stream id")
  1887. }
  1888. cc.lastActive = cc.t.now()
  1889. if len(cc.streams) == 0 && cc.idleTimer != nil {
  1890. cc.idleTimer.Reset(cc.idleTimeout)
  1891. cc.lastIdle = cc.t.now()
  1892. }
  1893. // Wake up writeRequestBody via clientStream.awaitFlowControl and
  1894. // wake up RoundTrip if there is a pending request.
  1895. cc.cond.Broadcast()
  1896. closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
  1897. if closeOnIdle && cc.streamsReserved == 0 && len(cc.streams) == 0 {
  1898. if VerboseLogs {
  1899. cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, cc.nextStreamID-2)
  1900. }
  1901. cc.closed = true
  1902. defer cc.closeConn()
  1903. }
  1904. cc.mu.Unlock()
  1905. }
  1906. // clientConnReadLoop is the state owned by the clientConn's frame-reading readLoop.
  1907. type clientConnReadLoop struct {
  1908. _ incomparable
  1909. cc *ClientConn
  1910. }
  1911. // readLoop runs in its own goroutine and reads and dispatches frames.
  1912. func (cc *ClientConn) readLoop() {
  1913. cc.t.markNewGoroutine()
  1914. rl := &clientConnReadLoop{cc: cc}
  1915. defer rl.cleanup()
  1916. cc.readerErr = rl.run()
  1917. if ce, ok := cc.readerErr.(ConnectionError); ok {
  1918. cc.wmu.Lock()
  1919. cc.fr.WriteGoAway(0, ErrCode(ce), nil)
  1920. cc.wmu.Unlock()
  1921. }
  1922. }
  1923. // GoAwayError is returned by the Transport when the server closes the
  1924. // TCP connection after sending a GOAWAY frame.
  1925. type GoAwayError struct {
  1926. LastStreamID uint32
  1927. ErrCode ErrCode
  1928. DebugData string
  1929. }
  1930. func (e GoAwayError) Error() string {
  1931. return fmt.Sprintf("http2: server sent GOAWAY and closed the connection; LastStreamID=%v, ErrCode=%v, debug=%q",
  1932. e.LastStreamID, e.ErrCode, e.DebugData)
  1933. }
  1934. func isEOFOrNetReadError(err error) bool {
  1935. if err == io.EOF {
  1936. return true
  1937. }
  1938. ne, ok := err.(*net.OpError)
  1939. return ok && ne.Op == "read"
  1940. }
  1941. func (rl *clientConnReadLoop) cleanup() {
  1942. cc := rl.cc
  1943. defer cc.closeConn()
  1944. defer close(cc.readerDone)
  1945. if cc.idleTimer != nil {
  1946. cc.idleTimer.Stop()
  1947. }
  1948. // Close any response bodies if the server closes prematurely.
  1949. // TODO: also do this if we've written the headers but not
  1950. // gotten a response yet.
  1951. err := cc.readerErr
  1952. cc.mu.Lock()
  1953. if cc.goAway != nil && isEOFOrNetReadError(err) {
  1954. err = GoAwayError{
  1955. LastStreamID: cc.goAway.LastStreamID,
  1956. ErrCode: cc.goAway.ErrCode,
  1957. DebugData: cc.goAwayDebug,
  1958. }
  1959. } else if err == io.EOF {
  1960. err = io.ErrUnexpectedEOF
  1961. }
  1962. cc.closed = true
  1963. // If the connection has never been used, and has been open for only a short time,
  1964. // leave it in the connection pool for a little while.
  1965. //
  1966. // This avoids a situation where new connections are constantly created,
  1967. // added to the pool, fail, and are removed from the pool, without any error
  1968. // being surfaced to the user.
  1969. unusedWaitTime := 5 * time.Second
  1970. if cc.idleTimeout > 0 && unusedWaitTime > cc.idleTimeout {
  1971. unusedWaitTime = cc.idleTimeout
  1972. }
  1973. idleTime := cc.t.now().Sub(cc.lastActive)
  1974. if atomic.LoadUint32(&cc.atomicReused) == 0 && idleTime < unusedWaitTime && !cc.closedOnIdle {
  1975. cc.idleTimer = cc.t.afterFunc(unusedWaitTime-idleTime, func() {
  1976. cc.t.connPool().MarkDead(cc)
  1977. })
  1978. } else {
  1979. cc.mu.Unlock() // avoid any deadlocks in MarkDead
  1980. cc.t.connPool().MarkDead(cc)
  1981. cc.mu.Lock()
  1982. }
  1983. for _, cs := range cc.streams {
  1984. select {
  1985. case <-cs.peerClosed:
  1986. // The server closed the stream before closing the conn,
  1987. // so no need to interrupt it.
  1988. default:
  1989. cs.abortStreamLocked(err)
  1990. }
  1991. }
  1992. cc.cond.Broadcast()
  1993. cc.mu.Unlock()
  1994. if !cc.seenSettings {
  1995. // If we have a pending request that wants extended CONNECT,
  1996. // let it continue and fail with the connection error.
  1997. cc.extendedConnectAllowed = true
  1998. close(cc.seenSettingsChan)
  1999. }
  2000. }
  2001. // countReadFrameError calls Transport.CountError with a string
  2002. // representing err.
  2003. func (cc *ClientConn) countReadFrameError(err error) {
  2004. f := cc.t.CountError
  2005. if f == nil || err == nil {
  2006. return
  2007. }
  2008. if ce, ok := err.(ConnectionError); ok {
  2009. errCode := ErrCode(ce)
  2010. f(fmt.Sprintf("read_frame_conn_error_%s", errCode.stringToken()))
  2011. return
  2012. }
  2013. if errors.Is(err, io.EOF) {
  2014. f("read_frame_eof")
  2015. return
  2016. }
  2017. if errors.Is(err, io.ErrUnexpectedEOF) {
  2018. f("read_frame_unexpected_eof")
  2019. return
  2020. }
  2021. if errors.Is(err, ErrFrameTooLarge) {
  2022. f("read_frame_too_large")
  2023. return
  2024. }
  2025. f("read_frame_other")
  2026. }
  2027. func (rl *clientConnReadLoop) run() error {
  2028. cc := rl.cc
  2029. gotSettings := false
  2030. readIdleTimeout := cc.readIdleTimeout
  2031. var t timer
  2032. if readIdleTimeout != 0 {
  2033. t = cc.t.afterFunc(readIdleTimeout, cc.healthCheck)
  2034. }
  2035. for {
  2036. f, err := cc.fr.ReadFrame()
  2037. if t != nil {
  2038. t.Reset(readIdleTimeout)
  2039. }
  2040. if err != nil {
  2041. cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
  2042. }
  2043. if se, ok := err.(StreamError); ok {
  2044. if cs := rl.streamByID(se.StreamID, notHeaderOrDataFrame); cs != nil {
  2045. if se.Cause == nil {
  2046. se.Cause = cc.fr.errDetail
  2047. }
  2048. rl.endStreamError(cs, se)
  2049. }
  2050. continue
  2051. } else if err != nil {
  2052. cc.countReadFrameError(err)
  2053. return err
  2054. }
  2055. if VerboseLogs {
  2056. cc.vlogf("http2: Transport received %s", summarizeFrame(f))
  2057. }
  2058. if !gotSettings {
  2059. if _, ok := f.(*SettingsFrame); !ok {
  2060. cc.logf("protocol error: received %T before a SETTINGS frame", f)
  2061. return ConnectionError(ErrCodeProtocol)
  2062. }
  2063. gotSettings = true
  2064. }
  2065. switch f := f.(type) {
  2066. case *MetaHeadersFrame:
  2067. err = rl.processHeaders(f)
  2068. case *DataFrame:
  2069. err = rl.processData(f)
  2070. case *GoAwayFrame:
  2071. err = rl.processGoAway(f)
  2072. case *RSTStreamFrame:
  2073. err = rl.processResetStream(f)
  2074. case *SettingsFrame:
  2075. err = rl.processSettings(f)
  2076. case *PushPromiseFrame:
  2077. err = rl.processPushPromise(f)
  2078. case *WindowUpdateFrame:
  2079. err = rl.processWindowUpdate(f)
  2080. case *PingFrame:
  2081. err = rl.processPing(f)
  2082. default:
  2083. cc.logf("Transport: unhandled response frame type %T", f)
  2084. }
  2085. if err != nil {
  2086. if VerboseLogs {
  2087. cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, summarizeFrame(f), err)
  2088. }
  2089. return err
  2090. }
  2091. }
  2092. }
  2093. func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error {
  2094. cs := rl.streamByID(f.StreamID, headerOrDataFrame)
  2095. if cs == nil {
  2096. // We'd get here if we canceled a request while the
  2097. // server had its response still in flight. So if this
  2098. // was just something we canceled, ignore it.
  2099. return nil
  2100. }
  2101. if cs.readClosed {
  2102. rl.endStreamError(cs, StreamError{
  2103. StreamID: f.StreamID,
  2104. Code: ErrCodeProtocol,
  2105. Cause: errors.New("protocol error: headers after END_STREAM"),
  2106. })
  2107. return nil
  2108. }
  2109. if !cs.firstByte {
  2110. if cs.trace != nil {
  2111. // TODO(bradfitz): move first response byte earlier,
  2112. // when we first read the 9 byte header, not waiting
  2113. // until all the HEADERS+CONTINUATION frames have been
  2114. // merged. This works for now.
  2115. traceFirstResponseByte(cs.trace)
  2116. }
  2117. cs.firstByte = true
  2118. }
  2119. if !cs.pastHeaders {
  2120. cs.pastHeaders = true
  2121. } else {
  2122. return rl.processTrailers(cs, f)
  2123. }
  2124. res, err := rl.handleResponse(cs, f)
  2125. if err != nil {
  2126. if _, ok := err.(ConnectionError); ok {
  2127. return err
  2128. }
  2129. // Any other error type is a stream error.
  2130. rl.endStreamError(cs, StreamError{
  2131. StreamID: f.StreamID,
  2132. Code: ErrCodeProtocol,
  2133. Cause: err,
  2134. })
  2135. return nil // return nil from process* funcs to keep conn alive
  2136. }
  2137. if res == nil {
  2138. // (nil, nil) special case. See handleResponse docs.
  2139. return nil
  2140. }
  2141. cs.resTrailer = &res.Trailer
  2142. cs.res = res
  2143. close(cs.respHeaderRecv)
  2144. if f.StreamEnded() {
  2145. rl.endStream(cs)
  2146. }
  2147. return nil
  2148. }
  2149. // may return error types nil, or ConnectionError. Any other error value
  2150. // is a StreamError of type ErrCodeProtocol. The returned error in that case
  2151. // is the detail.
  2152. //
  2153. // As a special case, handleResponse may return (nil, nil) to skip the
  2154. // frame (currently only used for 1xx responses).
  2155. func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFrame) (*http.Response, error) {
  2156. if f.Truncated {
  2157. return nil, errResponseHeaderListSize
  2158. }
  2159. status := f.PseudoValue("status")
  2160. if status == "" {
  2161. return nil, errors.New("malformed response from server: missing status pseudo header")
  2162. }
  2163. statusCode, err := strconv.Atoi(status)
  2164. if err != nil {
  2165. return nil, errors.New("malformed response from server: malformed non-numeric status pseudo header")
  2166. }
  2167. regularFields := f.RegularFields()
  2168. strs := make([]string, len(regularFields))
  2169. header := make(http.Header, len(regularFields))
  2170. res := &http.Response{
  2171. Proto: "HTTP/2.0",
  2172. ProtoMajor: 2,
  2173. Header: header,
  2174. StatusCode: statusCode,
  2175. Status: status + " " + http.StatusText(statusCode),
  2176. }
  2177. for _, hf := range regularFields {
  2178. key := httpcommon.CanonicalHeader(hf.Name)
  2179. if key == "Trailer" {
  2180. t := res.Trailer
  2181. if t == nil {
  2182. t = make(http.Header)
  2183. res.Trailer = t
  2184. }
  2185. foreachHeaderElement(hf.Value, func(v string) {
  2186. t[httpcommon.CanonicalHeader(v)] = nil
  2187. })
  2188. } else {
  2189. vv := header[key]
  2190. if vv == nil && len(strs) > 0 {
  2191. // More than likely this will be a single-element key.
  2192. // Most headers aren't multi-valued.
  2193. // Set the capacity on strs[0] to 1, so any future append
  2194. // won't extend the slice into the other strings.
  2195. vv, strs = strs[:1:1], strs[1:]
  2196. vv[0] = hf.Value
  2197. header[key] = vv
  2198. } else {
  2199. header[key] = append(vv, hf.Value)
  2200. }
  2201. }
  2202. }
  2203. if statusCode >= 100 && statusCode <= 199 {
  2204. if f.StreamEnded() {
  2205. return nil, errors.New("1xx informational response with END_STREAM flag")
  2206. }
  2207. if fn := cs.get1xxTraceFunc(); fn != nil {
  2208. // If the 1xx response is being delivered to the user,
  2209. // then they're responsible for limiting the number
  2210. // of responses.
  2211. if err := fn(statusCode, textproto.MIMEHeader(header)); err != nil {
  2212. return nil, err
  2213. }
  2214. } else {
  2215. // If the user didn't examine the 1xx response, then we
  2216. // limit the size of all 1xx headers.
  2217. //
  2218. // This differs a bit from the HTTP/1 implementation, which
  2219. // limits the size of all 1xx headers plus the final response.
  2220. // Use the larger limit of MaxHeaderListSize and
  2221. // net/http.Transport.MaxResponseHeaderBytes.
  2222. limit := int64(cs.cc.t.maxHeaderListSize())
  2223. if t1 := cs.cc.t.t1; t1 != nil && t1.MaxResponseHeaderBytes > limit {
  2224. limit = t1.MaxResponseHeaderBytes
  2225. }
  2226. for _, h := range f.Fields {
  2227. cs.totalHeaderSize += int64(h.Size())
  2228. }
  2229. if cs.totalHeaderSize > limit {
  2230. if VerboseLogs {
  2231. log.Printf("http2: 1xx informational responses too large")
  2232. }
  2233. return nil, errors.New("header list too large")
  2234. }
  2235. }
  2236. if statusCode == 100 {
  2237. traceGot100Continue(cs.trace)
  2238. select {
  2239. case cs.on100 <- struct{}{}:
  2240. default:
  2241. }
  2242. }
  2243. cs.pastHeaders = false // do it all again
  2244. return nil, nil
  2245. }
  2246. res.ContentLength = -1
  2247. if clens := res.Header["Content-Length"]; len(clens) == 1 {
  2248. if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
  2249. res.ContentLength = int64(cl)
  2250. } else {
  2251. // TODO: care? unlike http/1, it won't mess up our framing, so it's
  2252. // more safe smuggling-wise to ignore.
  2253. }
  2254. } else if len(clens) > 1 {
  2255. // TODO: care? unlike http/1, it won't mess up our framing, so it's
  2256. // more safe smuggling-wise to ignore.
  2257. } else if f.StreamEnded() && !cs.isHead {
  2258. res.ContentLength = 0
  2259. }
  2260. if cs.isHead {
  2261. res.Body = noBody
  2262. return res, nil
  2263. }
  2264. if f.StreamEnded() {
  2265. if res.ContentLength > 0 {
  2266. res.Body = missingBody{}
  2267. } else {
  2268. res.Body = noBody
  2269. }
  2270. return res, nil
  2271. }
  2272. cs.bufPipe.setBuffer(&dataBuffer{expected: res.ContentLength})
  2273. cs.bytesRemain = res.ContentLength
  2274. res.Body = transportResponseBody{cs}
  2275. if cs.requestedGzip && asciiEqualFold(res.Header.Get("Content-Encoding"), "gzip") {
  2276. res.Header.Del("Content-Encoding")
  2277. res.Header.Del("Content-Length")
  2278. res.ContentLength = -1
  2279. res.Body = &gzipReader{body: res.Body}
  2280. res.Uncompressed = true
  2281. }
  2282. return res, nil
  2283. }
  2284. func (rl *clientConnReadLoop) processTrailers(cs *clientStream, f *MetaHeadersFrame) error {
  2285. if cs.pastTrailers {
  2286. // Too many HEADERS frames for this stream.
  2287. return ConnectionError(ErrCodeProtocol)
  2288. }
  2289. cs.pastTrailers = true
  2290. if !f.StreamEnded() {
  2291. // We expect that any headers for trailers also
  2292. // has END_STREAM.
  2293. return ConnectionError(ErrCodeProtocol)
  2294. }
  2295. if len(f.PseudoFields()) > 0 {
  2296. // No pseudo header fields are defined for trailers.
  2297. // TODO: ConnectionError might be overly harsh? Check.
  2298. return ConnectionError(ErrCodeProtocol)
  2299. }
  2300. trailer := make(http.Header)
  2301. for _, hf := range f.RegularFields() {
  2302. key := httpcommon.CanonicalHeader(hf.Name)
  2303. trailer[key] = append(trailer[key], hf.Value)
  2304. }
  2305. cs.trailer = trailer
  2306. rl.endStream(cs)
  2307. return nil
  2308. }
  2309. // transportResponseBody is the concrete type of Transport.RoundTrip's
  2310. // Response.Body. It is an io.ReadCloser.
  2311. type transportResponseBody struct {
  2312. cs *clientStream
  2313. }
  2314. func (b transportResponseBody) Read(p []byte) (n int, err error) {
  2315. cs := b.cs
  2316. cc := cs.cc
  2317. if cs.readErr != nil {
  2318. return 0, cs.readErr
  2319. }
  2320. n, err = b.cs.bufPipe.Read(p)
  2321. if cs.bytesRemain != -1 {
  2322. if int64(n) > cs.bytesRemain {
  2323. n = int(cs.bytesRemain)
  2324. if err == nil {
  2325. err = errors.New("net/http: server replied with more than declared Content-Length; truncated")
  2326. cs.abortStream(err)
  2327. }
  2328. cs.readErr = err
  2329. return int(cs.bytesRemain), err
  2330. }
  2331. cs.bytesRemain -= int64(n)
  2332. if err == io.EOF && cs.bytesRemain > 0 {
  2333. err = io.ErrUnexpectedEOF
  2334. cs.readErr = err
  2335. return n, err
  2336. }
  2337. }
  2338. if n == 0 {
  2339. // No flow control tokens to send back.
  2340. return
  2341. }
  2342. cc.mu.Lock()
  2343. connAdd := cc.inflow.add(n)
  2344. var streamAdd int32
  2345. if err == nil { // No need to refresh if the stream is over or failed.
  2346. streamAdd = cs.inflow.add(n)
  2347. }
  2348. cc.mu.Unlock()
  2349. if connAdd != 0 || streamAdd != 0 {
  2350. cc.wmu.Lock()
  2351. defer cc.wmu.Unlock()
  2352. if connAdd != 0 {
  2353. cc.fr.WriteWindowUpdate(0, mustUint31(connAdd))
  2354. }
  2355. if streamAdd != 0 {
  2356. cc.fr.WriteWindowUpdate(cs.ID, mustUint31(streamAdd))
  2357. }
  2358. cc.bw.Flush()
  2359. }
  2360. return
  2361. }
  2362. var errClosedResponseBody = errors.New("http2: response body closed")
  2363. func (b transportResponseBody) Close() error {
  2364. cs := b.cs
  2365. cc := cs.cc
  2366. cs.bufPipe.BreakWithError(errClosedResponseBody)
  2367. cs.abortStream(errClosedResponseBody)
  2368. unread := cs.bufPipe.Len()
  2369. if unread > 0 {
  2370. cc.mu.Lock()
  2371. // Return connection-level flow control.
  2372. connAdd := cc.inflow.add(unread)
  2373. cc.mu.Unlock()
  2374. // TODO(dneil): Acquiring this mutex can block indefinitely.
  2375. // Move flow control return to a goroutine?
  2376. cc.wmu.Lock()
  2377. // Return connection-level flow control.
  2378. if connAdd > 0 {
  2379. cc.fr.WriteWindowUpdate(0, uint32(connAdd))
  2380. }
  2381. cc.bw.Flush()
  2382. cc.wmu.Unlock()
  2383. }
  2384. select {
  2385. case <-cs.donec:
  2386. case <-cs.ctx.Done():
  2387. // See golang/go#49366: The net/http package can cancel the
  2388. // request context after the response body is fully read.
  2389. // Don't treat this as an error.
  2390. return nil
  2391. case <-cs.reqCancel:
  2392. return errRequestCanceled
  2393. }
  2394. return nil
  2395. }
  2396. func (rl *clientConnReadLoop) processData(f *DataFrame) error {
  2397. cc := rl.cc
  2398. cs := rl.streamByID(f.StreamID, headerOrDataFrame)
  2399. data := f.Data()
  2400. if cs == nil {
  2401. cc.mu.Lock()
  2402. neverSent := cc.nextStreamID
  2403. cc.mu.Unlock()
  2404. if f.StreamID >= neverSent {
  2405. // We never asked for this.
  2406. cc.logf("http2: Transport received unsolicited DATA frame; closing connection")
  2407. return ConnectionError(ErrCodeProtocol)
  2408. }
  2409. // We probably did ask for this, but canceled. Just ignore it.
  2410. // TODO: be stricter here? only silently ignore things which
  2411. // we canceled, but not things which were closed normally
  2412. // by the peer? Tough without accumulating too much state.
  2413. // But at least return their flow control:
  2414. if f.Length > 0 {
  2415. cc.mu.Lock()
  2416. ok := cc.inflow.take(f.Length)
  2417. connAdd := cc.inflow.add(int(f.Length))
  2418. cc.mu.Unlock()
  2419. if !ok {
  2420. return ConnectionError(ErrCodeFlowControl)
  2421. }
  2422. if connAdd > 0 {
  2423. cc.wmu.Lock()
  2424. cc.fr.WriteWindowUpdate(0, uint32(connAdd))
  2425. cc.bw.Flush()
  2426. cc.wmu.Unlock()
  2427. }
  2428. }
  2429. return nil
  2430. }
  2431. if cs.readClosed {
  2432. cc.logf("protocol error: received DATA after END_STREAM")
  2433. rl.endStreamError(cs, StreamError{
  2434. StreamID: f.StreamID,
  2435. Code: ErrCodeProtocol,
  2436. })
  2437. return nil
  2438. }
  2439. if !cs.pastHeaders {
  2440. cc.logf("protocol error: received DATA before a HEADERS frame")
  2441. rl.endStreamError(cs, StreamError{
  2442. StreamID: f.StreamID,
  2443. Code: ErrCodeProtocol,
  2444. })
  2445. return nil
  2446. }
  2447. if f.Length > 0 {
  2448. if cs.isHead && len(data) > 0 {
  2449. cc.logf("protocol error: received DATA on a HEAD request")
  2450. rl.endStreamError(cs, StreamError{
  2451. StreamID: f.StreamID,
  2452. Code: ErrCodeProtocol,
  2453. })
  2454. return nil
  2455. }
  2456. // Check connection-level flow control.
  2457. cc.mu.Lock()
  2458. if !takeInflows(&cc.inflow, &cs.inflow, f.Length) {
  2459. cc.mu.Unlock()
  2460. return ConnectionError(ErrCodeFlowControl)
  2461. }
  2462. // Return any padded flow control now, since we won't
  2463. // refund it later on body reads.
  2464. var refund int
  2465. if pad := int(f.Length) - len(data); pad > 0 {
  2466. refund += pad
  2467. }
  2468. didReset := false
  2469. var err error
  2470. if len(data) > 0 {
  2471. if _, err = cs.bufPipe.Write(data); err != nil {
  2472. // Return len(data) now if the stream is already closed,
  2473. // since data will never be read.
  2474. didReset = true
  2475. refund += len(data)
  2476. }
  2477. }
  2478. sendConn := cc.inflow.add(refund)
  2479. var sendStream int32
  2480. if !didReset {
  2481. sendStream = cs.inflow.add(refund)
  2482. }
  2483. cc.mu.Unlock()
  2484. if sendConn > 0 || sendStream > 0 {
  2485. cc.wmu.Lock()
  2486. if sendConn > 0 {
  2487. cc.fr.WriteWindowUpdate(0, uint32(sendConn))
  2488. }
  2489. if sendStream > 0 {
  2490. cc.fr.WriteWindowUpdate(cs.ID, uint32(sendStream))
  2491. }
  2492. cc.bw.Flush()
  2493. cc.wmu.Unlock()
  2494. }
  2495. if err != nil {
  2496. rl.endStreamError(cs, err)
  2497. return nil
  2498. }
  2499. }
  2500. if f.StreamEnded() {
  2501. rl.endStream(cs)
  2502. }
  2503. return nil
  2504. }
  2505. func (rl *clientConnReadLoop) endStream(cs *clientStream) {
  2506. // TODO: check that any declared content-length matches, like
  2507. // server.go's (*stream).endStream method.
  2508. if !cs.readClosed {
  2509. cs.readClosed = true
  2510. // Close cs.bufPipe and cs.peerClosed with cc.mu held to avoid a
  2511. // race condition: The caller can read io.EOF from Response.Body
  2512. // and close the body before we close cs.peerClosed, causing
  2513. // cleanupWriteRequest to send a RST_STREAM.
  2514. rl.cc.mu.Lock()
  2515. defer rl.cc.mu.Unlock()
  2516. cs.bufPipe.closeWithErrorAndCode(io.EOF, cs.copyTrailers)
  2517. close(cs.peerClosed)
  2518. }
  2519. }
  2520. func (rl *clientConnReadLoop) endStreamError(cs *clientStream, err error) {
  2521. cs.readAborted = true
  2522. cs.abortStream(err)
  2523. }
  2524. // Constants passed to streamByID for documentation purposes.
  2525. const (
  2526. headerOrDataFrame = true
  2527. notHeaderOrDataFrame = false
  2528. )
  2529. // streamByID returns the stream with the given id, or nil if no stream has that id.
  2530. // If headerOrData is true, it clears rst.StreamPingsBlocked.
  2531. func (rl *clientConnReadLoop) streamByID(id uint32, headerOrData bool) *clientStream {
  2532. rl.cc.mu.Lock()
  2533. defer rl.cc.mu.Unlock()
  2534. if headerOrData {
  2535. // Work around an unfortunate gRPC behavior.
  2536. // See comment on ClientConn.rstStreamPingsBlocked for details.
  2537. rl.cc.rstStreamPingsBlocked = false
  2538. }
  2539. cs := rl.cc.streams[id]
  2540. if cs != nil && !cs.readAborted {
  2541. return cs
  2542. }
  2543. return nil
  2544. }
  2545. func (cs *clientStream) copyTrailers() {
  2546. for k, vv := range cs.trailer {
  2547. t := cs.resTrailer
  2548. if *t == nil {
  2549. *t = make(http.Header)
  2550. }
  2551. (*t)[k] = vv
  2552. }
  2553. }
  2554. func (rl *clientConnReadLoop) processGoAway(f *GoAwayFrame) error {
  2555. cc := rl.cc
  2556. cc.t.connPool().MarkDead(cc)
  2557. if f.ErrCode != 0 {
  2558. // TODO: deal with GOAWAY more. particularly the error code
  2559. cc.vlogf("transport got GOAWAY with error code = %v", f.ErrCode)
  2560. if fn := cc.t.CountError; fn != nil {
  2561. fn("recv_goaway_" + f.ErrCode.stringToken())
  2562. }
  2563. }
  2564. cc.setGoAway(f)
  2565. return nil
  2566. }
  2567. func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
  2568. cc := rl.cc
  2569. // Locking both mu and wmu here allows frame encoding to read settings with only wmu held.
  2570. // Acquiring wmu when f.IsAck() is unnecessary, but convenient and mostly harmless.
  2571. cc.wmu.Lock()
  2572. defer cc.wmu.Unlock()
  2573. if err := rl.processSettingsNoWrite(f); err != nil {
  2574. return err
  2575. }
  2576. if !f.IsAck() {
  2577. cc.fr.WriteSettingsAck()
  2578. cc.bw.Flush()
  2579. }
  2580. return nil
  2581. }
  2582. func (rl *clientConnReadLoop) processSettingsNoWrite(f *SettingsFrame) error {
  2583. cc := rl.cc
  2584. cc.mu.Lock()
  2585. defer cc.mu.Unlock()
  2586. if f.IsAck() {
  2587. if cc.wantSettingsAck {
  2588. cc.wantSettingsAck = false
  2589. return nil
  2590. }
  2591. return ConnectionError(ErrCodeProtocol)
  2592. }
  2593. var seenMaxConcurrentStreams bool
  2594. err := f.ForeachSetting(func(s Setting) error {
  2595. switch s.ID {
  2596. case SettingMaxFrameSize:
  2597. cc.maxFrameSize = s.Val
  2598. case SettingMaxConcurrentStreams:
  2599. cc.maxConcurrentStreams = s.Val
  2600. seenMaxConcurrentStreams = true
  2601. case SettingMaxHeaderListSize:
  2602. cc.peerMaxHeaderListSize = uint64(s.Val)
  2603. case SettingInitialWindowSize:
  2604. // Values above the maximum flow-control
  2605. // window size of 2^31-1 MUST be treated as a
  2606. // connection error (Section 5.4.1) of type
  2607. // FLOW_CONTROL_ERROR.
  2608. if s.Val > math.MaxInt32 {
  2609. return ConnectionError(ErrCodeFlowControl)
  2610. }
  2611. // Adjust flow control of currently-open
  2612. // frames by the difference of the old initial
  2613. // window size and this one.
  2614. delta := int32(s.Val) - int32(cc.initialWindowSize)
  2615. for _, cs := range cc.streams {
  2616. cs.flow.add(delta)
  2617. }
  2618. cc.cond.Broadcast()
  2619. cc.initialWindowSize = s.Val
  2620. case SettingHeaderTableSize:
  2621. cc.henc.SetMaxDynamicTableSize(s.Val)
  2622. cc.peerMaxHeaderTableSize = s.Val
  2623. case SettingEnableConnectProtocol:
  2624. if err := s.Valid(); err != nil {
  2625. return err
  2626. }
  2627. // If the peer wants to send us SETTINGS_ENABLE_CONNECT_PROTOCOL,
  2628. // we require that it do so in the first SETTINGS frame.
  2629. //
  2630. // When we attempt to use extended CONNECT, we wait for the first
  2631. // SETTINGS frame to see if the server supports it. If we let the
  2632. // server enable the feature with a later SETTINGS frame, then
  2633. // users will see inconsistent results depending on whether we've
  2634. // seen that frame or not.
  2635. if !cc.seenSettings {
  2636. cc.extendedConnectAllowed = s.Val == 1
  2637. }
  2638. default:
  2639. cc.vlogf("Unhandled Setting: %v", s)
  2640. }
  2641. return nil
  2642. })
  2643. if err != nil {
  2644. return err
  2645. }
  2646. if !cc.seenSettings {
  2647. if !seenMaxConcurrentStreams {
  2648. // This was the servers initial SETTINGS frame and it
  2649. // didn't contain a MAX_CONCURRENT_STREAMS field so
  2650. // increase the number of concurrent streams this
  2651. // connection can establish to our default.
  2652. cc.maxConcurrentStreams = defaultMaxConcurrentStreams
  2653. }
  2654. close(cc.seenSettingsChan)
  2655. cc.seenSettings = true
  2656. }
  2657. return nil
  2658. }
  2659. func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error {
  2660. cc := rl.cc
  2661. cs := rl.streamByID(f.StreamID, notHeaderOrDataFrame)
  2662. if f.StreamID != 0 && cs == nil {
  2663. return nil
  2664. }
  2665. cc.mu.Lock()
  2666. defer cc.mu.Unlock()
  2667. fl := &cc.flow
  2668. if cs != nil {
  2669. fl = &cs.flow
  2670. }
  2671. if !fl.add(int32(f.Increment)) {
  2672. // For stream, the sender sends RST_STREAM with an error code of FLOW_CONTROL_ERROR
  2673. if cs != nil {
  2674. rl.endStreamError(cs, StreamError{
  2675. StreamID: f.StreamID,
  2676. Code: ErrCodeFlowControl,
  2677. })
  2678. return nil
  2679. }
  2680. return ConnectionError(ErrCodeFlowControl)
  2681. }
  2682. cc.cond.Broadcast()
  2683. return nil
  2684. }
  2685. func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error {
  2686. cs := rl.streamByID(f.StreamID, notHeaderOrDataFrame)
  2687. if cs == nil {
  2688. // TODO: return error if server tries to RST_STREAM an idle stream
  2689. return nil
  2690. }
  2691. serr := streamError(cs.ID, f.ErrCode)
  2692. serr.Cause = errFromPeer
  2693. if f.ErrCode == ErrCodeProtocol {
  2694. rl.cc.SetDoNotReuse()
  2695. }
  2696. if fn := cs.cc.t.CountError; fn != nil {
  2697. fn("recv_rststream_" + f.ErrCode.stringToken())
  2698. }
  2699. cs.abortStream(serr)
  2700. cs.bufPipe.CloseWithError(serr)
  2701. return nil
  2702. }
  2703. // Ping sends a PING frame to the server and waits for the ack.
  2704. func (cc *ClientConn) Ping(ctx context.Context) error {
  2705. c := make(chan struct{})
  2706. // Generate a random payload
  2707. var p [8]byte
  2708. for {
  2709. if _, err := rand.Read(p[:]); err != nil {
  2710. return err
  2711. }
  2712. cc.mu.Lock()
  2713. // check for dup before insert
  2714. if _, found := cc.pings[p]; !found {
  2715. cc.pings[p] = c
  2716. cc.mu.Unlock()
  2717. break
  2718. }
  2719. cc.mu.Unlock()
  2720. }
  2721. var pingError error
  2722. errc := make(chan struct{})
  2723. go func() {
  2724. cc.t.markNewGoroutine()
  2725. cc.wmu.Lock()
  2726. defer cc.wmu.Unlock()
  2727. if pingError = cc.fr.WritePing(false, p); pingError != nil {
  2728. close(errc)
  2729. return
  2730. }
  2731. if pingError = cc.bw.Flush(); pingError != nil {
  2732. close(errc)
  2733. return
  2734. }
  2735. }()
  2736. select {
  2737. case <-c:
  2738. return nil
  2739. case <-errc:
  2740. return pingError
  2741. case <-ctx.Done():
  2742. return ctx.Err()
  2743. case <-cc.readerDone:
  2744. // connection closed
  2745. return cc.readerErr
  2746. }
  2747. }
  2748. func (rl *clientConnReadLoop) processPing(f *PingFrame) error {
  2749. if f.IsAck() {
  2750. cc := rl.cc
  2751. cc.mu.Lock()
  2752. defer cc.mu.Unlock()
  2753. // If ack, notify listener if any
  2754. if c, ok := cc.pings[f.Data]; ok {
  2755. close(c)
  2756. delete(cc.pings, f.Data)
  2757. }
  2758. if cc.pendingResets > 0 {
  2759. // See clientStream.cleanupWriteRequest.
  2760. cc.pendingResets = 0
  2761. cc.rstStreamPingsBlocked = true
  2762. cc.cond.Broadcast()
  2763. }
  2764. return nil
  2765. }
  2766. cc := rl.cc
  2767. cc.wmu.Lock()
  2768. defer cc.wmu.Unlock()
  2769. if err := cc.fr.WritePing(true, f.Data); err != nil {
  2770. return err
  2771. }
  2772. return cc.bw.Flush()
  2773. }
  2774. func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error {
  2775. // We told the peer we don't want them.
  2776. // Spec says:
  2777. // "PUSH_PROMISE MUST NOT be sent if the SETTINGS_ENABLE_PUSH
  2778. // setting of the peer endpoint is set to 0. An endpoint that
  2779. // has set this setting and has received acknowledgement MUST
  2780. // treat the receipt of a PUSH_PROMISE frame as a connection
  2781. // error (Section 5.4.1) of type PROTOCOL_ERROR."
  2782. return ConnectionError(ErrCodeProtocol)
  2783. }
  2784. // writeStreamReset sends a RST_STREAM frame.
  2785. // When ping is true, it also sends a PING frame with a random payload.
  2786. func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, ping bool, err error) {
  2787. // TODO: map err to more interesting error codes, once the
  2788. // HTTP community comes up with some. But currently for
  2789. // RST_STREAM there's no equivalent to GOAWAY frame's debug
  2790. // data, and the error codes are all pretty vague ("cancel").
  2791. cc.wmu.Lock()
  2792. cc.fr.WriteRSTStream(streamID, code)
  2793. if ping {
  2794. var payload [8]byte
  2795. rand.Read(payload[:])
  2796. cc.fr.WritePing(false, payload)
  2797. }
  2798. cc.bw.Flush()
  2799. cc.wmu.Unlock()
  2800. }
  2801. var (
  2802. errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit")
  2803. errRequestHeaderListSize = httpcommon.ErrRequestHeaderListSize
  2804. )
  2805. func (cc *ClientConn) logf(format string, args ...interface{}) {
  2806. cc.t.logf(format, args...)
  2807. }
  2808. func (cc *ClientConn) vlogf(format string, args ...interface{}) {
  2809. cc.t.vlogf(format, args...)
  2810. }
  2811. func (t *Transport) vlogf(format string, args ...interface{}) {
  2812. if VerboseLogs {
  2813. t.logf(format, args...)
  2814. }
  2815. }
  2816. func (t *Transport) logf(format string, args ...interface{}) {
  2817. log.Printf(format, args...)
  2818. }
  2819. var noBody io.ReadCloser = noBodyReader{}
  2820. type noBodyReader struct{}
  2821. func (noBodyReader) Close() error { return nil }
  2822. func (noBodyReader) Read([]byte) (int, error) { return 0, io.EOF }
  2823. type missingBody struct{}
  2824. func (missingBody) Close() error { return nil }
  2825. func (missingBody) Read([]byte) (int, error) { return 0, io.ErrUnexpectedEOF }
  2826. func strSliceContains(ss []string, s string) bool {
  2827. for _, v := range ss {
  2828. if v == s {
  2829. return true
  2830. }
  2831. }
  2832. return false
  2833. }
  2834. type erringRoundTripper struct{ err error }
  2835. func (rt erringRoundTripper) RoundTripErr() error { return rt.err }
  2836. func (rt erringRoundTripper) RoundTrip(*http.Request) (*http.Response, error) { return nil, rt.err }
  2837. // gzipReader wraps a response body so it can lazily
  2838. // call gzip.NewReader on the first call to Read
  2839. type gzipReader struct {
  2840. _ incomparable
  2841. body io.ReadCloser // underlying Response.Body
  2842. zr *gzip.Reader // lazily-initialized gzip reader
  2843. zerr error // sticky error
  2844. }
  2845. func (gz *gzipReader) Read(p []byte) (n int, err error) {
  2846. if gz.zerr != nil {
  2847. return 0, gz.zerr
  2848. }
  2849. if gz.zr == nil {
  2850. gz.zr, err = gzip.NewReader(gz.body)
  2851. if err != nil {
  2852. gz.zerr = err
  2853. return 0, err
  2854. }
  2855. }
  2856. return gz.zr.Read(p)
  2857. }
  2858. func (gz *gzipReader) Close() error {
  2859. if err := gz.body.Close(); err != nil {
  2860. return err
  2861. }
  2862. gz.zerr = fs.ErrClosed
  2863. return nil
  2864. }
  2865. type errorReader struct{ err error }
  2866. func (r errorReader) Read(p []byte) (int, error) { return 0, r.err }
  2867. // isConnectionCloseRequest reports whether req should use its own
  2868. // connection for a single request and then close the connection.
  2869. func isConnectionCloseRequest(req *http.Request) bool {
  2870. return req.Close || httpguts.HeaderValuesContainsToken(req.Header["Connection"], "close")
  2871. }
  2872. // registerHTTPSProtocol calls Transport.RegisterProtocol but
  2873. // converting panics into errors.
  2874. func registerHTTPSProtocol(t *http.Transport, rt noDialH2RoundTripper) (err error) {
  2875. defer func() {
  2876. if e := recover(); e != nil {
  2877. err = fmt.Errorf("%v", e)
  2878. }
  2879. }()
  2880. t.RegisterProtocol("https", rt)
  2881. return nil
  2882. }
  2883. // noDialH2RoundTripper is a RoundTripper which only tries to complete the request
  2884. // if there's already has a cached connection to the host.
  2885. // (The field is exported so it can be accessed via reflect from net/http; tested
  2886. // by TestNoDialH2RoundTripperType)
  2887. type noDialH2RoundTripper struct{ *Transport }
  2888. func (rt noDialH2RoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
  2889. res, err := rt.Transport.RoundTrip(req)
  2890. if isNoCachedConnError(err) {
  2891. return nil, http.ErrSkipAltProtocol
  2892. }
  2893. return res, err
  2894. }
  2895. func (t *Transport) idleConnTimeout() time.Duration {
  2896. // to keep things backwards compatible, we use non-zero values of
  2897. // IdleConnTimeout, followed by using the IdleConnTimeout on the underlying
  2898. // http1 transport, followed by 0
  2899. if t.IdleConnTimeout != 0 {
  2900. return t.IdleConnTimeout
  2901. }
  2902. if t.t1 != nil {
  2903. return t.t1.IdleConnTimeout
  2904. }
  2905. return 0
  2906. }
  2907. func traceGetConn(req *http.Request, hostPort string) {
  2908. trace := httptrace.ContextClientTrace(req.Context())
  2909. if trace == nil || trace.GetConn == nil {
  2910. return
  2911. }
  2912. trace.GetConn(hostPort)
  2913. }
  2914. func traceGotConn(req *http.Request, cc *ClientConn, reused bool) {
  2915. trace := httptrace.ContextClientTrace(req.Context())
  2916. if trace == nil || trace.GotConn == nil {
  2917. return
  2918. }
  2919. ci := httptrace.GotConnInfo{Conn: cc.tconn}
  2920. ci.Reused = reused
  2921. cc.mu.Lock()
  2922. ci.WasIdle = len(cc.streams) == 0 && reused
  2923. if ci.WasIdle && !cc.lastActive.IsZero() {
  2924. ci.IdleTime = cc.t.timeSince(cc.lastActive)
  2925. }
  2926. cc.mu.Unlock()
  2927. trace.GotConn(ci)
  2928. }
  2929. func traceWroteHeaders(trace *httptrace.ClientTrace) {
  2930. if trace != nil && trace.WroteHeaders != nil {
  2931. trace.WroteHeaders()
  2932. }
  2933. }
  2934. func traceGot100Continue(trace *httptrace.ClientTrace) {
  2935. if trace != nil && trace.Got100Continue != nil {
  2936. trace.Got100Continue()
  2937. }
  2938. }
  2939. func traceWait100Continue(trace *httptrace.ClientTrace) {
  2940. if trace != nil && trace.Wait100Continue != nil {
  2941. trace.Wait100Continue()
  2942. }
  2943. }
  2944. func traceWroteRequest(trace *httptrace.ClientTrace, err error) {
  2945. if trace != nil && trace.WroteRequest != nil {
  2946. trace.WroteRequest(httptrace.WroteRequestInfo{Err: err})
  2947. }
  2948. }
  2949. func traceFirstResponseByte(trace *httptrace.ClientTrace) {
  2950. if trace != nil && trace.GotFirstResponseByte != nil {
  2951. trace.GotFirstResponseByte()
  2952. }
  2953. }
  2954. func traceGot1xxResponseFunc(trace *httptrace.ClientTrace) func(int, textproto.MIMEHeader) error {
  2955. if trace != nil {
  2956. return trace.Got1xxResponse
  2957. }
  2958. return nil
  2959. }
  2960. // dialTLSWithContext uses tls.Dialer, added in Go 1.15, to open a TLS
  2961. // connection.
  2962. func (t *Transport) dialTLSWithContext(ctx context.Context, network, addr string, cfg *tls.Config) (*tls.Conn, error) {
  2963. dialer := &tls.Dialer{
  2964. Config: cfg,
  2965. }
  2966. cn, err := dialer.DialContext(ctx, network, addr)
  2967. if err != nil {
  2968. return nil, err
  2969. }
  2970. tlsCn := cn.(*tls.Conn) // DialContext comment promises this will always succeed
  2971. return tlsCn, nil
  2972. }