blob: 94445e427ad94e4dfea6abf58e40c6ccd3d6ddd9 [file] [log] [blame]
Johan Euphrosine093044a2015-05-12 14:21:46 -07001// Copyright 2015 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5// Package kubernetes contains a minimal client for the Kubernetes API.
6package kubernetes
7
8import (
Evan Brown956434c2015-10-02 15:38:22 -07009 "bufio"
Johan Euphrosine093044a2015-05-12 14:21:46 -070010 "bytes"
11 "encoding/json"
Johan Euphrosinef46fbc22016-04-07 23:27:02 -070012 "errors"
Johan Euphrosine093044a2015-05-12 14:21:46 -070013 "fmt"
Johan Euphrosinef46fbc22016-04-07 23:27:02 -070014 "io"
Johan Euphrosine093044a2015-05-12 14:21:46 -070015 "io/ioutil"
Brad Fitzpatrickc6ced0a2016-05-04 20:57:37 +000016 "log"
Johan Euphrosine093044a2015-05-12 14:21:46 -070017 "net/http"
18 "net/url"
19 "strings"
20 "time"
21
Evan Brownc51d4e02015-09-08 15:56:15 -070022 "golang.org/x/build/kubernetes/api"
Evan Brown956434c2015-10-02 15:38:22 -070023 "golang.org/x/net/context"
24 "golang.org/x/net/context/ctxhttp"
Johan Euphrosine093044a2015-05-12 14:21:46 -070025)
26
// REST paths used by the client. APIEndpoint is appended to the master's
// base URL by NewClient; every other path is appended to that endpoint when
// a request is built.
const (
	// APIEndpoint defines the base path for kubernetes API resources.
	APIEndpoint = "/api/v1"

	// defaultPodNS is the pod collection of the "default" namespace.
	defaultPodNS = "/namespaces/default/pods"

	// defaultSecretNS is the secret collection of the "default" namespace.
	defaultSecretNS = "/namespaces/default/secrets"

	// defaultWatchPodNS is the watch variant of the "default" namespace
	// pod collection.
	defaultWatchPodNS = "/watch/namespaces/default/pods"

	// nodes is the node collection.
	nodes = "/nodes"
)
35
// ErrSecretNotFound is the sentinel error that GetSecret is documented to
// return when the requested secret does not exist.
var ErrSecretNotFound = errors.New("kubernetes: secret not found")
38
// APIError is the error returned by Client methods when the API server
// answers a request with a status code the caller did not expect.
type APIError struct {
	// StatusCode is the HTTP status code of the response.
	StatusCode int
	// Body is the response body as received.
	Body string
	// Header holds the response headers.
	Header http.Header
}

// Error implements the error interface. The message contains the status
// code followed by the quoted response body.
func (e *APIError) Error() string {
	const format = "API error %d: %q"
	return fmt.Sprintf(format, e.StatusCode, e.Body)
}
49
// Client is a client for the REST API of a Kubernetes master.
// Use NewClient to create one.
type Client struct {
	// endpointURL is the master's base URL followed by APIEndpoint;
	// request paths are appended to it verbatim.
	endpointURL string

	// httpClient performs every request issued by the Client.
	httpClient *http.Client
}
55
56// NewClient returns a new Kubernetes client.
57// The provided host is an url (scheme://hostname[:port]) of a
58// Kubernetes master without any path.
59// The provided client is an authorized http.Client used to perform requests to the Kubernetes API master.
60func NewClient(baseURL string, client *http.Client) (*Client, error) {
61 validURL, err := url.Parse(baseURL)
62 if err != nil {
63 return nil, fmt.Errorf("failed to parse URL %q: %v", baseURL, err)
64 }
65 return &Client{
66 endpointURL: strings.TrimSuffix(validURL.String(), "/") + APIEndpoint,
67 httpClient: client,
68 }, nil
69}
70
Brad Fitzpatrick4a3f4a82016-05-05 18:29:41 +000071// RunLongLivedPod creates a new pod resource in the default pod namespace with
72// the given pod API specification. It assumes the pod runs a
73// long-lived server (i.e. if the container exit quickly quickly, even
74// with success, then that is an error).
75//
Evan Brown956434c2015-10-02 15:38:22 -070076// It returns the pod status once it has entered the Running phase.
Evan Brown34ff1d92016-02-16 22:39:27 -080077// An error is returned if the pod can not be created, or if ctx.Done
Evan Brown956434c2015-10-02 15:38:22 -070078// is closed.
Brad Fitzpatrick4a3f4a82016-05-05 18:29:41 +000079func (c *Client) RunLongLivedPod(ctx context.Context, pod *api.Pod) (*api.PodStatus, error) {
Johan Euphrosine093044a2015-05-12 14:21:46 -070080 var podResult api.Pod
Brad Fitzpatrick49a940a2016-07-18 18:56:59 -070081 if err := c.do(ctx, &podResult, "POST", wantCreated, defaultPodNS, pod); err != nil {
Johan Euphrosinef46fbc22016-04-07 23:27:02 -070082 return nil, err
Johan Euphrosine093044a2015-05-12 14:21:46 -070083 }
Evan Brown956434c2015-10-02 15:38:22 -070084
Evan Brown34ff1d92016-02-16 22:39:27 -080085 for {
Brad Fitzpatrick4a3f4a82016-05-05 18:29:41 +000086 // TODO(bradfitz,evanbrown): pass podResult.ObjectMeta.ResourceVersion to PodStatus?
87 ps, err := c.PodStatus(ctx, podResult.Name)
Evan Brown34ff1d92016-02-16 22:39:27 -080088 if err != nil {
Brad Fitzpatrick4a3f4a82016-05-05 18:29:41 +000089 return nil, err
Evan Brown34ff1d92016-02-16 22:39:27 -080090 }
Brad Fitzpatrick4a3f4a82016-05-05 18:29:41 +000091 switch ps.Phase {
92 case api.PodPending:
93 // The main phase we're waiting on
94 break
95 case api.PodRunning:
96 return ps, nil
97 case api.PodSucceeded, api.PodFailed:
98 return nil, fmt.Errorf("pod entered phase %q", ps.Phase)
99 default:
100 log.Printf("RunLongLivedPod poll loop: pod %q in unexpected phase %q; sleeping", podResult.Name, ps.Phase)
101 }
102 select {
103 case <-time.After(5 * time.Second):
104 case <-ctx.Done():
105 // The pod did not leave the pending
106 // state. Try to clean it up.
107 go c.DeletePod(context.Background(), podResult.Name)
108 return nil, ctx.Err()
109 }
Johan Euphrosine093044a2015-05-12 14:21:46 -0700110 }
Evan Brown956434c2015-10-02 15:38:22 -0700111}
112
Evan Brown83f97482015-10-17 20:35:55 -0700113// GetPods returns all pods in the cluster, regardless of status.
114func (c *Client) GetPods(ctx context.Context) ([]api.Pod, error) {
Johan Euphrosinef46fbc22016-04-07 23:27:02 -0700115 var res api.PodList
Brad Fitzpatrick49a940a2016-07-18 18:56:59 -0700116 if err := c.do(ctx, &res, "GET", wantOK, c.endpointURL+defaultPodNS, nil); err != nil {
Johan Euphrosinef46fbc22016-04-07 23:27:02 -0700117 return nil, err
Evan Brown83f97482015-10-17 20:35:55 -0700118 }
Johan Euphrosinef46fbc22016-04-07 23:27:02 -0700119 return res.Items, nil
Evan Brown83f97482015-10-17 20:35:55 -0700120}
121
122// PodDelete deletes the specified Kubernetes pod.
123func (c *Client) DeletePod(ctx context.Context, podName string) error {
Brad Fitzpatrick49a940a2016-07-18 18:56:59 -0700124 return c.do(ctx, nil, "DELETE", wantOK, defaultPodNS+"/"+podName, nil)
Evan Brown83f97482015-10-17 20:35:55 -0700125}
126
Brad Fitzpatrick4a3f4a82016-05-05 18:29:41 +0000127// TODO(bradfitz): WatchPod is unreliable, so this is disabled.
128//
129// AwaitPodNotPending will return a pod's status in a
Evan Brown956434c2015-10-02 15:38:22 -0700130// podStatusResult when the pod is no longer in the pending
131// state.
132// The podResourceVersion is required to prevent a pod's entire
133// history from being retrieved when the watch is initiated.
134// If there is an error polling for the pod's status, or if
135// ctx.Done is closed, podStatusResult will contain an error.
Brad Fitzpatrick4a3f4a82016-05-05 18:29:41 +0000136func (c *Client) _AwaitPodNotPending(ctx context.Context, podName, podResourceVersion string) (*api.Pod, error) {
Evan Brown956434c2015-10-02 15:38:22 -0700137 if podResourceVersion == "" {
138 return nil, fmt.Errorf("resourceVersion for pod %v must be provided", podName)
139 }
140 ctx, cancel := context.WithCancel(ctx)
141 defer cancel()
142
Brad Fitzpatrick4a3f4a82016-05-05 18:29:41 +0000143 podStatusUpdates, err := c._WatchPod(ctx, podName, podResourceVersion)
Evan Brown956434c2015-10-02 15:38:22 -0700144 if err != nil {
145 return nil, err
146 }
Evan Brown956434c2015-10-02 15:38:22 -0700147 for {
148 select {
Brad Fitzpatrick90f71792016-04-08 19:03:58 +0000149 case <-ctx.Done():
150 return nil, ctx.Err()
151 case psr := <-podStatusUpdates:
Evan Brown956434c2015-10-02 15:38:22 -0700152 if psr.Err != nil {
Brad Fitzpatrick90f71792016-04-08 19:03:58 +0000153 // If the context is done, prefer its error:
154 select {
155 case <-ctx.Done():
156 return nil, ctx.Err()
157 default:
158 return nil, psr.Err
159 }
Evan Brown956434c2015-10-02 15:38:22 -0700160 }
Evan Brown2e452e12015-11-20 09:43:56 -0800161 if psr.Pod.Status.Phase != api.PodPending {
162 return psr.Pod, nil
Evan Brown956434c2015-10-02 15:38:22 -0700163 }
164 }
165 }
166}
167
Brad Fitzpatrick90f71792016-04-08 19:03:58 +0000168// PodStatusResult wraps an api.PodStatus and error.
Evan Brown956434c2015-10-02 15:38:22 -0700169type PodStatusResult struct {
Evan Brown2e452e12015-11-20 09:43:56 -0800170 Pod *api.Pod
171 Type string
172 Err error
Evan Brown956434c2015-10-02 15:38:22 -0700173}
174
175type watchPodStatus struct {
176 // The type of watch update contained in the message
177 Type string `json:"type"`
178 // Pod details
179 Object api.Pod `json:"object"`
180}
181
Brad Fitzpatrick4a3f4a82016-05-05 18:29:41 +0000182// TODO(bradfitz): WatchPod is unreliable and sometimes hangs forever
183// without closing and sometimes ends prematurely, so this API is
184// disabled.
185//
Evan Brown2e452e12015-11-20 09:43:56 -0800186// WatchPod long-polls the Kubernetes watch API to be notified
Evan Brown956434c2015-10-02 15:38:22 -0700187// of changes to the specified pod. Changes are sent on the returned
188// PodStatusResult channel as they are received.
189// The podResourceVersion is required to prevent a pod's entire
190// history from being retrieved when the watch is initiated.
191// The provided context must be canceled or timed out to stop the watch.
192// If any error occurs communicating with the Kubernetes API, the
193// error will be sent on the returned PodStatusResult channel and
194// it will be closed.
Brad Fitzpatrick4a3f4a82016-05-05 18:29:41 +0000195func (c *Client) _WatchPod(ctx context.Context, podName, podResourceVersion string) (<-chan PodStatusResult, error) {
Evan Brown956434c2015-10-02 15:38:22 -0700196 if podResourceVersion == "" {
197 return nil, fmt.Errorf("resourceVersion for pod %v must be provided", podName)
198 }
Brad Fitzpatrick90f71792016-04-08 19:03:58 +0000199 statusChan := make(chan PodStatusResult, 1)
Evan Brown956434c2015-10-02 15:38:22 -0700200
201 go func() {
202 defer close(statusChan)
Brad Fitzpatrickc6ced0a2016-05-04 20:57:37 +0000203 ctx, cancel := context.WithCancel(ctx)
204 defer cancel()
205
Evan Brown956434c2015-10-02 15:38:22 -0700206 // Make request to Kubernetes API
Johan Euphrosinef46fbc22016-04-07 23:27:02 -0700207 getURL := c.endpointURL + defaultWatchPodNS + "/" + podName
Evan Brown956434c2015-10-02 15:38:22 -0700208 req, err := http.NewRequest("GET", getURL, nil)
209 req.URL.Query().Add("resourceVersion", podResourceVersion)
210 if err != nil {
211 statusChan <- PodStatusResult{Err: fmt.Errorf("failed to create request: GET %q : %v", getURL, err)}
212 return
213 }
214 res, err := ctxhttp.Do(ctx, c.httpClient, req)
215 if err != nil {
Brad Fitzpatrick90f71792016-04-08 19:03:58 +0000216 statusChan <- PodStatusResult{Err: err}
Evan Brown956434c2015-10-02 15:38:22 -0700217 return
218 }
Evan Brown34ff1d92016-02-16 22:39:27 -0800219 defer res.Body.Close()
Brad Fitzpatrickc6ced0a2016-05-04 20:57:37 +0000220 if res.StatusCode != 200 {
221 statusChan <- PodStatusResult{Err: fmt.Errorf("WatchPod status %v", res.Status)}
222 return
223 }
Evan Brown956434c2015-10-02 15:38:22 -0700224 reader := bufio.NewReader(res.Body)
Evan Brown83f97482015-10-17 20:35:55 -0700225
226 // bufio.Reader.ReadBytes is blocking, so we watch for
227 // context timeout or cancellation in a goroutine
228 // and close the response body when see see it. The
229 // response body is also closed via defer when the
230 // request is made, but closing twice is OK.
231 go func() {
232 <-ctx.Done()
233 res.Body.Close()
234 }()
235
Brad Fitzpatrickc6ced0a2016-05-04 20:57:37 +0000236 const backupPollDuration = 30 * time.Second
237 backupPoller := time.AfterFunc(backupPollDuration, func() {
238 log.Printf("kubernetes: backup poller in WatchPod checking on %q", podName)
239 st, err := c.PodStatus(ctx, podName)
240 log.Printf("kubernetes: backup poller in WatchPod PodStatus(%q) = %v, %v", podName, st, err)
241 if err != nil {
242 // Some error.
243 cancel()
244 }
245 })
246 defer backupPoller.Stop()
247
Evan Brown956434c2015-10-02 15:38:22 -0700248 for {
249 line, err := reader.ReadBytes('\n')
Brad Fitzpatrickc6ced0a2016-05-04 20:57:37 +0000250 log.Printf("kubernetes WatchPod status line of %q: %q, %v", podName, line, err)
251 backupPoller.Reset(backupPollDuration)
Evan Brown956434c2015-10-02 15:38:22 -0700252 if err != nil {
253 statusChan <- PodStatusResult{Err: fmt.Errorf("error reading streaming response body: %v", err)}
254 return
255 }
Brad Fitzpatrick90f71792016-04-08 19:03:58 +0000256 var wps watchPodStatus
Evan Brown956434c2015-10-02 15:38:22 -0700257 if err := json.Unmarshal(line, &wps); err != nil {
258 statusChan <- PodStatusResult{Err: fmt.Errorf("failed to decode watch pod status: %v", err)}
259 return
260 }
Evan Brown2e452e12015-11-20 09:43:56 -0800261 statusChan <- PodStatusResult{Pod: &wps.Object, Type: wps.Type}
Evan Brown956434c2015-10-02 15:38:22 -0700262 }
263 }()
264 return statusChan, nil
265}
266
267// Retrieve the status of a pod synchronously from the Kube
268// API server.
269func (c *Client) PodStatus(ctx context.Context, podName string) (*api.PodStatus, error) {
Johan Euphrosinef46fbc22016-04-07 23:27:02 -0700270 var pod api.Pod
Brad Fitzpatrick49a940a2016-07-18 18:56:59 -0700271 if err := c.do(ctx, &pod, "GET", wantOK, defaultPodNS+"/"+podName, nil); err != nil {
Johan Euphrosinef46fbc22016-04-07 23:27:02 -0700272 return nil, err
Evan Brown956434c2015-10-02 15:38:22 -0700273 }
274 return &pod.Status, nil
275}
276
277// PodLog retrieves the container log for the first container
278// in the pod.
279func (c *Client) PodLog(ctx context.Context, podName string) (string, error) {
280 // TODO(evanbrown): support multiple containers
Johan Euphrosinef46fbc22016-04-07 23:27:02 -0700281 var logs string
Brad Fitzpatrick49a940a2016-07-18 18:56:59 -0700282 if err := c.do(ctx, &logs, "GET", wantOK, defaultPodNS+"/"+podName+"/log", nil); err != nil {
Johan Euphrosinef46fbc22016-04-07 23:27:02 -0700283 return "", err
Evan Brown956434c2015-10-02 15:38:22 -0700284 }
Johan Euphrosinef46fbc22016-04-07 23:27:02 -0700285 return logs, nil
Johan Euphrosine093044a2015-05-12 14:21:46 -0700286}
Evan Brown83f97482015-10-17 20:35:55 -0700287
288// PodNodes returns the list of nodes that comprise the Kubernetes cluster
289func (c *Client) GetNodes(ctx context.Context) ([]api.Node, error) {
Johan Euphrosinef46fbc22016-04-07 23:27:02 -0700290 var res api.NodeList
Brad Fitzpatrick49a940a2016-07-18 18:56:59 -0700291 if err := c.do(ctx, &res, "GET", wantOK, nodes, nil); err != nil {
Johan Euphrosinef46fbc22016-04-07 23:27:02 -0700292 return nil, err
293 }
294 return res.Items, nil
295}
296
297// CreateSecret creates a new secret resource in the default secret namespace with
298// the given secret.
299// It returns a new secret instance corresponding to the server side representation.
300func (c *Client) CreateSecret(ctx context.Context, secret *api.Secret) (*api.Secret, error) {
301 var res api.Secret
Brad Fitzpatrick49a940a2016-07-18 18:56:59 -0700302 if err := c.do(ctx, &res, "POST", wantCreated, defaultSecretNS, secret); err != nil {
Johan Euphrosinef46fbc22016-04-07 23:27:02 -0700303 return nil, err
304 }
305 return &res, nil
306}
307
308// GetSecret returns the specified secret from the default secret namespace.
309// If the secret is not found, the err will be ErrSecretNotFound.
310func (c *Client) GetSecret(ctx context.Context, name string) (*api.Secret, error) {
311 var res api.Secret
Brad Fitzpatrick49a940a2016-07-18 18:56:59 -0700312 if err := c.do(ctx, &res, "GET", wantOK, defaultSecretNS+"/"+name, nil); err != nil {
Johan Euphrosinef46fbc22016-04-07 23:27:02 -0700313 return nil, err
314 }
315 return &res, nil
316}
317
// wantOK reports whether code is 200 OK.
func wantOK(code int) bool {
	return http.StatusOK == code
}
// wantCreated reports whether code is 201 Created.
func wantCreated(code int) bool {
	return http.StatusCreated == code
}
320
321func (c *Client) do(ctx context.Context, dst interface{}, method string, checkResStatus func(int) bool, path string, payload interface{}) error {
Johan Euphrosinef46fbc22016-04-07 23:27:02 -0700322 var body io.Reader
323 if payload != nil {
324 buf := new(bytes.Buffer)
325 if err := json.NewEncoder(buf).Encode(payload); err != nil {
326 return fmt.Errorf("failed encode json payload: %v", err)
327 }
328 body = buf
329 }
330 req, err := http.NewRequest(method, c.endpointURL+path, body)
Evan Brown83f97482015-10-17 20:35:55 -0700331 if err != nil {
Johan Euphrosinef46fbc22016-04-07 23:27:02 -0700332 return fmt.Errorf("failed to create request: %s %q : %v", method, path, err)
Evan Brown83f97482015-10-17 20:35:55 -0700333 }
Johan Euphrosinef46fbc22016-04-07 23:27:02 -0700334 resp, err := ctxhttp.Do(ctx, c.httpClient, req)
Evan Brown83f97482015-10-17 20:35:55 -0700335 if err != nil {
Johan Euphrosinef46fbc22016-04-07 23:27:02 -0700336 return fmt.Errorf("failed to perform request: %s %q: %v", method, path, err)
Evan Brown83f97482015-10-17 20:35:55 -0700337 }
Johan Euphrosinef46fbc22016-04-07 23:27:02 -0700338 defer resp.Body.Close()
Brad Fitzpatrick49a940a2016-07-18 18:56:59 -0700339 if !checkResStatus(resp.StatusCode) {
Johan Euphrosinef46fbc22016-04-07 23:27:02 -0700340 body, _ := ioutil.ReadAll(resp.Body)
341 return &APIError{
342 StatusCode: resp.StatusCode,
343 Body: string(body),
344 Header: resp.Header,
345 }
Evan Brown83f97482015-10-17 20:35:55 -0700346 }
Johan Euphrosinef46fbc22016-04-07 23:27:02 -0700347
348 switch dst := dst.(type) {
349 case nil:
350 return nil
351 case *string:
352 // string dest
353 bs, err := ioutil.ReadAll(resp.Body)
354 if err != nil {
355 return fmt.Errorf("failed to read raw body: %v", err)
356 }
357 *dst = string(bs)
358 default:
359 // json dest
360 if err := json.NewDecoder(resp.Body).Decode(dst); err != nil {
361 return fmt.Errorf("failed to decode API response: %v", err)
362 }
Evan Brown83f97482015-10-17 20:35:55 -0700363 }
Johan Euphrosinef46fbc22016-04-07 23:27:02 -0700364 return nil
Evan Brown83f97482015-10-17 20:35:55 -0700365}