using k8s.Authentication;
using k8s.Tests.Logging;
using k8s.Tests.Mock.Server;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.IO;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;

namespace k8s.Tests
{
    /// <summary>
    ///     The base class for Kubernetes WebSocket test suites.
    /// </summary>
    /// <remarks>
    ///     Each instance builds its own web host bound to a distinct localhost port and exposes
    ///     helpers for exchanging multiplexed WebSocket messages with it.
    /// </remarks>
    public abstract class WebSocketTestBase : IDisposable
    {
        /// <summary>
        ///     The next server port to use.
        /// </summary>
        /// <remarks>
        ///     Incremented atomically for every instance so that each test gets its own port.
        /// </remarks>
        private static int nextPort = 13255;

        /// <summary>
        ///     Set once the managed resources have been disposed (guards against double disposal).
        /// </summary>
        private bool disposedValue;

        /// <summary>
        ///     Output for the current test.
        /// </summary>
        private readonly ITestOutputHelper testOutput;

        /// <summary>
        ///     Initializes a new instance of the <see cref="WebSocketTestBase"/> class.
        /// </summary>
        /// <param name="testOutput">
        ///     Output for the current test.
        /// </param>
        protected WebSocketTestBase(ITestOutputHelper testOutput)
        {
            this.testOutput = testOutput;

            int port = Interlocked.Increment(ref nextPort);

            // Useful to diagnose test timeouts.
            TestCancellation.Register(
                () => testOutput.WriteLine("Test-level cancellation token has been canceled."));

            ServerBaseAddress = new Uri($"http://localhost:{port}");
            WebSocketBaseAddress = new Uri($"ws://localhost:{port}");

            // NOTE(review): the startup type argument is assumed to be the Startup class from
            // k8s.Tests.Mock.Server - confirm against the mock server project.
            Host = WebHost.CreateDefaultBuilder()
                .UseStartup<Startup>()
                .ConfigureServices(ConfigureTestServerServices)
                .ConfigureLogging(ConfigureTestServerLogging)
                .UseUrls(ServerBaseAddress.AbsoluteUri)
                .Build();
        }

        /// <summary>
        ///     The test server's base address (http://).
        /// </summary>
        protected Uri ServerBaseAddress { get; }

        /// <summary>
        ///     The test server's base WebSockets address (ws://).
        /// </summary>
        protected Uri WebSocketBaseAddress { get; }

        /// <summary>
        ///     The test server's web host.
        /// </summary>
        protected IWebHost Host { get; }

        /// <summary>
        ///     Test adapter for accepting web sockets.
        /// </summary>
        protected WebSocketTestAdapter WebSocketTestAdapter { get; } = new WebSocketTestAdapter();

        /// <summary>
        ///     The source for cancellation tokens used by the test.
        /// </summary>
        protected CancellationTokenSource CancellationSource { get; } = new CancellationTokenSource();

        /// <summary>
        ///     A <see cref="CancellationToken"/> that can be used to cancel asynchronous operations.
        /// </summary>
        /// <seealso cref="CancellationSource"/>
        protected CancellationToken TestCancellation => CancellationSource.Token;

        /// <summary>
        ///     Configure services for the test server.
        /// </summary>
        /// <param name="services">
        ///     The service collection to configure.
        /// </param>
        /// <exception cref="ArgumentNullException">
        ///     <paramref name="services"/> is <c>null</c>.
        /// </exception>
        protected virtual void ConfigureTestServerServices(IServiceCollection services)
        {
            if (services == null)
            {
                throw new ArgumentNullException(nameof(services));
            }

            // Register the test adapter as a singleton so the server side can resolve it.
            services.AddSingleton(WebSocketTestAdapter);
        }

        /// <summary>
        ///     Configure logging for the test server.
        /// </summary>
        /// <param name="logging">
        ///     The logging builder to configure.
        /// </param>
        /// <exception cref="ArgumentNullException">
        ///     <paramref name="logging"/> is <c>null</c>.
        /// </exception>
        protected virtual void ConfigureTestServerLogging(ILoggingBuilder logging)
        {
            if (logging == null)
            {
                throw new ArgumentNullException(nameof(logging));
            }

            logging.ClearProviders(); // Don't log to console.
            logging.AddTestOutput(testOutput, LogLevel.Information);
        }

        /// <summary>
        ///     Create a Kubernetes client that uses the test server.
        /// </summary>
        /// <param name="credentials">
        ///     Optional <see cref="ServiceClientCredentials"/> to use for authentication (defaults to anonymous, i.e. no credentials).
        ///     The current implementation does not read this parameter; the client is always
        ///     configured with the server address only.
        /// </param>
        /// <returns>
        ///     The configured client.
        /// </returns>
        protected virtual Kubernetes CreateTestClient(ServiceClientCredentials credentials = null)
        {
            return new Kubernetes(new KubernetesClientConfiguration()
            {
                Host = ServerBaseAddress.ToString(),
            });
        }

        /// <summary>
        ///     Asynchronously disconnect client and server WebSockets using the standard handshake.
        /// </summary>
        /// <param name="clientSocket">
        ///     The client-side <see cref="WebSocket"/>.
        /// </param>
        /// <param name="serverSocket">
        ///     The server-side <see cref="WebSocket"/>.
        /// </param>
        /// <param name="closeStatus">
        ///     An optional <see cref="WebSocketCloseStatus"/> value indicating the reason for disconnection.
        ///
        ///     Defaults to <see cref="WebSocketCloseStatus.NormalClosure"/>.
        /// </param>
        /// <param name="closeStatusDescription">
        ///     An optional textual description of the reason for disconnection.
        ///
        ///     Defaults to "Normal Closure".
        /// </param>
        /// <returns>
        ///     A <see cref="Task"/> representing the asynchronous operation.
        /// </returns>
        /// <exception cref="ArgumentNullException">
        ///     <paramref name="clientSocket"/> or <paramref name="serverSocket"/> is <c>null</c>.
        /// </exception>
        protected async Task Disconnect(
            WebSocket clientSocket,
            WebSocket serverSocket,
            WebSocketCloseStatus closeStatus = WebSocketCloseStatus.NormalClosure,
            string closeStatusDescription = "Normal Closure")
        {
            if (clientSocket == null)
            {
                throw new ArgumentNullException(nameof(clientSocket));
            }

            if (serverSocket == null)
            {
                throw new ArgumentNullException(nameof(serverSocket));
            }

            testOutput.WriteLine("Disconnecting...");

            // Asynchronously perform the server's half of the handshake (the call to clientSocket.CloseAsync will block until it receives the server-side response).
            ArraySegment<byte> receiveBuffer = new byte[1024];
            Task closeServerSocket = serverSocket.ReceiveAsync(receiveBuffer, TestCancellation)
                .ContinueWith(async received =>
                {
                    if (received.IsFaulted)
                    {
                        testOutput.WriteLine(
                            "Server socket operation to receive Close message failed: {0}",
                            received.Exception.Flatten().InnerExceptions[0]);
                    }
                    else if (received.IsCanceled)
                    {
                        testOutput.WriteLine("Server socket operation to receive Close message was canceled.");
                    }
                    else
                    {
                        testOutput.WriteLine(
                            $"Received {received.Result.MessageType} message from server socket (expecting {WebSocketMessageType.Close}).");

                        if (received.Result.MessageType == WebSocketMessageType.Close)
                        {
                            testOutput.WriteLine(
                                $"Closing server socket (with status {received.Result.CloseStatus})...");

                            await serverSocket.CloseAsync(
                                received.Result.CloseStatus.Value,
                                received.Result.CloseStatusDescription,
                                TestCancellation).ConfigureAwait(false);

                            testOutput.WriteLine("Server socket closed.");
                        }

                        Assert.Equal(WebSocketMessageType.Close, received.Result.MessageType);
                    }
                })

                // The async continuation produces a Task<Task>. Unwrap it so that awaiting
                // closeServerSocket waits for the server-side close to finish and surfaces
                // any assertion failure raised inside the continuation.
                .Unwrap();

            testOutput.WriteLine("Closing client socket...");

            await clientSocket.CloseAsync(closeStatus, closeStatusDescription, TestCancellation).ConfigureAwait(false);

            testOutput.WriteLine("Client socket closed.");

            await closeServerSocket.ConfigureAwait(false);

            testOutput.WriteLine("Disconnected.");

            Assert.Equal(closeStatus, clientSocket.CloseStatus);
            Assert.Equal(clientSocket.CloseStatus, serverSocket.CloseStatus);

            Assert.Equal(closeStatusDescription, clientSocket.CloseStatusDescription);
            Assert.Equal(clientSocket.CloseStatusDescription, serverSocket.CloseStatusDescription);
        }

        /// <summary>
        ///     Send text to a multiplexed substream over the specified WebSocket.
        /// </summary>
        /// <param name="webSocket">
        ///     The target <see cref="WebSocket"/>.
        /// </param>
        /// <param name="streamIndex">
        ///     The 0-based index of the target substream.
        /// </param>
        /// <param name="text">
        ///     The text to send (encoded as ASCII).
        /// </param>
        /// <returns>
        ///     The number of bytes sent to the WebSocket (the encoded text plus the leading substream index byte).
        /// </returns>
        /// <exception cref="ArgumentNullException">
        ///     <paramref name="webSocket"/> or <paramref name="text"/> is <c>null</c>.
        /// </exception>
        protected async Task<int> SendMultiplexed(WebSocket webSocket, byte streamIndex, string text)
        {
            if (webSocket == null)
            {
                throw new ArgumentNullException(nameof(webSocket));
            }

            if (text == null)
            {
                throw new ArgumentNullException(nameof(text));
            }

            byte[] payload = Encoding.ASCII.GetBytes(text);

            // Wire format: first byte is the substream index, followed by the payload.
            byte[] sendBuffer = new byte[payload.Length + 1];
            sendBuffer[0] = streamIndex;
            Array.Copy(payload, 0, sendBuffer, 1, payload.Length);

            await webSocket.SendAsync(sendBuffer, WebSocketMessageType.Binary, true, TestCancellation).ConfigureAwait(false);

            return sendBuffer.Length;
        }

        /// <summary>
        ///     Receive text from a multiplexed substream over the specified WebSocket.
        /// </summary>
        /// <param name="webSocket">
        ///     The target <see cref="WebSocket"/>.
        /// </param>
        /// <returns>
        ///     A tuple containing the received text, 0-based substream index, and total bytes received.
        /// </returns>
        /// <exception cref="ArgumentNullException">
        ///     <paramref name="webSocket"/> is <c>null</c>.
        /// </exception>
        /// <exception cref="IOException">
        ///     The first frame received is not a binary message, or the complete message is empty.
        /// </exception>
        protected async Task<(string text, byte streamIndex, int totalBytes)> ReceiveTextMultiplexed(
            WebSocket webSocket)
        {
            if (webSocket == null)
            {
                throw new ArgumentNullException(nameof(webSocket));
            }

            byte[] receivedData;
            using (MemoryStream buffer = new MemoryStream())
            {
                byte[] receiveBuffer = new byte[1024];
                WebSocketReceiveResult receiveResult =
                    await webSocket.ReceiveAsync(receiveBuffer, TestCancellation).ConfigureAwait(false);
                if (receiveResult.MessageType != WebSocketMessageType.Binary)
                {
                    throw new IOException(
                        $"Received unexpected WebSocket message of type '{receiveResult.MessageType}'.");
                }

                buffer.Write(receiveBuffer, 0, receiveResult.Count);

                // A single message may span several frames; keep reading until it is complete.
                while (!receiveResult.EndOfMessage)
                {
                    receiveResult = await webSocket.ReceiveAsync(receiveBuffer, TestCancellation).ConfigureAwait(false);
                    buffer.Write(receiveBuffer, 0, receiveResult.Count);
                }

                buffer.Flush();

                receivedData = buffer.ToArray();
            }

            // Without at least the substream index byte the message cannot be decoded; fail
            // with a descriptive error instead of an out-of-range exception.
            if (receivedData.Length == 0)
            {
                throw new IOException("Received an empty WebSocket message (expected at least the substream index byte).");
            }

            return (
                text: Encoding.ASCII.GetString(receivedData, 1, receivedData.Length - 1),
                streamIndex: receivedData[0],
                totalBytes: receivedData.Length);
        }

        /// <summary>
        ///     A <see cref="ServiceClientCredentials"/> implementation representing no credentials (i.e. anonymous).
        /// </summary>
        protected class AnonymousClientCredentials : ServiceClientCredentials
        {
            /// <summary>
            ///     The singleton instance of <see cref="AnonymousClientCredentials"/>.
            /// </summary>
            public static readonly AnonymousClientCredentials Instance = new AnonymousClientCredentials();

            /// <summary>
            ///     Initializes a new instance of the <see cref="AnonymousClientCredentials"/> class.
            /// </summary>
            /// <remarks>
            ///     Private so that <see cref="Instance"/> is the only instance.
            /// </remarks>
            private AnonymousClientCredentials()
            {
            }
        }

        /// <summary>
        ///     Event Id constants used in WebSocket tests.
        /// </summary>
        protected static class EventIds
        {
            /// <summary>
            ///     An error occurred while closing the server-side socket.
            /// </summary>
            /// <remarks>
            ///     NOTE(review): declared private, so it is not accessible outside this nested class - confirm whether that is intended.
            /// </remarks>
            private static readonly EventId ErrorClosingServerSocket = new EventId(1000, nameof(ErrorClosingServerSocket));
        }

        /// <summary>
        ///     Release the resources held by the test base.
        /// </summary>
        /// <param name="disposing">
        ///     <c>true</c> to dispose the cancellation source and the web host; <c>false</c> to skip them.
        /// </param>
        protected virtual void Dispose(bool disposing)
        {
            if (!disposedValue)
            {
                if (disposing)
                {
                    CancellationSource.Dispose();
                    Host.Dispose();
                }

                disposedValue = true;
            }
        }

        /// <summary>
        ///     Dispose the cancellation source and the test server's web host.
        /// </summary>
        public void Dispose()
        {
            // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
            Dispose(true);
            GC.SuppressFinalize(this);
        }
    }
}