From 4e29dff8d28f7b7ab49b92c424771f930ac3df5f Mon Sep 17 00:00:00 2001 From: Boshi Lian Date: Fri, 17 Sep 2021 08:19:01 -0700 Subject: [PATCH] expose IAsyncEnumerable API for watcher (#586) * support async enum watch * honor ct and catch more exception * fix format * better flaky * ct to for should throw * make sure no npe * fix nuget build * fix watcher test * check close before dispose --- src/KubernetesClient/KubernetesClient.csproj | 1 + src/KubernetesClient/Watcher.cs | 109 +++++++++++++------ src/KubernetesClient/WatcherExt.cs | 19 +++- tests/KubernetesClient.Tests/WatchTests.cs | 34 +++--- 4 files changed, 106 insertions(+), 57 deletions(-) diff --git a/src/KubernetesClient/KubernetesClient.csproj b/src/KubernetesClient/KubernetesClient.csproj index 3e54e61..6acee80 100644 --- a/src/KubernetesClient/KubernetesClient.csproj +++ b/src/KubernetesClient/KubernetesClient.csproj @@ -10,6 +10,7 @@ https://raw.githubusercontent.com/kubernetes/kubernetes/master/logo/logo.png kubernetes;docker;containers; + 8.0 netstandard2.1;net5.0 k8s true diff --git a/src/KubernetesClient/Watcher.cs b/src/KubernetesClient/Watcher.cs index f87cbbb..11106e9 100644 --- a/src/KubernetesClient/Watcher.cs +++ b/src/KubernetesClient/Watcher.cs @@ -1,5 +1,8 @@ using System; +using System.Collections; +using System.Collections.Generic; using System.IO; +using System.Runtime.CompilerServices; using System.Runtime.Serialization; using System.Threading; using System.Threading.Tasks; @@ -46,10 +49,10 @@ namespace k8s private readonly CancellationTokenSource _cts; private readonly Func> _streamReaderCreator; - private TextReader _streamReader; private bool disposedValue; private readonly Task _watcherLoop; + /// /// Initializes a new instance of the class. /// @@ -111,7 +114,7 @@ namespace k8s public event Action OnError; /// - /// The event which is raised when the server closes th econnection. + /// The event which is raised when the server closes the connection. /// public event Action OnClosed; @@ -127,41 +130,19 @@ namespace k8s try { Watching = true; - string line; - _streamReader = await _streamReaderCreator().ConfigureAwait(false); - // ReadLineAsync will return null when we've reached the end of the stream. - while ((line = await _streamReader.ReadLineAsync().ConfigureAwait(false)) != null) + await foreach (var (t, evt) in CreateWatchEventEnumerator(_streamReaderCreator, OnError, + cancellationToken) + .ConfigureAwait(false) + ) { - if (cancellationToken.IsCancellationRequested) - { - return; - } - - try - { - var genericEvent = - SafeJsonConvert.DeserializeObject.WatchEvent>(line); - - if (genericEvent.Object.Kind == "Status") - { - var statusEvent = SafeJsonConvert.DeserializeObject.WatchEvent>(line); - var exception = new KubernetesException(statusEvent.Object); - OnError?.Invoke(exception); - } - else - { - var @event = SafeJsonConvert.DeserializeObject(line); - OnEvent?.Invoke(@event.Type, @event.Object); - } - } - catch (Exception e) - { - // error if deserialized failed or onevent throws - OnError?.Invoke(e); - } + OnEvent?.Invoke(t, evt); } } + catch (OperationCanceledException) + { + // ignore + } catch (Exception e) { // error when transport error, IOException ect @@ -174,6 +155,67 @@ namespace k8s } } + internal static async IAsyncEnumerable<(WatchEventType, T)> CreateWatchEventEnumerator( + Func> streamReaderCreator, + Action onError = null, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + Task AttachCancellationToken(Task task) + { + if (!task.IsCompleted) + { + // here to pass cancellationToken into task + return task.ContinueWith(t => t.GetAwaiter().GetResult(), cancellationToken); + } + + return task; + } + + using var streamReader = await AttachCancellationToken(streamReaderCreator()).ConfigureAwait(false); + + for (; ; ) + { + // ReadLineAsync will return null when we've reached the end of the stream. + var line = await AttachCancellationToken(streamReader.ReadLineAsync()).ConfigureAwait(false); + + cancellationToken.ThrowIfCancellationRequested(); + + if (line == null) + { + yield break; + } + + WatchEvent @event = null; + + try + { + var genericEvent = SafeJsonConvert.DeserializeObject.WatchEvent>(line); + + if (genericEvent.Object.Kind == "Status") + { + var statusEvent = SafeJsonConvert.DeserializeObject.WatchEvent>(line); + var exception = new KubernetesException(statusEvent.Object); + onError?.Invoke(exception); + } + else + { + @event = SafeJsonConvert.DeserializeObject(line); + } + } + catch (Exception e) + { + onError?.Invoke(e); + } + + + if (@event != null) + { + yield return (@event.Type, @event.Object); + } + } + } + + protected virtual void Dispose(bool disposing) { if (!disposedValue) @@ -182,7 +224,6 @@ namespace k8s { _cts?.Cancel(); _cts?.Dispose(); - _streamReader?.Dispose(); } disposedValue = true; diff --git a/src/KubernetesClient/WatcherExt.cs b/src/KubernetesClient/WatcherExt.cs index f6e3787..e162a20 100644 --- a/src/KubernetesClient/WatcherExt.cs +++ b/src/KubernetesClient/WatcherExt.cs @@ -1,4 +1,6 @@ using System; +using System.Collections.Generic; +using System.IO; using System.Threading.Tasks; using k8s.Exceptions; using Microsoft.Rest; @@ -25,8 +27,12 @@ namespace k8s Action onError = null, Action onClosed = null) { - return new Watcher( - async () => + return new Watcher(MakeStreamReaderCreator(responseTask), onEvent, onError, onClosed); + } + + private static Func> MakeStreamReaderCreator(Task> responseTask) + { + return async () => { var response = await responseTask.ConfigureAwait(false); @@ -36,7 +42,7 @@ namespace k8s } return content.StreamReader; - }, onEvent, onError, onClosed); + }; } /// @@ -59,5 +65,12 @@ namespace k8s { return Watch(Task.FromResult(response), onEvent, onError, onClosed); } + + public static IAsyncEnumerable<(WatchEventType, T)> WatchAsync( + this Task> responseTask, + Action onError = null) + { + return Watcher.CreateWatchEventEnumerator(MakeStreamReaderCreator(responseTask), onError); + } } } diff --git a/tests/KubernetesClient.Tests/WatchTests.cs b/tests/KubernetesClient.Tests/WatchTests.cs index c216678..9ec9de2 100644 --- a/tests/KubernetesClient.Tests/WatchTests.cs +++ b/tests/KubernetesClient.Tests/WatchTests.cs @@ -113,7 +113,7 @@ namespace k8s.Tests } [Fact] - public async Task SuriveBadLine() + public async Task SurviveBadLine() { var eventsReceived = new AsyncCountdownEvent(5); var serverShutdown = new AsyncManualResetEvent(); @@ -188,19 +188,15 @@ namespace k8s.Tests public async Task DisposeWatch() { var connectionClosed = new AsyncManualResetEvent(); - var eventsReceived = new CountdownEvent(1); - var serverRunning = true; + var eventsReceived = new AsyncCountdownEvent(1); + var serverShutdown = new AsyncManualResetEvent(); using (var server = new MockKubeApiServer(testOutput, async httpContext => { - await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse).ConfigureAwait(false); + await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(false); + await serverShutdown.WaitAsync().ConfigureAwait(false); - while (serverRunning) - { - await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(false); - } - - return true; + return false; })) { var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() }); @@ -215,10 +211,14 @@ namespace k8s.Tests events.Add(type); eventsReceived.Signal(); }, + error => + { + testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error."); + }, onClosed: connectionClosed.Set); // wait at least an event - await Task.WhenAny(Task.Run(() => eventsReceived.Wait()), Task.Delay(TestTimeout)).ConfigureAwait(false); + await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(false); Assert.True( eventsReceived.CurrentCount == 0, "Timed out waiting for events."); @@ -230,18 +230,12 @@ namespace k8s.Tests events.Clear(); - // Let the server disconnect - serverRunning = false; - - var timeout = Task.Delay(TestTimeout); - - while (!timeout.IsCompleted && watcher.Watching) - { - await Task.Yield(); - } + await Task.WhenAny(connectionClosed.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(false); Assert.False(watcher.Watching); Assert.True(connectionClosed.IsSet); + + serverShutdown.Set(); } }