blob: 671ed99d7d8ad41dbfbbe55fcead1cc663ce9e41 [file] [log] [blame]
// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
import (
"bufio"
"fmt"
"io"
"log"
"net/http"
"sync"
"golang.org/x/build/buildlet"
)
const minBuildletVersion = 1
var reversePool = &reverseBuildletPool{}
type reverseBuildletPool struct {
mu sync.Mutex
buildlets []reverseBuildlet
}
type reverseBuildlet struct {
modes []string
client *buildlet.Client
}
func handleReverse(w http.ResponseWriter, r *http.Request) {
if r.TLS == nil {
http.Error(w, "buildlet registration requires SSL", http.StatusInternalServerError)
return
}
// Check build keys.
modes := r.Header["X-Go-Builder-Type"]
gobuildkeys := r.Header["X-Go-Builder-Key"]
if len(modes) == 0 || len(modes) != len(gobuildkeys) {
http.Error(w, fmt.Sprintf("need at least one mode and matching key, got %d/%d", len(modes), len(gobuildkeys)), http.StatusPreconditionFailed)
return
}
for i, m := range modes {
if gobuildkeys[i] != builderKey(m) {
http.Error(w, fmt.Sprintf("bad key for mode %q", m), http.StatusPreconditionFailed)
return
}
}
conn, bufrw, err := w.(http.Hijacker).Hijack()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
log.Printf("Registering reverse buildlet %s", r.RemoteAddr)
// The server becomes a (very simple) http client.
(&http.Response{StatusCode: 200, Proto: "HTTP/1.1"}).Write(conn)
client := buildlet.NewClient("none", buildlet.NoKeyPair)
client.SetHTTPClient(&http.Client{
Transport: newRoundTripper(bufrw),
})
status, err := client.Status()
if err != nil {
log.Printf("Reverse connection did not answer status: %v", err)
conn.Close()
return
}
if status.Version < minBuildletVersion {
log.Printf("Buildlet too old: %s, %+v", r.RemoteAddr, status)
conn.Close()
return
}
log.Printf("Buildlet %s: %+v for %s", r.RemoteAddr, status, modes)
// TODO(crawshaw): unregister buildlet when it disconnects. Maybe just
// periodically request Status, and if there's no response unregister.
reversePool.mu.Lock()
defer reversePool.mu.Unlock()
b := reverseBuildlet{
modes: modes,
client: client,
}
reversePool.buildlets = append(reversePool.buildlets, b)
registerBuildlet(b)
}
var registerBuildlet = func(b reverseBuildlet) {} // test hook
func newRoundTripper(bufrw *bufio.ReadWriter) *reverseRoundTripper {
return &reverseRoundTripper{
bufrw: bufrw,
sema: make(chan bool, 1),
}
}
// reverseRoundTripper is an http client that serializes all requests
// over a *bufio.ReadWriter.
//
// Attempts at concurrent requests return an error.
type reverseRoundTripper struct {
bufrw *bufio.ReadWriter
sema chan bool
}
func (c *reverseRoundTripper) RoundTrip(req *http.Request) (resp *http.Response, err error) {
select {
case c.sema <- true:
default:
return nil, fmt.Errorf("reverseRoundTripper: line busy")
}
if err := req.Write(c.bufrw); err != nil {
return nil, err
}
if err := c.bufrw.Flush(); err != nil {
return nil, err
}
resp, err = http.ReadResponse(c.bufrw.Reader, req)
if err != nil {
return nil, err
}
resp.Body = &reverseLockedBody{resp.Body, c.sema}
return resp, err
}
type reverseLockedBody struct {
body io.ReadCloser
sema chan bool
}
func (b *reverseLockedBody) Read(p []byte) (n int, err error) {
return b.body.Read(p)
}
func (b *reverseLockedBody) Close() error {
err := b.body.Close()
<-b.sema
b.body = nil // prevent double close
return err
}