Additional watcher tests (#178)
This commit is contained in:
committed by
Brendan Burns
parent
7723604b7e
commit
de916f6390
@@ -26,7 +26,7 @@ namespace k8s.Tests
|
||||
private static readonly string MockModifiedStreamLine = BuildWatchEventStreamLine(WatchEventType.Modified);
|
||||
private static readonly string MockErrorStreamLine = BuildWatchEventStreamLine(WatchEventType.Error);
|
||||
private static readonly string MockBadStreamLine = "bad json";
|
||||
private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(15);
|
||||
private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(150);
|
||||
|
||||
private readonly ITestOutputHelper testOutput;
|
||||
|
||||
@@ -241,8 +241,75 @@ namespace k8s.Tests
|
||||
Host = server.Uri.ToString()
|
||||
});
|
||||
|
||||
var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result;
|
||||
var listTask = await client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true);
|
||||
|
||||
var events = new HashSet<WatchEventType>();
|
||||
var errors = 0;
|
||||
|
||||
var watcher = listTask.Watch<V1Pod>(
|
||||
(type, item) =>
|
||||
{
|
||||
testOutput.WriteLine($"Watcher received '{type}' event.");
|
||||
|
||||
events.Add(type);
|
||||
eventsReceived.Signal();
|
||||
},
|
||||
error =>
|
||||
{
|
||||
testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error.");
|
||||
|
||||
errors += 1;
|
||||
eventsReceived.Signal();
|
||||
}
|
||||
);
|
||||
|
||||
// wait server yields all events
|
||||
await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout));
|
||||
|
||||
Assert.True(
|
||||
eventsReceived.CurrentCount == 0,
|
||||
"Timed out waiting for all events / errors to be received."
|
||||
);
|
||||
|
||||
Assert.Contains(WatchEventType.Added, events);
|
||||
Assert.Contains(WatchEventType.Deleted, events);
|
||||
Assert.Contains(WatchEventType.Modified, events);
|
||||
Assert.Contains(WatchEventType.Error, events);
|
||||
|
||||
Assert.Equal(0, errors);
|
||||
|
||||
Assert.True(watcher.Watching);
|
||||
|
||||
serverShutdown.Set();
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task WatchEventsWithTimeout()
|
||||
{
|
||||
AsyncCountdownEvent eventsReceived = new AsyncCountdownEvent(4 /* first line of response is eaten by WatcherDelegatingHandler */);
|
||||
AsyncManualResetEvent serverShutdown = new AsyncManualResetEvent();
|
||||
|
||||
using (var server = new MockKubeApiServer(testOutput, async httpContext =>
|
||||
{
|
||||
await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse);
|
||||
await Task.Delay(TimeSpan.FromSeconds(120)); // The default timeout is 100 seconds
|
||||
await WriteStreamLine(httpContext, MockAddedEventStreamLine);
|
||||
await WriteStreamLine(httpContext, MockDeletedStreamLine);
|
||||
await WriteStreamLine(httpContext, MockModifiedStreamLine);
|
||||
await WriteStreamLine(httpContext, MockErrorStreamLine);
|
||||
|
||||
// make server alive, cannot set to int.max as of it would block response
|
||||
await serverShutdown.WaitAsync();
|
||||
return false;
|
||||
}))
|
||||
{
|
||||
var client = new Kubernetes(new KubernetesClientConfiguration
|
||||
{
|
||||
Host = server.Uri.ToString()
|
||||
});
|
||||
|
||||
var listTask = await client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true);
|
||||
|
||||
var events = new HashSet<WatchEventType>();
|
||||
var errors = 0;
|
||||
@@ -395,5 +462,143 @@ namespace k8s.Tests
|
||||
serverShutdown.Set();
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task DirectWatchAllEvents()
|
||||
{
|
||||
AsyncCountdownEvent eventsReceived = new AsyncCountdownEvent(4);
|
||||
AsyncManualResetEvent serverShutdown = new AsyncManualResetEvent();
|
||||
|
||||
using (var server = new MockKubeApiServer(testOutput, async httpContext =>
|
||||
{
|
||||
await WriteStreamLine(httpContext, MockAddedEventStreamLine);
|
||||
await WriteStreamLine(httpContext, MockDeletedStreamLine);
|
||||
await WriteStreamLine(httpContext, MockModifiedStreamLine);
|
||||
await WriteStreamLine(httpContext, MockErrorStreamLine);
|
||||
|
||||
// make server alive, cannot set to int.max as of it would block response
|
||||
await serverShutdown.WaitAsync();
|
||||
return false;
|
||||
}))
|
||||
{
|
||||
var client = new Kubernetes(new KubernetesClientConfiguration
|
||||
{
|
||||
Host = server.Uri.ToString()
|
||||
});
|
||||
|
||||
var events = new HashSet<WatchEventType>();
|
||||
var errors = 0;
|
||||
|
||||
var watcher = await client.WatchNamespacedPodAsync(
|
||||
name: "myPod",
|
||||
@namespace: "default",
|
||||
onEvent:
|
||||
(type, item) =>
|
||||
{
|
||||
testOutput.WriteLine($"Watcher received '{type}' event.");
|
||||
|
||||
events.Add(type);
|
||||
eventsReceived.Signal();
|
||||
},
|
||||
onError:
|
||||
error =>
|
||||
{
|
||||
testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error.");
|
||||
|
||||
errors += 1;
|
||||
eventsReceived.Signal();
|
||||
}
|
||||
);
|
||||
|
||||
// wait server yields all events
|
||||
await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout));
|
||||
|
||||
Assert.True(
|
||||
eventsReceived.CurrentCount == 0,
|
||||
"Timed out waiting for all events / errors to be received."
|
||||
);
|
||||
|
||||
Assert.Contains(WatchEventType.Added, events);
|
||||
Assert.Contains(WatchEventType.Deleted, events);
|
||||
Assert.Contains(WatchEventType.Modified, events);
|
||||
Assert.Contains(WatchEventType.Error, events);
|
||||
|
||||
Assert.Equal(0, errors);
|
||||
|
||||
Assert.True(watcher.Watching);
|
||||
|
||||
serverShutdown.Set();
|
||||
}
|
||||
}
|
||||
|
||||
[Fact (Skip = "https://github.com/kubernetes-client/csharp/issues/165")]
|
||||
public async Task DirectWatchEventsWithTimeout()
|
||||
{
|
||||
AsyncCountdownEvent eventsReceived = new AsyncCountdownEvent(4);
|
||||
AsyncManualResetEvent serverShutdown = new AsyncManualResetEvent();
|
||||
|
||||
using (var server = new MockKubeApiServer(testOutput, async httpContext =>
|
||||
{
|
||||
await Task.Delay(TimeSpan.FromSeconds(120)); // The default timeout is 100 seconds
|
||||
await WriteStreamLine(httpContext, MockAddedEventStreamLine);
|
||||
await WriteStreamLine(httpContext, MockDeletedStreamLine);
|
||||
await WriteStreamLine(httpContext, MockModifiedStreamLine);
|
||||
await WriteStreamLine(httpContext, MockErrorStreamLine);
|
||||
|
||||
// make server alive, cannot set to int.max as of it would block response
|
||||
await serverShutdown.WaitAsync();
|
||||
return false;
|
||||
}))
|
||||
{
|
||||
var client = new Kubernetes(new KubernetesClientConfiguration
|
||||
{
|
||||
Host = server.Uri.ToString()
|
||||
});
|
||||
|
||||
var events = new HashSet<WatchEventType>();
|
||||
var errors = 0;
|
||||
|
||||
var watcher = await client.WatchNamespacedPodAsync(
|
||||
name: "myPod",
|
||||
@namespace: "default",
|
||||
onEvent:
|
||||
(type, item) =>
|
||||
{
|
||||
testOutput.WriteLine($"Watcher received '{type}' event.");
|
||||
|
||||
events.Add(type);
|
||||
eventsReceived.Signal();
|
||||
},
|
||||
onError:
|
||||
error =>
|
||||
{
|
||||
testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error.");
|
||||
|
||||
errors += 1;
|
||||
eventsReceived.Signal();
|
||||
}
|
||||
);
|
||||
|
||||
// wait server yields all events
|
||||
await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout));
|
||||
|
||||
Assert.True(
|
||||
eventsReceived.CurrentCount == 0,
|
||||
"Timed out waiting for all events / errors to be received."
|
||||
);
|
||||
|
||||
Assert.Contains(WatchEventType.Added, events);
|
||||
Assert.Contains(WatchEventType.Deleted, events);
|
||||
Assert.Contains(WatchEventType.Modified, events);
|
||||
Assert.Contains(WatchEventType.Error, events);
|
||||
|
||||
Assert.Equal(0, errors);
|
||||
|
||||
Assert.True(watcher.Watching);
|
||||
|
||||
serverShutdown.Set();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user