diff --git a/examples/watch/Program.cs b/examples/watch/Program.cs index ad2fbf0..99da5a3 100644 --- a/examples/watch/Program.cs +++ b/examples/watch/Program.cs @@ -13,8 +13,8 @@ namespace watch IKubernetes client = new Kubernetes(config); - var podlistResp = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result; - using (podlistResp.Watch((type, item) => + var podlistResp = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true); + using (podlistResp.Watch((type, item) => { Console.WriteLine("==on watch event=="); Console.WriteLine(type); diff --git a/src/KubernetesClient/Kubernetes.Watch.cs b/src/KubernetesClient/Kubernetes.Watch.cs index 9ecf56e..ca992ca 100644 --- a/src/KubernetesClient/Kubernetes.Watch.cs +++ b/src/KubernetesClient/Kubernetes.Watch.cs @@ -149,10 +149,12 @@ namespace k8s throw ex; } - var stream = await httpResponse.Content.ReadAsStreamAsync().ConfigureAwait(false); - StreamReader reader = new StreamReader(stream); + return new Watcher(async () => { + var stream = await httpResponse.Content.ReadAsStreamAsync().ConfigureAwait(false); + StreamReader reader = new StreamReader(stream); - return new Watcher(reader, onEvent, onError, onClosed); + return reader; + }, onEvent, onError, onClosed); } } } diff --git a/src/KubernetesClient/Watcher.cs b/src/KubernetesClient/Watcher.cs index da730f7..e4db6f9 100644 --- a/src/KubernetesClient/Watcher.cs +++ b/src/KubernetesClient/Watcher.cs @@ -29,7 +29,9 @@ namespace k8s public bool Watching { get; private set; } private readonly CancellationTokenSource _cts; - private readonly StreamReader _streamReader; + private readonly Func> _streamReaderCreator; + + private StreamReader _streamReader; private readonly Task _watcherLoop; /// @@ -47,9 +49,9 @@ namespace k8s /// /// The action to invoke when the server closes the connection. /// - public Watcher(StreamReader streamReader, Action onEvent, Action onError, Action onClosed = null) + public Watcher(Func> streamReaderCreator, Action onEvent, Action onError, Action onClosed = null) { - _streamReader = streamReader; + _streamReaderCreator = streamReaderCreator; OnEvent += onEvent; OnError += onError; OnClosed += onClosed; @@ -62,7 +64,7 @@ namespace k8s public void Dispose() { _cts.Cancel(); - _streamReader.Dispose(); + _streamReader?.Dispose(); } /// @@ -96,9 +98,10 @@ namespace k8s { Watching = true; string line; + _streamReader = await _streamReaderCreator(); // ReadLineAsync will return null when we've reached the end of the stream. - while ((line = await this._streamReader.ReadLineAsync().ConfigureAwait(false)) != null) + while ((line = await _streamReader.ReadLineAsync().ConfigureAwait(false)) != null) { if (cancellationToken.IsCancellationRequested) { @@ -147,6 +150,7 @@ namespace k8s /// create a watch object from a call to api server with watch=true /// /// type of the event object + /// type of the HttpOperationResponse object /// the api response /// a callback when any event raised from api server /// a callbak when any exception was caught during watching @@ -154,23 +158,28 @@ namespace k8s /// The action to invoke when the server closes the connection. /// /// a watch object - public static Watcher Watch(this HttpOperationResponse response, + public static Watcher Watch(this Task> responseTask, Action onEvent, Action onError = null, Action onClosed = null) { - if (!(response.Response.Content is WatcherDelegatingHandler.LineSeparatedHttpContent content)) - { - throw new KubernetesClientException("not a watchable request or failed response"); - } + return new Watcher(async () => { + var response = await responseTask; - return new Watcher(content.StreamReader, onEvent, onError, onClosed); + if (!(response.Response.Content is WatcherDelegatingHandler.LineSeparatedHttpContent content)) + { + throw new KubernetesClientException("not a watchable request or failed response"); + } + + return content.StreamReader; + } , onEvent, onError, onClosed); } /// /// create a watch object from a call to api server with watch=true /// /// type of the event object + /// type of the HttpOperationResponse object /// the api response /// a callback when any event raised from api server /// a callbak when any exception was caught during watching @@ -178,12 +187,12 @@ namespace k8s /// The action to invoke when the server closes the connection. /// /// a watch object - public static Watcher Watch(this HttpOperationResponse response, + public static Watcher Watch(this HttpOperationResponse response, Action onEvent, Action onError = null, Action onClosed = null) { - return Watch((HttpOperationResponse)response, onEvent, onError, onClosed); + return Watch(Task.FromResult(response), onEvent, onError, onClosed); } } } diff --git a/tests/KubernetesClient.Tests/WatchTests.cs b/tests/KubernetesClient.Tests/WatchTests.cs index 0b61dcd..916c69e 100644 --- a/tests/KubernetesClient.Tests/WatchTests.cs +++ b/tests/KubernetesClient.Tests/WatchTests.cs @@ -66,11 +66,16 @@ namespace k8s.Tests }); // did not pass watch param - var listTask = await client.ListNamespacedPodWithHttpMessagesAsync("default"); - Assert.ThrowsAny(() => - { - listTask.Watch((type, item) => { }); - }); + var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default"); + var onErrorCalled = false; + + using (listTask.Watch((type, item) => { }, e => { + onErrorCalled = true; + })) { } + + await Task.Delay(TimeSpan.FromSeconds(1)); // delay for onerror to be called + Assert.True(onErrorCalled); + // server did not response line by line await Assert.ThrowsAnyAsync(() => @@ -83,6 +88,41 @@ namespace k8s.Tests } } + [Fact] + public async Task AsyncWatcher() + { + var created = new AsyncManualResetEvent(false); + var eventsReceived = new AsyncManualResetEvent(false); + + using (var server = new MockKubeApiServer(testOutput, async httpContext => + { + // block until reponse watcher obj created + await created.WaitAsync(); + await WriteStreamLine(httpContext, MockAddedEventStreamLine); + return false; + })) + { + var client = new Kubernetes(new KubernetesClientConfiguration + { + Host = server.Uri.ToString() + }); + + + var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true); + using (listTask.Watch((type, item) => { + eventsReceived.Set(); + })) + { + // here watcher is ready to use, but http server has not responsed yet. + created.Set(); + await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)); + } + + Assert.True(eventsReceived.IsSet); + Assert.True(created.IsSet); + } + } + [Fact] public async Task SuriveBadLine() { @@ -119,7 +159,7 @@ namespace k8s.Tests var events = new HashSet(); var errors = 0; - var watcher = listTask.Watch( + var watcher = listTask.Watch( (type, item) => { testOutput.WriteLine($"Watcher received '{type}' event."); @@ -188,7 +228,7 @@ namespace k8s.Tests var events = new HashSet(); - var watcher = listTask.Watch( + var watcher = listTask.Watch( (type, item) => { events.Add(type); @@ -256,7 +296,7 @@ namespace k8s.Tests var events = new HashSet(); var errors = 0; - var watcher = listTask.Watch( + var watcher = listTask.Watch( (type, item) => { testOutput.WriteLine($"Watcher received '{type}' event."); @@ -330,7 +370,7 @@ namespace k8s.Tests var events = new HashSet(); var errors = 0; - var watcher = listTask.Watch( + var watcher = listTask.Watch( (type, item) => { testOutput.WriteLine($"Watcher received '{type}' event."); @@ -396,7 +436,7 @@ namespace k8s.Tests waitForException.Set(); Watcher watcher; - watcher = listTask.Watch( + watcher = listTask.Watch( onEvent: (type, item) => { }, onError: e => { @@ -463,7 +503,7 @@ namespace k8s.Tests var events = new HashSet(); - var watcher = listTask.Watch( + var watcher = listTask.Watch( (type, item) => { events.Add(type); diff --git a/tests/KubernetesClient.Tests/WatcherTests.cs b/tests/KubernetesClient.Tests/WatcherTests.cs index 1aa71ce..5e8c9b7 100644 --- a/tests/KubernetesClient.Tests/WatcherTests.cs +++ b/tests/KubernetesClient.Tests/WatcherTests.cs @@ -3,6 +3,7 @@ using System; using System.IO; using System.Text; using System.Threading; +using System.Threading.Tasks; using Xunit; namespace k8s.Tests @@ -21,7 +22,7 @@ namespace k8s.Tests ManualResetEvent mre = new ManualResetEvent(false); Watcher watcher = new Watcher( - reader, + () => Task.FromResult(reader), null, (exception) => {