expose IAsyncEnumerable API for watcher (#586)

* support async enum watch

* honor ct and catch more exception

* fix format

* better flaky

* ct to for should throw

* make sure no npe

* fix nuget build

* fix watcher test

* check close before dispose
This commit is contained in:
Boshi Lian
2021-09-17 08:19:01 -07:00
committed by GitHub
parent 61025a0255
commit 4e29dff8d2
4 changed files with 106 additions and 57 deletions

View File

@@ -10,6 +10,7 @@
<PackageIconUrl>https://raw.githubusercontent.com/kubernetes/kubernetes/master/logo/logo.png</PackageIconUrl>
<PackageTags>kubernetes;docker;containers;</PackageTags>
<LangVersion>8.0</LangVersion>
<TargetFrameworks>netstandard2.1;net5.0</TargetFrameworks>
<RootNamespace>k8s</RootNamespace>
<SignAssembly>true</SignAssembly>

View File

@@ -1,5 +1,8 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Runtime.Serialization;
using System.Threading;
using System.Threading.Tasks;
@@ -46,10 +49,10 @@ namespace k8s
private readonly CancellationTokenSource _cts;
private readonly Func<Task<TextReader>> _streamReaderCreator;
private TextReader _streamReader;
private bool disposedValue;
private readonly Task _watcherLoop;
/// <summary>
/// Initializes a new instance of the <see cref="Watcher{T}"/> class.
/// </summary>
@@ -111,7 +114,7 @@ namespace k8s
public event Action<Exception> OnError;
/// <summary>
/// The event which is raised when the server closes th econnection.
/// The event which is raised when the server closes the connection.
/// </summary>
public event Action OnClosed;
@@ -127,41 +130,19 @@ namespace k8s
try
{
Watching = true;
string line;
_streamReader = await _streamReaderCreator().ConfigureAwait(false);
// ReadLineAsync will return null when we've reached the end of the stream.
while ((line = await _streamReader.ReadLineAsync().ConfigureAwait(false)) != null)
await foreach (var (t, evt) in CreateWatchEventEnumerator(_streamReaderCreator, OnError,
cancellationToken)
.ConfigureAwait(false)
)
{
if (cancellationToken.IsCancellationRequested)
{
return;
}
try
{
var genericEvent =
SafeJsonConvert.DeserializeObject<Watcher<KubernetesObject>.WatchEvent>(line);
if (genericEvent.Object.Kind == "Status")
{
var statusEvent = SafeJsonConvert.DeserializeObject<Watcher<V1Status>.WatchEvent>(line);
var exception = new KubernetesException(statusEvent.Object);
OnError?.Invoke(exception);
}
else
{
var @event = SafeJsonConvert.DeserializeObject<WatchEvent>(line);
OnEvent?.Invoke(@event.Type, @event.Object);
}
}
catch (Exception e)
{
// error if deserialized failed or onevent throws
OnError?.Invoke(e);
}
OnEvent?.Invoke(t, evt);
}
}
catch (OperationCanceledException)
{
// ignore
}
catch (Exception e)
{
// error when transport error, IOException ect
@@ -174,6 +155,67 @@ namespace k8s
}
}
internal static async IAsyncEnumerable<(WatchEventType, T)> CreateWatchEventEnumerator(
Func<Task<TextReader>> streamReaderCreator,
Action<Exception> onError = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
Task<TR> AttachCancellationToken<TR>(Task<TR> task)
{
if (!task.IsCompleted)
{
// here to pass cancellationToken into task
return task.ContinueWith(t => t.GetAwaiter().GetResult(), cancellationToken);
}
return task;
}
using var streamReader = await AttachCancellationToken(streamReaderCreator()).ConfigureAwait(false);
for (; ; )
{
// ReadLineAsync will return null when we've reached the end of the stream.
var line = await AttachCancellationToken(streamReader.ReadLineAsync()).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested();
if (line == null)
{
yield break;
}
WatchEvent @event = null;
try
{
var genericEvent = SafeJsonConvert.DeserializeObject<Watcher<KubernetesObject>.WatchEvent>(line);
if (genericEvent.Object.Kind == "Status")
{
var statusEvent = SafeJsonConvert.DeserializeObject<Watcher<V1Status>.WatchEvent>(line);
var exception = new KubernetesException(statusEvent.Object);
onError?.Invoke(exception);
}
else
{
@event = SafeJsonConvert.DeserializeObject<WatchEvent>(line);
}
}
catch (Exception e)
{
onError?.Invoke(e);
}
if (@event != null)
{
yield return (@event.Type, @event.Object);
}
}
}
protected virtual void Dispose(bool disposing)
{
if (!disposedValue)
@@ -182,7 +224,6 @@ namespace k8s
{
_cts?.Cancel();
_cts?.Dispose();
_streamReader?.Dispose();
}
disposedValue = true;

View File

@@ -1,4 +1,6 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using k8s.Exceptions;
using Microsoft.Rest;
@@ -25,8 +27,12 @@ namespace k8s
Action<Exception> onError = null,
Action onClosed = null)
{
return new Watcher<T>(
async () =>
return new Watcher<T>(MakeStreamReaderCreator<T, L>(responseTask), onEvent, onError, onClosed);
}
private static Func<Task<TextReader>> MakeStreamReaderCreator<T, L>(Task<HttpOperationResponse<L>> responseTask)
{
return async () =>
{
var response = await responseTask.ConfigureAwait(false);
@@ -36,7 +42,7 @@ namespace k8s
}
return content.StreamReader;
}, onEvent, onError, onClosed);
};
}
/// <summary>
@@ -59,5 +65,12 @@ namespace k8s
{
return Watch(Task.FromResult(response), onEvent, onError, onClosed);
}
public static IAsyncEnumerable<(WatchEventType, T)> WatchAsync<T, L>(
this Task<HttpOperationResponse<L>> responseTask,
Action<Exception> onError = null)
{
return Watcher<T>.CreateWatchEventEnumerator(MakeStreamReaderCreator<T, L>(responseTask), onError);
}
}
}

View File

@@ -113,7 +113,7 @@ namespace k8s.Tests
}
[Fact]
public async Task SuriveBadLine()
public async Task SurviveBadLine()
{
var eventsReceived = new AsyncCountdownEvent(5);
var serverShutdown = new AsyncManualResetEvent();
@@ -188,19 +188,15 @@ namespace k8s.Tests
public async Task DisposeWatch()
{
var connectionClosed = new AsyncManualResetEvent();
var eventsReceived = new CountdownEvent(1);
var serverRunning = true;
var eventsReceived = new AsyncCountdownEvent(1);
var serverShutdown = new AsyncManualResetEvent();
using (var server = new MockKubeApiServer(testOutput, async httpContext =>
{
await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse).ConfigureAwait(false);
await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(false);
await serverShutdown.WaitAsync().ConfigureAwait(false);
while (serverRunning)
{
await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(false);
}
return true;
return false;
}))
{
var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });
@@ -215,10 +211,14 @@ namespace k8s.Tests
events.Add(type);
eventsReceived.Signal();
},
error =>
{
testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error.");
},
onClosed: connectionClosed.Set);
// wait at least an event
await Task.WhenAny(Task.Run(() => eventsReceived.Wait()), Task.Delay(TestTimeout)).ConfigureAwait(false);
await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(false);
Assert.True(
eventsReceived.CurrentCount == 0,
"Timed out waiting for events.");
@@ -230,18 +230,12 @@ namespace k8s.Tests
events.Clear();
// Let the server disconnect
serverRunning = false;
var timeout = Task.Delay(TestTimeout);
while (!timeout.IsCompleted && watcher.Watching)
{
await Task.Yield();
}
await Task.WhenAny(connectionClosed.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(false);
Assert.False(watcher.Watching);
Assert.True(connectionClosed.IsSet);
serverShutdown.Set();
}
}