|  | // Copyright 2017 The Go Authors. All rights reserved. | 
|  | // Use of this source code is governed by a BSD-style | 
|  | // license that can be found in the LICENSE file. | 
|  |  | 
|  | // Package gcslog is an implementation of maintner.MutationSource and Logger for Google Cloud Storage. | 
|  | package gcslog | 
|  |  | 
|  | import ( | 
|  | "bytes" | 
|  | "context" | 
|  | "crypto/sha256" | 
|  | "encoding/json" | 
|  | "fmt" | 
|  | "hash" | 
|  | "io" | 
|  | "log" | 
|  | "net/http" | 
|  | "path" | 
|  | "regexp" | 
|  | "sort" | 
|  | "strconv" | 
|  | "strings" | 
|  | "sync" | 
|  | "time" | 
|  |  | 
|  | "cloud.google.com/go/storage" | 
|  | "github.com/golang/protobuf/proto" | 
|  | "golang.org/x/build/maintner" | 
|  | "golang.org/x/build/maintner/maintpb" | 
|  | "golang.org/x/build/maintner/reclog" | 
|  | "google.golang.org/api/iterator" | 
|  | ) | 
|  |  | 
|  | // targetObjectSize is the goal maximum size for each log segment on | 
|  | // GCS. In the unlikely case that a single record is larger than this | 
|  | // then a segment would be bigger than this (records are never split | 
|  | // between log segments).  But otherwise this is the max size. | 
|  | const targetObjectSize = 16 << 20 | 
|  |  | 
|  | const flushInterval = 10 * time.Minute | 
|  |  | 
|  | // GCSLog implements MutationLogger and MutationSource. | 
|  | var _ maintner.MutationLogger = &GCSLog{} | 
|  | var _ maintner.MutationSource = &GCSLog{} | 
|  |  | 
|  | // GCSLog logs mutations to GCS. | 
|  | type GCSLog struct { | 
|  | sc            *storage.Client | 
|  | bucketName    string | 
|  | bucket        *storage.BucketHandle | 
|  | segmentPrefix string | 
|  | debug         bool | 
|  |  | 
|  | mu         sync.Mutex // guards the following | 
|  | cond       *sync.Cond | 
|  | seg        map[int]gcsLogSegment | 
|  | curNum     int | 
|  | logBuf     bytes.Buffer | 
|  | logSHA224  hash.Hash | 
|  | flushTimer *time.Timer // non-nil if flush timer is active | 
|  | } | 
|  |  | 
|  | type gcsLogSegment struct { | 
|  | num     int // starting with 0 | 
|  | size    int64 | 
|  | sha224  string // in lowercase hex | 
|  | created time.Time | 
|  | } | 
|  |  | 
|  | func (s gcsLogSegment) ObjectName() string { | 
|  | return fmt.Sprintf("%04d.%s.mutlog", s.num, s.sha224) | 
|  | } | 
|  |  | 
|  | func (s gcsLogSegment) String() string { | 
|  | return fmt.Sprintf("{gcsLogSegment num=%v, size=%v, sha=%v, created=%v}", s.num, s.size, s.sha224, s.created.Format(time.RFC3339)) | 
|  | } | 
|  |  | 
|  | // newGCSLogBase returns a new gcsLog instance without any association | 
|  | // with Google Cloud Storage. | 
|  | func newGCSLogBase() *GCSLog { | 
|  | gl := &GCSLog{ | 
|  | seg: map[int]gcsLogSegment{}, | 
|  | } | 
|  | gl.cond = sync.NewCond(&gl.mu) | 
|  | return gl | 
|  | } | 
|  |  | 
|  | // NewGCSLog creates a GCSLog that logs mutations to a given GCS bucket. | 
|  | // If the bucket name contains a "/", the part after the slash will be a | 
|  | // prefix for the segments. | 
|  | func NewGCSLog(ctx context.Context, bucketName string) (*GCSLog, error) { | 
|  | sc, err := storage.NewClient(ctx) | 
|  | if err != nil { | 
|  | return nil, fmt.Errorf("storage.NewClient: %v", err) | 
|  | } | 
|  | gl := newGCSLogBase() | 
|  | gl.sc = sc | 
|  |  | 
|  | prefix := "" | 
|  | if f := strings.SplitN(bucketName, "/", 2); len(f) > 1 { | 
|  | bucketName, prefix = f[0], f[1] | 
|  | } | 
|  |  | 
|  | gl.bucketName = bucketName | 
|  | gl.segmentPrefix = prefix | 
|  | gl.bucket = sc.Bucket(gl.bucketName) | 
|  | if err := gl.initLoad(ctx); err != nil { | 
|  | return nil, err | 
|  | } | 
|  | return gl, nil | 
|  | } | 
|  |  | 
|  | // objNameRx is used to identify a mutation log file by suffix. | 
|  | var objnameRx = regexp.MustCompile(`(\d{4})\.([0-9a-f]{56})\.mutlog$`) | 
|  |  | 
|  | func (gl *GCSLog) initLoad(ctx context.Context) error { | 
|  | it := gl.bucket.Objects(ctx, nil) | 
|  | maxNum := 0 | 
|  | for { | 
|  | objAttrs, err := it.Next() | 
|  | if err == iterator.Done { | 
|  | break | 
|  | } | 
|  | if err != nil { | 
|  | return fmt.Errorf("iterating over %s bucket: %v", gl.bucketName, err) | 
|  | } | 
|  | if !strings.HasPrefix(objAttrs.Name, gl.segmentPrefix) { | 
|  | log.Printf("Ignoring GCS object with invalid prefix %q", objAttrs.Name) | 
|  | continue | 
|  | } | 
|  | m := objnameRx.FindStringSubmatch(objAttrs.Name) | 
|  | if m == nil { | 
|  | log.Printf("Ignoring unrecognized GCS object %q", objAttrs.Name) | 
|  | continue | 
|  | } | 
|  | n, _ := strconv.ParseInt(m[1], 10, 32) | 
|  | seg := gcsLogSegment{ | 
|  | num:     int(n), | 
|  | sha224:  m[2], | 
|  | size:    objAttrs.Size, | 
|  | created: objAttrs.Created, | 
|  | } | 
|  | if seg.num > maxNum { | 
|  | maxNum = seg.num | 
|  | } | 
|  | if prevSeg, ok := gl.seg[int(n)]; !ok || prevSeg.created.Before(seg.created) { | 
|  | gl.seg[int(n)] = seg | 
|  | log.Printf("seg[%v] = %s", n, seg) | 
|  | if ok { | 
|  | gl.deleteOldSegment(ctx, gl.objectPath(prevSeg)) | 
|  | } | 
|  | } | 
|  | } | 
|  | gl.curNum = maxNum | 
|  |  | 
|  | if len(gl.seg) == 0 { | 
|  | return nil | 
|  | } | 
|  |  | 
|  | // Check for any missing segments. | 
|  | for i := 0; i < maxNum; i++ { | 
|  | if _, ok := gl.seg[i]; !ok { | 
|  | return fmt.Errorf("saw max segment number %d but missing segment %d", maxNum, i) | 
|  | } | 
|  | } | 
|  |  | 
|  | // Should we resume writing to the latest entry? | 
|  | // If the latest one is big enough, leave it be. | 
|  | // Otherwise slurp it in and we'll append to it. | 
|  | if gl.seg[maxNum].size >= targetObjectSize-(4<<10) { | 
|  | gl.curNum++ | 
|  | return nil | 
|  | } | 
|  |  | 
|  | r, err := gl.bucket.Object(gl.objectPath(gl.seg[maxNum])).NewReader(ctx) | 
|  | if err != nil { | 
|  | return err | 
|  | } | 
|  | defer r.Close() | 
|  | if _, err = io.Copy(gcsLogWriter{gl}, r); err != nil { | 
|  | return err | 
|  | } | 
|  |  | 
|  | return nil | 
|  | } | 
|  |  | 
|  | func (gl *GCSLog) objectPath(seg gcsLogSegment) string { | 
|  | return path.Join(gl.segmentPrefix, seg.ObjectName()) | 
|  | } | 
|  |  | 
|  | func (gl *GCSLog) serveLogFile(w http.ResponseWriter, r *http.Request) { | 
|  | if r.Method != "GET" && r.Method != "HEAD" { | 
|  | http.Error(w, "bad method", http.StatusBadRequest) | 
|  | return | 
|  | } | 
|  |  | 
|  | num, err := strconv.Atoi(strings.TrimPrefix(r.URL.Path, "/logs/")) | 
|  | if err != nil { | 
|  | http.Error(w, "bad path", http.StatusBadRequest) | 
|  | return | 
|  | } | 
|  |  | 
|  | gl.mu.Lock() | 
|  | if num > gl.curNum { | 
|  | gl.mu.Unlock() | 
|  | http.Error(w, "bad segment number", http.StatusBadRequest) | 
|  | return | 
|  | } | 
|  | if num != gl.curNum { | 
|  | obj := gl.objectPath(gl.seg[num]) | 
|  | gl.mu.Unlock() | 
|  | http.Redirect(w, r, "https://storage.googleapis.com/"+gl.bucketName+"/"+obj, http.StatusFound) | 
|  | return | 
|  | } | 
|  | content := gl.logBuf.String() | 
|  |  | 
|  | gl.mu.Unlock() | 
|  | w.Header().Set("Content-Type", "application/octet-stream") | 
|  | http.ServeContent(w, r, "", time.Time{}, strings.NewReader(content)) | 
|  | } | 
|  |  | 
|  | func (gl *GCSLog) serveJSONLogsIndex(w http.ResponseWriter, r *http.Request) { | 
|  | if r.Method != "GET" && r.Method != "HEAD" { | 
|  | http.Error(w, "bad method", http.StatusBadRequest) | 
|  | return | 
|  | } | 
|  |  | 
|  | startSeg, _ := strconv.Atoi(r.FormValue("startseg")) | 
|  | if startSeg < 0 { | 
|  | http.Error(w, "bad startseg", http.StatusBadRequest) | 
|  | return | 
|  | } | 
|  |  | 
|  | // Long poll if request contains non-zero waitsizenot parameter. | 
|  | // The client's provided 'waitsizenot' value is the sum of the segment | 
|  | // sizes they already know. They're waiting for something new. | 
|  | if s := r.FormValue("waitsizenot"); s != "" { | 
|  | oldSize, err := strconv.ParseInt(s, 10, 64) | 
|  | if err != nil || oldSize < 0 { | 
|  | http.Error(w, "bad waitsizenot", http.StatusBadRequest) | 
|  | return | 
|  | } | 
|  | // Return a 304 if there's no activity in just under a minute. | 
|  | // This keeps some occasional activity on the TCP connection | 
|  | // so we (and any proxies) know it's alive, and can fit | 
|  | // within reasonable read/write deadlines on either side. | 
|  | ctx, cancel := context.WithTimeout(r.Context(), 55*time.Second) | 
|  | defer cancel() | 
|  | changed := gl.waitSizeNot(ctx, oldSize) | 
|  | if !changed { | 
|  | w.WriteHeader(http.StatusNotModified) | 
|  | return | 
|  | } | 
|  | } | 
|  |  | 
|  | segs := gl.getJSONLogs(startSeg) | 
|  |  | 
|  | w.Header().Set("Content-Type", "application/json") | 
|  | w.Header().Set("X-Sum-Segment-Size", fmt.Sprint(sumSegmentSizes(segs))) | 
|  |  | 
|  | body, _ := json.MarshalIndent(segs, "", "\t") | 
|  | w.Write(body) | 
|  | } | 
|  |  | 
|  | // sumSegmentSizes returns the sum of each seg.Size in segs. | 
|  | func sumSegmentSizes(segs []maintner.LogSegmentJSON) (sum int64) { | 
|  | for _, seg := range segs { | 
|  | sum += seg.Size | 
|  | } | 
|  | return sum | 
|  | } | 
|  |  | 
|  | // waitSizeNot blocks until the sum of GCSLog is not v, or the context expires. | 
|  | // It reports whether the size changed. | 
|  | func (gl *GCSLog) waitSizeNot(ctx context.Context, v int64) (changed bool) { | 
|  | returned := make(chan struct{}) | 
|  | defer close(returned) | 
|  | go gl.waitSizeNotAwaitContextOrChange(ctx, returned) | 
|  | gl.mu.Lock() | 
|  | defer gl.mu.Unlock() | 
|  | for { | 
|  | if curSize := gl.sumSizeLocked(); curSize != v { | 
|  | if gl.debug { | 
|  | log.Printf("gcslog: waitSize fired. from %d => %d", v, curSize) | 
|  | } | 
|  | return true | 
|  | } | 
|  | select { | 
|  | case <-ctx.Done(): | 
|  | return false | 
|  | default: | 
|  | gl.cond.Wait() | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | // waitSizeNotAwaitContextOrChange is part of waitSizeNot. | 
|  | // It's a goroutine that selects on two channels and calls | 
|  | // sync.Cond.Broadcast to wake up the waitSizeNot waiter if the | 
|  | // context expires. | 
|  | func (gl *GCSLog) waitSizeNotAwaitContextOrChange(ctx context.Context, returned <-chan struct{}) { | 
|  | select { | 
|  | case <-ctx.Done(): | 
|  | gl.cond.Broadcast() | 
|  | case <-returned: | 
|  | // No need to do a wakeup. Caller is already gone. | 
|  | } | 
|  | } | 
|  |  | 
|  | func (gl *GCSLog) sumSizeLocked() int64 { | 
|  | var sum int64 | 
|  | for n, seg := range gl.seg { | 
|  | if n != gl.curNum { | 
|  | sum += seg.size | 
|  | } | 
|  | } | 
|  | sum += int64(gl.logBuf.Len()) | 
|  | return sum | 
|  | } | 
|  |  | 
|  | func (gl *GCSLog) getJSONLogs(startSeg int) (segs []maintner.LogSegmentJSON) { | 
|  | gl.mu.Lock() | 
|  | defer gl.mu.Unlock() | 
|  | if startSeg > gl.curNum || startSeg < 0 { | 
|  | startSeg = 0 | 
|  | } | 
|  | segs = make([]maintner.LogSegmentJSON, 0, gl.curNum-startSeg) | 
|  | for i := startSeg; i < gl.curNum; i++ { | 
|  | seg := gl.seg[i] | 
|  | segs = append(segs, maintner.LogSegmentJSON{ | 
|  | Number: i, | 
|  | Size:   seg.size, | 
|  | SHA224: seg.sha224, | 
|  | URL:    fmt.Sprintf("https://storage.googleapis.com/%s/%s", gl.bucketName, gl.objectPath(seg)), | 
|  | }) | 
|  | } | 
|  | if gl.logBuf.Len() > 0 { | 
|  | segs = append(segs, maintner.LogSegmentJSON{ | 
|  | Number: gl.curNum, | 
|  | Size:   int64(gl.logBuf.Len()), | 
|  | SHA224: fmt.Sprintf("%x", gl.logSHA224.Sum(nil)), | 
|  | URL:    fmt.Sprintf("/logs/%d", gl.curNum), | 
|  | }) | 
|  | } | 
|  | return | 
|  | } | 
|  |  | 
|  | // gcsLogWriter is the io.Writer used to write to GCSLog.logBuf. It | 
|  | // keeps the sha224 in sync. Caller must hold gl.mu. | 
|  | type gcsLogWriter struct{ gl *GCSLog } | 
|  |  | 
|  | func (w gcsLogWriter) Write(p []byte) (n int, err error) { | 
|  | gl := w.gl | 
|  | if gl.logBuf.Len() == 0 { | 
|  | gl.logSHA224 = sha256.New224() | 
|  | } | 
|  | n, err = gl.logSHA224.Write(p) | 
|  | if n != len(p) || err != nil { | 
|  | panic(fmt.Sprintf("unexpected write (%v, %v) for %v bytes", n, err, len(p))) | 
|  | } | 
|  | n, err = gl.logBuf.Write(p) | 
|  | if n != len(p) || err != nil { | 
|  | panic(fmt.Sprintf("unexpected write (%v, %v) for %v bytes", n, err, len(p))) | 
|  | } | 
|  | return len(p), nil | 
|  | } | 
|  |  | 
|  | // SetDebug controls whether verbose debugging is enabled on this log. | 
|  | // | 
|  | // It must only be called before it's used. | 
|  | func (gl *GCSLog) SetDebug(v bool) { gl.debug = v } | 
|  |  | 
|  | // Log writes m to GCS after the buffer is full or after a periodic flush. | 
|  | func (gl *GCSLog) Log(m *maintpb.Mutation) error { | 
|  | data, err := proto.Marshal(m) | 
|  | if err != nil { | 
|  | return err | 
|  | } | 
|  |  | 
|  | gl.mu.Lock() | 
|  | defer gl.mu.Unlock() | 
|  |  | 
|  | // If we have some data and this item would push us over, flush. | 
|  | if gl.logBuf.Len()+len(data) > targetObjectSize { | 
|  | log.Printf("Log: record requires buffer flush.") | 
|  | if err := gl.flushLocked(context.TODO()); err != nil { | 
|  | return err | 
|  | } | 
|  | gl.curNum++ | 
|  | gl.logBuf.Reset() | 
|  | log.Printf("cur log file now %d", gl.curNum) | 
|  | } | 
|  |  | 
|  | if err := reclog.WriteRecord(gcsLogWriter{gl}, int64(gl.logBuf.Len()), data); err != nil { | 
|  | return err | 
|  | } | 
|  | gl.cond.Broadcast() // wake any long-polling subscribers | 
|  |  | 
|  | // Otherwise schedule a periodic flush. | 
|  | if gl.flushTimer == nil { | 
|  | log.Printf("wrote record; flush timer registered.") | 
|  | gl.flushTimer = time.AfterFunc(flushInterval, gl.onFlushTimer) | 
|  | } else { | 
|  | log.Printf("wrote record; using existing flush timer.") | 
|  | } | 
|  | return nil | 
|  | } | 
|  |  | 
|  | func (gl *GCSLog) onFlushTimer() { | 
|  | log.Printf("flush timer fired.") | 
|  | ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) | 
|  | defer cancel() | 
|  | gl.flush(ctx) | 
|  | } | 
|  |  | 
|  | func (gl *GCSLog) flush(ctx context.Context) error { | 
|  | gl.mu.Lock() | 
|  |  | 
|  | defer gl.mu.Unlock() | 
|  | if gl.flushTimer != nil { | 
|  | gl.flushTimer.Stop() | 
|  | gl.flushTimer = nil | 
|  | } | 
|  |  | 
|  | if err := gl.flushLocked(ctx); err != nil { | 
|  | gl.flushTimer = time.AfterFunc(1*time.Minute, gl.onFlushTimer) | 
|  | log.Printf("Error background flushing: %v", err) | 
|  | return err | 
|  | } | 
|  |  | 
|  | return nil | 
|  | } | 
|  |  | 
|  | func (gl *GCSLog) flushLocked(ctx context.Context) error { | 
|  | buf := gl.logBuf.Bytes() | 
|  | if len(buf) == 0 { | 
|  | return nil | 
|  | } | 
|  | seg := gcsLogSegment{ | 
|  | num:    gl.curNum, | 
|  | sha224: fmt.Sprintf("%x", sha256.Sum224(buf)), | 
|  | size:   int64(len(buf)), | 
|  | } | 
|  | objName := gl.objectPath(seg) | 
|  | log.Printf("flushing %s (%d bytes)", objName, len(buf)) | 
|  | err := try(4, time.Second, func() error { | 
|  | w := gl.bucket.Object(objName).NewWriter(ctx) | 
|  | w.ContentType = "application/octet-stream" | 
|  | if _, err := w.Write(buf); err != nil { | 
|  | return err | 
|  | } | 
|  | if err := w.Close(); err != nil { | 
|  | return err | 
|  | } | 
|  | attrs := w.Attrs() | 
|  | seg.created = attrs.Created | 
|  | return nil | 
|  | }) | 
|  | if err != nil { | 
|  | return err | 
|  | } | 
|  |  | 
|  | old := gl.seg[seg.num] | 
|  | gl.seg[seg.num] = seg | 
|  |  | 
|  | // Atomically update the manifest file. If we lose the CAS | 
|  | // race, that means some other instance of this process is | 
|  | // running at the same time (e.g. accidental replica count > 1 | 
|  | // in k8s config) and we should fail hard. | 
|  | // TODO: that^ | 
|  |  | 
|  | // Delete any old segment from the same position. | 
|  | if old.sha224 != "" && old.sha224 != seg.sha224 { | 
|  | gl.deleteOldSegment(ctx, gl.objectPath(old)) | 
|  | } | 
|  | return nil | 
|  | } | 
|  |  | 
|  | func (gl *GCSLog) deleteOldSegment(ctx context.Context, objName string) { | 
|  | err := gl.bucket.Object(objName).Delete(ctx) | 
|  | if err != nil { | 
|  | // Can ignore, though. Probably emphemeral, and not critical. | 
|  | // It'll be deleted by new versions or next start-up anyway. | 
|  | log.Printf("Warning: error deleting old segment version %v: %v", objName, err) | 
|  | } else { | 
|  | log.Printf("deleted old segment version %v", objName) | 
|  | } | 
|  | } | 
|  |  | 
|  | func (gl *GCSLog) objectNames() (names []string) { | 
|  | gl.mu.Lock() | 
|  | defer gl.mu.Unlock() | 
|  | for _, seg := range gl.seg { | 
|  | names = append(names, gl.objectPath(seg)) | 
|  | } | 
|  | sort.Strings(names) | 
|  | return | 
|  | } | 
|  |  | 
|  | func (gl *GCSLog) foreachSegmentReader(ctx context.Context, fn func(r io.Reader) error) error { | 
|  | objs := gl.objectNames() | 
|  | for i, obj := range objs { | 
|  | log.Printf("Reading %d/%d: %s ...", i+1, len(objs), obj) | 
|  | rd, err := gl.bucket.Object(obj).NewReader(ctx) | 
|  | if err != nil { | 
|  | return fmt.Errorf("failed to open %v: %v", obj, err) | 
|  | } | 
|  | err = fn(rd) | 
|  | rd.Close() | 
|  | if err != nil { | 
|  | return fmt.Errorf("error processing %v: %v", obj, err) | 
|  | } | 
|  | } | 
|  | return nil | 
|  | } | 
|  |  | 
|  | // GetMutations returns a channel of mutations or related events. | 
|  | // The channel will never be closed. | 
|  | // All sends on the returned channel should select on the provided context. | 
|  | func (gl *GCSLog) GetMutations(ctx context.Context) <-chan maintner.MutationStreamEvent { | 
|  | ch := make(chan maintner.MutationStreamEvent, 50) // buffered: overlap gunzip/unmarshal with loading | 
|  | go func() { | 
|  | err := gl.foreachSegmentReader(ctx, func(r io.Reader) error { | 
|  | return reclog.ForeachRecord(r, 0, func(off int64, hdr, rec []byte) error { | 
|  | m := new(maintpb.Mutation) | 
|  | if err := proto.Unmarshal(rec, m); err != nil { | 
|  | return err | 
|  | } | 
|  | select { | 
|  | case ch <- maintner.MutationStreamEvent{Mutation: m}: | 
|  | return nil | 
|  | case <-ctx.Done(): | 
|  | return ctx.Err() | 
|  | } | 
|  | }) | 
|  | }) | 
|  | final := maintner.MutationStreamEvent{Err: err} | 
|  | if err == nil { | 
|  | final.End = true | 
|  | } | 
|  | select { | 
|  | case ch <- final: | 
|  | case <-ctx.Done(): | 
|  | } | 
|  | }() | 
|  | return ch | 
|  | } | 
|  |  | 
|  | func try(tries int, firstDelay time.Duration, fn func() error) error { | 
|  | var err error | 
|  | delay := firstDelay | 
|  | for i := 0; i < tries; i++ { | 
|  | err = fn() | 
|  | if err == nil { | 
|  | return nil | 
|  | } | 
|  | time.Sleep(delay) | 
|  | delay *= 2 | 
|  | } | 
|  | return err | 
|  | } | 
|  |  | 
|  | // CopyFrom is only used for the one-time migrate from disk-to-GCS code path. | 
|  | func (gl *GCSLog) CopyFrom(src maintner.MutationSource) error { | 
|  | gl.curNum = 0 | 
|  | ctx := context.Background() | 
|  | for e := range src.GetMutations(ctx) { | 
|  | if e.Err != nil { | 
|  | log.Printf("Corpus.Initialize: %v", e.Err) | 
|  | return e.Err | 
|  | } | 
|  | if e.End { | 
|  | log.Printf("reached end. flushing.") | 
|  | err := gl.flush(ctx) | 
|  | log.Printf("final flush = %v", err) | 
|  | return nil | 
|  | } | 
|  | if err := gl.Log(e.Mutation); err != nil { | 
|  | return err | 
|  | } | 
|  | } | 
|  | panic("unexpected channel close") | 
|  | } | 
|  |  | 
|  | // RegisterHandlers adds handlers for the default paths (/logs and /logs/). | 
|  | func (gl *GCSLog) RegisterHandlers(mux *http.ServeMux) { | 
|  | mux.HandleFunc("/logs", gl.serveJSONLogsIndex) | 
|  | mux.HandleFunc("/logs/", gl.serveLogFile) | 
|  | } |