blob: 2e896959a30f1a2abf9271ef4ada039ee932c9dd [file] [log] [blame]
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package maintner
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
"strings"
"time"
"golang.org/x/build/cmd/pubsubhelper/pubsubtypes"
)
func (c *Corpus) activityChan(topic string) chan struct{} {
c.mu.Lock()
defer c.mu.Unlock()
if ch, ok := c.activityChans[topic]; ok {
return ch
}
if c.activityChans == nil {
c.activityChans = map[string]chan struct{}{}
}
ch := make(chan struct{}) // unbuffered
c.activityChans[topic] = ch
return ch
}
func (c *Corpus) fire(topic string) {
ch := c.activityChan(topic)
select {
case ch <- struct{}{}:
log.Printf("Pubsub woke up sync for topic %q", topic)
default:
log.Printf("Pubsub event on topic %q discarded; already syncing?", topic)
}
}
// StartPubSubHelperSubscribe starts subscribing to a
// golang.org/x/build/cmd/pubsubhelper server, such
// as https://pubsubhelper.golang.org
func (c *Corpus) StartPubSubHelperSubscribe(urlBase string) {
go c.subscribeLoop(urlBase)
}
func (c *Corpus) subscribeLoop(urlBase string) {
var after time.Time
for {
newAfter, err := c.getEvent(urlBase, after)
if err != nil {
log.Printf("pubsub subscribe: %v", err)
time.Sleep(5 * time.Second)
continue
}
after = newAfter
}
}
var zt time.Time // a zero time.Time
func (c *Corpus) getEvent(urlBase string, after time.Time) (newAfter time.Time, err error) {
var afterStr string
if !after.IsZero() {
afterStr = after.UTC().Format(time.RFC3339Nano)
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
req, _ := http.NewRequest("GET", urlBase+"/waitevent?after="+afterStr, nil)
req = req.WithContext(ctx)
res, err := http.DefaultClient.Do(req)
if err != nil {
return zt, err
}
defer res.Body.Close()
if res.StatusCode != 200 {
return zt, errors.New(res.Status)
}
var evt pubsubtypes.Event
if err := json.NewDecoder(res.Body).Decode(&evt); err != nil {
return zt, err
}
if !evt.LongPollTimeout {
got, _ := json.MarshalIndent(evt, "", "\t")
log.Printf("Got pubsubhelper event: %s", got)
if gh := evt.GitHub; gh != nil {
topic := "github:" + gh.RepoOwner + "/" + gh.Repo
c.fire(topic)
}
if gr := evt.Gerrit; gr != nil {
c.fire(gerritTopicOfEvent(gr))
}
}
return evt.Time.Time(), nil
}
// Return topics like "gerrit:go.googlesource.com/build"
func gerritTopicOfEvent(gr *pubsubtypes.GerritEvent) string {
server := gr.URL // "https://code-review.googlesource.com/11970
if i := strings.Index(server, "//"); i != -1 {
server = server[i+2:] // code-review.googlesource.com/11970
}
if i := strings.Index(server, "/"); i != -1 {
server = server[:i] // code-review.googlesource.com
}
server = strings.Replace(server, "-review.googlesource.com", ".googlesource.com", 1)
return fmt.Sprintf("gerrit:%s/%s", server, gr.Project)
}