internal/requestlog: prototype request log middleware

- Request/response body sizes are collected on a streaming basis
  (negligible overhead).
- Header sizes are approximate and only based on what the underlying
  handler sends or receives.  net/http may add extra headers, so headers
  will likely be undercounted.  I don't see a reliable way to address
  this.
- Includes a fluentd logger to send the forward input plugin that is
  open by default in google-fluentd.  I've integration tested this
  middleware locally.  Unit tests verify that the format is correct.
- Hijack/Websocket support is missing.  This is fine for gddo.

Change-Id: I53c006edc1c3004e08c1b7e4b68fa94240f38db6
Reviewed-on: https://go-review.googlesource.com/70790
Reviewed-by: Herbie Ong <herbie@google.com>
diff --git a/internal/requestlog/fluentd.go b/internal/requestlog/fluentd.go
new file mode 100644
index 0000000..26a3ef0
--- /dev/null
+++ b/internal/requestlog/fluentd.go
@@ -0,0 +1,110 @@
+// Copyright 2017 The Go Authors. All rights reserved.
+//
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file or at
+// https://developers.google.com/open-source/licenses/bsd.
+
+package requestlog
+
+import (
+	"bytes"
+	"encoding/json"
+	"io"
+	"strconv"
+	"sync"
+	"time"
+)
+
+// A FluentdLogger writes log entries in the Fluentd forward JSON
+// format.  The record's fields are suitable for consumption by
+// Stackdriver Logging.
+type FluentdLogger struct {
+	tag   string
+	onErr func(error)
+
+	mu  sync.Mutex
+	w   io.Writer
+	buf bytes.Buffer
+	enc *json.Encoder
+}
+
+// NewFluentdLogger returns a new logger that writes to w.
+func NewFluentdLogger(w io.Writer, tag string, onErr func(error)) *FluentdLogger {
+	l := &FluentdLogger{
+		tag:   tag,
+		w:     w,
+		onErr: onErr,
+	}
+	l.enc = json.NewEncoder(&l.buf)
+	return l
+}
+
+// Log writes a record to its writer.  Multiple concurrent calls will
+// produce sequential writes to its writer.
+func (l *FluentdLogger) Log(ent *Entry) {
+	if err := l.log(ent); err != nil {
+		l.onErr(err)
+	}
+}
+
+func (l *FluentdLogger) log(ent *Entry) error {
+	defer l.mu.Unlock()
+	l.mu.Lock()
+
+	l.buf.Reset()
+	l.buf.WriteByte('[')
+	if err := l.enc.Encode(l.tag); err != nil {
+		return err
+	}
+	l.buf.WriteByte(',')
+	t := ent.ReceivedTime.Add(ent.Latency)
+	if err := l.enc.Encode(t.Unix()); err != nil {
+		return err
+	}
+	l.buf.WriteByte(',')
+
+	var r struct {
+		HTTPRequest struct {
+			RequestMethod string `json:"requestMethod"`
+			RequestURL    string `json:"requestUrl"`
+			RequestSize   int64  `json:"requestSize,string"`
+			Status        int    `json:"status"`
+			ResponseSize  int64  `json:"responseSize,string"`
+			UserAgent     string `json:"userAgent"`
+			RemoteIP      string `json:"remoteIp"`
+			Referer       string `json:"referer"`
+			Latency       string `json:"latency"`
+		} `json:"httpRequest"`
+		Timestamp struct {
+			Seconds int64 `json:"seconds"`
+			Nanos   int   `json:"nanos"`
+		} `json:"timestamp"`
+	}
+	r.HTTPRequest.RequestMethod = ent.RequestMethod
+	r.HTTPRequest.RequestURL = ent.RequestURL
+	// TODO(light): determine whether this is the formula LogEntry expects.
+	r.HTTPRequest.RequestSize = ent.RequestHeaderSize + ent.RequestBodySize
+	r.HTTPRequest.Status = ent.Status
+	// TODO(light): determine whether this is the formula LogEntry expects.
+	r.HTTPRequest.ResponseSize = ent.ResponseHeaderSize + ent.ResponseBodySize
+	r.HTTPRequest.UserAgent = ent.UserAgent
+	r.HTTPRequest.RemoteIP = ent.RemoteIP
+	r.HTTPRequest.Referer = ent.Referer
+	r.HTTPRequest.Latency = string(appendLatency(nil, ent.Latency))
+	r.Timestamp.Seconds = t.Unix()
+	r.Timestamp.Nanos = t.Nanosecond()
+	if err := l.enc.Encode(r); err != nil {
+		return err
+	}
+	l.buf.WriteByte(']')
+	_, err := l.w.Write(l.buf.Bytes())
+	return err
+}
+
+func appendLatency(b []byte, d time.Duration) []byte {
+	// Parses format understood by google-fluentd (which is looser than the documented LogEntry format).
+	// See the comment at https://github.com/GoogleCloudPlatform/fluent-plugin-google-cloud/blob/e2f60cdd1d97e79ffe4e91bdbf6bd84837f27fa5/lib/fluent/plugin/out_google_cloud.rb#L1539
+	b = strconv.AppendFloat(b, d.Seconds(), 'f', 9, 64)
+	b = append(b, 's')
+	return b
+}
diff --git a/internal/requestlog/fluentd_test.go b/internal/requestlog/fluentd_test.go
new file mode 100644
index 0000000..8bdf243
--- /dev/null
+++ b/internal/requestlog/fluentd_test.go
@@ -0,0 +1,186 @@
+// Copyright 2017 The Go Authors. All rights reserved.
+//
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file or at
+// https://developers.google.com/open-source/licenses/bsd.
+
+package requestlog
+
+import (
+	"bytes"
+	"encoding/json"
+	"io/ioutil"
+	"strings"
+	"testing"
+	"time"
+)
+
+func TestFluentdLog(t *testing.T) {
+	const (
+		startTime      = 1507914000
+		startTimeNanos = 512
+
+		latencySec   = 5
+		latencyNanos = 123456789
+
+		endTime      = startTime + latencySec
+		endTimeNanos = startTimeNanos + latencyNanos
+	)
+	buf := new(bytes.Buffer)
+	var logErr error
+	l := NewFluentdLogger(buf, "mytag", func(e error) { logErr = e })
+	want := &Entry{
+		ReceivedTime:       time.Unix(startTime, startTimeNanos),
+		RequestMethod:      "POST",
+		RequestURL:         "/foo/bar",
+		RequestHeaderSize:  456,
+		RequestBodySize:    123000,
+		UserAgent:          "Chrome proxied through Firefox and Edge",
+		Referer:            "http://www.example.com/",
+		Proto:              "HTTP/1.1",
+		RemoteIP:           "12.34.56.78",
+		ServerIP:           "127.0.0.1",
+		Status:             404,
+		ResponseHeaderSize: 555,
+		ResponseBodySize:   789000,
+		Latency:            latencySec*time.Second + latencyNanos*time.Nanosecond,
+	}
+	ent := *want // copy in case Log accidentally mutates
+	l.Log(&ent)
+	if logErr != nil {
+		t.Error("Logger called error callback:", logErr)
+	}
+
+	var got []json.RawMessage
+	if err := json.Unmarshal(buf.Bytes(), &got); err != nil {
+		t.Fatal("Unmarshal:", err)
+	}
+
+	if len(got) == 0 {
+		t.Fatal("Message is empty; want 3 elements")
+	}
+	var tag string
+	if err := json.Unmarshal(got[0], &tag); err != nil {
+		t.Error("Unmarshal tag:", err)
+	} else if want := "mytag"; tag != want {
+		t.Errorf("tag = %q; want %q", tag, want)
+	}
+
+	if len(got) < 2 {
+		t.Fatal("Message only has 1 element; want 3 elements")
+	}
+	var timestamp int64
+	if err := json.Unmarshal(got[1], &timestamp); err != nil {
+		t.Error("Unmarshal timestamp:", err)
+	} else if want := int64(endTime); timestamp != want {
+		t.Errorf("timestamp = %d; want %d", timestamp, want)
+	}
+
+	if len(got) < 3 {
+		t.Fatal("Message only has 2 elements; want 3 elements")
+	}
+	var r map[string]interface{}
+	if err := json.Unmarshal(got[2], &r); err != nil {
+		t.Error("Unmarshal record:", err)
+	} else {
+		rr, _ := r["httpRequest"].(map[string]interface{})
+		if rr == nil {
+			t.Error("httpRequest does not exist in record or is not a JSON object")
+		}
+		if got, want := jsonString(rr, "requestMethod"), ent.RequestMethod; got != want {
+			t.Errorf("httpRequest.requestMethod = %q; want %q", got, want)
+		}
+		if got, want := jsonString(rr, "requestUrl"), ent.RequestURL; got != want {
+			t.Errorf("httpRequest.requestUrl = %q; want %q", got, want)
+		}
+		if got, want := jsonString(rr, "requestSize"), "123456"; got != want {
+			t.Errorf("httpRequest.requestSize = %q; want %q", got, want)
+		}
+		if got, want := jsonNumber(rr, "status"), float64(ent.Status); got != want {
+			t.Errorf("httpRequest.status = %d; want %d", int64(got), int64(want))
+		}
+		if got, want := jsonString(rr, "responseSize"), "789555"; got != want {
+			t.Errorf("httpRequest.responseSize = %q; want %q", got, want)
+		}
+		if got, want := jsonString(rr, "userAgent"), ent.UserAgent; got != want {
+			t.Errorf("httpRequest.userAgent = %q; want %q", got, want)
+		}
+		if got, want := jsonString(rr, "remoteIp"), ent.RemoteIP; got != want {
+			t.Errorf("httpRequest.remoteIp = %q; want %q", got, want)
+		}
+		if got, want := jsonString(rr, "referer"), ent.Referer; got != want {
+			t.Errorf("httpRequest.referer = %q; want %q", got, want)
+		}
+		if got, want := jsonString(rr, "latency"), "5.123456789"; parseLatency(got) != want {
+			t.Errorf("httpRequest.latency = %q; want %q", got, want+"s")
+		}
+		ts, _ := r["timestamp"].(map[string]interface{})
+		if ts == nil {
+			t.Error("timestamp does not exist in record or is not a JSON object")
+		}
+		if got, want := jsonNumber(ts, "seconds"), float64(endTime); got != want {
+			t.Errorf("timestamp.seconds = %g; want %g", got, want)
+		}
+		if got, want := jsonNumber(ts, "nanos"), float64(endTimeNanos); got != want {
+			t.Errorf("timestamp.nanos = %g; want %g", got, want)
+		}
+	}
+	if len(got) > 3 {
+		t.Errorf("Message has %d elements; want 3 elements", len(got))
+	}
+}
+
+func parseLatency(s string) string {
+	s = strings.TrimSpace(s)
+	if !strings.HasSuffix(s, "s") {
+		return ""
+	}
+	s = strings.TrimSpace(s[:len(s)-1])
+	for _, c := range s {
+		if !(c >= '0' && c <= '9') && c != '.' {
+			return ""
+		}
+	}
+	return s
+}
+
+func jsonString(obj map[string]interface{}, k string) string {
+	v, _ := obj[k].(string)
+	return v
+}
+
+func jsonNumber(obj map[string]interface{}, k string) float64 {
+	v, _ := obj[k].(float64)
+	return v
+}
+
+func BenchmarkFluentdLog(b *testing.B) {
+	ent := &Entry{
+		ReceivedTime:       time.Date(2017, time.October, 13, 17, 0, 0, 512, time.UTC),
+		RequestMethod:      "POST",
+		RequestURL:         "/foo/bar",
+		RequestHeaderSize:  456,
+		RequestBodySize:    123000,
+		UserAgent:          "Chrome proxied through Firefox and Edge",
+		Referer:            "http://www.example.com/",
+		Proto:              "HTTP/1.1",
+		RemoteIP:           "12.34.56.78",
+		ServerIP:           "127.0.0.1",
+		Status:             404,
+		ResponseHeaderSize: 555,
+		ResponseBodySize:   789000,
+		Latency:            5 * time.Second,
+	}
+	var buf bytes.Buffer
+	l := NewFluentdLogger(&buf, "mytag", func(error) {})
+	l.Log(ent)
+	b.ReportAllocs()
+	b.SetBytes(int64(buf.Len()))
+	buf.Reset()
+	b.ResetTimer()
+
+	l = NewFluentdLogger(ioutil.Discard, "mytag", func(error) {})
+	for i := 0; i < b.N; i++ {
+		l.Log(ent)
+	}
+}
diff --git a/internal/requestlog/requestlog.go b/internal/requestlog/requestlog.go
new file mode 100644
index 0000000..c528de5
--- /dev/null
+++ b/internal/requestlog/requestlog.go
@@ -0,0 +1,185 @@
+// Copyright 2017 The Go Authors. All rights reserved.
+//
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file or at
+// https://developers.google.com/open-source/licenses/bsd.
+
+// Package requestlog provides an http.Handler that logs information
+// about requests.
+package requestlog
+
+import (
+	"errors"
+	"io"
+	"io/ioutil"
+	"net"
+	"net/http"
+	"time"
+)
+
+// Logger wraps the Log method.  Log must be safe to call from multiple
+// goroutines.  Log must not hold onto an Entry after it returns.
+type Logger interface {
+	Log(*Entry)
+}
+
+// A Handler emits request information to a Logger.
+type Handler struct {
+	log Logger
+	h   http.Handler
+}
+
+// NewHandler returns a handler that emits information to log and calls
+// h.ServeHTTP.
+func NewHandler(log Logger, h http.Handler) *Handler {
+	return &Handler{
+		log: log,
+		h:   h,
+	}
+}
+
+// ServeHTTP calls its underlying handler's ServeHTTP method, then calls
+// Log after the handler returns.
+//
+// ServeHTTP will always consume the request body up to the first error,
+// even if the underlying handler does not.
+func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	start := time.Now()
+	ent := &Entry{
+		ReceivedTime:      start,
+		RequestMethod:     r.Method,
+		RequestURL:        r.URL.String(),
+		RequestHeaderSize: headerSize(r.Header),
+		UserAgent:         r.UserAgent(),
+		Referer:           r.Referer(),
+		Proto:             r.Proto,
+		RemoteIP:          ipFromHostPort(r.RemoteAddr),
+	}
+	if addr, ok := r.Context().Value(http.LocalAddrContextKey).(net.Addr); ok {
+		ent.ServerIP = ipFromHostPort(addr.String())
+	}
+	r2 := new(http.Request)
+	*r2 = *r
+	rcc := &readCounterCloser{r: r.Body}
+	r2.Body = rcc
+	w2 := &responseStats{w: w}
+
+	h.h.ServeHTTP(w2, r2)
+
+	ent.Latency = time.Since(start)
+	if rcc.err == nil {
+		// If the handler hasn't encountered an error in the Body (like EOF),
+		// then consume the rest of the Body to provide an accurate rcc.n.
+		io.Copy(ioutil.Discard, rcc)
+	}
+	ent.RequestBodySize = rcc.n
+	ent.Status = w2.code
+	if ent.Status == 0 {
+		ent.Status = http.StatusOK
+	}
+	ent.ResponseHeaderSize, ent.ResponseBodySize = w2.size()
+	h.log.Log(ent)
+}
+
+// Entry records information about a completed HTTP request.
+type Entry struct {
+	ReceivedTime      time.Time
+	RequestMethod     string
+	RequestURL        string
+	RequestHeaderSize int64
+	RequestBodySize   int64
+	UserAgent         string
+	Referer           string
+	Proto             string
+
+	RemoteIP string
+	ServerIP string
+
+	Status             int
+	ResponseHeaderSize int64
+	ResponseBodySize   int64
+	Latency            time.Duration
+}
+
+func ipFromHostPort(hp string) string {
+	h, _, err := net.SplitHostPort(hp)
+	if err != nil {
+		return ""
+	}
+	if len(h) > 0 && h[0] == '[' {
+		return h[1 : len(h)-1]
+	}
+	return h
+}
+
+type readCounterCloser struct {
+	r   io.ReadCloser
+	n   int64
+	err error
+}
+
+func (rcc *readCounterCloser) Read(p []byte) (n int, err error) {
+	if rcc.err != nil {
+		return 0, rcc.err
+	}
+	n, rcc.err = rcc.r.Read(p)
+	rcc.n += int64(n)
+	return n, rcc.err
+}
+
+func (rcc *readCounterCloser) Close() error {
+	rcc.err = errors.New("read from closed reader")
+	return rcc.r.Close()
+}
+
+type writeCounter int64
+
+func (wc *writeCounter) Write(p []byte) (n int, err error) {
+	*wc += writeCounter(len(p))
+	return len(p), nil
+}
+
+func headerSize(h http.Header) int64 {
+	var wc writeCounter
+	h.Write(&wc)
+	return int64(wc) + 2 // for CRLF
+}
+
+type responseStats struct {
+	w     http.ResponseWriter
+	hsize int64
+	wc    writeCounter
+	code  int
+}
+
+func (r *responseStats) Header() http.Header {
+	return r.w.Header()
+}
+
+func (r *responseStats) WriteHeader(statusCode int) {
+	if r.code != 0 {
+		return
+	}
+	r.hsize = headerSize(r.w.Header())
+	r.w.WriteHeader(statusCode)
+	r.code = statusCode
+}
+
+func (r *responseStats) Write(p []byte) (n int, err error) {
+	if r.code == 0 {
+		r.WriteHeader(http.StatusOK)
+	}
+	n, err = r.w.Write(p)
+	r.wc.Write(p[:n])
+	return
+}
+
+func (r *responseStats) size() (hdr, body int64) {
+	if r.code == 0 {
+		return headerSize(r.w.Header()), 0
+	}
+	// Use the header size from the time WriteHeader was called.
+	// The Header map can be mutated after the call to add HTTP Trailers,
+	// which we don't want to count.
+	return r.hsize, int64(r.wc)
+}
diff --git a/internal/requestlog/requestlog_test.go b/internal/requestlog/requestlog_test.go
new file mode 100644
index 0000000..7b00e30
--- /dev/null
+++ b/internal/requestlog/requestlog_test.go
@@ -0,0 +1,92 @@
+// Copyright 2017 The Go Authors. All rights reserved.
+//
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file or at
+// https://developers.google.com/open-source/licenses/bsd.
+
+package requestlog
+
+import (
+	"fmt"
+	"io"
+	"net/http"
+	"net/http/httptest"
+	"strings"
+	"testing"
+)
+
+func TestHandler(t *testing.T) {
+	const requestMsg = "Hello, World!"
+	const responseMsg = "I see you."
+	const userAgent = "Request Log Test UA"
+	const referer = "http://www.example.com/"
+
+	r, err := http.NewRequest("POST", "http://localhost/foo", strings.NewReader(requestMsg))
+	if err != nil {
+		t.Fatal("NewRequest:", err)
+	}
+	r.Header.Set("User-Agent", userAgent)
+	r.Header.Set("Referer", referer)
+	requestHdrSize := len(fmt.Sprintf("User-Agent: %s\r\nReferer: %s\r\nContent-Length: %v\r\n", userAgent, referer, len(requestMsg)))
+	responseHdrSize := len(fmt.Sprintf("Content-Length: %v\r\n", len(responseMsg)))
+	ent, err := roundTrip(r, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		w.Header().Set("Content-Length", fmt.Sprint(len(responseMsg)))
+		w.WriteHeader(http.StatusOK)
+		io.WriteString(w, responseMsg)
+	}))
+	if err != nil {
+		t.Fatal("Could not get entry:", err)
+	}
+	if want := "POST"; ent.RequestMethod != want {
+		t.Errorf("RequestMethod = %q; want %q", ent.RequestMethod, want)
+	}
+	if want := "/foo"; ent.RequestURL != want {
+		t.Errorf("RequestURL = %q; want %q", ent.RequestURL, want)
+	}
+	if ent.RequestHeaderSize < int64(requestHdrSize) {
+		t.Errorf("RequestHeaderSize = %d; want >=%d", ent.RequestHeaderSize, requestHdrSize)
+	}
+	if ent.RequestBodySize != int64(len(requestMsg)) {
+		t.Errorf("RequestBodySize = %d; want %d", ent.RequestBodySize, len(requestMsg))
+	}
+	if ent.UserAgent != userAgent {
+		t.Errorf("UserAgent = %q; want %q", ent.UserAgent, userAgent)
+	}
+	if ent.Referer != referer {
+		t.Errorf("Referer = %q; want %q", ent.Referer, referer)
+	}
+	if want := "HTTP/1.1"; ent.Proto != want {
+		t.Errorf("Proto = %q; want %q", ent.Proto, want)
+	}
+	if ent.Status != http.StatusOK {
+		t.Errorf("Status = %d; want %d", ent.Status, http.StatusOK)
+	}
+	if ent.ResponseHeaderSize < int64(responseHdrSize) {
+		t.Errorf("ResponseHeaderSize = %d; want >=%d", ent.ResponseHeaderSize, responseHdrSize)
+	}
+	if ent.ResponseBodySize != int64(len(responseMsg)) {
+		t.Errorf("ResponseBodySize = %d; want %d", ent.ResponseBodySize, len(responseMsg))
+	}
+}
+
+func roundTrip(r *http.Request, h http.Handler) (*Entry, error) {
+	capture := new(captureLogger)
+	hh := NewHandler(capture, h)
+	s := httptest.NewServer(hh)
+	defer s.Close()
+	r.URL.Host = s.URL[len("http://"):]
+	resp, err := http.DefaultClient.Do(r)
+	if err != nil {
+		return nil, err
+	}
+	resp.Body.Close()
+	return &capture.ent, nil
+}
+
+type captureLogger struct {
+	ent Entry
+}
+
+func (cl *captureLogger) Log(ent *Entry) {
+	cl.ent = *ent
+}