diff --git a/src/KubernetesClient/KubernetesClient.csproj b/src/KubernetesClient/KubernetesClient.csproj index c973f66..bc9cdd0 100644 --- a/src/KubernetesClient/KubernetesClient.csproj +++ b/src/KubernetesClient/KubernetesClient.csproj @@ -9,6 +9,7 @@ https://raw.githubusercontent.com/kubernetes/kubernetes/master/logo/logo.png kubernetes;docker;containers; + 7.3 netstandard2.0;net452;netcoreapp2.1 netstandard2.0;netcoreapp2.1 k8s diff --git a/src/KubernetesClient/Watcher.cs b/src/KubernetesClient/Watcher.cs index 045c4b9..5c850c3 100644 --- a/src/KubernetesClient/Watcher.cs +++ b/src/KubernetesClient/Watcher.cs @@ -29,9 +29,9 @@ namespace k8s public bool Watching { get; private set; } private readonly CancellationTokenSource _cts; - private readonly Func> _streamReaderCreator; + private readonly Func> _streamReaderCreator; - private StreamReader _streamReader; + private TextReader _streamReader; private readonly Task _watcherLoop; /// @@ -50,6 +50,28 @@ namespace k8s /// The action to invoke when the server closes the connection. /// public Watcher(Func> streamReaderCreator, Action onEvent, Action onError, Action onClosed = null) + : this( + async () => (TextReader)await streamReaderCreator().ConfigureAwait(false), + onEvent, onError, onClosed) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// + /// A from which to read the events. + /// + /// + /// The action to invoke when the server sends a new event. + /// + /// + /// The action to invoke when an error occurs. + /// + /// + /// The action to invoke when the server closes the connection. + /// + public Watcher(Func> streamReaderCreator, Action onEvent, Action onError, Action onClosed = null) { _streamReaderCreator = streamReaderCreator; OnEvent += onEvent; diff --git a/src/KubernetesClient/WatcherDelegatingHandler.cs b/src/KubernetesClient/WatcherDelegatingHandler.cs index f7f1578..9fd753e 100644 --- a/src/KubernetesClient/WatcherDelegatingHandler.cs +++ b/src/KubernetesClient/WatcherDelegatingHandler.cs @@ -45,11 +45,28 @@ namespace k8s _cancellationToken = cancellationToken; } - public override void Flush() => _innerStream.Flush(); + public override void Flush() => + _innerStream.FlushAsync(_cancellationToken).GetAwaiter().GetResult(); + + public override async Task FlushAsync(CancellationToken cancellationToken) + { + using (var cancellationTokenSource = CreateCancellationTokenSource(cancellationToken)) + { + await _innerStream.FlushAsync(cancellationTokenSource.Token).ConfigureAwait(false); + } + } public override int Read(byte[] buffer, int offset, int count) => _innerStream.ReadAsync(buffer, offset, count, _cancellationToken).GetAwaiter().GetResult(); + public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + using (var cancellationTokenSource = CreateCancellationTokenSource(cancellationToken)) + { + return await _innerStream.ReadAsync(buffer, offset, count, cancellationTokenSource.Token).ConfigureAwait(false); + } + } + public override long Seek(long offset, SeekOrigin origin) => _innerStream.Seek(offset, origin); public override void SetLength(long value) => _innerStream.SetLength(value); @@ -57,6 +74,14 @@ namespace k8s public override void Write(byte[] buffer, int offset, int count) => _innerStream.WriteAsync(buffer, offset, count, _cancellationToken).GetAwaiter().GetResult(); + public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + using (var cancellationTokenSource = CreateCancellationTokenSource(cancellationToken)) + { + await _innerStream.WriteAsync(buffer, offset, count, cancellationTokenSource.Token).ConfigureAwait(false); + } + } + public override bool CanRead => _innerStream.CanRead; public override bool CanSeek => _innerStream.CanSeek; @@ -70,6 +95,46 @@ namespace k8s get => _innerStream.Position; set => _innerStream.Position = value; } + + protected override void Dispose(bool disposing) + { + if (disposing) + { + _innerStream.Dispose(); + } + base.Dispose(disposing); + } + + private LinkedCancellationTokenSource CreateCancellationTokenSource(CancellationToken userCancellationToken) + { + return new LinkedCancellationTokenSource(_cancellationToken, userCancellationToken); + } + + private readonly struct LinkedCancellationTokenSource : IDisposable + { + private readonly CancellationTokenSource _cancellationTokenSource; + + public LinkedCancellationTokenSource(CancellationToken token1, CancellationToken token2) + { + if (token1.CanBeCanceled && token2.CanBeCanceled) + { + _cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token1, token2); + Token = _cancellationTokenSource.Token; + } + else + { + _cancellationTokenSource = null; + Token = token1.CanBeCanceled ? token1 : token2; + } + } + + public CancellationToken Token { get; } + + public void Dispose() + { + _cancellationTokenSource?.Dispose(); + } + } } internal class LineSeparatedHttpContent : HttpContent @@ -107,24 +172,18 @@ namespace k8s } } - internal class PeekableStreamReader : StreamReader + internal class PeekableStreamReader : TextReader { - private Queue _buffer; + private readonly Queue _buffer; + private readonly StreamReader _inner; - public PeekableStreamReader(Stream stream) : base(stream) + public PeekableStreamReader(Stream stream) { _buffer = new Queue(); + _inner = new StreamReader(stream); } - public override string ReadLine() - { - if (_buffer.Count > 0) - { - return _buffer.Dequeue(); - } - - return base.ReadLine(); - } + public override string ReadLine() => throw new NotImplementedException(); public override Task ReadLineAsync() { @@ -133,7 +192,7 @@ namespace k8s return Task.FromResult(_buffer.Dequeue()); } - return base.ReadLineAsync(); + return _inner.ReadLineAsync(); } public async Task PeekLineAsync() @@ -156,6 +215,15 @@ namespace k8s public override string ReadToEnd() => throw new NotImplementedException(); public override Task ReadToEndAsync() => throw new NotImplementedException(); + + protected override void Dispose(bool disposing) + { + if (disposing) + { + _inner.Dispose(); + } + base.Dispose(disposing); + } } } }