pipe.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. // Copyright 2014 The Go Authors. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package http2
  5. import (
  6. "errors"
  7. "io"
  8. "sync"
  9. )
  10. // pipe is a goroutine-safe io.Reader/io.Writer pair. It's like
  11. // io.Pipe except there are no PipeReader/PipeWriter halves, and the
  12. // underlying buffer is an interface. (io.Pipe is always unbuffered)
  13. type pipe struct {
  14. mu sync.Mutex
  15. c sync.Cond // c.L lazily initialized to &p.mu
  16. b pipeBuffer // nil when done reading
  17. unread int // bytes unread when done
  18. err error // read error once empty. non-nil means closed.
  19. breakErr error // immediate read error (caller doesn't see rest of b)
  20. donec chan struct{} // closed on error
  21. readFn func() // optional code to run in Read before error
  22. }
  23. type pipeBuffer interface {
  24. Len() int
  25. io.Writer
  26. io.Reader
  27. }
  28. // setBuffer initializes the pipe buffer.
  29. // It has no effect if the pipe is already closed.
  30. func (p *pipe) setBuffer(b pipeBuffer) {
  31. p.mu.Lock()
  32. defer p.mu.Unlock()
  33. if p.err != nil || p.breakErr != nil {
  34. return
  35. }
  36. p.b = b
  37. }
  38. func (p *pipe) Len() int {
  39. p.mu.Lock()
  40. defer p.mu.Unlock()
  41. if p.b == nil {
  42. return p.unread
  43. }
  44. return p.b.Len()
  45. }
  46. // Read waits until data is available and copies bytes
  47. // from the buffer into p.
  48. func (p *pipe) Read(d []byte) (n int, err error) {
  49. p.mu.Lock()
  50. defer p.mu.Unlock()
  51. if p.c.L == nil {
  52. p.c.L = &p.mu
  53. }
  54. for {
  55. if p.breakErr != nil {
  56. return 0, p.breakErr
  57. }
  58. if p.b != nil && p.b.Len() > 0 {
  59. return p.b.Read(d)
  60. }
  61. if p.err != nil {
  62. if p.readFn != nil {
  63. p.readFn() // e.g. copy trailers
  64. p.readFn = nil // not sticky like p.err
  65. }
  66. p.b = nil
  67. return 0, p.err
  68. }
  69. p.c.Wait()
  70. }
  71. }
  72. var (
  73. errClosedPipeWrite = errors.New("write on closed buffer")
  74. errUninitializedPipeWrite = errors.New("write on uninitialized buffer")
  75. )
  76. // Write copies bytes from p into the buffer and wakes a reader.
  77. // It is an error to write more data than the buffer can hold.
  78. func (p *pipe) Write(d []byte) (n int, err error) {
  79. p.mu.Lock()
  80. defer p.mu.Unlock()
  81. if p.c.L == nil {
  82. p.c.L = &p.mu
  83. }
  84. defer p.c.Signal()
  85. if p.err != nil || p.breakErr != nil {
  86. return 0, errClosedPipeWrite
  87. }
  88. // pipe.setBuffer is never invoked, leaving the buffer uninitialized.
  89. // We shouldn't try to write to an uninitialized pipe,
  90. // but returning an error is better than panicking.
  91. if p.b == nil {
  92. return 0, errUninitializedPipeWrite
  93. }
  94. return p.b.Write(d)
  95. }
  96. // CloseWithError causes the next Read (waking up a current blocked
  97. // Read if needed) to return the provided err after all data has been
  98. // read.
  99. //
  100. // The error must be non-nil.
  101. func (p *pipe) CloseWithError(err error) { p.closeWithError(&p.err, err, nil) }
  102. // BreakWithError causes the next Read (waking up a current blocked
  103. // Read if needed) to return the provided err immediately, without
  104. // waiting for unread data.
  105. func (p *pipe) BreakWithError(err error) { p.closeWithError(&p.breakErr, err, nil) }
  106. // closeWithErrorAndCode is like CloseWithError but also sets some code to run
  107. // in the caller's goroutine before returning the error.
  108. func (p *pipe) closeWithErrorAndCode(err error, fn func()) { p.closeWithError(&p.err, err, fn) }
  109. func (p *pipe) closeWithError(dst *error, err error, fn func()) {
  110. if err == nil {
  111. panic("err must be non-nil")
  112. }
  113. p.mu.Lock()
  114. defer p.mu.Unlock()
  115. if p.c.L == nil {
  116. p.c.L = &p.mu
  117. }
  118. defer p.c.Signal()
  119. if *dst != nil {
  120. // Already been done.
  121. return
  122. }
  123. p.readFn = fn
  124. if dst == &p.breakErr {
  125. if p.b != nil {
  126. p.unread += p.b.Len()
  127. }
  128. p.b = nil
  129. }
  130. *dst = err
  131. p.closeDoneLocked()
  132. }
  133. // requires p.mu be held.
  134. func (p *pipe) closeDoneLocked() {
  135. if p.donec == nil {
  136. return
  137. }
  138. // Close if unclosed. This isn't racy since we always
  139. // hold p.mu while closing.
  140. select {
  141. case <-p.donec:
  142. default:
  143. close(p.donec)
  144. }
  145. }
  146. // Err returns the error (if any) first set by BreakWithError or CloseWithError.
  147. func (p *pipe) Err() error {
  148. p.mu.Lock()
  149. defer p.mu.Unlock()
  150. if p.breakErr != nil {
  151. return p.breakErr
  152. }
  153. return p.err
  154. }
  155. // Done returns a channel which is closed if and when this pipe is closed
  156. // with CloseWithError.
  157. func (p *pipe) Done() <-chan struct{} {
  158. p.mu.Lock()
  159. defer p.mu.Unlock()
  160. if p.donec == nil {
  161. p.donec = make(chan struct{})
  162. if p.err != nil || p.breakErr != nil {
  163. // Already hit an error.
  164. p.closeDoneLocked()
  165. }
  166. }
  167. return p.donec
  168. }