Merge pull request #571 from bradfitz/internal

Remove test-only methods from grpc package.
diff --git a/.travis.yml b/.travis.yml
index 3f83776..055d664 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -11,4 +11,3 @@
 
 script:
   - make test testrace
-  - make coverage
diff --git a/call_test.go b/call_test.go
index fb587e3..0a224f0 100644
--- a/call_test.go
+++ b/call_test.go
@@ -39,6 +39,7 @@
 	"math"
 	"net"
 	"strconv"
+	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -95,7 +96,7 @@
 			return
 		}
 		if v != expectedRequest {
-			h.t.WriteStatus(s, codes.Internal, string(make([]byte, sizeLargeErr)))
+			h.t.WriteStatus(s, codes.Internal, strings.Repeat("A", sizeLargeErr))
 			return
 		}
 	}
diff --git a/test/end2end_test.go b/test/end2end_test.go
index d116455..86c921f 100644
--- a/test/end2end_test.go
+++ b/test/end2end_test.go
@@ -1807,7 +1807,9 @@
 			strings.Contains(stack, "interestingGoroutines") ||
 			strings.Contains(stack, "runtime.MHeap_Scavenger") ||
 			strings.Contains(stack, "signal.signal_recv") ||
-			strings.Contains(stack, "sigterm.handler") {
+			strings.Contains(stack, "sigterm.handler") ||
+			strings.Contains(stack, "runtime_mcall") ||
+			strings.Contains(stack, "goroutine in C code") {
 			continue
 		}
 		gs = append(gs, g)
diff --git a/transport/http2_client.go b/transport/http2_client.go
index bb72fea..1978053 100644
--- a/transport/http2_client.go
+++ b/transport/http2_client.go
@@ -550,14 +550,8 @@
 func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
 	t.mu.Lock()
 	defer t.mu.Unlock()
-	if t.activeStreams == nil {
-		// The transport is closing.
-		return nil, false
-	}
-	if s, ok := t.activeStreams[f.Header().StreamID]; ok {
-		return s, true
-	}
-	return nil, false
+	s, ok := t.activeStreams[f.Header().StreamID]
+	return s, ok
 }
 
 // updateWindow adjusts the inbound quota for the stream and the transport.
@@ -680,54 +674,49 @@
 	}
 }
 
-// operateHeader takes action on the decoded headers. It returns the current
-// stream if there are remaining headers on the wire (in the following
-// Continuation frame).
-func (t *http2Client) operateHeaders(hDec *hpackDecoder, s *Stream, frame headerFrame, endStream bool) (pendingStream *Stream) {
-	defer func() {
-		if pendingStream == nil {
-			hDec.state = decodeState{}
-		}
-	}()
-	endHeaders, err := hDec.decodeClientHTTP2Headers(frame)
-	if s == nil {
-		// s has been closed.
-		return nil
+// operateHeaders takes action on the decoded headers.
+func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
+	s, ok := t.getStream(frame)
+	if !ok {
+		return
 	}
-	if err != nil {
-		s.write(recvMsg{err: err})
+	var state decodeState
+	for _, hf := range frame.Fields {
+		state.processHeaderField(hf)
+	}
+	if state.err != nil {
+		s.write(recvMsg{err: state.err})
 		// Something wrong. Stops reading even when there is remaining.
-		return nil
+		return
 	}
-	if !endHeaders {
-		return s
-	}
+
+	endStream := frame.StreamEnded()
+
 	s.mu.Lock()
 	if !endStream {
-		s.recvCompress = hDec.state.encoding
+		s.recvCompress = state.encoding
 	}
 	if !s.headerDone {
-		if !endStream && len(hDec.state.mdata) > 0 {
-			s.header = hDec.state.mdata
+		if !endStream && len(state.mdata) > 0 {
+			s.header = state.mdata
 		}
 		close(s.headerChan)
 		s.headerDone = true
 	}
 	if !endStream || s.state == streamDone {
 		s.mu.Unlock()
-		return nil
+		return
 	}
 
-	if len(hDec.state.mdata) > 0 {
-		s.trailer = hDec.state.mdata
+	if len(state.mdata) > 0 {
+		s.trailer = state.mdata
 	}
 	s.state = streamDone
-	s.statusCode = hDec.state.statusCode
-	s.statusDesc = hDec.state.statusDesc
+	s.statusCode = state.statusCode
+	s.statusDesc = state.statusDesc
 	s.mu.Unlock()
 
 	s.write(recvMsg{err: io.EOF})
-	return nil
 }
 
 // reader runs as a separate goroutine in charge of reading data from network
@@ -750,8 +739,6 @@
 	}
 	t.handleSettings(sf)
 
-	hDec := newHPACKDecoder()
-	var curStream *Stream
 	// loop to keep reading incoming messages on this transport.
 	for {
 		frame, err := t.framer.readFrame()
@@ -760,15 +747,8 @@
 			return
 		}
 		switch frame := frame.(type) {
-		case *http2.HeadersFrame:
-			// operateHeaders has to be invoked regardless the value of curStream
-			// because the HPACK decoder needs to be updated using the received
-			// headers.
-			curStream, _ = t.getStream(frame)
-			endStream := frame.Header().Flags.Has(http2.FlagHeadersEndStream)
-			curStream = t.operateHeaders(hDec, curStream, frame, endStream)
-		case *http2.ContinuationFrame:
-			curStream = t.operateHeaders(hDec, curStream, frame, frame.HeadersEnded())
+		case *http2.MetaHeadersFrame:
+			t.operateHeaders(frame)
 		case *http2.DataFrame:
 			t.handleData(frame)
 		case *http2.RSTStreamFrame:
@@ -866,6 +846,17 @@
 func (t *http2Client) notifyError(err error) {
 	t.mu.Lock()
 	defer t.mu.Unlock()
+
+	// Abort an active stream if the http2.Framer returns a
+	// http2.StreamError. This can happen only if the server's response
+	// is malformed http2.
+	if se, ok := err.(http2.StreamError); ok {
+		if s, ok := t.activeStreams[se.StreamID]; ok {
+			s.write(recvMsg{err: StreamErrorf(http2ErrConvTab[se.Code], "%v", err)})
+			return
+		}
+	}
+
 	// make sure t.errorChan is closed only once.
 	if t.state == reachable {
 		t.state = unreachable
diff --git a/transport/http2_server.go b/transport/http2_server.go
index b9d4959..cec441c 100644
--- a/transport/http2_server.go
+++ b/transport/http2_server.go
@@ -136,37 +136,38 @@
 	return t, nil
 }
 
-// operateHeader takes action on the decoded headers. It returns the current
-// stream if there are remaining headers on the wire (in the following
-// Continuation frame).
-func (t *http2Server) operateHeaders(hDec *hpackDecoder, s *Stream, frame headerFrame, endStream bool, handle func(*Stream)) (pendingStream *Stream) {
-	defer func() {
-		if pendingStream == nil {
-			hDec.state = decodeState{}
-		}
-	}()
-	endHeaders, err := hDec.decodeServerHTTP2Headers(frame)
-	if s == nil {
-		// s has been closed.
-		return nil
+// operateHeader takes action on the decoded headers.
+func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) {
+	buf := newRecvBuffer()
+	fc := &inFlow{
+		limit: initialWindowSize,
+		conn:  t.fc,
 	}
-	if err != nil {
-		grpclog.Printf("transport: http2Server.operateHeader found %v", err)
+	s := &Stream{
+		id:  frame.Header().StreamID,
+		st:  t,
+		buf: buf,
+		fc:  fc,
+	}
+
+	var state decodeState
+	for _, hf := range frame.Fields {
+		state.processHeaderField(hf)
+	}
+	if err := state.err; err != nil {
 		if se, ok := err.(StreamError); ok {
 			t.controlBuf.put(&resetStream{s.id, statusCodeConvTab[se.Code]})
 		}
-		return nil
+		return
 	}
-	if endStream {
+
+	if frame.StreamEnded() {
 		// s is just created by the caller. No lock needed.
 		s.state = streamReadDone
 	}
-	if !endHeaders {
-		return s
-	}
-	s.recvCompress = hDec.state.encoding
-	if hDec.state.timeoutSet {
-		s.ctx, s.cancel = context.WithTimeout(context.TODO(), hDec.state.timeout)
+	s.recvCompress = state.encoding
+	if state.timeoutSet {
+		s.ctx, s.cancel = context.WithTimeout(context.TODO(), state.timeout)
 	} else {
 		s.ctx, s.cancel = context.WithCancel(context.TODO())
 	}
@@ -183,25 +184,25 @@
 	// back to the client (unary call only).
 	s.ctx = newContextWithStream(s.ctx, s)
 	// Attach the received metadata to the context.
-	if len(hDec.state.mdata) > 0 {
-		s.ctx = metadata.NewContext(s.ctx, hDec.state.mdata)
+	if len(state.mdata) > 0 {
+		s.ctx = metadata.NewContext(s.ctx, state.mdata)
 	}
 
 	s.dec = &recvBufferReader{
 		ctx:  s.ctx,
 		recv: s.buf,
 	}
-	s.recvCompress = hDec.state.encoding
-	s.method = hDec.state.method
+	s.recvCompress = state.encoding
+	s.method = state.method
 	t.mu.Lock()
 	if t.state != reachable {
 		t.mu.Unlock()
-		return nil
+		return
 	}
 	if uint32(len(t.activeStreams)) >= t.maxStreams {
 		t.mu.Unlock()
 		t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
-		return nil
+		return
 	}
 	s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
 	t.activeStreams[s.id] = s
@@ -210,7 +211,6 @@
 		t.updateWindow(s, uint32(n))
 	}
 	handle(s)
-	return nil
 }
 
 // HandleStreams receives incoming streams using the given handler. This is
@@ -243,8 +243,6 @@
 	}
 	t.handleSettings(sf)
 
-	hDec := newHPACKDecoder()
-	var curStream *Stream
 	for {
 		frame, err := t.framer.readFrame()
 		if err != nil {
@@ -252,7 +250,7 @@
 			return
 		}
 		switch frame := frame.(type) {
-		case *http2.HeadersFrame:
+		case *http2.MetaHeadersFrame:
 			id := frame.Header().StreamID
 			if id%2 != 1 || id <= t.maxStreamID {
 				// illegal gRPC stream id.
@@ -261,21 +259,7 @@
 				break
 			}
 			t.maxStreamID = id
-			buf := newRecvBuffer()
-			fc := &inFlow{
-				limit: initialWindowSize,
-				conn:  t.fc,
-			}
-			curStream = &Stream{
-				id:  frame.Header().StreamID,
-				st:  t,
-				buf: buf,
-				fc:  fc,
-			}
-			endStream := frame.Header().Flags.Has(http2.FlagHeadersEndStream)
-			curStream = t.operateHeaders(hDec, curStream, frame, endStream, handle)
-		case *http2.ContinuationFrame:
-			curStream = t.operateHeaders(hDec, curStream, frame, frame.HeadersEnded(), handle)
+			t.operateHeaders(frame, handle)
 		case *http2.DataFrame:
 			t.handleData(frame)
 		case *http2.RSTStreamFrame:
diff --git a/transport/http_util.go b/transport/http_util.go
index ea7b39b..bd2dc5d 100644
--- a/transport/http_util.go
+++ b/transport/http_util.go
@@ -90,6 +90,8 @@
 // Records the states during HPACK decoding. Must be reset once the
 // decoding of the entire headers are finished.
 type decodeState struct {
+	err error // first error encountered decoding
+
 	encoding string
 	// statusCode caches the stream status received from the trailer
 	// the server sent. Client side only.
@@ -103,20 +105,6 @@
 	mdata map[string][]string
 }
 
-// An hpackDecoder decodes HTTP2 headers which may span multiple frames.
-type hpackDecoder struct {
-	h     *hpack.Decoder
-	state decodeState
-	err   error // The err when decoding
-}
-
-// A headerFrame is either a http2.HeaderFrame or http2.ContinuationFrame.
-type headerFrame interface {
-	Header() http2.FrameHeader
-	HeaderBlockFragment() []byte
-	HeadersEnded() bool
-}
-
 // isReservedHeader checks whether hdr belongs to HTTP2 headers
 // reserved by gRPC protocol. Any other headers are classified as the
 // user-specified metadata.
@@ -138,100 +126,62 @@
 	}
 }
 
-func newHPACKDecoder() *hpackDecoder {
-	d := &hpackDecoder{}
-	d.h = hpack.NewDecoder(http2InitHeaderTableSize, func(f hpack.HeaderField) {
-		switch f.Name {
-		case "content-type":
-			if !strings.Contains(f.Value, "application/grpc") {
-				d.err = StreamErrorf(codes.FailedPrecondition, "transport: received the unexpected content-type %q", f.Value)
-				return
-			}
-		case "grpc-encoding":
-			d.state.encoding = f.Value
-		case "grpc-status":
-			code, err := strconv.Atoi(f.Value)
-			if err != nil {
-				d.err = StreamErrorf(codes.Internal, "transport: malformed grpc-status: %v", err)
-				return
-			}
-			d.state.statusCode = codes.Code(code)
-		case "grpc-message":
-			d.state.statusDesc = f.Value
-		case "grpc-timeout":
-			d.state.timeoutSet = true
-			var err error
-			d.state.timeout, err = timeoutDecode(f.Value)
-			if err != nil {
-				d.err = StreamErrorf(codes.Internal, "transport: malformed time-out: %v", err)
-				return
-			}
-		case ":path":
-			d.state.method = f.Value
-		default:
-			if !isReservedHeader(f.Name) {
-				if f.Name == "user-agent" {
-					i := strings.LastIndex(f.Value, " ")
-					if i == -1 {
-						// There is no application user agent string being set.
-						return
-					}
-					// Extract the application user agent string.
-					f.Value = f.Value[:i]
-				}
-				if d.state.mdata == nil {
-					d.state.mdata = make(map[string][]string)
-				}
-				k, v, err := metadata.DecodeKeyValue(f.Name, f.Value)
-				if err != nil {
-					grpclog.Printf("Failed to decode (%q, %q): %v", f.Name, f.Value, err)
+func (d *decodeState) setErr(err error) {
+	if d.err == nil {
+		d.err = err
+	}
+}
+
+func (d *decodeState) processHeaderField(f hpack.HeaderField) {
+	switch f.Name {
+	case "content-type":
+		if !strings.Contains(f.Value, "application/grpc") {
+			d.setErr(StreamErrorf(codes.FailedPrecondition, "transport: received the unexpected content-type %q", f.Value))
+			return
+		}
+	case "grpc-encoding":
+		d.encoding = f.Value
+	case "grpc-status":
+		code, err := strconv.Atoi(f.Value)
+		if err != nil {
+			d.setErr(StreamErrorf(codes.Internal, "transport: malformed grpc-status: %v", err))
+			return
+		}
+		d.statusCode = codes.Code(code)
+	case "grpc-message":
+		d.statusDesc = f.Value
+	case "grpc-timeout":
+		d.timeoutSet = true
+		var err error
+		d.timeout, err = timeoutDecode(f.Value)
+		if err != nil {
+			d.setErr(StreamErrorf(codes.Internal, "transport: malformed time-out: %v", err))
+			return
+		}
+	case ":path":
+		d.method = f.Value
+	default:
+		if !isReservedHeader(f.Name) {
+			if f.Name == "user-agent" {
+				i := strings.LastIndex(f.Value, " ")
+				if i == -1 {
+					// There is no application user agent string being set.
 					return
 				}
-				d.state.mdata[k] = append(d.state.mdata[k], v)
+				// Extract the application user agent string.
+				f.Value = f.Value[:i]
 			}
+			if d.mdata == nil {
+				d.mdata = make(map[string][]string)
+			}
+			k, v, err := metadata.DecodeKeyValue(f.Name, f.Value)
+			if err != nil {
+				grpclog.Printf("Failed to decode (%q, %q): %v", f.Name, f.Value, err)
+				return
+			}
+			d.mdata[k] = append(d.mdata[k], v)
 		}
-	})
-	return d
-}
-
-func (d *hpackDecoder) decodeClientHTTP2Headers(frame headerFrame) (endHeaders bool, err error) {
-	d.err = nil
-	_, err = d.h.Write(frame.HeaderBlockFragment())
-	if err != nil {
-		err = StreamErrorf(codes.Internal, "transport: HPACK header decode error: %v", err)
 	}
-
-	if frame.HeadersEnded() {
-		if closeErr := d.h.Close(); closeErr != nil && err == nil {
-			err = StreamErrorf(codes.Internal, "transport: HPACK decoder close error: %v", closeErr)
-		}
-		endHeaders = true
-	}
-
-	if err == nil && d.err != nil {
-		err = d.err
-	}
-	return
-}
-
-func (d *hpackDecoder) decodeServerHTTP2Headers(frame headerFrame) (endHeaders bool, err error) {
-	d.err = nil
-	_, err = d.h.Write(frame.HeaderBlockFragment())
-	if err != nil {
-		err = StreamErrorf(codes.Internal, "transport: HPACK header decode error: %v", err)
-	}
-
-	if frame.HeadersEnded() {
-		if closeErr := d.h.Close(); closeErr != nil && err == nil {
-			err = StreamErrorf(codes.Internal, "transport: HPACK decoder close error: %v", closeErr)
-		}
-		endHeaders = true
-	}
-
-	if err == nil && d.err != nil {
-		err = d.err
-	}
-	return
 }
 
 type timeoutUnit uint8
@@ -326,6 +276,7 @@
 		writer: bufio.NewWriterSize(conn, http2IOBufSize),
 	}
 	f.fr = http2.NewFramer(f.writer, f.reader)
+	f.fr.ReadMetaHeaders = hpack.NewDecoder(http2InitHeaderTableSize, nil)
 	return f
 }