Watcher: notify the caller when the server closes the connection (#184)
* Watcher: Add onClosed event and callback * Update the code generators to accept the onClose callback * Re-generate API * Update unit tests * Fix unit test * Watcher fixes/improvements: - Run Task.Yield at the beginning to make sure we're running async - Set Watching = false before invoking OnClosed, to make sure the callers see a consistent state.
This commit is contained in:
committed by
Brendan Burns
parent
c1543b527e
commit
bc1cb6205c
@@ -1,5 +1,7 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Collections.ObjectModel;
|
||||
using System.Diagnostics;
|
||||
using System.IO;
|
||||
using System.Linq;
|
||||
using System.Net;
|
||||
@@ -86,6 +88,7 @@ namespace k8s.Tests
|
||||
{
|
||||
AsyncCountdownEvent eventsReceived = new AsyncCountdownEvent(4 /* first line of response is eaten by WatcherDelegatingHandler */);
|
||||
AsyncManualResetEvent serverShutdown = new AsyncManualResetEvent();
|
||||
AsyncManualResetEvent connectionClosed = new AsyncManualResetEvent();
|
||||
|
||||
using (var server =
|
||||
new MockKubeApiServer(
|
||||
@@ -130,7 +133,8 @@ namespace k8s.Tests
|
||||
|
||||
errors += 1;
|
||||
eventsReceived.Signal();
|
||||
}
|
||||
},
|
||||
onClosed: connectionClosed.Set
|
||||
);
|
||||
|
||||
// wait server yields all events
|
||||
@@ -150,12 +154,16 @@ namespace k8s.Tests
|
||||
|
||||
// Let the server know it can initiate a shut down.
|
||||
serverShutdown.Set();
|
||||
|
||||
await Task.WhenAny(connectionClosed.WaitAsync(), Task.Delay(TestTimeout));
|
||||
Assert.True(connectionClosed.IsSet);
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task DisposeWatch()
|
||||
{
|
||||
var connectionClosed = new AsyncManualResetEvent();
|
||||
var eventsReceived = new AsyncCountdownEvent(1);
|
||||
bool serverRunning = true;
|
||||
|
||||
@@ -185,7 +193,8 @@ namespace k8s.Tests
|
||||
{
|
||||
events.Add(type);
|
||||
eventsReceived.Signal();
|
||||
}
|
||||
},
|
||||
onClosed: connectionClosed.Set
|
||||
);
|
||||
|
||||
// wait at least an event
|
||||
@@ -207,13 +216,14 @@ namespace k8s.Tests
|
||||
|
||||
var timeout = Task.Delay(TestTimeout);
|
||||
|
||||
while(!timeout.IsCompleted && watcher.Watching)
|
||||
while (!timeout.IsCompleted && watcher.Watching)
|
||||
{
|
||||
await Task.Yield();
|
||||
}
|
||||
|
||||
Assert.Empty(events);
|
||||
Assert.False(watcher.Watching);
|
||||
Assert.True(connectionClosed.IsSet);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -222,6 +232,7 @@ namespace k8s.Tests
|
||||
{
|
||||
AsyncCountdownEvent eventsReceived = new AsyncCountdownEvent(4 /* first line of response is eaten by WatcherDelegatingHandler */);
|
||||
AsyncManualResetEvent serverShutdown = new AsyncManualResetEvent();
|
||||
var waitForClosed = new AsyncManualResetEvent(false);
|
||||
|
||||
using (var server = new MockKubeApiServer(testOutput, async httpContext =>
|
||||
{
|
||||
@@ -260,7 +271,8 @@ namespace k8s.Tests
|
||||
|
||||
errors += 1;
|
||||
eventsReceived.Signal();
|
||||
}
|
||||
},
|
||||
onClosed: waitForClosed.Set
|
||||
);
|
||||
|
||||
// wait server yields all events
|
||||
@@ -281,6 +293,10 @@ namespace k8s.Tests
|
||||
Assert.True(watcher.Watching);
|
||||
|
||||
serverShutdown.Set();
|
||||
|
||||
await Task.WhenAny(waitForClosed.WaitAsync(), Task.Delay(TestTimeout));
|
||||
Assert.True(waitForClosed.IsSet);
|
||||
Assert.False(watcher.Watching);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -289,6 +305,7 @@ namespace k8s.Tests
|
||||
{
|
||||
AsyncCountdownEvent eventsReceived = new AsyncCountdownEvent(4 /* first line of response is eaten by WatcherDelegatingHandler */);
|
||||
AsyncManualResetEvent serverShutdown = new AsyncManualResetEvent();
|
||||
AsyncManualResetEvent connectionClosed = new AsyncManualResetEvent();
|
||||
|
||||
using (var server = new MockKubeApiServer(testOutput, async httpContext =>
|
||||
{
|
||||
@@ -328,7 +345,8 @@ namespace k8s.Tests
|
||||
|
||||
errors += 1;
|
||||
eventsReceived.Signal();
|
||||
}
|
||||
},
|
||||
onClosed: connectionClosed.Set
|
||||
);
|
||||
|
||||
// wait server yields all events
|
||||
@@ -349,6 +367,9 @@ namespace k8s.Tests
|
||||
Assert.True(watcher.Watching);
|
||||
|
||||
serverShutdown.Set();
|
||||
|
||||
await Task.WhenAny(connectionClosed.WaitAsync(), Task.Delay(TestTimeout));
|
||||
Assert.True(connectionClosed.IsSet);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -358,6 +379,8 @@ namespace k8s.Tests
|
||||
Exception exceptionCatched = null;
|
||||
var exceptionReceived = new AsyncManualResetEvent(false);
|
||||
var waitForException = new AsyncManualResetEvent(false);
|
||||
var waitForClosed = new AsyncManualResetEvent(false);
|
||||
|
||||
using (var server = new MockKubeApiServer(testOutput, async httpContext =>
|
||||
{
|
||||
await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse);
|
||||
@@ -375,12 +398,13 @@ namespace k8s.Tests
|
||||
waitForException.Set();
|
||||
Watcher<V1Pod> watcher;
|
||||
watcher = listTask.Watch<V1Pod>(
|
||||
(type, item) => { },
|
||||
e =>
|
||||
onEvent: (type, item) => { },
|
||||
onError: e =>
|
||||
{
|
||||
exceptionCatched = e;
|
||||
exceptionReceived.Set();
|
||||
});
|
||||
},
|
||||
onClosed: waitForClosed.Set);
|
||||
|
||||
// wait server down
|
||||
await Task.WhenAny(exceptionReceived.WaitAsync(), Task.Delay(TestTimeout));
|
||||
@@ -390,6 +414,8 @@ namespace k8s.Tests
|
||||
"Timed out waiting for exception"
|
||||
);
|
||||
|
||||
await Task.WhenAny(waitForClosed.WaitAsync(), Task.Delay(TestTimeout));
|
||||
Assert.True(waitForClosed.IsSet);
|
||||
Assert.False(watcher.Watching);
|
||||
Assert.IsType<IOException>(exceptionCatched);
|
||||
}
|
||||
@@ -468,6 +494,7 @@ namespace k8s.Tests
|
||||
{
|
||||
AsyncCountdownEvent eventsReceived = new AsyncCountdownEvent(4);
|
||||
AsyncManualResetEvent serverShutdown = new AsyncManualResetEvent();
|
||||
AsyncManualResetEvent connectionClosed = new AsyncManualResetEvent();
|
||||
|
||||
using (var server = new MockKubeApiServer(testOutput, async httpContext =>
|
||||
{
|
||||
@@ -507,7 +534,8 @@ namespace k8s.Tests
|
||||
|
||||
errors += 1;
|
||||
eventsReceived.Signal();
|
||||
}
|
||||
},
|
||||
onClosed: connectionClosed.Set
|
||||
);
|
||||
|
||||
// wait server yields all events
|
||||
@@ -528,10 +556,91 @@ namespace k8s.Tests
|
||||
Assert.True(watcher.Watching);
|
||||
|
||||
serverShutdown.Set();
|
||||
|
||||
await Task.WhenAny(connectionClosed.WaitAsync(), Task.Delay(TestTimeout));
|
||||
Assert.True(connectionClosed.IsSet);
|
||||
}
|
||||
}
|
||||
|
||||
[Fact (Skip = "https://github.com/kubernetes-client/csharp/issues/165")]
|
||||
[Fact(Skip = "Integration Test")]
|
||||
public async Task WatcherIntegrationTest()
|
||||
{
|
||||
var kubernetesConfig = KubernetesClientConfiguration.BuildConfigFromConfigFile(kubeconfigPath: @"C:\Users\frede\Source\Repos\cloud\minikube.config");
|
||||
var kubernetes = new Kubernetes(kubernetesConfig);
|
||||
|
||||
var job = await kubernetes.CreateNamespacedJobAsync(
|
||||
new V1Job()
|
||||
{
|
||||
ApiVersion = "batch/v1",
|
||||
Kind = V1Job.KubeKind,
|
||||
Metadata = new V1ObjectMeta()
|
||||
{
|
||||
Name = nameof(WatcherIntegrationTest).ToLowerInvariant()
|
||||
},
|
||||
Spec = new V1JobSpec()
|
||||
{
|
||||
|
||||
Template = new V1PodTemplateSpec()
|
||||
{
|
||||
Spec = new V1PodSpec()
|
||||
{
|
||||
Containers = new List<V1Container>()
|
||||
{
|
||||
new V1Container()
|
||||
{
|
||||
Image = "ubuntu/xenial",
|
||||
Name = "runner",
|
||||
Command = new List<string>()
|
||||
{
|
||||
"/bin/bash",
|
||||
"-c",
|
||||
"--"
|
||||
},
|
||||
Args = new List<string>()
|
||||
{
|
||||
"trap : TERM INT; sleep infinity & wait"
|
||||
}
|
||||
}
|
||||
},
|
||||
RestartPolicy = "Never"
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
"default");
|
||||
|
||||
Collection<Tuple<WatchEventType, V1Job>> events = new Collection<Tuple<WatchEventType, V1Job>>();
|
||||
|
||||
AsyncManualResetEvent started = new AsyncManualResetEvent();
|
||||
AsyncManualResetEvent connectionClosed = new AsyncManualResetEvent();
|
||||
|
||||
var watcher = await kubernetes.WatchNamespacedJobAsync(
|
||||
job.Metadata.Name,
|
||||
job.Metadata.NamespaceProperty,
|
||||
job.Metadata.ResourceVersion,
|
||||
timeoutSeconds: 30,
|
||||
onEvent:
|
||||
(type, source) =>
|
||||
{
|
||||
Debug.WriteLine($"Watcher 1: {type}, {source}");
|
||||
events.Add(new Tuple<WatchEventType, V1Job>(type, source));
|
||||
job = source;
|
||||
started.Set();
|
||||
},
|
||||
onClosed: connectionClosed.Set).ConfigureAwait(false);
|
||||
|
||||
await started.WaitAsync();
|
||||
|
||||
await Task.WhenAny(connectionClosed.WaitAsync(), Task.Delay(TimeSpan.FromMinutes(3)));
|
||||
Assert.True(connectionClosed.IsSet);
|
||||
|
||||
await kubernetes.DeleteNamespacedJobAsync(
|
||||
new V1DeleteOptions(),
|
||||
job.Metadata.Name,
|
||||
job.Metadata.NamespaceProperty);
|
||||
}
|
||||
|
||||
[Fact(Skip = "https://github.com/kubernetes-client/csharp/issues/165")]
|
||||
public async Task DirectWatchEventsWithTimeout()
|
||||
{
|
||||
AsyncCountdownEvent eventsReceived = new AsyncCountdownEvent(4);
|
||||
|
||||
Reference in New Issue
Block a user