using k8s.Models;
using k8s.Tests.Mock;
using Microsoft.AspNetCore.Http;
using Nito.AsyncEx;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;

namespace k8s.Tests
{
    /// <summary>
    /// Tests for pod watch requests issued through <see cref="Kubernetes"/> against an in-process
    /// <see cref="MockKubeApiServer"/>. Each test supplies a server handler that writes a scripted
    /// sequence of response lines and then asserts on the callbacks / enumerated events observed.
    /// </summary>
    public class WatchTests
    {
        // Pre-serialized watch events, one per event type, used as response lines by the mock server.
        private static readonly string MockAddedEventStreamLine = BuildWatchEventStreamLine(WatchEventType.Added);
        private static readonly string MockDeletedStreamLine = BuildWatchEventStreamLine(WatchEventType.Deleted);
        private static readonly string MockModifiedStreamLine = BuildWatchEventStreamLine(WatchEventType.Modified);
        private static readonly string MockErrorStreamLine = BuildWatchEventStreamLine(WatchEventType.Error);

        // A response line that is not valid JSON.
        private const string MockBadStreamLine = "bad json";

        // Upper bound for every wait in this class, so a missing signal fails the test instead of hanging it.
        private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(150);

        private readonly ITestOutputHelper testOutput;

        /// <summary>Stores the xUnit output helper used for diagnostic logging.</summary>
        /// <param name="testOutput">Output sink supplied by xUnit.</param>
        public WatchTests(ITestOutputHelper testOutput)
        {
            this.testOutput = testOutput;
        }

        /// <summary>
        /// Serializes a watch event of the given type whose object is the first pod of
        /// <see cref="MockKubeApiServer.MockPodResponse"/>.
        /// </summary>
        /// <param name="eventType">The event type to put on the serialized event.</param>
        /// <returns>The serialized watch event.</returns>
        private static string BuildWatchEventStreamLine(WatchEventType eventType)
        {
            var corev1PodList = KubernetesJson.Deserialize<V1PodList>(MockKubeApiServer.MockPodResponse);
            return KubernetesJson.Serialize(new Watcher<V1Pod>.WatchEvent
            {
                Type = eventType,
                Object = corev1PodList.Items.First(),
            });
        }

        /// <summary>
        /// Writes one line to the HTTP response: the text with any CRLF sequences removed,
        /// followed by a single CRLF, and then flushes the response body.
        /// </summary>
        /// <param name="httpContext">The request context whose response is written to.</param>
        /// <param name="responseLine">The text to send as a single line.</param>
        private static async Task WriteStreamLine(HttpContext httpContext, string responseLine)
        {
            const string crlf = "\r\n";
            await httpContext.Response.WriteAsync(responseLine.Replace(crlf, "")).ConfigureAwait(false);
            await httpContext.Response.WriteAsync(crlf).ConfigureAwait(false);
            await httpContext.Response.Body.FlushAsync().ConfigureAwait(false);
        }

        /// <summary>
        /// With a mock server that has no custom handler, a watch must report an error through
        /// <c>onError</c>, and creating a watch without any callbacks must throw.
        /// </summary>
        [Fact]
        public async Task CannotWatch()
        {
            using (var server = new MockKubeApiServer(testOutput))
            {
                var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });

                // did not pass watch param
                var onErrorCalled = new AsyncManualResetEvent(false);

                using (var watcher = client.CoreV1.WatchListNamespacedPod(
                    "default",
                    onEvent: (type, item) => { },
                    onError: e => { onErrorCalled.Set(); }))
                {
                    // Wait for the callback itself rather than sleeping for a fixed interval.
                    await Task.WhenAny(onErrorCalled.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);
                }

                Assert.True(onErrorCalled.IsSet, "Timed out waiting for onError to be called.");

                // server did not respond line by line
                await Assert.ThrowsAnyAsync<Exception>(() =>
                {
                    using (var testWatcher = client.CoreV1.WatchListNamespacedPod(
                        "default"))
                    {
                        return Task.CompletedTask;
                    }

                    // this line did not throw
                    // using (var testWatcher = client.CoreV1.WatchListNamespacedPod("default", onEvent: (type, item) => { }))
                }).ConfigureAwait(true);
            }
        }

        /// <summary>
        /// The server withholds its response until the watcher object exists; the event written
        /// afterwards must still reach <c>onEvent</c>.
        /// </summary>
        [Fact]
        public async Task AsyncWatcher()
        {
            var created = new AsyncManualResetEvent(false);
            var eventsReceived = new AsyncManualResetEvent(false);

            using (var server = new MockKubeApiServer(testOutput, async httpContext =>
            {
                // block until the response watcher object has been created
                await created.WaitAsync().ConfigureAwait(true);
                await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true);
                return false;
            }))
            {
                var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });

                using (var watcher = client.CoreV1.WatchListNamespacedPod(
                    "default",
                    onEvent: (type, item) => { eventsReceived.Set(); }))
                {
                    // here the watcher is ready to use, but the http server has not responded yet.
                    created.Set();
                    await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);
                }

                Assert.True(eventsReceived.IsSet);
                Assert.True(created.IsSet);
            }
        }

        /// <summary>
        /// Interleaves lines that are not watch events with valid events and checks that the valid
        /// events are delivered, three errors are reported, and the watcher keeps watching.
        /// </summary>
        [Fact]
        public async Task SurviveBadLine()
        {
            var eventsReceived = new AsyncCountdownEvent(5);
            var serverShutdown = new AsyncManualResetEvent();
            var connectionClosed = new AsyncManualResetEvent();

            using (var server = new MockKubeApiServer(
                testOutput,
                async httpContext =>
                {
                    httpContext.Response.StatusCode = (int)HttpStatusCode.OK;
                    httpContext.Response.ContentLength = null;

                    await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse).ConfigureAwait(true);
                    await WriteStreamLine(httpContext, MockBadStreamLine).ConfigureAwait(true);
                    await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true);
                    await WriteStreamLine(httpContext, MockBadStreamLine).ConfigureAwait(true);
                    await WriteStreamLine(httpContext, MockModifiedStreamLine).ConfigureAwait(true);

                    // make server alive, cannot set to int.max as of it would block response
                    await serverShutdown.WaitAsync().ConfigureAwait(true);
                    return false;
                }))
            {
                var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });

                var events = new HashSet<WatchEventType>();
                var errors = 0;

                using var watcher = client.CoreV1.WatchListNamespacedPod(
                    "default",
                    onEvent: (type, item) =>
                    {
                        testOutput.WriteLine($"Watcher received '{type}' event.");

                        events.Add(type);
                        eventsReceived.Signal();
                    },
                    onError: error =>
                    {
                        testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error.");

                        errors += 1;
                        eventsReceived.Signal();
                    },
                    onClosed: connectionClosed.Set);

                // wait server yields all events
                await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);

                Assert.True(
                    eventsReceived.CurrentCount == 0,
                    "Timed out waiting for all events / errors to be received.");

                Assert.Contains(WatchEventType.Added, events);
                Assert.Contains(WatchEventType.Modified, events);

                Assert.Equal(3, errors);

                Assert.True(watcher.Watching);

                // Let the server know it can initiate a shut down.
                serverShutdown.Set();

                await Task.WhenAny(connectionClosed.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);
                Assert.True(connectionClosed.IsSet);
            }
        }

        /// <summary>
        /// Disposing a watcher that is still receiving must stop it (<c>Watching</c> becomes false)
        /// and invoke <c>onClosed</c>, while the server is still holding the connection open.
        /// </summary>
        [Fact]
        public async Task DisposeWatch()
        {
            var connectionClosed = new AsyncManualResetEvent();
            var eventsReceived = new AsyncCountdownEvent(1);
            var serverShutdown = new AsyncManualResetEvent();

            using (var server = new MockKubeApiServer(testOutput, async httpContext =>
            {
                await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true);
                await serverShutdown.WaitAsync().ConfigureAwait(true);
                return false;
            }))
            {
                var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });

                var events = new HashSet<WatchEventType>();

                // Not wrapped in `using`: the explicit Dispose() below is the behavior under test.
                var watcher = client.CoreV1.WatchListNamespacedPod(
                    "default",
                    onEvent: (type, item) =>
                    {
                        events.Add(type);
                        eventsReceived.Signal();
                    },
                    onError: error =>
                    {
                        testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error.");
                    },
                    onClosed: connectionClosed.Set);

                // wait at least an event
                await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);

                Assert.True(
                    eventsReceived.CurrentCount == 0,
                    "Timed out waiting for events.");

                Assert.NotEmpty(events);
                Assert.True(watcher.Watching);

                watcher.Dispose();

                events.Clear();

                await Task.WhenAny(connectionClosed.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);

                Assert.False(watcher.Watching);
                Assert.True(connectionClosed.IsSet);

                serverShutdown.Set();
            }
        }

        /// <summary>
        /// One event of each type is sent; all four must arrive through <c>onEvent</c> with no
        /// <c>onError</c> calls, and the watcher must stop once the server ends the response.
        /// </summary>
        [Fact]
        public async Task WatchAllEvents()
        {
            var eventsReceived = new AsyncCountdownEvent(4 /* first line of response is eaten by WatcherDelegatingHandler */);
            var serverShutdown = new AsyncManualResetEvent();
            var waitForClosed = new AsyncManualResetEvent(false);

            using (var server = new MockKubeApiServer(testOutput, async httpContext =>
            {
                await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true);
                await WriteStreamLine(httpContext, MockDeletedStreamLine).ConfigureAwait(true);
                await WriteStreamLine(httpContext, MockModifiedStreamLine).ConfigureAwait(true);
                await WriteStreamLine(httpContext, MockErrorStreamLine).ConfigureAwait(true);

                // make server alive, cannot set to int.max as of it would block response
                await serverShutdown.WaitAsync().ConfigureAwait(true);
                return false;
            }))
            {
                var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });

                var events = new HashSet<WatchEventType>();
                var errors = 0;

                using var watcher = client.CoreV1.WatchListNamespacedPod(
                    "default",
                    onEvent: (type, item) =>
                    {
                        testOutput.WriteLine($"Watcher received '{type}' event.");

                        events.Add(type);
                        eventsReceived.Signal();
                    },
                    onError: error =>
                    {
                        testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error.");

                        errors += 1;
                        eventsReceived.Signal();
                    },
                    onClosed: waitForClosed.Set);

                // wait server yields all events
                await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);

                Assert.True(
                    eventsReceived.CurrentCount == 0,
                    "Timed out waiting for all events / errors to be received.");

                Assert.Contains(WatchEventType.Added, events);
                Assert.Contains(WatchEventType.Deleted, events);
                Assert.Contains(WatchEventType.Modified, events);
                Assert.Contains(WatchEventType.Error, events);

                Assert.Equal(0, errors);

                Assert.True(watcher.Watching);

                serverShutdown.Set();

                await Task.WhenAny(waitForClosed.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);
                Assert.True(waitForClosed.IsSet);
                Assert.False(watcher.Watching);
            }
        }

        /// <summary>
        /// The server pauses for 120 seconds between the first line and the events; the watch must
        /// still deliver all four events afterwards (plus one error for the first, non-event line).
        /// </summary>
        [Fact]
        public async Task WatchEventsWithTimeout()
        {
            var eventsReceived = new AsyncCountdownEvent(5);
            var serverShutdown = new AsyncManualResetEvent();
            var connectionClosed = new AsyncManualResetEvent();

            using (var server = new MockKubeApiServer(testOutput, async httpContext =>
            {
                await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse).ConfigureAwait(true);
                await Task.Delay(TimeSpan.FromSeconds(120)).ConfigureAwait(true); // The default timeout is 100 seconds
                await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true);
                await WriteStreamLine(httpContext, MockDeletedStreamLine).ConfigureAwait(true);
                await WriteStreamLine(httpContext, MockModifiedStreamLine).ConfigureAwait(true);
                await WriteStreamLine(httpContext, MockErrorStreamLine).ConfigureAwait(true);

                // make server alive, cannot set to int.max as of it would block response
                await serverShutdown.WaitAsync().ConfigureAwait(true);
                return false;
            }))
            {
                var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });

                var events = new HashSet<WatchEventType>();
                var errors = 0;

                using var watcher = client.CoreV1.WatchListNamespacedPod(
                    "default",
                    onEvent: (type, item) =>
                    {
                        testOutput.WriteLine($"Watcher received '{type}' event.");

                        events.Add(type);
                        eventsReceived.Signal();
                    },
                    onError: error =>
                    {
                        testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error.");

                        errors += 1;
                        eventsReceived.Signal();
                    },
                    onClosed: connectionClosed.Set);

                // wait server yields all events
                await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);

                Assert.True(
                    eventsReceived.CurrentCount == 0,
                    "Timed out waiting for all events / errors to be received.");

                Assert.Contains(WatchEventType.Added, events);
                Assert.Contains(WatchEventType.Deleted, events);
                Assert.Contains(WatchEventType.Modified, events);
                Assert.Contains(WatchEventType.Error, events);

                Assert.Equal(1, errors);

                Assert.True(watcher.Watching);

                serverShutdown.Set();

                await Task.WhenAny(connectionClosed.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);
                Assert.True(connectionClosed.IsSet);
            }
        }

        /// <summary>
        /// The server handler throws after writing one line; the watcher must surface an I/O
        /// exception through <c>onError</c>, call <c>onClosed</c>, and stop watching.
        /// </summary>
        [Fact]
        public async Task WatchServerDisconnect()
        {
            Exception caughtException = null;
            var exceptionReceived = new AsyncManualResetEvent(false);
            var waitForException = new AsyncManualResetEvent(false);
            var waitForClosed = new AsyncManualResetEvent(false);

            using (var server = new MockKubeApiServer(testOutput, async httpContext =>
            {
                await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse).ConfigureAwait(true);
                await waitForException.WaitAsync().ConfigureAwait(true);
                throw new IOException("server down");
            }))
            {
                var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });

                waitForException.Set();

                using Watcher<V1Pod> watcher = client.CoreV1.WatchListNamespacedPod(
                    "default",
                    onEvent: (type, item) => { },
                    onError: e =>
                    {
                        caughtException = e;
                        exceptionReceived.Set();
                    },
                    onClosed: waitForClosed.Set);

                // wait server down
                await Task.WhenAny(exceptionReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);

                Assert.True(
                    exceptionReceived.IsSet,
                    "Timed out waiting for exception");

                await Task.WhenAny(waitForClosed.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);
                Assert.True(waitForClosed.IsSet);
                Assert.False(watcher.Watching);

                // HttpIOException only exists on .NET 8 and later; earlier targets see a plain IOException.
#if NET8_0_OR_GREATER
                Assert.IsType<HttpIOException>(caughtException);
#else
                Assert.IsType<IOException>(caughtException);
#endif
            }
        }

        /// <summary>Pass-through handler that records whether a request went through it.</summary>
        private class DummyHandler : DelegatingHandler
        {
            /// <summary>True once <see cref="SendAsync"/> has been invoked at least once.</summary>
            internal bool Called { get; private set; }

            /// <summary>Marks the handler as called and forwards the request unchanged.</summary>
            protected override Task<HttpResponseMessage> SendAsync(
                HttpRequestMessage request,
                CancellationToken cancellationToken)
            {
                Called = true;
                return base.SendAsync(request, cancellationToken);
            }
        }

        /// <summary>
        /// Delegating handlers passed to the <see cref="Kubernetes"/> constructor must be invoked
        /// for a watch request, and not before any request is made.
        /// </summary>
        [Fact]
        public async Task TestWatchWithHandlers()
        {
            var eventsReceived = new AsyncCountdownEvent(1);
            var serverShutdown = new AsyncManualResetEvent();

            using (var server = new MockKubeApiServer(testOutput, async httpContext =>
            {
                await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse).ConfigureAwait(true);
                await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true);

                // make server alive, cannot set to int.max as of it would block response
                await serverShutdown.WaitAsync().ConfigureAwait(true);
                return false;
            }))
            {
                var handler1 = new DummyHandler();
                var handler2 = new DummyHandler();

                var client = new Kubernetes(
                    new KubernetesClientConfiguration { Host = server.Uri.ToString() },
                    handler1,
                    handler2);

                Assert.False(handler1.Called);
                Assert.False(handler2.Called);

                var events = new HashSet<WatchEventType>();

                using var watcher = client.CoreV1.WatchListNamespacedPod(
                    "default",
                    onEvent: (type, item) =>
                    {
                        events.Add(type);
                        eventsReceived.Signal();
                    });

                // wait server yields all events
                await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);

                Assert.True(
                    eventsReceived.CurrentCount == 0,
                    "Timed out waiting for all events / errors to be received.");

                Assert.Contains(WatchEventType.Added, events);

                Assert.True(handler1.Called);
                Assert.True(handler2.Called);

                serverShutdown.Set();
            }
        }

        /// <summary>
        /// Same scenario as <see cref="WatchAllEvents"/> but with a field selector supplied on the
        /// watch call; all four event types must arrive with no errors.
        /// </summary>
        [Fact]
        public async Task DirectWatchAllEvents()
        {
            var eventsReceived = new AsyncCountdownEvent(4);
            var serverShutdown = new AsyncManualResetEvent();
            var connectionClosed = new AsyncManualResetEvent();

            using (var server = new MockKubeApiServer(testOutput, async httpContext =>
            {
                await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true);
                await WriteStreamLine(httpContext, MockDeletedStreamLine).ConfigureAwait(true);
                await WriteStreamLine(httpContext, MockModifiedStreamLine).ConfigureAwait(true);
                await WriteStreamLine(httpContext, MockErrorStreamLine).ConfigureAwait(true);

                // make server alive, cannot set to int.max as of it would block response
                await serverShutdown.WaitAsync().ConfigureAwait(true);
                return false;
            }))
            {
                var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });

                var events = new HashSet<WatchEventType>();
                var errors = 0;

                // Plain literal: the former $"metadata.name=${"myPod"}" produced "metadata.name=$myPod",
                // because '$' before '{' is ordinary text in a C# interpolated string.
                using var watcher = client.CoreV1.WatchListNamespacedPod(
                    "default",
                    fieldSelector: "metadata.name=myPod",
                    onEvent: (type, item) =>
                    {
                        testOutput.WriteLine($"Watcher received '{type}' event.");

                        events.Add(type);
                        eventsReceived.Signal();
                    },
                    onError: error =>
                    {
                        testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error.");

                        errors += 1;
                        eventsReceived.Signal();
                    },
                    onClosed: connectionClosed.Set);

                // wait server yields all events
                await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);

                Assert.True(
                    eventsReceived.CurrentCount == 0,
                    "Timed out waiting for all events / errors to be received.");

                Assert.Contains(WatchEventType.Added, events);
                Assert.Contains(WatchEventType.Deleted, events);
                Assert.Contains(WatchEventType.Modified, events);
                Assert.Contains(WatchEventType.Error, events);

                Assert.Equal(0, errors);

                Assert.True(watcher.Watching);

                serverShutdown.Set();

                await Task.WhenAny(connectionClosed.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);
                Assert.True(connectionClosed.IsSet);
            }
        }

        /// <summary>
        /// A non-watch list call against a server that stalls for 120 seconds must be cancelled
        /// both by a 5-second <c>HttpClientTimeout</c> and by a 5-second cancellation token.
        /// </summary>
        [Fact]
        public async Task EnsureTimeoutWorks()
        {
            using var server = new MockKubeApiServer(testOutput, async httpContext =>
            {
                await Task.Delay(TimeSpan.FromSeconds(120)).ConfigureAwait(true); // The default timeout is 100 seconds
                await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse).ConfigureAwait(true);

                return false;
            });

            // raw timeout
            await Assert.ThrowsAsync<TaskCanceledException>(async () =>
            {
                var client = new Kubernetes(new KubernetesClientConfiguration
                {
                    Host = server.Uri.ToString(),
                    HttpClientTimeout = TimeSpan.FromSeconds(5),
                });
                await client.CoreV1.ListNamespacedPodAsync("default").ConfigureAwait(true);
            }).ConfigureAwait(true);

            // cts
            await Assert.ThrowsAsync<TaskCanceledException>(async () =>
            {
                using var cts = new CancellationTokenSource();
                cts.CancelAfter(TimeSpan.FromSeconds(5));
                var client = new Kubernetes(new KubernetesClientConfiguration
                {
                    Host = server.Uri.ToString(),
                });
                await client.CoreV1.ListNamespacedPodAsync("default", cancellationToken: cts.Token).ConfigureAwait(true);
            }).ConfigureAwait(true);
        }

        /// <summary>
        /// The server waits 120 seconds before sending anything; a watch with a field selector
        /// must still receive all four events afterwards without reporting errors.
        /// </summary>
        [Fact]
        public async Task DirectWatchEventsWithTimeout()
        {
            var eventsReceived = new AsyncCountdownEvent(4);
            var serverShutdown = new AsyncManualResetEvent();

            using (var server = new MockKubeApiServer(testOutput, async httpContext =>
            {
                await Task.Delay(TimeSpan.FromSeconds(120)).ConfigureAwait(true); // The default timeout is 100 seconds
                await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true);
                await WriteStreamLine(httpContext, MockDeletedStreamLine).ConfigureAwait(true);
                await WriteStreamLine(httpContext, MockModifiedStreamLine).ConfigureAwait(true);
                await WriteStreamLine(httpContext, MockErrorStreamLine).ConfigureAwait(true);

                // make server alive, cannot set to int.max as of it would block response
                await serverShutdown.WaitAsync().ConfigureAwait(true);
                return false;
            }))
            {
                var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });

                var events = new HashSet<WatchEventType>();
                var errors = 0;

                // Plain literal: the former $"metadata.name=${"myPod"}" produced "metadata.name=$myPod",
                // because '$' before '{' is ordinary text in a C# interpolated string.
                using var watcher = client.CoreV1.WatchListNamespacedPod(
                    "default",
                    fieldSelector: "metadata.name=myPod",
                    onEvent: (type, item) =>
                    {
                        testOutput.WriteLine($"Watcher received '{type}' event.");

                        events.Add(type);
                        eventsReceived.Signal();
                    },
                    onError: error =>
                    {
                        testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error.");

                        errors += 1;
                        eventsReceived.Signal();
                    });

                // wait server yields all events
                await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);

                Assert.True(
                    eventsReceived.CurrentCount == 0,
                    "Timed out waiting for all events / errors to be received.");

                Assert.Contains(WatchEventType.Added, events);
                Assert.Contains(WatchEventType.Deleted, events);
                Assert.Contains(WatchEventType.Modified, events);
                Assert.Contains(WatchEventType.Error, events);

                Assert.Equal(0, errors);

                Assert.True(watcher.Watching);

                serverShutdown.Set();
            }
        }

        /// <summary>
        /// While a watch is open against a server that sends no events, a delay bound to a token
        /// cancelled after 2 seconds must throw an <see cref="OperationCanceledException"/>.
        /// </summary>
        [Fact]
        public async Task WatchShouldCancelAfterRequested()
        {
            var serverShutdown = new AsyncManualResetEvent();

            using (var server = new MockKubeApiServer(testOutput, async httpContext =>
            {
                httpContext.Response.StatusCode = 200;
                await httpContext.Response.Body.FlushAsync().ConfigureAwait(true);

                // Keep the response open for longer than the 2-second cancellation used by the test.
                await Task.Delay(TimeSpan.FromSeconds(5)).ConfigureAwait(true);

                return true;
            }, resp: ""))
            {
                var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });

                using var cts = new CancellationTokenSource();
                cts.CancelAfter(TimeSpan.FromSeconds(2));

                await Assert.ThrowsAnyAsync<OperationCanceledException>(async () =>
                {
                    using var watcher = client.CoreV1.WatchListNamespacedPod(
                        "default",
                        onEvent: (type, item) => { });
                    await Task.Delay(TimeSpan.FromSeconds(5), cts.Token).ConfigureAwait(true);
                }).ConfigureAwait(true);
            }
        }

        /// <summary>
        /// Feeds a single ERROR event carrying a Status object straight into a watcher and checks
        /// that <c>onError</c> receives a <see cref="KubernetesException"/> exposing that status.
        /// </summary>
        [Fact]
        public void ReadError()
        {
            var data = Encoding.UTF8.GetBytes(
                "{\"type\":\"ERROR\",\"object\":{\"kind\":\"Status\",\"apiVersion\":\"v1\",\"metadata\":{},\"status\":\"Failure\",\"message\":\"too old resource version: 44982(53593)\",\"reason\":\"Gone\",\"code\":410}}");

            using (var stream = new MemoryStream(data))
            using (var reader = new StreamReader(stream))
            {
                Exception recordedException = null;
                var mre = new ManualResetEvent(false);

                using var watcher = new Watcher<V1Pod>(
                    () => Task.FromResult(reader),
                    null,
                    (exception) =>
                    {
                        recordedException = exception;
                        mre.Set();
                    });

                // Bounded wait: if the callback never fires the test fails instead of hanging the run.
                Assert.True(mre.WaitOne(TestTimeout), "Timed out waiting for the watcher to report an error.");

                Assert.NotNull(recordedException);

                var k8sException = recordedException as KubernetesException;

                Assert.NotNull(k8sException);
                Assert.NotNull(k8sException.Status);
                Assert.Equal("too old resource version: 44982(53593)", k8sException.Message);
                Assert.Equal("too old resource version: 44982(53593)", k8sException.Status.Message);
            }
        }

        /// <summary>Pass-through handler that records the HTTP version of the last request it saw.</summary>
        private class CheckHeaderDelegatingHandler : DelegatingHandler
        {
            /// <summary>The <c>Version</c> of the most recent request, or null if none was sent.</summary>
            public Version Version { get; private set; }

            public CheckHeaderDelegatingHandler()
                : base()
            {
            }

            public CheckHeaderDelegatingHandler(HttpMessageHandler innerHandler)
                : base(innerHandler)
            {
            }

            /// <summary>Captures the request's HTTP version and forwards the request unchanged.</summary>
            protected override Task<HttpResponseMessage> SendAsync(
                HttpRequestMessage request,
                CancellationToken cancellationToken)
            {
                Version = request.Version;
                return base.SendAsync(request, cancellationToken);
            }
        }

        /// <summary>A watch request must be issued with HTTP version 2.0.</summary>
        [Fact]
        public async Task MustHttp2VersionSet()
        {
            // Disposed at the end of the test; previously the server instance was leaked.
            using var server = new MockKubeApiServer(testOutput, async httpContext =>
            {
                await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true);
                return false;
            });

            var handler = new CheckHeaderDelegatingHandler();
            var client = new Kubernetes(
                new KubernetesClientConfiguration { Host = server.Uri.ToString() },
                handler);

            Assert.Null(handler.Version);

            using var watcher = client.CoreV1.WatchListNamespacedPod(
                "default",
                onEvent: (type, item) => { });

            Assert.Equal(HttpVersion.Version20, handler.Version);

            await Task.CompletedTask.ConfigureAwait(true);
        }

        /// <summary>
        /// Enumerates <c>WatchListNamespacedPodAsync</c> in a background task and checks that all
        /// four event types are yielded without an exception.
        /// </summary>
        [Fact]
        public async Task AsyncEnumerableWatchAllEvents()
        {
            var eventsReceived = new AsyncCountdownEvent(4);
            var serverShutdown = new AsyncManualResetEvent();
            var watchCompleted = new AsyncManualResetEvent();

            using (var server = new MockKubeApiServer(testOutput, async httpContext =>
            {
                await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true);
                await WriteStreamLine(httpContext, MockDeletedStreamLine).ConfigureAwait(true);
                await WriteStreamLine(httpContext, MockModifiedStreamLine).ConfigureAwait(true);
                await WriteStreamLine(httpContext, MockErrorStreamLine).ConfigureAwait(true);

                // make server alive, cannot set to int.max as of it would block response
                await serverShutdown.WaitAsync().ConfigureAwait(true);
                return false;
            }))
            {
                var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });

                var events = new HashSet<WatchEventType>();
                var errors = 0;

                // Start async enumerable watch in background task
                var watchTask = Task.Run(async () =>
                {
                    try
                    {
                        await foreach (var (type, item) in client.CoreV1.WatchListNamespacedPodAsync("default").ConfigureAwait(false))
                        {
                            testOutput.WriteLine($"AsyncEnumerable Watcher received '{type}' event.");

                            events.Add(type);
                            eventsReceived.Signal();

                            // Break when we have all expected events
                            if (events.Count >= 4)
                            {
                                break;
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        testOutput.WriteLine($"AsyncEnumerable Watcher received exception: {ex.GetType().FullName}");
                        errors++;
                        eventsReceived.Signal();
                    }
                    finally
                    {
                        watchCompleted.Set();
                    }
                });

                // wait server yields all events
                await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);

                Assert.True(
                    eventsReceived.CurrentCount == 0,
                    "Timed out waiting for all events / errors to be received.");

                Assert.Contains(WatchEventType.Added, events);
                Assert.Contains(WatchEventType.Deleted, events);
                Assert.Contains(WatchEventType.Modified, events);
                Assert.Contains(WatchEventType.Error, events);

                Assert.Equal(0, errors);

                serverShutdown.Set();

                await Task.WhenAny(watchCompleted.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);
                Assert.True(watchCompleted.IsSet);
            }
        }

        /// <summary>
        /// After two events have been enumerated, cancelling the token passed to
        /// <c>WatchListNamespacedPodAsync</c> must end the enumeration within 5 seconds.
        /// </summary>
        [Fact]
        public async Task AsyncEnumerableWatchWithCancellation()
        {
            var eventsReceived = new AsyncCountdownEvent(2);
            var serverShutdown = new AsyncManualResetEvent();

            using (var server = new MockKubeApiServer(testOutput, async httpContext =>
            {
                await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true);
                await WriteStreamLine(httpContext, MockModifiedStreamLine).ConfigureAwait(true);

                // Keep server alive
                await serverShutdown.WaitAsync().ConfigureAwait(true);
                return false;
            }))
            {
                var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });

                var events = new HashSet<WatchEventType>();
                using var cts = new CancellationTokenSource();

                var watchTask = Task.Run(async () =>
                {
                    try
                    {
                        await foreach (var (type, item) in client.CoreV1.WatchListNamespacedPodAsync("default", cancellationToken: cts.Token).ConfigureAwait(false))
                        {
                            testOutput.WriteLine($"AsyncEnumerable Watcher received '{type}' event.");
                            events.Add(type);
                            eventsReceived.Signal();
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        testOutput.WriteLine("AsyncEnumerable Watcher was cancelled as expected.");
                    }
                });

                // Wait for some events to be received
                await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);

                Assert.True(
                    eventsReceived.CurrentCount == 0,
                    "Timed out waiting for events to be received.");

                Assert.Contains(WatchEventType.Added, events);
                Assert.Contains(WatchEventType.Modified, events);

                // Cancel the watch
                cts.Cancel();

                // Wait for watch task to complete
                await Task.WhenAny(watchTask, Task.Delay(TimeSpan.FromSeconds(5))).ConfigureAwait(true);

                Assert.True(watchTask.IsCompletedSuccessfully || watchTask.IsCanceled);

                serverShutdown.Set();
            }
        }

        /// <summary>
        /// Enumerates <c>WatchListNamespacedPodAsync</c> with a field selector and checks that the
        /// three events sent by the server are yielded.
        /// </summary>
        [Fact]
        public async Task AsyncEnumerableWatchWithFieldSelector()
        {
            var eventsReceived = new AsyncCountdownEvent(3);
            var serverShutdown = new AsyncManualResetEvent();
            var watchCompleted = new AsyncManualResetEvent();

            using (var server = new MockKubeApiServer(testOutput, async httpContext =>
            {
                await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true);
                await WriteStreamLine(httpContext, MockDeletedStreamLine).ConfigureAwait(true);
                await WriteStreamLine(httpContext, MockModifiedStreamLine).ConfigureAwait(true);

                await serverShutdown.WaitAsync().ConfigureAwait(true);
                return false;
            }))
            {
                var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });

                var events = new List<(WatchEventType, V1Pod)>();

                var watchTask = Task.Run(async () =>
                {
                    try
                    {
                        await foreach (var (type, item) in client.CoreV1.WatchListNamespacedPodAsync(
                            "default",
                            fieldSelector: "metadata.name=testPod").ConfigureAwait(false))
                        {
                            testOutput.WriteLine($"AsyncEnumerable Watcher received '{type}' event for pod '{item?.Metadata?.Name}'.");
                            events.Add((type, item));
                            eventsReceived.Signal();

                            if (events.Count >= 3)
                            {
                                break;
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        testOutput.WriteLine($"AsyncEnumerable Watcher received exception: {ex.GetType().FullName}");
                    }
                    finally
                    {
                        watchCompleted.Set();
                    }
                });

                // Wait for events
                await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);

                Assert.True(
                    eventsReceived.CurrentCount == 0,
                    "Timed out waiting for all events to be received.");

                Assert.Equal(3, events.Count);
                Assert.Contains(events, e => e.Item1 == WatchEventType.Added);
                Assert.Contains(events, e => e.Item1 == WatchEventType.Deleted);
                Assert.Contains(events, e => e.Item1 == WatchEventType.Modified);

                serverShutdown.Set();

                await Task.WhenAny(watchCompleted.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);
                Assert.True(watchCompleted.IsSet);
            }
        }

        /// <summary>
        /// Mixes non-event lines with a valid event and checks that the <c>onError</c> callback of
        /// <c>WatchListNamespacedPodAsync</c> fires while at least one valid event is still yielded.
        /// </summary>
        [Fact]
        public async Task AsyncEnumerableWatchErrorHandling()
        {
            var eventsReceived = new AsyncCountdownEvent(3);
            var serverShutdown = new AsyncManualResetEvent();
            var watchCompleted = new AsyncManualResetEvent();
            var errorReceived = new AsyncManualResetEvent();

            using (var server = new MockKubeApiServer(testOutput, async httpContext =>
            {
                await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse).ConfigureAwait(true);
                await WriteStreamLine(httpContext, MockBadStreamLine).ConfigureAwait(true);
                await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(true);

                await serverShutdown.WaitAsync().ConfigureAwait(true);
                return false;
            }))
            {
                var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });

                var events = new List<(WatchEventType, V1Pod)>();
                var errorCaught = false;

                var watchTask = Task.Run(async () =>
                {
                    try
                    {
                        await foreach (var (type, item) in client.CoreV1.WatchListNamespacedPodAsync(
                            "default",
                            onError: ex =>
                            {
                                testOutput.WriteLine($"AsyncEnumerable Watcher onError called: {ex.GetType().FullName}");
                                errorCaught = true;
                                errorReceived.Set();
                                eventsReceived.Signal();
                            }).ConfigureAwait(false))
                        {
                            testOutput.WriteLine($"AsyncEnumerable Watcher received '{type}' event.");
                            events.Add((type, item));
                            eventsReceived.Signal();

                            // Expect some valid events plus error handling
                            if (events.Count >= 2)
                            {
                                break;
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        testOutput.WriteLine($"AsyncEnumerable Watcher caught exception: {ex.GetType().FullName}");
                    }
                    finally
                    {
                        watchCompleted.Set();
                    }
                });

                // Wait for events and errors
                await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);

                Assert.True(
                    eventsReceived.CurrentCount == 0,
                    "Timed out waiting for events and errors to be received.");

                // Should have received at least one valid event and one error
                Assert.True(events.Count >= 1, "Should have received at least one valid event");
                Assert.True(errorCaught, "Should have caught parsing error");
                Assert.True(errorReceived.IsSet, "Error callback should have been called");

                serverShutdown.Set();

                await Task.WhenAny(watchCompleted.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(true);
                Assert.True(watchCompleted.IsSet);
            }
        }
    }
}