Merge pull request #409 from iamqizhao/master
Fix a stream-counting race against the max streams quota.
diff --git a/test/end2end_test.go b/test/end2end_test.go
index 862261e..55bf2ed 100644
--- a/test/end2end_test.go
+++ b/test/end2end_test.go
@@ -1060,33 +1060,24 @@
s, cc := setUp(t, nil, 1, "", e)
tc := testpb.NewTestServiceClient(cc)
defer tearDown(s, cc)
- done := make(chan struct{})
- ch := make(chan int)
+ stream, err := tc.StreamingInputCall(context.Background())
+ if err != nil {
+ t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
+ }
go func() {
- timer := time.After(5 * time.Second)
- for {
- select {
- case <-time.After(5 * time.Millisecond):
- ch <- 0
- case <-timer:
- close(done)
- return
- }
- }
+ stream.Header()
}()
- // Loop until a stream creation hangs due to the new max stream setting.
+ // Loop until receiving the new max stream setting from the server.
for {
- select {
- case <-ch:
- ctx, _ := context.WithTimeout(context.Background(), time.Second)
- if _, err := tc.StreamingInputCall(ctx); err != nil {
- if grpc.Code(err) == codes.DeadlineExceeded {
- return
- }
- t.Fatalf("%v.StreamingInputCall(_) = %v, want <nil>", tc, err)
- }
- case <-done:
- t.Fatalf("Client has not received the max stream setting in 5 seconds.")
+ ctx, _ := context.WithTimeout(context.Background(), time.Second)
+ _, err := tc.StreamingInputCall(ctx)
+ if err == nil {
+ time.Sleep(time.Second)
+ continue
}
+ if grpc.Code(err) == codes.DeadlineExceeded {
+ break
+ }
+ t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %d", tc, err, codes.DeadlineExceeded)
}
}
diff --git a/transport/http2_client.go b/transport/http2_client.go
index 526f511..882a9bb 100644
--- a/transport/http2_client.go
+++ b/transport/http2_client.go
@@ -293,7 +293,18 @@
}
s := t.newStream(ctx, callHdr)
t.activeStreams[s.id] = s
+
+ // This stream is not counted when applySettings(...) initializes t.streamsQuota.
+ // Reset t.streamsQuota to the right value.
+ var reset bool
+ if !checkStreamsQuota && t.streamsQuota != nil {
+ reset = true
+ }
t.mu.Unlock()
+ if reset {
+ t.streamsQuota.reset(-1)
+ }
+
// HPACK encodes various headers. Note that once WriteField(...) is
// called, the corresponding headers/continuation frame has to be sent
// because hpack.Encoder is stateful.
diff --git a/transport/http2_server.go b/transport/http2_server.go
index 28b1e09..f3488f8 100644
--- a/transport/http2_server.go
+++ b/transport/http2_server.go
@@ -115,15 +115,15 @@
}
var buf bytes.Buffer
t := &http2Server{
- conn: conn,
- authInfo: authInfo,
- framer: framer,
- hBuf: &buf,
- hEnc: hpack.NewEncoder(&buf),
- maxStreams: maxStreams,
- controlBuf: newRecvBuffer(),
- fc: &inFlow{limit: initialConnWindowSize},
- sendQuotaPool: newQuotaPool(defaultWindowSize),
+ conn: conn,
+ authInfo: authInfo,
+ framer: framer,
+ hBuf: &buf,
+ hEnc: hpack.NewEncoder(&buf),
+ maxStreams: maxStreams,
+ controlBuf: newRecvBuffer(),
+ fc: &inFlow{limit: initialConnWindowSize},
+ sendQuotaPool: newQuotaPool(defaultWindowSize),
state: reachable,
writableChan: make(chan int, 1),
shutdownChan: make(chan struct{}),