| // Copyright 2015 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package main |
| |
| /* |
| This file implements reverse buildlets. These are buildlets that are not |
| started by the coordinator. They dial the coordinator and then accept |
| instructions. This feature is used for machines that cannot be started by |
| an API, for example real OS X machines with iOS and Android devices attached. |
| |
| You can test this setup locally. In one terminal start a coordinator. |
| It will default to dev mode, using a dummy TLS cert and not talking to GCE. |
| |
| $ coordinator |
| |
| In another terminal, start a reverse buildlet: |
| |
| $ buildlet -reverse "darwin-amd64" |
| |
| It will dial and register itself with the coordinator. To confirm the |
| coordinator can see the buildlet, check the logs output or visit its |
| diagnostics page: https://localhost:8119. To send the buildlet some |
| work, go to: |
| |
| https://localhost:8119/dosomework |
| */ |
| |
| import ( |
| "bufio" |
| "errors" |
| "fmt" |
| "io" |
| "log" |
| "net" |
| "net/http" |
| "sort" |
| "sync" |
| "time" |
| |
| "golang.org/x/build/buildlet" |
| ) |
| |
// minBuildletVersion is the lowest buildlet status version that
// handleReverse will register; buildlets reporting an older version are
// disconnected.
const minBuildletVersion = 1
| |
// reversePool is the package-level pool of registered reverse buildlets.
// Its buildletReturned channel has room for a single pending signal.
var reversePool = &reverseBuildletPool{
	buildletReturned: make(chan token, 1),
}
| |
| func init() { |
| go func() { |
| for { |
| time.Sleep(5 * time.Second) |
| reversePool.reverseHealthCheck() |
| } |
| }() |
| } |
| |
// token is the empty value sent on signalling channels such as
// reverseBuildletPool.buildletReturned.
type token struct{}
| |
// reverseBuildletPool holds the reverse buildlets that have registered
// themselves through handleReverse and hands them out to builds.
type reverseBuildletPool struct {
	buildletReturned chan token // best-effort tickle when any buildlet becomes free

	mu        sync.Mutex // guards buildlets and their fields
	buildlets []*reverseBuildlet
}
| |
// errInUse is returned by tryToGrab when at least one registered buildlet
// supports the requested machine type but none of them is currently free.
// GetBuildlet treats it as a signal to wait and retry.
var errInUse = errors.New("all buildlets are in use")
| |
| func (p *reverseBuildletPool) tryToGrab(machineType string) (*buildlet.Client, error) { |
| p.mu.Lock() |
| defer p.mu.Unlock() |
| usableCount := 0 |
| for _, b := range p.buildlets { |
| usable := false |
| for _, m := range b.modes { |
| if m == machineType { |
| usable = true |
| usableCount++ |
| break |
| } |
| } |
| if usable && b.inUseAs == "" { |
| // Found an unused match. |
| b.inUseAs = machineType |
| b.client.SetCloseFunc(func() error { |
| p.mu.Lock() |
| b.inUseAs = "" |
| p.mu.Unlock() |
| select { |
| case p.buildletReturned <- token{}: |
| default: |
| } |
| return nil |
| }) |
| return b.client, nil |
| } |
| } |
| if usableCount == 0 { |
| return nil, fmt.Errorf("no buildlets registered for machine type %q", machineType) |
| } |
| return nil, errInUse |
| } |
| |
| // nukeBuildlet wipes out victim as a buildlet we'll ever return again, |
| // and closes its TCP connection in hopes that it will fix itself |
| // later. |
| func (p *reverseBuildletPool) nukeBuildlet(victim *buildlet.Client) { |
| p.mu.Lock() |
| defer p.mu.Unlock() |
| for i, rb := range p.buildlets { |
| if rb.client == victim { |
| defer rb.conn.Close() |
| p.buildlets = append(p.buildlets[:i], p.buildlets[i+1:]...) |
| return |
| } |
| } |
| } |
| |
| // reverseHealthCheck requests the status page of each idle buildlet. |
| // If the buildlet fails to respond promptly, it is removed from the pool. |
| func (p *reverseBuildletPool) reverseHealthCheck() { |
| p.mu.Lock() |
| responses := make(map[*reverseBuildlet]chan error) |
| for _, b := range p.buildlets { |
| if b.inUseAs == "health" { // sanity check |
| panic("previous health check still running") |
| } |
| if b.inUseAs != "" { |
| continue // skip busy buildlets |
| } |
| b.inUseAs = "health" |
| res := make(chan error, 1) |
| responses[b] = res |
| client := b.client |
| go func() { |
| _, err := client.Status() |
| res <- err |
| }() |
| } |
| p.mu.Unlock() |
| time.Sleep(5 * time.Second) // give buildlets time to respond |
| p.mu.Lock() |
| |
| var buildlets []*reverseBuildlet |
| for _, b := range p.buildlets { |
| res := responses[b] |
| if b.inUseAs != "health" || res == nil { |
| // buildlet skipped or registered after health check |
| buildlets = append(buildlets, b) |
| continue |
| } |
| b.inUseAs = "" |
| var err error |
| select { |
| case err = <-res: |
| default: |
| // It had 5 seconds above to send to the |
| // buffered channel. So if we're here, it took |
| // over 5 seconds. |
| err = errors.New("health check timeout") |
| } |
| if err == nil { |
| buildlets = append(buildlets, b) |
| continue |
| } |
| // remove bad buildlet |
| log.Printf("Reverse buildlet %s %v not responding, removing from pool", b.client, b.modes) |
| go b.client.Close() |
| go b.conn.Close() |
| } |
| p.buildlets = buildlets |
| p.mu.Unlock() |
| } |
| |
| func (p *reverseBuildletPool) GetBuildlet(cancel Cancel, machineType, rev string, el eventTimeLogger) (*buildlet.Client, error) { |
| seenErrInUse := false |
| for { |
| b, err := p.tryToGrab(machineType) |
| if err == errInUse { |
| if !seenErrInUse { |
| el.logEventTime("waiting_machine_in_use") |
| seenErrInUse = true |
| } |
| select { |
| case <-p.buildletReturned: |
| // As multiple goroutines can be listening for the |
| // buildletReturned signal, it must be treated as |
| // a best effort signal. So periodically try to grab |
| // a buildlet again. |
| case <-time.After(30 * time.Second): |
| case <-cancel: |
| return nil, ErrCanceled |
| } |
| } else if err != nil { |
| return nil, err |
| } else { |
| el.logEventTime("got_machine") |
| // Clean up any files from previous builds. |
| if err := b.RemoveAll("."); err != nil { |
| b.Close() |
| return nil, err |
| } |
| el.logEventTime("cleaned_up") |
| return b, nil |
| } |
| } |
| } |
| |
| func (p *reverseBuildletPool) WriteHTMLStatus(w io.Writer) { |
| // total maps from a builder type to the number of machines which are |
| // capable of that role. |
| total := make(map[string]int) |
| // inUse and inUseOther track the number of machines using machines. |
| // inUse is how many machines are building that type, and inUseOther counts |
| // how many machines are occupied doing a similar role on that hardware. |
| // e.g. "darwin-amd64-10_10" occupied as a "darwin-arm-a5ios", |
| // or "linux-arm" as a "linux-arm-arm5" count as inUseOther. |
| inUse := make(map[string]int) |
| inUseOther := make(map[string]int) |
| |
| p.mu.Lock() |
| for _, b := range p.buildlets { |
| for _, mode := range b.modes { |
| if b.inUseAs != "" && b.inUseAs != "health" { |
| if mode == b.inUseAs { |
| inUse[mode]++ |
| } else { |
| inUseOther[mode]++ |
| } |
| } |
| total[mode]++ |
| } |
| } |
| p.mu.Unlock() |
| |
| var modes []string |
| for mode := range total { |
| modes = append(modes, mode) |
| } |
| sort.Strings(modes) |
| |
| io.WriteString(w, "<b>Reverse pool</b><ul>") |
| if len(modes) == 0 { |
| io.WriteString(w, "<li>no connections</li>") |
| } |
| for _, mode := range modes { |
| use, other := inUse[mode], inUseOther[mode] |
| if use+other == 0 { |
| fmt.Fprintf(w, "<li>%s: 0/%d</li>", mode, total[mode]) |
| } else { |
| fmt.Fprintf(w, "<li>%s: %d/%d (%d + %d other)</li>", mode, use+other, total[mode], use, other) |
| } |
| } |
| io.WriteString(w, "</ul>") |
| } |
| |
| func (p *reverseBuildletPool) String() string { |
| p.mu.Lock() |
| inUse := 0 |
| for _, b := range p.buildlets { |
| if b.inUseAs != "" && b.inUseAs != "health" { |
| inUse++ |
| } |
| } |
| p.mu.Unlock() |
| |
| return fmt.Sprintf("Reverse pool capacity: %d/%d %s", inUse, len(p.buildlets), p.Modes()) |
| } |
| |
| // Modes returns the a deduplicated list of buildlet modes curently supported |
| // by the pool. Buildlet modes are described on reverseBuildlet comments. |
| func (p *reverseBuildletPool) Modes() (modes []string) { |
| mm := make(map[string]bool) |
| p.mu.Lock() |
| for _, b := range p.buildlets { |
| for _, mode := range b.modes { |
| mm[mode] = true |
| } |
| } |
| p.mu.Unlock() |
| |
| for mode := range mm { |
| modes = append(modes, mode) |
| } |
| sort.Strings(modes) |
| return modes |
| } |
| |
| // CanBuild reports whether the pool has a machine capable of building mode. |
| // The machine may be in use, so you may have to wait. |
| func (p *reverseBuildletPool) CanBuild(mode string) bool { |
| p.mu.Lock() |
| defer p.mu.Unlock() |
| for _, b := range p.buildlets { |
| for _, m := range b.modes { |
| if m == mode { |
| return true |
| } |
| } |
| } |
| return false |
| } |
| |
// reverseBuildlet is a registered reverse buildlet.
// Its immediate fields are guarded by the reverseBuildletPool mutex.
type reverseBuildlet struct {
	// client issues buildlet requests over conn.
	client *buildlet.Client
	// conn is the hijacked network connection the buildlet dialed in on.
	conn net.Conn

	// modes is the set of valid modes for this buildlet.
	//
	// A mode is the equivalent of a builder name, for example
	// "darwin-amd64", "android-arm", or "linux-amd64-race".
	//
	// Each buildlet may potentially have several modes. For example, a
	// Mac OS X machine with an attached iOS device may be registered
	// as both "darwin-amd64" and "darwin-arm64".
	modes []string

	// inUseAs is the mode the buildlet is currently in use as, or the
	// empty string when it is free. The special value "health" marks a
	// buildlet that is being probed by reverseHealthCheck.
	// Guarded by the mutex on reverseBuildletPool.
	inUseAs string
	// TODO: inUseTime time.Time + more HTML status
}
| |
| func handleReverse(w http.ResponseWriter, r *http.Request) { |
| if r.TLS == nil { |
| http.Error(w, "buildlet registration requires SSL", http.StatusInternalServerError) |
| return |
| } |
| // Check build keys. |
| modes := r.Header["X-Go-Builder-Type"] |
| gobuildkeys := r.Header["X-Go-Builder-Key"] |
| if len(modes) == 0 || len(modes) != len(gobuildkeys) { |
| http.Error(w, fmt.Sprintf("need at least one mode and matching key, got %d/%d", len(modes), len(gobuildkeys)), http.StatusPreconditionFailed) |
| return |
| } |
| for i, m := range modes { |
| if gobuildkeys[i] != builderKey(m) { |
| http.Error(w, fmt.Sprintf("bad key for mode %q", m), http.StatusPreconditionFailed) |
| return |
| } |
| } |
| |
| conn, bufrw, err := w.(http.Hijacker).Hijack() |
| if err != nil { |
| http.Error(w, err.Error(), http.StatusInternalServerError) |
| return |
| } |
| log.Printf("Registering reverse buildlet %s", r.RemoteAddr) |
| |
| // The server becomes a (very simple) http client. |
| (&http.Response{StatusCode: 200, Proto: "HTTP/1.1"}).Write(conn) |
| |
| client := buildlet.NewClient("none", buildlet.NoKeyPair) |
| client.SetHTTPClient(&http.Client{ |
| Transport: newRoundTripper(bufrw), |
| }) |
| status, err := client.Status() |
| if err != nil { |
| log.Printf("Reverse connection did not answer status: %v", err) |
| conn.Close() |
| return |
| } |
| if status.Version < minBuildletVersion { |
| log.Printf("Buildlet too old: %s, %+v", r.RemoteAddr, status) |
| conn.Close() |
| return |
| } |
| log.Printf("Buildlet %s: %+v for %s", r.RemoteAddr, status, modes) |
| |
| // TODO(crawshaw): unregister buildlet when it disconnects. Maybe just |
| // periodically request Status, and if there's no response unregister. |
| reversePool.mu.Lock() |
| defer reversePool.mu.Unlock() |
| b := &reverseBuildlet{ |
| modes: modes, |
| client: client, |
| conn: conn, |
| } |
| reversePool.buildlets = append(reversePool.buildlets, b) |
| registerBuildlet(modes) |
| } |
| |
// registerBuildlet is called by handleReverse, with the pool lock held, after
// a buildlet has been added to reversePool. It is a no-op by default and
// exists so tests can observe registrations.
var registerBuildlet = func(modes []string) {} // test hook
| |
| func newRoundTripper(bufrw *bufio.ReadWriter) *reverseRoundTripper { |
| return &reverseRoundTripper{ |
| bufrw: bufrw, |
| sema: make(chan bool, 1), |
| } |
| } |
| |
// reverseRoundTripper is an http.RoundTripper that serializes all requests
// over a single *bufio.ReadWriter.
//
// Only one exchange may be in flight at a time: a concurrent RoundTrip call
// blocks until the previous request has failed or the body of its response
// has been closed.
type reverseRoundTripper struct {
	bufrw *bufio.ReadWriter
	sema  chan bool // held from the start of a request until its response body is closed
}
| |
| func (c *reverseRoundTripper) RoundTrip(req *http.Request) (resp *http.Response, err error) { |
| // Serialize trips. It is up to callers to avoid deadlocking. |
| c.sema <- true |
| if err := req.Write(c.bufrw); err != nil { |
| <-c.sema |
| return nil, err |
| } |
| if err := c.bufrw.Flush(); err != nil { |
| <-c.sema |
| return nil, err |
| } |
| resp, err = http.ReadResponse(c.bufrw.Reader, req) |
| if err != nil { |
| <-c.sema |
| return nil, err |
| } |
| resp.Body = &reverseLockedBody{resp.Body, c.sema} |
| return resp, err |
| } |
| |
// reverseLockedBody wraps a response body read over a reverse connection.
// Whoever creates it must already hold one slot of sema; Close releases that
// slot so the next request can use the connection.
type reverseLockedBody struct {
	body io.ReadCloser // nil once Close has been called
	sema chan bool
}

// Read reads from the wrapped body. It must not be called after Close.
func (b *reverseLockedBody) Read(p []byte) (n int, err error) {
	return b.body.Read(p)
}

// Close closes the wrapped body and releases the connection semaphore.
// It is safe to call more than once: only the first call closes the body and
// releases the semaphore; later calls do nothing and return nil.
func (b *reverseLockedBody) Close() error {
	if b.body == nil {
		// Already closed. Closing again would dereference the nil body and
		// try to release a semaphore slot we no longer hold.
		return nil
	}
	err := b.body.Close()
	<-b.sema
	b.body = nil // marks the body closed; see the check above
	return err
}