| // Copyright 2017 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package maintner |
| |
| import ( |
| "context" |
| "encoding/json" |
| "fmt" |
| "io" |
| "log" |
| "net/http" |
| "net/url" |
| "reflect" |
| "regexp" |
| "runtime" |
| "slices" |
| "sort" |
| "strconv" |
| "strings" |
| "time" |
| |
| "github.com/google/go-github/v48/github" |
| "github.com/gregjones/httpcache" |
| "golang.org/x/build/maintner/maintpb" |
| "golang.org/x/oauth2" |
| "golang.org/x/sync/errgroup" |
| "golang.org/x/time/rate" |
| "google.golang.org/protobuf/types/known/timestamppb" |
| ) |
| |
| // xFromCache is the synthetic response header added by the httpcache |
| // package for responses fulfilled from cache due to a 304 from the server. |
| const xFromCache = "X-From-Cache" |
| |
| // GitHubRepoID is a GitHub org & repo, lowercase. |
| type GitHubRepoID struct { |
| Owner, Repo string |
| } |
| |
| func (id GitHubRepoID) String() string { return id.Owner + "/" + id.Repo } |
| |
| func (id GitHubRepoID) valid() bool { |
| if id.Owner == "" || id.Repo == "" { |
| // TODO: more validation. whatever GitHub requires. |
| return false |
| } |
| return true |
| } |
| |
| // GitHub holds data about a GitHub repo. |
| type GitHub struct { |
| c *Corpus |
| users map[int64]*GitHubUser |
| teams map[int64]*GitHubTeam |
| repos map[GitHubRepoID]*GitHubRepo |
| } |
| |
| // ForeachRepo calls fn serially for each GitHubRepo, stopping if fn |
| // returns an error. The function is called with lexically increasing |
| // repo IDs. |
| func (g *GitHub) ForeachRepo(fn func(*GitHubRepo) error) error { |
| var ids []GitHubRepoID |
| for id := range g.repos { |
| ids = append(ids, id) |
| } |
| sort.Slice(ids, func(i, j int) bool { |
| if ids[i].Owner < ids[j].Owner { |
| return true |
| } |
| return ids[i].Owner == ids[j].Owner && ids[i].Repo < ids[j].Repo |
| }) |
| for _, id := range ids { |
| if err := fn(g.repos[id]); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| // Repo returns the repo if it's known. Otherwise it returns nil. |
| func (g *GitHub) Repo(owner, repo string) *GitHubRepo { |
| return g.repos[GitHubRepoID{owner, repo}] |
| } |
| |
| func (g *GitHub) getOrCreateRepo(owner, repo string) *GitHubRepo { |
| if g == nil { |
| panic("cannot call methods on nil GitHub") |
| } |
| id := GitHubRepoID{owner, repo} |
| if !id.valid() { |
| return nil |
| } |
| r, ok := g.repos[id] |
| if ok { |
| return r |
| } |
| r = &GitHubRepo{ |
| github: g, |
| id: id, |
| issues: map[int32]*GitHubIssue{}, |
| } |
| g.repos[id] = r |
| return r |
| } |
| |
| type GitHubRepo struct { |
| github *GitHub |
| id GitHubRepoID |
| issues map[int32]*GitHubIssue // num -> issue |
| milestones map[int64]*GitHubMilestone |
| labels map[int64]*GitHubLabel |
| } |
| |
| func (gr *GitHubRepo) ID() GitHubRepoID { return gr.id } |
| |
| // Issue returns the provided issue number, or nil if it's not known. |
| func (gr *GitHubRepo) Issue(n int32) *GitHubIssue { return gr.issues[n] } |
| |
| // ForeachLabel calls fn for each label in the repo, in unsorted order. |
| // |
| // Iteration ends if fn returns an error, with that error. |
| func (gr *GitHubRepo) ForeachLabel(fn func(*GitHubLabel) error) error { |
| for _, lb := range gr.labels { |
| if err := fn(lb); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| // ForeachMilestone calls fn for each milestone in the repo, in unsorted order. |
| // |
| // Iteration ends if fn returns an error, with that error. |
| func (gr *GitHubRepo) ForeachMilestone(fn func(*GitHubMilestone) error) error { |
| for _, m := range gr.milestones { |
| if err := fn(m); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| // ForeachIssue calls fn for each issue in the repo. |
| // |
| // If fn returns an error, iteration ends and ForeachIssue returns |
| // with that error. |
| // |
| // The fn function is called serially, with increasingly numbered |
| // issues. |
| func (gr *GitHubRepo) ForeachIssue(fn func(*GitHubIssue) error) error { |
| s := make([]*GitHubIssue, 0, len(gr.issues)) |
| for _, gi := range gr.issues { |
| s = append(s, gi) |
| } |
| sort.Slice(s, func(i, j int) bool { return s[i].Number < s[j].Number }) |
| for _, gi := range s { |
| if err := fn(gi); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| // ForeachReview calls fn for each review event on the issue |
| // |
| // If the issue is not a PullRequest, then it returns early with no error. |
| // |
| // If fn returns an error, iteration ends and ForeachReview returns |
| // with that error. |
| // |
| // The fn function is called serially, in chronological order. |
| func (pr *GitHubIssue) ForeachReview(fn func(*GitHubReview) error) error { |
| if !pr.PullRequest { |
| return nil |
| } |
| s := make([]*GitHubReview, 0, len(pr.reviews)) |
| for _, rv := range pr.reviews { |
| s = append(s, rv) |
| } |
| sort.Slice(s, func(i, j int) bool { return s[i].Created.Before(s[j].Created) }) |
| for _, rv := range s { |
| if err := fn(rv); err != nil { |
| return err |
| } |
| } |
| |
| return nil |
| } |
| |
| func (g *GitHubRepo) getOrCreateMilestone(id int64) *GitHubMilestone { |
| if id == 0 { |
| panic("zero id") |
| } |
| m, ok := g.milestones[id] |
| if ok { |
| return m |
| } |
| if g.milestones == nil { |
| g.milestones = map[int64]*GitHubMilestone{} |
| } |
| m = &GitHubMilestone{ID: id} |
| g.milestones[id] = m |
| return m |
| } |
| |
| func (g *GitHubRepo) getOrCreateLabel(id int64) *GitHubLabel { |
| if id == 0 { |
| panic("zero id") |
| } |
| lb, ok := g.labels[id] |
| if ok { |
| return lb |
| } |
| if g.labels == nil { |
| g.labels = map[int64]*GitHubLabel{} |
| } |
| lb = &GitHubLabel{ID: id} |
| g.labels[id] = lb |
| return lb |
| } |
| |
| func (g *GitHubRepo) verbose() bool { |
| return g.github != nil && g.github.c != nil && g.github.c.verbose |
| } |
| |
| // GitHubUser represents a GitHub user. |
| // It is a subset of https://developer.github.com/v3/users/#get-a-single-user |
| type GitHubUser struct { |
| ID int64 |
| Login string |
| } |
| |
| // GitHubTeam represents a GitHub team. |
| // It is a subset of https://developer.github.com/v3/orgs/teams/#get-team |
| type GitHubTeam struct { |
| ID int64 |
| |
| // Slug is a URL-friendly representation of the team name. |
| // It is unique across a GitHub organization. |
| Slug string |
| } |
| |
| // GitHubIssueRef is a reference to an issue (or pull request) number |
| // in a repo. These are parsed from text making references such as |
| // "golang/go#1234" or just "#1234" (with an implicit Repo). |
| type GitHubIssueRef struct { |
| Repo *GitHubRepo // must be non-nil |
| Number int32 // GitHubIssue.Number |
| } |
| |
| func (r GitHubIssueRef) String() string { return fmt.Sprintf("%s#%d", r.Repo.ID(), r.Number) } |
| |
| // GitHubIssue represents a GitHub issue. |
| // This is maintner's in-memory representation. It differs slightly |
| // from the API's *github.Issue type, notably in the lack of pointers |
| // for all fields. |
| // See https://developer.github.com/v3/issues/#get-a-single-issue |
| type GitHubIssue struct { |
| ID int64 |
| Number int32 |
| NotExist bool // if true, rest of fields should be ignored. |
| Closed bool |
| Locked bool |
| PullRequest bool // if true, this issue is a Pull Request. All PRs are issues, but not all issues are PRs. |
| User *GitHubUser |
| Assignees []*GitHubUser |
| Created time.Time |
| Updated time.Time |
| ClosedAt time.Time |
| ClosedBy *GitHubUser // TODO(dmitshur): Implement (see golang.org/issue/28745). |
| Title string |
| Body string |
| Milestone *GitHubMilestone // nil for unknown, noMilestone for none |
| Labels map[int64]*GitHubLabel // label ID => label |
| |
| commentsUpdatedTil time.Time // max comment modtime seen |
| commentsSyncedAsOf time.Time // as of server's Date header |
| comments map[int64]*GitHubComment // by comment.ID |
| eventMaxTime time.Time // latest time of any event in events map |
| eventsSyncedAsOf time.Time // as of server's Date header |
| reviewsSyncedAsOf time.Time // as of server's Date header |
| events map[int64]*GitHubIssueEvent // by event.ID |
| reviews map[int64]*GitHubReview // by event.ID |
| } |
| |
| // LastModified reports the most recent time that any known metadata was updated. |
| // In contrast to the Updated field, LastModified includes comments and events. |
| // |
| // TODO(bradfitz): this seems to not be working, at least events |
| // aren't updating it. Investigate. |
| func (gi *GitHubIssue) LastModified() time.Time { |
| ret := gi.Updated |
| if gi.commentsUpdatedTil.After(ret) { |
| ret = gi.commentsUpdatedTil |
| } |
| if gi.eventMaxTime.After(ret) { |
| ret = gi.eventMaxTime |
| } |
| return ret |
| } |
| |
| // HasEvent reports whether there's any GitHubIssueEvent in this |
| // issue's history of the given type. |
| func (gi *GitHubIssue) HasEvent(eventType string) bool { |
| for _, e := range gi.events { |
| if e.Type == eventType { |
| return true |
| } |
| } |
| return false |
| } |
| |
| // ForeachEvent calls fn for each event on the issue. |
| // |
| // If fn returns an error, iteration ends and ForeachEvent returns |
| // with that error. |
| // |
| // The fn function is called serially, in order of the event's time. |
| func (gi *GitHubIssue) ForeachEvent(fn func(*GitHubIssueEvent) error) error { |
| // TODO: keep these sorted in the corpus |
| s := make([]*GitHubIssueEvent, 0, len(gi.events)) |
| for _, e := range gi.events { |
| s = append(s, e) |
| } |
| sort.Slice(s, func(i, j int) bool { |
| ci, cj := s[i].Created, s[j].Created |
| if ci.Before(cj) { |
| return true |
| } |
| return ci.Equal(cj) && s[i].ID < s[j].ID |
| }) |
| for _, e := range s { |
| if err := fn(e); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| // ForeachComment calls fn for each event on the issue. |
| // |
| // If fn returns an error, iteration ends and ForeachComment returns |
| // with that error. |
| // |
| // The fn function is called serially, in order of the comment's time. |
| func (gi *GitHubIssue) ForeachComment(fn func(*GitHubComment) error) error { |
| // TODO: keep these sorted in the corpus |
| s := make([]*GitHubComment, 0, len(gi.comments)) |
| for _, e := range gi.comments { |
| s = append(s, e) |
| } |
| sort.Slice(s, func(i, j int) bool { |
| ci, cj := s[i].Created, s[j].Created |
| if ci.Before(cj) { |
| return true |
| } |
| return ci.Equal(cj) && s[i].ID < s[j].ID |
| }) |
| for _, e := range s { |
| if err := fn(e); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| // HasLabel reports whether the issue is labeled with the given label. |
| func (gi *GitHubIssue) HasLabel(label string) bool { |
| for _, lb := range gi.Labels { |
| if lb.Name == label { |
| return true |
| } |
| } |
| return false |
| } |
| |
| // HasLabelID returns whether the issue has a label with the given ID. |
| func (gi *GitHubIssue) HasLabelID(id int64) bool { |
| _, ok := gi.Labels[id] |
| return ok |
| } |
| |
| func (gi *GitHubIssue) getCreatedAt() time.Time { |
| if gi == nil { |
| return time.Time{} |
| } |
| return gi.Created |
| } |
| |
| func (gi *GitHubIssue) getUpdatedAt() time.Time { |
| if gi == nil { |
| return time.Time{} |
| } |
| return gi.Updated |
| } |
| |
| func (gi *GitHubIssue) getClosedAt() time.Time { |
| if gi == nil { |
| return time.Time{} |
| } |
| return gi.ClosedAt |
| } |
| |
| // noMilestone is a sentinel value to explicitly mean no milestone. |
| var noMilestone = new(GitHubMilestone) |
| |
| type GitHubLabel struct { |
| ID int64 |
| Name string |
| // TODO: color? |
| } |
| |
| // GenMutationDiff generates a diff from in-memory state 'a' (which |
| // may be nil) to the current (non-nil) state b from GitHub. It |
| // returns nil if there's no difference. |
| func (a *GitHubLabel) GenMutationDiff(b *github.Label) *maintpb.GithubLabel { |
| id := int64(b.GetID()) |
| if a != nil && a.ID == id && a.Name == b.GetName() { |
| // No change. |
| return nil |
| } |
| return &maintpb.GithubLabel{Id: id, Name: b.GetName()} |
| } |
| |
| func (lb *GitHubLabel) processMutation(mut maintpb.GithubLabel) { |
| if lb.ID == 0 { |
| panic("bogus label ID 0") |
| } |
| if lb.ID != mut.Id { |
| panic(fmt.Sprintf("label ID = %v != mutation ID = %v", lb.ID, mut.Id)) |
| } |
| if mut.Name != "" { |
| lb.Name = mut.Name |
| } |
| } |
| |
| type GitHubMilestone struct { |
| ID int64 |
| Title string |
| Number int32 |
| Closed bool |
| } |
| |
| // IsNone reports whether ms represents the sentinel "no milestone" milestone. |
| func (ms *GitHubMilestone) IsNone() bool { return ms == noMilestone } |
| |
| // IsUnknown reports whether ms is nil, which represents the unknown |
| // state. Milestones should never be in this state, though. |
| func (ms *GitHubMilestone) IsUnknown() bool { return ms == nil } |
| |
| // emptyMilestone is a non-nil *githubMilestone with zero values for |
| // all fields. |
| var emptyMilestone = new(GitHubMilestone) |
| |
| // GenMutationDiff generates a diff from in-memory state 'a' (which |
| // may be nil) to the current (non-nil) state b from GitHub. It |
| // returns nil if there's no difference. |
| func (a *GitHubMilestone) GenMutationDiff(b *github.Milestone) *maintpb.GithubMilestone { |
| var ret *maintpb.GithubMilestone // lazily inited by diff |
| diff := func() *maintpb.GithubMilestone { |
| if ret == nil { |
| ret = &maintpb.GithubMilestone{Id: int64(b.GetID())} |
| } |
| return ret |
| } |
| if a == nil { |
| a = emptyMilestone |
| } |
| if a.Title != b.GetTitle() { |
| diff().Title = b.GetTitle() |
| } |
| if a.Number != int32(b.GetNumber()) { |
| diff().Number = int64(b.GetNumber()) |
| } |
| if closed := b.GetState() == "closed"; a.Closed != closed { |
| diff().Closed = &maintpb.BoolChange{Val: closed} |
| } |
| return ret |
| } |
| |
| func (ms *GitHubMilestone) processMutation(mut maintpb.GithubMilestone) { |
| if ms.ID == 0 { |
| panic("bogus milestone ID 0") |
| } |
| if ms.ID != mut.Id { |
| panic(fmt.Sprintf("milestone ID = %v != mutation ID = %v", ms.ID, mut.Id)) |
| } |
| if mut.Title != "" { |
| ms.Title = mut.Title |
| } |
| if mut.Number != 0 { |
| ms.Number = int32(mut.Number) |
| } |
| if mut.Closed != nil { |
| ms.Closed = mut.Closed.Val |
| } |
| } |
| |
| // GitHubReview represents a review on a Pull Request. |
| // For more details, see https://developer.github.com/v3/pulls/reviews/ |
| type GitHubReview struct { |
| ID int64 |
| Actor *GitHubUser |
| Body string |
| State string // COMMENTED, APPROVED, CHANGES_REQUESTED |
| CommitID string |
| ActorAssociation string // CONTRIBUTOR |
| Created time.Time |
| OtherJSON string |
| } |
| |
| // Proto converts GitHubReview to a protobuf |
| func (e *GitHubReview) Proto() *maintpb.GithubReview { |
| p := &maintpb.GithubReview{ |
| Id: e.ID, |
| Body: e.Body, |
| State: e.State, |
| CommitId: e.CommitID, |
| ActorAssociation: e.ActorAssociation, |
| } |
| if e.OtherJSON != "" { |
| p.OtherJson = []byte(e.OtherJSON) |
| } |
| if !e.Created.IsZero() { |
| p.Created = timestamppb.New(e.Created) |
| } |
| if e.Actor != nil { |
| p.ActorId = e.Actor.ID |
| } |
| |
| return p |
| } |
| |
| // r.github.c.mu must be held. |
| func (r *GitHubRepo) newGithubReview(p *maintpb.GithubReview) *GitHubReview { |
| g := r.github |
| e := &GitHubReview{ |
| ID: p.Id, |
| Actor: g.getOrCreateUserID(p.ActorId), |
| ActorAssociation: p.ActorAssociation, |
| CommitID: p.CommitId, |
| Body: p.Body, |
| State: p.State, |
| } |
| |
| if p.Created != nil { |
| e.Created = p.Created.AsTime() |
| } |
| if len(p.OtherJson) > 0 { |
| // TODO: parse it and see if we've since learned how |
| // to deal with it? |
| if r.verbose() { |
| log.Printf("newGithubReview: unknown JSON in log: %s", p.OtherJson) |
| } |
| e.OtherJSON = string(p.OtherJson) |
| } |
| |
| return e |
| } |
| |
| type GitHubComment struct { |
| ID int64 |
| User *GitHubUser |
| Created time.Time |
| Updated time.Time |
| Body string |
| } |
| |
| // GitHubDismissedReview is the contents of a dismissed review event. For more |
| // details, see https://developer.github.com/v3/issues/events/. |
| type GitHubDismissedReviewEvent struct { |
| ReviewID int64 |
| State string // commented, approved, changes_requested |
| DismissalMessage string |
| } |
| |
| type GitHubIssueEvent struct { |
| // TODO: this struct is a little wide. change it to an interface |
| // instead? Maybe later, if memory profiling suggests it would help. |
| |
| // ID is the ID of the event. |
| ID int64 |
| |
| // Type is one of: |
| // * labeled, unlabeled |
| // * milestoned, demilestoned |
| // * assigned, unassigned |
| // * locked, unlocked |
| // * closed |
| // * referenced |
| // * renamed |
| // * reopened |
| // * comment_deleted |
| // * head_ref_restored |
| // * base_ref_changed |
| // * subscribed |
| // * mentioned |
| // * review_requested, review_request_removed, review_dismissed |
| Type string |
| |
| // OtherJSON optionally contains a JSON object of GitHub's API |
| // response for any fields maintner was unable to extract at |
| // the time. It is empty if maintner supported all the fields |
| // when the mutation was created. |
| OtherJSON string |
| |
| Created time.Time |
| Actor *GitHubUser |
| |
| Label string // for type: "unlabeled", "labeled" |
| Assignee *GitHubUser // for type: "assigned", "unassigned" |
| Assigner *GitHubUser // for type: "assigned", "unassigned" |
| Milestone string // for type: "milestoned", "demilestoned" |
| From, To string // for type: "renamed" |
| CommitID, CommitURL string // for type: "closed", "referenced" ... ? |
| |
| Reviewer *GitHubUser |
| TeamReviewer *GitHubTeam |
| ReviewRequester *GitHubUser |
| DismissedReview *GitHubDismissedReviewEvent |
| } |
| |
| func (e *GitHubIssueEvent) Proto() *maintpb.GithubIssueEvent { |
| p := &maintpb.GithubIssueEvent{ |
| Id: e.ID, |
| EventType: e.Type, |
| RenameFrom: e.From, |
| RenameTo: e.To, |
| } |
| if e.OtherJSON != "" { |
| p.OtherJson = []byte(e.OtherJSON) |
| } |
| if !e.Created.IsZero() { |
| p.Created = timestamppb.New(e.Created) |
| } |
| if e.Actor != nil { |
| p.ActorId = e.Actor.ID |
| } |
| if e.Assignee != nil { |
| p.AssigneeId = e.Assignee.ID |
| } |
| if e.Assigner != nil { |
| p.AssignerId = e.Assigner.ID |
| } |
| if e.Label != "" { |
| p.Label = &maintpb.GithubLabel{Name: e.Label} |
| } |
| if e.Milestone != "" { |
| p.Milestone = &maintpb.GithubMilestone{Title: e.Milestone} |
| } |
| if e.CommitID != "" { |
| c := &maintpb.GithubCommit{CommitId: e.CommitID} |
| if m := rxGithubCommitURL.FindStringSubmatch(e.CommitURL); m != nil { |
| c.Owner = m[1] |
| c.Repo = m[2] |
| } |
| p.Commit = c |
| } |
| if e.Reviewer != nil { |
| p.ReviewerId = e.Reviewer.ID |
| } |
| if e.TeamReviewer != nil { |
| p.TeamReviewer = &maintpb.GithubTeam{ |
| Id: e.TeamReviewer.ID, |
| Slug: e.TeamReviewer.Slug, |
| } |
| } |
| if e.ReviewRequester != nil { |
| p.ReviewRequesterId = e.ReviewRequester.ID |
| } |
| if e.DismissedReview != nil { |
| p.DismissedReview = &maintpb.GithubDismissedReviewEvent{ |
| ReviewId: e.DismissedReview.ReviewID, |
| State: e.DismissedReview.State, |
| DismissalMessage: e.DismissedReview.DismissalMessage, |
| } |
| } |
| return p |
| } |
| |
| var rxGithubCommitURL = regexp.MustCompile(`^https://api\.github\.com/repos/([^/]+)/([^/]+)/commits/`) |
| |
| // r.github.c.mu must be held. |
| func (r *GitHubRepo) newGithubEvent(p *maintpb.GithubIssueEvent) *GitHubIssueEvent { |
| g := r.github |
| e := &GitHubIssueEvent{ |
| ID: p.Id, |
| Type: p.EventType, |
| Actor: g.getOrCreateUserID(p.ActorId), |
| Assignee: g.getOrCreateUserID(p.AssigneeId), |
| Assigner: g.getOrCreateUserID(p.AssignerId), |
| Reviewer: g.getOrCreateUserID(p.ReviewerId), |
| TeamReviewer: g.getTeam(p.TeamReviewer), |
| ReviewRequester: g.getOrCreateUserID(p.ReviewRequesterId), |
| From: p.RenameFrom, |
| To: p.RenameTo, |
| } |
| if p.Created != nil { |
| e.Created = p.Created.AsTime() |
| } |
| if len(p.OtherJson) > 0 { |
| // TODO: parse it and see if we've since learned how |
| // to deal with it? |
| if r.verbose() { |
| log.Printf("newGithubEvent: unknown JSON in log: %s", p.OtherJson) |
| } |
| e.OtherJSON = string(p.OtherJson) |
| } |
| if p.Label != nil { |
| e.Label = g.c.str(p.Label.Name) |
| } |
| if p.Milestone != nil { |
| e.Milestone = g.c.str(p.Milestone.Title) |
| } |
| if c := p.Commit; c != nil { |
| e.CommitID = c.CommitId |
| if c.Owner != "" && c.Repo != "" { |
| // TODO: this field is dumb. break it down. |
| e.CommitURL = "https://api.github.com/repos/" + c.Owner + "/" + c.Repo + "/commits/" + c.CommitId |
| } |
| } |
| if d := p.DismissedReview; d != nil { |
| e.DismissedReview = &GitHubDismissedReviewEvent{ |
| ReviewID: d.ReviewId, |
| State: d.State, |
| DismissalMessage: d.DismissalMessage, |
| } |
| } |
| return e |
| } |
| |
| // (requires corpus be locked for reads) |
| func (gi *GitHubIssue) commentsSynced() bool { |
| if gi.NotExist { |
| // Issue doesn't exist, so can't sync its non-issues, |
| // so consider it done. |
| return true |
| } |
| return gi.commentsSyncedAsOf.After(gi.Updated) |
| } |
| |
| // (requires corpus be locked for reads) |
| func (gi *GitHubIssue) eventsSynced() bool { |
| if gi.NotExist { |
| // Issue doesn't exist, so can't sync its non-issues, |
| // so consider it done. |
| return true |
| } |
| return gi.eventsSyncedAsOf.After(gi.Updated) |
| } |
| |
| // (requires corpus be locked for reads) |
| func (gi *GitHubIssue) reviewsSynced() bool { |
| if gi.NotExist { |
| // Issue doesn't exist, so can't sync its non-issues, |
| // so consider it done. |
| return true |
| } |
| return gi.reviewsSyncedAsOf.After(gi.Updated) |
| } |
| |
| func (c *Corpus) initGithub() { |
| if c.github != nil { |
| return |
| } |
| c.github = &GitHub{ |
| c: c, |
| repos: map[GitHubRepoID]*GitHubRepo{}, |
| } |
| } |
| |
| // SetGitHubLimiter sets a limiter that controls the rate of requests made |
| // to GitHub APIs. If nil, requests are not limited. Only valid in leader mode. |
| // The limiter must only be set before Sync or SyncLoop is called. |
| func (c *Corpus) SetGitHubLimiter(l *rate.Limiter) { |
| c.githubLimiter = l |
| } |
| |
| // TrackGitHub registers the named GitHub repo as a repo to |
| // watch and append to the mutation log. Only valid in leader mode. |
| // The token is the auth token to use to make API calls. |
| func (c *Corpus) TrackGitHub(owner, repo, token string) { |
| if c.mutationLogger == nil { |
| panic("can't TrackGitHub in non-leader mode") |
| } |
| |
| c.mu.Lock() |
| defer c.mu.Unlock() |
| c.initGithub() |
| gr := c.github.getOrCreateRepo(owner, repo) |
| if gr == nil { |
| log.Fatalf("invalid github owner/repo %q/%q", owner, repo) |
| } |
| c.watchedGithubRepos = append(c.watchedGithubRepos, watchedGithubRepo{ |
| gr: gr, |
| token: token, |
| }) |
| } |
| |
| type watchedGithubRepo struct { |
| gr *GitHubRepo |
| token string |
| } |
| |
| // g.c.mu must be held |
| func (g *GitHub) getUser(pu *maintpb.GithubUser) *GitHubUser { |
| if pu == nil { |
| return nil |
| } |
| if u := g.users[pu.Id]; u != nil { |
| if pu.Login != "" && pu.Login != u.Login { |
| u.Login = pu.Login |
| } |
| return u |
| } |
| if g.users == nil { |
| g.users = make(map[int64]*GitHubUser) |
| } |
| u := &GitHubUser{ |
| ID: pu.Id, |
| Login: pu.Login, |
| } |
| g.users[pu.Id] = u |
| return u |
| } |
| |
| func (g *GitHub) getOrCreateUserID(id int64) *GitHubUser { |
| if id == 0 { |
| return nil |
| } |
| if u := g.users[id]; u != nil { |
| return u |
| } |
| if g.users == nil { |
| g.users = make(map[int64]*GitHubUser) |
| } |
| u := &GitHubUser{ID: id} |
| g.users[id] = u |
| return u |
| } |
| |
| // g.c.mu must be held |
| func (g *GitHub) getTeam(pt *maintpb.GithubTeam) *GitHubTeam { |
| if pt == nil { |
| return nil |
| } |
| if g.teams == nil { |
| g.teams = make(map[int64]*GitHubTeam) |
| } |
| |
| t := g.teams[pt.Id] |
| if t == nil { |
| t = &GitHubTeam{ |
| ID: pt.Id, |
| } |
| g.teams[pt.Id] = t |
| } |
| if pt.Slug != "" { |
| t.Slug = pt.Slug |
| } |
| return t |
| } |
| |
| // newGithubUserProto creates a GithubUser with the minimum diff between |
| // existing and g. The return value is nil if there were no changes. existing |
| // may also be nil. |
| func newGithubUserProto(existing *GitHubUser, g *github.User) *maintpb.GithubUser { |
| if g == nil { |
| return nil |
| } |
| id := int64(g.GetID()) |
| if existing == nil { |
| return &maintpb.GithubUser{ |
| Id: id, |
| Login: g.GetLogin(), |
| } |
| } |
| hasChanges := false |
| u := &maintpb.GithubUser{Id: id} |
| if login := g.GetLogin(); existing.Login != login { |
| u.Login = login |
| hasChanges = true |
| } |
| // Add more fields here |
| if hasChanges { |
| return u |
| } |
| return nil |
| } |
| |
| // deletedAssignees returns an array of user ID's that are present in existing |
| // but not present in new. |
| func deletedAssignees(existing []*GitHubUser, new []*github.User) []int64 { |
| mp := make(map[int64]bool, len(existing)) |
| for _, u := range new { |
| id := int64(u.GetID()) |
| mp[id] = true |
| } |
| toDelete := []int64{} |
| for _, u := range existing { |
| if _, ok := mp[u.ID]; !ok { |
| toDelete = append(toDelete, u.ID) |
| } |
| } |
| return toDelete |
| } |
| |
| // newAssignees returns an array of diffs between existing and new. New users in |
| // new will be present in the returned array in their entirety. Modified users |
| // will appear containing only the ID field and changed fields. Unmodified users |
| // will not appear in the returned array. |
| func newAssignees(existing []*GitHubUser, new []*github.User) []*maintpb.GithubUser { |
| mp := make(map[int64]*GitHubUser, len(existing)) |
| for _, u := range existing { |
| mp[u.ID] = u |
| } |
| changes := []*maintpb.GithubUser{} |
| for _, u := range new { |
| if existingUser, ok := mp[int64(u.GetID())]; ok { |
| diffUser := &maintpb.GithubUser{ |
| Id: int64(u.GetID()), |
| } |
| hasDiff := false |
| if login := u.GetLogin(); existingUser.Login != login { |
| diffUser.Login = login |
| hasDiff = true |
| } |
| // check more User fields for diffs here, as we add them to the proto |
| |
| if hasDiff { |
| changes = append(changes, diffUser) |
| } |
| } else { |
| changes = append(changes, &maintpb.GithubUser{ |
| Id: int64(u.GetID()), |
| Login: u.GetLogin(), |
| }) |
| } |
| } |
| return changes |
| } |
| |
| // setAssigneesFromProto returns a new array of assignees according to the |
| // instructions in new (adds or modifies users in existing), and toDelete |
| // (deletes them). c.mu must be held. |
| func (g *GitHub) setAssigneesFromProto(existing []*GitHubUser, new []*maintpb.GithubUser, toDelete []int64) []*GitHubUser { |
| c := g.c |
| mp := make(map[int64]*GitHubUser) |
| for _, u := range existing { |
| mp[u.ID] = u |
| } |
| for _, u := range new { |
| if existingUser, ok := mp[u.Id]; ok { |
| if u.Login != "" { |
| existingUser.Login = u.Login |
| } |
| // TODO: add other fields here when we add them for user. |
| } else { |
| c.debugf("adding assignee %q", u.Login) |
| existing = append(existing, g.getUser(u)) |
| } |
| } |
| // this is quadratic but the number of assignees is very unlikely to exceed, |
| // say, 5. |
| existing = slices.DeleteFunc(existing, func(u *GitHubUser) bool { |
| return slices.Contains(toDelete, u.ID) |
| }) |
| return existing |
| } |
| |
| // githubIssueDiffer generates a minimal diff (protobuf mutation) to |
| // get a GitHub Issue from its in-memory state 'a' to the current |
| // GitHub API state 'b'. |
| type githubIssueDiffer struct { |
| gr *GitHubRepo |
| a *GitHubIssue // may be nil if no current state |
| b *github.Issue // may NOT be nil |
| } |
| |
| // returns nil if no changes. |
| func (d githubIssueDiffer) Diff() *maintpb.GithubIssueMutation { |
| var changed bool |
| m := &maintpb.GithubIssueMutation{ |
| Owner: d.gr.id.Owner, |
| Repo: d.gr.id.Repo, |
| Number: int32(d.b.GetNumber()), |
| PullRequest: d.b.IsPullRequest(), |
| } |
| for _, f := range issueDiffMethods { |
| if f(d, m) { |
| if d.gr.verbose() { |
| fname := strings.TrimPrefix(runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name(), "golang.org/x/build/maintner.githubIssueDiffer.") |
| log.Printf("Issue %d changed: %v", d.b.GetNumber(), fname) |
| } |
| changed = true |
| } |
| } |
| if !changed { |
| return nil |
| } |
| return m |
| } |
| |
| // issueDiffMethods are the different steps githubIssueDiffer.Diff |
| // goes through to compute a diff. The methods should return true if |
| // any change was made. The order is irrelevant unless otherwise |
| // documented in comments in the list below. |
| var issueDiffMethods = []func(githubIssueDiffer, *maintpb.GithubIssueMutation) bool{ |
| githubIssueDiffer.diffCreatedAt, |
| githubIssueDiffer.diffUpdatedAt, |
| githubIssueDiffer.diffUser, |
| githubIssueDiffer.diffBody, |
| githubIssueDiffer.diffTitle, |
| githubIssueDiffer.diffMilestone, |
| githubIssueDiffer.diffAssignees, |
| githubIssueDiffer.diffClosedState, |
| githubIssueDiffer.diffClosedAt, |
| githubIssueDiffer.diffClosedBy, |
| githubIssueDiffer.diffLockedState, |
| githubIssueDiffer.diffLabels, |
| } |
| |
| func (d githubIssueDiffer) diffCreatedAt(m *maintpb.GithubIssueMutation) bool { |
| return d.diffTimeField(&m.Created, d.a.getCreatedAt(), d.b.GetCreatedAt()) |
| } |
| |
| func (d githubIssueDiffer) diffUpdatedAt(m *maintpb.GithubIssueMutation) bool { |
| return d.diffTimeField(&m.Updated, d.a.getUpdatedAt(), d.b.GetUpdatedAt()) |
| } |
| |
| func (d githubIssueDiffer) diffClosedAt(m *maintpb.GithubIssueMutation) bool { |
| return d.diffTimeField(&m.ClosedAt, d.a.getClosedAt(), d.b.GetClosedAt()) |
| } |
| |
| func (d githubIssueDiffer) diffTimeField(dst **timestamppb.Timestamp, memTime, githubTime time.Time) bool { |
| if githubTime.IsZero() || memTime.Equal(githubTime) { |
| return false |
| } |
| *dst = timestamppb.New(githubTime) |
| return true |
| } |
| |
| func (d githubIssueDiffer) diffUser(m *maintpb.GithubIssueMutation) bool { |
| var existing *GitHubUser |
| if d.a != nil { |
| existing = d.a.User |
| } |
| m.User = newGithubUserProto(existing, d.b.User) |
| return m.User != nil |
| } |
| |
| func (d githubIssueDiffer) diffClosedBy(m *maintpb.GithubIssueMutation) bool { |
| var existing *GitHubUser |
| if d.a != nil { |
| existing = d.a.ClosedBy |
| } |
| m.ClosedBy = newGithubUserProto(existing, d.b.ClosedBy) |
| return m.ClosedBy != nil |
| } |
| |
| func (d githubIssueDiffer) diffBody(m *maintpb.GithubIssueMutation) bool { |
| if d.a != nil && d.a.Body == d.b.GetBody() { |
| return false |
| } |
| m.BodyChange = &maintpb.StringChange{Val: d.b.GetBody()} |
| return true |
| } |
| |
| func (d githubIssueDiffer) diffTitle(m *maintpb.GithubIssueMutation) bool { |
| if d.a != nil && d.a.Title == d.b.GetTitle() { |
| return false |
| } |
| m.Title = d.b.GetTitle() |
| // TODO: emit a StringChange if we ever have a problem that we |
| // legitimately need real issues with no titles reflected in |
| // maintner's model. For now just ignore such changes, if |
| // GitHub even permits the. |
| return m.Title != "" |
| } |
| |
| func (d githubIssueDiffer) diffMilestone(m *maintpb.GithubIssueMutation) bool { |
| if d.a != nil && d.a.Milestone != nil { |
| ma, mb := d.a.Milestone, d.b.Milestone |
| if ma == noMilestone && d.b.Milestone == nil { |
| // Unchanged. Still no milestone. |
| return false |
| } |
| if mb != nil && ma.ID == int64(mb.GetID()) { |
| // Unchanged. Same milestone. |
| // TODO: detect milestone renames and emit mutation for that? |
| return false |
| } |
| |
| } |
| if mb := d.b.Milestone; mb != nil { |
| m.MilestoneId = int64(mb.GetID()) |
| m.MilestoneNum = int64(mb.GetNumber()) |
| m.MilestoneTitle = mb.GetTitle() |
| } else { |
| m.NoMilestone = true |
| } |
| return true |
| } |
| |
| func (d githubIssueDiffer) diffAssignees(m *maintpb.GithubIssueMutation) bool { |
| if d.a == nil { |
| m.Assignees = newAssignees(nil, d.b.Assignees) |
| return true |
| } |
| m.Assignees = newAssignees(d.a.Assignees, d.b.Assignees) |
| m.DeletedAssignees = deletedAssignees(d.a.Assignees, d.b.Assignees) |
| return len(m.Assignees) > 0 || len(m.DeletedAssignees) > 0 |
| } |
| |
| func (d githubIssueDiffer) diffLabels(m *maintpb.GithubIssueMutation) bool { |
| // Common case: no changes. Return false quickly without allocations. |
| if d.a != nil && len(d.a.Labels) == len(d.b.Labels) { |
| missing := false |
| for _, gl := range d.b.Labels { |
| if _, ok := d.a.Labels[int64(gl.GetID())]; !ok { |
| missing = true |
| break |
| } |
| } |
| if !missing { |
| return false |
| } |
| } |
| |
| toAdd := map[int64]*maintpb.GithubLabel{} |
| for _, gl := range d.b.Labels { |
| id := int64(gl.GetID()) |
| if id == 0 { |
| panic("zero label ID") |
| } |
| toAdd[id] = &maintpb.GithubLabel{Id: id, Name: gl.GetName()} |
| } |
| |
| var toDelete []int64 |
| if d.a != nil { |
| for id := range d.a.Labels { |
| if _, ok := toAdd[id]; ok { |
| // Already had it. |
| delete(toAdd, id) |
| } else { |
| // We had it, but no longer. |
| toDelete = append(toDelete, id) |
| } |
| } |
| } |
| |
| m.RemoveLabel = toDelete |
| for _, labpb := range toAdd { |
| m.AddLabel = append(m.AddLabel, labpb) |
| } |
| |
| return len(m.RemoveLabel) > 0 || len(m.AddLabel) > 0 |
| } |
| |
| func (d githubIssueDiffer) diffClosedState(m *maintpb.GithubIssueMutation) bool { |
| bclosed := d.b.GetState() == "closed" |
| if d.a != nil && d.a.Closed == bclosed { |
| return false |
| } |
| m.Closed = &maintpb.BoolChange{Val: bclosed} |
| return true |
| } |
| |
| func (d githubIssueDiffer) diffLockedState(m *maintpb.GithubIssueMutation) bool { |
| if d.a != nil && d.a.Locked == d.b.GetLocked() { |
| return false |
| } |
| if d.a == nil && !d.b.GetLocked() { |
| return false |
| } |
| m.Locked = &maintpb.BoolChange{Val: d.b.GetLocked()} |
| return true |
| } |
| |
| // newMutationFromIssue generates a GithubIssueMutation using the |
| // smallest possible diff between a (the state we have in memory in |
| // the corpus) and b (the current GitHub API state). |
| // |
| // If newMutationFromIssue returns nil, the provided github.Issue is no newer |
| // than the data we have in the corpus. 'a' may be nil. |
| func (r *GitHubRepo) newMutationFromIssue(a *GitHubIssue, b *github.Issue) *maintpb.Mutation { |
| if b == nil || b.Number == nil { |
| panic(fmt.Sprintf("github issue with nil number: %#v", b)) |
| } |
| gim := githubIssueDiffer{gr: r, a: a, b: b}.Diff() |
| if gim == nil { |
| // No changes. |
| return nil |
| } |
| return &maintpb.Mutation{GithubIssue: gim} |
| } |
| |
| func (r *GitHubRepo) missingIssues() []int32 { |
| c := r.github.c |
| c.mu.RLock() |
| defer c.mu.RUnlock() |
| |
| var maxNum int32 |
| for num := range r.issues { |
| if num > maxNum { |
| maxNum = num |
| } |
| } |
| |
| var missing []int32 |
| for num := int32(1); num < maxNum; num++ { |
| if _, ok := r.issues[num]; !ok { |
| missing = append(missing, num) |
| } |
| } |
| return missing |
| } |
| |
| // processGithubMutation updates the corpus with the information in m. |
| func (c *Corpus) processGithubMutation(m *maintpb.GithubMutation) { |
| if c == nil { |
| panic("nil corpus") |
| } |
| c.initGithub() |
| gr := c.github.getOrCreateRepo(m.Owner, m.Repo) |
| if gr == nil { |
| log.Printf("bogus Owner/Repo %q/%q in mutation: %v", m.Owner, m.Repo, m) |
| return |
| } |
| for _, lp := range m.Labels { |
| lb := gr.getOrCreateLabel(lp.Id) |
| lb.processMutation(*lp) |
| } |
| for _, mp := range m.Milestones { |
| ms := gr.getOrCreateMilestone(mp.Id) |
| ms.processMutation(*mp) |
| } |
| } |
| |
| // processGithubIssueMutation updates the corpus with the information in m. |
| func (c *Corpus) processGithubIssueMutation(m *maintpb.GithubIssueMutation) { |
| if c == nil { |
| panic("nil corpus") |
| } |
| c.initGithub() |
| gr := c.github.getOrCreateRepo(m.Owner, m.Repo) |
| if gr == nil { |
| log.Printf("bogus Owner/Repo %q/%q in mutation: %v", m.Owner, m.Repo, m) |
| return |
| } |
| if m.Number == 0 { |
| log.Printf("bogus zero Number in mutation: %v", m) |
| return |
| } |
| gi, ok := gr.issues[m.Number] |
| if !ok { |
| gi = &GitHubIssue{ |
| // User added below |
| Number: m.Number, |
| ID: m.Id, |
| } |
| if gr.issues == nil { |
| gr.issues = make(map[int32]*GitHubIssue) |
| } |
| gr.issues[m.Number] = gi |
| |
| if m.NotExist { |
| gi.NotExist = true |
| return |
| } |
| |
| gi.Created = m.Created.AsTime() |
| } |
| if m.NotExist != gi.NotExist { |
| gi.NotExist = m.NotExist |
| } |
| if gi.NotExist { |
| return |
| } |
| |
| // Check Updated before all other fields so they don't update if this |
| // Mutation is stale |
| // (ignoring Created since it *should* never update) |
| if m.Updated != nil { |
| gi.Updated = m.Updated.AsTime() |
| } |
| if m.ClosedAt != nil { |
| gi.ClosedAt = m.ClosedAt.AsTime() |
| } |
| if m.User != nil { |
| gi.User = c.github.getUser(m.User) |
| } |
| if m.NoMilestone { |
| gi.Milestone = noMilestone |
| } else if m.MilestoneId != 0 { |
| ms := gr.getOrCreateMilestone(m.MilestoneId) |
| ms.processMutation(maintpb.GithubMilestone{ |
| Id: m.MilestoneId, |
| Title: m.MilestoneTitle, |
| Number: m.MilestoneNum, |
| }) |
| gi.Milestone = ms |
| } |
| if m.ClosedBy != nil { |
| gi.ClosedBy = c.github.getUser(m.ClosedBy) |
| } |
| if b := m.Closed; b != nil { |
| gi.Closed = b.Val |
| } |
| if b := m.Locked; b != nil { |
| gi.Locked = b.Val |
| } |
| if m.PullRequest { |
| gi.PullRequest = true |
| } |
| |
| gi.Assignees = c.github.setAssigneesFromProto(gi.Assignees, m.Assignees, m.DeletedAssignees) |
| |
| if m.Body != "" { |
| gi.Body = m.Body |
| } |
| if m.BodyChange != nil { |
| gi.Body = m.BodyChange.Val |
| } |
| if m.Title != "" { |
| gi.Title = m.Title |
| } |
| if len(m.RemoveLabel) > 0 || len(m.AddLabel) > 0 { |
| if gi.Labels == nil { |
| gi.Labels = make(map[int64]*GitHubLabel) |
| } |
| for _, lid := range m.RemoveLabel { |
| delete(gi.Labels, lid) |
| } |
| for _, lp := range m.AddLabel { |
| lb := gr.getOrCreateLabel(lp.Id) |
| lb.processMutation(*lp) |
| gi.Labels[lp.Id] = lb |
| } |
| } |
| |
| for _, cmut := range m.Comment { |
| if cmut.Id == 0 { |
| log.Printf("Ignoring bogus comment mutation lacking Id: %v", cmut) |
| continue |
| } |
| gc, ok := gi.comments[cmut.Id] |
| if !ok { |
| if gi.comments == nil { |
| gi.comments = make(map[int64]*GitHubComment) |
| } |
| gc = &GitHubComment{ID: cmut.Id} |
| gi.comments[gc.ID] = gc |
| } |
| if cmut.User != nil { |
| gc.User = c.github.getUser(cmut.User) |
| } |
| if cmut.Created != nil { |
| gc.Created = cmut.Created.AsTime().UTC() |
| } |
| if cmut.Updated != nil { |
| gc.Updated = cmut.Updated.AsTime().UTC() |
| } |
| if cmut.Body != "" { |
| gc.Body = cmut.Body |
| } |
| } |
| if m.CommentStatus != nil && m.CommentStatus.ServerDate != nil { |
| gi.commentsSyncedAsOf = m.CommentStatus.ServerDate.AsTime().UTC() |
| } |
| |
| for _, emut := range m.Event { |
| if emut.Id == 0 { |
| log.Printf("Ignoring bogus event mutation lacking Id: %v", emut) |
| continue |
| } |
| if gi.events == nil { |
| gi.events = make(map[int64]*GitHubIssueEvent) |
| } |
| gie := gr.newGithubEvent(emut) |
| gi.events[emut.Id] = gie |
| if gie.Created.After(gi.eventMaxTime) { |
| gi.eventMaxTime = gie.Created |
| } |
| } |
| if m.EventStatus != nil && m.EventStatus.ServerDate != nil { |
| gi.eventsSyncedAsOf = m.EventStatus.ServerDate.AsTime().UTC() |
| } |
| |
| for _, rmut := range m.Review { |
| if rmut.Id == 0 { |
| log.Printf("Ignoring bogus review mutation lacking Id: %v", rmut) |
| continue |
| } |
| if gi.reviews == nil { |
| gi.reviews = make(map[int64]*GitHubReview) |
| } |
| gre := gr.newGithubReview(rmut) |
| gi.reviews[rmut.Id] = gre |
| if gre.Created.After(gi.eventMaxTime) { |
| gi.eventMaxTime = gre.Created |
| } |
| } |
| if m.ReviewStatus != nil && m.ReviewStatus.ServerDate != nil { |
| gi.reviewsSyncedAsOf = m.ReviewStatus.ServerDate.AsTime().UTC() |
| } |
| } |
| |
| // githubCache is an httpcache.Cache wrapper that only |
| // stores responses for: |
| // - https://api.github.com/repos/$OWNER/$REPO/issues?direction=desc&page=1&sort=updated |
| // - https://api.github.com/repos/$OWNER/$REPO/milestones?page=1 |
| // - https://api.github.com/repos/$OWNER/$REPO/labels?page=1 |
| type githubCache struct { |
| httpcache.Cache |
| } |
| |
| var rxGithubCacheURLs = regexp.MustCompile(`^https://api.github.com/repos/\w+/\w+/(issues|milestones|labels)\?(.+)`) |
| |
| func cacheableURL(urlStr string) bool { |
| m := rxGithubCacheURLs.FindStringSubmatch(urlStr) |
| if m == nil { |
| return false |
| } |
| v, _ := url.ParseQuery(m[2]) |
| if v.Get("page") != "1" { |
| return false |
| } |
| switch m[1] { |
| case "issues": |
| return v.Get("sort") == "updated" && v.Get("direction") == "desc" |
| case "milestones", "labels": |
| return true |
| default: |
| panic("unexpected cache key base " + m[1]) |
| } |
| } |
| |
| func (c *githubCache) Set(urlKey string, res []byte) { |
| // TODO: verify that the httpcache package guarantees that the |
| // first string parameter to Set here is actually a |
| // URL. Empirically they appear to be. |
| if cacheableURL(urlKey) { |
| c.Cache.Set(urlKey, res) |
| } |
| } |
| |
| // sync checks for new changes on a single GitHub repository and |
| // updates the Corpus with any changes. If loop is true, it runs |
| // forever. |
| func (gr *GitHubRepo) sync(ctx context.Context, token string, loop bool) error { |
| ts := oauth2.StaticTokenSource(&oauth2.Token{AccessToken: token}) |
| hc := oauth2.NewClient(ctx, ts) |
| if tr, ok := hc.Transport.(*http.Transport); ok { |
| defer tr.CloseIdleConnections() |
| } |
| directTransport := hc.Transport |
| if gr.github.c.githubLimiter != nil { |
| directTransport = limitTransport{gr.github.c.githubLimiter, hc.Transport} |
| } |
| cachingTransport := &httpcache.Transport{ |
| Transport: directTransport, |
| Cache: &githubCache{Cache: httpcache.NewMemoryCache()}, |
| MarkCachedResponses: true, // adds "X-From-Cache: 1" response header. |
| } |
| |
| p := &githubRepoPoller{ |
| c: gr.github.c, |
| token: token, |
| gr: gr, |
| githubDirect: github.NewClient(&http.Client{Transport: directTransport}), |
| githubCaching: github.NewClient(&http.Client{Transport: cachingTransport}), |
| client: http.DefaultClient, |
| } |
| activityCh := gr.github.c.activityChan("github:" + gr.id.String()) |
| var expectChanges bool // got webhook update, but haven't seen new data yet |
| var sleepDelay time.Duration |
| for { |
| prevLastUpdate := p.lastUpdate |
| err := p.sync(ctx, expectChanges) |
| if err == context.Canceled || !loop { |
| return err |
| } |
| sawChanges := !p.lastUpdate.Equal(prevLastUpdate) |
| if sawChanges { |
| expectChanges = false |
| } |
| // If we got woken up by a webhook, sometimes |
| // immediately polling GitHub for the data results in |
| // a cache hit saying nothing's changed. Don't believe |
| // it. Polling quickly with exponential backoff until |
| // we see what we're expecting. |
| if expectChanges { |
| if sleepDelay == 0 { |
| sleepDelay = 1 * time.Second |
| } else { |
| sleepDelay *= 2 |
| if sleepDelay > 15*time.Minute { |
| sleepDelay = 15 * time.Minute |
| } |
| } |
| p.logf("expect changes; re-polling in %v", sleepDelay) |
| } else { |
| sleepDelay = 15 * time.Minute |
| } |
| p.logf("sync = %v; sleeping", err) |
| timer := time.NewTimer(sleepDelay) |
| select { |
| case <-ctx.Done(): |
| timer.Stop() |
| return ctx.Err() |
| case <-activityCh: |
| timer.Stop() |
| expectChanges = true |
| sleepDelay = 0 |
| case <-timer.C: |
| } |
| } |
| } |
| |
| type httpClient interface { |
| Do(req *http.Request) (*http.Response, error) |
| } |
| |
| // A githubRepoPoller updates the Corpus (gr.c) to have the latest |
| // version of the GitHub repo rp, using the GitHub client ghc. |
| type githubRepoPoller struct { |
| c *Corpus // shortcut for gr.github.c |
| gr *GitHubRepo |
| token string |
| lastUpdate time.Time // modified by sync |
| githubCaching *github.Client |
| githubDirect *github.Client // not caching |
| client httpClient // the client used to poll github |
| } |
| |
| func (p *githubRepoPoller) Owner() string { return p.gr.id.Owner } |
| func (p *githubRepoPoller) Repo() string { return p.gr.id.Repo } |
| |
| func (p *githubRepoPoller) logf(format string, args ...interface{}) { |
| log.Printf("sync github "+p.gr.id.String()+": "+format, args...) |
| } |
| |
| func (p *githubRepoPoller) sync(ctx context.Context, expectChanges bool) error { |
| p.logf("Beginning sync.") |
| if err := p.syncIssues(ctx, expectChanges); err != nil { |
| return err |
| } |
| if err := p.syncComments(ctx); err != nil { |
| return err |
| } |
| if err := p.syncEvents(ctx); err != nil { |
| return err |
| } |
| if err := p.syncReviews(ctx); err != nil { |
| return err |
| } |
| return nil |
| } |
| |
| func (p *githubRepoPoller) syncMilestones(ctx context.Context) error { |
| var mut *maintpb.GithubMutation // lazy init |
| var changes int |
| err := p.foreachItem(ctx, 1, p.getMilestonePage, func(e interface{}) error { |
| ms := e.(*github.Milestone) |
| id := int64(ms.GetID()) |
| p.c.mu.RLock() |
| diff := p.gr.milestones[id].GenMutationDiff(ms) |
| p.c.mu.RUnlock() |
| if diff == nil { |
| return nil |
| } |
| if mut == nil { |
| mut = &maintpb.GithubMutation{ |
| Owner: p.Owner(), |
| Repo: p.Repo(), |
| } |
| } |
| mut.Milestones = append(mut.Milestones, diff) |
| changes++ |
| return nil |
| }) |
| if err != nil { |
| return err |
| } |
| p.logf("%d milestone changes.", changes) |
| if changes == 0 { |
| return nil |
| } |
| p.c.addMutation(&maintpb.Mutation{Github: mut}) |
| return nil |
| } |
| |
| func (p *githubRepoPoller) syncLabels(ctx context.Context) error { |
| var mut *maintpb.GithubMutation // lazy init |
| var changes int |
| err := p.foreachItem(ctx, 1, p.getLabelPage, func(e interface{}) error { |
| lb := e.(*github.Label) |
| id := int64(lb.GetID()) |
| p.c.mu.RLock() |
| diff := p.gr.labels[id].GenMutationDiff(lb) |
| p.c.mu.RUnlock() |
| if diff == nil { |
| return nil |
| } |
| if mut == nil { |
| mut = &maintpb.GithubMutation{ |
| Owner: p.Owner(), |
| Repo: p.Repo(), |
| } |
| } |
| mut.Labels = append(mut.Labels, diff) |
| changes++ |
| return nil |
| }) |
| if err != nil { |
| return err |
| } |
| p.logf("%d label changes.", changes) |
| if changes == 0 { |
| return nil |
| } |
| p.c.addMutation(&maintpb.Mutation{Github: mut}) |
| return nil |
| } |
| |
| func (p *githubRepoPoller) getMilestonePage(ctx context.Context, page int) ([]interface{}, *github.Response, error) { |
| ms, res, err := p.githubCaching.Issues.ListMilestones(ctx, p.Owner(), p.Repo(), &github.MilestoneListOptions{ |
| State: "all", |
| ListOptions: github.ListOptions{Page: page}, |
| }) |
| if err != nil { |
| return nil, nil, err |
| } |
| its := make([]interface{}, len(ms)) |
| for i, m := range ms { |
| its[i] = m |
| } |
| return its, res, err |
| } |
| |
| func (p *githubRepoPoller) getLabelPage(ctx context.Context, page int) ([]interface{}, *github.Response, error) { |
| ls, res, err := p.githubCaching.Issues.ListLabels(ctx, p.Owner(), p.Repo(), &github.ListOptions{ |
| Page: page, |
| }) |
| if err != nil { |
| return nil, nil, err |
| } |
| its := make([]interface{}, len(ls)) |
| for i, lb := range ls { |
| its[i] = lb |
| } |
| return its, res, err |
| } |
| |
| // foreachItem walks over all pages of items from getPage and calls fn for each item. |
| // If the first page's response was cached, fn is never called. |
| func (p *githubRepoPoller) foreachItem( |
| ctx context.Context, |
| page int, |
| getPage func(ctx context.Context, page int) ([]interface{}, *github.Response, error), |
| fn func(interface{}) error) error { |
| for { |
| select { |
| case <-ctx.Done(): |
| return ctx.Err() |
| default: |
| } |
| items, res, err := getPage(ctx, page) |
| if err != nil { |
| if canRetry(ctx, err) { |
| continue |
| } |
| return err |
| } |
| if len(items) == 0 { |
| return nil |
| } |
| fromCache := page == 1 && res.Response.Header.Get(xFromCache) == "1" |
| if fromCache { |
| log.Printf("no new items of type %T", items[0]) |
| // No need to walk over these again. |
| return nil |
| } |
| // TODO: use res.Rate (sleep until Reset if Limit == 0) |
| for _, it := range items { |
| if err := fn(it); err != nil { |
| return err |
| } |
| } |
| if res.NextPage == 0 { |
| return nil |
| } |
| page = res.NextPage |
| } |
| } |
| |
| func (p *githubRepoPoller) syncIssues(ctx context.Context, expectChanges bool) error { |
| page := 1 |
| seen := make(map[int64]bool) |
| keepGoing := true |
| owner, repo := p.gr.id.Owner, p.gr.id.Repo |
| for keepGoing { |
| ghc := p.githubCaching |
| if expectChanges { |
| ghc = p.githubDirect |
| } |
| issues, res, err := ghc.Issues.ListByRepo(ctx, owner, repo, &github.IssueListByRepoOptions{ |
| State: "all", |
| Sort: "updated", |
| Direction: "desc", |
| ListOptions: github.ListOptions{ |
| Page: page, |
| PerPage: 100, |
| }, |
| }) |
| if err != nil { |
| if canRetry(ctx, err) { |
| continue |
| } |
| return err |
| } |
| // See https://developer.github.com/v3/activity/events/ for X-Poll-Interval: |
| if pi := res.Response.Header.Get("X-Poll-Interval"); pi != "" { |
| nsec, _ := strconv.Atoi(pi) |
| d := time.Duration(nsec) * time.Second |
| p.logf("Requested to adjust poll interval to %v", d) |
| // TODO: return an error type up that the sync loop can use |
| // to adjust its default interval. |
| // For now, ignore. |
| } |
| fromCache := res.Response.Header.Get(xFromCache) == "1" |
| if len(issues) == 0 { |
| p.logf("issues: reached end.") |
| break |
| } |
| |
| didMilestoneLabelSync := false |
| changes := 0 |
| for _, is := range issues { |
| id := int64(is.GetID()) |
| if seen[id] { |
| // If an issue gets updated (and bumped to the top) while we |
| // are paging, it's possible the last issue from page N can |
| // appear as the first issue on page N+1. Don't process that |
| // issue twice. |
| // https://github.com/google/go-github/issues/566 |
| continue |
| } |
| seen[id] = true |
| |
| var mp *maintpb.Mutation |
| p.c.mu.RLock() |
| { |
| gi := p.gr.issues[int32(*is.Number)] |
| mp = p.gr.newMutationFromIssue(gi, is) |
| } |
| p.c.mu.RUnlock() |
| |
| if mp == nil { |
| continue |
| } |
| |
| // If there's something new (not a cached response), |
| // then check for updated milestones and labels before |
| // creating issue mutations below. Doesn't matter |
| // much, but helps to have it all loaded. |
| if !fromCache && !didMilestoneLabelSync { |
| didMilestoneLabelSync = true |
| group, ctx := errgroup.WithContext(ctx) |
| group.Go(func() error { return p.syncMilestones(ctx) }) |
| group.Go(func() error { return p.syncLabels(ctx) }) |
| if err := group.Wait(); err != nil { |
| return err |
| } |
| } |
| |
| changes++ |
| p.logf("changed issue %d: %s", is.GetNumber(), is.GetTitle()) |
| p.c.addMutation(mp) |
| p.lastUpdate = time.Now() |
| } |
| |
| if changes == 0 { |
| missing := p.gr.missingIssues() |
| if len(missing) == 0 { |
| p.logf("no changed issues; cached=%v", fromCache) |
| return nil |
| } |
| if len(missing) > 0 { |
| p.logf("%d missing github issues.", len(missing)) |
| } |
| if len(missing) < 100 { |
| keepGoing = false |
| } |
| } |
| |
| p.c.mu.RLock() |
| num := len(p.gr.issues) |
| p.c.mu.RUnlock() |
| p.logf("After page %d: %v issues, %v changes, %v issues in memory", page, len(issues), changes, num) |
| |
| page++ |
| } |
| |
| missing := p.gr.missingIssues() |
| if len(missing) > 0 { |
| p.logf("remaining issues: %v", missing) |
| for _, num := range missing { |
| p.logf("getting issue %v ...", num) |
| var issue *github.Issue |
| var err error |
| for { |
| issue, _, err = p.githubDirect.Issues.Get(ctx, owner, repo, int(num)) |
| if canRetry(ctx, err) { |
| continue |
| } |
| break |
| } |
| if ge, ok := err.(*github.ErrorResponse); ok && (ge.Response.StatusCode == http.StatusNotFound || ge.Response.StatusCode == http.StatusGone) { |
| mut := &maintpb.Mutation{ |
| GithubIssue: &maintpb.GithubIssueMutation{ |
| Owner: owner, |
| Repo: repo, |
| Number: num, |
| NotExist: true, |
| }, |
| } |
| p.logf("issue %d is gone, marking as NotExist", num) |
| p.c.addMutation(mut) |
| continue |
| } else if err != nil { |
| return err |
| } |
| mp := p.gr.newMutationFromIssue(nil, issue) |
| if mp == nil { |
| continue |
| } |
| p.logf("modified issue %d: %s", issue.GetNumber(), issue.GetTitle()) |
| p.c.addMutation(mp) |
| p.lastUpdate = time.Now() |
| } |
| } |
| |
| return nil |
| } |
| |
| func (p *githubRepoPoller) issueNumbersWithStaleCommentSync() (issueNums []int32) { |
| p.c.mu.RLock() |
| defer p.c.mu.RUnlock() |
| |
| for n, gi := range p.gr.issues { |
| if !gi.commentsSynced() { |
| issueNums = append(issueNums, n) |
| } |
| } |
| sort.Slice(issueNums, func(i, j int) bool { |
| return issueNums[i] < issueNums[j] |
| }) |
| return issueNums |
| } |
| |
| func (p *githubRepoPoller) syncComments(ctx context.Context) error { |
| for { |
| nums := p.issueNumbersWithStaleCommentSync() |
| if len(nums) == 0 { |
| return nil |
| } |
| remain := len(nums) |
| for _, num := range nums { |
| p.logf("comment sync: %d issues remaining; syncing issue %v", remain, num) |
| if err := p.syncCommentsOnIssue(ctx, num); err != nil { |
| p.logf("comment sync on issue %d: %v", num, err) |
| return err |
| } |
| remain-- |
| } |
| } |
| } |
| |
| func (p *githubRepoPoller) syncCommentsOnIssue(ctx context.Context, issueNum int32) error { |
| p.c.mu.RLock() |
| issue := p.gr.issues[issueNum] |
| if issue == nil { |
| p.c.mu.RUnlock() |
| return fmt.Errorf("unknown issue number %v", issueNum) |
| } |
| since := issue.commentsUpdatedTil |
| p.c.mu.RUnlock() |
| |
| owner, repo := p.gr.id.Owner, p.gr.id.Repo |
| morePages := true // at least try the first. might be empty. |
| for morePages { |
| opt := &github.IssueListCommentsOptions{ |
| Direction: github.String("asc"), |
| Sort: github.String("updated"), |
| ListOptions: github.ListOptions{PerPage: 100}, |
| } |
| if !since.IsZero() { |
| opt.Since = &since |
| } |
| ics, res, err := p.githubDirect.Issues.ListComments(ctx, owner, repo, int(issueNum), opt) |
| if canRetry(ctx, err) { |
| continue |
| } else if ge, ok := err.(*github.ErrorResponse); ok && (ge.Response.StatusCode == http.StatusNotFound || ge.Response.StatusCode == http.StatusGone) { |
| mut := &maintpb.Mutation{ |
| GithubIssue: &maintpb.GithubIssueMutation{ |
| Owner: owner, |
| Repo: repo, |
| Number: issueNum, |
| NotExist: true, |
| }, |
| } |
| p.logf("issue %d comments are gone, marking as NotExist", issueNum) |
| p.c.addMutation(mut) |
| return nil |
| } else if err != nil { |
| return err |
| } |
| serverDate, err := http.ParseTime(res.Header.Get("Date")) |
| if err != nil { |
| return fmt.Errorf("invalid server Date response: %v", err) |
| } |
| serverDate = serverDate.UTC() |
| p.logf("Number of comments on issue %d since %v: %v", issueNum, since, len(ics)) |
| |
| mut := &maintpb.Mutation{ |
| GithubIssue: &maintpb.GithubIssueMutation{ |
| Owner: owner, |
| Repo: repo, |
| Number: issueNum, |
| }, |
| } |
| |
| p.c.mu.RLock() |
| for _, ic := range ics { |
| if ic.ID == nil || ic.Body == nil || ic.User == nil || ic.CreatedAt == nil || ic.UpdatedAt == nil { |
| // Bogus. |
| p.logf("bogus comment: %v", ic) |
| continue |
| } |
| created := timestamppb.New(*ic.CreatedAt) |
| updated := timestamppb.New(*ic.UpdatedAt) |
| since = *ic.UpdatedAt // for next round |
| |
| id := int64(*ic.ID) |
| cur := issue.comments[id] |
| |
| // TODO: does a reaction update a comment's UpdatedAt time? |
| var cmut *maintpb.GithubIssueCommentMutation |
| if cur == nil { |
| cmut = &maintpb.GithubIssueCommentMutation{ |
| Id: id, |
| User: &maintpb.GithubUser{ |
| Id: int64(*ic.User.ID), |
| Login: *ic.User.Login, |
| }, |
| Body: *ic.Body, |
| Created: created, |
| Updated: updated, |
| } |
| } else if !cur.Updated.Equal(*ic.UpdatedAt) || cur.Body != *ic.Body { |
| cmut = &maintpb.GithubIssueCommentMutation{ |
| Id: id, |
| } |
| if !cur.Updated.Equal(*ic.UpdatedAt) { |
| cmut.Updated = updated |
| } |
| if cur.Body != *ic.Body { |
| cmut.Body = *ic.Body |
| } |
| } |
| if cmut != nil { |
| mut.GithubIssue.Comment = append(mut.GithubIssue.Comment, cmut) |
| } |
| } |
| p.c.mu.RUnlock() |
| |
| if res.NextPage == 0 { |
| mut.GithubIssue.CommentStatus = &maintpb.GithubIssueSyncStatus{ |
| ServerDate: timestamppb.New(serverDate), |
| } |
| morePages = false |
| } |
| |
| p.c.addMutation(mut) |
| } |
| return nil |
| } |
| |
| func (p *githubRepoPoller) issueNumbersWithStaleEventSync() (issueNums []int32) { |
| p.c.mu.RLock() |
| defer p.c.mu.RUnlock() |
| |
| for n, gi := range p.gr.issues { |
| if !gi.eventsSynced() { |
| issueNums = append(issueNums, n) |
| } |
| } |
| sort.Slice(issueNums, func(i, j int) bool { |
| return issueNums[i] < issueNums[j] |
| }) |
| return issueNums |
| } |
| |
| func (p *githubRepoPoller) syncEvents(ctx context.Context) error { |
| for { |
| nums := p.issueNumbersWithStaleEventSync() |
| if len(nums) == 0 { |
| return nil |
| } |
| remain := len(nums) |
| for _, num := range nums { |
| p.logf("event sync: %d issues remaining; syncing issue %v", remain, num) |
| if err := p.syncEventsOnIssue(ctx, num); err != nil { |
| p.logf("event sync on issue %d: %v", num, err) |
| return err |
| } |
| remain-- |
| } |
| } |
| } |
| |
| func (p *githubRepoPoller) syncEventsOnIssue(ctx context.Context, issueNum int32) error { |
| const perPage = 100 |
| p.c.mu.RLock() |
| gi := p.gr.issues[issueNum] |
| if gi == nil { |
| panic(fmt.Sprintf("bogus issue %v", issueNum)) |
| } |
| have := len(gi.events) |
| p.c.mu.RUnlock() |
| |
| skipPages := have / perPage |
| |
| mut := &maintpb.Mutation{ |
| GithubIssue: &maintpb.GithubIssueMutation{ |
| Owner: p.Owner(), |
| Repo: p.Repo(), |
| Number: issueNum, |
| }, |
| } |
| |
| err := p.foreachItem(ctx, |
| 1+skipPages, |
| func(ctx context.Context, page int) ([]interface{}, *github.Response, error) { |
| u := fmt.Sprintf("https://api.github.com/repos/%s/%s/issues/%v/events?per_page=%v&page=%v", |
| p.Owner(), p.Repo(), issueNum, perPage, page) |
| req, _ := http.NewRequest("GET", u, nil) |
| |
| req.Header.Set("Authorization", "Bearer "+p.token) |
| req.Header.Set("User-Agent", "golang-x-build-maintner/1.0") |
| ctx, cancel := context.WithTimeout(ctx, time.Minute) |
| defer cancel() |
| req = req.WithContext(ctx) |
| res, err := p.client.Do(req) |
| if err != nil { |
| log.Printf("Fetching %s: %v", u, err) |
| return nil, nil, err |
| } |
| log.Printf("Fetching %s: %v", u, res.Status) |
| ghResp := makeGithubResponse(res) |
| if err := github.CheckResponse(res); err != nil { |
| log.Printf("Fetching %s: %v: %+v", u, res.Status, res.Header) |
| log.Printf("GitHub error %s: %v", u, ghResp) |
| return nil, nil, err |
| } |
| |
| evts, err := parseGithubEvents(res.Body) |
| if err != nil { |
| return nil, nil, fmt.Errorf("%s: parse github events: %v", u, err) |
| } |
| is := make([]interface{}, len(evts)) |
| for i, v := range evts { |
| is[i] = v |
| } |
| serverDate, err := http.ParseTime(res.Header.Get("Date")) |
| if err != nil { |
| return nil, nil, fmt.Errorf("invalid server Date response: %v", err) |
| } |
| mut.GithubIssue.EventStatus = &maintpb.GithubIssueSyncStatus{ |
| ServerDate: timestamppb.New(serverDate.UTC()), |
| } |
| |
| return is, ghResp, err |
| }, |
| func(v interface{}) error { |
| ge := v.(*GitHubIssueEvent) |
| p.c.mu.RLock() |
| _, ok := gi.events[ge.ID] |
| p.c.mu.RUnlock() |
| if ok { |
| // Already have it. And they're |
| // assumed to be immutable, so the |
| // copy we already have should be |
| // good. Don't add to mutation log. |
| return nil |
| } |
| mut.GithubIssue.Event = append(mut.GithubIssue.Event, ge.Proto()) |
| return nil |
| }) |
| if err != nil { |
| return err |
| } |
| p.c.addMutation(mut) |
| return nil |
| } |
| |
| // parseGithubEvents parses the JSON array of GitHub issue events in r. |
| // It does this the very manual way (using map[string]interface{}) |
| // instead of using nice types because https://golang.org/issue/15314 |
| // isn't implemented yet and also because even if it were implemented, |
| // this code still wants to preserve any unknown fields to store in |
| // the "OtherJSON" field for future updates of the code to parse. (If |
| // GitHub adds new Event types in the future, we want to archive them, |
| // even if we don't understand them) |
| func parseGithubEvents(r io.Reader) ([]*GitHubIssueEvent, error) { |
| var jevents []map[string]interface{} |
| jd := json.NewDecoder(r) |
| jd.UseNumber() |
| if err := jd.Decode(&jevents); err != nil { |
| return nil, err |
| } |
| var evts []*GitHubIssueEvent |
| for _, em := range jevents { |
| for k, v := range em { |
| if v == nil { |
| delete(em, k) |
| } |
| } |
| delete(em, "url") |
| |
| e := &GitHubIssueEvent{} |
| |
| e.Type, _ = em["event"].(string) |
| delete(em, "event") |
| |
| e.ID = jint64(em["id"]) |
| delete(em, "id") |
| |
| // TODO: store these two more compactly: |
| e.CommitID, _ = em["commit_id"].(string) // "5383ecf5a0824649ffcc0349f00f0317575753d0" |
| delete(em, "commit_id") |
| e.CommitURL, _ = em["commit_url"].(string) // "https://api.github.com/repos/bradfitz/go-issue-mirror/commits/5383ecf5a0824649ffcc0349f00f0317575753d0" |
| delete(em, "commit_url") |
| |
| getUser := func(field string, gup **GitHubUser) { |
| am, ok := em[field].(map[string]interface{}) |
| if !ok { |
| return |
| } |
| delete(em, field) |
| gu := &GitHubUser{ID: jint64(am["id"])} |
| gu.Login, _ = am["login"].(string) |
| *gup = gu |
| } |
| |
| getUser("actor", &e.Actor) |
| getUser("assignee", &e.Assignee) |
| getUser("assigner", &e.Assigner) |
| getUser("requested_reviewer", &e.Reviewer) |
| getUser("review_requester", &e.ReviewRequester) |
| |
| if lm, ok := em["label"].(map[string]interface{}); ok { |
| delete(em, "label") |
| e.Label, _ = lm["name"].(string) |
| } |
| |
| if mm, ok := em["milestone"].(map[string]interface{}); ok { |
| delete(em, "milestone") |
| e.Milestone, _ = mm["title"].(string) |
| } |
| |
| if rm, ok := em["rename"].(map[string]interface{}); ok { |
| delete(em, "rename") |
| e.From, _ = rm["from"].(string) |
| e.To, _ = rm["to"].(string) |
| } |
| |
| if createdStr, ok := em["created_at"].(string); ok { |
| delete(em, "created_at") |
| var err error |
| e.Created, err = time.Parse(time.RFC3339, createdStr) |
| if err != nil { |
| return nil, err |
| } |
| e.Created = e.Created.UTC() |
| } |
| if dr, ok := em["dismissed_review"]; ok { |
| delete(em, "dismissed_review") |
| drm := dr.(map[string]interface{}) |
| dro := &GitHubDismissedReviewEvent{} |
| dro.ReviewID = jint64(drm["review_id"]) |
| if state, ok := drm["state"].(string); ok { |
| dro.State = state |
| } else { |
| log.Printf("got type %T for 'state' field, expected string in %+v", drm["state"], drm) |
| } |
| dro.DismissalMessage, _ = drm["dismissal_message"].(string) |
| e.DismissedReview = dro |
| } |
| if rt, ok := em["requested_team"]; ok { |
| delete(em, "requested_team") |
| rtm, ok := rt.(map[string]interface{}) |
| if !ok { |
| log.Printf("got value %+v for 'requested_team' field, wanted a map with 'id' and 'slug' fields", rt) |
| } else { |
| t := &GitHubTeam{} |
| t.ID = jint64(rtm["id"]) |
| t.Slug, _ = rtm["slug"].(string) |
| e.TeamReviewer = t |
| } |
| } |
| delete(em, "node_id") // GitHub API v4 Global Node ID; don't store it. |
| delete(em, "lock_reason") // Not stored. |
| |
| otherJSON, _ := json.Marshal(em) |
| e.OtherJSON = string(otherJSON) |
| if e.OtherJSON == "{}" { |
| e.OtherJSON = "" |
| } |
| if e.OtherJSON != "" { |
| log.Printf("warning: storing unknown field(s) in GitHub issue event: %s", e.OtherJSON) |
| } |
| evts = append(evts, e) |
| } |
| return evts, nil |
| } |
| |
| func (p *githubRepoPoller) issueNumbersWithStaleReviewsSync() (issueNums []int32) { |
| p.c.mu.RLock() |
| defer p.c.mu.RUnlock() |
| |
| for n, gi := range p.gr.issues { |
| if gi.PullRequest && !gi.reviewsSynced() { |
| issueNums = append(issueNums, n) |
| } |
| } |
| sort.Slice(issueNums, func(i, j int) bool { |
| return issueNums[i] < issueNums[j] |
| }) |
| return issueNums |
| } |
| |
| func (p *githubRepoPoller) syncReviews(ctx context.Context) error { |
| for { |
| nums := p.issueNumbersWithStaleReviewsSync() |
| if len(nums) == 0 { |
| return nil |
| } |
| remain := len(nums) |
| for _, num := range nums { |
| p.logf("reviews sync: %d issues remaining; syncing issue %v", remain, num) |
| if err := p.syncReviewsOnPullRequest(ctx, num); err != nil { |
| p.logf("review sync on issue %d: %v", num, err) |
| return err |
| } |
| remain-- |
| } |
| } |
| } |
| |
| func (p *githubRepoPoller) syncReviewsOnPullRequest(ctx context.Context, issueNum int32) error { |
| const perPage = 100 |
| p.c.mu.RLock() |
| gi := p.gr.issues[issueNum] |
| if gi == nil { |
| p.c.mu.RUnlock() |
| panic(fmt.Sprintf("bogus issue %v", issueNum)) |
| } |
| |
| if !gi.PullRequest { |
| p.c.mu.RUnlock() |
| return nil |
| } |
| |
| have := len(gi.reviews) |
| p.c.mu.RUnlock() |
| |
| skipPages := have / perPage |
| |
| mut := &maintpb.Mutation{ |
| GithubIssue: &maintpb.GithubIssueMutation{ |
| Owner: p.Owner(), |
| Repo: p.Repo(), |
| Number: issueNum, |
| }, |
| } |
| |
| err := p.foreachItem(ctx, |
| 1+skipPages, |
| func(ctx context.Context, page int) ([]interface{}, *github.Response, error) { |
| u := fmt.Sprintf("https://api.github.com/repos/%s/%s/pulls/%v/reviews?per_page=%v&page=%v", |
| p.Owner(), p.Repo(), issueNum, perPage, page) |
| req, _ := http.NewRequest("GET", u, nil) |
| |
| req.Header.Set("Authorization", "Bearer "+p.token) |
| req.Header.Set("User-Agent", "golang-x-build-maintner/1.0") |
| ctx, cancel := context.WithTimeout(ctx, time.Minute) |
| defer cancel() |
| req = req.WithContext(ctx) |
| res, err := http.DefaultClient.Do(req) |
| if err != nil { |
| log.Printf("Fetching %s: %v", u, err) |
| return nil, nil, err |
| } |
| log.Printf("Fetching %s: %v", u, res.Status) |
| ghResp := makeGithubResponse(res) |
| if err := github.CheckResponse(res); err != nil { |
| log.Printf("Fetching %s: %v: %+v", u, res.Status, res.Header) |
| log.Printf("GitHub error %s: %v", u, ghResp) |
| return nil, nil, err |
| } |
| evts, err := parseGithubReviews(res.Body) |
| if err != nil { |
| return nil, nil, fmt.Errorf("%s: parse github pr reviews: %v", u, err) |
| } |
| is := make([]interface{}, len(evts)) |
| for i, v := range evts { |
| is[i] = v |
| } |
| serverDate, err := http.ParseTime(res.Header.Get("Date")) |
| if err != nil { |
| return nil, nil, fmt.Errorf("invalid server Date response: %v", err) |
| } |
| mut.GithubIssue.ReviewStatus = &maintpb.GithubIssueSyncStatus{ |
| ServerDate: timestamppb.New(serverDate.UTC()), |
| } |
| |
| return is, ghResp, err |
| }, |
| func(v interface{}) error { |
| ge := v.(*GitHubReview) |
| p.c.mu.RLock() |
| _, ok := gi.reviews[ge.ID] |
| p.c.mu.RUnlock() |
| if ok { |
| // Already have it. And they're |
| // assumed to be immutable, so the |
| // copy we already have should be |
| // good. Don't add to mutation log. |
| return nil |
| } |
| mut.GithubIssue.Review = append(mut.GithubIssue.Review, ge.Proto()) |
| return nil |
| }) |
| if err != nil { |
| return err |
| } |
| p.c.addMutation(mut) |
| return nil |
| } |
| |
| // parseGithubReviews parses the JSON array of GitHub reviews in r. |
| // It does this the very manual way (using map[string]interface{}) |
| // instead of using nice types because https://golang.org/issue/15314 |
| // isn't implemented yet and also because even if it were implemented, |
| // this code still wants to preserve any unknown fields to store in |
| // the "OtherJSON" field for future updates of the code to parse. (If |
| // GitHub adds new Event types in the future, we want to archive them, |
| // even if we don't understand them) |
| func parseGithubReviews(r io.Reader) ([]*GitHubReview, error) { |
| var jevents []map[string]interface{} |
| jd := json.NewDecoder(r) |
| jd.UseNumber() |
| if err := jd.Decode(&jevents); err != nil { |
| return nil, err |
| } |
| var evts []*GitHubReview |
| for _, em := range jevents { |
| for k, v := range em { |
| if v == nil { |
| delete(em, k) |
| } |
| } |
| |
| e := &GitHubReview{} |
| |
| e.ID = jint64(em["id"]) |
| delete(em, "id") |
| |
| e.Body, _ = em["body"].(string) |
| delete(em, "body") |
| |
| e.State, _ = em["state"].(string) |
| delete(em, "state") |
| |
| // TODO: store these two more compactly: |
| e.CommitID, _ = em["commit_id"].(string) // "5383ecf5a0824649ffcc0349f00f0317575753d0" |
| delete(em, "commit_id") |
| |
| getUser := func(field string, gup **GitHubUser) { |
| am, ok := em[field].(map[string]interface{}) |
| if !ok { |
| return |
| } |
| delete(em, field) |
| gu := &GitHubUser{ID: jint64(am["id"])} |
| gu.Login, _ = am["login"].(string) |
| *gup = gu |
| } |
| |
| getUser("user", &e.Actor) |
| |
| e.ActorAssociation, _ = em["author_association"].(string) |
| delete(em, "author_association") |
| |
| if createdStr, ok := em["submitted_at"].(string); ok { |
| delete(em, "submitted_at") |
| var err error |
| e.Created, err = time.Parse(time.RFC3339, createdStr) |
| if err != nil { |
| return nil, err |
| } |
| e.Created = e.Created.UTC() |
| } |
| |
| delete(em, "node_id") // GitHub API v4 Global Node ID; don't store it. |
| delete(em, "html_url") // not needed. |
| delete(em, "pull_request_url") // not needed. |
| delete(em, "_links") // not needed. (duplicate data of above two nodes) |
| |
| otherJSON, _ := json.Marshal(em) |
| e.OtherJSON = string(otherJSON) |
| if e.OtherJSON == "{}" { |
| e.OtherJSON = "" |
| } |
| if e.OtherJSON != "" { |
| log.Printf("warning: storing unknown field(s) in GitHub review: %s", e.OtherJSON) |
| } |
| evts = append(evts, e) |
| } |
| return evts, nil |
| } |
| |
| // jint64 return an int64 from the provided JSON object value v. |
| func jint64(v interface{}) int64 { |
| switch v := v.(type) { |
| case nil: |
| return 0 |
| case json.Number: |
| n, _ := strconv.ParseInt(string(v), 10, 64) |
| return n |
| default: |
| panic(fmt.Sprintf("unexpected type %T", v)) |
| } |
| } |
| |
| // copy of go-github's parseRate, basically. |
| func parseRate(r *http.Response) github.Rate { |
| var rate github.Rate |
| // Note: even though the header names below are not canonical (the |
| // canonical form would be X-Ratelimit-Limit), this particular |
| // casing is what GitHub returns. See headerRateRemaining in |
| // package go-github. |
| if limit := r.Header.Get("X-RateLimit-Limit"); limit != "" { |
| rate.Limit, _ = strconv.Atoi(limit) |
| } |
| if remaining := r.Header.Get("X-RateLimit-Remaining"); remaining != "" { |
| rate.Remaining, _ = strconv.Atoi(remaining) |
| } |
| if reset := r.Header.Get("X-RateLimit-Reset"); reset != "" { |
| if v, _ := strconv.ParseInt(reset, 10, 64); v != 0 { |
| rate.Reset = github.Timestamp{time.Unix(v, 0)} |
| } |
| } |
| return rate |
| } |
| |
| // Copy of go-github's func newResponse, basically. |
| func makeGithubResponse(res *http.Response) *github.Response { |
| gr := &github.Response{Response: res} |
| gr.Rate = parseRate(res) |
| for _, lv := range res.Header["Link"] { |
| for _, link := range strings.Split(lv, ",") { |
| segs := strings.Split(strings.TrimSpace(link), ";") |
| if len(segs) < 2 { |
| continue |
| } |
| // ensure href is properly formatted |
| if !strings.HasPrefix(segs[0], "<") || !strings.HasSuffix(segs[0], ">") { |
| continue |
| } |
| |
| // try to pull out page parameter |
| u, err := url.Parse(segs[0][1 : len(segs[0])-1]) |
| if err != nil { |
| continue |
| } |
| page := u.Query().Get("page") |
| if page == "" { |
| continue |
| } |
| |
| for _, seg := range segs[1:] { |
| switch strings.TrimSpace(seg) { |
| case `rel="next"`: |
| gr.NextPage, _ = strconv.Atoi(page) |
| case `rel="prev"`: |
| gr.PrevPage, _ = strconv.Atoi(page) |
| case `rel="first"`: |
| gr.FirstPage, _ = strconv.Atoi(page) |
| case `rel="last"`: |
| gr.LastPage, _ = strconv.Atoi(page) |
| } |
| } |
| } |
| } |
| return gr |
| } |
| |
| var rxReferences = regexp.MustCompile(`(?:\b([\w\-]+)/([\w\-]+))?\#(\d+)\b`) |
| |
| // parseGithubRefs parses references to GitHub issues from commit message commitMsg. |
| // Multiple references to the same issue are deduplicated. |
| func (c *Corpus) parseGithubRefs(gerritProj string, commitMsg string) []GitHubIssueRef { |
| // Use of rxReferences by itself caused this function to take 20% of the CPU time. |
| // TODO(bradfitz): stop using regexps here. |
| // But in the meantime, help the regexp engine with this one weird trick: |
| // Reduce the length of the string given to FindAllStringSubmatch. |
| // Discard all lines before the first line containing a '#'. |
| // The "Fixes #nnnn" is usually at the end, so this discards most of the input. |
| // Now CPU is only 2% instead of 20%. |
| hash := strings.IndexByte(commitMsg, '#') |
| if hash == -1 { |
| return nil |
| } |
| nl := strings.LastIndexByte(commitMsg[:hash], '\n') |
| commitMsg = commitMsg[nl+1:] |
| |
| // TODO: use FindAllStringSubmatchIndex instead, so we can |
| // back up and see what's behind it and ignore "#1", "#2", |
| // "#3" 'references' which are actually bullets or ARM |
| // disassembly, and only respect them as real if they have the |
| // word "Fixes " or "Issue " or similar before them. |
| ms := rxReferences.FindAllStringSubmatch(commitMsg, -1) |
| if len(ms) == 0 { |
| return nil |
| } |
| /* e.g. |
| 2017/03/30 21:42:07 matches: [["golang/go#9327" "golang" "go" "9327"]] |
| 2017/03/30 21:42:07 matches: [["golang/go#16512" "golang" "go" "16512"] ["golang/go#18404" "golang" "go" "18404"]] |
| 2017/03/30 21:42:07 matches: [["#1" "" "" "1"]] |
| 2017/03/30 21:42:07 matches: [["#10234" "" "" "10234"]] |
| 2017/03/30 21:42:31 matches: [["GoogleCloudPlatform/gcloud-golang#262" "GoogleCloudPlatform" "gcloud-golang" "262"]] |
| 2017/03/30 21:42:31 matches: [["GoogleCloudPlatform/google-cloud-go#481" "GoogleCloudPlatform" "google-cloud-go" "481"]] |
| */ |
| c.initGithub() |
| github := c.GitHub() |
| refs := make([]GitHubIssueRef, 0, len(ms)) |
| for _, m := range ms { |
| owner, repo, numStr := strings.ToLower(m[1]), strings.ToLower(m[2]), m[3] |
| num, err := strconv.ParseInt(numStr, 10, 32) |
| if err != nil { |
| continue |
| } |
| if owner == "" { |
| if gerritProj == "go.googlesource.com/go" { |
| owner, repo = "golang", "go" |
| } else { |
| continue |
| } |
| } |
| ref := GitHubIssueRef{github.getOrCreateRepo(owner, repo), int32(num)} |
| if contains(refs, ref) { |
| continue |
| } |
| refs = append(refs, ref) |
| } |
| return refs |
| } |
| |
| // contains reports whether refs contains the reference ref. |
| func contains(refs []GitHubIssueRef, ref GitHubIssueRef) bool { |
| for _, r := range refs { |
| if r == ref { |
| return true |
| } |
| } |
| return false |
| } |
| |
| type limitTransport struct { |
| limiter *rate.Limiter |
| base http.RoundTripper |
| } |
| |
| func (t limitTransport) RoundTrip(r *http.Request) (*http.Response, error) { |
| limiter := t.limiter |
| // NOTE(cbro): limiter should not be nil, but check defensively. |
| if limiter != nil { |
| if err := limiter.Wait(r.Context()); err != nil { |
| return nil, err |
| } |
| } |
| return t.base.RoundTrip(r) |
| } |
| |
| // canRetry reports whether ctx hasn't been canceled and err is a non-nil retryable error. |
| // If so, it blocks until enough time passes so that it's acceptable to retry immediately. |
| func canRetry(ctx context.Context, err error) bool { |
| switch e := err.(type) { |
| case *github.RateLimitError: |
| log.Printf("GitHub rate limit error: %s, waiting until %s", e.Message, e.Rate.Reset.Time) |
| ctx, cancel := context.WithDeadline(ctx, e.Rate.Reset.Time) |
| defer cancel() |
| <-ctx.Done() |
| return ctx.Err() != context.Canceled |
| case *github.AbuseRateLimitError: |
| if e.RetryAfter != nil { |
| log.Printf("GitHub rate abuse error: %s, waiting for %s", e.Message, *e.RetryAfter) |
| ctx, cancel := context.WithTimeout(ctx, *e.RetryAfter) |
| defer cancel() |
| <-ctx.Done() |
| return ctx.Err() != context.Canceled |
| } |
| log.Printf("GitHub rate abuse error: %s", e.Message) |
| } |
| return false |
| } |