commit dc93612024202e651a9cbe4194c1495c823bff12
Author: Boshi Lian <farmer1992@gmail.com>
Date: Fri Oct 9 16:24:33 2020 -0700
fix SA1505
commit dc9fdbc4a4fbce7f4362a24e1ff98be4d27e16a8
Author: Boshi Lian <farmer1992@gmail.com>
Date: Fri Oct 9 16:24:02 2020 -0700
add ()
commit 16fb7357fcd7e288a4b8fb201fda2b0aae92e5bc
Author: Boshi Lian <farmer1992@gmail.com>
Date: Fri Oct 9 16:21:37 2020 -0700
disable SA1117
commit 544a7e5891e853e2e222f855e5446f3fd79ce2ba
Author: Boshi Lian <farmer1992@gmail.com>
Date: Fri Oct 9 16:21:16 2020 -0700
fix SA1508
commit 4e998adf440dda4f13512d1e10f8cb5d5fbc6bd9
Author: Boshi Lian <farmer1992@gmail.com>
Date: Fri Oct 9 16:08:28 2020 -0700
allow sa1623
commit baf787255c657a00a6074598c6875e0ab4c9d065
Author: Boshi Lian <farmer1992@gmail.com>
Date: Fri Oct 9 16:07:23 2020 -0700
fix SA1413
commit 5ef2ca65de62e6c3cbe513902e3954d78f6dc315
Author: Boshi Lian <farmer1992@gmail.com>
Date: Fri Oct 9 16:05:45 2020 -0700
fix SA1413
commit 6cb71f08060b8252a18b01a5788eb2ddcee67c3e
Author: Boshi Lian <farmer1992@gmail.com>
Date: Fri Oct 9 06:44:55 2020 -0700
fix throw stack
commit e6ada0b1cb3aa72df5fcaa0b4690aadcbd4bda5a
Author: Boshi Lian <farmer1992@gmail.com>
Date: Fri Oct 9 06:44:35 2020 -0700
allow CA2225
commit 2e79edec5843c20b7e8f8e9ec5b61cf95284466a
Author: Boshi Lian <farmer1992@gmail.com>
Date: Fri Oct 9 06:35:50 2020 -0700
allow SA1507
commit 108f5a6361f4faa211a8e01f783803295fac0453
Author: Boshi Lian <farmer1992@gmail.com>
Date: Fri Oct 9 06:35:31 2020 -0700
force SA1413
commit 20f33b64972bfafeada513ae1a46a030934673fd
Author: Boshi Lian <farmer1992@gmail.com>
Date: Fri Oct 9 06:30:58 2020 -0700
force SA1413
commit 6b0de102d68a116e149868731e155bc374f56cc8
Author: Boshi Lian <farmer1992@gmail.com>
Date: Fri Oct 9 06:28:33 2020 -0700
fix encoding
commit 4bd8892c2f0e0fa3666e59b0b77f5b23a2e4ca50
Author: Boshi Lian <farmer1992@gmail.com>
Date: Fri Oct 9 06:26:00 2020 -0700
fix xunit order
commit e28556b37ecd782df2d740321e782622ecd277ca
Author: Boshi Lian <farmer1992@gmail.com>
Date: Fri Oct 9 06:10:20 2020 -0700
fix spacing SA1012 SA1004
commit e8cf4b1e0be951babe04cc3674e17718319b8476
Author: Boshi Lian <farmer1992@gmail.com>
Date: Fri Oct 9 06:04:44 2020 -0700
fix SA1211
commit b4164446f7f9d82fb872243e59e3f5c46fbb1f3c
Author: Boshi Lian <farmer1992@gmail.com>
Date: Fri Oct 9 06:02:34 2020 -0700
fix attribute related warning
commit 2f17ef45947f6ade36593ede6ba4d27bd1991508
Author: Boshi Lian <farmer1992@gmail.com>
Date: Fri Oct 9 05:56:53 2020 -0700
allow ca1801 ca1052 ca1054
commit 49b857f3f1b4a44a809c9186108caab0412c101e
Author: Boshi Lian <farmer1992@gmail.com>
Date: Fri Oct 9 05:50:07 2020 -0700
fix SA1001
commit 3389662a32cfc481a3fdf50b6fd651e23aadd9dd
Author: Boshi LIAN <bolian@microsoft.com>
Date: Fri Oct 9 06:24:32 2020 -0700
fix dotnet format
commit f9d55fc925e8a7d2f2b403bd3ae35673068134da
Merge: 8e81532 0d68823
Author: Boshi Lian <farmer1992@gmail.com>
Date: Fri Oct 9 05:44:30 2020 -0700
Merge branch 'master' into style_fix0
commit 8e815324040837714efb323580cc5dcd79e58310
Author: Boshi Lian <farmer1992@gmail.com>
Date: Fri Oct 9 05:33:02 2020 -0700
fix remaing build err
commit ecf0152f9e989c4c68274b488d4b3ed6ee88daf9
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed Oct 7 05:24:00 2020 -0700
fix SA1707
commit 462d94794848ebfcd102b56a4344ffc33b50f591
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed Oct 7 05:19:38 2020 -0700
fix underscore naming
commit 5271b113603e469021348523f19555e6be22aebc
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed Oct 7 05:19:12 2020 -0700
allow CA1822
commit 602713ce631026e88d8ff7e8803bb12c2addc3c2
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed Oct 7 04:37:16 2020 -0700
fix CA1822
commit bd4fee4d31c1054eadf6d03aa10f443eee9654c0
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed Oct 7 04:36:36 2020 -0700
fix CA1822
commit 257d461f21ef7df65fbc787d5c42c59a89d0eced
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed Oct 7 04:34:25 2020 -0700
introduce dispose pattern
commit 1d668c7926f877ea196edb67acbfe9bfeddb9e15
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed Oct 7 04:23:09 2020 -0700
allow CA2008
commit e4fa6acaf36b84298c8c2ab125ff8aa9efc097b7
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed Oct 7 04:20:28 2020 -0700
allow CA1827
commit dd931d99fa3a95f936ed566320fffa85efb22838
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed Oct 7 04:14:35 2020 -0700
allow SA1314 CA1825
commit 13b6cf11df439be8020e17bc5d30addc62f90c39
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed Oct 7 04:13:51 2020 -0700
Revert "fix CA1825"
This reverts commit 17e03bcd4e0f129a64e57d54fbe72acb7d1d226b.
commit 368664139c75d61ab5a0c432a7fbbdad956c54cf
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed Oct 7 04:09:52 2020 -0700
move class to single files
commit 0015631805d6bc31e4695881989058bb3955766f
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed Oct 7 04:09:27 2020 -0700
disable CA2000 / TODO
commit 0a1241e84ba1247c8ab4ab8d32bd5d800114420b
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed Oct 7 04:07:23 2020 -0700
allow SA1715
commit 17e03bcd4e0f129a64e57d54fbe72acb7d1d226b
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed Oct 7 04:06:57 2020 -0700
fix CA1825
commit 7baf350ca93cb45e2587d86fb6ab6e4cf665b6da
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed Oct 7 03:42:04 2020 -0700
fix SA1312 SA1306
commit 44ad5934182adfc871215637e9612295bc26e6f2
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed Oct 7 03:30:35 2020 -0700
fix CA2007
commit 325fa2c2d16d541db6e21b791c5170f39f832d43
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed Oct 7 03:25:11 2020 -0700
fix SA1131
commit 8f1f46b065dd7e9b316491676bb0b93ef91d0595
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed Oct 7 03:17:08 2020 -0700
allow SA1119
commit 57c0fe7cc26932cc30b4d7cc75a809746d74d5aa
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed Oct 7 03:14:14 2020 -0700
fix SA1400
commit 0afcbbc09d5ef66fbbd4b291d14e7804a8e5a1d3
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed Oct 7 03:12:18 2020 -0700
fix SA1513
commit 45f2424531d35a2a106e10e788aff1a18d745078
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed Oct 7 03:09:17 2020 -0700
allow ca1720 ca1716 sa1405
commit 3403814130a1bf730c4e275f74e9cf5d03bedb41
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed Oct 7 02:16:37 2020 -0700
fix model oper not contains generated header
commit 11377d916cf8cd3ad9109388aff6cf989ff4b7b0
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed Oct 7 02:14:05 2020 -0700
fix SA1649
commit 92b00051a8c80542a63e1dddbb6eed4e98ad26f9
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed Oct 7 02:11:16 2020 -0700
fix SA1124
commit 901a9dd2426fa316bcc5a3c2fc411e583f0e07df
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed Oct 7 02:09:27 2020 -0700
save 1122
commit a8f17b6bac1f1c115b7ed9ebb70d16697a3e81b7
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed Oct 7 02:09:07 2020 -0700
1507 followup
commit a143184921abb38a09e28a7ef07379003fb19563
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed Oct 7 02:07:38 2020 -0700
fix sa1507
commit 54b56026265cbbbfa6e5b8b4dcfab281ffbfa272
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed Oct 7 02:06:44 2020 -0700
fix sa1513
commit 53a009205c88a1d63d8daf32599bbc6428619638
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed Oct 7 02:05:36 2020 -0700
fix SA1649
commit 26d3e78f61ffc381887baaf5c8b56d92aa0ec563
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed Oct 7 02:01:01 2020 -0700
fix ca1816
commit 1ce5a04ce7a32d901cbece3e18d59e3c068cfd27
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed Oct 7 01:56:43 2020 -0700
readable ruleset
commit dafc55f1c2cdc8466919276291333ba46176161a
Author: Boshi Lian <farmer1992@gmail.com>
Date: Wed May 27 19:13:56 2020 -0700
sync none from guideline
724 lines
30 KiB
C#
724 lines
30 KiB
C#
using System;
|
|
using System.Collections.Generic;
|
|
using System.Collections.ObjectModel;
|
|
using System.Diagnostics;
|
|
using System.IO;
|
|
using System.Linq;
|
|
using System.Net;
|
|
using System.Net.Http;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
using k8s.Exceptions;
|
|
using k8s.Models;
|
|
using k8s.Tests.Mock;
|
|
using Microsoft.AspNetCore.Http;
|
|
using Microsoft.Extensions.Logging;
|
|
using Newtonsoft.Json;
|
|
using Newtonsoft.Json.Converters;
|
|
using Nito.AsyncEx;
|
|
using Xunit;
|
|
using Xunit.Abstractions;
|
|
|
|
namespace k8s.Tests
|
|
{
|
|
public class WatchTests
|
|
{
|
|
private static readonly string MockAddedEventStreamLine = BuildWatchEventStreamLine(WatchEventType.Added);
|
|
private static readonly string MockDeletedStreamLine = BuildWatchEventStreamLine(WatchEventType.Deleted);
|
|
private static readonly string MockModifiedStreamLine = BuildWatchEventStreamLine(WatchEventType.Modified);
|
|
private static readonly string MockErrorStreamLine = BuildWatchEventStreamLine(WatchEventType.Error);
|
|
private const string MockBadStreamLine = "bad json";
|
|
private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(150);
|
|
|
|
private readonly ITestOutputHelper testOutput;
|
|
|
|
public WatchTests(ITestOutputHelper testOutput)
|
|
{
|
|
this.testOutput = testOutput;
|
|
}
|
|
|
|
private static string BuildWatchEventStreamLine(WatchEventType eventType)
|
|
{
|
|
var corev1PodList = JsonConvert.DeserializeObject<V1PodList>(MockKubeApiServer.MockPodResponse);
|
|
return JsonConvert.SerializeObject(
|
|
new Watcher<V1Pod>.WatchEvent { Type = eventType, Object = corev1PodList.Items.First() },
|
|
new StringEnumConverter());
|
|
}
|
|
|
|
private static async Task WriteStreamLine(HttpContext httpContext, string reponseLine)
|
|
{
|
|
const string crlf = "\r\n";
|
|
await httpContext.Response.WriteAsync(reponseLine.Replace(crlf, "")).ConfigureAwait(false);
|
|
await httpContext.Response.WriteAsync(crlf).ConfigureAwait(false);
|
|
await httpContext.Response.Body.FlushAsync().ConfigureAwait(false);
|
|
}
|
|
|
|
[Fact]
|
|
public async Task CannotWatch()
|
|
{
|
|
using (var server = new MockKubeApiServer(testOutput: testOutput))
|
|
{
|
|
var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });
|
|
|
|
// did not pass watch param
|
|
var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default");
|
|
var onErrorCalled = false;
|
|
|
|
using (listTask.Watch<V1Pod, V1PodList>((type, item) => { }, e => { onErrorCalled = true; }))
|
|
{
|
|
}
|
|
|
|
await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false); // delay for onerror to be called
|
|
Assert.True(onErrorCalled);
|
|
|
|
|
|
// server did not response line by line
|
|
await Assert.ThrowsAnyAsync<Exception>(() =>
|
|
{
|
|
return client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true);
|
|
|
|
// this line did not throw
|
|
// listTask.Watch<Corev1Pod>((type, item) => { });
|
|
}).ConfigureAwait(false);
|
|
}
|
|
}
|
|
|
|
[Fact]
|
|
public async Task AsyncWatcher()
|
|
{
|
|
var created = new AsyncManualResetEvent(false);
|
|
var eventsReceived = new AsyncManualResetEvent(false);
|
|
|
|
using (var server = new MockKubeApiServer(testOutput, async httpContext =>
|
|
{
|
|
// block until reponse watcher obj created
|
|
await created.WaitAsync().ConfigureAwait(false);
|
|
await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(false);
|
|
return false;
|
|
}))
|
|
{
|
|
var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });
|
|
|
|
|
|
var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true);
|
|
using (listTask.Watch<V1Pod, V1PodList>((type, item) => { eventsReceived.Set(); }))
|
|
{
|
|
// here watcher is ready to use, but http server has not responsed yet.
|
|
created.Set();
|
|
await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(false);
|
|
}
|
|
|
|
Assert.True(eventsReceived.IsSet);
|
|
Assert.True(created.IsSet);
|
|
}
|
|
}
|
|
|
|
[Fact]
|
|
public async Task SuriveBadLine()
|
|
{
|
|
AsyncCountdownEvent eventsReceived = new AsyncCountdownEvent(5);
|
|
AsyncManualResetEvent serverShutdown = new AsyncManualResetEvent();
|
|
AsyncManualResetEvent connectionClosed = new AsyncManualResetEvent();
|
|
|
|
using (var server =
|
|
new MockKubeApiServer(
|
|
testOutput,
|
|
async httpContext =>
|
|
{
|
|
httpContext.Response.StatusCode = (int)HttpStatusCode.OK;
|
|
httpContext.Response.ContentLength = null;
|
|
|
|
await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse).ConfigureAwait(false);
|
|
await WriteStreamLine(httpContext, MockBadStreamLine).ConfigureAwait(false);
|
|
await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(false);
|
|
await WriteStreamLine(httpContext, MockBadStreamLine).ConfigureAwait(false);
|
|
await WriteStreamLine(httpContext, MockModifiedStreamLine).ConfigureAwait(false);
|
|
|
|
// make server alive, cannot set to int.max as of it would block response
|
|
await serverShutdown.WaitAsync().ConfigureAwait(false);
|
|
return false;
|
|
}))
|
|
{
|
|
var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });
|
|
|
|
var listTask = await client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).ConfigureAwait(false);
|
|
|
|
var events = new HashSet<WatchEventType>();
|
|
var errors = 0;
|
|
|
|
var watcher = listTask.Watch<V1Pod, V1PodList>(
|
|
(type, item) =>
|
|
{
|
|
testOutput.WriteLine($"Watcher received '{type}' event.");
|
|
|
|
events.Add(type);
|
|
eventsReceived.Signal();
|
|
},
|
|
error =>
|
|
{
|
|
testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error.");
|
|
|
|
errors += 1;
|
|
eventsReceived.Signal();
|
|
},
|
|
onClosed: connectionClosed.Set);
|
|
|
|
// wait server yields all events
|
|
await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(false);
|
|
|
|
Assert.True(
|
|
eventsReceived.CurrentCount == 0,
|
|
"Timed out waiting for all events / errors to be received.");
|
|
|
|
Assert.Contains(WatchEventType.Added, events);
|
|
Assert.Contains(WatchEventType.Modified, events);
|
|
|
|
Assert.Equal(3, errors);
|
|
|
|
Assert.True(watcher.Watching);
|
|
|
|
// Let the server know it can initiate a shut down.
|
|
serverShutdown.Set();
|
|
|
|
await Task.WhenAny(connectionClosed.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(false);
|
|
Assert.True(connectionClosed.IsSet);
|
|
}
|
|
}
|
|
|
|
[Fact]
|
|
public async Task DisposeWatch()
|
|
{
|
|
var connectionClosed = new AsyncManualResetEvent();
|
|
var eventsReceived = new CountdownEvent(1);
|
|
bool serverRunning = true;
|
|
|
|
using (var server = new MockKubeApiServer(testOutput, async httpContext =>
|
|
{
|
|
await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse).ConfigureAwait(false);
|
|
|
|
while (serverRunning)
|
|
{
|
|
await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(false);
|
|
}
|
|
|
|
return true;
|
|
}))
|
|
{
|
|
var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });
|
|
|
|
var listTask = await client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).ConfigureAwait(false);
|
|
|
|
var events = new HashSet<WatchEventType>();
|
|
|
|
var watcher = listTask.Watch<V1Pod, V1PodList>(
|
|
(type, item) =>
|
|
{
|
|
events.Add(type);
|
|
eventsReceived.Signal();
|
|
},
|
|
onClosed: connectionClosed.Set);
|
|
|
|
// wait at least an event
|
|
await Task.WhenAny(Task.Run(() => eventsReceived.Wait()), Task.Delay(TestTimeout)).ConfigureAwait(false);
|
|
Assert.True(
|
|
eventsReceived.CurrentCount == 0,
|
|
"Timed out waiting for events.");
|
|
|
|
Assert.NotEmpty(events);
|
|
Assert.True(watcher.Watching);
|
|
|
|
watcher.Dispose();
|
|
|
|
events.Clear();
|
|
|
|
// Let the server disconnect
|
|
serverRunning = false;
|
|
|
|
var timeout = Task.Delay(TestTimeout);
|
|
|
|
while (!timeout.IsCompleted && watcher.Watching)
|
|
{
|
|
await Task.Yield();
|
|
}
|
|
|
|
Assert.False(watcher.Watching);
|
|
Assert.True(connectionClosed.IsSet);
|
|
}
|
|
}
|
|
|
|
[Fact]
|
|
public async Task WatchAllEvents()
|
|
{
|
|
AsyncCountdownEvent eventsReceived =
|
|
new AsyncCountdownEvent(4 /* first line of response is eaten by WatcherDelegatingHandler */);
|
|
AsyncManualResetEvent serverShutdown = new AsyncManualResetEvent();
|
|
var waitForClosed = new AsyncManualResetEvent(false);
|
|
|
|
using (var server = new MockKubeApiServer(testOutput, async httpContext =>
|
|
{
|
|
await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(false);
|
|
await WriteStreamLine(httpContext, MockDeletedStreamLine).ConfigureAwait(false);
|
|
await WriteStreamLine(httpContext, MockModifiedStreamLine).ConfigureAwait(false);
|
|
await WriteStreamLine(httpContext, MockErrorStreamLine).ConfigureAwait(false);
|
|
|
|
// make server alive, cannot set to int.max as of it would block response
|
|
await serverShutdown.WaitAsync().ConfigureAwait(false);
|
|
return false;
|
|
}))
|
|
{
|
|
var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });
|
|
|
|
var listTask = await client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).ConfigureAwait(false);
|
|
|
|
var events = new HashSet<WatchEventType>();
|
|
var errors = 0;
|
|
|
|
var watcher = listTask.Watch<V1Pod, V1PodList>(
|
|
(type, item) =>
|
|
{
|
|
testOutput.WriteLine($"Watcher received '{type}' event.");
|
|
|
|
events.Add(type);
|
|
eventsReceived.Signal();
|
|
},
|
|
error =>
|
|
{
|
|
testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error.");
|
|
|
|
errors += 1;
|
|
eventsReceived.Signal();
|
|
},
|
|
onClosed: waitForClosed.Set);
|
|
|
|
// wait server yields all events
|
|
await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(false);
|
|
|
|
Assert.True(
|
|
eventsReceived.CurrentCount == 0,
|
|
"Timed out waiting for all events / errors to be received.");
|
|
|
|
Assert.Contains(WatchEventType.Added, events);
|
|
Assert.Contains(WatchEventType.Deleted, events);
|
|
Assert.Contains(WatchEventType.Modified, events);
|
|
Assert.Contains(WatchEventType.Error, events);
|
|
|
|
Assert.Equal(0, errors);
|
|
|
|
Assert.True(watcher.Watching);
|
|
|
|
serverShutdown.Set();
|
|
|
|
await Task.WhenAny(waitForClosed.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(false);
|
|
Assert.True(waitForClosed.IsSet);
|
|
Assert.False(watcher.Watching);
|
|
}
|
|
}
|
|
|
|
[Fact]
|
|
public async Task WatchEventsWithTimeout()
|
|
{
|
|
AsyncCountdownEvent eventsReceived = new AsyncCountdownEvent(5);
|
|
AsyncManualResetEvent serverShutdown = new AsyncManualResetEvent();
|
|
AsyncManualResetEvent connectionClosed = new AsyncManualResetEvent();
|
|
|
|
using (var server = new MockKubeApiServer(testOutput, async httpContext =>
|
|
{
|
|
await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse).ConfigureAwait(false);
|
|
await Task.Delay(TimeSpan.FromSeconds(120)).ConfigureAwait(false); // The default timeout is 100 seconds
|
|
await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(false);
|
|
await WriteStreamLine(httpContext, MockDeletedStreamLine).ConfigureAwait(false);
|
|
await WriteStreamLine(httpContext, MockModifiedStreamLine).ConfigureAwait(false);
|
|
await WriteStreamLine(httpContext, MockErrorStreamLine).ConfigureAwait(false);
|
|
|
|
// make server alive, cannot set to int.max as of it would block response
|
|
await serverShutdown.WaitAsync().ConfigureAwait(false);
|
|
return false;
|
|
}))
|
|
{
|
|
var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });
|
|
|
|
var listTask = await client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).ConfigureAwait(false);
|
|
|
|
var events = new HashSet<WatchEventType>();
|
|
var errors = 0;
|
|
|
|
var watcher = listTask.Watch<V1Pod, V1PodList>(
|
|
(type, item) =>
|
|
{
|
|
testOutput.WriteLine($"Watcher received '{type}' event.");
|
|
|
|
events.Add(type);
|
|
eventsReceived.Signal();
|
|
},
|
|
error =>
|
|
{
|
|
testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error.");
|
|
|
|
errors += 1;
|
|
eventsReceived.Signal();
|
|
},
|
|
onClosed: connectionClosed.Set);
|
|
|
|
// wait server yields all events
|
|
await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(false);
|
|
|
|
Assert.True(
|
|
eventsReceived.CurrentCount == 0,
|
|
"Timed out waiting for all events / errors to be received.");
|
|
|
|
Assert.Contains(WatchEventType.Added, events);
|
|
Assert.Contains(WatchEventType.Deleted, events);
|
|
Assert.Contains(WatchEventType.Modified, events);
|
|
Assert.Contains(WatchEventType.Error, events);
|
|
|
|
Assert.Equal(1, errors);
|
|
|
|
Assert.True(watcher.Watching);
|
|
|
|
serverShutdown.Set();
|
|
|
|
await Task.WhenAny(connectionClosed.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(false);
|
|
Assert.True(connectionClosed.IsSet);
|
|
}
|
|
}
|
|
|
|
[Fact]
|
|
public async Task WatchServerDisconnect()
|
|
{
|
|
Exception exceptionCatched = null;
|
|
var exceptionReceived = new AsyncManualResetEvent(false);
|
|
var waitForException = new AsyncManualResetEvent(false);
|
|
var waitForClosed = new AsyncManualResetEvent(false);
|
|
|
|
using (var server = new MockKubeApiServer(testOutput, async httpContext =>
|
|
{
|
|
await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse).ConfigureAwait(false);
|
|
await waitForException.WaitAsync().ConfigureAwait(false);
|
|
throw new IOException("server down");
|
|
}))
|
|
{
|
|
var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });
|
|
|
|
var listTask = await client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).ConfigureAwait(false);
|
|
|
|
waitForException.Set();
|
|
Watcher<V1Pod> watcher;
|
|
watcher = listTask.Watch<V1Pod, V1PodList>(
|
|
onEvent: (type, item) => { },
|
|
onError: e =>
|
|
{
|
|
exceptionCatched = e;
|
|
exceptionReceived.Set();
|
|
},
|
|
onClosed: waitForClosed.Set);
|
|
|
|
// wait server down
|
|
await Task.WhenAny(exceptionReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(false);
|
|
|
|
Assert.True(
|
|
exceptionReceived.IsSet,
|
|
"Timed out waiting for exception");
|
|
|
|
await Task.WhenAny(waitForClosed.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(false);
|
|
Assert.True(waitForClosed.IsSet);
|
|
Assert.False(watcher.Watching);
|
|
Assert.IsType<IOException>(exceptionCatched);
|
|
}
|
|
}
|
|
|
|
private class DummyHandler : DelegatingHandler
|
|
{
|
|
internal bool Called { get; private set; }
|
|
|
|
protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request,
|
|
CancellationToken cancellationToken)
|
|
{
|
|
Called = true;
|
|
return base.SendAsync(request, cancellationToken);
|
|
}
|
|
}
|
|
|
|
[Fact]
|
|
public async Task TestWatchWithHandlers()
|
|
{
|
|
AsyncCountdownEvent eventsReceived = new AsyncCountdownEvent(1);
|
|
AsyncManualResetEvent serverShutdown = new AsyncManualResetEvent();
|
|
|
|
using (var server = new MockKubeApiServer(testOutput, async httpContext =>
|
|
{
|
|
await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse).ConfigureAwait(false);
|
|
await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(false);
|
|
|
|
// make server alive, cannot set to int.max as of it would block response
|
|
await serverShutdown.WaitAsync().ConfigureAwait(false);
|
|
return false;
|
|
}))
|
|
{
|
|
var handler1 = new DummyHandler();
|
|
var handler2 = new DummyHandler();
|
|
|
|
var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() }, handler1,
|
|
handler2);
|
|
|
|
Assert.False(handler1.Called);
|
|
Assert.False(handler2.Called);
|
|
|
|
var listTask = await client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).ConfigureAwait(false);
|
|
|
|
var events = new HashSet<WatchEventType>();
|
|
|
|
var watcher = listTask.Watch<V1Pod, V1PodList>(
|
|
(type, item) =>
|
|
{
|
|
events.Add(type);
|
|
eventsReceived.Signal();
|
|
});
|
|
|
|
// wait server yields all events
|
|
await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(false);
|
|
|
|
Assert.True(
|
|
eventsReceived.CurrentCount == 0,
|
|
"Timed out waiting for all events / errors to be received.");
|
|
|
|
Assert.Contains(WatchEventType.Added, events);
|
|
|
|
Assert.True(handler1.Called);
|
|
Assert.True(handler2.Called);
|
|
|
|
serverShutdown.Set();
|
|
}
|
|
}
|
|
|
|
[Fact]
|
|
public async Task DirectWatchAllEvents()
|
|
{
|
|
AsyncCountdownEvent eventsReceived = new AsyncCountdownEvent(4);
|
|
AsyncManualResetEvent serverShutdown = new AsyncManualResetEvent();
|
|
AsyncManualResetEvent connectionClosed = new AsyncManualResetEvent();
|
|
|
|
using (var server = new MockKubeApiServer(testOutput, async httpContext =>
|
|
{
|
|
await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(false);
|
|
await WriteStreamLine(httpContext, MockDeletedStreamLine).ConfigureAwait(false);
|
|
await WriteStreamLine(httpContext, MockModifiedStreamLine).ConfigureAwait(false);
|
|
await WriteStreamLine(httpContext, MockErrorStreamLine).ConfigureAwait(false);
|
|
|
|
// make server alive, cannot set to int.max as of it would block response
|
|
await serverShutdown.WaitAsync().ConfigureAwait(false);
|
|
return false;
|
|
}))
|
|
{
|
|
var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });
|
|
|
|
var events = new HashSet<WatchEventType>();
|
|
var errors = 0;
|
|
|
|
var watcher = await client.WatchNamespacedPodAsync(
|
|
name: "myPod",
|
|
@namespace: "default",
|
|
onEvent:
|
|
(type, item) =>
|
|
{
|
|
testOutput.WriteLine($"Watcher received '{type}' event.");
|
|
|
|
events.Add(type);
|
|
eventsReceived.Signal();
|
|
},
|
|
onError:
|
|
error =>
|
|
{
|
|
testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error.");
|
|
|
|
errors += 1;
|
|
eventsReceived.Signal();
|
|
},
|
|
onClosed: connectionClosed.Set).ConfigureAwait(false);
|
|
|
|
// wait server yields all events
|
|
await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(false);
|
|
|
|
Assert.True(
|
|
eventsReceived.CurrentCount == 0,
|
|
"Timed out waiting for all events / errors to be received.");
|
|
|
|
Assert.Contains(WatchEventType.Added, events);
|
|
Assert.Contains(WatchEventType.Deleted, events);
|
|
Assert.Contains(WatchEventType.Modified, events);
|
|
Assert.Contains(WatchEventType.Error, events);
|
|
|
|
Assert.Equal(0, errors);
|
|
|
|
Assert.True(watcher.Watching);
|
|
|
|
serverShutdown.Set();
|
|
|
|
await Task.WhenAny(connectionClosed.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(false);
|
|
Assert.True(connectionClosed.IsSet);
|
|
}
|
|
}
|
|
|
|
[Fact(Skip = "Integration Test")]
|
|
public async Task WatcherIntegrationTest()
|
|
{
|
|
var kubernetesConfig =
|
|
KubernetesClientConfiguration.BuildConfigFromConfigFile(
|
|
kubeconfigPath: @"C:\Users\frede\Source\Repos\cloud\minikube.config");
|
|
var kubernetes = new Kubernetes(kubernetesConfig);
|
|
|
|
var job = await kubernetes.CreateNamespacedJobAsync(
|
|
new V1Job()
|
|
{
|
|
ApiVersion = "batch/v1",
|
|
Kind = V1Job.KubeKind,
|
|
Metadata = new V1ObjectMeta() { Name = nameof(WatcherIntegrationTest).ToLowerInvariant() },
|
|
Spec = new V1JobSpec()
|
|
{
|
|
Template = new V1PodTemplateSpec()
|
|
{
|
|
Spec = new V1PodSpec()
|
|
{
|
|
Containers = new List<V1Container>()
|
|
{
|
|
new V1Container()
|
|
{
|
|
Image = "ubuntu/xenial",
|
|
Name = "runner",
|
|
Command = new List<string>() { "/bin/bash", "-c", "--" },
|
|
Args = new List<string>()
|
|
{
|
|
"trap : TERM INT; sleep infinity & wait",
|
|
},
|
|
},
|
|
},
|
|
RestartPolicy = "Never",
|
|
},
|
|
},
|
|
},
|
|
},
|
|
"default").ConfigureAwait(false);
|
|
|
|
Collection<Tuple<WatchEventType, V1Job>> events = new Collection<Tuple<WatchEventType, V1Job>>();
|
|
|
|
AsyncManualResetEvent started = new AsyncManualResetEvent();
|
|
AsyncManualResetEvent connectionClosed = new AsyncManualResetEvent();
|
|
|
|
var watcher = await kubernetes.WatchNamespacedJobAsync(
|
|
job.Metadata.Name,
|
|
job.Metadata.NamespaceProperty,
|
|
resourceVersion: job.Metadata.ResourceVersion,
|
|
timeoutSeconds: 30,
|
|
onEvent:
|
|
(type, source) =>
|
|
{
|
|
Debug.WriteLine($"Watcher 1: {type}, {source}");
|
|
events.Add(new Tuple<WatchEventType, V1Job>(type, source));
|
|
job = source;
|
|
started.Set();
|
|
},
|
|
onClosed: connectionClosed.Set).ConfigureAwait(false);
|
|
|
|
await started.WaitAsync().ConfigureAwait(false);
|
|
|
|
await Task.WhenAny(connectionClosed.WaitAsync(), Task.Delay(TimeSpan.FromMinutes(3))).ConfigureAwait(false);
|
|
Assert.True(connectionClosed.IsSet);
|
|
|
|
await kubernetes.DeleteNamespacedJobAsync(
|
|
job.Metadata.Name,
|
|
job.Metadata.NamespaceProperty,
|
|
new V1DeleteOptions()).ConfigureAwait(false);
|
|
}
|
|
|
|
[Fact(Skip = "https://github.com/kubernetes-client/csharp/issues/165")]
|
|
public async Task DirectWatchEventsWithTimeout()
|
|
{
|
|
AsyncCountdownEvent eventsReceived = new AsyncCountdownEvent(4);
|
|
AsyncManualResetEvent serverShutdown = new AsyncManualResetEvent();
|
|
|
|
using (var server = new MockKubeApiServer(testOutput, async httpContext =>
|
|
{
|
|
await Task.Delay(TimeSpan.FromSeconds(120)).ConfigureAwait(false); // The default timeout is 100 seconds
|
|
await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(false);
|
|
await WriteStreamLine(httpContext, MockDeletedStreamLine).ConfigureAwait(false);
|
|
await WriteStreamLine(httpContext, MockModifiedStreamLine).ConfigureAwait(false);
|
|
await WriteStreamLine(httpContext, MockErrorStreamLine).ConfigureAwait(false);
|
|
|
|
// make server alive, cannot set to int.max as of it would block response
|
|
await serverShutdown.WaitAsync().ConfigureAwait(false);
|
|
return false;
|
|
}))
|
|
{
|
|
var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });
|
|
|
|
var events = new HashSet<WatchEventType>();
|
|
var errors = 0;
|
|
|
|
var watcher = await client.WatchNamespacedPodAsync(
|
|
name: "myPod",
|
|
@namespace: "default",
|
|
onEvent:
|
|
(type, item) =>
|
|
{
|
|
testOutput.WriteLine($"Watcher received '{type}' event.");
|
|
|
|
events.Add(type);
|
|
eventsReceived.Signal();
|
|
},
|
|
onError:
|
|
error =>
|
|
{
|
|
testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error.");
|
|
|
|
errors += 1;
|
|
eventsReceived.Signal();
|
|
}).ConfigureAwait(false);
|
|
|
|
// wait server yields all events
|
|
await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(false);
|
|
|
|
Assert.True(
|
|
eventsReceived.CurrentCount == 0,
|
|
"Timed out waiting for all events / errors to be received.");
|
|
|
|
Assert.Contains(WatchEventType.Added, events);
|
|
Assert.Contains(WatchEventType.Deleted, events);
|
|
Assert.Contains(WatchEventType.Modified, events);
|
|
Assert.Contains(WatchEventType.Error, events);
|
|
|
|
Assert.Equal(0, errors);
|
|
|
|
Assert.True(watcher.Watching);
|
|
|
|
serverShutdown.Set();
|
|
}
|
|
}
|
|
|
|
[Fact]
|
|
public async Task WatchShouldCancelAfterRequested()
|
|
{
|
|
AsyncManualResetEvent serverShutdown = new AsyncManualResetEvent();
|
|
|
|
using (var server = new MockKubeApiServer(testOutput, async httpContext =>
|
|
{
|
|
httpContext.Response.StatusCode = 200;
|
|
await httpContext.Response.Body.FlushAsync().ConfigureAwait(false);
|
|
await Task.Delay(TimeSpan.FromSeconds(5)).ConfigureAwait(false); // The default timeout is 100 seconds
|
|
|
|
return true;
|
|
}, resp: ""))
|
|
{
|
|
var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });
|
|
|
|
var cts = new CancellationTokenSource();
|
|
cts.CancelAfter(TimeSpan.FromSeconds(2));
|
|
|
|
await Assert.ThrowsAnyAsync<OperationCanceledException>(async () =>
|
|
{
|
|
await client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true,
|
|
cancellationToken: cts.Token).ConfigureAwait(false);
|
|
}).ConfigureAwait(false);
|
|
}
|
|
}
|
|
}
|
|
}
|