Merge pull request #24 from brendandburns/watch

Implement Watch
This commit is contained in:
Kubernetes Prow Robot
2019-03-26 21:08:47 -07:00
committed by GitHub
3 changed files with 306 additions and 0 deletions

52
examples/watch/watch.go Normal file
View File

@@ -0,0 +1,52 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Note: the example only works with the code within the same release/branch.
package main
import (
"context"
"fmt"
"github.com/kubernetes-client/go/kubernetes/client"
"github.com/kubernetes-client/go/kubernetes/config"
)
func main() {
c, err := config.LoadKubeConfig()
if err != nil {
panic(err.Error())
}
cl := client.NewAPIClient(c)
watch := client.WatchClient{
Cfg: c,
Client: cl,
Path: "/api/v1/namespaces",
MakerFn: func() interface{} { return &client.V1Namespace{} },
}
if resultChan, errChan, err := watch.Connect(context.Background()); err != nil {
panic(err)
} else {
for obj := range resultChan {
fmt.Printf("%s\n%#v\n", obj.Type, obj.Object)
}
for err := range errChan {
fmt.Printf("ERROR: %#v", err)
}
fmt.Printf("Closed!\n")
}
}

100
kubernetes/client/watch.go Normal file
View File

@@ -0,0 +1,100 @@
package client
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"strings"
)
// Result is a watch result
type Result struct {
Type string
Object interface{}
}
// WatchClient is a client for Watching the Kubernetes API
type WatchClient struct {
Cfg *Configuration
Client *APIClient
Path string
MakerFn func() interface{}
}
// Connect initiates a watch to the server. TODO: support watch from resource version
func (w *WatchClient) Connect(ctx context.Context) (<-chan *Result, <-chan error, error) {
url := w.Cfg.Scheme + "://" + w.Cfg.Host + w.Path + "?watch=true"
req, err := w.Client.prepareRequest(ctx, url, "GET", nil, nil, nil, nil, "", []byte{})
if err != nil {
return nil, nil, err
}
res, err := w.Client.callAPI(req)
if err != nil {
return nil, nil, err
}
if res.StatusCode != 200 {
return nil, nil, fmt.Errorf("Error connecting watch (%d: %s)", res.StatusCode, res.Status)
}
resultChan := make(chan *Result, 1)
errChan := make(chan error, 1)
processWatch(res.Body, w.MakerFn, resultChan, errChan)
return resultChan, errChan, nil
}
func processWatch(stream io.Reader, makerFn func() interface{}, resultChan chan<- *Result, errChan chan<- error) {
scanner := bufio.NewScanner(stream)
go func() {
defer close(resultChan)
defer close(errChan)
for scanner.Scan() {
watchObj, err := decode(scanner.Text(), makerFn)
if err != nil {
errChan <- err
return
}
if watchObj != nil {
resultChan <- watchObj
}
}
if err := scanner.Err(); err != nil {
errChan <- err
}
}()
}
func decode(line string, makerFn func() interface{}) (*Result, error) {
if len(line) == 0 {
return nil, nil
}
// TODO: support protocol buffer encoding?
decoder := json.NewDecoder(strings.NewReader(line))
result := &Result{}
for decoder.More() {
name, err := decoder.Token()
if err != nil {
return nil, err
}
if name == "type" {
token, err := decoder.Token()
if err != nil {
return nil, err
}
var ok bool
result.Type, ok = token.(string)
if !ok {
return nil, fmt.Errorf("Error casting %v to string", token)
}
}
if name == "object" {
obj := makerFn()
if err := decoder.Decode(&obj); err != nil {
return nil, err
}
result.Object = obj
return result, nil
}
}
return nil, nil
}

View File

@@ -0,0 +1,154 @@
package client
import (
"context"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
)
func makerFn() interface{} { return &V1Namespace{} }
type staticHandler struct {
Code int
Body string
}
func (s *staticHandler) ServeHTTP(res http.ResponseWriter, req *http.Request) {
res.WriteHeader(s.Code)
res.Write([]byte(s.Body))
}
func TestFullError(t *testing.T) {
server := httptest.NewServer(&staticHandler{
Code: 404,
Body: `{"type":"ADDED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"kube-system","selfLink":"/api/v1/namespaces/kube-system","uid":"164931a7-3d75-11e9-a0a0-2683b9459238","resourceVersion":"227","creationTimestamp":"2019-03-03T05:27:50Z","annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Namespace\",\"metadata\":{\"annotations\":{},\"name\":\"kube-system\",\"namespace\":\"\"}}\n"}},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}}\n`,
})
defer server.Close()
u, err := url.Parse(server.URL)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
cfg := &Configuration{}
cfg.Host = u.Host
cfg.Scheme = u.Scheme
watch := WatchClient{
Cfg: cfg,
Client: NewAPIClient(cfg),
MakerFn: makerFn,
}
if _, _, err := watch.Connect(context.Background()); err == nil {
t.Error("unexpected nil error")
}
}
func TestFull(t *testing.T) {
server := httptest.NewServer(&staticHandler{
Code: 200,
Body: `{"type":"ADDED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"kube-system","selfLink":"/api/v1/namespaces/kube-system","uid":"164931a7-3d75-11e9-a0a0-2683b9459238","resourceVersion":"227","creationTimestamp":"2019-03-03T05:27:50Z","annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Namespace\",\"metadata\":{\"annotations\":{},\"name\":\"kube-system\",\"namespace\":\"\"}}\n"}},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}}\n`,
})
defer server.Close()
u, err := url.Parse(server.URL)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
cfg := &Configuration{}
cfg.Host = u.Host
cfg.Scheme = u.Scheme
watch := WatchClient{
Cfg: cfg,
Client: NewAPIClient(cfg),
MakerFn: makerFn,
}
resultChan, errChan, err := watch.Connect(context.Background())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
out := []*Result{}
outErrs := []error{}
for r := range resultChan {
out = append(out, r)
}
for e := range errChan {
outErrs = append(outErrs, e)
}
if len(out) != 1 {
t.Errorf("unexpected results: %v", out)
t.FailNow()
}
for ix, val := range []string{"ADDED"} {
if out[ix].Type != val {
t.Errorf("unexpected value (%d): %v", ix, out[ix])
}
}
if len(outErrs) != 0 {
t.Errorf("unexpected errors: %v", outErrs)
}
}
func TestDecode(t *testing.T) {
line := `{"type":"ADDED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"kube-system","selfLink":"/api/v1/namespaces/kube-system","uid":"164931a7-3d75-11e9-a0a0-2683b9459238","resourceVersion":"227","creationTimestamp":"2019-03-03T05:27:50Z","annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Namespace\",\"metadata\":{\"annotations\":{},\"name\":\"kube-system\",\"namespace\":\"\"}}\n"}},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}}`
result, err := decode(line, makerFn)
if err != nil {
t.Errorf("Unexpected non-nil: %v", err)
}
if result.Type != "ADDED" {
t.Errorf("Unexpected event type: %s expected 'ADDED'", result.Type)
}
ns, ok := result.Object.(*V1Namespace)
if !ok {
t.Errorf("Cast failed: %v", result.Object)
}
if ns.Kind != "Namespace" || ns.Metadata.Name != "kube-system" {
t.Errorf("Unexpected value %#v", *ns)
}
}
func TestMultiDecode(t *testing.T) {
lines := `
{"type":"ADDED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"kube-system","selfLink":"/api/v1/namespaces/kube-system","uid":"164931a7-3d75-11e9-a0a0-2683b9459238","resourceVersion":"227","creationTimestamp":"2019-03-03T05:27:50Z","annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Namespace\",\"metadata\":{\"annotations\":{},\"name\":\"kube-system\",\"namespace\":\"\"}}\n"}},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}}
{"type":"MODIFIED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"kube-system-2","selfLink":"/api/v1/namespaces/kube-system","uid":"164931a7-3d75-11e9-a0a0-2683b9459238","resourceVersion":"227","creationTimestamp":"2019-03-03T05:27:50Z","annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Namespace\",\"metadata\":{\"annotations\":{},\"name\":\"kube-system\",\"namespace\":\"\"}}\n"}},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}}
{"type":"DELETED","object":{"kind":"Namespace","apiVersion":"v1","metadata":{"name":"kube-system-3","selfLink":"/api/v1/namespaces/kube-system","uid":"164931a7-3d75-11e9-a0a0-2683b9459238","resourceVersion":"227","creationTimestamp":"2019-03-03T05:27:50Z","annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"v1\",\"kind\":\"Namespace\",\"metadata\":{\"annotations\":{},\"name\":\"kube-system\",\"namespace\":\"\"}}\n"}},"spec":{"finalizers":["kubernetes"]},"status":{"phase":"Active"}}}
`
results := make(chan *Result)
errs := make(chan error)
processWatch(strings.NewReader(lines), makerFn, results, errs)
out := []*Result{}
outErrs := []error{}
for r := range results {
out = append(out, r)
}
for e := range errs {
outErrs = append(outErrs, e)
}
if len(out) != 3 {
t.Errorf("unexpected results: %v", out)
}
for ix, val := range []string{"ADDED", "MODIFIED", "DELETED"} {
if out[ix].Type != val {
t.Errorf("unexpected value (%d): %v", ix, out[ix])
}
}
if len(outErrs) != 0 {
t.Errorf("unexpected errors: %v", outErrs)
}
}