Compare commits

...

10 Commits

Author SHA1 Message Date
Kubernetes Prow Robot
92040c8d57 Merge pull request #39 from Akasurde/patch-1
Update Compatibility matrix
2021-12-17 08:26:29 -08:00
Abhijeet Kasurde
36593660d0 Update Compatibility matrix 2021-12-17 12:06:29 +05:30
Kubernetes Prow Robot
9dac5e4c54 Merge pull request #36 from jwierzbo/patch-1
Fix 401 error for in-cluster auth case
2020-02-22 09:16:47 -08:00
jwierzbo
c2520e06df Fix 401 error for in-cluster auth case 2020-02-22 14:47:19 +01:00
Kubernetes Prow Robot
c757968c4c Merge pull request #31 from brendandburns/master
Fix Azure config (again)
2019-09-27 21:03:39 -07:00
Brendan Burns
9efe3f7be6 Fix Azure config (again) 2019-09-27 20:57:08 -07:00
Kubernetes Prow Robot
cd8e39e789 Merge pull request #28 from brendandburns/ws
Add a simple informer implementation.
2019-06-25 11:13:39 -07:00
Brendan Burns
ca7256a68a Add a simple informer implementation. 2019-06-25 08:39:56 -07:00
Kubernetes Prow Robot
075b33afc7 Merge pull request #26 from brendandburns/azure
Update Azure config.
2019-05-16 09:38:13 -07:00
Brendan Burns
109e14c880 Update Azure config. 2019-05-15 21:42:56 -07:00
6 changed files with 281 additions and 3 deletions

View File

@@ -36,7 +36,6 @@ supported versions of Kubernetes clusters.
| | Kubernetes 1.10 | Kubernetes 1.13 |
|------------------|-----------------|-----------------|
| client 0.1.0a1 | ✓ | |
|------------------|-----------------|-----------------|
| client 0.2.0a1 | | ✓ |
Key:

View File

@@ -0,0 +1,88 @@
package main
import (
"context"
"fmt"
"time"
"github.com/kubernetes-client/go/kubernetes/client"
"github.com/kubernetes-client/go/kubernetes/config"
)
type handler struct{}
func (h handler) OnAdd(obj interface{}) {
ns := obj.(*client.V1Namespace)
fmt.Printf("Added %s\n", ns.Metadata.Name)
}
func (h handler) OnUpdate(oldObj, newObj interface{}) {
ns := newObj.(*client.V1Namespace)
fmt.Printf("Updated %s\n", ns.Metadata.Name)
}
func (h handler) OnDelete(obj interface{}) {
ns := obj.(*client.V1Namespace)
fmt.Printf("Deleted %s\n", ns.Metadata.Name)
}
func main() {
c, err := config.LoadKubeConfig()
if err != nil {
panic(err.Error())
}
// create the clientset
clientset := client.NewAPIClient(c)
lister := func() ([]interface{}, string, error) {
namespaces, _, err := clientset.CoreV1Api.ListNamespace(context.Background(), nil)
if err != nil {
return nil, "", err
}
result := make([]interface{}, len(namespaces.Items))
for ix := range namespaces.Items {
result[ix] = &namespaces.Items[ix]
}
return result, namespaces.Metadata.ResourceVersion, nil
}
watcher := func(resourceVersion string) (<-chan *client.Result, <-chan error) {
watch := client.WatchClient{
Cfg: c,
Client: clientset,
Path: "/api/v1/namespaces",
MakerFn: func() interface{} {
return &client.V1Namespace{}
},
}
results, errors, err := watch.Connect(context.Background(), resourceVersion)
if err != nil {
fmt.Printf("err: %s\n", err.Error())
}
return results, errors
}
extractor := func(obj interface{}) *client.V1ObjectMeta {
return obj.(*client.V1Namespace).Metadata
}
cache := client.Cache{
Extractor: extractor,
Lister: lister,
Watcher: watcher,
}
cache.AddEventHandler(handler{})
go cache.Run(make(chan bool))
for {
fmt.Printf("----------\n")
list := cache.List()
for ix := range list {
ns := list[ix].(*client.V1Namespace)
fmt.Printf("%s %#v\n", ns.Metadata.Name, ns.Metadata.Labels)
}
fmt.Printf("----------\n")
time.Sleep(5 * time.Second)
}
}

187
kubernetes/client/cache.go Normal file
View File

@@ -0,0 +1,187 @@
package client
import (
"log"
"time"
)
type objEntry struct {
metadata *V1ObjectMeta
obj interface{}
}
// ObjectLister is a function that knows how to list objects.
type ObjectLister func() ([]interface{}, string, error)
// ObjectWatcher is a function that knows how to perform a watch.
type ObjectWatcher func(resourceVersion string) (results <-chan *Result, errors <-chan error)
// EventHandler is implemented by objects that want event notifications
type EventHandler interface {
OnAdd(obj interface{})
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}
// Informer is an interface for things that can provide notifications
type Informer interface {
AddEventHandler(handler EventHandler)
}
// Lister is an interface for things that can list objects for all namespaces or by namespace
type Lister interface {
List() []interface{}
ByNamespace(namespace string) []interface{}
}
// Validate that we implement the interfaces
var _ Lister = &Cache{}
var _ Informer = &Cache{}
// Cache is an implementation of a List/Watch cache
type Cache struct {
Extractor func(interface{}) *V1ObjectMeta
Lister ObjectLister
Watcher ObjectWatcher
allObjects []objEntry
namespaceObjects map[string][]objEntry
eventHandlers []EventHandler
}
func (c *Cache) AddEventHandler(handler EventHandler) {
c.eventHandlers = append(c.eventHandlers, handler)
}
const maxSleep = 60 * time.Second
func (c *Cache) Run(stop <-chan bool) {
sleep := 1 * time.Second
for {
select {
case <-stop:
return
default:
// pass
}
if err := c.ListWatch(); err != nil {
log.Printf("%s\n", err.Error())
time.Sleep(sleep)
sleep = sleep * 2
if sleep > maxSleep {
sleep = maxSleep
}
} else {
sleep = 1
}
}
}
func (c *Cache) ListWatch() error {
objects, resourceVersion, err := c.Lister()
if err != nil {
return err
}
for ix := range objects {
meta := c.Extractor(objects[ix])
c.AddOrUpdate(meta, objects[ix])
}
results, errors := c.Watcher(resourceVersion)
for {
select {
case result, ok := <-results:
if !ok {
return nil
}
c.ProcessResult(result)
case err := <-errors:
return err
}
}
}
func (c *Cache) ProcessResult(res *Result) {
metadata := c.Extractor(res.Object)
switch res.Type {
case Added, Modified:
c.AddOrUpdate(metadata, res.Object)
case Deleted:
c.Delete(metadata, res.Object)
}
}
func (c *Cache) AddOrUpdate(metadata *V1ObjectMeta, obj interface{}) {
var oldObj interface{}
c.allObjects, oldObj = InsertOrUpdate(c.allObjects, metadata, obj)
if len(metadata.Namespace) > 0 {
c.namespaceObjects[metadata.Namespace], _ =
InsertOrUpdate(c.namespaceObjects[metadata.Namespace], metadata, obj)
}
for ix := range c.eventHandlers {
if oldObj == nil {
c.eventHandlers[ix].OnAdd(obj)
} else {
c.eventHandlers[ix].OnUpdate(oldObj, obj)
}
}
}
func (c *Cache) Delete(metadata *V1ObjectMeta, obj interface{}) {
var deleted bool
c.allObjects, deleted = Delete(c.allObjects, metadata)
if len(metadata.Namespace) > 0 {
c.namespaceObjects[metadata.Namespace], _ =
Delete(c.namespaceObjects[metadata.Namespace], metadata)
}
if deleted {
for ix := range c.eventHandlers {
c.eventHandlers[ix].OnDelete(obj)
}
}
}
func (c *Cache) List() []interface{} {
result := make([]interface{}, len(c.allObjects))
for ix := range c.allObjects {
result[ix] = c.allObjects[ix].obj
}
return result
}
func (c *Cache) ByNamespace(namespace string) []interface{} {
list := c.namespaceObjects[namespace]
result := make([]interface{}, len(list))
for ix := range list {
result[ix] = list[ix].obj
}
return result
}
func InsertOrUpdate(list []objEntry, metadata *V1ObjectMeta, obj interface{}) ([]objEntry, interface{}) {
ix := FindObject(list, metadata)
if ix == -1 {
return append(list, objEntry{metadata: metadata, obj: obj}), nil
}
oldObj := list[ix]
list[ix] = objEntry{metadata: metadata, obj: obj}
return list, oldObj
}
func Delete(list []objEntry, metadata *V1ObjectMeta) ([]objEntry, bool) {
ix := FindObject(list, metadata)
if ix == -1 {
return list, false
}
return append(list[:ix], list[ix+1:]...), true
}
func FindObject(list []objEntry, metadata *V1ObjectMeta) int {
for ix := range list {
entry := &list[ix]
if entry.metadata.Namespace == metadata.Namespace &&
entry.metadata.Name == metadata.Name {
return ix
}
}
return -1
}

View File

@@ -15,6 +15,10 @@ type Result struct {
Object interface{}
}
const Added = "ADDED"
const Modified = "MODIFIED"
const Deleted = "DELETED"
// WatchClient is a client for Watching the Kubernetes API
type WatchClient struct {
Cfg *Configuration

View File

@@ -61,7 +61,7 @@ func (l *KubeConfigLoader) refreshAzureToken() error {
AccessToken: l.user.AuthProvider.Config["access-token"],
RefreshToken: l.user.AuthProvider.Config["refresh-token"],
ExpiresIn: l.user.AuthProvider.Config["expires-in"],
ExpiresOn: l.user.AuthProvider.Config["expires-in"],
ExpiresOn: l.user.AuthProvider.Config["expires-on"],
}
sptToken, err := adal.NewServicePrincipalTokenFromManualToken(*config, clientID, resource, token)
if err := sptToken.Refresh(); err != nil {

View File

@@ -67,7 +67,7 @@ func InClusterConfig() (*client.Configuration, error) {
BasePath: "https://" + net.JoinHostPort(host, port),
Host: net.JoinHostPort(host, port),
Scheme: "https",
DefaultHeader: map[string]string{"Authentication": "Bearer " + string(token)},
DefaultHeader: map[string]string{"Authorization": "Bearer " + string(token)},
UserAgent: defaultUserAgent,
HTTPClient: c,
}, nil