Ported GenericKubernetesApi from java along with other utilities (#682)

* Ported GenericKubernetesApi from java along with other utilities

* Replace DeleteOptions for V1DeleteOptions

* Clean up and add clear()

* Clean up

* Removed TweakApiHandler

* Rename methods to follow "async" pattern

* Fix method naming

* Remove unneeded json property

* Rearrange httpsuccess logic

* Simplify dispose pattern

* Treat MockKubeServerFlags as flags

* Clean up flags logic

* Remove unneeded json properties

* Fix cs formatting

* Remove unused variable

* Move MockApi server options to seperate class and revert MockApi back to original

* Remove IRunnable

* Refactor config constants to use existing service account path
This commit is contained in:
David Dieruf
2021-11-03 13:52:34 -04:00
committed by GitHub
parent c23baaf3e8
commit 4ff1ea49b8
23 changed files with 1359 additions and 108 deletions

View File

@@ -0,0 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net5.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\KubernetesClient\KubernetesClient.csproj" />
</ItemGroup>
</Project>

View File

@@ -0,0 +1,61 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using k8s;
using k8s.Models;
using k8s.Util.Common;
using k8s.Util.Common.Generic;
namespace GenericKubernetesApiExample
{
public class Program
{
private static GenericKubernetesApi _genericKubernetesApi;
public static void Main(string[] args)
{
var config = KubernetesClientConfiguration.BuildDefaultConfig();
IKubernetes client = new Kubernetes(config);
var cts = new CancellationTokenSource();
_genericKubernetesApi = new GenericKubernetesApi(
apiGroup: "pod",
apiVersion: "v1",
resourcePlural: "pods",
apiClient: client);
var aPod = GetNamespacedPod(Namespaces.NamespaceDefault, "my-pod-name", cts.Token);
var aListOfPods = ListPodsInNamespace(Namespaces.NamespaceDefault, cts.Token);
// Watch for pod actions in a namespsace
using var watch = _genericKubernetesApi.Watch<V1Pod>(
Namespaces.NamespaceDefault,
(eventType, pod) => { Console.WriteLine("The event {0} happened on pod named {1}", eventType, pod.Metadata.Name); },
exception => { Console.WriteLine("Oh no! An exception happened while watching pods. The message was '{0}'.", exception.Message); },
() => { Console.WriteLine("The server closed the connection."); });
Console.WriteLine("press ctrl + c to stop watching");
var ctrlc = new ManualResetEventSlim(false);
Console.CancelKeyPress += (sender, eventArgs) => ctrlc.Set();
ctrlc.Wait();
cts.Cancel();
}
private static V1Pod GetNamespacedPod(string @namespace, string podName, CancellationToken cancellationToken)
{
var resp = Task.Run(
async () => await _genericKubernetesApi.GetAsync<V1Pod>(@namespace, podName, cancellationToken).ConfigureAwait(false), cancellationToken);
return resp.Result;
}
private static V1PodList ListPodsInNamespace(string @namespace, CancellationToken cancellationToken)
{
var resp = Task.Run(
async () => await _genericKubernetesApi.ListAsync<V1PodList>(@namespace, cancellationToken).ConfigureAwait(false), cancellationToken);
return resp.Result;
}
}
}

View File

@@ -43,6 +43,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "customResource", "examples\
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KubernetesGenerator", "gen\KubernetesGenerator\KubernetesGenerator.csproj", "{79BA7C4A-98AA-467E-80D4-0E4F03EE6DDE}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "GenericKubernetesApi", "examples\GenericKubernetesApi\GenericKubernetesApi.csproj", "{F81AE4C4-E044-4225-BD76-385A0DE621FD}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -245,6 +247,18 @@ Global
{79BA7C4A-98AA-467E-80D4-0E4F03EE6DDE}.Release|x64.Build.0 = Release|Any CPU
{79BA7C4A-98AA-467E-80D4-0E4F03EE6DDE}.Release|x86.ActiveCfg = Release|Any CPU
{79BA7C4A-98AA-467E-80D4-0E4F03EE6DDE}.Release|x86.Build.0 = Release|Any CPU
{F81AE4C4-E044-4225-BD76-385A0DE621FD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F81AE4C4-E044-4225-BD76-385A0DE621FD}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F81AE4C4-E044-4225-BD76-385A0DE621FD}.Debug|x64.ActiveCfg = Debug|Any CPU
{F81AE4C4-E044-4225-BD76-385A0DE621FD}.Debug|x64.Build.0 = Debug|Any CPU
{F81AE4C4-E044-4225-BD76-385A0DE621FD}.Debug|x86.ActiveCfg = Debug|Any CPU
{F81AE4C4-E044-4225-BD76-385A0DE621FD}.Debug|x86.Build.0 = Debug|Any CPU
{F81AE4C4-E044-4225-BD76-385A0DE621FD}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F81AE4C4-E044-4225-BD76-385A0DE621FD}.Release|Any CPU.Build.0 = Release|Any CPU
{F81AE4C4-E044-4225-BD76-385A0DE621FD}.Release|x64.ActiveCfg = Release|Any CPU
{F81AE4C4-E044-4225-BD76-385A0DE621FD}.Release|x64.Build.0 = Release|Any CPU
{F81AE4C4-E044-4225-BD76-385A0DE621FD}.Release|x86.ActiveCfg = Release|Any CPU
{F81AE4C4-E044-4225-BD76-385A0DE621FD}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -266,6 +280,7 @@ Global
{4D2AE427-F856-49E5-B61D-EA6B17D89051} = {8AF4A5C2-F0CE-47D5-A4C5-FE4AB83CA509}
{95672061-5799-4454-ACDB-D6D330DB1EC4} = {B70AFB57-57C9-46DC-84BE-11B7DDD34B40}
{79BA7C4A-98AA-467E-80D4-0E4F03EE6DDE} = {879F8787-C3BB-43F3-A92D-6D4C7D3A5285}
{F81AE4C4-E044-4225-BD76-385A0DE621FD} = {B70AFB57-57C9-46DC-84BE-11B7DDD34B40}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {049A763A-C891-4E8D-80CF-89DD3E22ADC7}

View File

@@ -0,0 +1,16 @@
using System;
namespace k8s.Util.Common
{
public class BadNotificationException : Exception
{
public BadNotificationException()
{
}
public BadNotificationException(string message)
: base(message)
{
}
}
}

View File

@@ -0,0 +1,15 @@
namespace k8s.Util.Common
{
public static class Config
{
public static string ServiceAccountCaPath => KubernetesClientConfiguration.ServiceAccountPath + "/ca.crt";
public static string ServiceAccountTokenPath => KubernetesClientConfiguration.ServiceAccountPath + "/token";
public static string ServiceAccountNamespacePath => KubernetesClientConfiguration.ServiceAccountPath + "/namespace";
public static string EnvKubeconfig => "KUBECONFIG";
public static string EnvServiceHost => "KUBERNETES_SERVICE_HOST";
public static string EnvServicePort => "KUBERNETES_SERVICE_PORT";
// The last resort host to try
public static string DefaultFallbackHost => "http://localhost:8080";
}
}

View File

@@ -0,0 +1,657 @@
using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using k8s.Models;
using k8s.Util.Common.Generic.Options;
using Microsoft.Rest;
using Microsoft.Rest.Serialization;
namespace k8s.Util.Common.Generic
{
/// <summary>
///
/// The Generic kubernetes api provides a unified client interface for not only the non-core-group
/// built-in resources from kubernetes but also the custom-resources models meet the following
/// requirements:
///
/// 1. there's a `V1ObjectMeta` field in the model along with its getter/setter. 2. there's a
/// `V1ListMeta` field in the list model along with its getter/setter. - supports Json
/// serialization/deserialization. 3. the generic kubernetes api covers all the basic operations over
/// the custom resources including {get, list, watch, create, update, patch, delete}.
///
/// - For kubernetes-defined failures, the server will return a {@link V1Status} with 4xx/5xx
/// code. The status object will be nested in {@link KubernetesApiResponse#getStatus()} - For the
/// other unknown reason (including network, JVM..), throws an unchecked exception.
/// </summary>
public class GenericKubernetesApi
{
private readonly string _apiGroup;
private readonly string _apiVersion;
private readonly string _resourcePlural;
private readonly IKubernetes _client;
/// <summary>
/// Initializes a new instance of the <see cref="GenericKubernetesApi"/> class.
/// </summary>
/// <param name="apiGroup"> the api group"></param>
/// <param name="apiVersion"> the api version"></param>
/// <param name="resourcePlural"> the resource plural, e.g. "jobs""></param>
/// <param name="apiClient"> optional client"></param>
public GenericKubernetesApi(string apiGroup = default, string apiVersion = default, string resourcePlural = default, IKubernetes apiClient = default)
{
_apiGroup = apiGroup ?? throw new ArgumentNullException(nameof(apiGroup));
_apiVersion = apiVersion ?? throw new ArgumentNullException(nameof(apiVersion));
_resourcePlural = resourcePlural ?? throw new ArgumentNullException(nameof(resourcePlural));
_client = apiClient ?? new Kubernetes(KubernetesClientConfiguration.BuildDefaultConfig());
}
public TimeSpan ClientTimeout => _client.HttpClient.Timeout;
public void SetClientTimeout(TimeSpan value)
{
_client.HttpClient.Timeout = value;
}
/// <summary>
/// Get kubernetes object.
/// </summary>
/// <typeparam name="T">the object type</typeparam>
/// <param name="name">the object name </param>
/// <param name="cancellationToken">the token </param>
/// <returns>The object</returns>
public Task<T> GetAsync<T>(string name, CancellationToken cancellationToken = default)
where T : class, IKubernetesObject<V1ObjectMeta>
{
return GetAsync<T>(name, new GetOptions(), cancellationToken);
}
/// <summary>
/// Get kubernetes object under the namespaceProperty.
/// </summary>
/// <typeparam name="T">the object type</typeparam>
/// <param name="namespaceProperty"> the namespaceProperty</param>
/// <param name="name"> the name</param>
/// <param name="cancellationToken">the token </param>
/// <returns>the kubernetes object</returns>
public Task<T> GetAsync<T>(string namespaceProperty, string name, CancellationToken cancellationToken = default)
where T : class, IKubernetesObject<V1ObjectMeta>
{
return GetAsync<T>(namespaceProperty, name, new GetOptions(), cancellationToken);
}
/// <summary>
/// List kubernetes object cluster-scoped.
/// </summary>
/// <typeparam name="T">the object type</typeparam>
/// <param name="cancellationToken">the token </param>
/// <returns>the kubernetes object</returns>
public Task<T> ListAsync<T>(CancellationToken cancellationToken = default)
where T : class, IKubernetesObject<V1ListMeta>
{
return ListAsync<T>(new ListOptions(), cancellationToken);
}
/// <summary>
/// List kubernetes object under the namespaceProperty.
/// </summary>
/// <typeparam name="T">the object type</typeparam>
/// <param name="namespaceProperty"> the namespace</param>
/// <param name="cancellationToken">the token </param>
/// <returns>the kubernetes object</returns>
public Task<T> ListAsync<T>(string namespaceProperty, CancellationToken cancellationToken = default)
where T : class, IKubernetesObject<V1ListMeta>
{
return ListAsync<T>(namespaceProperty, new ListOptions(), cancellationToken);
}
/// <summary>
/// Create kubernetes object, if the namespaceProperty in the object is present, it will send a
/// namespaceProperty-scoped requests, vice versa.
/// </summary>
/// <typeparam name="T">the object type</typeparam>
/// <param name="obj"> the object</param>
/// <param name="cancellationToken">the token </param>
/// <returns>the kubernetes object</returns>
public Task<T> CreateAsync<T>(T obj, CancellationToken cancellationToken = default)
where T : class, IKubernetesObject<V1ObjectMeta>
{
return CreateAsync(obj, new CreateOptions(), cancellationToken);
}
/// <summary>
/// Create kubernetes object, if the namespaceProperty in the object is present, it will send a
/// namespaceProperty-scoped requests, vice versa.
/// </summary>
/// <param name="obj"> the object</param>
/// <param name="cancellationToken">the token </param>
/// <typeparam name="T">the object type</typeparam>
/// <returns>the kubernetes object</returns>
public Task<T> UpdateAsync<T>(T obj, CancellationToken cancellationToken = default)
where T : class, IKubernetesObject<V1ObjectMeta>
{
return UpdateAsync(obj, new UpdateOptions(), cancellationToken);
}
/// <summary>
/// Patch kubernetes object.
/// </summary>
/// <param name="name"> the name</param>
/// <param name="patch"> the string patch content</param>
/// <param name="cancellationToken">the token </param>
/// <typeparam name="T">the object type</typeparam>
/// <returns>the kubernetes object</returns>
public Task<T> PatchAsync<T>(string name, object patch, CancellationToken cancellationToken = default)
where T : class, IKubernetesObject<V1ObjectMeta>
{
return PatchAsync<T>(name, patch, new PatchOptions(), cancellationToken);
}
/// <summary>
/// Patch kubernetes object under the namespaceProperty.
/// </summary>
/// <param name="namespaceProperty"> the namespaceProperty</param>
/// <param name="name"> the name</param>
/// <param name="patch"> the string patch content</param>
/// <param name="cancellationToken">the token </param>
/// <typeparam name="T">the object type</typeparam>
/// <returns>the kubernetes object</returns>
public Task<T> PatchAsync<T>(string namespaceProperty, string name, object patch, CancellationToken cancellationToken = default)
where T : class, IKubernetesObject<V1ObjectMeta>
{
return PatchAsync<T>(namespaceProperty, name, patch, new PatchOptions(), cancellationToken);
}
/// <summary>
/// Delete kubernetes object.
/// </summary>
/// <param name="name"> the name</param>
/// <param name="cancellationToken">the token </param>
/// <typeparam name="T">the object type</typeparam>
/// <returns>the kubernetes object</returns>
public Task<T> DeleteAsync<T>(string name, CancellationToken cancellationToken = default)
where T : class, IKubernetesObject<V1ObjectMeta>
{
return DeleteAsync<T>(name, new V1DeleteOptions(), cancellationToken);
}
/// <summary>
/// Delete kubernetes object under the namespaceProperty.
/// </summary>
/// <param name="namespaceProperty"> the namespaceProperty</param>
/// <param name="name"> the name</param>
/// <param name="cancellationToken">the token </param>
/// <typeparam name="T">the object type</typeparam>
/// <returns>the kubernetes object</returns>
public Task<T> DeleteAsync<T>(string namespaceProperty, string name, CancellationToken cancellationToken = default)
where T : class, IKubernetesObject<V1ObjectMeta>
{
return DeleteAsync<T>(namespaceProperty, name, new V1DeleteOptions(), cancellationToken);
}
/// <summary>
/// Creates a cluster-scoped Watch on the resource.
/// </summary>
/// <param name="onEvent">action on event</param>
/// <param name="onError">action on error</param>
/// <param name="onClosed">action on closed</param>
/// <param name="cancellationToken">the token </param>
/// <typeparam name="T">the object type</typeparam>
/// <returns>the watchable</returns>
public Watcher<T> Watch<T>(Action<WatchEventType, T> onEvent, Action<Exception> onError = default, Action onClosed = default, CancellationToken cancellationToken = default)
where T : class, IKubernetesObject<V1ObjectMeta>
{
return Watch(new ListOptions(), onEvent, onError, onClosed, cancellationToken);
}
/// <summary>
/// Creates a namespaceProperty-scoped Watch on the resource.
/// </summary>
/// <typeparam name="T">the object type</typeparam>
/// <param name="namespaceProperty"> the namespaceProperty</param>
/// <param name="onEvent">action on event</param>
/// <param name="onError">action on error</param>
/// <param name="onClosed">action on closed</param>
/// <param name="cancellationToken">the token </param>
/// <returns>the watchable</returns>
public Watcher<T> Watch<T>(string namespaceProperty, Action<WatchEventType, T> onEvent, Action<Exception> onError = default, Action onClosed = default,
CancellationToken cancellationToken = default)
where T : class, IKubernetesObject<V1ObjectMeta>
{
return Watch(namespaceProperty, new ListOptions(), onEvent, onError, onClosed, cancellationToken);
}
// TODO(yue9944882): watch one resource?
/// <summary>
/// Get kubernetes object.
/// </summary>
/// <typeparam name="T">the object type</typeparam>
/// <param name="name"> the name</param>
/// <param name="getOptions">the get options</param>
/// <param name="cancellationToken">the token </param>
/// <returns>the kubernetes object</returns>
public async Task<T> GetAsync<T>(string name, GetOptions getOptions, CancellationToken cancellationToken = default)
where T : class, IKubernetesObject<V1ObjectMeta>
{
if (string.IsNullOrEmpty(name))
{
throw new ArgumentNullException(nameof(name));
}
var resp = await _client.GetClusterCustomObjectWithHttpMessagesAsync(group: _apiGroup, plural: _resourcePlural, version: _apiVersion, name: name, cancellationToken: cancellationToken)
.ConfigureAwait(false);
return SafeJsonConvert.DeserializeObject<T>(resp.Body.ToString());
}
/// <summary>
/// Get kubernetes object.
/// </summary>
/// <typeparam name="T">the object type</typeparam>
/// <param name="namespaceProperty"> the namespaceProperty</param>
/// <param name="name"> the name</param>
/// <param name="getOptions">the get options</param>
/// <param name="cancellationToken">the token </param>
/// <returns>the kubernetes object</returns>
public async Task<T> GetAsync<T>(string namespaceProperty, string name, GetOptions getOptions, CancellationToken cancellationToken = default)
where T : class, IKubernetesObject<V1ObjectMeta>
{
if (string.IsNullOrEmpty(name))
{
throw new ArgumentNullException(nameof(name));
}
if (string.IsNullOrEmpty(namespaceProperty))
{
throw new ArgumentNullException(nameof(namespaceProperty));
}
var resp = await _client.GetNamespacedCustomObjectWithHttpMessagesAsync(group: _apiGroup, plural: _resourcePlural, version: _apiVersion, name: name, namespaceParameter: namespaceProperty,
cancellationToken: cancellationToken).ConfigureAwait(false);
return SafeJsonConvert.DeserializeObject<T>(resp.Body.ToString());
}
/// <summary>
/// List kubernetes object.
/// </summary>
/// <typeparam name="T">the object type</typeparam>
/// <param name="listOptions">the list options</param>
/// <param name="cancellationToken">the token </param>
/// <returns>the kubernetes object</returns>
public async Task<T> ListAsync<T>(ListOptions listOptions, CancellationToken cancellationToken = default)
where T : class, IKubernetesObject<V1ListMeta>
{
if (listOptions == null)
{
throw new ArgumentNullException(nameof(listOptions));
}
var resp = await _client.ListClusterCustomObjectWithHttpMessagesAsync(group: _apiGroup, plural: _resourcePlural, version: _apiVersion, resourceVersion: listOptions.ResourceVersion,
continueParameter: listOptions.Continue, fieldSelector: listOptions.FieldSelector, labelSelector: listOptions.LabelSelector, limit: listOptions.Limit,
timeoutSeconds: listOptions.TimeoutSeconds, cancellationToken: cancellationToken).ConfigureAwait(false);
return SafeJsonConvert.DeserializeObject<T>(resp.Body.ToString());
}
/// <summary>
/// List kubernetes object.
/// </summary>
/// <typeparam name="T">the object type</typeparam>
/// <param name="namespaceProperty"> the namespaceProperty</param>
/// <param name="listOptions">the list options</param>
/// <param name="cancellationToken">the token </param>
/// <returns>the kubernetes object</returns>
public async Task<T> ListAsync<T>(string namespaceProperty, ListOptions listOptions, CancellationToken cancellationToken = default)
where T : class, IKubernetesObject<V1ListMeta>
{
if (listOptions == null)
{
throw new ArgumentNullException(nameof(listOptions));
}
if (string.IsNullOrEmpty(namespaceProperty))
{
throw new ArgumentNullException(nameof(namespaceProperty));
}
var resp = await _client.ListNamespacedCustomObjectWithHttpMessagesAsync(group: _apiGroup, plural: _resourcePlural, version: _apiVersion, resourceVersion: listOptions.ResourceVersion,
continueParameter: listOptions.Continue, fieldSelector: listOptions.FieldSelector, labelSelector: listOptions.LabelSelector, limit: listOptions.Limit,
timeoutSeconds: listOptions.TimeoutSeconds, namespaceParameter: namespaceProperty, cancellationToken: cancellationToken).ConfigureAwait(false);
return SafeJsonConvert.DeserializeObject<T>(resp.Body.ToString());
}
/// <summary>
/// Create kubernetes object.
/// </summary>
/// <typeparam name="T">the object type</typeparam>
/// <param name="obj">the object</param>
/// <param name="createOptions">the create options</param>
/// <param name="cancellationToken">the token </param>
/// <returns>the kubernetes object</returns>
public async Task<T> CreateAsync<T>(T obj, CreateOptions createOptions, CancellationToken cancellationToken = default)
where T : class, IKubernetesObject<V1ObjectMeta>
{
if (obj == null)
{
throw new ArgumentNullException(nameof(obj));
}
if (createOptions == null)
{
throw new ArgumentNullException(nameof(createOptions));
}
V1ObjectMeta objectMeta = obj.Metadata;
var isNamespaced = !string.IsNullOrEmpty(objectMeta.NamespaceProperty);
if (isNamespaced)
{
return await CreateAsync(objectMeta.NamespaceProperty, obj, createOptions, cancellationToken).ConfigureAwait(false);
}
var resp = await _client.CreateClusterCustomObjectWithHttpMessagesAsync(body: obj, group: _apiGroup, plural: _resourcePlural, version: _apiVersion, dryRun: createOptions.DryRun,
fieldManager: createOptions.FieldManager, cancellationToken: cancellationToken).ConfigureAwait(false);
return SafeJsonConvert.DeserializeObject<T>(resp.Body.ToString());
}
/// <summary>
/// Create namespaced kubernetes object.
/// </summary>
/// <typeparam name="T">the object type</typeparam>
/// <param name="namespaceProperty">the namespace</param>
/// <param name="obj">the object</param>
/// <param name="createOptions">the create options</param>
/// <param name="cancellationToken">the token </param>
/// <returns>the kubernetes object</returns>
public async Task<T> CreateAsync<T>(string namespaceProperty, T obj, CreateOptions createOptions, CancellationToken cancellationToken = default)
where T : class, IKubernetesObject<V1ObjectMeta>
{
if (obj == null)
{
throw new ArgumentNullException(nameof(obj));
}
if (createOptions == null)
{
throw new ArgumentNullException(nameof(createOptions));
}
var resp = await _client.CreateNamespacedCustomObjectWithHttpMessagesAsync(body: obj, group: _apiGroup, plural: _resourcePlural, version: _apiVersion,
namespaceParameter: namespaceProperty, dryRun: createOptions.DryRun, fieldManager: createOptions.FieldManager, cancellationToken: cancellationToken).ConfigureAwait(false);
return SafeJsonConvert.DeserializeObject<T>(resp.Body.ToString());
}
/// <summary>
/// Update kubernetes object.
/// </summary>
/// <typeparam name="T">the object type</typeparam>
/// <param name="obj">the object</param>
/// <param name="updateOptions">the update options</param>
/// <param name="cancellationToken">the token </param>
/// <returns>the kubernetes object</returns>
public async Task<T> UpdateAsync<T>(T obj, UpdateOptions updateOptions, CancellationToken cancellationToken = default)
where T : class, IKubernetesObject<V1ObjectMeta>
{
if (obj == null)
{
throw new ArgumentNullException(nameof(obj));
}
if (updateOptions == null)
{
throw new ArgumentNullException(nameof(updateOptions));
}
V1ObjectMeta objectMeta = obj.Metadata;
var isNamespaced = !string.IsNullOrEmpty(objectMeta.NamespaceProperty);
HttpOperationResponse<object> resp;
if (isNamespaced)
{
resp = await _client.ReplaceNamespacedCustomObjectWithHttpMessagesAsync(body: obj, name: objectMeta.Name, group: _apiGroup, plural: _resourcePlural, version: _apiVersion,
namespaceParameter: objectMeta.NamespaceProperty, dryRun: updateOptions.DryRun, fieldManager: updateOptions.FieldManager, cancellationToken: cancellationToken)
.ConfigureAwait(false);
}
else
{
resp = await _client.ReplaceClusterCustomObjectWithHttpMessagesAsync(body: obj, name: objectMeta.Name, group: _apiGroup ?? obj.ApiGroup(), plural: _resourcePlural,
version: _apiVersion, dryRun: updateOptions.DryRun, fieldManager: updateOptions.FieldManager, cancellationToken: cancellationToken).ConfigureAwait(false);
}
return SafeJsonConvert.DeserializeObject<T>(resp.Body.ToString());
}
/// <summary>
/// Create kubernetes object, if the namespaceProperty in the object is present, it will send a
/// namespaceProperty-scoped requests, vice versa.
/// </summary>
/// <typeparam name="T">the object type</typeparam>
/// <param name="obj"> the object</param>
/// <param name="status"> function to extract the status from the object</param>
/// <param name="cancellationToken">the token </param>
/// <returns>the kubernetes object</returns>
public Task<T> UpdateStatusAsync<T>(T obj, Func<T, object> status, CancellationToken cancellationToken = default)
where T : class, IKubernetesObject<V1ObjectMeta>
{
return UpdateStatusAsync(obj, status, new UpdateOptions(), cancellationToken);
}
/// <summary>
/// Update status of kubernetes object.
/// </summary>
/// <typeparam name="T">the object type</typeparam>
/// <param name="obj"> the object</param>
/// <param name="status"> function to extract the status from the object</param>
/// <param name="updateOptions">the update options</param>
/// <param name="cancellationToken">the token </param>
/// <returns>the kubernetes object</returns>
public async Task<T> UpdateStatusAsync<T>(T obj, Func<T, object> status, UpdateOptions updateOptions, CancellationToken cancellationToken = default)
where T : class, IKubernetesObject<V1ObjectMeta>
{
if (obj == null)
{
throw new ArgumentNullException(nameof(obj));
}
if (updateOptions == null)
{
throw new ArgumentNullException(nameof(updateOptions));
}
V1ObjectMeta objectMeta = obj.Metadata;
HttpOperationResponse<object> resp;
var isNamespaced = !string.IsNullOrEmpty(objectMeta.NamespaceProperty);
if (isNamespaced)
{
resp = await _client.PatchNamespacedCustomObjectStatusWithHttpMessagesAsync(body: obj, group: _apiGroup, version: _apiVersion, namespaceParameter: objectMeta.NamespaceProperty,
plural: _resourcePlural, name: objectMeta.Name, dryRun: updateOptions.DryRun, fieldManager: updateOptions.FieldManager, force: updateOptions.Force,
cancellationToken: cancellationToken).ConfigureAwait(false);
}
else
{
resp = await _client.PatchClusterCustomObjectStatusWithHttpMessagesAsync(body: obj, group: _apiGroup, version: _apiVersion, plural: _resourcePlural, name: objectMeta.Name,
dryRun: updateOptions.DryRun, fieldManager: updateOptions.FieldManager, force: updateOptions.Force, cancellationToken: cancellationToken).ConfigureAwait(false);
}
return SafeJsonConvert.DeserializeObject<T>(resp.Body.ToString());
}
/// <summary>
/// Patch kubernetes object.
/// </summary>
/// <typeparam name="T">the object type</typeparam>
/// <param name="name"> the name</param>
/// <param name="obj"> the object</param>
/// <param name="patchOptions">the patch options</param>
/// <param name="cancellationToken">the token </param>
/// <returns>the kubernetes object</returns>
public async Task<T> PatchAsync<T>(string name, object obj, PatchOptions patchOptions, CancellationToken cancellationToken = default)
where T : class, IKubernetesObject<V1ObjectMeta>
{
if (obj == null)
{
throw new ArgumentNullException(nameof(obj));
}
if (patchOptions == null)
{
throw new ArgumentNullException(nameof(patchOptions));
}
if (string.IsNullOrEmpty(name))
{
throw new ArgumentNullException(nameof(name));
}
var resp = await _client.PatchClusterCustomObjectWithHttpMessagesAsync(body: obj, group: _apiGroup, version: _apiVersion, plural: _resourcePlural, name: name, dryRun: patchOptions.DryRun,
fieldManager: patchOptions.FieldManager, force: patchOptions.Force, cancellationToken: cancellationToken).ConfigureAwait(false);
return SafeJsonConvert.DeserializeObject<T>(resp.Body.ToString());
}
/// <summary>
/// Patch kubernetes object.
/// </summary>
/// <typeparam name="T">the object type</typeparam>
/// <param name="namespaceProperty"> the namespaceProperty</param>
/// <param name="name"> the name</param>
/// <param name="obj"> the object</param>
/// <param name="patchOptions">the patch options</param>
/// <param name="cancellationToken">the token </param>
/// <returns>the kubernetes object</returns>
public async Task<T> PatchAsync<T>(string namespaceProperty, string name, object obj, PatchOptions patchOptions, CancellationToken cancellationToken = default)
where T : class, IKubernetesObject<V1ObjectMeta>
{
if (string.IsNullOrEmpty(namespaceProperty))
{
throw new ArgumentNullException(nameof(namespaceProperty));
}
if (string.IsNullOrEmpty(name))
{
throw new ArgumentNullException(nameof(name));
}
if (obj == null)
{
throw new ArgumentNullException(nameof(obj));
}
if (patchOptions == null)
{
throw new ArgumentNullException(nameof(patchOptions));
}
var resp = await _client.PatchNamespacedCustomObjectWithHttpMessagesAsync(body: obj, group: _apiGroup, version: _apiVersion, namespaceParameter: namespaceProperty, plural: _resourcePlural,
name: name, dryRun: patchOptions.DryRun, fieldManager: patchOptions.FieldManager, force: patchOptions.Force, cancellationToken: cancellationToken).ConfigureAwait(false);
return SafeJsonConvert.DeserializeObject<T>(resp.Body.ToString());
}
/// <summary>
/// Delete kubernetes object.
/// </summary>
/// <typeparam name="T">the object type</typeparam>
/// <param name="name"> the name</param>
/// <param name="deleteOptions">the delete options</param>
/// <param name="cancellationToken">the token </param>
/// <returns>the kubernetes object</returns>
public async Task<T> DeleteAsync<T>(string name, V1DeleteOptions deleteOptions, CancellationToken cancellationToken = default)
where T : class, IKubernetesObject<V1ObjectMeta>
{
if (string.IsNullOrEmpty(name))
{
throw new ArgumentNullException(nameof(name));
}
var resp = await _client.DeleteClusterCustomObjectWithHttpMessagesAsync(
group: _apiGroup, version: _apiVersion, plural: _resourcePlural, name: name, body: deleteOptions, cancellationToken: cancellationToken).ConfigureAwait(false);
return SafeJsonConvert.DeserializeObject<T>(resp.Body.ToString());
}
/// <summary>
/// Delete kubernetes object.
/// </summary>
/// <typeparam name="T">the object type</typeparam>
/// <param name="namespaceProperty"> the namespaceProperty</param>
/// <param name="name"> the name</param>
/// <param name="deleteOptions">the delete options</param>
/// <param name="cancellationToken">the token </param>
/// <returns>the kubernetes object</returns>
public async Task<T> DeleteAsync<T>(string namespaceProperty, string name, V1DeleteOptions deleteOptions, CancellationToken cancellationToken = default)
where T : class, IKubernetesObject<V1ObjectMeta>
{
if (string.IsNullOrEmpty(namespaceProperty))
{
throw new ArgumentNullException(nameof(namespaceProperty));
}
if (string.IsNullOrEmpty(name))
{
throw new ArgumentNullException(nameof(name));
}
var resp = await _client.DeleteNamespacedCustomObjectWithHttpMessagesAsync(group: _apiGroup, version: _apiVersion, namespaceParameter: namespaceProperty, plural: _resourcePlural,
name: name, body: deleteOptions, cancellationToken: cancellationToken).ConfigureAwait(false);
return SafeJsonConvert.DeserializeObject<T>(resp.Body.ToString());
}
/// <summary>
/// Watch watchable.
/// </summary>
/// <param name="listOptions">the list options</param>
/// <param name="onEvent">action on event</param>
/// <param name="onError">action on error</param>
/// <param name="onClosed">action on closed</param>
/// <param name="cancellationToken">the token </param>
/// <typeparam name="T">the object type</typeparam>
/// <returns>the watchable</returns>
public Watcher<T> Watch<T>(ListOptions listOptions, Action<WatchEventType, T> onEvent, Action<Exception> onError = default, Action onClosed = default,
CancellationToken cancellationToken = default)
where T : class, IKubernetesObject<V1ObjectMeta>
{
if (listOptions == null)
{
throw new ArgumentNullException(nameof(listOptions));
}
var resp = _client.ListClusterCustomObjectWithHttpMessagesAsync(group: _apiGroup, version: _apiVersion, plural: _resourcePlural, continueParameter: listOptions.Continue,
fieldSelector: listOptions.FieldSelector, labelSelector: listOptions.LabelSelector, limit: listOptions.Limit, resourceVersion: listOptions.ResourceVersion,
timeoutSeconds: listOptions.TimeoutSeconds, watch: true, cancellationToken: cancellationToken);
return resp.Watch(onEvent, onError, onClosed);
}
/// <summary>
/// Watch watchable.
/// </summary>
/// <param name="namespaceProperty"> the namespaceProperty</param>
/// <param name="listOptions">the list options</param>
/// <param name="onEvent">action on event</param>
/// <param name="onError">action on error</param>
/// <param name="onClosed">action on closed</param>
/// <param name="cancellationToken">the token </param>
/// <typeparam name="T">the object type</typeparam>
/// <returns>the watchable</returns>
public Watcher<T> Watch<T>(string namespaceProperty, ListOptions listOptions, Action<WatchEventType, T> onEvent, Action<Exception> onError = default, Action onClosed = default,
CancellationToken cancellationToken = default)
where T : class, IKubernetesObject<V1ObjectMeta>
{
if (listOptions == null)
{
throw new ArgumentNullException(nameof(listOptions));
}
if (string.IsNullOrEmpty(namespaceProperty))
{
throw new ArgumentNullException(nameof(namespaceProperty));
}
var resp = _client.ListNamespacedCustomObjectWithHttpMessagesAsync(group: _apiGroup, version: _apiVersion, namespaceParameter: namespaceProperty, plural: _resourcePlural,
continueParameter: listOptions.Continue, fieldSelector: listOptions.FieldSelector, labelSelector: listOptions.LabelSelector, limit: listOptions.Limit,
resourceVersion: listOptions.ResourceVersion, timeoutSeconds: listOptions.TimeoutSeconds, watch: true, cancellationToken: cancellationToken);
return resp.Watch(onEvent, onError, onClosed);
}
}
}

View File

@@ -0,0 +1,65 @@
using System.Net;
using k8s.Models;
namespace k8s.Util.Common.Generic
{
public class KubernetesApiResponse<TDataType>
where TDataType : IKubernetesObject
{
public KubernetesApiResponse(TDataType @object)
{
Object = @object;
Status = null;
HttpStatusCode = HttpStatusCode.OK; // 200
}
public KubernetesApiResponse(V1Status status, HttpStatusCode httpStatusCode)
{
Object = default(TDataType);
Status = status;
HttpStatusCode = httpStatusCode;
}
public TDataType Object { get; }
public V1Status Status { get; }
public HttpStatusCode HttpStatusCode { get; }
public bool IsSuccess => ((int)HttpStatusCode > 199 && (int)HttpStatusCode < 300); // 400
/// <summary>
/// Throws api exception kubernetes api response on failure. This is the recommended approach to
/// deal with errors returned from server.
/// </summary>
/// <returns>the kubernetes api response</returns>
/// <exception cref="HttpListenerException">the api exception</exception>
public KubernetesApiResponse<TDataType> ThrowsApiException()
{
return OnFailure(new ErrorStatusHandler());
}
/// <summary>
/// Calling errorStatusHandler upon errors from server
/// </summary>
/// <param name="errorStatusHandler">the error status handler</param>
/// <returns>the kubernetes api response</returns>
public KubernetesApiResponse<TDataType> OnFailure(ErrorStatusHandler errorStatusHandler)
{
if (!IsSuccess && errorStatusHandler != null)
{
errorStatusHandler.Handle((int)HttpStatusCode, Status);
}
return this;
}
public class ErrorStatusHandler
{
public void Handle(int code, V1Status errorStatus)
{
throw new HttpListenerException(code, errorStatus?.Message);
}
}
}
}

View File

@@ -0,0 +1,17 @@
using Newtonsoft.Json;
namespace k8s.Util.Common.Generic.Options
{
public class CreateOptions
{
public string DryRun { get; private set; }
public string FieldManager { get; private set; }
public CreateOptions(string dryRun = default, string fieldManager = default)
{
DryRun = dryRun;
FieldManager = fieldManager;
}
}
}

View File

@@ -0,0 +1,6 @@
namespace k8s.Util.Common.Generic.Options
{
public class GetOptions
{
}
}

View File

@@ -0,0 +1,30 @@
using Newtonsoft.Json;
namespace k8s.Util.Common.Generic.Options
{
public class ListOptions
{
public int? TimeoutSeconds { get; private set; }
public int Limit { get; private set; }
public string FieldSelector { get; private set; }
public string LabelSelector { get; private set; }
public string ResourceVersion { get; private set; }
public string Continue { get; private set; }
public ListOptions(int? timeoutSeconds = default, int limit = default, string fieldSelector = default, string labelSelector = default, string resourceVersion = default,
string @continue = default)
{
TimeoutSeconds = timeoutSeconds;
Limit = limit;
FieldSelector = fieldSelector;
LabelSelector = labelSelector;
ResourceVersion = resourceVersion;
Continue = @continue;
}
}
}

View File

@@ -0,0 +1,20 @@
using Newtonsoft.Json;
namespace k8s.Util.Common.Generic.Options
{
public class PatchOptions
{
public string DryRun { get; private set; }
public bool Force { get; private set; }
public string FieldManager { get; private set; }
public PatchOptions(string dryRun = default, bool force = false, string fieldManager = default)
{
DryRun = dryRun;
Force = force;
FieldManager = fieldManager;
}
}
}

View File

@@ -0,0 +1,20 @@
using Newtonsoft.Json;
namespace k8s.Util.Common.Generic.Options
{
public class UpdateOptions
{
public string DryRun { get; private set; }
public bool Force { get; private set; }
public string FieldManager { get; private set; }
public UpdateOptions(string dryRun = default, bool force = false, string fieldManager = default)
{
DryRun = dryRun;
Force = force;
FieldManager = fieldManager;
}
}
}

View File

@@ -0,0 +1,22 @@
using System.IO;
using System.Text;
namespace k8s.Util.Common
{
/// <summary>
/// Namespaces provides a set of helpers for operating namespaces.
/// </summary>
public class Namespaces
{
public const string NamespaceAll = "";
public const string NamespaceDefault = "default";
public const string NamespaceKubeSystem = "kube-system";
public static string GetPodNamespace()
{
return File.ReadAllText(Config.ServiceAccountNamespacePath, Encoding.UTF8);
}
}
}

View File

@@ -16,7 +16,7 @@ namespace k8s.Util.Informer.Cache
/// <summary>
/// keyFunc defines how to map index objects into indices
/// </summary>
private Func<TApiType, string> _keyFunc;
private Func<IKubernetesObject<V1ObjectMeta>, string> _keyFunc;
/// <summary>
/// indexers stores index functions by their names
@@ -51,6 +51,9 @@ namespace k8s.Util.Informer.Cache
/// learn more: https://docs.microsoft.com/en-us/dotnet/csharp/language-reference/keywords/lock-statement</remarks>
private readonly object _lock = new object();
/// <summary>
/// Initializes a new instance of the <see cref="Cache{TApiType}"/> class. Uses an object's namespace as the key.
/// </summary>
public Cache()
: this(Caches.NamespaceIndex, Caches.MetaNamespaceIndexFunc, Caches.DeletionHandlingMetaNamespaceKeyFunc)
{
@@ -63,13 +66,23 @@ namespace k8s.Util.Informer.Cache
/// <param name="indexName">the index name, an unique name representing the index</param>
/// <param name="indexFunc">the index func by which we map multiple object to an index for querying</param>
/// <param name="keyFunc">the key func by which we map one object to an unique key for storing</param>
public Cache(string indexName, Func<TApiType, List<string>> indexFunc, Func<TApiType, string> keyFunc)
public Cache(string indexName, Func<TApiType, List<string>> indexFunc, Func<IKubernetesObject<V1ObjectMeta>, string> keyFunc)
{
_indexers[indexName] = indexFunc;
_keyFunc = keyFunc;
_indices[indexName] = new Dictionary<string, HashSet<string>>();
}
public void Clear()
{
lock (_lock)
{
_items?.Clear();
_indices?.Clear();
_indexers?.Clear();
}
}
/// <summary>
/// Add objects.
/// </summary>
@@ -421,9 +434,9 @@ namespace k8s.Util.Informer.Cache
_indexers[indexName] = indexFunc;
}
public Func<TApiType, string> KeyFunc => _keyFunc;
public Func<IKubernetesObject<V1ObjectMeta>, string> KeyFunc => _keyFunc;
public void SetKeyFunc(Func<TApiType, string> keyFunc)
public void SetKeyFunc(Func<IKubernetesObject<V1ObjectMeta>, string> keyFunc)
{
_keyFunc = keyFunc;
}

View File

@@ -1,3 +1,4 @@
using System;
using System.Collections.Generic;
using k8s.Models;
@@ -61,5 +62,10 @@ namespace k8s.Util.Informer.Cache
/// </summary>
/// <returns>list of all the items</returns>
IEnumerable<TApiType> List();
/// <summary>
/// Empty the store
/// </summary>
void Clear();
}
}

View File

@@ -0,0 +1,109 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using k8s.Models;
using k8s.Tests.Mock;
using k8s.Util.Common;
using Xunit;
using Xunit.Abstractions;
namespace k8s.Tests.Util.Common.Generic
{
public class GenericKubernetesApiTest
{
private readonly ITestOutputHelper _outputHelper;
public GenericKubernetesApiTest(ITestOutputHelper outputHelper)
{
_outputHelper = outputHelper;
}
[Fact(DisplayName = "Create constructor success")]
public void CreateConstSuccess()
{
using var server = new MockKubeApiServer(_outputHelper);
var genericApi = Helpers.BuildGenericApi(server.Uri);
genericApi.Should().NotBeNull();
}
[Fact(DisplayName = "Get namespaced object success")]
public async Task GetNamespacedObject()
{
var serverOptions = new MockKubeApiServerOptions(MockKubeServerFlags.GetPod);
using var server = new MockKubeApiServer(_outputHelper, serverOptions.ShouldNext);
var podName = "nginx-1493591563-xb2v4";
var genericApi = Helpers.BuildGenericApi(server.Uri);
var resp = await genericApi.GetAsync<V1Pod>(Namespaces.NamespaceDefault, podName).ConfigureAwait(false);
resp.Should().NotBeNull();
resp.Metadata.Name.Should().Be(podName);
resp.Metadata.NamespaceProperty.Should().Be(Namespaces.NamespaceDefault);
}
[Fact(DisplayName = "List namespaced object success")]
public async Task ListNamespacedObject()
{
var serverOptions = new MockKubeApiServerOptions(MockKubeServerFlags.ListPods);
using var server = new MockKubeApiServer(_outputHelper, serverOptions.ShouldNext);
var genericApi = Helpers.BuildGenericApi(server.Uri);
var resp = await genericApi.ListAsync<V1PodList>(Namespaces.NamespaceDefault).ConfigureAwait(false);
resp.Should().NotBeNull();
resp.Items.Should().NotBeNull();
}
[Fact(DisplayName = "Patch namespaced object success")]
public async Task PatchNamespacedObject()
{
using var server = new MockKubeApiServer(_outputHelper);
var podName = "nginx-1493591563-xb2v4";
var genericApi = Helpers.BuildGenericApi(server.Uri);
var resp = await genericApi.PatchAsync<V1Pod>(Namespaces.NamespaceDefault, podName).ConfigureAwait(false);
resp.Should().NotBeNull();
}
[Fact(DisplayName = "Update object success")]
public async Task UpdateObject()
{
using var server = new MockKubeApiServer(_outputHelper);
var pod = Helpers.CreatePods(1).First();
var genericApi = Helpers.BuildGenericApi(server.Uri);
var resp = await genericApi.UpdateAsync(pod).ConfigureAwait(false);
resp.Should().NotBeNull();
}
[Fact(DisplayName = "Delete namespaced object success")]
public async Task DeleteNamespacedObject()
{
using var server = new MockKubeApiServer(_outputHelper);
var podName = "nginx-1493591563-xb2v4";
var genericApi = Helpers.BuildGenericApi(server.Uri);
var resp = await genericApi.DeleteAsync<V1Pod>(Namespaces.NamespaceDefault, podName).ConfigureAwait(false);
resp.Should().NotBeNull();
}
[Fact(DisplayName = "Watch namespaced object success")]
public void WatchNamespacedObject()
{
using var cts = new CancellationTokenSource();
var serverOptions = new MockKubeApiServerOptions(MockKubeServerFlags.ModifiedPod);
using var server = new MockKubeApiServer(_outputHelper, serverOptions.ShouldNext);
var genericApi = Helpers.BuildGenericApi(server.Uri);
using var resp = genericApi.Watch<V1Pod>(Namespaces.NamespaceDefault, (actionType, pod) => { }, exception => { }, () => { }, cts.Token);
resp.Should().NotBeNull();
cts.CancelAfter(1000);
serverOptions.ServerShutdown?.Set();
}
}
}

View File

@@ -0,0 +1,75 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using k8s.Models;
using k8s.Tests.Mock;
using k8s.Util.Common.Generic;
using Microsoft.AspNetCore.Http;
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
using Nito.AsyncEx;
using Xunit.Abstractions;
namespace k8s.Tests.Util
{
internal static class Helpers
{
public static IEnumerable<V1Pod> CreatePods(int cnt)
{
var pods = new List<V1Pod>();
for (var i = 0; i < cnt; i++)
{
pods.Add(new V1Pod()
{
ApiVersion = "Pod/V1",
Kind = "Pod",
Metadata = new V1ObjectMeta()
{
Name = Guid.NewGuid().ToString(),
NamespaceProperty = "the-namespace",
ResourceVersion = DateTime.Now.Ticks.ToString(),
},
});
}
return pods;
}
public static V1PodList CreatePodList(int cnt)
{
return new V1PodList()
{
ApiVersion = "Pod/V1",
Kind = "Pod",
Metadata = new V1ListMeta()
{
ResourceVersion = DateTime.Now.Ticks.ToString(),
},
Items = CreatePods(cnt).ToList(),
};
}
public static Kubernetes BuildApiClient(Uri hostAddress)
{
return new Kubernetes(new KubernetesClientConfiguration { Host = hostAddress.ToString() })
{
HttpClient =
{
Timeout = Timeout.InfiniteTimeSpan,
},
};
}
public static GenericKubernetesApi BuildGenericApi(Uri hostAddress)
{
return new GenericKubernetesApi(
apiGroup: "pod",
apiVersion: "v1",
resourcePlural: "pods",
apiClient: BuildApiClient(hostAddress));
}
}
}

View File

@@ -2,8 +2,8 @@ using System;
using System.Collections.Generic;
using System.Linq;
using FluentAssertions;
using k8s.Util.Informer.Cache;
using k8s.Models;
using k8s.Util.Informer.Cache;
using Xunit;
namespace k8s.Tests.Util.Informer.Cache
@@ -16,13 +16,12 @@ namespace k8s.Tests.Util.Informer.Cache
var cache = new Cache<V1Node>();
cache.Should().NotBeNull();
cache.GetIndexers().ContainsKey(Caches.NamespaceIndex).Should().BeTrue();
// Todo: validate all defaults gor set up
}
[Fact(DisplayName = "Add cache item success")]
private void AddCacheItemSuccess()
{
var aPod = Util.CreatePods(1).First();
var aPod = Helpers.CreatePods(1).First();
var cache = new Cache<V1Pod>();
cache.Add(aPod);
@@ -33,7 +32,7 @@ namespace k8s.Tests.Util.Informer.Cache
[Fact(DisplayName = "Update cache item success")]
private void UpdateCacheItemSuccess()
{
var aPod = Util.CreatePods(1).First();
var aPod = Helpers.CreatePods(1).First();
var cache = new Cache<V1Pod>();
@@ -47,7 +46,7 @@ namespace k8s.Tests.Util.Informer.Cache
[Fact(DisplayName = "Delete cache item success")]
private void DeleteCacheItemSuccess()
{
var aPod = Util.CreatePods(1).First();
var aPod = Helpers.CreatePods(1).First();
var cache = new Cache<V1Pod>();
@@ -61,7 +60,7 @@ namespace k8s.Tests.Util.Informer.Cache
[Fact(DisplayName = "Replace cache items success")]
private void ReplaceCacheItemsSuccess()
{
var pods = Util.CreatePods(3);
var pods = Helpers.CreatePods(3);
var aPod = pods.First();
var anotherPod = pods.Skip(1).First();
var yetAnotherPod = pods.Skip(2).First();
@@ -79,7 +78,7 @@ namespace k8s.Tests.Util.Informer.Cache
[Fact(DisplayName = "List item keys success")]
public void ListItemKeysSuccess()
{
var pods = Util.CreatePods(3);
var pods = Helpers.CreatePods(3);
var aPod = pods.First();
var anotherPod = pods.Skip(1).First();
var cache = new Cache<V1Pod>();
@@ -96,7 +95,7 @@ namespace k8s.Tests.Util.Informer.Cache
[Fact(DisplayName = "Get item doesn't exist")]
public void GetItemNotExist()
{
var aPod = Util.CreatePods(1).First();
var aPod = Helpers.CreatePods(1).First();
var cache = new Cache<V1Pod>();
var item = cache.Get(aPod);
@@ -106,7 +105,7 @@ namespace k8s.Tests.Util.Informer.Cache
[Fact(DisplayName = "Get item success")]
public void GetItemSuccess()
{
var aPod = Util.CreatePods(1).First();
var aPod = Helpers.CreatePods(1).First();
var cache = new Cache<V1Pod>();
cache.Add(aPod);
@@ -117,7 +116,7 @@ namespace k8s.Tests.Util.Informer.Cache
[Fact(DisplayName = "List items success")]
public void ListItemSuccess()
{
var pods = Util.CreatePods(3);
var pods = Helpers.CreatePods(3);
var aPod = pods.First();
var anotherPod = pods.Skip(1).First();
var yetAnotherPod = pods.Skip(2).First();
@@ -138,7 +137,7 @@ namespace k8s.Tests.Util.Informer.Cache
[Fact(DisplayName = "Get item by key success")]
public void GetItemByKeySuccess()
{
var pod = Util.CreatePods(1).First();
var pod = Helpers.CreatePods(1).First();
var cache = new Cache<V1Pod>();
cache.Add(pod);
@@ -149,7 +148,7 @@ namespace k8s.Tests.Util.Informer.Cache
[Fact(DisplayName = "Index items no index")]
public void IndexItemsNoIndex()
{
var pod = Util.CreatePods(1).First();
var pod = Helpers.CreatePods(1).First();
var cache = new Cache<V1Pod>();
@@ -161,7 +160,7 @@ namespace k8s.Tests.Util.Informer.Cache
[Fact(DisplayName = "Index items success")]
public void IndexItemsSuccess()
{
var pod = Util.CreatePods(1).First();
var pod = Helpers.CreatePods(1).First();
var cache = new Cache<V1Pod>();
@@ -191,7 +190,7 @@ namespace k8s.Tests.Util.Informer.Cache
[Fact(DisplayName = "Get index keys success")]
public void GetIndexKeysSuccess()
{
var pod = Util.CreatePods(1).First();
var pod = Helpers.CreatePods(1).First();
var cache = new Cache<V1Pod>();
@@ -221,7 +220,7 @@ namespace k8s.Tests.Util.Informer.Cache
[Fact(DisplayName = "List by index success")]
public void ListByIndexSuccess()
{
var pod = Util.CreatePods(1).First();
var pod = Helpers.CreatePods(1).First();
var cache = new Cache<V1Pod>();
@@ -323,7 +322,7 @@ namespace k8s.Tests.Util.Informer.Cache
},
};
var cache = new Cache<V1Pod>();
var newFunc = new Func<V1Pod, string>((pod) => pod.Kind);
var newFunc = new Func<IKubernetesObject<V1ObjectMeta>, string>((pod) => pod.Kind);
var defaultReturnValue = newFunc(aPod);
cache.SetKeyFunc(newFunc);

View File

@@ -12,14 +12,14 @@ namespace k8s.Tests.Util.Informer.Cache
[Fact(DisplayName = "Check for default DeletedFinalStateUnknown")]
public void CheckDefaultDeletedFinalStateUnknown()
{
var aPod = Util.CreatePods(1).First();
var aPod = Helpers.CreatePods(1).First();
Caches.DeletionHandlingMetaNamespaceKeyFunc(aPod).Should().Be($"{aPod.Metadata.NamespaceProperty}/{aPod.Metadata.Name}");
}
[Fact(DisplayName = "Check for obj DeletedFinalStateUnknown")]
public void CheckObjDeletedFinalStateUnknown()
{
var aPod = Util.CreatePods(1).First();
var aPod = Helpers.CreatePods(1).First();
var key = "a-key";
var deletedPod = new DeletedFinalStateUnknown<V1Pod>(key, aPod);
@@ -37,7 +37,7 @@ namespace k8s.Tests.Util.Informer.Cache
[Fact(DisplayName = "Get default namespace key success")]
public void GetDefaultNamespaceKeySuccess()
{
var aPod = Util.CreatePods(1).First();
var aPod = Helpers.CreatePods(1).First();
Caches.MetaNamespaceKeyFunc(aPod).Should().Be($"{aPod.Metadata.NamespaceProperty}/{aPod.Metadata.Name}");
}
@@ -50,7 +50,7 @@ namespace k8s.Tests.Util.Informer.Cache
[Fact(DisplayName = "Get default namespace index success")]
public void GetDefaultNamespaceIndexSuccess()
{
var aPod = Util.CreatePods(1).First();
var aPod = Helpers.CreatePods(1).First();
var indexes = Caches.MetaNamespaceIndexFunc(aPod);
indexes.Should().NotBeNull();

View File

@@ -1,8 +1,8 @@
using System.Linq;
using FluentAssertions;
using k8s.Models;
using Xunit;
using k8s.Util.Informer.Cache;
using Xunit;
namespace k8s.Tests.Util.Informer.Cache
{
@@ -20,7 +20,7 @@ namespace k8s.Tests.Util.Informer.Cache
[Fact(DisplayName = "List with null namespace success")]
private void ListNullNamespaceSuccess()
{
var aPod = Util.CreatePods(1).First();
var aPod = Helpers.CreatePods(1).First();
var cache = new Cache<V1Pod>();
var lister = new Lister<V1Pod>(cache);
@@ -35,7 +35,7 @@ namespace k8s.Tests.Util.Informer.Cache
[Fact(DisplayName = "List with custom namespace success")]
private void ListCustomNamespaceSuccess()
{
var aPod = Util.CreatePods(1).First();
var aPod = Helpers.CreatePods(1).First();
var cache = new Cache<V1Pod>();
var lister = new Lister<V1Pod>(cache, aPod.Metadata.NamespaceProperty);
@@ -50,7 +50,7 @@ namespace k8s.Tests.Util.Informer.Cache
[Fact(DisplayName = "Get with null namespace success")]
private void GetNullNamespaceSuccess()
{
var aPod = Util.CreatePods(1).First();
var aPod = Helpers.CreatePods(1).First();
var cache = new Cache<V1Pod>();
var lister = new Lister<V1Pod>(cache);
@@ -65,7 +65,7 @@ namespace k8s.Tests.Util.Informer.Cache
[Fact(DisplayName = "Get with custom namespace success")]
private void GetCustomNamespaceSuccess()
{
var aPod = Util.CreatePods(1).First();
var aPod = Helpers.CreatePods(1).First();
var cache = new Cache<V1Pod>();
var lister = new Lister<V1Pod>(cache, aPod.Metadata.NamespaceProperty);
@@ -78,7 +78,7 @@ namespace k8s.Tests.Util.Informer.Cache
[Fact(DisplayName = "Set custom namespace success")]
private void SetCustomNamespaceSuccess()
{
var aPod = Util.CreatePods(1).First();
var aPod = Helpers.CreatePods(1).First();
var cache = new Cache<V1Pod>();
var lister = new Lister<V1Pod>(cache);

View File

@@ -1,33 +0,0 @@
using FluentAssertions;
using k8s.Models;
using k8s.Util.Informer.Cache;
using Microsoft.Extensions.Logging;
using Xunit;
using Xunit.Abstractions;
using Xunit.Sdk;
namespace k8s.Tests.Util.Informer.Cache
{
public class ReflectorTest
{
private readonly ITestOutputHelper _ouputHelper;
public ReflectorTest(ITestOutputHelper outputHelper)
{
_ouputHelper = outputHelper;
}
[Fact(DisplayName = "Create default reflector success")]
public void CreateReflectorSuccess()
{
/*using var apiClient = new Kubernetes(_clientConfiguration);
var cache = new Cache<V1Pod>();
var queue = new DeltaFifo(Caches.MetaNamespaceKeyFunc, cache, _deltasLogger);
var listerWatcher = new ListWatcher<V1Pod, V1PodList>(apiClient, ListAllPods);
var logger = LoggerFactory.Create(builder => builder.AddXUnit(_ouputHelper).SetMinimumLevel(LogLevel.Trace)).CreateLogger<k8s.Util.Cache.Reflector>();
var reflector = new k8s.Util.Cache.Reflector<V1Pod, V1PodList>(listerWatcher, queue, logger);
reflector.Should().NotBeNull();*/
}
}
}

View File

@@ -1,45 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using k8s.Models;
namespace k8s.Tests.Util.Informer.Cache
{
internal static class Util
{
internal static IEnumerable<V1Pod> CreatePods(int cnt)
{
var pods = new List<V1Pod>();
for (var i = 0; i < cnt; i++)
{
pods.Add(new V1Pod()
{
ApiVersion = "Pod/V1",
Kind = "Pod",
Metadata = new V1ObjectMeta()
{
Name = Guid.NewGuid().ToString(),
NamespaceProperty = "the-namespace",
ResourceVersion = "1",
},
});
}
return pods;
}
internal static V1PodList CreatePostList(int cnt)
{
return new V1PodList()
{
ApiVersion = "Pod/V1",
Kind = "Pod",
Metadata = new V1ListMeta()
{
ResourceVersion = "1",
},
Items = CreatePods(cnt).ToList(),
};
}
}
}

View File

@@ -0,0 +1,171 @@
using System;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using k8s.Models;
using Microsoft.AspNetCore.Http;
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
using Nito.AsyncEx;
namespace k8s.Tests.Util
{
/// <summary>
/// Flags to configure how the server will respond to requests
/// </summary>
[Flags]
public enum MockKubeServerFlags
{
/// <summary>
/// No flag
/// </summary>
None = 0,
/// <summary>
/// Include a response with malformed json
/// </summary>
BadJson = 1,
/// <summary>
/// Include a pod added response
/// </summary>
AddedPod = 2,
/// <summary>
/// Include a pod delete response
/// </summary>
DeletedPod = 4,
/// <summary>
/// Include a pod modified response
/// </summary>
ModifiedPod = 8,
/// <summary>
/// Include a pod error response
/// </summary>
ErrorPod = 16,
/// <summary>
/// Include a response of pod list
/// </summary>
ListPods = 32,
/// <summary>
/// Include a reponse of get pod
/// </summary>
GetPod = 64,
/// <summary>
/// Throw a 500 Http status code on any request
/// </summary>
Throw500 = 128,
}
internal class MockKubeApiServerOptions
{
// paste from minikube /api/v1/namespaces/default/pods
public const string MockPodResponse =
"{\r\n \"kind\": \"PodList\",\r\n \"apiVersion\": \"v1\",\r\n \"metadata\": {\r\n \"selfLink\": \"/api/v1/namespaces/default/pods\",\r\n \"resourceVersion\": \"1762810\"\r\n },\r\n \"items\": [\r\n {\r\n \"metadata\": {\r\n \"name\": \"nginx-1493591563-xb2v4\",\r\n \"generateName\": \"nginx-1493591563-\",\r\n \"namespace\": \"default\",\r\n \"selfLink\": \"/api/v1/namespaces/default/pods/nginx-1493591563-xb2v4\",\r\n \"uid\": \"ac1abb94-9c58-11e7-aaf5-00155d744505\",\r\n \"resourceVersion\": \"1737928\",\r\n \"creationTimestamp\": \"2017-09-18T10:03:51Z\",\r\n \"labels\": {\r\n \"app\": \"nginx\",\r\n \"pod-template-hash\": \"1493591563\"\r\n },\r\n \"annotations\": {\r\n \"kubernetes.io/created-by\": \"{\\\"kind\\\":\\\"SerializedReference\\\",\\\"apiVersion\\\":\\\"v1\\\",\\\"reference\\\":{\\\"kind\\\":\\\"ReplicaSet\\\",\\\"namespace\\\":\\\"default\\\",\\\"name\\\":\\\"nginx-1493591563\\\",\\\"uid\\\":\\\"ac013b63-9c58-11e7-aaf5-00155d744505\\\",\\\"apiVersion\\\":\\\"extensions\\\",\\\"resourceVersion\\\":\\\"5306\\\"}}\\n\"\r\n },\r\n \"ownerReferences\": [\r\n {\r\n \"apiVersion\": \"extensions/v1beta1\",\r\n \"kind\": \"ReplicaSet\",\r\n \"name\": \"nginx-1493591563\",\r\n \"uid\": \"ac013b63-9c58-11e7-aaf5-00155d744505\",\r\n \"controller\": true,\r\n \"blockOwnerDeletion\": true\r\n }\r\n ]\r\n },\r\n \"spec\": {\r\n \"volumes\": [\r\n {\r\n \"name\": \"default-token-3zzcj\",\r\n \"secret\": {\r\n \"secretName\": \"default-token-3zzcj\",\r\n \"defaultMode\": 420\r\n }\r\n }\r\n ],\r\n \"containers\": [\r\n {\r\n \"name\": \"nginx\",\r\n \"image\": \"nginx\",\r\n \"resources\": {},\r\n \"volumeMounts\": [\r\n {\r\n \"name\": \"default-token-3zzcj\",\r\n \"readOnly\": true,\r\n \"mountPath\": \"/var/run/secrets/kubernetes.io/serviceaccount\"\r\n }\r\n ],\r\n \"terminationMessagePath\": \"/dev/termination-log\",\r\n \"terminationMessagePolicy\": \"File\",\r\n \"imagePullPolicy\": \"Always\"\r\n }\r\n ],\r\n \"restartPolicy\": \"Always\",\r\n \"terminationGracePeriodSeconds\": 30,\r\n \"dnsPolicy\": \"ClusterFirst\",\r\n \"serviceAccountName\": \"default\",\r\n \"serviceAccount\": \"default\",\r\n \"nodeName\": \"ubuntu\",\r\n \"securityContext\": {},\r\n \"schedulerName\": \"default-scheduler\"\r\n },\r\n \"status\": {\r\n \"phase\": \"Running\",\r\n \"conditions\": [\r\n {\r\n \"type\": \"Initialized\",\r\n \"status\": \"True\",\r\n \"lastProbeTime\": null,\r\n \"lastTransitionTime\": \"2017-09-18T10:03:51Z\"\r\n },\r\n {\r\n \"type\": \"Ready\",\r\n \"status\": \"True\",\r\n \"lastProbeTime\": null,\r\n \"lastTransitionTime\": \"2017-10-12T07:09:21Z\"\r\n },\r\n {\r\n \"type\": \"PodScheduled\",\r\n \"status\": \"True\",\r\n \"lastProbeTime\": null,\r\n \"lastTransitionTime\": \"2017-09-18T10:03:51Z\"\r\n }\r\n ],\r\n \"hostIP\": \"192.168.188.42\",\r\n \"podIP\": \"172.17.0.5\",\r\n \"startTime\": \"2017-09-18T10:03:51Z\",\r\n \"containerStatuses\": [\r\n {\r\n \"name\": \"nginx\",\r\n \"state\": {\r\n \"running\": {\r\n \"startedAt\": \"2017-10-12T07:09:20Z\"\r\n }\r\n },\r\n \"lastState\": {\r\n \"terminated\": {\r\n \"exitCode\": 0,\r\n \"reason\": \"Completed\",\r\n \"startedAt\": \"2017-10-10T21:35:51Z\",\r\n \"finishedAt\": \"2017-10-12T07:07:37Z\",\r\n \"containerID\": \"docker://94df3f3965807421ad6dc76618e00b76cb15d024919c4946f3eb46a92659c62a\"\r\n }\r\n },\r\n \"ready\": true,\r\n \"restartCount\": 7,\r\n \"image\": \"nginx:latest\",\r\n \"imageID\": \"docker-pullable://nginx@sha256:004ac1d5e791e705f12a17c80d7bb1e8f7f01aa7dca7deee6e65a03465392072\",\r\n \"containerID\": \"docker://fa11bdd48c9b7d3a6c4c3f9b6d7319743c3455ab8d00c57d59c083b319b88194\"\r\n }\r\n ],\r\n \"qosClass\": \"BestEffort\"\r\n }\r\n }\r\n ]\r\n}";
public AsyncManualResetEvent ServerShutdown { get; private set; }
private readonly MockKubeServerFlags _serverFlags;
private readonly string _mockAddedEventStreamLine = BuildWatchEventStreamLine(WatchEventType.Added);
private readonly string _mockDeletedStreamLine = BuildWatchEventStreamLine(WatchEventType.Deleted);
private readonly string _mockModifiedStreamLine = BuildWatchEventStreamLine(WatchEventType.Modified);
private readonly string _mockErrorStreamLine = BuildWatchEventStreamLine(WatchEventType.Error);
private const string MockBadStreamLine = "bad json";
public MockKubeApiServerOptions(MockKubeServerFlags? serverFlags)
{
_serverFlags = serverFlags ?? MockKubeServerFlags.None;
}
private static string BuildWatchEventStreamLine(WatchEventType eventType)
{
var corev1PodList = JsonConvert.DeserializeObject<V1PodList>(MockPodResponse);
return JsonConvert.SerializeObject(
new Watcher<V1Pod>.WatchEvent { Type = eventType, Object = corev1PodList.Items.First() },
new StringEnumConverter());
}
private async Task WriteStreamLine(HttpContext httpContext, string reponseLine)
{
const string crlf = "\r\n";
await httpContext.Response.WriteAsync(reponseLine.Replace(crlf, "")).ConfigureAwait(false);
await httpContext.Response.WriteAsync(crlf).ConfigureAwait(false);
await httpContext.Response.Body.FlushAsync().ConfigureAwait(false);
}
public async Task<bool> ShouldNext(HttpContext httpContext)
{
var isWatch = (httpContext.Request.Query.ContainsKey("watch") && httpContext.Request.Query["watch"] == "true");
var returnStatusCode = (_serverFlags.HasFlag(MockKubeServerFlags.Throw500) ? HttpStatusCode.InternalServerError : HttpStatusCode.OK);
httpContext.Response.StatusCode = (int)returnStatusCode;
httpContext.Response.ContentLength = null;
if (isWatch)
{
ServerShutdown = new AsyncManualResetEvent();
foreach (Enum flag in Enum.GetValues(_serverFlags.GetType()))
{
if (!_serverFlags.HasFlag(flag))
{
continue;
}
switch (flag)
{
case MockKubeServerFlags.AddedPod:
await WriteStreamLine(httpContext, _mockAddedEventStreamLine).ConfigureAwait(false);
break;
case MockKubeServerFlags.ErrorPod:
await WriteStreamLine(httpContext, _mockErrorStreamLine).ConfigureAwait(false);
break;
case MockKubeServerFlags.DeletedPod:
await WriteStreamLine(httpContext, _mockDeletedStreamLine).ConfigureAwait(false);
break;
case MockKubeServerFlags.ModifiedPod:
await WriteStreamLine(httpContext, _mockModifiedStreamLine).ConfigureAwait(false);
break;
case MockKubeServerFlags.BadJson:
await WriteStreamLine(httpContext, MockBadStreamLine).ConfigureAwait(false);
break;
case MockKubeServerFlags.Throw500:
return false;
}
}
// keep server connection open
await ServerShutdown.WaitAsync().ConfigureAwait(false);
return false;
}
foreach (Enum flag in Enum.GetValues(_serverFlags.GetType()))
{
if (!_serverFlags.HasFlag(flag))
{
continue;
}
switch (flag)
{
case MockKubeServerFlags.ListPods:
await WriteStreamLine(httpContext, MockPodResponse).ConfigureAwait(false);
break;
case MockKubeServerFlags.GetPod:
var corev1PodList = JsonConvert.DeserializeObject<V1PodList>(MockPodResponse);
await WriteStreamLine(httpContext, JsonConvert.SerializeObject(corev1PodList.Items.First())).ConfigureAwait(false);
break;
case MockKubeServerFlags.Throw500:
return false;
}
}
return false;
}
}
}