blob: 2ee425a3df4082041a34428b0d82912f724c1833 [file] [log] [blame]
Johan Euphrosine093044a2015-05-12 14:21:46 -07001// Copyright 2015 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5// Package kubernetes contains a minimal client for the Kubernetes API.
6package kubernetes
7
8import (
Evan Brown956434c2015-10-02 15:38:22 -07009 "bufio"
Johan Euphrosine093044a2015-05-12 14:21:46 -070010 "bytes"
11 "encoding/json"
12 "fmt"
13 "io/ioutil"
14 "net/http"
15 "net/url"
16 "strings"
17 "time"
18
Evan Brownc51d4e02015-09-08 15:56:15 -070019 "golang.org/x/build/kubernetes/api"
Evan Brown956434c2015-10-02 15:38:22 -070020 "golang.org/x/net/context"
21 "golang.org/x/net/context/ctxhttp"
Johan Euphrosine093044a2015-05-12 14:21:46 -070022)
23
const (
	// APIEndpoint defines the base path for kubernetes API resources.
	APIEndpoint = "/api/v1"

	// defaultPod is the path, relative to APIEndpoint, of the pod
	// collection in the "default" namespace.
	defaultPod = "/namespaces/default/pods"

	// defaultWatchPod is the watch counterpart of defaultPod: the same
	// collection path under the "/watch" prefix.
	defaultWatchPod = "/watch" + defaultPod
)
30
// Client is a client for the Kubernetes master.
type Client struct {
	// endpointURL is the master's base URL with the API base path
	// appended; request paths are concatenated onto it.
	endpointURL string

	// httpClient performs every HTTP request issued by the Client.
	httpClient *http.Client
}
36
37// NewClient returns a new Kubernetes client.
38// The provided host is an url (scheme://hostname[:port]) of a
39// Kubernetes master without any path.
40// The provided client is an authorized http.Client used to perform requests to the Kubernetes API master.
41func NewClient(baseURL string, client *http.Client) (*Client, error) {
42 validURL, err := url.Parse(baseURL)
43 if err != nil {
44 return nil, fmt.Errorf("failed to parse URL %q: %v", baseURL, err)
45 }
46 return &Client{
47 endpointURL: strings.TrimSuffix(validURL.String(), "/") + APIEndpoint,
48 httpClient: client,
49 }, nil
50}
51
Evan Brown956434c2015-10-02 15:38:22 -070052// Run creates a new pod resource in the default pod namespace with
Johan Euphrosine093044a2015-05-12 14:21:46 -070053// the given pod API specification.
Evan Brown956434c2015-10-02 15:38:22 -070054// It returns the pod status once it has entered the Running phase.
55// An error is returned if the pod can not be created, if it does
56// does not enter the running phase within 2 minutes, or if ctx.Done
57// is closed.
58func (c *Client) Run(ctx context.Context, pod *api.Pod) (*api.PodStatus, error) {
Johan Euphrosine093044a2015-05-12 14:21:46 -070059 var podJSON bytes.Buffer
60 if err := json.NewEncoder(&podJSON).Encode(pod); err != nil {
61 return nil, fmt.Errorf("failed to encode pod in json: %v", err)
62 }
Evan Brown956434c2015-10-02 15:38:22 -070063 postURL := c.endpointURL + defaultPod
64 req, err := http.NewRequest("POST", postURL, &podJSON)
Johan Euphrosine093044a2015-05-12 14:21:46 -070065 if err != nil {
66 return nil, fmt.Errorf("failed to create request: POST %q : %v", postURL, err)
67 }
Evan Brown956434c2015-10-02 15:38:22 -070068 res, err := ctxhttp.Do(ctx, c.httpClient, req)
Johan Euphrosine093044a2015-05-12 14:21:46 -070069 if err != nil {
70 return nil, fmt.Errorf("failed to make request: POST %q: %v", postURL, err)
71 }
72 body, err := ioutil.ReadAll(res.Body)
73 res.Body.Close()
74 if err != nil {
75 return nil, fmt.Errorf("failed to read request body for POST %q: %v", postURL, err)
76 }
77 if res.StatusCode != http.StatusCreated {
78 return nil, fmt.Errorf("http error: %d POST %q: %q: %v", res.StatusCode, postURL, string(body), err)
79 }
80 var podResult api.Pod
81 if err := json.Unmarshal(body, &podResult); err != nil {
82 return nil, fmt.Errorf("failed to decode pod resources: %v", err)
83 }
Evan Brown956434c2015-10-02 15:38:22 -070084 ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
85 defer cancel()
86
87 status, err := c.AwaitPodNotPending(ctx, podResult.Name, podResult.ObjectMeta.ResourceVersion)
88 if err != nil {
89 return nil, err
Johan Euphrosine093044a2015-05-12 14:21:46 -070090 }
Evan Brown956434c2015-10-02 15:38:22 -070091 return status, nil
92}
93
94// awaitPodNotPending will return a pod's status in a
95// podStatusResult when the pod is no longer in the pending
96// state.
97// The podResourceVersion is required to prevent a pod's entire
98// history from being retrieved when the watch is initiated.
99// If there is an error polling for the pod's status, or if
100// ctx.Done is closed, podStatusResult will contain an error.
101func (c *Client) AwaitPodNotPending(ctx context.Context, podName, podResourceVersion string) (*api.PodStatus, error) {
102 if podResourceVersion == "" {
103 return nil, fmt.Errorf("resourceVersion for pod %v must be provided", podName)
104 }
105 ctx, cancel := context.WithCancel(ctx)
106 defer cancel()
107
108 podStatusResult, err := c.WatchPodStatus(ctx, podName, podResourceVersion)
109 if err != nil {
110 return nil, err
111 }
112 var psr PodStatusResult
113 for {
114 select {
115 case psr = <-podStatusResult:
116 if psr.Err != nil {
117 return nil, psr.Err
118 }
119 if psr.Status.Phase != api.PodPending {
120 return psr.Status, nil
121 }
122 }
123 }
124}
125
126// PodStatusResult wraps a api.PodStatus and error
127type PodStatusResult struct {
128 Status *api.PodStatus
129 Err error
130}
131
132type watchPodStatus struct {
133 // The type of watch update contained in the message
134 Type string `json:"type"`
135 // Pod details
136 Object api.Pod `json:"object"`
137}
138
139// WatchPodStatus long-polls the Kubernetes watch API to be notified
140// of changes to the specified pod. Changes are sent on the returned
141// PodStatusResult channel as they are received.
142// The podResourceVersion is required to prevent a pod's entire
143// history from being retrieved when the watch is initiated.
144// The provided context must be canceled or timed out to stop the watch.
145// If any error occurs communicating with the Kubernetes API, the
146// error will be sent on the returned PodStatusResult channel and
147// it will be closed.
148func (c *Client) WatchPodStatus(ctx context.Context, podName, podResourceVersion string) (<-chan PodStatusResult, error) {
149 if podResourceVersion == "" {
150 return nil, fmt.Errorf("resourceVersion for pod %v must be provided", podName)
151 }
152 statusChan := make(chan PodStatusResult)
153
154 go func() {
155 defer close(statusChan)
156 // Make request to Kubernetes API
157 getURL := c.endpointURL + defaultWatchPod + "/" + podName
158 req, err := http.NewRequest("GET", getURL, nil)
159 req.URL.Query().Add("resourceVersion", podResourceVersion)
160 if err != nil {
161 statusChan <- PodStatusResult{Err: fmt.Errorf("failed to create request: GET %q : %v", getURL, err)}
162 return
163 }
164 res, err := ctxhttp.Do(ctx, c.httpClient, req)
165 if err != nil {
166 statusChan <- PodStatusResult{Err: fmt.Errorf("failed to make request: GET %q: %v", getURL, err)}
167 return
168 }
169
170 var wps watchPodStatus
171 reader := bufio.NewReader(res.Body)
172 for {
173 line, err := reader.ReadBytes('\n')
174 if err != nil {
175 statusChan <- PodStatusResult{Err: fmt.Errorf("error reading streaming response body: %v", err)}
176 return
177 }
178 if err := json.Unmarshal(line, &wps); err != nil {
179 statusChan <- PodStatusResult{Err: fmt.Errorf("failed to decode watch pod status: %v", err)}
180 return
181 }
182 statusChan <- PodStatusResult{Status: &wps.Object.Status}
183 }
184 }()
185 return statusChan, nil
186}
187
188// Retrieve the status of a pod synchronously from the Kube
189// API server.
190func (c *Client) PodStatus(ctx context.Context, podName string) (*api.PodStatus, error) {
191 getURL := c.endpointURL + defaultPod + "/" + podName
192
193 // Make request to Kubernetes API
194 req, err := http.NewRequest("GET", getURL, nil)
195 if err != nil {
196 return nil, fmt.Errorf("failed to create request: GET %q : %v", getURL, err)
197 }
198 res, err := ctxhttp.Do(ctx, c.httpClient, req)
199 if err != nil {
200 return nil, fmt.Errorf("failed to make request: GET %q: %v", getURL, err)
201 }
202
203 body, err := ioutil.ReadAll(res.Body)
204 res.Body.Close()
205 if err != nil {
206 return nil, fmt.Errorf("failed to read request body for GET %q: %v", getURL, err)
207 }
208 if res.StatusCode != http.StatusOK {
209 return nil, fmt.Errorf("http error %d GET %q: %q: %v", res.StatusCode, getURL, string(body), err)
210 }
211
212 var pod *api.Pod
213 if err := json.Unmarshal(body, &pod); err != nil {
214 return nil, fmt.Errorf("failed to decode pod resources: %v", err)
215 }
216 return &pod.Status, nil
217}
218
219// PodLog retrieves the container log for the first container
220// in the pod.
221func (c *Client) PodLog(ctx context.Context, podName string) (string, error) {
222 // TODO(evanbrown): support multiple containers
223 url := c.endpointURL + defaultPod + "/" + podName + "/log"
224 req, err := http.NewRequest("GET", url, nil)
225 if err != nil {
226 return "", fmt.Errorf("failed to create request: GET %q : %v", url, err)
227 }
228 res, err := ctxhttp.Do(ctx, c.httpClient, req)
229 if err != nil {
230 return "", fmt.Errorf("failed to make request: GET %q: %v", url, err)
231 }
232 body, err := ioutil.ReadAll(res.Body)
233 if err != nil {
Matthew Dempskyd5f422f2015-10-23 20:37:37 -0700234 return "", fmt.Errorf("failed to read response body: GET %q: %v", url, err)
Evan Brown956434c2015-10-02 15:38:22 -0700235 }
236 res.Body.Close()
237 return string(body), nil
Johan Euphrosine093044a2015-05-12 14:21:46 -0700238}