fix watch=true doesn't respect cancellation token (#376)
* fix watch=true doesn't respect cancellation token * use exp body where can * address comments
This commit is contained in:
@@ -27,20 +27,61 @@ namespace k8s
|
||||
|
||||
if (query.TryGetValue("watch", out var values) && values.Any(v => v == "true"))
|
||||
{
|
||||
originResponse.Content = new LineSeparatedHttpContent(originResponse.Content);
|
||||
originResponse.Content = new LineSeparatedHttpContent(originResponse.Content, cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
return originResponse;
|
||||
}
|
||||
|
||||
internal class CancelableStream : Stream
|
||||
{
|
||||
private readonly Stream _innerStream;
|
||||
private readonly CancellationToken _cancellationToken;
|
||||
|
||||
public CancelableStream(Stream innerStream, CancellationToken cancellationToken)
|
||||
{
|
||||
_innerStream = innerStream;
|
||||
_cancellationToken = cancellationToken;
|
||||
}
|
||||
|
||||
public override void Flush() => _innerStream.Flush();
|
||||
|
||||
public override int Read(byte[] buffer, int offset, int count) =>
|
||||
_innerStream.ReadAsync(buffer, offset, count, _cancellationToken).GetAwaiter().GetResult();
|
||||
|
||||
public override long Seek(long offset, SeekOrigin origin) => _innerStream.Seek(offset, origin);
|
||||
|
||||
public override void SetLength(long value) => _innerStream.SetLength(value);
|
||||
|
||||
public override void Write(byte[] buffer, int offset, int count) =>
|
||||
_innerStream.WriteAsync(buffer, offset, count, _cancellationToken).GetAwaiter().GetResult();
|
||||
|
||||
public override bool CanRead => _innerStream.CanRead;
|
||||
|
||||
public override bool CanSeek => _innerStream.CanSeek;
|
||||
|
||||
public override bool CanWrite => _innerStream.CanWrite;
|
||||
|
||||
public override long Length => _innerStream.Length;
|
||||
|
||||
public override long Position
|
||||
{
|
||||
get => _innerStream.Position;
|
||||
set => _innerStream.Position = value;
|
||||
}
|
||||
}
|
||||
|
||||
internal class LineSeparatedHttpContent : HttpContent
|
||||
{
|
||||
private readonly HttpContent _originContent;
|
||||
private readonly CancellationToken _cancellationToken;
|
||||
private Stream _originStream;
|
||||
|
||||
public LineSeparatedHttpContent(HttpContent originContent)
|
||||
public LineSeparatedHttpContent(HttpContent originContent, CancellationToken cancellationToken)
|
||||
{
|
||||
_originContent = originContent;
|
||||
_cancellationToken = cancellationToken;
|
||||
}
|
||||
|
||||
internal PeekableStreamReader StreamReader { get; private set; }
|
||||
@@ -49,17 +90,14 @@ namespace k8s
|
||||
{
|
||||
_originStream = await _originContent.ReadAsStreamAsync().ConfigureAwait(false);
|
||||
|
||||
StreamReader = new PeekableStreamReader(_originStream);
|
||||
StreamReader = new PeekableStreamReader(new CancelableStream(_originStream, _cancellationToken));
|
||||
|
||||
var firstLine = await StreamReader.PeekLineAsync().ConfigureAwait(false);
|
||||
|
||||
var writer = new StreamWriter(stream);
|
||||
|
||||
// using (writer) // leave open
|
||||
{
|
||||
await writer.WriteAsync(firstLine).ConfigureAwait(false);
|
||||
await writer.FlushAsync().ConfigureAwait(false);
|
||||
}
|
||||
await writer.WriteAsync(firstLine).ConfigureAwait(false);
|
||||
await writer.FlushAsync().ConfigureAwait(false);
|
||||
}
|
||||
|
||||
protected override bool TryComputeLength(out long length)
|
||||
@@ -68,9 +106,11 @@ namespace k8s
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
internal class PeekableStreamReader : StreamReader
|
||||
{
|
||||
private Queue<string> _buffer;
|
||||
|
||||
public PeekableStreamReader(Stream stream) : base(stream)
|
||||
{
|
||||
_buffer = new Queue<string>();
|
||||
@@ -82,16 +122,20 @@ namespace k8s
|
||||
{
|
||||
return _buffer.Dequeue();
|
||||
}
|
||||
|
||||
return base.ReadLine();
|
||||
}
|
||||
|
||||
public override Task<string> ReadLineAsync()
|
||||
{
|
||||
if (_buffer.Count > 0)
|
||||
{
|
||||
return Task.FromResult(_buffer.Dequeue());
|
||||
}
|
||||
|
||||
return base.ReadLineAsync();
|
||||
}
|
||||
|
||||
public async Task<string> PeekLineAsync()
|
||||
{
|
||||
var line = await ReadLineAsync().ConfigureAwait(false);
|
||||
@@ -99,35 +143,19 @@ namespace k8s
|
||||
return line;
|
||||
}
|
||||
|
||||
public override int Read()
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
public override int Read() => throw new NotImplementedException();
|
||||
|
||||
public override int Read(char[] buffer, int index, int count)
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
public override Task<int> ReadAsync(char[] buffer, int index, int count)
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
public override int ReadBlock(char[] buffer, int index, int count)
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
public override Task<int> ReadBlockAsync(char[] buffer, int index, int count)
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
public override string ReadToEnd()
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
public override Task<string> ReadToEndAsync()
|
||||
{
|
||||
throw new NotImplementedException();
|
||||
}
|
||||
public override int Read(char[] buffer, int index, int count) => throw new NotImplementedException();
|
||||
|
||||
public override Task<int> ReadAsync(char[] buffer, int index, int count) => throw new NotImplementedException();
|
||||
|
||||
public override int ReadBlock(char[] buffer, int index, int count) => throw new NotImplementedException();
|
||||
|
||||
public override Task<int> ReadBlockAsync(char[] buffer, int index, int count) => throw new NotImplementedException();
|
||||
|
||||
public override string ReadToEnd() => throw new NotImplementedException();
|
||||
|
||||
public override Task<string> ReadToEndAsync() => throw new NotImplementedException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -69,13 +69,14 @@ namespace k8s.Tests
|
||||
var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default");
|
||||
var onErrorCalled = false;
|
||||
|
||||
using (listTask.Watch<V1Pod, V1PodList>((type, item) => { }, e => {
|
||||
using (listTask.Watch<V1Pod, V1PodList>((type, item) => { }, e =>
|
||||
{
|
||||
onErrorCalled = true;
|
||||
})) { }
|
||||
|
||||
await Task.Delay(TimeSpan.FromSeconds(1)); // delay for onerror to be called
|
||||
Assert.True(onErrorCalled);
|
||||
|
||||
|
||||
|
||||
// server did not response line by line
|
||||
await Assert.ThrowsAnyAsync<Exception>(() =>
|
||||
@@ -109,7 +110,8 @@ namespace k8s.Tests
|
||||
|
||||
|
||||
var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true);
|
||||
using (listTask.Watch<V1Pod, V1PodList>((type, item) => {
|
||||
using (listTask.Watch<V1Pod, V1PodList>((type, item) =>
|
||||
{
|
||||
eventsReceived.Set();
|
||||
}))
|
||||
{
|
||||
@@ -748,5 +750,35 @@ namespace k8s.Tests
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task WatchShouldCancelAfterRequested()
|
||||
{
|
||||
AsyncManualResetEvent serverShutdown = new AsyncManualResetEvent();
|
||||
|
||||
using (var server = new MockKubeApiServer(testOutput, async httpContext =>
|
||||
{
|
||||
httpContext.Response.StatusCode = 200;
|
||||
await httpContext.Response.Body.FlushAsync();
|
||||
await Task.Delay(TimeSpan.FromSeconds(5)); // The default timeout is 100 seconds
|
||||
|
||||
return true;
|
||||
}, resp: ""))
|
||||
{
|
||||
var client = new Kubernetes(new KubernetesClientConfiguration
|
||||
{
|
||||
Host = server.Uri.ToString()
|
||||
});
|
||||
|
||||
var cts = new CancellationTokenSource();
|
||||
cts.CancelAfter(TimeSpan.FromSeconds(2));
|
||||
|
||||
await Assert.ThrowsAnyAsync<TaskCanceledException>(async () =>
|
||||
{
|
||||
await client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true, cancellationToken: cts.Token);
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user