diff --git a/.travis.yml b/.travis.yml index 6051ec7..06106f0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,10 +1,19 @@ language: csharp -sudo: false +sudo: false matrix: include: - - dotnet: 2.0.0 - mono: none + - mono: none dist: trusty + # We need the .NET Core 2.1 (preview 1) SDK to build. Travis doesn't know how to install this yet. + before_install: + - echo 'Installing .NET Core...' + - export DOTNET_SKIP_FIRST_TIME_EXPERIENCE=1 + - export DOTNET_CLI_TELEMETRY_OPTOUT=1 + - curl https://packages.microsoft.com/keys/microsoft.asc | gpg --dearmor > microsoft.gpg + - sudo mv microsoft.gpg /etc/apt/trusted.gpg.d/microsoft.gpg + - sudo sh -c 'echo "deb [arch=amd64] https://packages.microsoft.com/repos/microsoft-ubuntu-trusty-prod trusty main" > /etc/apt/sources.list.d/dotnetdev.list' + - sudo apt-get -qq update + - sudo apt-get install -y dotnet-sdk-2.1.300-preview1-008174 script: - ./ci.sh diff --git a/ci.sh b/ci.sh index f40f2fd..42af5b8 100755 --- a/ci.sh +++ b/ci.sh @@ -4,9 +4,12 @@ set -e # Ensure no compile errors in all projects -find . -name *.csproj -exec dotnet build {} \; +dotnet restore +dotnet build --no-restore # Execute Unit tests cd tests -dotnet restore -dotnet test +dotnet test --no-restore --no-build +if [[ $? != 0 ]]; then + exit 1 +fi diff --git a/kubernetes-client.sln b/kubernetes-client.sln index 5c467ac..294edf9 100644 --- a/kubernetes-client.sln +++ b/kubernetes-client.sln @@ -1,4 +1,3 @@ - Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio 15 VisualStudioVersion = 15.0.26430.16 @@ -25,4 +24,7 @@ Global GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {049A763A-C891-4E8D-80CF-89DD3E22ADC7} + EndGlobalSection EndGlobal diff --git a/src/CoreFX.cs b/src/CoreFX.cs new file mode 100644 index 0000000..828862f --- /dev/null +++ b/src/CoreFX.cs @@ -0,0 +1,566 @@ +/* + * This (temporary) code has been adapted from Microsoft's .NET Core 2.0.4 codebase. Original code copyright (c) .NET Foundation and Contributors. + * Hopefully, once .NET Core 2.1 lands, we can drop it in favour of the built-in ManagedWebSocket and SocketHttpHandler classes (providing they support custom validation of server certificates). + * + * Original code: https://github.com/dotnet/corefx/blob/v2.0.4/src/System.Net.WebSockets.Client/src/System/Net/WebSockets/WebSocketHandle.Managed.cs#L74 + * License: https://github.com/dotnet/corefx/blob/v2.0.4/LICENSE.TXT + * + */ + +#if NETCOREAPP2_1 + +using k8s; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; +using System.Net; +using System.Net.Http.Headers; +using System.Net.Security; +using System.Net.Sockets; +using System.Net.WebSockets; +using System.Runtime.ExceptionServices; +using System.Security.Cryptography; +using System.Security.Cryptography.X509Certificates; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace CoreFX +{ + /// + /// Connection factory for Kubernetes web sockets. + /// + internal static class K8sWebSocket + { + /// + /// GUID appended by the server as part of the security key response. + /// + /// Defined in the RFC. + /// + const string WSServerGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + + /// + /// Asynchronously connect to a Kubernetes WebSocket. + /// + /// + /// The target URI. + /// + /// + /// that control the WebSocket's configuration and connection process. + /// + /// + /// An optional that can be used to cancel the operation. + /// + /// + /// A representing the connection. + /// + public static async Task ConnectAsync(Uri uri, KubernetesWebSocketOptions options, CancellationToken cancellationToken = default(CancellationToken)) + { + try + { + // Connect to the remote server + Socket connectedSocket = await ConnectSocketAsync(uri.Host, uri.Port, cancellationToken).ConfigureAwait(false); + Stream stream = new NetworkStream(connectedSocket, ownsSocket: true); + + // Upgrade to SSL if needed + if (uri.Scheme == "wss") + { + X509Certificate2Collection clientCertificates = new X509Certificate2Collection(); + foreach (X509Certificate2 clientCertificate in options.ClientCertificates) + clientCertificates.Add(clientCertificate); + + var sslStream = new SslStream( + innerStream: stream, + leaveInnerStreamOpen: false, + userCertificateValidationCallback: options.ServerCertificateCustomValidationCallback + ); + await + sslStream.AuthenticateAsClientAsync( + uri.Host, + clientCertificates, + options.EnabledSslProtocols, + checkCertificateRevocation: false + ) + .ConfigureAwait(false); + + stream = sslStream; + } + + // Create the security key and expected response, then build all of the request headers + (string secKey, string webSocketAccept) = CreateSecKeyAndSecWebSocketAccept(); + byte[] requestHeader = BuildRequestHeader(uri, options, secKey); + + // Write out the header to the connection + await stream.WriteAsync(requestHeader, 0, requestHeader.Length, cancellationToken).ConfigureAwait(false); + + // Parse the response and store our state for the remainder of the connection + string subprotocol = await ParseAndValidateConnectResponseAsync(stream, options, webSocketAccept, cancellationToken).ConfigureAwait(false); + + return WebSocket.CreateClientWebSocket( + stream, + subprotocol, + options.ReceiveBufferSize, + options.SendBufferSize, + options.KeepAliveInterval, + false, + WebSocket.CreateClientBuffer(options.ReceiveBufferSize, options.SendBufferSize) + ); + } + catch (Exception unexpectedError) + { + throw new WebSocketException("WebSocket connection failure.", unexpectedError); + } + } + + /// Connects a socket to the specified host and port, subject to cancellation and aborting. + /// The host to which to connect. + /// The port to which to connect on the host. + /// The CancellationToken to use to cancel the websocket. + /// The connected Socket. + private static async Task ConnectSocketAsync(string host, int port, CancellationToken cancellationToken) + { + IPAddress[] addresses = await Dns.GetHostAddressesAsync(host).ConfigureAwait(false); + + ExceptionDispatchInfo lastException = null; + foreach (IPAddress address in addresses) + { + var socket = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp); + try + { + using (cancellationToken.Register(() => socket.Dispose())) + { + try + { + await socket.ConnectAsync(address, port).ConfigureAwait(false); + } + catch (ObjectDisposedException objectDisposed) + { + // If the socket was disposed because cancellation was requested, translate the exception + // into a new OperationCanceledException. Otherwise, let the original ObjectDisposedexception propagate. + if (cancellationToken.IsCancellationRequested) + { + throw new OperationCanceledException(new OperationCanceledException().Message, objectDisposed, cancellationToken); + } + } + } + cancellationToken.ThrowIfCancellationRequested(); // in case of a race and socket was disposed after the await + + return socket; + } + catch (Exception exc) + { + socket.Dispose(); + lastException = ExceptionDispatchInfo.Capture(exc); + } + } + + lastException?.Throw(); + + Debug.Fail("We should never get here. We should have already returned or an exception should have been thrown."); + throw new WebSocketException("WebSocket connection failure."); + } + + /// Creates a byte[] containing the headers to send to the server. + /// The Uri of the server. + /// The options used to configure the websocket. + /// The generated security key to send in the Sec-WebSocket-Key header. + /// The byte[] containing the encoded headers ready to send to the network. + private static byte[] BuildRequestHeader(Uri uri, KubernetesWebSocketOptions options, string secKey) + { + StringBuilder builder = new StringBuilder() + .Append("GET ") + .Append(uri.PathAndQuery) + .Append(" HTTP/1.1\r\n"); + + // Add all of the required headers, honoring Host header if set. + string hostHeader; + if (!options.RequestHeaders.TryGetValue(HttpKnownHeaderNames.Host, out hostHeader)) + hostHeader = uri.Host; + + builder.Append("Host: "); + if (String.IsNullOrEmpty(hostHeader)) + { + builder.Append(uri.IdnHost).Append(':').Append(uri.Port).Append("\r\n"); + } + else + { + builder.Append(hostHeader).Append("\r\n"); + } + + builder.Append("Connection: Upgrade\r\n"); + builder.Append("Upgrade: websocket\r\n"); + builder.Append("Sec-WebSocket-Version: 13\r\n"); + builder.Append("Sec-WebSocket-Key: ").Append(secKey).Append("\r\n"); + + // Add all of the additionally requested headers + foreach (string key in options.RequestHeaders.Keys) + { + if (String.Equals(key, HttpKnownHeaderNames.Host, StringComparison.OrdinalIgnoreCase)) + { + // Host header handled above + continue; + } + + builder.Append(key).Append(": ").Append(options.RequestHeaders[key]).Append("\r\n"); + } + + // Add the optional subprotocols header + if (options.RequestedSubProtocols.Count > 0) + { + builder.Append(HttpKnownHeaderNames.SecWebSocketProtocol).Append(": "); + builder.Append(options.RequestedSubProtocols[0]); + for (int i = 1; i < options.RequestedSubProtocols.Count; i++) + { + builder.Append(", ").Append(options.RequestedSubProtocols[i]); + } + builder.Append("\r\n"); + } + + // End the headers + builder.Append("\r\n"); + + // Return the bytes for the built up header + return Encoding.ASCII.GetBytes(builder.ToString()); + } + + /// Read and validate the connect response headers from the server. + /// The stream from which to read the response headers. + /// The options used to configure the websocket. + /// The expected value of the Sec-WebSocket-Accept header. + /// The CancellationToken to use to cancel the websocket. + /// The agreed upon subprotocol with the server, or null if there was none. + static async Task ParseAndValidateConnectResponseAsync(Stream stream, KubernetesWebSocketOptions options, string expectedSecWebSocketAccept, CancellationToken cancellationToken) + { + // Read the first line of the response + string statusLine = await ReadResponseHeaderLineAsync(stream, cancellationToken).ConfigureAwait(false); + + // Depending on the underlying sockets implementation and timing, connecting to a server that then + // immediately closes the connection may either result in an exception getting thrown from the connect + // earlier, or it may result in getting to here but reading 0 bytes. If we read 0 bytes and thus have + // an empty status line, treat it as a connect failure. + if (String.IsNullOrEmpty(statusLine)) + { + throw new WebSocketException("Connection failure."); + } + + const string ExpectedStatusStart = "HTTP/1.1 "; + const string ExpectedStatusStatWithCode = "HTTP/1.1 101"; // 101 == SwitchingProtocols + + // If the status line doesn't begin with "HTTP/1.1" or isn't long enough to contain a status code, fail. + if (!statusLine.StartsWith(ExpectedStatusStart, StringComparison.Ordinal) || statusLine.Length < ExpectedStatusStatWithCode.Length) + { + throw new WebSocketException(WebSocketError.HeaderError); + } + + // If the status line doesn't contain a status code 101, or if it's long enough to have a status description + // but doesn't contain whitespace after the 101, fail. + if (!statusLine.StartsWith(ExpectedStatusStatWithCode, StringComparison.Ordinal) || + (statusLine.Length > ExpectedStatusStatWithCode.Length && !char.IsWhiteSpace(statusLine[ExpectedStatusStatWithCode.Length]))) + { + throw new WebSocketException(WebSocketError.HeaderError, $"Connection failure (status line = '{statusLine}')."); + } + + // Read each response header. Be liberal in parsing the response header, treating + // everything to the left of the colon as the key and everything to the right as the value, trimming both. + // For each header, validate that we got the expected value. + bool foundUpgrade = false, foundConnection = false, foundSecWebSocketAccept = false; + string subprotocol = null; + string line; + while (!String.IsNullOrEmpty(line = await ReadResponseHeaderLineAsync(stream, cancellationToken).ConfigureAwait(false))) + { + int colonIndex = line.IndexOf(':'); + if (colonIndex == -1) + { + throw new WebSocketException(WebSocketError.HeaderError); + } + + string headerName = line.SubstringTrim(0, colonIndex); + string headerValue = line.SubstringTrim(colonIndex + 1); + + // The Connection, Upgrade, and SecWebSocketAccept headers are required and with specific values. + ValidateAndTrackHeader(HttpKnownHeaderNames.Connection, "Upgrade", headerName, headerValue, ref foundConnection); + ValidateAndTrackHeader(HttpKnownHeaderNames.Upgrade, "websocket", headerName, headerValue, ref foundUpgrade); + ValidateAndTrackHeader(HttpKnownHeaderNames.SecWebSocketAccept, expectedSecWebSocketAccept, headerName, headerValue, ref foundSecWebSocketAccept); + + // The SecWebSocketProtocol header is optional. We should only get it with a non-empty value if we requested subprotocols, + // and then it must only be one of the ones we requested. If we got a subprotocol other than one we requested (or if we + // already got one in a previous header), fail. Otherwise, track which one we got. + if (String.Equals(HttpKnownHeaderNames.SecWebSocketProtocol, headerName, StringComparison.OrdinalIgnoreCase) && + !String.IsNullOrWhiteSpace(headerValue)) + { + if (options.RequestedSubProtocols.Count > 0) + { + string newSubprotocol = options.RequestedSubProtocols.Find(requested => String.Equals(requested, headerValue, StringComparison.OrdinalIgnoreCase)); + if (newSubprotocol == null || subprotocol != null) + { + throw new WebSocketException( + String.Format("Unsupported sub-protocol '{0}' (expected one of [{1}]).", + newSubprotocol, + String.Join(", ", options.RequestedSubProtocols) + ) + ); + } + subprotocol = newSubprotocol; + } + } + } + if (!foundUpgrade || !foundConnection || !foundSecWebSocketAccept) + { + throw new WebSocketException("Connection failure."); + } + + return subprotocol; + } + + /// Validates a received header against expected values and tracks that we've received it. + /// The header name against which we're comparing. + /// The header value against which we're comparing. + /// The actual header name received. + /// The actual header value received. + /// A bool tracking whether this header has been seen. + private static void ValidateAndTrackHeader( + string targetHeaderName, string targetHeaderValue, + string foundHeaderName, string foundHeaderValue, + ref bool foundHeader) + { + bool isTargetHeader = String.Equals(targetHeaderName, foundHeaderName, StringComparison.OrdinalIgnoreCase); + if (!foundHeader) + { + if (isTargetHeader) + { + if (!String.Equals(targetHeaderValue, foundHeaderValue, StringComparison.OrdinalIgnoreCase)) + { + throw new WebSocketException( + $"Invalid value for '{foundHeaderName}' header: '{foundHeaderValue}' (expected '{targetHeaderValue}')." + ); + } + foundHeader = true; + } + } + else + { + if (isTargetHeader) + { + throw new WebSocketException("Connection failure."); + } + } + } + + /// Reads a line from the stream. + /// The stream from which to read. + /// The CancellationToken used to cancel the websocket. + /// The read line, or null if none could be read. + private static async Task ReadResponseHeaderLineAsync(Stream stream, CancellationToken cancellationToken) + { + StringBuilder sb = new StringBuilder(); + + var arr = new byte[1]; + char prevChar = '\0'; + try + { + // TODO: Reading one byte is extremely inefficient. The problem, however, + // is that if we read multiple bytes, we could end up reading bytes post-headers + // that are part of messages meant to be read by the managed websocket after + // the connection. The likely solution here is to wrap the stream in a BufferedStream, + // though a) that comes at the expense of an extra set of virtual calls, b) + // it adds a buffer when the managed websocket will already be using a buffer, and + // c) it's not exposed on the version of the System.IO contract we're currently using. + while (await stream.ReadAsync(arr, 0, 1, cancellationToken).ConfigureAwait(false) == 1) + { + // Process the next char + char curChar = (char)arr[0]; + if (prevChar == '\r' && curChar == '\n') + { + break; + } + sb.Append(curChar); + prevChar = curChar; + } + + if (sb.Length > 0 && sb[sb.Length - 1] == '\r') + { + sb.Length = sb.Length - 1; + } + + return sb.ToString(); + } + finally + { + sb.Clear(); + } + } + + /// + /// Create a security key for sending in the Sec-WebSocket-Key header and the associated response we expect to receive as the Sec-WebSocket-Accept header value. + /// + /// A key-value pair of the request header security key and expected response header value. + [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Security", "CA5350", Justification = "Required by RFC6455")] + static (string secKey, string expectedResponse) CreateSecKeyAndSecWebSocketAccept() + { + string secKey = Convert.ToBase64String(Guid.NewGuid().ToByteArray()); + using (SHA1 sha = SHA1.Create()) + { + return ( + secKey, + Convert.ToBase64String( + sha.ComputeHash(Encoding.ASCII.GetBytes(secKey + WSServerGuid)) + ) + ); + } + } + + static void ValidateHeader(HttpHeaders headers, string name, string expectedValue) + { + if (!headers.TryGetValues(name, out IEnumerable values)) + ThrowConnectFailure(); + + Debug.Assert(values is string[]); + string[] array = (string[])values; + if (array.Length != 1 || !String.Equals(array[0], expectedValue, StringComparison.OrdinalIgnoreCase)) + { + throw new WebSocketException( + $"Invalid WebSocker response header '{name}': [{String.Join(", ", array)}]" + ); + } + } + + static void ThrowConnectFailure() => throw new WebSocketException("Connection failure."); + } + + /// + /// Well-known HTTP header names from CoreFX used by . + /// + static class HttpKnownHeaderNames + { + public const string Accept = "Accept"; + public const string AcceptCharset = "Accept-Charset"; + public const string AcceptEncoding = "Accept-Encoding"; + public const string AcceptLanguage = "Accept-Language"; + public const string AcceptPatch = "Accept-Patch"; + public const string AcceptRanges = "Accept-Ranges"; + public const string AccessControlAllowCredentials = "Access-Control-Allow-Credentials"; + public const string AccessControlAllowHeaders = "Access-Control-Allow-Headers"; + public const string AccessControlAllowMethods = "Access-Control-Allow-Methods"; + public const string AccessControlAllowOrigin = "Access-Control-Allow-Origin"; + public const string AccessControlExposeHeaders = "Access-Control-Expose-Headers"; + public const string AccessControlMaxAge = "Access-Control-Max-Age"; + public const string Age = "Age"; + public const string Allow = "Allow"; + public const string AltSvc = "Alt-Svc"; + public const string Authorization = "Authorization"; + public const string CacheControl = "Cache-Control"; + public const string Connection = "Connection"; + public const string ContentDisposition = "Content-Disposition"; + public const string ContentEncoding = "Content-Encoding"; + public const string ContentLanguage = "Content-Language"; + public const string ContentLength = "Content-Length"; + public const string ContentLocation = "Content-Location"; + public const string ContentMD5 = "Content-MD5"; + public const string ContentRange = "Content-Range"; + public const string ContentSecurityPolicy = "Content-Security-Policy"; + public const string ContentType = "Content-Type"; + public const string Cookie = "Cookie"; + public const string Cookie2 = "Cookie2"; + public const string Date = "Date"; + public const string ETag = "ETag"; + public const string Expect = "Expect"; + public const string Expires = "Expires"; + public const string From = "From"; + public const string Host = "Host"; + public const string IfMatch = "If-Match"; + public const string IfModifiedSince = "If-Modified-Since"; + public const string IfNoneMatch = "If-None-Match"; + public const string IfRange = "If-Range"; + public const string IfUnmodifiedSince = "If-Unmodified-Since"; + public const string KeepAlive = "Keep-Alive"; + public const string LastModified = "Last-Modified"; + public const string Link = "Link"; + public const string Location = "Location"; + public const string MaxForwards = "Max-Forwards"; + public const string Origin = "Origin"; + public const string P3P = "P3P"; + public const string Pragma = "Pragma"; + public const string ProxyAuthenticate = "Proxy-Authenticate"; + public const string ProxyAuthorization = "Proxy-Authorization"; + public const string ProxyConnection = "Proxy-Connection"; + public const string PublicKeyPins = "Public-Key-Pins"; + public const string Range = "Range"; + public const string Referer = "Referer"; // NB: The spelling-mistake "Referer" for "Referrer" must be matched. + public const string RetryAfter = "Retry-After"; + public const string SecWebSocketAccept = "Sec-WebSocket-Accept"; + public const string SecWebSocketExtensions = "Sec-WebSocket-Extensions"; + public const string SecWebSocketKey = "Sec-WebSocket-Key"; + public const string SecWebSocketProtocol = "Sec-WebSocket-Protocol"; + public const string SecWebSocketVersion = "Sec-WebSocket-Version"; + public const string Server = "Server"; + public const string SetCookie = "Set-Cookie"; + public const string SetCookie2 = "Set-Cookie2"; + public const string StrictTransportSecurity = "Strict-Transport-Security"; + public const string TE = "TE"; + public const string TSV = "TSV"; + public const string Trailer = "Trailer"; + public const string TransferEncoding = "Transfer-Encoding"; + public const string Upgrade = "Upgrade"; + public const string UpgradeInsecureRequests = "Upgrade-Insecure-Requests"; + public const string UserAgent = "User-Agent"; + public const string Vary = "Vary"; + public const string Via = "Via"; + public const string WWWAuthenticate = "WWW-Authenticate"; + public const string Warning = "Warning"; + public const string XAspNetVersion = "X-AspNet-Version"; + public const string XContentDuration = "X-Content-Duration"; + public const string XContentTypeOptions = "X-Content-Type-Options"; + public const string XFrameOptions = "X-Frame-Options"; + public const string XMSEdgeRef = "X-MSEdge-Ref"; + public const string XPoweredBy = "X-Powered-By"; + public const string XRequestID = "X-Request-ID"; + public const string XUACompatible = "X-UA-Compatible"; + } + + /// + /// Extension methods for s from the CoreFX codebase (used by ). + /// + static class CoreFXStringExtensions + { + public static string SubstringTrim(this string value, int startIndex) + { + return SubstringTrim(value, startIndex, value.Length - startIndex); + } + + public static string SubstringTrim(this string value, int startIndex, int length) + { + Debug.Assert(value != null, "string must be non-null"); + Debug.Assert(startIndex >= 0, "startIndex must be non-negative"); + Debug.Assert(length >= 0, "length must be non-negative"); + Debug.Assert(startIndex <= value.Length - length, "startIndex + length must be <= value.Length"); + + if (length == 0) + { + return String.Empty; + } + + int endIndex = startIndex + length - 1; + + while (startIndex <= endIndex && char.IsWhiteSpace(value[startIndex])) + { + startIndex++; + } + + while (endIndex >= startIndex && char.IsWhiteSpace(value[endIndex])) + { + endIndex--; + } + + int newLength = endIndex - startIndex + 1; + Debug.Assert(newLength >= 0 && newLength <= value.Length, "Expected resulting length to be within value's length"); + + return + newLength == 0 ? String.Empty : + newLength == value.Length ? value : + value.Substring(startIndex, newLength); + } + } +} + +#endif // NETCOREAPP2_1 diff --git a/src/K8sProtocol.cs b/src/K8sProtocol.cs new file mode 100644 index 0000000..47db6f8 --- /dev/null +++ b/src/K8sProtocol.cs @@ -0,0 +1,46 @@ +namespace k8s +{ + /// + /// Well-known WebSocket sub-protocols used by the Kubernetes API. + /// + public static class K8sProtocol + { + /// + /// Version 1 of the Kubernetes channel WebSocket protocol. + /// + /// + /// This protocol prepends each binary message with a byte indicating the channel number (zero indexed) that the message was sent on. + /// Messages in both directions should prefix their messages with this channel byte. + /// + /// When used for remote execution, the channel numbers are by convention defined to match the POSIX file-descriptors assigned to STDIN, STDOUT, and STDERR (0, 1, and 2). + /// No other conversion is performed on the raw subprotocol - writes are sent as they are received by the server. + /// + /// Example client session: + /// + /// CONNECT http://server.com with subprotocol "channel.k8s.io" + /// WRITE []byte{0, 102, 111, 111, 10} # send "foo\n" on channel 0 (STDIN) + /// READ []byte{1, 10} # receive "\n" on channel 1 (STDOUT) + /// CLOSE + /// + public static readonly string ChannelV1 = "channel.k8s.io"; + + /// + /// Version 1 of the Kubernetes Base64-encoded channel WebSocket protocol. + /// + /// + /// This protocol base64 encodes each message with a character representing the channel number (zero indexed) the message was sent on (if the channel number is 1, then the character is '1', i.e. a byte value of 49). + /// Messages in both directions should prefix their messages with this character. + /// + /// When used for remote execution, the channel numbers are by convention defined to match the POSIX file-descriptors assigned to STDIN, STDOUT, and STDERR ('0', '1', and '2'). + /// The data received on the server is base64 decoded (and must be be valid) and data written by the server to the client is base64 encoded. + /// + /// Example client session: + /// + /// CONNECT http://server.com with subprotocol "base64.channel.k8s.io" + /// WRITE []byte{48, 90, 109, 57, 118, 67, 103, 111, 61} # send "foo\n" (base64: "Zm9vCgo=") on channel '0' (STDIN) + /// READ []byte{49, 67, 103, 61, 61} # receive "\n" (base64: "Cg==") on channel '1' (STDOUT) + /// CLOSE + /// + public static readonly string ChannelBase64V1 = "base64.channel.k8s.io"; + } +} diff --git a/src/Kubernetes.WebSocket.cs b/src/Kubernetes.WebSocket.cs index c617f62..405a3b4 100644 --- a/src/Kubernetes.WebSocket.cs +++ b/src/Kubernetes.WebSocket.cs @@ -2,8 +2,10 @@ using Microsoft.AspNetCore.WebUtilities; using Microsoft.Rest; using System; using System.Collections.Generic; +using System.Linq; using System.Net.Http; using System.Net.WebSockets; +using System.Security.Cryptography.X509Certificates; using System.Threading; using System.Threading.Tasks; @@ -204,11 +206,11 @@ namespace k8s } } - // Set Credentials + // Set Credentials #if NET452 - foreach (var cert in ((WebRequestHandler)this.HttpClientHandler).ClientCertificates) + foreach (var cert in ((WebRequestHandler)this.HttpClientHandler).ClientCertificates.OfType()) #else - foreach (var cert in this.HttpClientHandler.ClientCertificates) + foreach (var cert in this.HttpClientHandler.ClientCertificates.OfType()) #endif { webSocketBuilder.AddClientCertificate(cert); @@ -222,6 +224,15 @@ namespace k8s webSocketBuilder.SetRequestHeader(_header.Key, string.Join(" ", _header.Value)); } +#if NETCOREAPP2_1 + if (this.CaCert != null) + webSocketBuilder.ExpectServerCertificate(this.CaCert); + else + webSocketBuilder.SkipServerCertificateValidation(); + + webSocketBuilder.Options.RequestedSubProtocols.Add(K8sProtocol.ChannelV1); +#endif // NETCOREAPP2_1 + // Send Request cancellationToken.ThrowIfCancellationRequested(); diff --git a/src/KubernetesClient.csproj b/src/KubernetesClient.csproj index 9281873..0a337df 100644 --- a/src/KubernetesClient.csproj +++ b/src/KubernetesClient.csproj @@ -9,8 +9,8 @@ https://github.com/kubernetes-client/csharp kubernetes;docker;containers; - netstandard1.4;net452 - netstandard1.4 + netstandard1.4;net452;netcoreapp2.1 + netstandard1.4;netcoreapp2.1 k8s diff --git a/src/WebSocketBuilder.NetCoreApp2.1.cs b/src/WebSocketBuilder.NetCoreApp2.1.cs new file mode 100644 index 0000000..870c4a4 --- /dev/null +++ b/src/WebSocketBuilder.NetCoreApp2.1.cs @@ -0,0 +1,143 @@ +#if NETCOREAPP2_1 + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Net.Security; +using System.Net.WebSockets; +using System.Security.Authentication; +using System.Security.Cryptography.X509Certificates; +using System.Threading; +using System.Threading.Tasks; + +namespace k8s +{ + /// + /// The creates a new object which connects to a remote WebSocket. + /// + public sealed class WebSocketBuilder + { + public KubernetesWebSocketOptions Options { get; } = new KubernetesWebSocketOptions(); + + public WebSocketBuilder() + { + } + + public WebSocketBuilder SetRequestHeader(string headerName, string headerValue) + { + Options.RequestHeaders[headerName] = headerValue; + + return this; + } + + public WebSocketBuilder AddClientCertificate(X509Certificate2 certificate) + { + Options.ClientCertificates.Add(certificate); + + return this; + } + + public WebSocketBuilder ExpectServerCertificate(X509Certificate2 serverCertificate) + { + Options.ServerCertificateCustomValidationCallback = (sender, certificate, chain, sslPolicyErrors) => + { + if (sslPolicyErrors != SslPolicyErrors.RemoteCertificateChainErrors) + return false; + + try + { + using (X509Chain certificateChain = new X509Chain()) + { + certificateChain.ChainPolicy.ExtraStore.Add(serverCertificate); + certificateChain.ChainPolicy.VerificationFlags = X509VerificationFlags.AllowUnknownCertificateAuthority; + certificateChain.ChainPolicy.RevocationMode = X509RevocationMode.NoCheck; + + return certificateChain.Build( + (X509Certificate2)certificate + ); + } + } + catch (Exception chainException) + { + Debug.WriteLine(chainException); + + return false; + } + }; + + return this; + } + + public WebSocketBuilder SkipServerCertificateValidation() + { + Options.ServerCertificateCustomValidationCallback = (sender, certificate, chain, sslPolicyErrors) => true; + + return this; + } + + public async Task BuildAndConnectAsync(Uri uri, CancellationToken cancellationToken) + { + return await CoreFX.K8sWebSocket.ConnectAsync(uri, Options, cancellationToken).ConfigureAwait(false); + } + } + + /// + /// Options for connecting to Kubernetes web sockets. + /// + public class KubernetesWebSocketOptions + { + /// + /// The default size (in bytes) for WebSocket send / receive buffers. + /// + public static readonly int DefaultBufferSize = 2048; + + /// + /// Create new . + /// + public KubernetesWebSocketOptions() + { + } + + /// + /// The requested size (in bytes) of the WebSocket send buffer. + /// + public int SendBufferSize { get; set; } = 2048; + + /// + /// The requested size (in bytes) of the WebSocket receive buffer. + /// + public int ReceiveBufferSize { get; set; } = 2048; + + /// + /// Custom request headers (if any). + /// + public Dictionary RequestHeaders { get; } = new Dictionary(StringComparer.OrdinalIgnoreCase); + + /// + /// Requested sub-protocols (if any). + /// + public List RequestedSubProtocols { get; } = new List(); + + /// + /// Client certificates (if any) to use for authentication. + /// + public List ClientCertificates = new List(); + + /// + /// An optional delegate to use for authenticating the remote server certificate. + /// + public RemoteCertificateValidationCallback ServerCertificateCustomValidationCallback { get; set; } + + /// + /// An value representing the SSL protocols that the client supports. + /// + public SslProtocols EnabledSslProtocols { get; set; } = SslProtocols.Tls; + + /// + /// The WebSocket keep-alive interval. + /// + public TimeSpan KeepAliveInterval { get; set; } = TimeSpan.FromSeconds(5); + } +} + +#endif // NETCOREAPP2_1 diff --git a/src/WebSocketBuilder.cs b/src/WebSocketBuilder.cs index 57a73d6..b618369 100644 --- a/src/WebSocketBuilder.cs +++ b/src/WebSocketBuilder.cs @@ -1,3 +1,5 @@ +#if !NETCOREAPP2_1 + using System; using System.Net.WebSockets; using System.Security.Cryptography.X509Certificates; @@ -19,7 +21,6 @@ namespace k8s public WebSocketBuilder() { - this.WebSocket = new ClientWebSocket(); } public virtual WebSocketBuilder SetRequestHeader(string headerName, string headerValue) @@ -28,7 +29,7 @@ namespace k8s return this; } - public virtual WebSocketBuilder AddClientCertificate(X509Certificate certificate) + public virtual WebSocketBuilder AddClientCertificate(X509Certificate2 certificate) { this.WebSocket.Options.ClientCertificates.Add(certificate); return this; @@ -41,3 +42,5 @@ namespace k8s } } } + +#endif // !NETCOREAPP2_1 diff --git a/tests/AuthTests.cs b/tests/AuthTests.cs index 1e2a4b6..6e73133 100644 --- a/tests/AuthTests.cs +++ b/tests/AuthTests.cs @@ -12,11 +12,17 @@ using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Server.Kestrel.Https; using Microsoft.Rest; using Xunit; +using Xunit.Abstractions; namespace k8s.Tests { - public class AuthTests - { + public class AuthTests + : TestBase + { + public AuthTests(ITestOutputHelper testOutput) : base(testOutput) + { + } + private static HttpOperationResponse ExecuteListPods(IKubernetes client) { return client.ListNamespacedPodWithHttpMessagesAsync("default").Result; @@ -25,7 +31,7 @@ namespace k8s.Tests [Fact] public void Anonymous() { - using (var server = new MockKubeApiServer()) + using (var server = new MockKubeApiServer(TestOutput)) { var client = new Kubernetes(new KubernetesClientConfiguration { @@ -38,7 +44,7 @@ namespace k8s.Tests Assert.Equal(1, listTask.Body.Items.Count); } - using (var server = new MockKubeApiServer(cxt => + using (var server = new MockKubeApiServer(TestOutput, cxt => { cxt.Response.StatusCode = (int) HttpStatusCode.Unauthorized; return Task.FromResult(false); @@ -61,7 +67,7 @@ namespace k8s.Tests const string testName = "test_name"; const string testPassword = "test_password"; - using (var server = new MockKubeApiServer(cxt => + using (var server = new MockKubeApiServer(TestOutput, cxt => { var header = cxt.Request.Headers["Authorization"].FirstOrDefault(); @@ -167,7 +173,7 @@ namespace k8s.Tests var clientCertificateValidationCalled = false; - using (var server = new MockKubeApiServer(listenConfigure: options => + using (var server = new MockKubeApiServer(TestOutput, listenConfigure: options => { options.UseHttps(new HttpsConnectionAdapterOptions { @@ -249,7 +255,7 @@ namespace k8s.Tests { const string token = "testingtoken"; - using (var server = new MockKubeApiServer(cxt => + using (var server = new MockKubeApiServer(TestOutput, cxt => { var header = cxt.Request.Headers["Authorization"].FirstOrDefault(); diff --git a/tests/ByteBufferTests.cs b/tests/ByteBufferTests.cs index 5bdca7c..3bc1527 100644 --- a/tests/ByteBufferTests.cs +++ b/tests/ByteBufferTests.cs @@ -3,7 +3,7 @@ using System.Threading; using System.Threading.Tasks; using Xunit; -namespace k8s.tests +namespace k8s.Tests { /// /// Tests the class. @@ -242,7 +242,7 @@ namespace k8s.tests /// sure the call blocks until data is available. /// [Fact] - public void ReadBlocksUntilDataAvailableTest() + public async Task ReadBlocksUntilDataAvailableTest() { // Makes sure that the Read method does not return until data is available. var buffer = new ByteBuffer(); @@ -251,15 +251,16 @@ namespace k8s.tests // Kick off a read operation var readTask = Task.Run(() => read = buffer.Read(readData, 0, readData.Length)); - Thread.Sleep(250); - Assert.False(readTask.IsCompleted); + await Task.Delay(250); + Assert.False(readTask.IsCompleted, "Read task completed before data was available."); // Write data to the buffer buffer.Write(this.writeData, 0, 0x03); - Thread.Sleep(250); - - Assert.True(readTask.IsCompleted); + await TaskAssert.Completed(readTask, + timeout: TimeSpan.FromMilliseconds(1000), + message: "Timed out waiting for read task to complete." + ); Assert.Equal(3, read); Assert.Equal(0xF0, readData[0]); diff --git a/tests/Kubernetes.Exec.Tests.cs b/tests/Kubernetes.Exec.Tests.cs new file mode 100644 index 0000000..0f5e47f --- /dev/null +++ b/tests/Kubernetes.Exec.Tests.cs @@ -0,0 +1,92 @@ +/* + * These tests are for the netcoreapp2.1 version of the client (there are separate tests for netstandard that don't actually connect to a server). + */ + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Net.WebSockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Microsoft.Rest; +using Xunit; +using Xunit.Abstractions; + +namespace k8s.Tests + { + /// + /// Tests for 's exec-in-pod functionality. + /// + public class PodExecTests + : WebSocketTestBase + { + /// + /// Create a new exec-in-pod test suite. + /// + /// + /// Output for the current test. + /// + public PodExecTests(ITestOutputHelper testOutput) + : base(testOutput) + { + } + + /// + /// Verify that the client can request execution of a command in a pod's default container, with only the STDOUT stream enabled. + /// + [Fact(DisplayName = "Can exec in pod's default container, STDOUT only")] + public async Task Exec_DefaultContainer_StdOut() + { + if (!Debugger.IsAttached) + { + CancellationSource.CancelAfter( + TimeSpan.FromSeconds(5) + ); + } + await Host.StartAsync(TestCancellation); + + using (Kubernetes client = CreateTestClient()) + { + Log.LogInformation("Invoking exec operation..."); + + WebSocket clientSocket = await client.WebSocketNamespacedPodExecAsync( + name: "mypod", + @namespace: "mynamespace", + command: "/bin/bash", + container: "mycontainer", + stderr: false, + stdin: false, + stdout: true, + cancellationToken: TestCancellation + ); + Assert.Equal(K8sProtocol.ChannelV1, clientSocket.SubProtocol); // For WebSockets, the Kubernetes API defaults to the binary channel (v1) protocol. + + Log.LogInformation("Client socket connected (socket state is {ClientSocketState}). Waiting for server-side socket to become available...", clientSocket.State); + + WebSocket serverSocket = await WebSocketTestAdapter.AcceptedPodExecV1Connection; + Log.LogInformation("Server-side socket is now available (socket state is {ServerSocketState}). Sending data to server socket...", serverSocket.State); + + const int STDOUT = 1; + const string expectedOutput = "This is text send to STDOUT."; + + int bytesSent = await SendMultiplexed(serverSocket, STDOUT, expectedOutput); + Log.LogInformation("Sent {ByteCount} bytes to server socket; receiving from client socket...", bytesSent); + + (string receivedText, byte streamIndex, int bytesReceived) = await ReceiveTextMultiplexed(clientSocket); + Log.LogInformation("Received {ByteCount} bytes from client socket ('{ReceivedText}', stream {StreamIndex}).", bytesReceived, receivedText, streamIndex); + + Assert.Equal(STDOUT, streamIndex); + Assert.Equal(expectedOutput, receivedText); + + await Disconnect(clientSocket, serverSocket, + closeStatus: WebSocketCloseStatus.NormalClosure, + closeStatusDescription: "Normal Closure" + ); + + WebSocketTestAdapter.CompleteTest(); + } + } + } +} diff --git a/tests/Kubernetes.WebSockets.Tests.cs b/tests/Kubernetes.WebSockets.Tests.cs index 630e130..fd23c3b 100644 --- a/tests/Kubernetes.WebSockets.Tests.cs +++ b/tests/Kubernetes.WebSockets.Tests.cs @@ -1,3 +1,7 @@ +/* + * These tests are only for the netstandard version of the client (there are separate tests for netcoreapp that connect to a local test-hosted server). + */ + using k8s.tests.Mock; using Microsoft.Rest; using System; diff --git a/tests/Logging/TestOutputLogger.cs b/tests/Logging/TestOutputLogger.cs new file mode 100644 index 0000000..98e7483 --- /dev/null +++ b/tests/Logging/TestOutputLogger.cs @@ -0,0 +1,113 @@ +using Microsoft.Extensions.Logging; +using System; +using System.Reactive.Disposables; +using Xunit.Abstractions; + +namespace k8s.Tests.Logging +{ + /// + /// An implementation of that writes to the output of the current Xunit test. + /// + sealed class TestOutputLogger + : ILogger + { + /// + /// Create a new . + /// + /// + /// The output for the current test. + /// + /// + /// The logger's category name. + /// + /// + /// The logger's minimum log level. + /// + public TestOutputLogger(ITestOutputHelper testOutput, string loggerCategory, LogLevel minLogLevel) + { + if (testOutput == null) + throw new ArgumentNullException(nameof(testOutput)); + + if (String.IsNullOrWhiteSpace(loggerCategory)) + throw new ArgumentException("Argument cannot be null, empty, or entirely composed of whitespace: 'loggerCategory'.", nameof(loggerCategory)); + + TestOutput = testOutput; + LoggerCategory = loggerCategory; + MinLogLevel = minLogLevel; + } + + /// + /// The output for the current test. + /// + public ITestOutputHelper TestOutput { get; } + + /// + /// The logger's category name. + /// + public string LoggerCategory { get; } + + /// + /// The logger's minimum log level. + /// + public LogLevel MinLogLevel { get; } + + /// + /// Emit a log entry. + /// + /// + /// The log entry's level. + /// + /// + /// The log entry's associated event Id. + /// + /// + /// The log entry to be written. Can be also an object. + /// + /// + /// The exception (if any) related to the log entry. + /// + /// + /// A function that creates a string log message from the and . + /// + public void Log(LogLevel level, EventId eventId, TState state, Exception exception, Func formatter) + { + if (formatter == null) + throw new ArgumentNullException(nameof(formatter)); + + TestOutput.WriteLine(String.Format("[{0}] {1}: {2}", + level, + LoggerCategory, + formatter(state, exception) + )); + + if (exception != null) + { + TestOutput.WriteLine( + exception.ToString() + ); + } + } + + /// + /// Check if the given is enabled. + /// + /// + /// The level to be checked. + /// + /// + /// true if enabled; otherwise, false. + /// + public bool IsEnabled(LogLevel logLevel) => logLevel >= MinLogLevel; + + /// + /// Begin a logical operation scope. + /// + /// + /// An identifier for the scope. + /// + /// + /// An that ends the logical operation scope when disposed. + /// + public IDisposable BeginScope(TState state) => Disposable.Empty; + } +} diff --git a/tests/Logging/TestOutputLoggerProvider.cs b/tests/Logging/TestOutputLoggerProvider.cs new file mode 100644 index 0000000..7e336e2 --- /dev/null +++ b/tests/Logging/TestOutputLoggerProvider.cs @@ -0,0 +1,59 @@ +using Microsoft.Extensions.Logging; +using System; +using Xunit.Abstractions; + +namespace k8s.Tests.Logging +{ + /// + /// Logger provider for logging to Xunit test output. + /// + sealed class TestOutputLoggerProvider + : ILoggerProvider + { + /// + /// Create a new . + /// + /// + /// The output for the current test. + /// + /// + /// The logger's minimum log level. + /// + public TestOutputLoggerProvider(ITestOutputHelper testOutput, LogLevel minLogLevel) + { + if (testOutput == null) + throw new ArgumentNullException(nameof(testOutput)); + + TestOutput = testOutput; + MinLogLevel = minLogLevel; + } + + /// + /// Dispose of resources being used by the logger provider. + /// + public void Dispose() + { + } + + /// + /// The output for the current test. + /// + ITestOutputHelper TestOutput { get; } + + /// + /// The logger's minimum log level. + /// + public LogLevel MinLogLevel { get; } + + /// + /// Create a new logger. + /// + /// + /// The logger category name. + /// + /// + /// The logger, as an . + /// + public ILogger CreateLogger(string categoryName) => new TestOutputLogger(TestOutput, categoryName, MinLogLevel); + } +} diff --git a/tests/Logging/TestOutputLoggingExtensions.cs b/tests/Logging/TestOutputLoggingExtensions.cs new file mode 100644 index 0000000..c6d3532 --- /dev/null +++ b/tests/Logging/TestOutputLoggingExtensions.cs @@ -0,0 +1,67 @@ +using Microsoft.Extensions.Logging; +using System; +using Xunit.Abstractions; + +namespace k8s.Tests.Logging +{ + /// + /// Extension methods for logging to Xunit text output. + /// + public static class TestOutputLoggingExtensions + { + /// + /// Log to test output. + /// + /// + /// The global logging configuration. + /// + /// + /// Output for the current test. + /// + /// + /// The minimum level to log at. + /// + public static void AddTestOutput(this ILoggingBuilder logging, ITestOutputHelper testOutput, LogLevel minLogLevel = LogLevel.Information) + { + if (logging == null) + throw new ArgumentNullException(nameof(logging)); + + if (testOutput == null) + throw new ArgumentNullException(nameof(testOutput)); + + logging.AddProvider( + new TestOutputLoggerProvider(testOutput, minLogLevel) + ); + } + + /// + /// Log to test output. + /// + /// + /// The logger factory. + /// + /// + /// Output for the current test. + /// + /// + /// The minimum level to log at. + /// + /// + /// The logger factory (enables inline use / method-chaining). + /// + public static ILoggerFactory AddTestOutput(this ILoggerFactory loggers, ITestOutputHelper testOutput, LogLevel minLogLevel = LogLevel.Information) + { + if (loggers == null) + throw new ArgumentNullException(nameof(loggers)); + + if (testOutput == null) + throw new ArgumentNullException(nameof(testOutput)); + + loggers.AddProvider( + new TestOutputLoggerProvider(testOutput, minLogLevel) + ); + + return loggers; + } + } +} diff --git a/tests/Mock/MockKubeApiServer.cs b/tests/Mock/MockKubeApiServer.cs index d6da5cb..5a22b06 100644 --- a/tests/Mock/MockKubeApiServer.cs +++ b/tests/Mock/MockKubeApiServer.cs @@ -1,52 +1,62 @@ -using System; -using System.Linq; -using System.Net; -using System.Threading.Tasks; -using Microsoft.AspNetCore; -using Microsoft.AspNetCore.Builder; -using Microsoft.AspNetCore.Hosting; -using Microsoft.AspNetCore.Hosting.Server.Features; -using Microsoft.AspNetCore.Http; -using Microsoft.AspNetCore.Server.Kestrel.Core; - -namespace k8s.Tests.Mock -{ - public class MockKubeApiServer : IDisposable - { - // paste from minikube /api/v1/namespaces/default/pods - public const string MockPodResponse = - "{\r\n \"kind\": \"PodList\",\r\n \"apiVersion\": \"v1\",\r\n \"metadata\": {\r\n \"selfLink\": \"/api/v1/namespaces/default/pods\",\r\n \"resourceVersion\": \"1762810\"\r\n },\r\n \"items\": [\r\n {\r\n \"metadata\": {\r\n \"name\": \"nginx-1493591563-xb2v4\",\r\n \"generateName\": \"nginx-1493591563-\",\r\n \"namespace\": \"default\",\r\n \"selfLink\": \"/api/v1/namespaces/default/pods/nginx-1493591563-xb2v4\",\r\n \"uid\": \"ac1abb94-9c58-11e7-aaf5-00155d744505\",\r\n \"resourceVersion\": \"1737928\",\r\n \"creationTimestamp\": \"2017-09-18T10:03:51Z\",\r\n \"labels\": {\r\n \"app\": \"nginx\",\r\n \"pod-template-hash\": \"1493591563\"\r\n },\r\n \"annotations\": {\r\n \"kubernetes.io/created-by\": \"{\\\"kind\\\":\\\"SerializedReference\\\",\\\"apiVersion\\\":\\\"v1\\\",\\\"reference\\\":{\\\"kind\\\":\\\"ReplicaSet\\\",\\\"namespace\\\":\\\"default\\\",\\\"name\\\":\\\"nginx-1493591563\\\",\\\"uid\\\":\\\"ac013b63-9c58-11e7-aaf5-00155d744505\\\",\\\"apiVersion\\\":\\\"extensions\\\",\\\"resourceVersion\\\":\\\"5306\\\"}}\\n\"\r\n },\r\n \"ownerReferences\": [\r\n {\r\n \"apiVersion\": \"extensions/v1beta1\",\r\n \"kind\": \"ReplicaSet\",\r\n \"name\": \"nginx-1493591563\",\r\n \"uid\": \"ac013b63-9c58-11e7-aaf5-00155d744505\",\r\n \"controller\": true,\r\n \"blockOwnerDeletion\": true\r\n }\r\n ]\r\n },\r\n \"spec\": {\r\n \"volumes\": [\r\n {\r\n \"name\": \"default-token-3zzcj\",\r\n \"secret\": {\r\n \"secretName\": \"default-token-3zzcj\",\r\n \"defaultMode\": 420\r\n }\r\n }\r\n ],\r\n \"containers\": [\r\n {\r\n \"name\": \"nginx\",\r\n \"image\": \"nginx\",\r\n \"resources\": {},\r\n \"volumeMounts\": [\r\n {\r\n \"name\": \"default-token-3zzcj\",\r\n \"readOnly\": true,\r\n \"mountPath\": \"/var/run/secrets/kubernetes.io/serviceaccount\"\r\n }\r\n ],\r\n \"terminationMessagePath\": \"/dev/termination-log\",\r\n \"terminationMessagePolicy\": \"File\",\r\n \"imagePullPolicy\": \"Always\"\r\n }\r\n ],\r\n \"restartPolicy\": \"Always\",\r\n \"terminationGracePeriodSeconds\": 30,\r\n \"dnsPolicy\": \"ClusterFirst\",\r\n \"serviceAccountName\": \"default\",\r\n \"serviceAccount\": \"default\",\r\n \"nodeName\": \"ubuntu\",\r\n \"securityContext\": {},\r\n \"schedulerName\": \"default-scheduler\"\r\n },\r\n \"status\": {\r\n \"phase\": \"Running\",\r\n \"conditions\": [\r\n {\r\n \"type\": \"Initialized\",\r\n \"status\": \"True\",\r\n \"lastProbeTime\": null,\r\n \"lastTransitionTime\": \"2017-09-18T10:03:51Z\"\r\n },\r\n {\r\n \"type\": \"Ready\",\r\n \"status\": \"True\",\r\n \"lastProbeTime\": null,\r\n \"lastTransitionTime\": \"2017-10-12T07:09:21Z\"\r\n },\r\n {\r\n \"type\": \"PodScheduled\",\r\n \"status\": \"True\",\r\n \"lastProbeTime\": null,\r\n \"lastTransitionTime\": \"2017-09-18T10:03:51Z\"\r\n }\r\n ],\r\n \"hostIP\": \"192.168.188.42\",\r\n \"podIP\": \"172.17.0.5\",\r\n \"startTime\": \"2017-09-18T10:03:51Z\",\r\n \"containerStatuses\": [\r\n {\r\n \"name\": \"nginx\",\r\n \"state\": {\r\n \"running\": {\r\n \"startedAt\": \"2017-10-12T07:09:20Z\"\r\n }\r\n },\r\n \"lastState\": {\r\n \"terminated\": {\r\n \"exitCode\": 0,\r\n \"reason\": \"Completed\",\r\n \"startedAt\": \"2017-10-10T21:35:51Z\",\r\n \"finishedAt\": \"2017-10-12T07:07:37Z\",\r\n \"containerID\": \"docker://94df3f3965807421ad6dc76618e00b76cb15d024919c4946f3eb46a92659c62a\"\r\n }\r\n },\r\n \"ready\": true,\r\n \"restartCount\": 7,\r\n \"image\": \"nginx:latest\",\r\n \"imageID\": \"docker-pullable://nginx@sha256:004ac1d5e791e705f12a17c80d7bb1e8f7f01aa7dca7deee6e65a03465392072\",\r\n \"containerID\": \"docker://fa11bdd48c9b7d3a6c4c3f9b6d7319743c3455ab8d00c57d59c083b319b88194\"\r\n }\r\n ],\r\n \"qosClass\": \"BestEffort\"\r\n }\r\n }\r\n ]\r\n}" - ; - - private readonly IWebHost _webHost; - - public MockKubeApiServer(Func> shouldNext = null, Action listenConfigure = null, - string resp = MockPodResponse) - { - shouldNext = shouldNext ?? (_ => Task.FromResult(true)); - listenConfigure = listenConfigure ?? (_ => { }); - - _webHost = WebHost.CreateDefaultBuilder() - .Configure(app => app.Run(async httpContext => - { - if (await shouldNext(httpContext)) - { - await httpContext.Response.WriteAsync(resp); - } - })) - .UseKestrel(options => { options.Listen(IPAddress.Loopback, 0, listenConfigure); }) - .Build(); - - _webHost.Start(); - } - - public Uri Uri => _webHost.ServerFeatures.Get().Addresses - .Select(a => new Uri(a)).First(); - - public void Dispose() - { - _webHost.StopAsync(); - _webHost.WaitForShutdown(); - } - } -} \ No newline at end of file +using System; +using System.Linq; +using System.Net; +using System.Threading.Tasks; +using k8s.Tests.Logging; +using Microsoft.AspNetCore; +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.Hosting.Server.Features; +using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Server.Kestrel.Core; +using Microsoft.Extensions.Logging; +using Xunit.Abstractions; + +namespace k8s.Tests.Mock +{ + public class MockKubeApiServer : IDisposable + { + // paste from minikube /api/v1/namespaces/default/pods + public const string MockPodResponse = + "{\r\n \"kind\": \"PodList\",\r\n \"apiVersion\": \"v1\",\r\n \"metadata\": {\r\n \"selfLink\": \"/api/v1/namespaces/default/pods\",\r\n \"resourceVersion\": \"1762810\"\r\n },\r\n \"items\": [\r\n {\r\n \"metadata\": {\r\n \"name\": \"nginx-1493591563-xb2v4\",\r\n \"generateName\": \"nginx-1493591563-\",\r\n \"namespace\": \"default\",\r\n \"selfLink\": \"/api/v1/namespaces/default/pods/nginx-1493591563-xb2v4\",\r\n \"uid\": \"ac1abb94-9c58-11e7-aaf5-00155d744505\",\r\n \"resourceVersion\": \"1737928\",\r\n \"creationTimestamp\": \"2017-09-18T10:03:51Z\",\r\n \"labels\": {\r\n \"app\": \"nginx\",\r\n \"pod-template-hash\": \"1493591563\"\r\n },\r\n \"annotations\": {\r\n \"kubernetes.io/created-by\": \"{\\\"kind\\\":\\\"SerializedReference\\\",\\\"apiVersion\\\":\\\"v1\\\",\\\"reference\\\":{\\\"kind\\\":\\\"ReplicaSet\\\",\\\"namespace\\\":\\\"default\\\",\\\"name\\\":\\\"nginx-1493591563\\\",\\\"uid\\\":\\\"ac013b63-9c58-11e7-aaf5-00155d744505\\\",\\\"apiVersion\\\":\\\"extensions\\\",\\\"resourceVersion\\\":\\\"5306\\\"}}\\n\"\r\n },\r\n \"ownerReferences\": [\r\n {\r\n \"apiVersion\": \"extensions/v1beta1\",\r\n \"kind\": \"ReplicaSet\",\r\n \"name\": \"nginx-1493591563\",\r\n \"uid\": \"ac013b63-9c58-11e7-aaf5-00155d744505\",\r\n \"controller\": true,\r\n \"blockOwnerDeletion\": true\r\n }\r\n ]\r\n },\r\n \"spec\": {\r\n \"volumes\": [\r\n {\r\n \"name\": \"default-token-3zzcj\",\r\n \"secret\": {\r\n \"secretName\": \"default-token-3zzcj\",\r\n \"defaultMode\": 420\r\n }\r\n }\r\n ],\r\n \"containers\": [\r\n {\r\n \"name\": \"nginx\",\r\n \"image\": \"nginx\",\r\n \"resources\": {},\r\n \"volumeMounts\": [\r\n {\r\n \"name\": \"default-token-3zzcj\",\r\n \"readOnly\": true,\r\n \"mountPath\": \"/var/run/secrets/kubernetes.io/serviceaccount\"\r\n }\r\n ],\r\n \"terminationMessagePath\": \"/dev/termination-log\",\r\n \"terminationMessagePolicy\": \"File\",\r\n \"imagePullPolicy\": \"Always\"\r\n }\r\n ],\r\n \"restartPolicy\": \"Always\",\r\n \"terminationGracePeriodSeconds\": 30,\r\n \"dnsPolicy\": \"ClusterFirst\",\r\n \"serviceAccountName\": \"default\",\r\n \"serviceAccount\": \"default\",\r\n \"nodeName\": \"ubuntu\",\r\n \"securityContext\": {},\r\n \"schedulerName\": \"default-scheduler\"\r\n },\r\n \"status\": {\r\n \"phase\": \"Running\",\r\n \"conditions\": [\r\n {\r\n \"type\": \"Initialized\",\r\n \"status\": \"True\",\r\n \"lastProbeTime\": null,\r\n \"lastTransitionTime\": \"2017-09-18T10:03:51Z\"\r\n },\r\n {\r\n \"type\": \"Ready\",\r\n \"status\": \"True\",\r\n \"lastProbeTime\": null,\r\n \"lastTransitionTime\": \"2017-10-12T07:09:21Z\"\r\n },\r\n {\r\n \"type\": \"PodScheduled\",\r\n \"status\": \"True\",\r\n \"lastProbeTime\": null,\r\n \"lastTransitionTime\": \"2017-09-18T10:03:51Z\"\r\n }\r\n ],\r\n \"hostIP\": \"192.168.188.42\",\r\n \"podIP\": \"172.17.0.5\",\r\n \"startTime\": \"2017-09-18T10:03:51Z\",\r\n \"containerStatuses\": [\r\n {\r\n \"name\": \"nginx\",\r\n \"state\": {\r\n \"running\": {\r\n \"startedAt\": \"2017-10-12T07:09:20Z\"\r\n }\r\n },\r\n \"lastState\": {\r\n \"terminated\": {\r\n \"exitCode\": 0,\r\n \"reason\": \"Completed\",\r\n \"startedAt\": \"2017-10-10T21:35:51Z\",\r\n \"finishedAt\": \"2017-10-12T07:07:37Z\",\r\n \"containerID\": \"docker://94df3f3965807421ad6dc76618e00b76cb15d024919c4946f3eb46a92659c62a\"\r\n }\r\n },\r\n \"ready\": true,\r\n \"restartCount\": 7,\r\n \"image\": \"nginx:latest\",\r\n \"imageID\": \"docker-pullable://nginx@sha256:004ac1d5e791e705f12a17c80d7bb1e8f7f01aa7dca7deee6e65a03465392072\",\r\n \"containerID\": \"docker://fa11bdd48c9b7d3a6c4c3f9b6d7319743c3455ab8d00c57d59c083b319b88194\"\r\n }\r\n ],\r\n \"qosClass\": \"BestEffort\"\r\n }\r\n }\r\n ]\r\n}" + ; + + private readonly IWebHost _webHost; + + public MockKubeApiServer(ITestOutputHelper testOutput, Func> shouldNext = null, Action listenConfigure = null, + string resp = MockPodResponse) + { + shouldNext = shouldNext ?? (_ => Task.FromResult(true)); + listenConfigure = listenConfigure ?? (_ => { }); + + _webHost = WebHost.CreateDefaultBuilder() + .Configure(app => app.Run(async httpContext => + { + if (await shouldNext(httpContext)) + { + await httpContext.Response.WriteAsync(resp); + } + })) + .UseKestrel(options => { options.Listen(IPAddress.Loopback, 0, listenConfigure); }) + .ConfigureLogging(logging => + { + logging.ClearProviders(); + + if (testOutput != null) + logging.AddTestOutput(testOutput); + }) + .Build(); + + _webHost.Start(); + } + + public Uri Uri => _webHost.ServerFeatures.Get().Addresses + .Select(a => new Uri(a)).First(); + + public void Dispose() + { + _webHost.StopAsync(); + _webHost.WaitForShutdown(); + } + } +} diff --git a/tests/Mock/MockWebSocketBuilder.cs b/tests/Mock/MockWebSocketBuilder.cs index dde2bbe..9dc5f05 100644 --- a/tests/Mock/MockWebSocketBuilder.cs +++ b/tests/Mock/MockWebSocketBuilder.cs @@ -1,3 +1,5 @@ +#if !NETCOREAPP2_1 + using System; using System.Collections.Generic; using System.Collections.ObjectModel; @@ -12,13 +14,13 @@ namespace k8s.tests.Mock { public Dictionary RequestHeaders { get; } = new Dictionary(); - public Collection Certificates { get; } = new Collection(); + public Collection Certificates { get; } = new Collection(); public Uri Uri { get; private set; } public WebSocket PublicWebSocket => this.WebSocket; - public override WebSocketBuilder AddClientCertificate(X509Certificate certificate) + public override WebSocketBuilder AddClientCertificate(X509Certificate2 certificate) { this.Certificates.Add(certificate); return this; @@ -27,7 +29,8 @@ namespace k8s.tests.Mock public override Task BuildAndConnectAsync(Uri uri, CancellationToken cancellationToken) { this.Uri = uri; - return Task.FromResult(this.WebSocket); + + return Task.FromResult(this.PublicWebSocket); } public override WebSocketBuilder SetRequestHeader(string headerName, string headerValue) @@ -37,3 +40,5 @@ namespace k8s.tests.Mock } } } + +#endif // !NETCOREAPP2_1 diff --git a/tests/Mock/Server/Controllers/PodExecController.cs b/tests/Mock/Server/Controllers/PodExecController.cs new file mode 100644 index 0000000..5d71b82 --- /dev/null +++ b/tests/Mock/Server/Controllers/PodExecController.cs @@ -0,0 +1,61 @@ +using Microsoft.AspNetCore.Mvc; +using System; +using System.Net.WebSockets; +using System.Threading; +using System.Threading.Tasks; + +namespace k8s.Tests.Mock.Server.Controllers +{ + /// + /// Controller for the mock Kubernetes exec-in-pod API. + /// + [Route("api/v1")] + public class PodExecController + : Controller + { + /// + /// Create a new . + /// + /// + /// The adapter used to capture sockets accepted by the test server and provide them to the calling test. + /// + public PodExecController(WebSocketTestAdapter webSocketTestAdapter) + { + if (webSocketTestAdapter == null) + throw new ArgumentNullException(nameof(webSocketTestAdapter)); + + WebSocketTestAdapter = webSocketTestAdapter; + } + + /// + /// The adapter used to capture sockets accepted by the test server and provide them to the calling test. + /// + WebSocketTestAdapter WebSocketTestAdapter { get; } + + /// + /// Mock Kubernetes API: exec-in-pod. + /// + /// + /// The target pod's containing namespace. + /// + /// + /// The target pod's name. + /// + [Route("namespaces/{kubeNamespace}/pods/{podName}/exec")] + public async Task Exec(string kubeNamespace, string podName) + { + if (!HttpContext.WebSockets.IsWebSocketRequest) + return BadRequest("Exec requires WebSockets"); + + WebSocket webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync( + subProtocol: K8sProtocol.ChannelV1 + ); + + WebSocketTestAdapter.AcceptedPodExecV1Connection.AcceptServerSocket(webSocket); + + await WebSocketTestAdapter.TestCompleted; + + return Ok(); + } + } +} diff --git a/tests/Mock/Server/Controllers/PodPortForwardController.cs b/tests/Mock/Server/Controllers/PodPortForwardController.cs new file mode 100644 index 0000000..2f2ea0f --- /dev/null +++ b/tests/Mock/Server/Controllers/PodPortForwardController.cs @@ -0,0 +1,65 @@ +using Microsoft.AspNetCore.Mvc; +using System; +using System.Collections.Generic; +using System.Net.WebSockets; +using System.Threading; +using System.Threading.Tasks; + +namespace k8s.Tests.Mock.Server.Controllers +{ + /// + /// Controller for the mock Kubernetes pod-port-forward API. + /// + [Route("api/v1")] + public class PodPortForwardController + : Controller + { + /// + /// Create a new . + /// + /// + /// The adapter used to capture sockets accepted by the test server and provide them to the calling test. + /// + public PodPortForwardController(WebSocketTestAdapter webSocketTestAdapter) + { + if (webSocketTestAdapter == null) + throw new ArgumentNullException(nameof(webSocketTestAdapter)); + + WebSocketTestAdapter = webSocketTestAdapter; + } + + /// + /// The adapter used to capture sockets accepted by the test server and provide them to the calling test. + /// + WebSocketTestAdapter WebSocketTestAdapter { get; } + + /// + /// Mock Kubernetes API: port-forward for pod. + /// + /// + /// The target pod's containing namespace. + /// + /// + /// The target pod's name. + /// + /// + /// The port(s) to forward to the pod. + /// + [Route("namespaces/{kubeNamespace}/pods/{podName}/portforward")] + public async Task Exec(string kubeNamespace, string podName, IEnumerable ports) + { + if (!HttpContext.WebSockets.IsWebSocketRequest) + return BadRequest("PortForward requires WebSockets"); + + WebSocket webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync( + subProtocol: K8sProtocol.ChannelV1 + ); + + WebSocketTestAdapter.AcceptedPodPortForwardV1Connection.AcceptServerSocket(webSocket); + + await WebSocketTestAdapter.TestCompleted; + + return Ok(); + } + } +} diff --git a/tests/Mock/Server/Startup.cs b/tests/Mock/Server/Startup.cs new file mode 100644 index 0000000..ce5ddc9 --- /dev/null +++ b/tests/Mock/Server/Startup.cs @@ -0,0 +1,54 @@ +using Microsoft.AspNetCore.Builder; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using System; + +namespace k8s.Tests.Mock.Server +{ + /// + /// Startup logic for the KubeClient WebSockets test server. + /// + public class Startup + { + /// + /// Create a new . + /// + public Startup() + { + } + + /// + /// Configure application services. + /// + /// + /// The service collection to configure. + /// + public void ConfigureServices(IServiceCollection services) + { + if (services == null) + throw new ArgumentNullException(nameof(services)); + + services.AddLogging(logging => + { + logging.ClearProviders(); // Logger provider will be added by the calling test. + }); + services.AddMvc(); + } + + /// + /// Configure the application pipeline. + /// + /// + /// The application pipeline builder. + /// + public void Configure(IApplicationBuilder app) + { + app.UseWebSockets(new WebSocketOptions + { + KeepAliveInterval = TimeSpan.FromSeconds(5), + ReceiveBufferSize = 2048 + }); + app.UseMvc(); + } + } +} diff --git a/tests/Mock/Server/WebSocketTestAdapter.cs b/tests/Mock/Server/WebSocketTestAdapter.cs new file mode 100644 index 0000000..1cc1173 --- /dev/null +++ b/tests/Mock/Server/WebSocketTestAdapter.cs @@ -0,0 +1,98 @@ +using System; +using System.Net.WebSockets; +using System.Runtime.CompilerServices; +using System.Threading.Tasks; + +namespace k8s.Tests.Mock.Server +{ + /// + /// Adapter used to capture WebSockets accepted by the test server and provide them to calling test. + /// + /// + /// Each AcceptedXXXConnection property returns an awaitable object that yields a the server-side WebSocket once a connection has been accepted. + /// + /// All server-side WebSockets will be closed when is called. + /// + public class WebSocketTestAdapter + { + /// + /// Completion source for the task. + /// + readonly TaskCompletionSource _testCompletion = new TaskCompletionSource(); + + /// + /// A that completes when the test is complete (providing is called). + /// + public Task TestCompleted => _testCompletion.Task; + + /// + /// await server-side acceptance of a WebSocket connection for the exec-in-pod (v1) API. + /// + public ServerSocketAcceptance AcceptedPodExecV1Connection { get; } = new ServerSocketAcceptance(); + + /// + /// await server-side acceptance of a WebSocket connection for the pod-port-forward (v1) API. + /// + public ServerSocketAcceptance AcceptedPodPortForwardV1Connection { get; } = new ServerSocketAcceptance(); + + /// + /// Mark the current test as complete, closing all server-side sockets. + /// + public void CompleteTest() => _testCompletion.SetResult(true); + + /// + /// An object that enables awaiting server-side acceptance of a WebSocket connection. + /// + /// + /// Simply await this object to wait for the server socket to be accepted. + /// + public class ServerSocketAcceptance + { + /// + /// Completion source for the task. + /// + readonly TaskCompletionSource _completion = new TaskCompletionSource(); + + /// + /// A that completes when the server accepts a WebSocket connection (i.e. when or is called). + /// + public Task Task => _completion.Task; + + /// + /// Notify the calling test that the server has accepted a WebSocket connection. + /// + /// + /// The server-side . + /// + public void AcceptServerSocket(WebSocket serverSocket) + { + if (serverSocket == null) + throw new ArgumentNullException(nameof(serverSocket)); + + _completion.SetResult(serverSocket); + } + + /// + /// Notify the calling test that the server has rejected a WebSocket connection. + /// + /// + /// An representing the reason that the connection was rejected. + /// + public void RejectServerSocket(Exception reason) + { + if (reason == null) + throw new ArgumentNullException(nameof(reason)); + + _completion.SetException(reason); + } + + /// + /// Get an awaiter for the socket-acceptance task. + /// + /// + /// The . + /// + public TaskAwaiter GetAwaiter() => Task.GetAwaiter(); + } + } +} diff --git a/tests/TaskAssert.cs b/tests/TaskAssert.cs new file mode 100644 index 0000000..7273f67 --- /dev/null +++ b/tests/TaskAssert.cs @@ -0,0 +1,43 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace k8s.Tests +{ + static class TaskAssert + { + public static void NotCompleted(Task task,string message = "Task should not be completed") + { + Assert.False(task.IsCompleted, message); + } + + public static async Task Completed(Task task, TimeSpan timeout, string message = "Task timed out") + { + var timeoutTask = Task.Delay( + TimeSpan.FromMilliseconds(1000) + ); + + var completedTask = await Task.WhenAny(task, timeoutTask); + Assert.True(ReferenceEquals(task, completedTask), message); + + await completedTask; + } + + public static async Task Completed(Task task, TimeSpan timeout, string message = "Task timed out") + { + var timeoutTask = + Task.Delay( + TimeSpan.FromMilliseconds(1000) + ) + .ContinueWith( + completedTimeoutTask => default(T) // Value is never returned, but we need a task of the same result type in order to use Task.WhenAny. + ); + + var completedTask = await Task.WhenAny(task, timeoutTask); + Assert.True(ReferenceEquals(task, completedTask), message); + + return await completedTask; + } + } +} diff --git a/tests/TestBase.cs b/tests/TestBase.cs new file mode 100644 index 0000000..eb17c8a --- /dev/null +++ b/tests/TestBase.cs @@ -0,0 +1,131 @@ +using k8s.Tests.Logging; +using System; +using System.Reactive.Disposables; +using System.Reflection; +using System.Threading; +using Microsoft.Extensions.Logging; +using Xunit; +using Xunit.Abstractions; + +namespace k8s.Tests +{ + /// + /// The base class for test suites. + /// + public abstract class TestBase + : IDisposable + { + /// + /// Create a new test-suite. + /// + /// + /// Output for the current test. + /// + protected TestBase(ITestOutputHelper testOutput) + { + if (testOutput == null) + throw new ArgumentNullException(nameof(testOutput)); + + // We *must* have a synchronisation context for the test, or we'll see random deadlocks. + if (SynchronizationContext.Current == null) + { + SynchronizationContext.SetSynchronizationContext( + new SynchronizationContext() + ); + } + + TestOutput = testOutput; + LoggerFactory = new LoggerFactory().AddTestOutput(TestOutput, MinLogLevel); + Log = LoggerFactory.CreateLogger("CurrentTest"); + + // Ugly hack to get access to metadata for the current test. + CurrentTest = (ITest) + TestOutput.GetType() + .GetField("test", BindingFlags.NonPublic | BindingFlags.Instance) + .GetValue(TestOutput); + + Assert.True(CurrentTest != null, "Cannot retrieve current test from ITestOutputHelper."); + + Disposal.Add( + Log.BeginScope("CurrentTest", CurrentTest.DisplayName) + ); + } + + /// + /// Finaliser for . + /// + ~TestBase() + { + Dispose(false); + } + + /// + /// Dispose of resources being used by the test suite. + /// + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + /// + /// Dispose of resources being used by the test suite. + /// + /// + /// Explicit disposal? + /// + protected virtual void Dispose(bool disposing) + { + if (disposing) + { + try + { + Disposal.Dispose(); + } + finally + { + if (LoggerFactory is IDisposable loggerFactoryDisposal) + loggerFactoryDisposal.Dispose(); + + if (Log is IDisposable logDisposal) + logDisposal.Dispose(); + } + } + } + + /// + /// A representing resources used by the test. + /// + protected CompositeDisposable Disposal { get; } = new CompositeDisposable(); + + /// + /// Output for the current test. + /// + protected ITestOutputHelper TestOutput { get; } + + /// + /// A representing the current test. + /// + protected ITest CurrentTest { get; } + + /// + /// The logger for the current test. + /// + protected ILogger Log { get; } + + /// + /// The logger factory for the current test. + /// + protected ILoggerFactory LoggerFactory { get; } + + /// + /// The logging level for the current test. + /// + protected virtual LogLevel MinLogLevel => LogLevel.Information; + + /// + /// The test server logging level for the current test. + /// + protected virtual LogLevel MinServerLogLevel => LogLevel.Warning; + } +} diff --git a/tests/V1StatusObjectViewTests.cs b/tests/V1StatusObjectViewTests.cs index 8a90fb7..4161739 100644 --- a/tests/V1StatusObjectViewTests.cs +++ b/tests/V1StatusObjectViewTests.cs @@ -2,11 +2,17 @@ using k8s.Models; using k8s.Tests.Mock; using Newtonsoft.Json; using Xunit; +using Xunit.Abstractions; namespace k8s.Tests { - public class V1StatusObjectViewTests - { + public class V1StatusObjectViewTests + : TestBase + { + public V1StatusObjectViewTests(ITestOutputHelper testOutput) : base(testOutput) + { + } + [Fact] public void ReturnStatus() { @@ -16,7 +22,7 @@ namespace k8s.Tests Status = "test status" }; - using (var server = new MockKubeApiServer(resp: JsonConvert.SerializeObject(v1Status))) + using (var server = new MockKubeApiServer(TestOutput, resp: JsonConvert.SerializeObject(v1Status))) { var client = new Kubernetes(new KubernetesClientConfiguration { @@ -39,30 +45,30 @@ namespace k8s.Tests Metadata = new V1ObjectMeta() { Name = "test name" - }, + }, Status = new V1NamespaceStatus() { Phase = "test termating" } - }; + }; - using (var server = new MockKubeApiServer(resp: JsonConvert.SerializeObject(corev1Namespace))) + using (var server = new MockKubeApiServer(TestOutput, resp: JsonConvert.SerializeObject(corev1Namespace))) { var client = new Kubernetes(new KubernetesClientConfiguration { Host = server.Uri.ToString() }); - var status = client.DeleteNamespace(new V1DeleteOptions(), "test"); + var status = client.DeleteNamespace(new V1DeleteOptions(), "test"); Assert.True(status.HasObject); var obj = status.ObjectView(); Assert.Equal(obj.Metadata.Name, corev1Namespace.Metadata.Name); - Assert.Equal(obj.Status.Phase, corev1Namespace.Status.Phase); + Assert.Equal(obj.Status.Phase, corev1Namespace.Status.Phase); } } - } + } } diff --git a/tests/WatchTests.cs b/tests/WatchTests.cs index 30b472e..bec59d9 100644 --- a/tests/WatchTests.cs +++ b/tests/WatchTests.cs @@ -1,332 +1,371 @@ -using System; -using System.Collections.Generic; -using System.IO; -using System.Linq; -using System.Net; -using System.Net.Http; -using System.Threading; -using System.Threading.Tasks; -using k8s.Exceptions; -using k8s.Models; -using k8s.Tests.Mock; -using Microsoft.AspNetCore.Http; -using Newtonsoft.Json; -using Newtonsoft.Json.Converters; -using Xunit; - -namespace k8s.Tests -{ - public class WatchTests - { - private static readonly string MockAddedEventStreamLine = BuildWatchEventStreamLine(WatchEventType.Added); - private static readonly string MockDeletedStreamLine = BuildWatchEventStreamLine(WatchEventType.Deleted); - private static readonly string MockModifiedStreamLine = BuildWatchEventStreamLine(WatchEventType.Modified); - private static readonly string MockErrorStreamLine = BuildWatchEventStreamLine(WatchEventType.Error); - private static readonly string MockBadStreamLine = "bad json"; - - private static string BuildWatchEventStreamLine(WatchEventType eventType) - { - var corev1PodList = JsonConvert.DeserializeObject(MockKubeApiServer.MockPodResponse); - return JsonConvert.SerializeObject(new Watcher.WatchEvent - { - Type = eventType, - Object = corev1PodList.Items.First() - }, new StringEnumConverter()); - } - - private static async Task WriteStreamLine(HttpContext httpContext, string reponseLine) - { - const string crlf = "\r\n"; - await httpContext.Response.WriteAsync(reponseLine.Replace(crlf, "")); - await httpContext.Response.WriteAsync(crlf); - await httpContext.Response.Body.FlushAsync(); - } - - [Fact] - public void CannotWatch() - { - using (var server = new MockKubeApiServer()) - { - var client = new Kubernetes(new KubernetesClientConfiguration - { - Host = server.Uri.ToString() - }); - - // did not pass watch param - { - var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default").Result; - Assert.ThrowsAny(() => - { - listTask.Watch((type, item) => { }); - }); - } - - // server did not response line by line - { - Assert.ThrowsAny(() => - { - var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result; - - // this line did not throw - // listTask.Watch((type, item) => { }); - }); - } - } - } - - [Fact] - public void SuriveBadLine() - { - using (var server = new MockKubeApiServer(async httpContext => - { - httpContext.Response.StatusCode = (int) HttpStatusCode.OK; - httpContext.Response.ContentLength = null; - - await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse); - await Task.Delay(TimeSpan.FromMilliseconds(100)); - - await WriteStreamLine(httpContext, MockBadStreamLine); - await Task.Delay(TimeSpan.FromMilliseconds(100)); - - await WriteStreamLine(httpContext, MockAddedEventStreamLine); - await Task.Delay(TimeSpan.FromMilliseconds(100)); - - await WriteStreamLine(httpContext, MockBadStreamLine); - await Task.Delay(TimeSpan.FromMilliseconds(100)); - - await WriteStreamLine(httpContext, MockModifiedStreamLine); - await Task.Delay(TimeSpan.FromMilliseconds(100)); - - // make server alive, cannot set to int.max as of it would block response - await Task.Delay(TimeSpan.FromDays(1)); - return false; - })) - { - var client = new Kubernetes(new KubernetesClientConfiguration - { - Host = server.Uri.ToString() - }); - - var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result; - - - var events = new HashSet(); - var errors = 0; - - var watcher = listTask.Watch( - (type, item) => { events.Add(type); }, - e => { errors += 1; } - ); - - // wait server yields all events - Thread.Sleep(TimeSpan.FromMilliseconds(1000)); - - Assert.Contains(WatchEventType.Added, events); - Assert.Contains(WatchEventType.Modified, events); - - Assert.Equal(2, errors); - - Assert.True(watcher.Watching); - - // prevent from server down exception trigger - Thread.Sleep(TimeSpan.FromMilliseconds(1000)); - } - } - - [Fact] - public void DisposeWatch() - { - using (var server = new MockKubeApiServer(async httpContext => - { - await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse); - await Task.Delay(TimeSpan.FromMilliseconds(100)); - - for (;;) - { - await WriteStreamLine(httpContext, MockAddedEventStreamLine); - await Task.Delay(TimeSpan.FromMilliseconds(100)); - - } - })) - { - var client = new Kubernetes(new KubernetesClientConfiguration - { - Host = server.Uri.ToString() - }); - - var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result; - - - var events = new HashSet(); - - var watcher = listTask.Watch( - (type, item) => { events.Add(type); } - ); - - // wait at least an event - Thread.Sleep(TimeSpan.FromMilliseconds(1000)); - - Assert.NotEmpty(events); - Assert.True(watcher.Watching); - - watcher.Dispose(); - - events.Clear(); - - // make sure wait event called - Thread.Sleep(TimeSpan.FromMilliseconds(1000)); - Assert.Empty(events); - Assert.False(watcher.Watching); - - } - } - - [Fact] - public void WatchAllEvents() - { - using (var server = new MockKubeApiServer(async httpContext => - { - await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse); - await Task.Delay(TimeSpan.FromMilliseconds(100)); - - await WriteStreamLine(httpContext, MockAddedEventStreamLine); - await Task.Delay(TimeSpan.FromMilliseconds(100)); - - await WriteStreamLine(httpContext, MockDeletedStreamLine); - await Task.Delay(TimeSpan.FromMilliseconds(100)); - - await WriteStreamLine(httpContext, MockModifiedStreamLine); - await Task.Delay(TimeSpan.FromMilliseconds(100)); - - await WriteStreamLine(httpContext, MockErrorStreamLine); - await Task.Delay(TimeSpan.FromMilliseconds(100)); - - // make server alive, cannot set to int.max as of it would block response - await Task.Delay(TimeSpan.FromDays(1)); - return false; - })) - { - var client = new Kubernetes(new KubernetesClientConfiguration - { - Host = server.Uri.ToString() - }); - - var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result; - - - var events = new HashSet(); - var errors = 0; - - var watcher = listTask.Watch( - (type, item) => { events.Add(type); }, - e => { errors += 1; } - ); - - // wait server yields all events - Thread.Sleep(TimeSpan.FromMilliseconds(1000)); - - Assert.Contains(WatchEventType.Added, events); - Assert.Contains(WatchEventType.Deleted, events); - Assert.Contains(WatchEventType.Modified, events); - Assert.Contains(WatchEventType.Error, events); - - - Assert.Equal(0, errors); - - Assert.True(watcher.Watching); - } - } - - [Fact] - public void WatchServerDisconnect() - { - Watcher watcher; - Exception exceptionCatched = null; - - using (var server = new MockKubeApiServer(async httpContext => - { - await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse); - - // make sure watch success - await Task.Delay(TimeSpan.FromMilliseconds(200)); - - throw new IOException("server down"); - })) - { - var client = new Kubernetes(new KubernetesClientConfiguration - { - Host = server.Uri.ToString() - }); - - var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result; - - watcher = listTask.Watch( - (type, item) => { }, - e => { exceptionCatched = e; }); - } - - // wait server down - Thread.Sleep(TimeSpan.FromMilliseconds(1000)); - - Assert.False(watcher.Watching); - Assert.IsType(exceptionCatched); - } - - private class DummyHandler : DelegatingHandler - { - internal bool Called { get; private set; } - - protected override Task SendAsync(HttpRequestMessage request, - CancellationToken cancellationToken) - { - Called = true; - return base.SendAsync(request, cancellationToken); - } +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Net; +using System.Net.Http; +using System.Threading; +using System.Threading.Tasks; +using k8s.Exceptions; +using k8s.Models; +using k8s.Tests.Mock; +using Microsoft.AspNetCore.Http; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json; +using Newtonsoft.Json.Converters; +using Xunit; +using Xunit.Abstractions; + +namespace k8s.Tests +{ + public class WatchTests + : TestBase + { + private static readonly string MockAddedEventStreamLine = BuildWatchEventStreamLine(WatchEventType.Added); + private static readonly string MockDeletedStreamLine = BuildWatchEventStreamLine(WatchEventType.Deleted); + private static readonly string MockModifiedStreamLine = BuildWatchEventStreamLine(WatchEventType.Modified); + private static readonly string MockErrorStreamLine = BuildWatchEventStreamLine(WatchEventType.Error); + private static readonly string MockBadStreamLine = "bad json"; + + public WatchTests(ITestOutputHelper testOutput) : base(testOutput) + { } - [Fact] - public void TestWatchWithHandlers() - { - using (var server = new MockKubeApiServer(async httpContext => - { - await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse); - await Task.Delay(TimeSpan.FromMilliseconds(100)); - - await WriteStreamLine(httpContext, MockAddedEventStreamLine); + private static string BuildWatchEventStreamLine(WatchEventType eventType) + { + var corev1PodList = JsonConvert.DeserializeObject(MockKubeApiServer.MockPodResponse); + return JsonConvert.SerializeObject(new Watcher.WatchEvent + { + Type = eventType, + Object = corev1PodList.Items.First() + }, new StringEnumConverter()); + } + + private static async Task WriteStreamLine(HttpContext httpContext, string reponseLine) + { + const string crlf = "\r\n"; + await httpContext.Response.WriteAsync(reponseLine.Replace(crlf, "")); + await httpContext.Response.WriteAsync(crlf); + await httpContext.Response.Body.FlushAsync(); + } + + [Fact] + public void CannotWatch() + { + using (var server = new MockKubeApiServer(testOutput: TestOutput)) + { + var client = new Kubernetes(new KubernetesClientConfiguration + { + Host = server.Uri.ToString() + }); + + // did not pass watch param + { + var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default").Result; + Assert.ThrowsAny(() => + { + listTask.Watch((type, item) => { }); + }); + } + + // server did not response line by line + { + Assert.ThrowsAny(() => + { + var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result; + + // this line did not throw + // listTask.Watch((type, item) => { }); + }); + } + } + } + + [Fact] + public void SuriveBadLine() + { + using (CountdownEvent eventsReceived = new CountdownEvent(4 /* first line of response is eaten by WatcherDelegatingHandler */)) + using (var server = new MockKubeApiServer(TestOutput, async httpContext => + { + httpContext.Response.StatusCode = (int) HttpStatusCode.OK; + httpContext.Response.ContentLength = null; + + await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse); await Task.Delay(TimeSpan.FromMilliseconds(100)); - - // make server alive, cannot set to int.max as of it would block response - await Task.Delay(TimeSpan.FromDays(1)); - return false; - })) - { - var handler1 = new DummyHandler(); + + await WriteStreamLine(httpContext, MockBadStreamLine); + await Task.Delay(TimeSpan.FromMilliseconds(100)); + + await WriteStreamLine(httpContext, MockAddedEventStreamLine); + await Task.Delay(TimeSpan.FromMilliseconds(100)); + + await WriteStreamLine(httpContext, MockBadStreamLine); + await Task.Delay(TimeSpan.FromMilliseconds(100)); + + await WriteStreamLine(httpContext, MockModifiedStreamLine); + await Task.Delay(TimeSpan.FromMilliseconds(100)); + + // make server alive, cannot set to int.max as of it would block response + await Task.Delay(TimeSpan.FromDays(1)); + return false; + })) + { + var client = new Kubernetes(new KubernetesClientConfiguration + { + Host = server.Uri.ToString() + }); + + var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result; + + + var events = new HashSet(); + var errors = 0; + + var watcher = listTask.Watch( + (type, item) => + { + Log.LogInformation("Watcher received '{EventType}' event.", type); + + events.Add(type); + eventsReceived.Signal(); + }, + error => + { + Log.LogInformation("Watcher received '{ErrorType}' error.", error.GetType().FullName); + + errors += 1; + eventsReceived.Signal(); + } + ); + + // wait server yields all events + Assert.True( + eventsReceived.Wait(TimeSpan.FromMilliseconds(3000)), + "Timed out waiting for all events / errors to be received." + ); + + Assert.Contains(WatchEventType.Added, events); + Assert.Contains(WatchEventType.Modified, events); + + Assert.Equal(2, errors); + + Assert.True(watcher.Watching); + + // prevent from server down exception trigger + Thread.Sleep(TimeSpan.FromMilliseconds(1000)); + } + } + + [Fact] + public void DisposeWatch() + { + using (var server = new MockKubeApiServer(TestOutput, async httpContext => + { + await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse); + await Task.Delay(TimeSpan.FromMilliseconds(100)); + + for (;;) + { + await WriteStreamLine(httpContext, MockAddedEventStreamLine); + await Task.Delay(TimeSpan.FromMilliseconds(100)); + + } + })) + { + var client = new Kubernetes(new KubernetesClientConfiguration + { + Host = server.Uri.ToString() + }); + + var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result; + + + var events = new HashSet(); + + var watcher = listTask.Watch( + (type, item) => { events.Add(type); } + ); + + // wait at least an event + Thread.Sleep(TimeSpan.FromMilliseconds(1000)); + + Assert.NotEmpty(events); + Assert.True(watcher.Watching); + + watcher.Dispose(); + + events.Clear(); + + // make sure wait event called + Thread.Sleep(TimeSpan.FromMilliseconds(1000)); + Assert.Empty(events); + Assert.False(watcher.Watching); + + } + } + + [Fact] + public void WatchAllEvents() + { + using (CountdownEvent eventsReceived = new CountdownEvent(4 /* first line of response is eaten by WatcherDelegatingHandler */)) + using (var server = new MockKubeApiServer(TestOutput, async httpContext => + { + await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse); + await Task.Delay(TimeSpan.FromMilliseconds(100)); + + await WriteStreamLine(httpContext, MockAddedEventStreamLine); + await Task.Delay(TimeSpan.FromMilliseconds(100)); + + await WriteStreamLine(httpContext, MockDeletedStreamLine); + await Task.Delay(TimeSpan.FromMilliseconds(100)); + + await WriteStreamLine(httpContext, MockModifiedStreamLine); + await Task.Delay(TimeSpan.FromMilliseconds(100)); + + await WriteStreamLine(httpContext, MockErrorStreamLine); + await Task.Delay(TimeSpan.FromMilliseconds(100)); + + // make server alive, cannot set to int.max as of it would block response + await Task.Delay(TimeSpan.FromDays(1)); + return false; + })) + { + var client = new Kubernetes(new KubernetesClientConfiguration + { + Host = server.Uri.ToString() + }); + + var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result; + + + var events = new HashSet(); + var errors = 0; + + var watcher = listTask.Watch( + (type, item) => + { + Log.LogInformation("Watcher received '{EventType}' event.", type); + + events.Add(type); + eventsReceived.Signal(); + }, + error => + { + Log.LogInformation("Watcher received '{ErrorType}' error.", error.GetType().FullName); + + errors += 1; + eventsReceived.Signal(); + } + ); + + // wait server yields all events + Assert.True( + eventsReceived.Wait(TimeSpan.FromMilliseconds(3000)), + "Timed out waiting for all events / errors to be received." + ); + + Assert.Contains(WatchEventType.Added, events); + Assert.Contains(WatchEventType.Deleted, events); + Assert.Contains(WatchEventType.Modified, events); + Assert.Contains(WatchEventType.Error, events); + + + Assert.Equal(0, errors); + + Assert.True(watcher.Watching); + } + } + + [Fact] + public void WatchServerDisconnect() + { + Watcher watcher; + Exception exceptionCatched = null; + + using (var server = new MockKubeApiServer(TestOutput, async httpContext => + { + await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse); + + // make sure watch success + await Task.Delay(TimeSpan.FromMilliseconds(200)); + + throw new IOException("server down"); + })) + { + var client = new Kubernetes(new KubernetesClientConfiguration + { + Host = server.Uri.ToString() + }); + + var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result; + + watcher = listTask.Watch( + (type, item) => { }, + e => { exceptionCatched = e; }); + } + + // wait server down + Thread.Sleep(TimeSpan.FromMilliseconds(1000)); + + Assert.False(watcher.Watching); + Assert.IsType(exceptionCatched); + } + + private class DummyHandler : DelegatingHandler + { + internal bool Called { get; private set; } + + protected override Task SendAsync(HttpRequestMessage request, + CancellationToken cancellationToken) + { + Called = true; + return base.SendAsync(request, cancellationToken); + } + } + + [Fact] + public void TestWatchWithHandlers() + { + using (var server = new MockKubeApiServer(TestOutput, async httpContext => + { + await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse); + await Task.Delay(TimeSpan.FromMilliseconds(100)); + + await WriteStreamLine(httpContext, MockAddedEventStreamLine); + await Task.Delay(TimeSpan.FromMilliseconds(100)); + + // make server alive, cannot set to int.max as of it would block response + await Task.Delay(TimeSpan.FromDays(1)); + return false; + })) + { + var handler1 = new DummyHandler(); var handler2 = new DummyHandler(); - - var client = new Kubernetes(new KubernetesClientConfiguration - { - Host = server.Uri.ToString() - }, handler1, handler2); - - Assert.False(handler1.Called); + + var client = new Kubernetes(new KubernetesClientConfiguration + { + Host = server.Uri.ToString() + }, handler1, handler2); + + Assert.False(handler1.Called); Assert.False(handler2.Called); - var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result; - - var events = new HashSet(); - - var watcher = listTask.Watch( - (type, item) => { events.Add(type); } - ); - - // wait server yields all events - Thread.Sleep(TimeSpan.FromMilliseconds(500)); - - Assert.Contains(WatchEventType.Added, events); + var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result; - Assert.True(handler1.Called); + var events = new HashSet(); + + var watcher = listTask.Watch( + (type, item) => { events.Add(type); } + ); + + // wait server yields all events + Thread.Sleep(TimeSpan.FromMilliseconds(500)); + + Assert.Contains(WatchEventType.Added, events); + + Assert.True(handler1.Called); Assert.True(handler2.Called); - } + } } - } + } } diff --git a/tests/WebSocketTestBase.cs b/tests/WebSocketTestBase.cs new file mode 100644 index 0000000..3980c3a --- /dev/null +++ b/tests/WebSocketTestBase.cs @@ -0,0 +1,332 @@ +using k8s.Tests.Logging; +using k8s.Tests.Mock.Server; +using Microsoft.AspNetCore; +using Microsoft.AspNetCore.Hosting; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Rest; +using System; +using System.IO; +using System.Net.WebSockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Xunit; +using Xunit.Abstractions; + +namespace k8s.Tests +{ + /// + /// The base class for Kubernetes WebSocket test suites. + /// + public abstract class WebSocketTestBase + : TestBase + { + /// + /// The next server port to use. + /// + static int NextPort = 13255; + + /// + /// Create a new . + /// + /// + /// Output for the current test. + /// + protected WebSocketTestBase(ITestOutputHelper testOutput) + : base(testOutput) + { + int port = Interlocked.Increment(ref NextPort); + + // Useful to diagnose test timeouts. + TestCancellation.Register( + () => Log.LogInformation("Test-level cancellation token has been canceled.") + ); + + ServerBaseAddress = new Uri($"http://localhost:{port}"); + WebSocketBaseAddress = new Uri($"ws://localhost:{port}"); + + Host = WebHost.CreateDefaultBuilder() + .UseStartup() + .ConfigureServices(ConfigureTestServerServices) + .ConfigureLogging(ConfigureTestServerLogging) + .UseUrls(ServerBaseAddress.AbsoluteUri) + .Build(); + + Disposal.Add(CancellationSource); + Disposal.Add(Host); + } + + /// + /// The test server's base address (http://). + /// + protected Uri ServerBaseAddress { get; } + + /// + /// The test server's base WebSockets address (ws://). + /// + protected Uri WebSocketBaseAddress { get; } + + /// + /// The test server's web host. + /// + protected IWebHost Host { get; } + + /// + /// Test adapter for accepting web sockets. + /// + protected WebSocketTestAdapter WebSocketTestAdapter { get; } = new WebSocketTestAdapter(); + + /// + /// The source for cancellation tokens used by the test. + /// + protected CancellationTokenSource CancellationSource { get; } = new CancellationTokenSource(); + + /// + /// A that can be used to cancel asynchronous operations. + /// + /// + protected CancellationToken TestCancellation => CancellationSource.Token; + + /// + /// Configure services for the test server. + /// + /// + /// The service collection to configure. + /// + protected virtual void ConfigureTestServerServices(IServiceCollection services) + { + if (services == null) + throw new ArgumentNullException(nameof(services)); + + // Inject WebSocketTestData. + services.AddSingleton(WebSocketTestAdapter); + } + + /// + /// Configure logging for the test server. + /// + /// + /// The logger factory to configure. + /// + protected virtual void ConfigureTestServerLogging(ILoggingBuilder logging) + { + if (logging == null) + throw new ArgumentNullException(nameof(logging)); + + logging.ClearProviders(); // Don't log to console. + logging.AddTestOutput(TestOutput, MinLogLevel); + } + + /// + /// Create a Kubernetes client that uses the test server. + /// + /// + /// Optional to use for authentication (defaults to anonymous, i.e. no credentials). + /// + /// + /// The configured client. + /// + protected virtual Kubernetes CreateTestClient(ServiceClientCredentials credentials = null) + { + return new Kubernetes(credentials ?? AnonymousClientCredentials.Instance) + { + BaseUri = ServerBaseAddress + }; + } + + /// + /// Asynchronously disconnect client and server WebSockets using the standard handshake. + /// + /// + /// The client-side . + /// + /// + /// The server-side . + /// + /// + /// An optional value indicating the reason for disconnection. + /// + /// Defaults to . + /// + /// + /// An optional textual description of the reason for disconnection. + /// + /// Defaults to "Normal Closure". + /// + /// + /// A representing the asynchronous operation. + /// + protected async Task Disconnect(WebSocket clientSocket, WebSocket serverSocket, WebSocketCloseStatus closeStatus = WebSocketCloseStatus.NormalClosure, string closeStatusDescription = "Normal Closure") + { + if (clientSocket == null) + throw new ArgumentNullException(nameof(clientSocket)); + + if (serverSocket == null) + throw new ArgumentNullException(nameof(serverSocket)); + + Log.LogInformation("Disconnecting..."); + + // Asynchronously perform the server's half of the handshake (the call to clientSocket.CloseAsync will block until it receives the server-side response). + ArraySegment receiveBuffer = new byte[1024]; + Task closeServerSocket = serverSocket.ReceiveAsync(receiveBuffer, TestCancellation) + .ContinueWith(async received => + { + if (received.IsFaulted) + Log.LogError(new EventId(0), received.Exception.Flatten().InnerExceptions[0], "Server socket operation to receive Close message failed."); + else if (received.IsCanceled) + Log.LogWarning("Server socket operation to receive Close message was canceled."); + else + { + Log.LogInformation("Received {MessageType} message from server socket (expecting {ExpectedMessageType}).", + received.Result.MessageType, + WebSocketMessageType.Close + ); + + if (received.Result.MessageType == WebSocketMessageType.Close) + { + Log.LogInformation("Closing server socket (with status {CloseStatus})...", received.Result.CloseStatus); + + await serverSocket.CloseAsync( + received.Result.CloseStatus.Value, + received.Result.CloseStatusDescription, + TestCancellation + ); + + Log.LogInformation("Server socket closed."); + } + + Assert.Equal(WebSocketMessageType.Close, received.Result.MessageType); + } + }); + + Log.LogInformation("Closing client socket..."); + + await clientSocket.CloseAsync(closeStatus, closeStatusDescription, TestCancellation).ConfigureAwait(false); + + Log.LogInformation("Client socket closed."); + + await closeServerSocket.ConfigureAwait(false); + + Log.LogInformation("Disconnected."); + + Assert.Equal(closeStatus, clientSocket.CloseStatus); + Assert.Equal(clientSocket.CloseStatus, serverSocket.CloseStatus); + + Assert.Equal(closeStatusDescription, clientSocket.CloseStatusDescription); + Assert.Equal(clientSocket.CloseStatusDescription, serverSocket.CloseStatusDescription); + } + + /// + /// Send text to a multiplexed substream over the specified WebSocket. + /// + /// + /// The target . + /// + /// + /// The 0-based index of the target substream. + /// + /// + /// The text to send. + /// + /// + /// The number of bytes sent to the WebSocket. + /// + protected async Task SendMultiplexed(WebSocket webSocket, byte streamIndex, string text) + { + if (webSocket == null) + throw new ArgumentNullException(nameof(webSocket)); + + if (text == null) + throw new ArgumentNullException(nameof(text)); + + byte[] payload = Encoding.ASCII.GetBytes(text); + byte[] sendBuffer = new byte[payload.Length + 1]; + + sendBuffer[0] = streamIndex; + Array.Copy(payload, 0, sendBuffer, 1, payload.Length); + + await webSocket.SendAsync(sendBuffer, WebSocketMessageType.Binary, + endOfMessage: true, + cancellationToken: TestCancellation + ); + + return sendBuffer.Length; + } + + /// + /// Receive text from a multiplexed substream over the specified WebSocket. + /// + /// + /// The target . + /// + /// + /// The text to send. + /// + /// + /// A tuple containing the received text, 0-based substream index, and total bytes received. + /// + protected async Task<(string text, byte streamIndex, int totalBytes)> ReceiveTextMultiplexed(WebSocket webSocket) + { + if (webSocket == null) + throw new ArgumentNullException(nameof(webSocket)); + + byte[] receivedData; + using (MemoryStream buffer = new MemoryStream()) + { + byte[] receiveBuffer = new byte[1024]; + WebSocketReceiveResult receiveResult = await webSocket.ReceiveAsync(receiveBuffer, TestCancellation); + if (receiveResult.MessageType != WebSocketMessageType.Binary) + throw new IOException($"Received unexpected WebSocket message of type '{receiveResult.MessageType}'."); + + buffer.Write(receiveBuffer, 0, receiveResult.Count); + + while (!receiveResult.EndOfMessage) + { + receiveResult = await webSocket.ReceiveAsync(receiveBuffer, TestCancellation); + buffer.Write(receiveBuffer, 0, receiveResult.Count); + } + + buffer.Flush(); + + receivedData = buffer.ToArray(); + } + + return ( + text: Encoding.ASCII.GetString(receivedData, 1, receivedData.Length - 1), + streamIndex: receivedData[0], + totalBytes: receivedData.Length + ); + } + + /// + /// A implementation representing no credentials (i.e. anonymous). + /// + protected class AnonymousClientCredentials + : ServiceClientCredentials + { + /// + /// The singleton instance of . + /// + public static readonly AnonymousClientCredentials Instance = new AnonymousClientCredentials(); + + /// + /// Create new . + /// + AnonymousClientCredentials() + { + } + } + + /// + /// Event Id constants used in WebSocket tests. + /// + protected static class EventIds + { + /// + /// An error occurred while closing the server-side socket. + /// + static readonly EventId ErrorClosingServerSocket = new EventId(1000, nameof(ErrorClosingServerSocket)); + } + } +} diff --git a/tests/tests.csproj b/tests/tests.csproj index 7726e93..a64dfd5 100755 --- a/tests/tests.csproj +++ b/tests/tests.csproj @@ -1,19 +1,35 @@ - netcoreapp2.0 false k8s.tests + netcoreapp2.0;netcoreapp2.1 + - - - - - + + + + + + + + + + + + + + + + + + + +