2022-09-16 01:21:21 +02:00
using k8s.Models ;
using k8s.Tests.Mock ;
using Microsoft.AspNetCore.Http ;
using Nito.AsyncEx ;
2018-03-20 16:03:28 +11:00
using System ;
using System.Collections.Generic ;
using System.IO ;
using System.Linq ;
using System.Net ;
using System.Net.Http ;
2021-04-09 13:37:17 -07:00
using System.Text ;
2018-03-20 16:03:28 +11:00
using System.Threading ;
using System.Threading.Tasks ;
using Xunit ;
using Xunit.Abstractions ;
namespace k8s.Tests
{
public class WatchTests
{
// One pre-built watch-stream line per event type, produced by BuildWatchEventStreamLine.
private static readonly string MockAddedEventStreamLine =
    BuildWatchEventStreamLine(WatchEventType.Added);

private static readonly string MockDeletedStreamLine =
    BuildWatchEventStreamLine(WatchEventType.Deleted);

private static readonly string MockModifiedStreamLine =
    BuildWatchEventStreamLine(WatchEventType.Modified);

private static readonly string MockErrorStreamLine =
    BuildWatchEventStreamLine(WatchEventType.Error);

// A stream line that is not a serialized watch event.
private const string MockBadStreamLine = "bad json";

// Longest time a test waits for an expected signal before it stops waiting.
private static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(150);
2018-03-20 16:03:28 +11:00
2018-04-28 05:40:47 +02:00
// xUnit output sink; handed to the mock API server and used for diagnostic messages.
private readonly ITestOutputHelper testOutput;

// Stores the output helper supplied by the test runner.
public WatchTests(ITestOutputHelper testOutput) => this.testOutput = testOutput;
// Serializes a watch event of the given type whose payload is the first pod
// of the mock server's canned pod-list response.
private static string BuildWatchEventStreamLine(WatchEventType eventType)
{
    var firstPod = KubernetesJson
        .Deserialize<V1PodList>(MockKubeApiServer.MockPodResponse)
        .Items
        .First();

    var watchEvent = new Watcher<V1Pod>.WatchEvent
    {
        Type = eventType,
        Object = firstPod,
    };

    return KubernetesJson.Serialize(watchEvent);
}
// Writes one line to the HTTP response: the text with any embedded CRLF removed,
// followed by a single CRLF terminator, then flushes the response body.
private static async Task WriteStreamLine(HttpContext httpContext, string reponseLine)
{
    const string crlf = "\r\n";

    var response = httpContext.Response;
    var singleLine = reponseLine.Replace(crlf, "");

    await response.WriteAsync(singleLine).ConfigureAwait(false);
    await response.WriteAsync(crlf).ConfigureAwait(false);
    await response.Body.FlushAsync().ConfigureAwait(false);
}
[Fact]
public async Task CannotWatch()
{
    using var server = new MockKubeApiServer(testOutput);

    var config = new KubernetesClientConfiguration { Host = server.Uri.ToString() };
    var client = new Kubernetes(config);

    // Part 1: attach a watcher to the not-yet-awaited request and expect the
    // error callback to fire.
    var pendingList = client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true);
    var errorReported = false;
    using (pendingList.Watch<V1Pod, V1PodList>((type, item) => { }, e => errorReported = true))
    {
        // Give the watcher time to invoke the error callback.
        await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
    }

    Assert.True(errorReported);

    // Part 2: awaiting the request itself is expected to throw (original note:
    // the server does not respond line by line). The original author also noted
    // that calling Watch on the task did not throw, which is why only the await
    // is asserted here.
    await Assert.ThrowsAnyAsync<Exception>(
        () => client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true))
        .ConfigureAwait(false);
}
2019-10-22 16:02:13 -07:00
[Fact]
public async Task AsyncWatcher()
{
    var watcherCreated = new AsyncManualResetEvent(false);
    var eventObserved = new AsyncManualResetEvent(false);

    using var server = new MockKubeApiServer(testOutput, async httpContext =>
    {
        // Hold the response back until the test has created its watcher.
        await watcherCreated.WaitAsync().ConfigureAwait(false);
        await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(false);
        return false;
    });

    var config = new KubernetesClientConfiguration { Host = server.Uri.ToString() };
    var client = new Kubernetes(config);

    var pendingList = client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true);
    using (pendingList.Watch<V1Pod, V1PodList>((type, item) => eventObserved.Set()))
    {
        // The watcher exists, but the HTTP server has not responded yet; release it now.
        watcherCreated.Set();

        var eventWait = eventObserved.WaitAsync();
        var timeout = Task.Delay(TestTimeout);
        await Task.WhenAny(eventWait, timeout).ConfigureAwait(false);
    }

    Assert.True(eventObserved.IsSet);
    Assert.True(watcherCreated.IsSet);
}
2018-03-20 16:03:28 +11:00
[Fact]
public async Task SurviveBadLine()
{
    // Five callbacks in total are awaited (events and errors combined).
    var pendingSignals = new AsyncCountdownEvent(5);
    var releaseServer = new AsyncManualResetEvent();
    var watchClosed = new AsyncManualResetEvent();

    using var server = new MockKubeApiServer(
        testOutput,
        async httpContext =>
        {
            httpContext.Response.StatusCode = (int)HttpStatusCode.OK;
            httpContext.Response.ContentLength = null;

            // Valid event lines interleaved with lines that are not watch events.
            var lines = new[]
            {
                MockKubeApiServer.MockPodResponse,
                MockBadStreamLine,
                MockAddedEventStreamLine,
                MockBadStreamLine,
                MockModifiedStreamLine,
            };

            foreach (var line in lines)
            {
                await WriteStreamLine(httpContext, line).ConfigureAwait(false);
            }

            // Keep the connection open until the test allows the handler to finish.
            await releaseServer.WaitAsync().ConfigureAwait(false);
            return false;
        });

    var config = new KubernetesClientConfiguration { Host = server.Uri.ToString() };
    var client = new Kubernetes(config);

    var response = await client.CoreV1
        .ListNamespacedPodWithHttpMessagesAsync("default", watch: true)
        .ConfigureAwait(false);

    var seenTypes = new HashSet<WatchEventType>();
    var errorCount = 0;

    var watcher = response.Watch<V1Pod, V1PodList>(
        (type, item) =>
        {
            testOutput.WriteLine($"Watcher received '{type}' event.");
            seenTypes.Add(type);
            pendingSignals.Signal();
        },
        error =>
        {
            testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error.");
            errorCount++;
            pendingSignals.Signal();
        },
        watchClosed.Set);

    // Wait until every line has produced a callback, or the timeout elapses.
    await Task.WhenAny(pendingSignals.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(false);

    Assert.True(
        pendingSignals.CurrentCount == 0,
        "Timed out waiting for all events / errors to be received.");

    Assert.Contains(WatchEventType.Added, seenTypes);
    Assert.Contains(WatchEventType.Modified, seenTypes);
    Assert.Equal(3, errorCount);

    // The bad lines must not have stopped the watcher.
    Assert.True(watcher.Watching);

    // Let the server know it can initiate a shut down.
    releaseServer.Set();

    await Task.WhenAny(watchClosed.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(false);
    Assert.True(watchClosed.IsSet);
}
[Fact]
public async Task DisposeWatch()
{
    var watchClosed = new AsyncManualResetEvent();
    var firstEvent = new AsyncCountdownEvent(1);
    var releaseServer = new AsyncManualResetEvent();

    using var server = new MockKubeApiServer(testOutput, async httpContext =>
    {
        await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(false);

        // Keep the connection open until the test releases the handler.
        await releaseServer.WaitAsync().ConfigureAwait(false);
        return false;
    });

    var config = new KubernetesClientConfiguration { Host = server.Uri.ToString() };
    var client = new Kubernetes(config);

    var response = await client.CoreV1
        .ListNamespacedPodWithHttpMessagesAsync("default", watch: true)
        .ConfigureAwait(false);

    var seenTypes = new HashSet<WatchEventType>();

    var watcher = response.Watch<V1Pod, V1PodList>(
        (type, item) =>
        {
            seenTypes.Add(type);
            firstEvent.Signal();
        },
        error =>
        {
            testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error.");
        },
        onClosed: watchClosed.Set);

    // Wait for at least one event before disposing.
    await Task.WhenAny(firstEvent.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(false);

    Assert.True(
        firstEvent.CurrentCount == 0,
        "Timed out waiting for events.");

    Assert.NotEmpty(seenTypes);
    Assert.True(watcher.Watching);

    // Disposing the watcher is expected to stop it and raise the closed callback.
    watcher.Dispose();
    seenTypes.Clear();

    await Task.WhenAny(watchClosed.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(false);

    Assert.False(watcher.Watching);
    Assert.True(watchClosed.IsSet);

    releaseServer.Set();
}
[Fact]
public async Task WatchAllEvents()
{
    // Original author's note on this count: "first line of response is eaten
    // by WatcherDelegatingHandler".
    var pendingSignals = new AsyncCountdownEvent(4);
    var releaseServer = new AsyncManualResetEvent();
    var watchClosed = new AsyncManualResetEvent(false);

    using var server = new MockKubeApiServer(testOutput, async httpContext =>
    {
        var lines = new[]
        {
            MockAddedEventStreamLine,
            MockDeletedStreamLine,
            MockModifiedStreamLine,
            MockErrorStreamLine,
        };

        foreach (var line in lines)
        {
            await WriteStreamLine(httpContext, line).ConfigureAwait(false);
        }

        // Keep the connection open until the test releases the handler.
        await releaseServer.WaitAsync().ConfigureAwait(false);
        return false;
    });

    var config = new KubernetesClientConfiguration { Host = server.Uri.ToString() };
    var client = new Kubernetes(config);

    var response = await client.CoreV1
        .ListNamespacedPodWithHttpMessagesAsync("default", watch: true)
        .ConfigureAwait(false);

    var seenTypes = new HashSet<WatchEventType>();
    var errorCount = 0;

    var watcher = response.Watch<V1Pod, V1PodList>(
        (type, item) =>
        {
            testOutput.WriteLine($"Watcher received '{type}' event.");
            seenTypes.Add(type);
            pendingSignals.Signal();
        },
        error =>
        {
            testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error.");
            errorCount++;
            pendingSignals.Signal();
        },
        watchClosed.Set);

    // Wait until the server has yielded all events, or the timeout elapses.
    await Task.WhenAny(pendingSignals.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(false);

    Assert.True(
        pendingSignals.CurrentCount == 0,
        "Timed out waiting for all events / errors to be received.");

    Assert.Contains(WatchEventType.Added, seenTypes);
    Assert.Contains(WatchEventType.Deleted, seenTypes);
    Assert.Contains(WatchEventType.Modified, seenTypes);
    Assert.Contains(WatchEventType.Error, seenTypes);
    Assert.Equal(0, errorCount);
    Assert.True(watcher.Watching);

    releaseServer.Set();

    await Task.WhenAny(watchClosed.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(false);

    Assert.True(watchClosed.IsSet);
    Assert.False(watcher.Watching);
}
[Fact]
public async Task WatchEventsWithTimeout()
{
    // Five callbacks in total are awaited (events and errors combined).
    var pendingSignals = new AsyncCountdownEvent(5);
    var releaseServer = new AsyncManualResetEvent();
    var watchClosed = new AsyncManualResetEvent();

    using var server = new MockKubeApiServer(testOutput, async httpContext =>
    {
        await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse).ConfigureAwait(false);

        // Stay silent for longer than the default timeout (100 seconds, per the original note).
        await Task.Delay(TimeSpan.FromSeconds(120)).ConfigureAwait(false);

        var lines = new[]
        {
            MockAddedEventStreamLine,
            MockDeletedStreamLine,
            MockModifiedStreamLine,
            MockErrorStreamLine,
        };

        foreach (var line in lines)
        {
            await WriteStreamLine(httpContext, line).ConfigureAwait(false);
        }

        // Keep the connection open until the test releases the handler.
        await releaseServer.WaitAsync().ConfigureAwait(false);
        return false;
    });

    var config = new KubernetesClientConfiguration { Host = server.Uri.ToString() };
    var client = new Kubernetes(config);

    var response = await client.CoreV1
        .ListNamespacedPodWithHttpMessagesAsync("default", watch: true)
        .ConfigureAwait(false);

    var seenTypes = new HashSet<WatchEventType>();
    var errorCount = 0;

    var watcher = response.Watch<V1Pod, V1PodList>(
        (type, item) =>
        {
            testOutput.WriteLine($"Watcher received '{type}' event.");
            seenTypes.Add(type);
            pendingSignals.Signal();
        },
        error =>
        {
            testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error.");
            errorCount++;
            pendingSignals.Signal();
        },
        watchClosed.Set);

    // Wait until the server has yielded all events, or the timeout elapses.
    await Task.WhenAny(pendingSignals.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(false);

    Assert.True(
        pendingSignals.CurrentCount == 0,
        "Timed out waiting for all events / errors to be received.");

    Assert.Contains(WatchEventType.Added, seenTypes);
    Assert.Contains(WatchEventType.Deleted, seenTypes);
    Assert.Contains(WatchEventType.Modified, seenTypes);
    Assert.Contains(WatchEventType.Error, seenTypes);
    Assert.Equal(1, errorCount);
    Assert.True(watcher.Watching);

    releaseServer.Set();

    await Task.WhenAny(watchClosed.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(false);
    Assert.True(watchClosed.IsSet);
}
[Fact]
public async Task WatchServerDisconnect()
{
    Exception caught = null;
    var errorReported = new AsyncManualResetEvent(false);
    var allowServerFailure = new AsyncManualResetEvent(false);
    var watchClosed = new AsyncManualResetEvent(false);

    using var server = new MockKubeApiServer(testOutput, async httpContext =>
    {
        await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse).ConfigureAwait(false);

        // Fail the request handler once the test has received the initial response.
        await allowServerFailure.WaitAsync().ConfigureAwait(false);
        throw new IOException("server down");
    });

    var config = new KubernetesClientConfiguration { Host = server.Uri.ToString() };
    var client = new Kubernetes(config);

    var response = await client.CoreV1
        .ListNamespacedPodWithHttpMessagesAsync("default", watch: true)
        .ConfigureAwait(false);

    // The failure is released before the watcher is attached, as in the original ordering.
    allowServerFailure.Set();

    var watcher = response.Watch<V1Pod, V1PodList>(
        (type, item) => { },
        e =>
        {
            caught = e;
            errorReported.Set();
        },
        watchClosed.Set);

    // Wait for the server failure to reach the error callback.
    await Task.WhenAny(errorReported.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(false);

    Assert.True(
        errorReported.IsSet,
        "Timed out waiting for exception");

    await Task.WhenAny(watchClosed.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(false);

    Assert.True(watchClosed.IsSet);
    Assert.False(watcher.Watching);
    Assert.IsType<IOException>(caught);
}
// Pass-through handler that remembers whether a request was ever sent through it.
private class DummyHandler : DelegatingHandler
{
    private bool wasInvoked;

    // True once SendAsync has been entered at least once.
    internal bool Called => wasInvoked;

    protected override Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request,
        CancellationToken cancellationToken)
    {
        // Record the call before delegating, so the flag is set even if the inner handler fails.
        wasInvoked = true;
        return base.SendAsync(request, cancellationToken);
    }
}
2018-03-20 16:03:28 +11:00
[Fact]
public async Task TestWatchWithHandlers()
{
    var eventsReceived = new AsyncCountdownEvent(1);
    var serverShutdown = new AsyncManualResetEvent();

    using var server = new MockKubeApiServer(testOutput, async httpContext =>
    {
        await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse).ConfigureAwait(false);
        await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(false);

        // Keep the connection open until the test releases the handler.
        await serverShutdown.WaitAsync().ConfigureAwait(false);
        return false;
    });

    var handler1 = new DummyHandler();
    var handler2 = new DummyHandler();

    var client = new Kubernetes(
        new KubernetesClientConfiguration { Host = server.Uri.ToString() }, handler1, handler2);

    // Constructing the client alone must not send anything through the handlers.
    Assert.False(handler1.Called);
    Assert.False(handler2.Called);

    var listTask = await client.CoreV1
        .ListNamespacedPodWithHttpMessagesAsync("default", watch: true)
        .ConfigureAwait(false);

    var events = new HashSet<WatchEventType>();

    // Dispose the watcher when the test ends (before the server, because using
    // declarations are released in reverse order) so it does not outlive the test.
    using var watcher = listTask.Watch<V1Pod, V1PodList>(
        (type, item) =>
        {
            events.Add(type);
            eventsReceived.Signal();
        });

    // Wait until the server has yielded the event, or the timeout elapses.
    await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(false);

    Assert.True(
        eventsReceived.CurrentCount == 0,
        "Timed out waiting for all events / errors to be received.");

    Assert.Contains(WatchEventType.Added, events);

    // The watch request must have passed through both custom handlers.
    Assert.True(handler1.Called);
    Assert.True(handler2.Called);

    serverShutdown.Set();
}
2018-06-25 16:14:02 +02:00
[Fact]
public async Task DirectWatchAllEvents()
{
    var pendingSignals = new AsyncCountdownEvent(4);
    var releaseServer = new AsyncManualResetEvent();
    var watchClosed = new AsyncManualResetEvent();

    using var server = new MockKubeApiServer(testOutput, async httpContext =>
    {
        var lines = new[]
        {
            MockAddedEventStreamLine,
            MockDeletedStreamLine,
            MockModifiedStreamLine,
            MockErrorStreamLine,
        };

        foreach (var line in lines)
        {
            await WriteStreamLine(httpContext, line).ConfigureAwait(false);
        }

        // Keep the connection open until the test releases the handler.
        await releaseServer.WaitAsync().ConfigureAwait(false);
        return false;
    });

    var config = new KubernetesClientConfiguration { Host = server.Uri.ToString() };
    var client = new Kubernetes(config);

    var seenTypes = new HashSet<WatchEventType>();
    var errorCount = 0;

    // The watcher is attached directly to the un-awaited request task.
    var pendingList = client.CoreV1.ListNamespacedPodWithHttpMessagesAsync(
        "default",
        fieldSelector: $"metadata.name=${" myPod "}",
        watch: true);

    var watcher = pendingList.Watch<V1Pod, V1PodList>(
        onEvent: (type, item) =>
        {
            testOutput.WriteLine($"Watcher received '{type}' event.");
            seenTypes.Add(type);
            pendingSignals.Signal();
        },
        onError: error =>
        {
            testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error.");
            errorCount++;
            pendingSignals.Signal();
        },
        onClosed: watchClosed.Set);

    // Wait until the server has yielded all events, or the timeout elapses.
    await Task.WhenAny(pendingSignals.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(false);

    Assert.True(
        pendingSignals.CurrentCount == 0,
        "Timed out waiting for all events / errors to be received.");

    Assert.Contains(WatchEventType.Added, seenTypes);
    Assert.Contains(WatchEventType.Deleted, seenTypes);
    Assert.Contains(WatchEventType.Modified, seenTypes);
    Assert.Contains(WatchEventType.Error, seenTypes);
    Assert.Equal(0, errorCount);
    Assert.True(watcher.Watching);

    releaseServer.Set();

    await Task.WhenAny(watchClosed.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(false);
    Assert.True(watchClosed.IsSet);
}
2021-10-11 12:55:02 -07:00
[Fact]
public async Task EnsureTimeoutWorks()
{
    using var server = new MockKubeApiServer(testOutput, async httpContext =>
    {
        // Respond only after 120 seconds, far later than either 5-second limit used below.
        await Task.Delay(TimeSpan.FromSeconds(120)).ConfigureAwait(false);
        await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse).ConfigureAwait(false);

        return false;
    });

    // Case 1: the limit comes from the client configuration (HttpClientTimeout).
    await Assert.ThrowsAsync<TaskCanceledException>(async () =>
    {
        var client = new Kubernetes(new KubernetesClientConfiguration
        {
            Host = server.Uri.ToString(),
            HttpClientTimeout = TimeSpan.FromSeconds(5),
        });

        await client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default").ConfigureAwait(false);
    }).ConfigureAwait(false);

    // Case 2: the limit comes from a caller-supplied cancellation token.
    await Assert.ThrowsAsync<TaskCanceledException>(async () =>
    {
        // Dispose the source so its pending CancelAfter timer is released.
        using var cts = new CancellationTokenSource();
        cts.CancelAfter(TimeSpan.FromSeconds(5));

        var client = new Kubernetes(new KubernetesClientConfiguration
        {
            Host = server.Uri.ToString(),
        });

        await client.CoreV1
            .ListNamespacedPodWithHttpMessagesAsync("default", cancellationToken: cts.Token)
            .ConfigureAwait(false);
    }).ConfigureAwait(false);
}
[Fact]
public async Task DirectWatchEventsWithTimeout()
{
    var eventsReceived = new AsyncCountdownEvent(4);
    var serverShutdown = new AsyncManualResetEvent();

    using var server = new MockKubeApiServer(testOutput, async httpContext =>
    {
        // Stay silent for longer than the default timeout (100 seconds, per the original note).
        await Task.Delay(TimeSpan.FromSeconds(120)).ConfigureAwait(false);

        await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(false);
        await WriteStreamLine(httpContext, MockDeletedStreamLine).ConfigureAwait(false);
        await WriteStreamLine(httpContext, MockModifiedStreamLine).ConfigureAwait(false);
        await WriteStreamLine(httpContext, MockErrorStreamLine).ConfigureAwait(false);

        // Keep the connection open until the test releases the handler.
        await serverShutdown.WaitAsync().ConfigureAwait(false);
        return false;
    });

    var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });

    var events = new HashSet<WatchEventType>();
    var errors = 0;

    // The watcher is attached directly to the un-awaited request task. It is disposed
    // when the test ends (before the server, because using declarations are released
    // in reverse order) so it does not outlive the test.
    using var watcher = client.CoreV1
        .ListNamespacedPodWithHttpMessagesAsync("default", fieldSelector: $"metadata.name=${" myPod "}", watch: true)
        .Watch<V1Pod, V1PodList>(
            onEvent: (type, item) =>
            {
                testOutput.WriteLine($"Watcher received '{type}' event.");

                events.Add(type);
                eventsReceived.Signal();
            },
            onError: error =>
            {
                testOutput.WriteLine($"Watcher received '{error.GetType().FullName}' error.");

                errors += 1;
                eventsReceived.Signal();
            });

    // Wait until the server has yielded all events, or the timeout elapses.
    await Task.WhenAny(eventsReceived.WaitAsync(), Task.Delay(TestTimeout)).ConfigureAwait(false);

    Assert.True(
        eventsReceived.CurrentCount == 0,
        "Timed out waiting for all events / errors to be received.");

    Assert.Contains(WatchEventType.Added, events);
    Assert.Contains(WatchEventType.Deleted, events);
    Assert.Contains(WatchEventType.Modified, events);
    Assert.Contains(WatchEventType.Error, events);
    Assert.Equal(0, errors);
    Assert.True(watcher.Watching);

    serverShutdown.Set();
}
2020-03-22 21:18:45 -07:00
[Fact]
public async Task WatchShouldCancelAfterRequested()
{
    using var server = new MockKubeApiServer(testOutput, async httpContext =>
    {
        httpContext.Response.StatusCode = 200;
        await httpContext.Response.Body.FlushAsync().ConfigureAwait(false);

        // Hold the response open for 5 seconds, longer than the 2-second cancellation below.
        await Task.Delay(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
        return true;
    }, resp: "");

    var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() });

    // Dispose the source so its pending CancelAfter timer is released.
    using var cts = new CancellationTokenSource();
    cts.CancelAfter(TimeSpan.FromSeconds(2));

    // The watch request must observe the token and fail with a cancellation exception.
    await Assert.ThrowsAnyAsync<OperationCanceledException>(async () =>
    {
        await client.CoreV1
            .ListNamespacedPodWithHttpMessagesAsync("default", watch: true, cancellationToken: cts.Token)
            .ConfigureAwait(false);
    }).ConfigureAwait(false);
}
2021-04-09 13:37:17 -07:00
[Fact]
public void ReadError()
{
    // A single ERROR watch event carrying a Status object (410 Gone).
    var data = Encoding.UTF8.GetBytes(
        "{\"type\":\"ERROR\",\"object\":{\"kind\":\"Status\",\"apiVersion\":\"v1\",\"metadata\":{},\"status\":\"Failure\",\"message\":\"too old resource version: 44982(53593)\",\"reason\":\"Gone\",\"code\":410}}");

    using var stream = new MemoryStream(data);
    using var reader = new StreamReader(stream);

    // Declared before the watcher so that it is disposed after it: using declarations
    // are released in reverse order, so the callback never sees a disposed event.
    using var errorSignalled = new ManualResetEvent(false);

    Exception recordedException = null;

    using var watcher = new Watcher<V1Pod>(
        () => Task.FromResult(reader),
        null,
        exception =>
        {
            recordedException = exception;
            errorSignalled.Set();
        });

    // Bound the wait so that a watcher which never reports fails the test instead of hanging the run.
    Assert.True(
        errorSignalled.WaitOne(TestTimeout),
        "Timed out waiting for the watcher to report the error.");

    Assert.NotNull(recordedException);

    var k8sException = recordedException as KubernetesException;
    Assert.NotNull(k8sException);
    Assert.NotNull(k8sException.Status);
    Assert.Equal("too old resource version: 44982(53593)", k8sException.Message);
    Assert.Equal("too old resource version: 44982(53593)", k8sException.Status.Message);
}
2022-04-22 15:15:37 -07:00
// Pass-through handler that captures the HTTP version of the most recent request it saw.
private class CheckHeaderDelegatingHandler : DelegatingHandler
{
    public CheckHeaderDelegatingHandler()
    {
    }

    public CheckHeaderDelegatingHandler(HttpMessageHandler innerHandler)
        : base(innerHandler)
    {
    }

    // Null until a request has been sent through this handler.
    public Version Version { get; private set; }

    protected override Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request,
        CancellationToken cancellationToken)
    {
        // Capture the version before delegating to the inner handler.
        var requestedVersion = request.Version;
        Version = requestedVersion;

        return base.SendAsync(request, cancellationToken);
    }
}
[Fact]
public async Task MustHttp2VersionSet()
{
    // Dispose the mock server when the test ends, as every other test in this class does.
    using var server = new MockKubeApiServer(testOutput, async httpContext =>
    {
        await WriteStreamLine(httpContext, MockAddedEventStreamLine).ConfigureAwait(false);
        return false;
    });

    var handler = new CheckHeaderDelegatingHandler();
    var client = new Kubernetes(
        new KubernetesClientConfiguration { Host = server.Uri.ToString() }, handler);

    // No request has been sent yet, so no version has been captured.
    Assert.Null(handler.Version);

    await client.CoreV1.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).ConfigureAwait(false);

    // The watch request must have been issued with HTTP/2 as its version.
    Assert.Equal(HttpVersion.Version20, handler.Version);
}
2018-03-20 16:03:28 +11:00
}
2017-11-08 14:22:10 +08:00
}