Merge pull request #571 from bradfitz/internal
Remove test-only methods from grpc package.
diff --git a/.travis.yml b/.travis.yml
index 3f83776..055d664 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -11,4 +11,3 @@
script:
- make test testrace
- - make coverage
diff --git a/call_test.go b/call_test.go
index fb587e3..0a224f0 100644
--- a/call_test.go
+++ b/call_test.go
@@ -39,6 +39,7 @@
"math"
"net"
"strconv"
+ "strings"
"sync"
"testing"
"time"
@@ -95,7 +96,7 @@
return
}
if v != expectedRequest {
- h.t.WriteStatus(s, codes.Internal, string(make([]byte, sizeLargeErr)))
+ h.t.WriteStatus(s, codes.Internal, strings.Repeat("A", sizeLargeErr))
return
}
}
diff --git a/test/end2end_test.go b/test/end2end_test.go
index d116455..86c921f 100644
--- a/test/end2end_test.go
+++ b/test/end2end_test.go
@@ -1807,7 +1807,9 @@
strings.Contains(stack, "interestingGoroutines") ||
strings.Contains(stack, "runtime.MHeap_Scavenger") ||
strings.Contains(stack, "signal.signal_recv") ||
- strings.Contains(stack, "sigterm.handler") {
+ strings.Contains(stack, "sigterm.handler") ||
+ strings.Contains(stack, "runtime_mcall") ||
+ strings.Contains(stack, "goroutine in C code") {
continue
}
gs = append(gs, g)
diff --git a/transport/http2_client.go b/transport/http2_client.go
index bb72fea..1978053 100644
--- a/transport/http2_client.go
+++ b/transport/http2_client.go
@@ -550,14 +550,8 @@
func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
t.mu.Lock()
defer t.mu.Unlock()
- if t.activeStreams == nil {
- // The transport is closing.
- return nil, false
- }
- if s, ok := t.activeStreams[f.Header().StreamID]; ok {
- return s, true
- }
- return nil, false
+ s, ok := t.activeStreams[f.Header().StreamID]
+ return s, ok
}
// updateWindow adjusts the inbound quota for the stream and the transport.
@@ -680,54 +674,49 @@
}
}
-// operateHeader takes action on the decoded headers. It returns the current
-// stream if there are remaining headers on the wire (in the following
-// Continuation frame).
-func (t *http2Client) operateHeaders(hDec *hpackDecoder, s *Stream, frame headerFrame, endStream bool) (pendingStream *Stream) {
- defer func() {
- if pendingStream == nil {
- hDec.state = decodeState{}
- }
- }()
- endHeaders, err := hDec.decodeClientHTTP2Headers(frame)
- if s == nil {
- // s has been closed.
- return nil
+// operateHeaders takes action on the decoded headers.
+func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
+ s, ok := t.getStream(frame)
+ if !ok {
+ return
}
- if err != nil {
- s.write(recvMsg{err: err})
+ var state decodeState
+ for _, hf := range frame.Fields {
+ state.processHeaderField(hf)
+ }
+ if state.err != nil {
+ s.write(recvMsg{err: state.err})
// Something wrong. Stops reading even when there is remaining.
- return nil
+ return
}
- if !endHeaders {
- return s
- }
+
+ endStream := frame.StreamEnded()
+
s.mu.Lock()
if !endStream {
- s.recvCompress = hDec.state.encoding
+ s.recvCompress = state.encoding
}
if !s.headerDone {
- if !endStream && len(hDec.state.mdata) > 0 {
- s.header = hDec.state.mdata
+ if !endStream && len(state.mdata) > 0 {
+ s.header = state.mdata
}
close(s.headerChan)
s.headerDone = true
}
if !endStream || s.state == streamDone {
s.mu.Unlock()
- return nil
+ return
}
- if len(hDec.state.mdata) > 0 {
- s.trailer = hDec.state.mdata
+ if len(state.mdata) > 0 {
+ s.trailer = state.mdata
}
s.state = streamDone
- s.statusCode = hDec.state.statusCode
- s.statusDesc = hDec.state.statusDesc
+ s.statusCode = state.statusCode
+ s.statusDesc = state.statusDesc
s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
- return nil
}
// reader runs as a separate goroutine in charge of reading data from network
@@ -750,8 +739,6 @@
}
t.handleSettings(sf)
- hDec := newHPACKDecoder()
- var curStream *Stream
// loop to keep reading incoming messages on this transport.
for {
frame, err := t.framer.readFrame()
@@ -760,15 +747,8 @@
return
}
switch frame := frame.(type) {
- case *http2.HeadersFrame:
- // operateHeaders has to be invoked regardless the value of curStream
- // because the HPACK decoder needs to be updated using the received
- // headers.
- curStream, _ = t.getStream(frame)
- endStream := frame.Header().Flags.Has(http2.FlagHeadersEndStream)
- curStream = t.operateHeaders(hDec, curStream, frame, endStream)
- case *http2.ContinuationFrame:
- curStream = t.operateHeaders(hDec, curStream, frame, frame.HeadersEnded())
+ case *http2.MetaHeadersFrame:
+ t.operateHeaders(frame)
case *http2.DataFrame:
t.handleData(frame)
case *http2.RSTStreamFrame:
@@ -866,6 +846,17 @@
func (t *http2Client) notifyError(err error) {
t.mu.Lock()
defer t.mu.Unlock()
+
+ // Abort an active stream if the http2.Framer returns an
+ // http2.StreamError. This can happen only if the server's response
+ // is malformed HTTP/2.
+ if se, ok := err.(http2.StreamError); ok {
+ if s, ok := t.activeStreams[se.StreamID]; ok {
+ s.write(recvMsg{err: StreamErrorf(http2ErrConvTab[se.Code], "%v", err)})
+ return
+ }
+ }
+
// make sure t.errorChan is closed only once.
if t.state == reachable {
t.state = unreachable
diff --git a/transport/http2_server.go b/transport/http2_server.go
index b9d4959..cec441c 100644
--- a/transport/http2_server.go
+++ b/transport/http2_server.go
@@ -136,37 +136,38 @@
return t, nil
}
-// operateHeader takes action on the decoded headers. It returns the current
-// stream if there are remaining headers on the wire (in the following
-// Continuation frame).
-func (t *http2Server) operateHeaders(hDec *hpackDecoder, s *Stream, frame headerFrame, endStream bool, handle func(*Stream)) (pendingStream *Stream) {
- defer func() {
- if pendingStream == nil {
- hDec.state = decodeState{}
- }
- }()
- endHeaders, err := hDec.decodeServerHTTP2Headers(frame)
- if s == nil {
- // s has been closed.
- return nil
+// operateHeaders takes action on the decoded headers.
+func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) {
+ buf := newRecvBuffer()
+ fc := &inFlow{
+ limit: initialWindowSize,
+ conn: t.fc,
}
- if err != nil {
- grpclog.Printf("transport: http2Server.operateHeader found %v", err)
+ s := &Stream{
+ id: frame.Header().StreamID,
+ st: t,
+ buf: buf,
+ fc: fc,
+ }
+
+ var state decodeState
+ for _, hf := range frame.Fields {
+ state.processHeaderField(hf)
+ }
+ if err := state.err; err != nil {
if se, ok := err.(StreamError); ok {
t.controlBuf.put(&resetStream{s.id, statusCodeConvTab[se.Code]})
}
- return nil
+ return
}
- if endStream {
+
+ if frame.StreamEnded() {
// s is just created by the caller. No lock needed.
s.state = streamReadDone
}
- if !endHeaders {
- return s
- }
- s.recvCompress = hDec.state.encoding
- if hDec.state.timeoutSet {
- s.ctx, s.cancel = context.WithTimeout(context.TODO(), hDec.state.timeout)
+ s.recvCompress = state.encoding
+ if state.timeoutSet {
+ s.ctx, s.cancel = context.WithTimeout(context.TODO(), state.timeout)
} else {
s.ctx, s.cancel = context.WithCancel(context.TODO())
}
@@ -183,25 +184,25 @@
// back to the client (unary call only).
s.ctx = newContextWithStream(s.ctx, s)
// Attach the received metadata to the context.
- if len(hDec.state.mdata) > 0 {
- s.ctx = metadata.NewContext(s.ctx, hDec.state.mdata)
+ if len(state.mdata) > 0 {
+ s.ctx = metadata.NewContext(s.ctx, state.mdata)
}
s.dec = &recvBufferReader{
ctx: s.ctx,
recv: s.buf,
}
- s.recvCompress = hDec.state.encoding
- s.method = hDec.state.method
+ s.recvCompress = state.encoding
+ s.method = state.method
t.mu.Lock()
if t.state != reachable {
t.mu.Unlock()
- return nil
+ return
}
if uint32(len(t.activeStreams)) >= t.maxStreams {
t.mu.Unlock()
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
- return nil
+ return
}
s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
t.activeStreams[s.id] = s
@@ -210,7 +211,6 @@
t.updateWindow(s, uint32(n))
}
handle(s)
- return nil
}
// HandleStreams receives incoming streams using the given handler. This is
@@ -243,8 +243,6 @@
}
t.handleSettings(sf)
- hDec := newHPACKDecoder()
- var curStream *Stream
for {
frame, err := t.framer.readFrame()
if err != nil {
@@ -252,7 +250,7 @@
return
}
switch frame := frame.(type) {
- case *http2.HeadersFrame:
+ case *http2.MetaHeadersFrame:
id := frame.Header().StreamID
if id%2 != 1 || id <= t.maxStreamID {
// illegal gRPC stream id.
@@ -261,21 +259,7 @@
break
}
t.maxStreamID = id
- buf := newRecvBuffer()
- fc := &inFlow{
- limit: initialWindowSize,
- conn: t.fc,
- }
- curStream = &Stream{
- id: frame.Header().StreamID,
- st: t,
- buf: buf,
- fc: fc,
- }
- endStream := frame.Header().Flags.Has(http2.FlagHeadersEndStream)
- curStream = t.operateHeaders(hDec, curStream, frame, endStream, handle)
- case *http2.ContinuationFrame:
- curStream = t.operateHeaders(hDec, curStream, frame, frame.HeadersEnded(), handle)
+ t.operateHeaders(frame, handle)
case *http2.DataFrame:
t.handleData(frame)
case *http2.RSTStreamFrame:
diff --git a/transport/http_util.go b/transport/http_util.go
index ea7b39b..bd2dc5d 100644
--- a/transport/http_util.go
+++ b/transport/http_util.go
@@ -90,6 +90,8 @@
// Records the states during HPACK decoding. Must be reset once the
// decoding of the entire headers are finished.
type decodeState struct {
+ err error // first error encountered decoding
+
encoding string
// statusCode caches the stream status received from the trailer
// the server sent. Client side only.
@@ -103,20 +105,6 @@
mdata map[string][]string
}
-// An hpackDecoder decodes HTTP2 headers which may span multiple frames.
-type hpackDecoder struct {
- h *hpack.Decoder
- state decodeState
- err error // The err when decoding
-}
-
-// A headerFrame is either a http2.HeaderFrame or http2.ContinuationFrame.
-type headerFrame interface {
- Header() http2.FrameHeader
- HeaderBlockFragment() []byte
- HeadersEnded() bool
-}
-
// isReservedHeader checks whether hdr belongs to HTTP2 headers
// reserved by gRPC protocol. Any other headers are classified as the
// user-specified metadata.
@@ -138,100 +126,62 @@
}
}
-func newHPACKDecoder() *hpackDecoder {
- d := &hpackDecoder{}
- d.h = hpack.NewDecoder(http2InitHeaderTableSize, func(f hpack.HeaderField) {
- switch f.Name {
- case "content-type":
- if !strings.Contains(f.Value, "application/grpc") {
- d.err = StreamErrorf(codes.FailedPrecondition, "transport: received the unexpected content-type %q", f.Value)
- return
- }
- case "grpc-encoding":
- d.state.encoding = f.Value
- case "grpc-status":
- code, err := strconv.Atoi(f.Value)
- if err != nil {
- d.err = StreamErrorf(codes.Internal, "transport: malformed grpc-status: %v", err)
- return
- }
- d.state.statusCode = codes.Code(code)
- case "grpc-message":
- d.state.statusDesc = f.Value
- case "grpc-timeout":
- d.state.timeoutSet = true
- var err error
- d.state.timeout, err = timeoutDecode(f.Value)
- if err != nil {
- d.err = StreamErrorf(codes.Internal, "transport: malformed time-out: %v", err)
- return
- }
- case ":path":
- d.state.method = f.Value
- default:
- if !isReservedHeader(f.Name) {
- if f.Name == "user-agent" {
- i := strings.LastIndex(f.Value, " ")
- if i == -1 {
- // There is no application user agent string being set.
- return
- }
- // Extract the application user agent string.
- f.Value = f.Value[:i]
- }
- if d.state.mdata == nil {
- d.state.mdata = make(map[string][]string)
- }
- k, v, err := metadata.DecodeKeyValue(f.Name, f.Value)
- if err != nil {
- grpclog.Printf("Failed to decode (%q, %q): %v", f.Name, f.Value, err)
+func (d *decodeState) setErr(err error) {
+ if d.err == nil {
+ d.err = err
+ }
+}
+
+func (d *decodeState) processHeaderField(f hpack.HeaderField) {
+ switch f.Name {
+ case "content-type":
+ if !strings.Contains(f.Value, "application/grpc") {
+ d.setErr(StreamErrorf(codes.FailedPrecondition, "transport: received the unexpected content-type %q", f.Value))
+ return
+ }
+ case "grpc-encoding":
+ d.encoding = f.Value
+ case "grpc-status":
+ code, err := strconv.Atoi(f.Value)
+ if err != nil {
+ d.setErr(StreamErrorf(codes.Internal, "transport: malformed grpc-status: %v", err))
+ return
+ }
+ d.statusCode = codes.Code(code)
+ case "grpc-message":
+ d.statusDesc = f.Value
+ case "grpc-timeout":
+ d.timeoutSet = true
+ var err error
+ d.timeout, err = timeoutDecode(f.Value)
+ if err != nil {
+ d.setErr(StreamErrorf(codes.Internal, "transport: malformed time-out: %v", err))
+ return
+ }
+ case ":path":
+ d.method = f.Value
+ default:
+ if !isReservedHeader(f.Name) {
+ if f.Name == "user-agent" {
+ i := strings.LastIndex(f.Value, " ")
+ if i == -1 {
+ // There is no application user agent string being set.
return
}
- d.state.mdata[k] = append(d.state.mdata[k], v)
+ // Extract the application user agent string.
+ f.Value = f.Value[:i]
}
+ if d.mdata == nil {
+ d.mdata = make(map[string][]string)
+ }
+ k, v, err := metadata.DecodeKeyValue(f.Name, f.Value)
+ if err != nil {
+ grpclog.Printf("Failed to decode (%q, %q): %v", f.Name, f.Value, err)
+ return
+ }
+ d.mdata[k] = append(d.mdata[k], v)
}
- })
- return d
-}
-
-func (d *hpackDecoder) decodeClientHTTP2Headers(frame headerFrame) (endHeaders bool, err error) {
- d.err = nil
- _, err = d.h.Write(frame.HeaderBlockFragment())
- if err != nil {
- err = StreamErrorf(codes.Internal, "transport: HPACK header decode error: %v", err)
}
-
- if frame.HeadersEnded() {
- if closeErr := d.h.Close(); closeErr != nil && err == nil {
- err = StreamErrorf(codes.Internal, "transport: HPACK decoder close error: %v", closeErr)
- }
- endHeaders = true
- }
-
- if err == nil && d.err != nil {
- err = d.err
- }
- return
-}
-
-func (d *hpackDecoder) decodeServerHTTP2Headers(frame headerFrame) (endHeaders bool, err error) {
- d.err = nil
- _, err = d.h.Write(frame.HeaderBlockFragment())
- if err != nil {
- err = StreamErrorf(codes.Internal, "transport: HPACK header decode error: %v", err)
- }
-
- if frame.HeadersEnded() {
- if closeErr := d.h.Close(); closeErr != nil && err == nil {
- err = StreamErrorf(codes.Internal, "transport: HPACK decoder close error: %v", closeErr)
- }
- endHeaders = true
- }
-
- if err == nil && d.err != nil {
- err = d.err
- }
- return
}
type timeoutUnit uint8
@@ -326,6 +276,7 @@
writer: bufio.NewWriterSize(conn, http2IOBufSize),
}
f.fr = http2.NewFramer(f.writer, f.reader)
+ f.fr.ReadMetaHeaders = hpack.NewDecoder(http2InitHeaderTableSize, nil)
return f
}