Improve Watcher thread utilisation. (#386)

* Add Watcher constructor overload which accepts the more general TextReader.

* Make CancelableStream delegate the async methods to the inner stream.

* Make CancelableStream dispose of the inner stream.

* Make PeekableStreamReader a decorator enabling direct delegation to the inner's asynchronous methods.

* Make CancelableStream.Flush respect the cancellation token.

* Rename CancellationTokenSourceSlim -> LinkedCancellationTokenSource.

* Specify 7.3 language version.
This commit is contained in:
Andy Kernahan
2020-04-01 03:45:27 +01:00
committed by GitHub
parent ddb4b21aeb
commit 3e94068d43
3 changed files with 107 additions and 16 deletions

View File

@@ -9,6 +9,7 @@
<PackageIconUrl>https://raw.githubusercontent.com/kubernetes/kubernetes/master/logo/logo.png</PackageIconUrl>
<PackageTags>kubernetes;docker;containers;</PackageTags>
<LangVersion>7.3</LangVersion>
<TargetFrameworks>netstandard2.0;net452;netcoreapp2.1</TargetFrameworks>
<TargetFrameworks Condition="'$(OS)' != 'Windows_NT'">netstandard2.0;netcoreapp2.1</TargetFrameworks>
<RootNamespace>k8s</RootNamespace>

View File

@@ -29,9 +29,9 @@ namespace k8s
public bool Watching { get; private set; }
private readonly CancellationTokenSource _cts;
private readonly Func<Task<StreamReader>> _streamReaderCreator;
private readonly Func<Task<TextReader>> _streamReaderCreator;
private StreamReader _streamReader;
private TextReader _streamReader;
private readonly Task _watcherLoop;
/// <summary>
@@ -50,6 +50,28 @@ namespace k8s
/// The action to invoke when the server closes the connection.
/// </param>
public Watcher(Func<Task<StreamReader>> streamReaderCreator, Action<WatchEventType, T> onEvent, Action<Exception> onError, Action onClosed = null)
: this(
async () => (TextReader)await streamReaderCreator().ConfigureAwait(false),
onEvent, onError, onClosed)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="Watcher{T}"/> class.
/// </summary>
/// <param name="streamReader">
/// A <see cref="StreamReader"/> from which to read the events.
/// </param>
/// <param name="onEvent">
/// The action to invoke when the server sends a new event.
/// </param>
/// <param name="onError">
/// The action to invoke when an error occurs.
/// </param>
/// <param name="onClosed">
/// The action to invoke when the server closes the connection.
/// </param>
public Watcher(Func<Task<TextReader>> streamReaderCreator, Action<WatchEventType, T> onEvent, Action<Exception> onError, Action onClosed = null)
{
_streamReaderCreator = streamReaderCreator;
OnEvent += onEvent;

View File

@@ -45,11 +45,28 @@ namespace k8s
_cancellationToken = cancellationToken;
}
public override void Flush() => _innerStream.Flush();
public override void Flush() =>
_innerStream.FlushAsync(_cancellationToken).GetAwaiter().GetResult();
public override async Task FlushAsync(CancellationToken cancellationToken)
{
using (var cancellationTokenSource = CreateCancellationTokenSource(cancellationToken))
{
await _innerStream.FlushAsync(cancellationTokenSource.Token).ConfigureAwait(false);
}
}
public override int Read(byte[] buffer, int offset, int count) =>
_innerStream.ReadAsync(buffer, offset, count, _cancellationToken).GetAwaiter().GetResult();
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
using (var cancellationTokenSource = CreateCancellationTokenSource(cancellationToken))
{
return await _innerStream.ReadAsync(buffer, offset, count, cancellationTokenSource.Token).ConfigureAwait(false);
}
}
public override long Seek(long offset, SeekOrigin origin) => _innerStream.Seek(offset, origin);
public override void SetLength(long value) => _innerStream.SetLength(value);
@@ -57,6 +74,14 @@ namespace k8s
public override void Write(byte[] buffer, int offset, int count) =>
_innerStream.WriteAsync(buffer, offset, count, _cancellationToken).GetAwaiter().GetResult();
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
using (var cancellationTokenSource = CreateCancellationTokenSource(cancellationToken))
{
await _innerStream.WriteAsync(buffer, offset, count, cancellationTokenSource.Token).ConfigureAwait(false);
}
}
public override bool CanRead => _innerStream.CanRead;
public override bool CanSeek => _innerStream.CanSeek;
@@ -70,6 +95,46 @@ namespace k8s
get => _innerStream.Position;
set => _innerStream.Position = value;
}
protected override void Dispose(bool disposing)
{
if (disposing)
{
_innerStream.Dispose();
}
base.Dispose(disposing);
}
private LinkedCancellationTokenSource CreateCancellationTokenSource(CancellationToken userCancellationToken)
{
return new LinkedCancellationTokenSource(_cancellationToken, userCancellationToken);
}
private readonly struct LinkedCancellationTokenSource : IDisposable
{
private readonly CancellationTokenSource _cancellationTokenSource;
public LinkedCancellationTokenSource(CancellationToken token1, CancellationToken token2)
{
if (token1.CanBeCanceled && token2.CanBeCanceled)
{
_cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token1, token2);
Token = _cancellationTokenSource.Token;
}
else
{
_cancellationTokenSource = null;
Token = token1.CanBeCanceled ? token1 : token2;
}
}
public CancellationToken Token { get; }
public void Dispose()
{
_cancellationTokenSource?.Dispose();
}
}
}
internal class LineSeparatedHttpContent : HttpContent
@@ -107,24 +172,18 @@ namespace k8s
}
}
internal class PeekableStreamReader : StreamReader
internal class PeekableStreamReader : TextReader
{
private Queue<string> _buffer;
private readonly Queue<string> _buffer;
private readonly StreamReader _inner;
public PeekableStreamReader(Stream stream) : base(stream)
public PeekableStreamReader(Stream stream)
{
_buffer = new Queue<string>();
_inner = new StreamReader(stream);
}
public override string ReadLine()
{
if (_buffer.Count > 0)
{
return _buffer.Dequeue();
}
return base.ReadLine();
}
public override string ReadLine() => throw new NotImplementedException();
public override Task<string> ReadLineAsync()
{
@@ -133,7 +192,7 @@ namespace k8s
return Task.FromResult(_buffer.Dequeue());
}
return base.ReadLineAsync();
return _inner.ReadLineAsync();
}
public async Task<string> PeekLineAsync()
@@ -156,6 +215,15 @@ namespace k8s
public override string ReadToEnd() => throw new NotImplementedException();
public override Task<string> ReadToEndAsync() => throw new NotImplementedException();
protected override void Dispose(bool disposing)
{
if (disposing)
{
_inner.Dispose();
}
base.Dispose(disposing);
}
}
}
}