using k8s.Tests.Logging;
using k8s.Tests.Mock.Server;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Rest;
using System;
using System.IO;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;
namespace k8s.Tests
{
/// <summary>
/// The base class for Kubernetes WebSocket test suites.
/// </summary>
public abstract class WebSocketTestBase : IDisposable
{
/// <summary>
/// The next server port to use. The constructor atomically increments this value and uses the
/// incremented result, so the first port handed out is one greater than the initial value.
/// </summary>
static int NextPort = 13255;
/// <summary>
/// Output helper for the current test; used for diagnostic messages and for the test server's logging.
/// </summary>
private readonly ITestOutputHelper testOutput;
/// <summary>
/// Create a new <see cref="WebSocketTestBase"/>.
/// </summary>
/// <param name="testOutput">
/// Output for the current test.
/// </param>
/// <exception cref="ArgumentNullException">
/// <paramref name="testOutput"/> is <c>null</c>.
/// </exception>
protected WebSocketTestBase(ITestOutputHelper testOutput)
{
    // Fail fast, like the other members of this class, rather than with a
    // NullReferenceException when the first diagnostic message is written.
    this.testOutput = testOutput ?? throw new ArgumentNullException(nameof(testOutput));

    // Take a distinct port for this instance; the increment is atomic, so instances
    // created concurrently get different ports.
    int port = Interlocked.Increment(ref NextPort);

    // Useful to diagnose test timeouts.
    TestCancellation.Register(
        () => testOutput.WriteLine("Test-level cancellation token has been canceled.")
    );

    ServerBaseAddress = new Uri($"http://localhost:{port}");
    WebSocketBaseAddress = new Uri($"ws://localhost:{port}");

    // The host is only built here; this constructor does not start it.
    // UseStartup has no parameterless non-generic overload, so the startup type must be
    // supplied as a type argument.
    // NOTE(review): Startup is assumed to be the mock server's startup class from
    // k8s.Tests.Mock.Server - confirm.
    Host = WebHost.CreateDefaultBuilder()
        .UseStartup<Startup>()
        .ConfigureServices(ConfigureTestServerServices)
        .ConfigureLogging(ConfigureTestServerLogging)
        .UseUrls(ServerBaseAddress.AbsoluteUri)
        .Build();
}
/// <summary>
/// The test server's base address (http://). Set by the constructor to localhost on the port taken for this instance.
/// </summary>
protected Uri ServerBaseAddress { get; }
/// <summary>
/// The test server's base WebSockets address (ws://). Uses the same host and port as <see cref="ServerBaseAddress"/>.
/// </summary>
protected Uri WebSocketBaseAddress { get; }
/// <summary>
/// The test server's web host. Built by the constructor and disposed by <see cref="Dispose"/>.
/// </summary>
protected IWebHost Host { get; }
/// <summary>
/// Test adapter for accepting web sockets. Registered with the test server as a singleton service.
/// </summary>
protected WebSocketTestAdapter WebSocketTestAdapter { get; } = new WebSocketTestAdapter();
/// <summary>
/// The source for cancellation tokens used by the test. Disposed by <see cref="Dispose"/>.
/// </summary>
protected CancellationTokenSource CancellationSource { get; } = new CancellationTokenSource();
/// <summary>
/// A <see cref="CancellationToken"/> that can be used to cancel asynchronous operations.
/// </summary>
/// <seealso cref="CancellationSource"/>
protected CancellationToken TestCancellation => CancellationSource.Token;
/// <summary>
/// Register the services required by the test server.
/// </summary>
/// <param name="services">
/// The service collection to configure.
/// </param>
/// <exception cref="ArgumentNullException">
/// <paramref name="services"/> is <c>null</c>.
/// </exception>
protected virtual void ConfigureTestServerServices(IServiceCollection services)
{
    if (services is null)
    {
        throw new ArgumentNullException(nameof(services));
    }

    // Make this test's WebSocketTestAdapter instance available to the server as a singleton.
    services.AddSingleton(WebSocketTestAdapter);
}
/// <summary>
/// Set up logging for the test server.
/// </summary>
/// <param name="logging">
/// The logging builder to configure.
/// </param>
/// <exception cref="ArgumentNullException">
/// <paramref name="logging"/> is <c>null</c>.
/// </exception>
protected virtual void ConfigureTestServerLogging(ILoggingBuilder logging)
{
    if (logging is null)
    {
        throw new ArgumentNullException(nameof(logging));
    }

    // Don't log to console.
    logging.ClearProviders();

    // Send log output to the current test's output helper instead.
    // NOTE(review): LogLevel.Information is presumably the minimum level written - confirm against AddTestOutput.
    logging.AddTestOutput(testOutput, LogLevel.Information);
}
/// <summary>
/// Build a Kubernetes client whose base URI is the test server's address.
/// </summary>
/// <param name="credentials">
/// Optional <see cref="ServiceClientCredentials"/> to use for authentication (defaults to anonymous, i.e. no credentials).
/// </param>
/// <returns>
/// The configured client.
/// </returns>
protected virtual Kubernetes CreateTestClient(ServiceClientCredentials credentials = null)
{
    // Fall back to the anonymous singleton when the caller supplies no credentials.
    ServiceClientCredentials effectiveCredentials = credentials ?? AnonymousClientCredentials.Instance;

    Kubernetes client = new Kubernetes(effectiveCredentials);
    client.BaseUri = ServerBaseAddress;

    return client;
}
/// <summary>
/// Asynchronously disconnect client and server WebSockets using the standard handshake.
/// </summary>
/// <param name="clientSocket">
/// The client-side <see cref="WebSocket"/>.
/// </param>
/// <param name="serverSocket">
/// The server-side <see cref="WebSocket"/>.
/// </param>
/// <param name="closeStatus">
/// An optional <see cref="WebSocketCloseStatus"/> value indicating the reason for disconnection.
///
/// Defaults to <see cref="WebSocketCloseStatus.NormalClosure"/>.
/// </param>
/// <param name="closeStatusDescription">
/// An optional textual description of the reason for disconnection.
///
/// Defaults to "Normal Closure".
/// </param>
/// <returns>
/// A <see cref="Task"/> representing the asynchronous operation.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="clientSocket"/> or <paramref name="serverSocket"/> is <c>null</c>.
/// </exception>
/// <remarks>
/// The returned task completes only after both the client-side close and the server-side half of the
/// handshake have finished. Failures in the server-side half (including its assertion on the received
/// message type) propagate to the caller.
/// </remarks>
protected async Task Disconnect(WebSocket clientSocket, WebSocket serverSocket, WebSocketCloseStatus closeStatus = WebSocketCloseStatus.NormalClosure, string closeStatusDescription = "Normal Closure")
{
    if (clientSocket == null)
    {
        throw new ArgumentNullException(nameof(clientSocket));
    }

    if (serverSocket == null)
    {
        throw new ArgumentNullException(nameof(serverSocket));
    }

    testOutput.WriteLine("Disconnecting...");

    // Asynchronously perform the server's half of the handshake (the call to clientSocket.CloseAsync will block until it receives the server-side response).
    ArraySegment<byte> receiveBuffer = new ArraySegment<byte>(new byte[1024]);

    // The continuation is an async lambda, so ContinueWith produces a Task<Task> whose outer task
    // completes at the lambda's first await. Unwrap() yields a task that tracks the lambda's full
    // execution, so awaiting it below really waits for the server socket to close and surfaces any
    // exception thrown inside the continuation.
    Task closeServerSocket = serverSocket.ReceiveAsync(receiveBuffer, TestCancellation)
        .ContinueWith(async received =>
        {
            if (received.IsFaulted)
            {
                testOutput.WriteLine("Server socket operation to receive Close message failed: {0}", received.Exception.Flatten().InnerExceptions[0]);
            }
            else if (received.IsCanceled)
            {
                testOutput.WriteLine("Server socket operation to receive Close message was canceled.");
            }
            else
            {
                testOutput.WriteLine($"Received {received.Result.MessageType} message from server socket (expecting {WebSocketMessageType.Close}).");

                if (received.Result.MessageType == WebSocketMessageType.Close)
                {
                    testOutput.WriteLine($"Closing server socket (with status {received.Result.CloseStatus})...");

                    // Echo the client's close status and description back to complete the handshake.
                    await serverSocket.CloseAsync(
                        received.Result.CloseStatus.Value,
                        received.Result.CloseStatusDescription,
                        TestCancellation
                    );

                    testOutput.WriteLine("Server socket closed.");
                }

                Assert.Equal(WebSocketMessageType.Close, received.Result.MessageType);
            }
        })
        .Unwrap();

    testOutput.WriteLine("Closing client socket...");
    await clientSocket.CloseAsync(closeStatus, closeStatusDescription, TestCancellation).ConfigureAwait(false);
    testOutput.WriteLine("Client socket closed.");

    await closeServerSocket.ConfigureAwait(false);
    testOutput.WriteLine("Disconnected.");

    // Both sockets must agree on the close status and description that were requested.
    Assert.Equal(closeStatus, clientSocket.CloseStatus);
    Assert.Equal(clientSocket.CloseStatus, serverSocket.CloseStatus);
    Assert.Equal(closeStatusDescription, clientSocket.CloseStatusDescription);
    Assert.Equal(clientSocket.CloseStatusDescription, serverSocket.CloseStatusDescription);
}
/// <summary>
/// Send text to a multiplexed substream over the specified WebSocket.
/// </summary>
/// <param name="webSocket">
/// The target <see cref="WebSocket"/>.
/// </param>
/// <param name="streamIndex">
/// The 0-based index of the target substream.
/// </param>
/// <param name="text">
/// The text to send.
/// </param>
/// <returns>
/// The number of bytes sent to the WebSocket.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="webSocket"/> or <paramref name="text"/> is <c>null</c>.
/// </exception>
protected async Task<int> SendMultiplexed(WebSocket webSocket, byte streamIndex, string text)
{
    if (webSocket == null)
    {
        throw new ArgumentNullException(nameof(webSocket));
    }

    if (text == null)
    {
        throw new ArgumentNullException(nameof(text));
    }

    // Message layout: one leading byte holding the substream index, followed by the ASCII-encoded text.
    byte[] payload = Encoding.ASCII.GetBytes(text);
    byte[] sendBuffer = new byte[payload.Length + 1];
    sendBuffer[0] = streamIndex;
    Array.Copy(payload, 0, sendBuffer, 1, payload.Length);

    await webSocket.SendAsync(new ArraySegment<byte>(sendBuffer), WebSocketMessageType.Binary,
        endOfMessage: true,
        cancellationToken: TestCancellation
    );

    // The count includes the stream-index byte.
    return sendBuffer.Length;
}
/// <summary>
/// Receive text from a multiplexed substream over the specified WebSocket.
/// </summary>
/// <param name="webSocket">
/// The target <see cref="WebSocket"/>.
/// </param>
/// <returns>
/// A tuple containing the received text, 0-based substream index, and total bytes received.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="webSocket"/> is <c>null</c>.
/// </exception>
/// <exception cref="IOException">
/// The first frame received is not a binary message, or the message is empty and therefore
/// does not contain the leading stream-index byte.
/// </exception>
protected async Task<(string text, byte streamIndex, int totalBytes)> ReceiveTextMultiplexed(WebSocket webSocket)
{
    if (webSocket == null)
    {
        throw new ArgumentNullException(nameof(webSocket));
    }

    byte[] receivedData;
    using (MemoryStream buffer = new MemoryStream())
    {
        byte[] receiveBuffer = new byte[1024];
        ArraySegment<byte> receiveSegment = new ArraySegment<byte>(receiveBuffer);

        WebSocketReceiveResult receiveResult = await webSocket.ReceiveAsync(receiveSegment, TestCancellation);
        if (receiveResult.MessageType != WebSocketMessageType.Binary)
        {
            throw new IOException($"Received unexpected WebSocket message of type '{receiveResult.MessageType}'.");
        }

        buffer.Write(receiveBuffer, 0, receiveResult.Count);

        // A message may span several frames; keep reading until the final one.
        while (!receiveResult.EndOfMessage)
        {
            receiveResult = await webSocket.ReceiveAsync(receiveSegment, TestCancellation);
            buffer.Write(receiveBuffer, 0, receiveResult.Count);
        }

        receivedData = buffer.ToArray();
    }

    // Without this guard an empty message would surface as an ArgumentOutOfRangeException from
    // the decoding call below, which says nothing about the actual problem.
    if (receivedData.Length == 0)
    {
        throw new IOException("Received an empty WebSocket message (expected at least the stream-index byte).");
    }

    // Message layout: one leading byte holding the substream index, followed by ASCII-encoded text.
    return (
        text: Encoding.ASCII.GetString(receivedData, 1, receivedData.Length - 1),
        streamIndex: receivedData[0],
        totalBytes: receivedData.Length
    );
}
/// <summary>
/// Release the resources owned by the test: first the cancellation token source, then the web host.
/// </summary>
public void Dispose()
{
    CancellationSource.Dispose();
    Host.Dispose();
}
/// <summary>
/// A <see cref="ServiceClientCredentials"/> implementation representing no credentials (i.e. anonymous).
/// </summary>
protected class AnonymousClientCredentials : ServiceClientCredentials
{
    /// <summary>
    /// The singleton instance of <see cref="AnonymousClientCredentials"/>.
    /// </summary>
    public static readonly AnonymousClientCredentials Instance = new AnonymousClientCredentials();

    /// <summary>
    /// Create new <see cref="AnonymousClientCredentials"/>.
    /// Not accessible outside this class; use <see cref="Instance"/>.
    /// </summary>
    private AnonymousClientCredentials()
    {
    }
}
/// <summary>
/// Event Id constants used in WebSocket tests.
/// </summary>
protected static class EventIds
{
    /// <summary>
    /// An error occurred while closing the server-side socket.
    /// </summary>
    /// <remarks>
    /// Declared public: without an access modifier the member is private, which makes it
    /// unreachable from the test classes this constants holder is exposed to.
    /// </remarks>
    public static readonly EventId ErrorClosingServerSocket = new EventId(1000, nameof(ErrorClosingServerSocket));
}
}
}