* feat: enhance Kubernetes client with watch functionality * refactor: simplify watch event handling in Kubernetes client example * refactor: update Kubernetes watch functionality to use new event handling methods and add async enumerable support * fix * fix * fix: correct usage of Pod list items in client example and update Obsolete attribute formatting * fix: update client example to use correct Pod list method and improve Obsolete attribute formatting * refactor: enhance type resolution for list items in TypeHelper by adding TryGetItemTypeFromSchema method * feat: mark Watch methods as obsolete to prepare for future deprecation * fix * refactor: update WatcherExt class to internal and remove obsolete attributes; improve example method signature in Program.cs * refactor: change WatcherExt class from internal to public and mark methods as obsolete for future deprecation
1033 lines
42 KiB
C#
1033 lines
42 KiB
C#
using k8s.Models;
|
|
using k8s.Tests.Mock;
|
|
using Microsoft.AspNetCore.Http;
|
|
using Nito.AsyncEx;
|
|
using System;
|
|
using System.Collections.Generic;
|
|
using System.IO;
|
|
using System.Linq;
|
|
using System.Net;
|
|
using System.Net.Http;
|
|
using System.Text;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using Xunit;
|
|
using Xunit.Abstractions;
|
|
|
|
namespace k8s.Tests
|
|
{
|
|
public class WatchTests
|
|
{
|
|
private static readonly string MockAddedEventStreamLine = BuildWatchEventStreamLine(WatchEventType.Added);
|
|
private static readonly string MockDeletedStreamLine = BuildWatchEventStreamLine(WatchEventType.Deleted);
|
|
private static readonly string MockModifiedStreamLine = BuildWatchEventStreamLine(WatchEventType.Modified);
|
|
private static readonly string MockErrorStreamLine = BuildWatchEventStreamLine(WatchEventType.Error);
|
|
private const string MockBadStreamLine = "bad json";
|
|
private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(150);
|
|
|
|
private readonly ITestOutputHelper testOutput;
|
|
|
|
/// <summary>
/// Stores the xUnit output helper that the tests pass to <see cref="MockKubeApiServer"/>
/// and use for their own diagnostic messages.
/// </summary>
/// <param name="testOutput">Output sink supplied by the test runner.</param>
public WatchTests(ITestOutputHelper testOutput) => this.testOutput = testOutput;
|
|
|
|
/// <summary>
/// Builds one serialized watch-event line of the requested type. The payload is the first
/// pod found in <see cref="MockKubeApiServer.MockPodResponse"/>.
/// </summary>
/// <param name="eventType">The event type to stamp on the generated watch event.</param>
/// <returns>The JSON text of the watch event.</returns>
private static string BuildWatchEventStreamLine(WatchEventType eventType)
{
    var firstPod = KubernetesJson
        .Deserialize<V1PodList>(MockKubeApiServer.MockPodResponse)
        .Items
        .First();

    var watchEvent = new Watcher<V1Pod>.WatchEvent
    {
        Type = eventType,
        Object = firstPod,
    };

    return KubernetesJson.Serialize(watchEvent);
}
|
|
|
|
/// <summary>
/// Writes <paramref name="reponseLine"/> to the response as a single CRLF-terminated line
/// and flushes the body so the client sees it immediately.
/// </summary>
/// <param name="httpContext">The request context whose response is written to.</param>
/// <param name="reponseLine">The text to send; embedded CRLF sequences are stripped first.</param>
private static async Task WriteStreamLine(HttpContext httpContext, string reponseLine)
{
    const string lineTerminator = "\r\n";

    var response = httpContext.Response;

    // Strip embedded CRLFs so the payload occupies exactly one line of the stream.
    var singleLine = reponseLine.Replace(lineTerminator, "");

    await response.WriteAsync(singleLine).ConfigureAwait(false);
    await response.WriteAsync(lineTerminator).ConfigureAwait(false);
    await response.Body.FlushAsync().ConfigureAwait(false);
}
|
|
|
|
/// <summary>
/// Checks the two failure modes against a mock server created without a custom handler:
/// a watch with callbacks reports through <c>onError</c>, and a watch opened without
/// callbacks throws.
/// </summary>
[Fact]
public async Task CannotWatch()
{
    using var server = new MockKubeApiServer(testOutput);

    var configuration = new KubernetesClientConfiguration { Host = server.Uri.ToString() };
    var client = new Kubernetes(configuration);

    // Case 1: the watch param was not passed, so the failure must surface via onError.
    var errorReported = false;

    using (var watcher = client.CoreV1.WatchListNamespacedPod(
        "default",
        onEvent: (eventType, pod) => { },
        onError: error => { errorReported = true; }))
    {
        // Give the watcher a moment to invoke onError.
        await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(true);
    }

    Assert.True(errorReported);

    // Case 2: the server did not respond line by line; opening a watch without callbacks throws.
    // (The overload that passes an onEvent callback did not throw here.)
    await Assert.ThrowsAnyAsync<Exception>(() =>
    {
        using var callbacklessWatcher = client.CoreV1.WatchListNamespacedPod("default");
        return Task.CompletedTask;
    }).ConfigureAwait(true);
}
|
|
|
|
/// <summary>
/// Verifies that the watcher object is returned before the server has sent anything:
/// the server only writes its event after the test signals that the watcher exists.
/// </summary>
[Fact]
public async Task AsyncWatcher()
{
    var watcherCreated = new AsyncManualResetEvent(false);
    var eventSeen = new AsyncManualResetEvent(false);

    using var server = new MockKubeApiServer(testOutput, async httpContext =>
    {
        // Hold the response back until the test has its watcher object.
        await watcherCreated.WaitAsync().ConfigureAwait(true);
        await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true);
        return false;
    });

    var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });

    using (var watcher = client.CoreV1.WatchListNamespacedPod(
        "default",
        onEvent: (eventType, pod) => eventSeen.Set()))
    {
        // The watcher is usable here although the HTTP server has not responded yet.
        watcherCreated.Set();

        var eventOrTimeout = Task.WhenAny(eventSeen.WaitAsync(), Task.Delay(TestTimeout));
        await eventOrTimeout.ConfigureAwait(true);
    }

    Assert.True(eventSeen.IsSet);
    Assert.True(watcherCreated.IsSet);
}
|
|
|
|
/// <summary>
/// Verifies that a watch keeps running when the stream contains lines that cannot be
/// handled as watch events: valid events are still delivered, failures are reported
/// through <c>onError</c>, and <c>onClosed</c> fires once the server ends the response.
/// </summary>
[Fact]
public async Task SurviveBadLine()
{
    // Five callbacks in total; the assertions below expect three errors and two events.
    var eventsReceived = new AsyncCountdownEvent(5);
    var serverShutdown = new AsyncManualResetEvent();
    var connectionClosed = new AsyncManualResetEvent();

    using (var server =
        new MockKubeApiServer(
            testOutput,
            async httpContext =>
            {
                httpContext.Response.StatusCode = (int)HttpStatusCode.OK;
                httpContext.Response.ContentLength = null;

                await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse).ConfigureAwait(true);
                await WriteStreamLine(httpContext, MockBadStreamLine).ConfigureAwait(true);
                await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true);
                await WriteStreamLine(httpContext, MockBadStreamLine).ConfigureAwait(true);
                await WriteStreamLine(httpContext, MockModifiedStreamLine).ConfigureAwait(true);

                // Keep the response open until the test allows the server to finish.
                await serverShutdown.WaitAsync().ConfigureAwait(true);
                return false;
            }))
    {
        var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });

        var events = new HashSet<WatchEventType>();
        var errors = 0;

        // Dispose the watcher on every exit path (including a failed assertion) so the
        // watch does not outlive the test.
        using var watcher = client.CoreV1.WatchListNamespacedPod(
            "default",
            onEvent: (type, item) =>
            {
                testOutput.WriteLine($"Watcher received '{type}' event.");

                events.Add(type);
                eventsReceived.Signal();
            },
            onError: error =>
            {
                testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error.");

                errors += 1;
                eventsReceived.Signal();
            },
            onClosed: connectionClosed.Set);

        // Wait until the server has yielded every line (or the test times out).
        await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);

        Assert.True(
            eventsReceived.CurrentCount == 0,
            "Timed out waiting for all events / errors to be received.");

        Assert.Contains(WatchEventType.Added, events);
        Assert.Contains(WatchEventType.Modified, events);

        Assert.Equal(3, errors);

        // Bad lines must not have stopped the watch.
        Assert.True(watcher.Watching);

        // Let the server know it can initiate a shut down.
        serverShutdown.Set();

        await Task.WhenAny(connectionClosed.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);
        Assert.True(connectionClosed.IsSet);
    }
}
|
|
|
|
/// <summary>
/// Verifies that disposing a running watcher stops it: after <c>Dispose</c> the
/// <c>onClosed</c> callback fires and <c>Watching</c> becomes false, even though the
/// server is still holding the response open.
/// </summary>
[Fact]
public async Task DisposeWatch()
{
    var closedSignal = new AsyncManualResetEvent();
    var firstEvent = new AsyncCountdownEvent(1);
    var releaseServer = new AsyncManualResetEvent();

    using var server = new MockKubeApiServer(testOutput, async httpContext =>
    {
        await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true);

        // Keep the connection open so the close is caused by the client-side Dispose.
        await releaseServer.WaitAsync().ConfigureAwait(true);

        return false;
    });

    var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });
    var seenTypes = new HashSet<WatchEventType>();

    var watcher = client.CoreV1.WatchListNamespacedPod(
        "default",
        onEvent: (eventType, pod) =>
        {
            seenTypes.Add(eventType);
            firstEvent.Signal();
        },
        onError: error => testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error."),
        onClosed: closedSignal.Set);

    // At least one event must arrive before the watcher is disposed.
    await Task.WhenAny(firstEvent.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);
    Assert.True(
        firstEvent.CurrentCount == 0,
        "Timed out waiting for events.");

    Assert.NotEmpty(seenTypes);
    Assert.True(watcher.Watching);

    watcher.Dispose();

    seenTypes.Clear();

    await Task.WhenAny(closedSignal.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);

    Assert.False(watcher.Watching);
    Assert.True(closedSignal.IsSet);

    releaseServer.Set();
}
|
|
|
|
/// <summary>
/// Verifies that Added, Deleted, Modified and Error watch events are all delivered to
/// <c>onEvent</c> without any <c>onError</c> call, and that the watcher stops watching
/// once the server ends the response.
/// </summary>
[Fact]
public async Task WatchAllEvents()
{
    // One signal per line the mock server writes below (four lines, four callbacks).
    var eventsReceived = new AsyncCountdownEvent(4);
    var serverShutdown = new AsyncManualResetEvent();
    var waitForClosed = new AsyncManualResetEvent(false);

    using (var server = new MockKubeApiServer(testOutput, async httpContext =>
    {
        await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true);
        await WriteStreamLine(httpContext, MockDeletedStreamLine).ConfigureAwait(true);
        await WriteStreamLine(httpContext, MockModifiedStreamLine).ConfigureAwait(true);
        await WriteStreamLine(httpContext, MockErrorStreamLine).ConfigureAwait(true);

        // Keep the response open until the test allows the server to finish.
        await serverShutdown.WaitAsync().ConfigureAwait(true);
        return false;
    }))
    {
        var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });

        var events = new HashSet<WatchEventType>();
        var errors = 0;

        // Dispose the watcher on every exit path (including a failed assertion) so the
        // watch does not outlive the test.
        using var watcher = client.CoreV1.WatchListNamespacedPod(
            "default",
            onEvent: (type, item) =>
            {
                testOutput.WriteLine($"Watcher received '{type}' event.");

                events.Add(type);
                eventsReceived.Signal();
            },
            onError: error =>
            {
                testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error.");

                errors += 1;
                eventsReceived.Signal();
            },
            onClosed: waitForClosed.Set);

        // Wait until the server has yielded every event (or the test times out).
        await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);

        Assert.True(
            eventsReceived.CurrentCount == 0,
            "Timed out waiting for all events / errors to be received.");

        Assert.Contains(WatchEventType.Added, events);
        Assert.Contains(WatchEventType.Deleted, events);
        Assert.Contains(WatchEventType.Modified, events);
        Assert.Contains(WatchEventType.Error, events);

        Assert.Equal(0, errors);

        Assert.True(watcher.Watching);

        // Let the server finish; the watcher should then report the closed connection.
        serverShutdown.Set();

        await Task.WhenAny(waitForClosed.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);
        Assert.True(waitForClosed.IsSet);
        Assert.False(watcher.Watching);
    }
}
|
|
|
|
/// <summary>
/// Verifies that a watch stays usable when the server pauses for 120 seconds between
/// lines: the events written after the pause are still delivered.
/// </summary>
[Fact]
public async Task WatchEventsWithTimeout()
{
    // Five callbacks in total; the assertions below expect one error and four events.
    var eventsReceived = new AsyncCountdownEvent(5);
    var serverShutdown = new AsyncManualResetEvent();
    var connectionClosed = new AsyncManualResetEvent();

    using (var server = new MockKubeApiServer(testOutput, async httpContext =>
    {
        await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse).ConfigureAwait(true);
        await Task.Delay(TimeSpan.FromSeconds(120)).ConfigureAwait(true); // The default timeout is 100 seconds
        await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true);
        await WriteStreamLine(httpContext, MockDeletedStreamLine).ConfigureAwait(true);
        await WriteStreamLine(httpContext, MockModifiedStreamLine).ConfigureAwait(true);
        await WriteStreamLine(httpContext, MockErrorStreamLine).ConfigureAwait(true);

        // Keep the response open until the test allows the server to finish.
        await serverShutdown.WaitAsync().ConfigureAwait(true);
        return false;
    }))
    {
        var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });

        var events = new HashSet<WatchEventType>();
        var errors = 0;

        // Dispose the watcher on every exit path (including a failed assertion) so the
        // watch does not outlive the test.
        using var watcher = client.CoreV1.WatchListNamespacedPod(
            "default",
            onEvent: (type, item) =>
            {
                testOutput.WriteLine($"Watcher received '{type}' event.");

                events.Add(type);
                eventsReceived.Signal();
            },
            onError: error =>
            {
                testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error.");

                errors += 1;
                eventsReceived.Signal();
            },
            onClosed: connectionClosed.Set);

        // Wait until the server has yielded every line (or the test times out).
        await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);

        Assert.True(
            eventsReceived.CurrentCount == 0,
            "Timed out waiting for all events / errors to be received.");

        Assert.Contains(WatchEventType.Added, events);
        Assert.Contains(WatchEventType.Deleted, events);
        Assert.Contains(WatchEventType.Modified, events);
        Assert.Contains(WatchEventType.Error, events);

        Assert.Equal(1, errors);

        Assert.True(watcher.Watching);

        // Let the server finish; the watcher should then report the closed connection.
        serverShutdown.Set();

        await Task.WhenAny(connectionClosed.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);
        Assert.True(connectionClosed.IsSet);
    }
}
|
|
|
|
/// <summary>
/// Verifies that a server-side failure in the middle of the stream is reported through
/// <c>onError</c> with an I/O exception, after which <c>onClosed</c> fires and the
/// watcher stops watching.
/// </summary>
[Fact]
public async Task WatchServerDisconnect()
{
    Exception exceptionCatched = null;
    var exceptionReceived = new AsyncManualResetEvent(false);
    var waitForException = new AsyncManualResetEvent(false);
    var waitForClosed = new AsyncManualResetEvent(false);

    using (var server = new MockKubeApiServer(testOutput, async httpContext =>
    {
        await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse).ConfigureAwait(true);
        await waitForException.WaitAsync().ConfigureAwait(true);

        // Simulate the server going away while the response is still open.
        throw new IOException("server down");
    }))
    {
        var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });

        waitForException.Set();

        // Dispose the watcher on every exit path (including a failed assertion) so the
        // watch does not outlive the test.
        using var watcher = client.CoreV1.WatchListNamespacedPod(
            "default",
            onEvent: (type, item) => { },
            onError: e =>
            {
                exceptionCatched = e;
                exceptionReceived.Set();
            },
            onClosed: waitForClosed.Set);

        // Wait for the server failure to reach onError (or the test times out).
        await Task.WhenAny(exceptionReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);

        Assert.True(
            exceptionReceived.IsSet,
            "Timed out waiting for exception");

        await Task.WhenAny(waitForClosed.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);
        Assert.True(waitForClosed.IsSet);
        Assert.False(watcher.Watching);

        // The concrete exception type depends on the target framework.
#if NET8_0_OR_GREATER
        Assert.IsType<HttpIOException>(exceptionCatched);
#else
        Assert.IsType<IOException>(exceptionCatched);
#endif
    }
}
|
|
|
|
/// <summary>
/// Pass-through handler that records whether a request was ever sent through it.
/// </summary>
private class DummyHandler : DelegatingHandler
{
    /// <summary>Gets a value indicating whether <see cref="SendAsync"/> has been invoked.</summary>
    internal bool Called { get; private set; }

    /// <summary>Marks the handler as called, then forwards the request unchanged.</summary>
    protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
    {
        Called = true;

        var forwarded = base.SendAsync(request, cancellationToken);
        return forwarded;
    }
}
|
|
|
|
/// <summary>
/// Verifies that delegating handlers passed to the <c>Kubernetes</c> constructor are
/// invoked for watch requests: neither handler is called before the watch starts and
/// both are called once an event has been received.
/// </summary>
[Fact]
public async Task TestWatchWithHandlers()
{
    var eventsReceived = new AsyncCountdownEvent(1);
    var serverShutdown = new AsyncManualResetEvent();

    using (var server = new MockKubeApiServer(testOutput, async httpContext =>
    {
        await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse).ConfigureAwait(true);
        await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true);

        // Keep the response open until the test allows the server to finish.
        await serverShutdown.WaitAsync().ConfigureAwait(true);
        return false;
    }))
    {
        var handler1 = new DummyHandler();
        var handler2 = new DummyHandler();

        var client = new Kubernetes(
            new KubernetesClientConfiguration { Host = server.Uri.ToString() }, handler1, handler2);

        Assert.False(handler1.Called);
        Assert.False(handler2.Called);

        var events = new HashSet<WatchEventType>();

        // Dispose the watcher on every exit path (including a failed assertion) so the
        // watch does not outlive the test.
        using var watcher = client.CoreV1.WatchListNamespacedPod(
            "default",
            onEvent: (type, item) =>
            {
                events.Add(type);
                eventsReceived.Signal();
            });

        // Wait for the first event (or the test times out).
        await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);

        Assert.True(
            eventsReceived.CurrentCount == 0,
            "Timed out waiting for all events / errors to be received.");

        Assert.Contains(WatchEventType.Added, events);

        Assert.True(handler1.Called);
        Assert.True(handler2.Called);

        serverShutdown.Set();
    }
}
|
|
|
|
/// <summary>
/// Same scenario as <c>WatchAllEvents</c>, but the watch is opened with a field selector:
/// all four event types must reach <c>onEvent</c> with no <c>onError</c> call, and
/// <c>onClosed</c> must fire once the server ends the response.
/// </summary>
[Fact]
public async Task DirectWatchAllEvents()
{
    // One signal per line the mock server writes below.
    var eventsReceived = new AsyncCountdownEvent(4);
    var serverShutdown = new AsyncManualResetEvent();
    var connectionClosed = new AsyncManualResetEvent();

    using (var server = new MockKubeApiServer(testOutput, async httpContext =>
    {
        await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true);
        await WriteStreamLine(httpContext, MockDeletedStreamLine).ConfigureAwait(true);
        await WriteStreamLine(httpContext, MockModifiedStreamLine).ConfigureAwait(true);
        await WriteStreamLine(httpContext, MockErrorStreamLine).ConfigureAwait(true);

        // Keep the response open until the test allows the server to finish.
        await serverShutdown.WaitAsync().ConfigureAwait(true);
        return false;
    }))
    {
        var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });

        var events = new HashSet<WatchEventType>();
        var errors = 0;

        // Dispose the watcher on every exit path (including a failed assertion) so the
        // watch does not outlive the test.
        using var watcher = client.CoreV1.WatchListNamespacedPod(
            "default",

            // The previous $"metadata.name=${"myPod"}" produced "metadata.name=$myPod":
            // in C# the '$' before the interpolation hole is a literal character.
            fieldSelector: "metadata.name=myPod",
            onEvent:
            (type, item) =>
            {
                testOutput.WriteLine($"Watcher received '{type}' event.");

                events.Add(type);
                eventsReceived.Signal();
            },
            onError:
            error =>
            {
                testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error.");

                errors += 1;
                eventsReceived.Signal();
            },
            onClosed: connectionClosed.Set);

        // Wait until the server has yielded every event (or the test times out).
        await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);

        Assert.True(
            eventsReceived.CurrentCount == 0,
            "Timed out waiting for all events / errors to be received.");

        Assert.Contains(WatchEventType.Added, events);
        Assert.Contains(WatchEventType.Deleted, events);
        Assert.Contains(WatchEventType.Modified, events);
        Assert.Contains(WatchEventType.Error, events);

        Assert.Equal(0, errors);

        Assert.True(watcher.Watching);

        // Let the server finish; the watcher should then report the closed connection.
        serverShutdown.Set();

        await Task.WhenAny(connectionClosed.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);
        Assert.True(connectionClosed.IsSet);
    }
}
|
|
|
|
/// <summary>
/// Verifies that a non-watch list call against a server that stalls for 120 seconds is
/// aborted with <see cref="TaskCanceledException"/>, both when the client is configured
/// with a 5-second <c>HttpClientTimeout</c> and when the caller's token is cancelled
/// after 5 seconds.
/// </summary>
[Fact]
public async Task EnsureTimeoutWorks()
{
    using var server = new MockKubeApiServer(testOutput, async httpContext =>
    {
        await Task.Delay(TimeSpan.FromSeconds(120)).ConfigureAwait(true); // The default timeout is 100 seconds
        await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse).ConfigureAwait(true);

        return false;
    });

    // Timeout configured on the client itself.
    await Assert.ThrowsAsync<TaskCanceledException>(async () =>
    {
        var client = new Kubernetes(new KubernetesClientConfiguration
        {
            Host = server.Uri.ToString(),
            HttpClientTimeout = TimeSpan.FromSeconds(5),
        });
        await client.CoreV1.ListNamespacedPodAsync("default").ConfigureAwait(true);
    }).ConfigureAwait(true);

    // Timeout driven by the caller's cancellation token.
    await Assert.ThrowsAsync<TaskCanceledException>(async () =>
    {
        // The source owns a timer once a delay is scheduled, so dispose it when done.
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        var client = new Kubernetes(new KubernetesClientConfiguration
        {
            Host = server.Uri.ToString(),
        });
        await client.CoreV1.ListNamespacedPodAsync("default", cancellationToken: cts.Token).ConfigureAwait(true);
    }).ConfigureAwait(true);
}
|
|
|
|
/// <summary>
/// Verifies that a watch opened with a field selector still receives all four event
/// types when the server waits 120 seconds before writing its first line.
/// </summary>
[Fact]
public async Task DirectWatchEventsWithTimeout()
{
    // One signal per line the mock server writes below.
    var eventsReceived = new AsyncCountdownEvent(4);
    var serverShutdown = new AsyncManualResetEvent();

    using (var server = new MockKubeApiServer(testOutput, async httpContext =>
    {
        await Task.Delay(TimeSpan.FromSeconds(120)).ConfigureAwait(true); // The default timeout is 100 seconds
        await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true);
        await WriteStreamLine(httpContext, MockDeletedStreamLine).ConfigureAwait(true);
        await WriteStreamLine(httpContext, MockModifiedStreamLine).ConfigureAwait(true);
        await WriteStreamLine(httpContext, MockErrorStreamLine).ConfigureAwait(true);

        // Keep the response open until the test allows the server to finish.
        await serverShutdown.WaitAsync().ConfigureAwait(true);
        return false;
    }))
    {
        var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });

        var events = new HashSet<WatchEventType>();
        var errors = 0;

        // Dispose the watcher on every exit path (including a failed assertion) so the
        // watch does not outlive the test.
        using var watcher = client.CoreV1.WatchListNamespacedPod(
            "default",

            // The previous $"metadata.name=${"myPod"}" produced "metadata.name=$myPod":
            // in C# the '$' before the interpolation hole is a literal character.
            fieldSelector: "metadata.name=myPod",
            onEvent:
            (type, item) =>
            {
                testOutput.WriteLine($"Watcher received '{type}' event.");

                events.Add(type);
                eventsReceived.Signal();
            },
            onError:
            error =>
            {
                testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error.");

                errors += 1;
                eventsReceived.Signal();
            });

        // Wait until the server has yielded every event (or the test times out).
        await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);

        Assert.True(
            eventsReceived.CurrentCount == 0,
            "Timed out waiting for all events / errors to be received.");

        Assert.Contains(WatchEventType.Added, events);
        Assert.Contains(WatchEventType.Deleted, events);
        Assert.Contains(WatchEventType.Modified, events);
        Assert.Contains(WatchEventType.Error, events);

        Assert.Equal(0, errors);

        Assert.True(watcher.Watching);

        serverShutdown.Set();
    }
}
|
|
|
|
/// <summary>
/// Opens a watch against a server that keeps its response open for 5 seconds and
/// expects an <see cref="OperationCanceledException"/> once the 2-second cancellation
/// token fires while the watch is open.
/// </summary>
[Fact]
public async Task WatchShouldCancelAfterRequested()
{
    var serverShutdown = new AsyncManualResetEvent();

    using (var server = new MockKubeApiServer(testOutput, async httpContext =>
    {
        httpContext.Response.StatusCode = 200;
        await httpContext.Response.Body.FlushAsync().ConfigureAwait(true);

        // Keep the response open longer than the 2-second cancellation used below.
        await Task.Delay(TimeSpan.FromSeconds(5)).ConfigureAwait(true);
        return true;
    }, resp: ""))
    {
        var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });

        // The source owns a timer once a delay is scheduled, so dispose it when done.
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));

        await Assert.ThrowsAnyAsync<OperationCanceledException>(async () =>
        {
            using var watcher = client.CoreV1.WatchListNamespacedPod(
                "default",
                onEvent: (type, item) => { });

            // The token is observed by this delay; the watcher is disposed when the
            // cancellation exception unwinds the lambda.
            await Task.Delay(TimeSpan.FromSeconds(5), cts.Token).ConfigureAwait(true);
        }).ConfigureAwait(true);
    }
}
|
|
|
|
/// <summary>
/// Feeds a single ERROR watch event carrying a <c>Status</c> object to a watcher and
/// verifies that it is reported through <c>onError</c> as a
/// <see cref="KubernetesException"/> whose message and status message match the payload.
/// </summary>
[Fact]
public void ReadError()
{
    var data = Encoding.UTF8.GetBytes(
        "{\"type\":\"ERROR\",\"object\":{\"kind\":\"Status\",\"apiVersion\":\"v1\",\"metadata\":{},\"status\":\"Failure\",\"message\":\"too old resource version: 44982(53593)\",\"reason\":\"Gone\",\"code\":410}}");

    using (var stream = new MemoryStream(data))
    using (var reader = new StreamReader(stream))
    {
        Exception recordedException = null;

        // Intentionally not disposed here: the watcher callback may still signal it.
        var mre = new ManualResetEvent(false);

        // Dispose the watcher on every exit path so it does not outlive the reader.
        using var watcher = new Watcher<V1Pod>(
            () => Task.FromResult(reader),
            null,
            (exception) =>
            {
                recordedException = exception;
                mre.Set();
            });

        // Bound the wait so a regression fails the test instead of hanging the run.
        Assert.True(mre.WaitOne(TestTimeout), "Timed out waiting for the watcher to report the error.");

        Assert.NotNull(recordedException);

        var k8sException = recordedException as KubernetesException;

        Assert.NotNull(k8sException);
        Assert.NotNull(k8sException.Status);
        Assert.Equal("too old resource version: 44982(53593)", k8sException.Message);
        Assert.Equal("too old resource version: 44982(53593)", k8sException.Status.Message);
    }
}
|
|
|
|
/// <summary>
/// Pass-through handler that remembers the HTTP version of the last request sent
/// through it.
/// </summary>
private class CheckHeaderDelegatingHandler : DelegatingHandler
{
    /// <summary>Creates the handler without an inner handler.</summary>
    public CheckHeaderDelegatingHandler()
    {
    }

    /// <summary>Creates the handler on top of <paramref name="innerHandler"/>.</summary>
    public CheckHeaderDelegatingHandler(HttpMessageHandler innerHandler)
        : base(innerHandler)
    {
    }

    /// <summary>Gets the version of the most recent request, or null if none was sent.</summary>
    public Version Version { get; private set; }

    /// <summary>Captures the request's HTTP version, then forwards the request unchanged.</summary>
    protected override Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request,
        CancellationToken cancellationToken)
    {
        Version = request.Version;

        var forwarded = base.SendAsync(request, cancellationToken);
        return forwarded;
    }
}
|
|
|
|
/// <summary>
/// Verifies that a watch request is issued with HTTP version 2.0: the recording handler
/// has seen no request before the watch starts and sees
/// <see cref="HttpVersion.Version20"/> afterwards.
/// </summary>
[Fact]
public async Task MustHttp2VersionSet()
{
    // Dispose the mock server like every other test does; it was previously leaked.
    using var server = new MockKubeApiServer(testOutput, async httpContext =>
    {
        await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true);
        return false;
    });

    var handler = new CheckHeaderDelegatingHandler();
    var client = new Kubernetes(
        new KubernetesClientConfiguration { Host = server.Uri.ToString() }, handler);

    Assert.Null(handler.Version);
    using var watcher = client.CoreV1.WatchListNamespacedPod("default", onEvent: (type, item) => { });
    Assert.Equal(HttpVersion.Version20, handler.Version);

    // Keeps the method a valid async Task without changing its signature.
    await Task.CompletedTask.ConfigureAwait(true);
}
|
|
|
|
/// <summary>
/// Consumes the async-enumerable watch API on a background task and checks that the
/// Added, Deleted, Modified and Error events are all yielded without an exception.
/// </summary>
[Fact]
public async Task AsyncEnumerableWatchAllEvents()
{
    var pendingSignals = new AsyncCountdownEvent(4);
    var releaseServer = new AsyncManualResetEvent();
    var enumerationFinished = new AsyncManualResetEvent();

    using var server = new MockKubeApiServer(testOutput, async httpContext =>
    {
        await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true);
        await WriteStreamLine(httpContext, MockDeletedStreamLine).ConfigureAwait(true);
        await WriteStreamLine(httpContext, MockModifiedStreamLine).ConfigureAwait(true);
        await WriteStreamLine(httpContext, MockErrorStreamLine).ConfigureAwait(true);

        // Hold the response open until the test releases the server.
        await releaseServer.WaitAsync().ConfigureAwait(true);
        return false;
    });

    var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });

    var seenTypes = new HashSet<WatchEventType>();
    var failureCount = 0;

    // Drain the watch on a background task so the test body can wait with a timeout.
    var consumer = Task.Run(async () =>
    {
        try
        {
            var watch = client.CoreV1.WatchListNamespacedPodAsync("default");

            await foreach (var (eventType, pod) in watch.ConfigureAwait(false))
            {
                testOutput.WriteLine($"AsyncEnumerable Watcher received '{eventType}' event.");
                seenTypes.Add(eventType);
                pendingSignals.Signal();

                // Stop enumerating once every expected event type has been seen.
                if (seenTypes.Count >= 4)
                {
                    break;
                }
            }
        }
        catch (Exception ex)
        {
            testOutput.WriteLine($"AsyncEnumerable Watcher received exception: {ex.GetType().FullName}");
            failureCount++;
            pendingSignals.Signal();
        }
        finally
        {
            enumerationFinished.Set();
        }
    });

    // Block until all events arrived or the test timeout elapsed.
    await Task.WhenAny(pendingSignals.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);

    Assert.True(
        pendingSignals.CurrentCount == 0,
        "Timed out waiting for all events / errors to be received.");

    Assert.Contains(WatchEventType.Added, seenTypes);
    Assert.Contains(WatchEventType.Deleted, seenTypes);
    Assert.Contains(WatchEventType.Modified, seenTypes);
    Assert.Contains(WatchEventType.Error, seenTypes);

    Assert.Equal(0, failureCount);

    releaseServer.Set();

    await Task.WhenAny(enumerationFinished.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);
    Assert.True(enumerationFinished.IsSet);
}
|
|
|
|
/// <summary>
/// Verifies that cancelling the token passed to the async-enumerable watch ends the
/// enumeration: after two events are received the token is cancelled and the consuming
/// task must finish within five seconds.
/// </summary>
[Fact]
public async Task AsyncEnumerableWatchWithCancellation()
{
    var eventsReceived = new AsyncCountdownEvent(2);
    var serverShutdown = new AsyncManualResetEvent();

    using (var server = new MockKubeApiServer(testOutput, async httpContext =>
    {
        await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true);
        await WriteStreamLine(httpContext, MockModifiedStreamLine).ConfigureAwait(true);

        // Keep the response open so only the cancellation can end the watch.
        await serverShutdown.WaitAsync().ConfigureAwait(true);
        return false;
    }))
    {
        var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });

        var events = new HashSet<WatchEventType>();

        // Disposed at the end of the block; the source was previously leaked.
        using var cts = new CancellationTokenSource();

        var watchTask = Task.Run(async () =>
        {
            try
            {
                await foreach (var (type, item) in client.CoreV1.WatchListNamespacedPodAsync("default", cancellationToken: cts.Token).ConfigureAwait(false))
                {
                    testOutput.WriteLine($"AsyncEnumerable Watcher received '{type}' event.");
                    events.Add(type);
                    eventsReceived.Signal();
                }
            }
            catch (OperationCanceledException)
            {
                // Cancellation is the expected way for this enumeration to end.
                testOutput.WriteLine("AsyncEnumerable Watcher was cancelled as expected.");
            }
        });

        // Wait for both events to be received (or the test times out).
        await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);

        Assert.True(
            eventsReceived.CurrentCount == 0,
            "Timed out waiting for events to be received.");

        Assert.Contains(WatchEventType.Added, events);
        Assert.Contains(WatchEventType.Modified, events);

        // Cancel the watch.
        cts.Cancel();

        // The consuming task must complete promptly after cancellation.
        await Task.WhenAny(watchTask, Task.Delay(TimeSpan.FromSeconds(5))).ConfigureAwait(true);
        Assert.True(watchTask.IsCompletedSuccessfully || watchTask.IsCanceled);

        serverShutdown.Set();
    }
}
|
|
|
|
/// <summary>
/// Consumes the async-enumerable watch API with a field selector and checks that the
/// three events written by the server (Added, Deleted, Modified) are all yielded.
/// </summary>
[Fact]
public async Task AsyncEnumerableWatchWithFieldSelector()
{
    var pendingSignals = new AsyncCountdownEvent(3);
    var releaseServer = new AsyncManualResetEvent();
    var enumerationFinished = new AsyncManualResetEvent();

    using var server = new MockKubeApiServer(testOutput, async httpContext =>
    {
        await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true);
        await WriteStreamLine(httpContext, MockDeletedStreamLine).ConfigureAwait(true);
        await WriteStreamLine(httpContext, MockModifiedStreamLine).ConfigureAwait(true);

        // Hold the response open until the test releases the server.
        await releaseServer.WaitAsync().ConfigureAwait(true);
        return false;
    });

    var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });

    var received = new List<(WatchEventType, V1Pod)>();

    // Drain the watch on a background task so the test body can wait with a timeout.
    var consumer = Task.Run(async () =>
    {
        try
        {
            var watch = client.CoreV1.WatchListNamespacedPodAsync(
                "default",
                fieldSelector: "metadata.name=testPod");

            await foreach (var (eventType, pod) in watch.ConfigureAwait(false))
            {
                testOutput.WriteLine($"AsyncEnumerable Watcher received '{eventType}' event for pod '{pod?.Metadata?.Name}'.");
                received.Add((eventType, pod));
                pendingSignals.Signal();

                // Stop enumerating after the three expected events.
                if (received.Count >= 3)
                {
                    break;
                }
            }
        }
        catch (Exception ex)
        {
            testOutput.WriteLine($"AsyncEnumerable Watcher received exception: {ex.GetType().FullName}");
        }
        finally
        {
            enumerationFinished.Set();
        }
    });

    // Block until all events arrived or the test timeout elapsed.
    await Task.WhenAny(pendingSignals.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);

    Assert.True(
        pendingSignals.CurrentCount == 0,
        "Timed out waiting for all events to be received.");

    Assert.Equal(3, received.Count);
    Assert.Contains(received, entry => entry.Item1 == WatchEventType.Added);
    Assert.Contains(received, entry => entry.Item1 == WatchEventType.Deleted);
    Assert.Contains(received, entry => entry.Item1 == WatchEventType.Modified);

    releaseServer.Set();

    await Task.WhenAny(enumerationFinished.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);
    Assert.True(enumerationFinished.IsSet);
}
|
|
|
|
/// <summary>
/// Consumes the async-enumerable watch API against a stream that mixes unusable lines
/// with a valid event and checks that the <c>onError</c> callback is invoked while the
/// valid event is still yielded.
/// </summary>
[Fact]
public async Task AsyncEnumerableWatchErrorHandling()
{
    var pendingSignals = new AsyncCountdownEvent(3);
    var releaseServer = new AsyncManualResetEvent();
    var enumerationFinished = new AsyncManualResetEvent();
    var errorSignal = new AsyncManualResetEvent();

    using var server = new MockKubeApiServer(testOutput, async httpContext =>
    {
        await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse).ConfigureAwait(true);
        await WriteStreamLine(httpContext, MockBadStreamLine).ConfigureAwait(true);
        await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true);

        // Hold the response open until the test releases the server.
        await releaseServer.WaitAsync().ConfigureAwait(true);
        return false;
    });

    var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });

    var received = new List<(WatchEventType, V1Pod)>();
    var sawError = false;

    // Drain the watch on a background task so the test body can wait with a timeout.
    var consumer = Task.Run(async () =>
    {
        try
        {
            var watch = client.CoreV1.WatchListNamespacedPodAsync(
                "default",
                onError: ex =>
                {
                    testOutput.WriteLine($"AsyncEnumerable Watcher onError called: {ex.GetType().FullName}");
                    sawError = true;
                    errorSignal.Set();
                    pendingSignals.Signal();
                });

            await foreach (var (eventType, pod) in watch.ConfigureAwait(false))
            {
                testOutput.WriteLine($"AsyncEnumerable Watcher received '{eventType}' event.");
                received.Add((eventType, pod));
                pendingSignals.Signal();

                // Stop once two valid events have been collected.
                if (received.Count >= 2)
                {
                    break;
                }
            }
        }
        catch (Exception ex)
        {
            testOutput.WriteLine($"AsyncEnumerable Watcher caught exception: {ex.GetType().FullName}");
        }
        finally
        {
            enumerationFinished.Set();
        }
    });

    // Block until every expected event/error arrived or the test timeout elapsed.
    await Task.WhenAny(pendingSignals.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);

    Assert.True(
        pendingSignals.CurrentCount == 0,
        "Timed out waiting for events and errors to be received.");

    // At least one valid event and one error are required.
    Assert.True(received.Count >= 1, "Should have received at least one valid event");
    Assert.True(sawError, "Should have caught parsing error");
    Assert.True(errorSignal.IsSet, "Error callback should have been called");

    releaseServer.Set();

    await Task.WhenAny(enumerationFinished.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);
    Assert.True(enumerationFinished.IsSet);
}
|
|
}
|
|
}
|