Custom validation of server certificate for WebSockets (#103)

* Improve SSL customisation for WebSockets

kubernetes-client/csharp#102

* First test for exec-in-pod over WebSockets.

Also, implement basic mock server for testing WebSockets.

kubernetes-client/csharp#102

* Attempt to handle raciness of Watcher tests.

kubernetes-client/csharp#102

* Attempt to handle raciness of ByteBuffer test.

kubernetes-client/csharp#102
This commit is contained in:
Adam Friedman
2018-03-20 16:03:28 +11:00
committed by Brendan Burns
parent 5b1a831a1f
commit c0a42ad884
28 changed files with 2405 additions and 420 deletions

View File

@@ -1,10 +1,19 @@
language: csharp
sudo: false
sudo: false
matrix:
include:
- dotnet: 2.0.0
mono: none
- mono: none
dist: trusty
# We need the .NET Core 2.1 (preview 1) SDK to build. Travis doesn't know how to install this yet.
before_install:
- echo 'Installing .NET Core...'
- export DOTNET_SKIP_FIRST_TIME_EXPERIENCE=1
- export DOTNET_CLI_TELEMETRY_OPTOUT=1
- curl https://packages.microsoft.com/keys/microsoft.asc | gpg --dearmor > microsoft.gpg
- sudo mv microsoft.gpg /etc/apt/trusted.gpg.d/microsoft.gpg
- sudo sh -c 'echo "deb [arch=amd64] https://packages.microsoft.com/repos/microsoft-ubuntu-trusty-prod trusty main" > /etc/apt/sources.list.d/dotnetdev.list'
- sudo apt-get -qq update
- sudo apt-get install -y dotnet-sdk-2.1.300-preview1-008174
script:
- ./ci.sh

9
ci.sh
View File

@@ -4,9 +4,12 @@
set -e
# Ensure no compile errors in all projects
find . -name *.csproj -exec dotnet build {} \;
dotnet restore
dotnet build --no-restore
# Execute Unit tests
cd tests
dotnet restore
dotnet test
dotnet test --no-restore --no-build
if [[ $? != 0 ]]; then
exit 1
fi

View File

@@ -1,4 +1,3 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26430.16
@@ -25,4 +24,7 @@ Global
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {049A763A-C891-4E8D-80CF-89DD3E22ADC7}
EndGlobalSection
EndGlobal

566
src/CoreFX.cs Normal file
View File

@@ -0,0 +1,566 @@
/*
* This (temporary) code has been adapted from Microsoft's .NET Core 2.0.4 codebase. Original code copyright (c) .NET Foundation and Contributors.
* Hopefully, once .NET Core 2.1 lands, we can drop it in favour of the built-in ManagedWebSocket and SocketHttpHandler classes (providing they support custom validation of server certificates).
*
* Original code: https://github.com/dotnet/corefx/blob/v2.0.4/src/System.Net.WebSockets.Client/src/System/Net/WebSockets/WebSocketHandle.Managed.cs#L74
* License: https://github.com/dotnet/corefx/blob/v2.0.4/LICENSE.TXT
*
*/
#if NETCOREAPP2_1
using k8s;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Net.Http.Headers;
using System.Net.Security;
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Runtime.ExceptionServices;
using System.Security.Cryptography;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace CoreFX
{
/// <summary>
/// Connection factory for Kubernetes web sockets.
/// </summary>
internal static class K8sWebSocket
{
/// <summary>
/// GUID appended by the server as part of the security key response.
///
/// Defined in the RFC.
/// </summary>
const string WSServerGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
/// <summary>
/// Asynchronously connect to a Kubernetes WebSocket.
/// </summary>
/// <param name="uri">
/// The target URI.
/// </param>
/// <param name="options">
/// <see cref="KubernetesWebSocketOptions"/> that control the WebSocket's configuration and connection process.
/// </param>
/// <param name="cancellationToken">
/// An optional <see cref="CancellationToken"/> that can be used to cancel the operation.
/// </param>
/// <returns>
/// A <see cref="WebSocket"/> representing the connection.
/// </returns>
public static async Task<WebSocket> ConnectAsync(Uri uri, KubernetesWebSocketOptions options, CancellationToken cancellationToken = default(CancellationToken))
{
try
{
// Connect to the remote server
Socket connectedSocket = await ConnectSocketAsync(uri.Host, uri.Port, cancellationToken).ConfigureAwait(false);
Stream stream = new NetworkStream(connectedSocket, ownsSocket: true);
// Upgrade to SSL if needed
if (uri.Scheme == "wss")
{
X509Certificate2Collection clientCertificates = new X509Certificate2Collection();
foreach (X509Certificate2 clientCertificate in options.ClientCertificates)
clientCertificates.Add(clientCertificate);
var sslStream = new SslStream(
innerStream: stream,
leaveInnerStreamOpen: false,
userCertificateValidationCallback: options.ServerCertificateCustomValidationCallback
);
await
sslStream.AuthenticateAsClientAsync(
uri.Host,
clientCertificates,
options.EnabledSslProtocols,
checkCertificateRevocation: false
)
.ConfigureAwait(false);
stream = sslStream;
}
// Create the security key and expected response, then build all of the request headers
(string secKey, string webSocketAccept) = CreateSecKeyAndSecWebSocketAccept();
byte[] requestHeader = BuildRequestHeader(uri, options, secKey);
// Write out the header to the connection
await stream.WriteAsync(requestHeader, 0, requestHeader.Length, cancellationToken).ConfigureAwait(false);
// Parse the response and store our state for the remainder of the connection
string subprotocol = await ParseAndValidateConnectResponseAsync(stream, options, webSocketAccept, cancellationToken).ConfigureAwait(false);
return WebSocket.CreateClientWebSocket(
stream,
subprotocol,
options.ReceiveBufferSize,
options.SendBufferSize,
options.KeepAliveInterval,
false,
WebSocket.CreateClientBuffer(options.ReceiveBufferSize, options.SendBufferSize)
);
}
catch (Exception unexpectedError)
{
throw new WebSocketException("WebSocket connection failure.", unexpectedError);
}
}
/// <summary>Connects a socket to the specified host and port, subject to cancellation and aborting.</summary>
/// <param name="host">The host to which to connect.</param>
/// <param name="port">The port to which to connect on the host.</param>
/// <param name="cancellationToken">The CancellationToken to use to cancel the websocket.</param>
/// <returns>The connected Socket.</returns>
private static async Task<Socket> ConnectSocketAsync(string host, int port, CancellationToken cancellationToken)
{
IPAddress[] addresses = await Dns.GetHostAddressesAsync(host).ConfigureAwait(false);
ExceptionDispatchInfo lastException = null;
foreach (IPAddress address in addresses)
{
var socket = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
try
{
using (cancellationToken.Register(() => socket.Dispose()))
{
try
{
await socket.ConnectAsync(address, port).ConfigureAwait(false);
}
catch (ObjectDisposedException objectDisposed)
{
// If the socket was disposed because cancellation was requested, translate the exception
// into a new OperationCanceledException. Otherwise, let the original ObjectDisposedexception propagate.
if (cancellationToken.IsCancellationRequested)
{
throw new OperationCanceledException(new OperationCanceledException().Message, objectDisposed, cancellationToken);
}
}
}
cancellationToken.ThrowIfCancellationRequested(); // in case of a race and socket was disposed after the await
return socket;
}
catch (Exception exc)
{
socket.Dispose();
lastException = ExceptionDispatchInfo.Capture(exc);
}
}
lastException?.Throw();
Debug.Fail("We should never get here. We should have already returned or an exception should have been thrown.");
throw new WebSocketException("WebSocket connection failure.");
}
/// <summary>Creates a byte[] containing the headers to send to the server.</summary>
/// <param name="uri">The Uri of the server.</param>
/// <param name="options">The options used to configure the websocket.</param>
/// <param name="secKey">The generated security key to send in the Sec-WebSocket-Key header.</param>
/// <returns>The byte[] containing the encoded headers ready to send to the network.</returns>
private static byte[] BuildRequestHeader(Uri uri, KubernetesWebSocketOptions options, string secKey)
{
StringBuilder builder = new StringBuilder()
.Append("GET ")
.Append(uri.PathAndQuery)
.Append(" HTTP/1.1\r\n");
// Add all of the required headers, honoring Host header if set.
string hostHeader;
if (!options.RequestHeaders.TryGetValue(HttpKnownHeaderNames.Host, out hostHeader))
hostHeader = uri.Host;
builder.Append("Host: ");
if (String.IsNullOrEmpty(hostHeader))
{
builder.Append(uri.IdnHost).Append(':').Append(uri.Port).Append("\r\n");
}
else
{
builder.Append(hostHeader).Append("\r\n");
}
builder.Append("Connection: Upgrade\r\n");
builder.Append("Upgrade: websocket\r\n");
builder.Append("Sec-WebSocket-Version: 13\r\n");
builder.Append("Sec-WebSocket-Key: ").Append(secKey).Append("\r\n");
// Add all of the additionally requested headers
foreach (string key in options.RequestHeaders.Keys)
{
if (String.Equals(key, HttpKnownHeaderNames.Host, StringComparison.OrdinalIgnoreCase))
{
// Host header handled above
continue;
}
builder.Append(key).Append(": ").Append(options.RequestHeaders[key]).Append("\r\n");
}
// Add the optional subprotocols header
if (options.RequestedSubProtocols.Count > 0)
{
builder.Append(HttpKnownHeaderNames.SecWebSocketProtocol).Append(": ");
builder.Append(options.RequestedSubProtocols[0]);
for (int i = 1; i < options.RequestedSubProtocols.Count; i++)
{
builder.Append(", ").Append(options.RequestedSubProtocols[i]);
}
builder.Append("\r\n");
}
// End the headers
builder.Append("\r\n");
// Return the bytes for the built up header
return Encoding.ASCII.GetBytes(builder.ToString());
}
/// <summary>Read and validate the connect response headers from the server.</summary>
/// <param name="stream">The stream from which to read the response headers.</param>
/// <param name="options">The options used to configure the websocket.</param>
/// <param name="expectedSecWebSocketAccept">The expected value of the Sec-WebSocket-Accept header.</param>
/// <param name="cancellationToken">The CancellationToken to use to cancel the websocket.</param>
/// <returns>The agreed upon subprotocol with the server, or null if there was none.</returns>
static async Task<string> ParseAndValidateConnectResponseAsync(Stream stream, KubernetesWebSocketOptions options, string expectedSecWebSocketAccept, CancellationToken cancellationToken)
{
// Read the first line of the response
string statusLine = await ReadResponseHeaderLineAsync(stream, cancellationToken).ConfigureAwait(false);
// Depending on the underlying sockets implementation and timing, connecting to a server that then
// immediately closes the connection may either result in an exception getting thrown from the connect
// earlier, or it may result in getting to here but reading 0 bytes. If we read 0 bytes and thus have
// an empty status line, treat it as a connect failure.
if (String.IsNullOrEmpty(statusLine))
{
throw new WebSocketException("Connection failure.");
}
const string ExpectedStatusStart = "HTTP/1.1 ";
const string ExpectedStatusStatWithCode = "HTTP/1.1 101"; // 101 == SwitchingProtocols
// If the status line doesn't begin with "HTTP/1.1" or isn't long enough to contain a status code, fail.
if (!statusLine.StartsWith(ExpectedStatusStart, StringComparison.Ordinal) || statusLine.Length < ExpectedStatusStatWithCode.Length)
{
throw new WebSocketException(WebSocketError.HeaderError);
}
// If the status line doesn't contain a status code 101, or if it's long enough to have a status description
// but doesn't contain whitespace after the 101, fail.
if (!statusLine.StartsWith(ExpectedStatusStatWithCode, StringComparison.Ordinal) ||
(statusLine.Length > ExpectedStatusStatWithCode.Length && !char.IsWhiteSpace(statusLine[ExpectedStatusStatWithCode.Length])))
{
throw new WebSocketException(WebSocketError.HeaderError, $"Connection failure (status line = '{statusLine}').");
}
// Read each response header. Be liberal in parsing the response header, treating
// everything to the left of the colon as the key and everything to the right as the value, trimming both.
// For each header, validate that we got the expected value.
bool foundUpgrade = false, foundConnection = false, foundSecWebSocketAccept = false;
string subprotocol = null;
string line;
while (!String.IsNullOrEmpty(line = await ReadResponseHeaderLineAsync(stream, cancellationToken).ConfigureAwait(false)))
{
int colonIndex = line.IndexOf(':');
if (colonIndex == -1)
{
throw new WebSocketException(WebSocketError.HeaderError);
}
string headerName = line.SubstringTrim(0, colonIndex);
string headerValue = line.SubstringTrim(colonIndex + 1);
// The Connection, Upgrade, and SecWebSocketAccept headers are required and with specific values.
ValidateAndTrackHeader(HttpKnownHeaderNames.Connection, "Upgrade", headerName, headerValue, ref foundConnection);
ValidateAndTrackHeader(HttpKnownHeaderNames.Upgrade, "websocket", headerName, headerValue, ref foundUpgrade);
ValidateAndTrackHeader(HttpKnownHeaderNames.SecWebSocketAccept, expectedSecWebSocketAccept, headerName, headerValue, ref foundSecWebSocketAccept);
// The SecWebSocketProtocol header is optional. We should only get it with a non-empty value if we requested subprotocols,
// and then it must only be one of the ones we requested. If we got a subprotocol other than one we requested (or if we
// already got one in a previous header), fail. Otherwise, track which one we got.
if (String.Equals(HttpKnownHeaderNames.SecWebSocketProtocol, headerName, StringComparison.OrdinalIgnoreCase) &&
!String.IsNullOrWhiteSpace(headerValue))
{
if (options.RequestedSubProtocols.Count > 0)
{
string newSubprotocol = options.RequestedSubProtocols.Find(requested => String.Equals(requested, headerValue, StringComparison.OrdinalIgnoreCase));
if (newSubprotocol == null || subprotocol != null)
{
throw new WebSocketException(
String.Format("Unsupported sub-protocol '{0}' (expected one of [{1}]).",
newSubprotocol,
String.Join(", ", options.RequestedSubProtocols)
)
);
}
subprotocol = newSubprotocol;
}
}
}
if (!foundUpgrade || !foundConnection || !foundSecWebSocketAccept)
{
throw new WebSocketException("Connection failure.");
}
return subprotocol;
}
/// <summary>Validates a received header against expected values and tracks that we've received it.</summary>
/// <param name="targetHeaderName">The header name against which we're comparing.</param>
/// <param name="targetHeaderValue">The header value against which we're comparing.</param>
/// <param name="foundHeaderName">The actual header name received.</param>
/// <param name="foundHeaderValue">The actual header value received.</param>
/// <param name="foundHeader">A bool tracking whether this header has been seen.</param>
private static void ValidateAndTrackHeader(
string targetHeaderName, string targetHeaderValue,
string foundHeaderName, string foundHeaderValue,
ref bool foundHeader)
{
bool isTargetHeader = String.Equals(targetHeaderName, foundHeaderName, StringComparison.OrdinalIgnoreCase);
if (!foundHeader)
{
if (isTargetHeader)
{
if (!String.Equals(targetHeaderValue, foundHeaderValue, StringComparison.OrdinalIgnoreCase))
{
throw new WebSocketException(
$"Invalid value for '{foundHeaderName}' header: '{foundHeaderValue}' (expected '{targetHeaderValue}')."
);
}
foundHeader = true;
}
}
else
{
if (isTargetHeader)
{
throw new WebSocketException("Connection failure.");
}
}
}
/// <summary>Reads a line from the stream.</summary>
/// <param name="stream">The stream from which to read.</param>
/// <param name="cancellationToken">The CancellationToken used to cancel the websocket.</param>
/// <returns>The read line, or null if none could be read.</returns>
private static async Task<string> ReadResponseHeaderLineAsync(Stream stream, CancellationToken cancellationToken)
{
StringBuilder sb = new StringBuilder();
var arr = new byte[1];
char prevChar = '\0';
try
{
// TODO: Reading one byte is extremely inefficient. The problem, however,
// is that if we read multiple bytes, we could end up reading bytes post-headers
// that are part of messages meant to be read by the managed websocket after
// the connection. The likely solution here is to wrap the stream in a BufferedStream,
// though a) that comes at the expense of an extra set of virtual calls, b)
// it adds a buffer when the managed websocket will already be using a buffer, and
// c) it's not exposed on the version of the System.IO contract we're currently using.
while (await stream.ReadAsync(arr, 0, 1, cancellationToken).ConfigureAwait(false) == 1)
{
// Process the next char
char curChar = (char)arr[0];
if (prevChar == '\r' && curChar == '\n')
{
break;
}
sb.Append(curChar);
prevChar = curChar;
}
if (sb.Length > 0 && sb[sb.Length - 1] == '\r')
{
sb.Length = sb.Length - 1;
}
return sb.ToString();
}
finally
{
sb.Clear();
}
}
/// <summary>
/// Create a security key for sending in the Sec-WebSocket-Key header and the associated response we expect to receive as the Sec-WebSocket-Accept header value.
/// </summary>
/// <returns>A key-value pair of the request header security key and expected response header value.</returns>
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Security", "CA5350", Justification = "Required by RFC6455")]
static (string secKey, string expectedResponse) CreateSecKeyAndSecWebSocketAccept()
{
string secKey = Convert.ToBase64String(Guid.NewGuid().ToByteArray());
using (SHA1 sha = SHA1.Create())
{
return (
secKey,
Convert.ToBase64String(
sha.ComputeHash(Encoding.ASCII.GetBytes(secKey + WSServerGuid))
)
);
}
}
static void ValidateHeader(HttpHeaders headers, string name, string expectedValue)
{
if (!headers.TryGetValues(name, out IEnumerable<string> values))
ThrowConnectFailure();
Debug.Assert(values is string[]);
string[] array = (string[])values;
if (array.Length != 1 || !String.Equals(array[0], expectedValue, StringComparison.OrdinalIgnoreCase))
{
throw new WebSocketException(
$"Invalid WebSocker response header '{name}': [{String.Join(", ", array)}]"
);
}
}
static void ThrowConnectFailure() => throw new WebSocketException("Connection failure.");
}
/// <summary>
/// Well-known HTTP header names from CoreFX used by <see cref="K8sWebSocket"/>.
/// </summary>
static class HttpKnownHeaderNames
{
public const string Accept = "Accept";
public const string AcceptCharset = "Accept-Charset";
public const string AcceptEncoding = "Accept-Encoding";
public const string AcceptLanguage = "Accept-Language";
public const string AcceptPatch = "Accept-Patch";
public const string AcceptRanges = "Accept-Ranges";
public const string AccessControlAllowCredentials = "Access-Control-Allow-Credentials";
public const string AccessControlAllowHeaders = "Access-Control-Allow-Headers";
public const string AccessControlAllowMethods = "Access-Control-Allow-Methods";
public const string AccessControlAllowOrigin = "Access-Control-Allow-Origin";
public const string AccessControlExposeHeaders = "Access-Control-Expose-Headers";
public const string AccessControlMaxAge = "Access-Control-Max-Age";
public const string Age = "Age";
public const string Allow = "Allow";
public const string AltSvc = "Alt-Svc";
public const string Authorization = "Authorization";
public const string CacheControl = "Cache-Control";
public const string Connection = "Connection";
public const string ContentDisposition = "Content-Disposition";
public const string ContentEncoding = "Content-Encoding";
public const string ContentLanguage = "Content-Language";
public const string ContentLength = "Content-Length";
public const string ContentLocation = "Content-Location";
public const string ContentMD5 = "Content-MD5";
public const string ContentRange = "Content-Range";
public const string ContentSecurityPolicy = "Content-Security-Policy";
public const string ContentType = "Content-Type";
public const string Cookie = "Cookie";
public const string Cookie2 = "Cookie2";
public const string Date = "Date";
public const string ETag = "ETag";
public const string Expect = "Expect";
public const string Expires = "Expires";
public const string From = "From";
public const string Host = "Host";
public const string IfMatch = "If-Match";
public const string IfModifiedSince = "If-Modified-Since";
public const string IfNoneMatch = "If-None-Match";
public const string IfRange = "If-Range";
public const string IfUnmodifiedSince = "If-Unmodified-Since";
public const string KeepAlive = "Keep-Alive";
public const string LastModified = "Last-Modified";
public const string Link = "Link";
public const string Location = "Location";
public const string MaxForwards = "Max-Forwards";
public const string Origin = "Origin";
public const string P3P = "P3P";
public const string Pragma = "Pragma";
public const string ProxyAuthenticate = "Proxy-Authenticate";
public const string ProxyAuthorization = "Proxy-Authorization";
public const string ProxyConnection = "Proxy-Connection";
public const string PublicKeyPins = "Public-Key-Pins";
public const string Range = "Range";
public const string Referer = "Referer"; // NB: The spelling-mistake "Referer" for "Referrer" must be matched.
public const string RetryAfter = "Retry-After";
public const string SecWebSocketAccept = "Sec-WebSocket-Accept";
public const string SecWebSocketExtensions = "Sec-WebSocket-Extensions";
public const string SecWebSocketKey = "Sec-WebSocket-Key";
public const string SecWebSocketProtocol = "Sec-WebSocket-Protocol";
public const string SecWebSocketVersion = "Sec-WebSocket-Version";
public const string Server = "Server";
public const string SetCookie = "Set-Cookie";
public const string SetCookie2 = "Set-Cookie2";
public const string StrictTransportSecurity = "Strict-Transport-Security";
public const string TE = "TE";
public const string TSV = "TSV";
public const string Trailer = "Trailer";
public const string TransferEncoding = "Transfer-Encoding";
public const string Upgrade = "Upgrade";
public const string UpgradeInsecureRequests = "Upgrade-Insecure-Requests";
public const string UserAgent = "User-Agent";
public const string Vary = "Vary";
public const string Via = "Via";
public const string WWWAuthenticate = "WWW-Authenticate";
public const string Warning = "Warning";
public const string XAspNetVersion = "X-AspNet-Version";
public const string XContentDuration = "X-Content-Duration";
public const string XContentTypeOptions = "X-Content-Type-Options";
public const string XFrameOptions = "X-Frame-Options";
public const string XMSEdgeRef = "X-MSEdge-Ref";
public const string XPoweredBy = "X-Powered-By";
public const string XRequestID = "X-Request-ID";
public const string XUACompatible = "X-UA-Compatible";
}
/// <summary>
/// Extension methods for <see cref="String"/>s from the CoreFX codebase (used by <see cref="K8sWebSocket"/>).
/// </summary>
static class CoreFXStringExtensions
{
public static string SubstringTrim(this string value, int startIndex)
{
return SubstringTrim(value, startIndex, value.Length - startIndex);
}
public static string SubstringTrim(this string value, int startIndex, int length)
{
Debug.Assert(value != null, "string must be non-null");
Debug.Assert(startIndex >= 0, "startIndex must be non-negative");
Debug.Assert(length >= 0, "length must be non-negative");
Debug.Assert(startIndex <= value.Length - length, "startIndex + length must be <= value.Length");
if (length == 0)
{
return String.Empty;
}
int endIndex = startIndex + length - 1;
while (startIndex <= endIndex && char.IsWhiteSpace(value[startIndex]))
{
startIndex++;
}
while (endIndex >= startIndex && char.IsWhiteSpace(value[endIndex]))
{
endIndex--;
}
int newLength = endIndex - startIndex + 1;
Debug.Assert(newLength >= 0 && newLength <= value.Length, "Expected resulting length to be within value's length");
return
newLength == 0 ? String.Empty :
newLength == value.Length ? value :
value.Substring(startIndex, newLength);
}
}
}
#endif // NETCOREAPP2_1

46
src/K8sProtocol.cs Normal file
View File

@@ -0,0 +1,46 @@
namespace k8s
{
/// <summary>
/// Well-known WebSocket sub-protocols used by the Kubernetes API.
/// </summary>
public static class K8sProtocol
{
/// <summary>
/// Version 1 of the Kubernetes channel WebSocket protocol.
/// </summary>
/// <remarks>
/// This protocol prepends each binary message with a byte indicating the channel number (zero indexed) that the message was sent on.
/// Messages in both directions should prefix their messages with this channel byte.
///
/// When used for remote execution, the channel numbers are by convention defined to match the POSIX file-descriptors assigned to STDIN, STDOUT, and STDERR (0, 1, and 2).
/// No other conversion is performed on the raw subprotocol - writes are sent as they are received by the server.
///
/// Example client session:
///
/// CONNECT http://server.com with subprotocol "channel.k8s.io"
/// WRITE []byte{0, 102, 111, 111, 10} # send "foo\n" on channel 0 (STDIN)
/// READ []byte{1, 10} # receive "\n" on channel 1 (STDOUT)
/// CLOSE
/// </remarks>
public static readonly string ChannelV1 = "channel.k8s.io";
/// <summary>
/// Version 1 of the Kubernetes Base64-encoded channel WebSocket protocol.
/// </summary>
/// <remarks>
/// This protocol base64 encodes each message with a character representing the channel number (zero indexed) the message was sent on (if the channel number is 1, then the character is '1', i.e. a byte value of 49).
/// Messages in both directions should prefix their messages with this character.
///
/// When used for remote execution, the channel numbers are by convention defined to match the POSIX file-descriptors assigned to STDIN, STDOUT, and STDERR ('0', '1', and '2').
/// The data received on the server is base64 decoded (and must be be valid) and data written by the server to the client is base64 encoded.
///
/// Example client session:
///
/// CONNECT http://server.com with subprotocol "base64.channel.k8s.io"
/// WRITE []byte{48, 90, 109, 57, 118, 67, 103, 111, 61} # send "foo\n" (base64: "Zm9vCgo=") on channel '0' (STDIN)
/// READ []byte{49, 67, 103, 61, 61} # receive "\n" (base64: "Cg==") on channel '1' (STDOUT)
/// CLOSE
/// </remarks>
public static readonly string ChannelBase64V1 = "base64.channel.k8s.io";
}
}

View File

@@ -2,8 +2,10 @@ using Microsoft.AspNetCore.WebUtilities;
using Microsoft.Rest;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Net.WebSockets;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
@@ -204,11 +206,11 @@ namespace k8s
}
}
// Set Credentials
// Set Credentials
#if NET452
foreach (var cert in ((WebRequestHandler)this.HttpClientHandler).ClientCertificates)
foreach (var cert in ((WebRequestHandler)this.HttpClientHandler).ClientCertificates.OfType<X509Certificate2>())
#else
foreach (var cert in this.HttpClientHandler.ClientCertificates)
foreach (var cert in this.HttpClientHandler.ClientCertificates.OfType<X509Certificate2>())
#endif
{
webSocketBuilder.AddClientCertificate(cert);
@@ -222,6 +224,15 @@ namespace k8s
webSocketBuilder.SetRequestHeader(_header.Key, string.Join(" ", _header.Value));
}
#if NETCOREAPP2_1
if (this.CaCert != null)
webSocketBuilder.ExpectServerCertificate(this.CaCert);
else
webSocketBuilder.SkipServerCertificateValidation();
webSocketBuilder.Options.RequestedSubProtocols.Add(K8sProtocol.ChannelV1);
#endif // NETCOREAPP2_1
// Send Request
cancellationToken.ThrowIfCancellationRequested();

View File

@@ -9,8 +9,8 @@
<PackageProjectUrl>https://github.com/kubernetes-client/csharp</PackageProjectUrl>
<PackageTags>kubernetes;docker;containers;</PackageTags>
<TargetFrameworks>netstandard1.4;net452</TargetFrameworks>
<TargetFrameworks Condition="'$(OS)' != 'Windows_NT'">netstandard1.4</TargetFrameworks>
<TargetFrameworks>netstandard1.4;net452;netcoreapp2.1</TargetFrameworks>
<TargetFrameworks Condition="'$(OS)' != 'Windows_NT'">netstandard1.4;netcoreapp2.1</TargetFrameworks>
<RootNamespace>k8s</RootNamespace>
</PropertyGroup>

View File

@@ -0,0 +1,143 @@
#if NETCOREAPP2_1
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net.Security;
using System.Net.WebSockets;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
namespace k8s
{
/// <summary>
/// The <see cref="WebSocketBuilder"/> creates a new <see cref="WebSocket"/> object which connects to a remote WebSocket.
/// </summary>
public sealed class WebSocketBuilder
{
public KubernetesWebSocketOptions Options { get; } = new KubernetesWebSocketOptions();
public WebSocketBuilder()
{
}
public WebSocketBuilder SetRequestHeader(string headerName, string headerValue)
{
Options.RequestHeaders[headerName] = headerValue;
return this;
}
public WebSocketBuilder AddClientCertificate(X509Certificate2 certificate)
{
Options.ClientCertificates.Add(certificate);
return this;
}
public WebSocketBuilder ExpectServerCertificate(X509Certificate2 serverCertificate)
{
Options.ServerCertificateCustomValidationCallback = (sender, certificate, chain, sslPolicyErrors) =>
{
if (sslPolicyErrors != SslPolicyErrors.RemoteCertificateChainErrors)
return false;
try
{
using (X509Chain certificateChain = new X509Chain())
{
certificateChain.ChainPolicy.ExtraStore.Add(serverCertificate);
certificateChain.ChainPolicy.VerificationFlags = X509VerificationFlags.AllowUnknownCertificateAuthority;
certificateChain.ChainPolicy.RevocationMode = X509RevocationMode.NoCheck;
return certificateChain.Build(
(X509Certificate2)certificate
);
}
}
catch (Exception chainException)
{
Debug.WriteLine(chainException);
return false;
}
};
return this;
}
public WebSocketBuilder SkipServerCertificateValidation()
{
Options.ServerCertificateCustomValidationCallback = (sender, certificate, chain, sslPolicyErrors) => true;
return this;
}
public async Task<WebSocket> BuildAndConnectAsync(Uri uri, CancellationToken cancellationToken)
{
return await CoreFX.K8sWebSocket.ConnectAsync(uri, Options, cancellationToken).ConfigureAwait(false);
}
}
/// <summary>
/// Options for connecting to Kubernetes web sockets.
/// </summary>
public class KubernetesWebSocketOptions
{
/// <summary>
/// The default size (in bytes) for WebSocket send / receive buffers.
/// </summary>
public static readonly int DefaultBufferSize = 2048;
/// <summary>
/// Create new <see cref="KubernetesWebSocketOptions"/>.
/// </summary>
public KubernetesWebSocketOptions()
{
}
/// <summary>
/// The requested size (in bytes) of the WebSocket send buffer.
/// </summary>
public int SendBufferSize { get; set; } = 2048;
/// <summary>
/// The requested size (in bytes) of the WebSocket receive buffer.
/// </summary>
public int ReceiveBufferSize { get; set; } = 2048;
/// <summary>
/// Custom request headers (if any).
/// </summary>
public Dictionary<string, string> RequestHeaders { get; } = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
/// <summary>
/// Requested sub-protocols (if any).
/// </summary>
public List<string> RequestedSubProtocols { get; } = new List<string>();
/// <summary>
/// Client certificates (if any) to use for authentication.
/// </summary>
public List<X509Certificate2> ClientCertificates = new List<X509Certificate2>();
/// <summary>
/// An optional delegate to use for authenticating the remote server certificate.
/// </summary>
public RemoteCertificateValidationCallback ServerCertificateCustomValidationCallback { get; set; }
/// <summary>
/// An <see cref="SslProtocols"/> value representing the SSL protocols that the client supports.
/// </summary>
public SslProtocols EnabledSslProtocols { get; set; } = SslProtocols.Tls;
/// <summary>
/// The WebSocket keep-alive interval.
/// </summary>
public TimeSpan KeepAliveInterval { get; set; } = TimeSpan.FromSeconds(5);
}
}
#endif // NETCOREAPP2_1

View File

@@ -1,3 +1,5 @@
#if !NETCOREAPP2_1
using System;
using System.Net.WebSockets;
using System.Security.Cryptography.X509Certificates;
@@ -19,7 +21,6 @@ namespace k8s
public WebSocketBuilder()
{
this.WebSocket = new ClientWebSocket();
}
public virtual WebSocketBuilder SetRequestHeader(string headerName, string headerValue)
@@ -28,7 +29,7 @@ namespace k8s
return this;
}
public virtual WebSocketBuilder AddClientCertificate(X509Certificate certificate)
public virtual WebSocketBuilder AddClientCertificate(X509Certificate2 certificate)
{
this.WebSocket.Options.ClientCertificates.Add(certificate);
return this;
@@ -41,3 +42,5 @@ namespace k8s
}
}
}
#endif // !NETCOREAPP2_1

View File

@@ -12,11 +12,17 @@ using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Server.Kestrel.Https;
using Microsoft.Rest;
using Xunit;
using Xunit.Abstractions;
namespace k8s.Tests
{
public class AuthTests
{
public class AuthTests
: TestBase
{
public AuthTests(ITestOutputHelper testOutput) : base(testOutput)
{
}
private static HttpOperationResponse<V1PodList> ExecuteListPods(IKubernetes client)
{
return client.ListNamespacedPodWithHttpMessagesAsync("default").Result;
@@ -25,7 +31,7 @@ namespace k8s.Tests
[Fact]
public void Anonymous()
{
using (var server = new MockKubeApiServer())
using (var server = new MockKubeApiServer(TestOutput))
{
var client = new Kubernetes(new KubernetesClientConfiguration
{
@@ -38,7 +44,7 @@ namespace k8s.Tests
Assert.Equal(1, listTask.Body.Items.Count);
}
using (var server = new MockKubeApiServer(cxt =>
using (var server = new MockKubeApiServer(TestOutput, cxt =>
{
cxt.Response.StatusCode = (int) HttpStatusCode.Unauthorized;
return Task.FromResult(false);
@@ -61,7 +67,7 @@ namespace k8s.Tests
const string testName = "test_name";
const string testPassword = "test_password";
using (var server = new MockKubeApiServer(cxt =>
using (var server = new MockKubeApiServer(TestOutput, cxt =>
{
var header = cxt.Request.Headers["Authorization"].FirstOrDefault();
@@ -167,7 +173,7 @@ namespace k8s.Tests
var clientCertificateValidationCalled = false;
using (var server = new MockKubeApiServer(listenConfigure: options =>
using (var server = new MockKubeApiServer(TestOutput, listenConfigure: options =>
{
options.UseHttps(new HttpsConnectionAdapterOptions
{
@@ -249,7 +255,7 @@ namespace k8s.Tests
{
const string token = "testingtoken";
using (var server = new MockKubeApiServer(cxt =>
using (var server = new MockKubeApiServer(TestOutput, cxt =>
{
var header = cxt.Request.Headers["Authorization"].FirstOrDefault();

View File

@@ -3,7 +3,7 @@ using System.Threading;
using System.Threading.Tasks;
using Xunit;
namespace k8s.tests
namespace k8s.Tests
{
/// <summary>
/// Tests the <see cref="ByteBuffer"/> class.
@@ -242,7 +242,7 @@ namespace k8s.tests
/// sure the call blocks until data is available.
/// </summary>
[Fact]
public void ReadBlocksUntilDataAvailableTest()
public async Task ReadBlocksUntilDataAvailableTest()
{
// Makes sure that the Read method does not return until data is available.
var buffer = new ByteBuffer();
@@ -251,15 +251,16 @@ namespace k8s.tests
// Kick off a read operation
var readTask = Task.Run(() => read = buffer.Read(readData, 0, readData.Length));
Thread.Sleep(250);
Assert.False(readTask.IsCompleted);
await Task.Delay(250);
Assert.False(readTask.IsCompleted, "Read task completed before data was available.");
// Write data to the buffer
buffer.Write(this.writeData, 0, 0x03);
Thread.Sleep(250);
Assert.True(readTask.IsCompleted);
await TaskAssert.Completed(readTask,
timeout: TimeSpan.FromMilliseconds(1000),
message: "Timed out waiting for read task to complete."
);
Assert.Equal(3, read);
Assert.Equal(0xF0, readData[0]);

View File

@@ -0,0 +1,92 @@
/*
* These tests are for the netcoreapp2.1 version of the client (there are separate tests for netstandard that don't actually connect to a server).
*/
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Rest;
using Xunit;
using Xunit.Abstractions;
namespace k8s.Tests
{
/// <summary>
/// Tests for <see cref="KubeApiClient"/>'s exec-in-pod functionality.
/// </summary>
public class PodExecTests
: WebSocketTestBase
{
/// <summary>
/// Create a new <see cref="KubeApiClient"/> exec-in-pod test suite.
/// </summary>
/// <param name="testOutput">
/// Output for the current test.
/// </param>
public PodExecTests(ITestOutputHelper testOutput)
: base(testOutput)
{
}
/// <summary>
/// Verify that the client can request execution of a command in a pod's default container, with only the STDOUT stream enabled.
/// </summary>
[Fact(DisplayName = "Can exec in pod's default container, STDOUT only")]
public async Task Exec_DefaultContainer_StdOut()
{
if (!Debugger.IsAttached)
{
CancellationSource.CancelAfter(
TimeSpan.FromSeconds(5)
);
}
await Host.StartAsync(TestCancellation);
using (Kubernetes client = CreateTestClient())
{
Log.LogInformation("Invoking exec operation...");
WebSocket clientSocket = await client.WebSocketNamespacedPodExecAsync(
name: "mypod",
@namespace: "mynamespace",
command: "/bin/bash",
container: "mycontainer",
stderr: false,
stdin: false,
stdout: true,
cancellationToken: TestCancellation
);
Assert.Equal(K8sProtocol.ChannelV1, clientSocket.SubProtocol); // For WebSockets, the Kubernetes API defaults to the binary channel (v1) protocol.
Log.LogInformation("Client socket connected (socket state is {ClientSocketState}). Waiting for server-side socket to become available...", clientSocket.State);
WebSocket serverSocket = await WebSocketTestAdapter.AcceptedPodExecV1Connection;
Log.LogInformation("Server-side socket is now available (socket state is {ServerSocketState}). Sending data to server socket...", serverSocket.State);
const int STDOUT = 1;
const string expectedOutput = "This is text send to STDOUT.";
int bytesSent = await SendMultiplexed(serverSocket, STDOUT, expectedOutput);
Log.LogInformation("Sent {ByteCount} bytes to server socket; receiving from client socket...", bytesSent);
(string receivedText, byte streamIndex, int bytesReceived) = await ReceiveTextMultiplexed(clientSocket);
Log.LogInformation("Received {ByteCount} bytes from client socket ('{ReceivedText}', stream {StreamIndex}).", bytesReceived, receivedText, streamIndex);
Assert.Equal(STDOUT, streamIndex);
Assert.Equal(expectedOutput, receivedText);
await Disconnect(clientSocket, serverSocket,
closeStatus: WebSocketCloseStatus.NormalClosure,
closeStatusDescription: "Normal Closure"
);
WebSocketTestAdapter.CompleteTest();
}
}
}
}

View File

@@ -1,3 +1,7 @@
/*
* These tests are only for the netstandard version of the client (there are separate tests for netcoreapp that connect to a local test-hosted server).
*/
using k8s.tests.Mock;
using Microsoft.Rest;
using System;

View File

@@ -0,0 +1,113 @@
using Microsoft.Extensions.Logging;
using System;
using System.Reactive.Disposables;
using Xunit.Abstractions;
namespace k8s.Tests.Logging
{
/// <summary>
/// An implementation of <see cref="ILogger"/> that writes to the output of the current Xunit test.
/// </summary>
sealed class TestOutputLogger
: ILogger
{
/// <summary>
/// Create a new <see cref="TestOutputLogger"/>.
/// </summary>
/// <param name="testOutput">
/// The output for the current test.
/// </param>
/// <param name="loggerCategory">
/// The logger's category name.
/// </param>
/// <param name="minLogLevel">
/// The logger's minimum log level.
/// </param>
public TestOutputLogger(ITestOutputHelper testOutput, string loggerCategory, LogLevel minLogLevel)
{
if (testOutput == null)
throw new ArgumentNullException(nameof(testOutput));
if (String.IsNullOrWhiteSpace(loggerCategory))
throw new ArgumentException("Argument cannot be null, empty, or entirely composed of whitespace: 'loggerCategory'.", nameof(loggerCategory));
TestOutput = testOutput;
LoggerCategory = loggerCategory;
MinLogLevel = minLogLevel;
}
/// <summary>
/// The output for the current test.
/// </summary>
public ITestOutputHelper TestOutput { get; }
/// <summary>
/// The logger's category name.
/// </summary>
public string LoggerCategory { get; }
/// <summary>
/// The logger's minimum log level.
/// </summary>
public LogLevel MinLogLevel { get; }
/// <summary>
/// Emit a log entry.
/// </summary>
/// <param name="level">
/// The log entry's level.
/// </param>
/// <param name="eventId">
/// The log entry's associated event Id.
/// </param>
/// <param name="state">
/// The log entry to be written. Can be also an object.
/// </param>
/// <param name="exception">
/// The exception (if any) related to the log entry.
/// </param>
/// <param name="formatter">
/// A function that creates a <c>string</c> log message from the <paramref name="state"/> and <paramref name="exception"/>.
/// </param>
public void Log<TState>(LogLevel level, EventId eventId, TState state, Exception exception, Func<TState, Exception, string> formatter)
{
if (formatter == null)
throw new ArgumentNullException(nameof(formatter));
TestOutput.WriteLine(String.Format("[{0}] {1}: {2}",
level,
LoggerCategory,
formatter(state, exception)
));
if (exception != null)
{
TestOutput.WriteLine(
exception.ToString()
);
}
}
/// <summary>
/// Check if the given <paramref name="logLevel"/> is enabled.
/// </summary>
/// <param name="logLevel">
/// The level to be checked.
/// </param>
/// <returns>
/// <c>true</c> if enabled; otherwise, <c>false</c>.
/// </returns>
public bool IsEnabled(LogLevel logLevel) => logLevel >= MinLogLevel;
/// <summary>
/// Begin a logical operation scope.
/// </summary>
/// <param name="state">
/// An identifier for the scope.
/// </param>
/// <returns>
/// An <see cref="IDisposable"/> that ends the logical operation scope when disposed.
/// </returns>
public IDisposable BeginScope<TState>(TState state) => Disposable.Empty;
}
}

View File

@@ -0,0 +1,59 @@
using Microsoft.Extensions.Logging;
using System;
using Xunit.Abstractions;
namespace k8s.Tests.Logging
{
/// <summary>
/// Logger provider for logging to Xunit test output.
/// </summary>
sealed class TestOutputLoggerProvider
: ILoggerProvider
{
/// <summary>
/// Create a new <see cref="TestOutputLoggerProvider"/>.
/// </summary>
/// <param name="testOutput">
/// The output for the current test.
/// </param>
/// <param name="minLogLevel">
/// The logger's minimum log level.
/// </param>
public TestOutputLoggerProvider(ITestOutputHelper testOutput, LogLevel minLogLevel)
{
if (testOutput == null)
throw new ArgumentNullException(nameof(testOutput));
TestOutput = testOutput;
MinLogLevel = minLogLevel;
}
/// <summary>
/// Dispose of resources being used by the logger provider.
/// </summary>
public void Dispose()
{
}
/// <summary>
/// The output for the current test.
/// </summary>
ITestOutputHelper TestOutput { get; }
/// <summary>
/// The logger's minimum log level.
/// </summary>
public LogLevel MinLogLevel { get; }
/// <summary>
/// Create a new logger.
/// </summary>
/// <param name="categoryName">
/// The logger category name.
/// </param>
/// <returns>
/// The logger, as an <see cref="ILogger"/>.
/// </returns>
public ILogger CreateLogger(string categoryName) => new TestOutputLogger(TestOutput, categoryName, MinLogLevel);
}
}

View File

@@ -0,0 +1,67 @@
using Microsoft.Extensions.Logging;
using System;
using Xunit.Abstractions;
namespace k8s.Tests.Logging
{
/// <summary>
/// Extension methods for logging to Xunit text output.
/// </summary>
public static class TestOutputLoggingExtensions
{
/// <summary>
/// Log to test output.
/// </summary>
/// <param name="logging">
/// The global logging configuration.
/// </param>
/// <param name="testOutput">
/// Output for the current test.
/// </param>
/// <param name="minLogLevel">
/// The minimum level to log at.
/// </param>
public static void AddTestOutput(this ILoggingBuilder logging, ITestOutputHelper testOutput, LogLevel minLogLevel = LogLevel.Information)
{
if (logging == null)
throw new ArgumentNullException(nameof(logging));
if (testOutput == null)
throw new ArgumentNullException(nameof(testOutput));
logging.AddProvider(
new TestOutputLoggerProvider(testOutput, minLogLevel)
);
}
/// <summary>
/// Log to test output.
/// </summary>
/// <param name="loggers">
/// The logger factory.
/// </param>
/// <param name="testOutput">
/// Output for the current test.
/// </param>
/// <param name="minLogLevel">
/// The minimum level to log at.
/// </param>
/// <returns>
/// The logger factory (enables inline use / method-chaining).
/// </returns>
public static ILoggerFactory AddTestOutput(this ILoggerFactory loggers, ITestOutputHelper testOutput, LogLevel minLogLevel = LogLevel.Information)
{
if (loggers == null)
throw new ArgumentNullException(nameof(loggers));
if (testOutput == null)
throw new ArgumentNullException(nameof(testOutput));
loggers.AddProvider(
new TestOutputLoggerProvider(testOutput, minLogLevel)
);
return loggers;
}
}
}

View File

@@ -1,52 +1,62 @@
using System;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Hosting.Server.Features;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Server.Kestrel.Core;
namespace k8s.Tests.Mock
{
public class MockKubeApiServer : IDisposable
{
// paste from minikube /api/v1/namespaces/default/pods
public const string MockPodResponse =
"{\r\n \"kind\": \"PodList\",\r\n \"apiVersion\": \"v1\",\r\n \"metadata\": {\r\n \"selfLink\": \"/api/v1/namespaces/default/pods\",\r\n \"resourceVersion\": \"1762810\"\r\n },\r\n \"items\": [\r\n {\r\n \"metadata\": {\r\n \"name\": \"nginx-1493591563-xb2v4\",\r\n \"generateName\": \"nginx-1493591563-\",\r\n \"namespace\": \"default\",\r\n \"selfLink\": \"/api/v1/namespaces/default/pods/nginx-1493591563-xb2v4\",\r\n \"uid\": \"ac1abb94-9c58-11e7-aaf5-00155d744505\",\r\n \"resourceVersion\": \"1737928\",\r\n \"creationTimestamp\": \"2017-09-18T10:03:51Z\",\r\n \"labels\": {\r\n \"app\": \"nginx\",\r\n \"pod-template-hash\": \"1493591563\"\r\n },\r\n \"annotations\": {\r\n \"kubernetes.io/created-by\": \"{\\\"kind\\\":\\\"SerializedReference\\\",\\\"apiVersion\\\":\\\"v1\\\",\\\"reference\\\":{\\\"kind\\\":\\\"ReplicaSet\\\",\\\"namespace\\\":\\\"default\\\",\\\"name\\\":\\\"nginx-1493591563\\\",\\\"uid\\\":\\\"ac013b63-9c58-11e7-aaf5-00155d744505\\\",\\\"apiVersion\\\":\\\"extensions\\\",\\\"resourceVersion\\\":\\\"5306\\\"}}\\n\"\r\n },\r\n \"ownerReferences\": [\r\n {\r\n \"apiVersion\": \"extensions/v1beta1\",\r\n \"kind\": \"ReplicaSet\",\r\n \"name\": \"nginx-1493591563\",\r\n \"uid\": \"ac013b63-9c58-11e7-aaf5-00155d744505\",\r\n \"controller\": true,\r\n \"blockOwnerDeletion\": true\r\n }\r\n ]\r\n },\r\n \"spec\": {\r\n \"volumes\": [\r\n {\r\n \"name\": \"default-token-3zzcj\",\r\n \"secret\": {\r\n \"secretName\": \"default-token-3zzcj\",\r\n \"defaultMode\": 420\r\n }\r\n }\r\n ],\r\n \"containers\": [\r\n {\r\n \"name\": \"nginx\",\r\n \"image\": \"nginx\",\r\n \"resources\": {},\r\n \"volumeMounts\": [\r\n {\r\n \"name\": \"default-token-3zzcj\",\r\n \"readOnly\": true,\r\n \"mountPath\": \"/var/run/secrets/kubernetes.io/serviceaccount\"\r\n }\r\n ],\r\n \"terminationMessagePath\": \"/dev/termination-log\",\r\n \"terminationMessagePolicy\": \"File\",\r\n \"imagePullPolicy\": \"Always\"\r\n }\r\n ],\r\n \"restartPolicy\": \"Always\",\r\n \"terminationGracePeriodSeconds\": 30,\r\n \"dnsPolicy\": \"ClusterFirst\",\r\n \"serviceAccountName\": \"default\",\r\n \"serviceAccount\": \"default\",\r\n \"nodeName\": \"ubuntu\",\r\n \"securityContext\": {},\r\n \"schedulerName\": \"default-scheduler\"\r\n },\r\n \"status\": {\r\n \"phase\": \"Running\",\r\n \"conditions\": [\r\n {\r\n \"type\": \"Initialized\",\r\n \"status\": \"True\",\r\n \"lastProbeTime\": null,\r\n \"lastTransitionTime\": \"2017-09-18T10:03:51Z\"\r\n },\r\n {\r\n \"type\": \"Ready\",\r\n \"status\": \"True\",\r\n \"lastProbeTime\": null,\r\n \"lastTransitionTime\": \"2017-10-12T07:09:21Z\"\r\n },\r\n {\r\n \"type\": \"PodScheduled\",\r\n \"status\": \"True\",\r\n \"lastProbeTime\": null,\r\n \"lastTransitionTime\": \"2017-09-18T10:03:51Z\"\r\n }\r\n ],\r\n \"hostIP\": \"192.168.188.42\",\r\n \"podIP\": \"172.17.0.5\",\r\n \"startTime\": \"2017-09-18T10:03:51Z\",\r\n \"containerStatuses\": [\r\n {\r\n \"name\": \"nginx\",\r\n \"state\": {\r\n \"running\": {\r\n \"startedAt\": \"2017-10-12T07:09:20Z\"\r\n }\r\n },\r\n \"lastState\": {\r\n \"terminated\": {\r\n \"exitCode\": 0,\r\n \"reason\": \"Completed\",\r\n \"startedAt\": \"2017-10-10T21:35:51Z\",\r\n \"finishedAt\": \"2017-10-12T07:07:37Z\",\r\n \"containerID\": \"docker://94df3f3965807421ad6dc76618e00b76cb15d024919c4946f3eb46a92659c62a\"\r\n }\r\n },\r\n \"ready\": true,\r\n \"restartCount\": 7,\r\n \"image\": \"nginx:latest\",\r\n \"imageID\": \"docker-pullable://nginx@sha256:004ac1d5e791e705f12a17c80d7bb1e8f7f01aa7dca7deee6e65a03465392072\",\r\n \"containerID\": \"docker://fa11bdd48c9b7d3a6c4c3f9b6d7319743c3455ab8d00c57d59c083b319b88194\"\r\n }\r\n ],\r\n \"qosClass\": \"BestEffort\"\r\n }\r\n }\r\n ]\r\n}"
;
private readonly IWebHost _webHost;
public MockKubeApiServer(Func<HttpContext, Task<bool>> shouldNext = null, Action<ListenOptions> listenConfigure = null,
string resp = MockPodResponse)
{
shouldNext = shouldNext ?? (_ => Task.FromResult(true));
listenConfigure = listenConfigure ?? (_ => { });
_webHost = WebHost.CreateDefaultBuilder()
.Configure(app => app.Run(async httpContext =>
{
if (await shouldNext(httpContext))
{
await httpContext.Response.WriteAsync(resp);
}
}))
.UseKestrel(options => { options.Listen(IPAddress.Loopback, 0, listenConfigure); })
.Build();
_webHost.Start();
}
public Uri Uri => _webHost.ServerFeatures.Get<IServerAddressesFeature>().Addresses
.Select(a => new Uri(a)).First();
public void Dispose()
{
_webHost.StopAsync();
_webHost.WaitForShutdown();
}
}
}
using System;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using k8s.Tests.Logging;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Hosting.Server.Features;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.Extensions.Logging;
using Xunit.Abstractions;
namespace k8s.Tests.Mock
{
public class MockKubeApiServer : IDisposable
{
// paste from minikube /api/v1/namespaces/default/pods
public const string MockPodResponse =
"{\r\n \"kind\": \"PodList\",\r\n \"apiVersion\": \"v1\",\r\n \"metadata\": {\r\n \"selfLink\": \"/api/v1/namespaces/default/pods\",\r\n \"resourceVersion\": \"1762810\"\r\n },\r\n \"items\": [\r\n {\r\n \"metadata\": {\r\n \"name\": \"nginx-1493591563-xb2v4\",\r\n \"generateName\": \"nginx-1493591563-\",\r\n \"namespace\": \"default\",\r\n \"selfLink\": \"/api/v1/namespaces/default/pods/nginx-1493591563-xb2v4\",\r\n \"uid\": \"ac1abb94-9c58-11e7-aaf5-00155d744505\",\r\n \"resourceVersion\": \"1737928\",\r\n \"creationTimestamp\": \"2017-09-18T10:03:51Z\",\r\n \"labels\": {\r\n \"app\": \"nginx\",\r\n \"pod-template-hash\": \"1493591563\"\r\n },\r\n \"annotations\": {\r\n \"kubernetes.io/created-by\": \"{\\\"kind\\\":\\\"SerializedReference\\\",\\\"apiVersion\\\":\\\"v1\\\",\\\"reference\\\":{\\\"kind\\\":\\\"ReplicaSet\\\",\\\"namespace\\\":\\\"default\\\",\\\"name\\\":\\\"nginx-1493591563\\\",\\\"uid\\\":\\\"ac013b63-9c58-11e7-aaf5-00155d744505\\\",\\\"apiVersion\\\":\\\"extensions\\\",\\\"resourceVersion\\\":\\\"5306\\\"}}\\n\"\r\n },\r\n \"ownerReferences\": [\r\n {\r\n \"apiVersion\": \"extensions/v1beta1\",\r\n \"kind\": \"ReplicaSet\",\r\n \"name\": \"nginx-1493591563\",\r\n \"uid\": \"ac013b63-9c58-11e7-aaf5-00155d744505\",\r\n \"controller\": true,\r\n \"blockOwnerDeletion\": true\r\n }\r\n ]\r\n },\r\n \"spec\": {\r\n \"volumes\": [\r\n {\r\n \"name\": \"default-token-3zzcj\",\r\n \"secret\": {\r\n \"secretName\": \"default-token-3zzcj\",\r\n \"defaultMode\": 420\r\n }\r\n }\r\n ],\r\n \"containers\": [\r\n {\r\n \"name\": \"nginx\",\r\n \"image\": \"nginx\",\r\n \"resources\": {},\r\n \"volumeMounts\": [\r\n {\r\n \"name\": \"default-token-3zzcj\",\r\n \"readOnly\": true,\r\n \"mountPath\": \"/var/run/secrets/kubernetes.io/serviceaccount\"\r\n }\r\n ],\r\n \"terminationMessagePath\": \"/dev/termination-log\",\r\n \"terminationMessagePolicy\": \"File\",\r\n \"imagePullPolicy\": \"Always\"\r\n }\r\n ],\r\n \"restartPolicy\": \"Always\",\r\n \"terminationGracePeriodSeconds\": 30,\r\n \"dnsPolicy\": \"ClusterFirst\",\r\n \"serviceAccountName\": \"default\",\r\n \"serviceAccount\": \"default\",\r\n \"nodeName\": \"ubuntu\",\r\n \"securityContext\": {},\r\n \"schedulerName\": \"default-scheduler\"\r\n },\r\n \"status\": {\r\n \"phase\": \"Running\",\r\n \"conditions\": [\r\n {\r\n \"type\": \"Initialized\",\r\n \"status\": \"True\",\r\n \"lastProbeTime\": null,\r\n \"lastTransitionTime\": \"2017-09-18T10:03:51Z\"\r\n },\r\n {\r\n \"type\": \"Ready\",\r\n \"status\": \"True\",\r\n \"lastProbeTime\": null,\r\n \"lastTransitionTime\": \"2017-10-12T07:09:21Z\"\r\n },\r\n {\r\n \"type\": \"PodScheduled\",\r\n \"status\": \"True\",\r\n \"lastProbeTime\": null,\r\n \"lastTransitionTime\": \"2017-09-18T10:03:51Z\"\r\n }\r\n ],\r\n \"hostIP\": \"192.168.188.42\",\r\n \"podIP\": \"172.17.0.5\",\r\n \"startTime\": \"2017-09-18T10:03:51Z\",\r\n \"containerStatuses\": [\r\n {\r\n \"name\": \"nginx\",\r\n \"state\": {\r\n \"running\": {\r\n \"startedAt\": \"2017-10-12T07:09:20Z\"\r\n }\r\n },\r\n \"lastState\": {\r\n \"terminated\": {\r\n \"exitCode\": 0,\r\n \"reason\": \"Completed\",\r\n \"startedAt\": \"2017-10-10T21:35:51Z\",\r\n \"finishedAt\": \"2017-10-12T07:07:37Z\",\r\n \"containerID\": \"docker://94df3f3965807421ad6dc76618e00b76cb15d024919c4946f3eb46a92659c62a\"\r\n }\r\n },\r\n \"ready\": true,\r\n \"restartCount\": 7,\r\n \"image\": \"nginx:latest\",\r\n \"imageID\": \"docker-pullable://nginx@sha256:004ac1d5e791e705f12a17c80d7bb1e8f7f01aa7dca7deee6e65a03465392072\",\r\n \"containerID\": \"docker://fa11bdd48c9b7d3a6c4c3f9b6d7319743c3455ab8d00c57d59c083b319b88194\"\r\n }\r\n ],\r\n \"qosClass\": \"BestEffort\"\r\n }\r\n }\r\n ]\r\n}"
;
private readonly IWebHost _webHost;
public MockKubeApiServer(ITestOutputHelper testOutput, Func<HttpContext, Task<bool>> shouldNext = null, Action<ListenOptions> listenConfigure = null,
string resp = MockPodResponse)
{
shouldNext = shouldNext ?? (_ => Task.FromResult(true));
listenConfigure = listenConfigure ?? (_ => { });
_webHost = WebHost.CreateDefaultBuilder()
.Configure(app => app.Run(async httpContext =>
{
if (await shouldNext(httpContext))
{
await httpContext.Response.WriteAsync(resp);
}
}))
.UseKestrel(options => { options.Listen(IPAddress.Loopback, 0, listenConfigure); })
.ConfigureLogging(logging =>
{
logging.ClearProviders();
if (testOutput != null)
logging.AddTestOutput(testOutput);
})
.Build();
_webHost.Start();
}
public Uri Uri => _webHost.ServerFeatures.Get<IServerAddressesFeature>().Addresses
.Select(a => new Uri(a)).First();
public void Dispose()
{
_webHost.StopAsync();
_webHost.WaitForShutdown();
}
}
}

View File

@@ -1,3 +1,5 @@
#if !NETCOREAPP2_1
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
@@ -12,13 +14,13 @@ namespace k8s.tests.Mock
{
public Dictionary<string, string> RequestHeaders { get; } = new Dictionary<string, string>();
public Collection<X509Certificate> Certificates { get; } = new Collection<X509Certificate>();
public Collection<X509Certificate2> Certificates { get; } = new Collection<X509Certificate2>();
public Uri Uri { get; private set; }
public WebSocket PublicWebSocket => this.WebSocket;
public override WebSocketBuilder AddClientCertificate(X509Certificate certificate)
public override WebSocketBuilder AddClientCertificate(X509Certificate2 certificate)
{
this.Certificates.Add(certificate);
return this;
@@ -27,7 +29,8 @@ namespace k8s.tests.Mock
public override Task<WebSocket> BuildAndConnectAsync(Uri uri, CancellationToken cancellationToken)
{
this.Uri = uri;
return Task.FromResult<WebSocket>(this.WebSocket);
return Task.FromResult(this.PublicWebSocket);
}
public override WebSocketBuilder SetRequestHeader(string headerName, string headerValue)
@@ -37,3 +40,5 @@ namespace k8s.tests.Mock
}
}
}
#endif // !NETCOREAPP2_1

View File

@@ -0,0 +1,61 @@
using Microsoft.AspNetCore.Mvc;
using System;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
namespace k8s.Tests.Mock.Server.Controllers
{
/// <summary>
/// Controller for the mock Kubernetes exec-in-pod API.
/// </summary>
[Route("api/v1")]
public class PodExecController
: Controller
{
/// <summary>
/// Create a new <see cref="PodExecController"/>.
/// </summary>
/// <param name="webSocketTestAdapter">
/// The adapter used to capture sockets accepted by the test server and provide them to the calling test.
/// </param>
public PodExecController(WebSocketTestAdapter webSocketTestAdapter)
{
if (webSocketTestAdapter == null)
throw new ArgumentNullException(nameof(webSocketTestAdapter));
WebSocketTestAdapter = webSocketTestAdapter;
}
/// <summary>
/// The adapter used to capture sockets accepted by the test server and provide them to the calling test.
/// </summary>
WebSocketTestAdapter WebSocketTestAdapter { get; }
/// <summary>
/// Mock Kubernetes API: exec-in-pod.
/// </summary>
/// <param name="kubeNamespace">
/// The target pod's containing namespace.
/// </param>
/// <param name="podName">
/// The target pod's name.
/// </param>
[Route("namespaces/{kubeNamespace}/pods/{podName}/exec")]
public async Task<IActionResult> Exec(string kubeNamespace, string podName)
{
if (!HttpContext.WebSockets.IsWebSocketRequest)
return BadRequest("Exec requires WebSockets");
WebSocket webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync(
subProtocol: K8sProtocol.ChannelV1
);
WebSocketTestAdapter.AcceptedPodExecV1Connection.AcceptServerSocket(webSocket);
await WebSocketTestAdapter.TestCompleted;
return Ok();
}
}
}

View File

@@ -0,0 +1,65 @@
using Microsoft.AspNetCore.Mvc;
using System;
using System.Collections.Generic;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
namespace k8s.Tests.Mock.Server.Controllers
{
/// <summary>
/// Controller for the mock Kubernetes pod-port-forward API.
/// </summary>
[Route("api/v1")]
public class PodPortForwardController
: Controller
{
/// <summary>
/// Create a new <see cref="PodPortForwardController"/>.
/// </summary>
/// <param name="webSocketTestAdapter">
/// The adapter used to capture sockets accepted by the test server and provide them to the calling test.
/// </param>
public PodPortForwardController(WebSocketTestAdapter webSocketTestAdapter)
{
if (webSocketTestAdapter == null)
throw new ArgumentNullException(nameof(webSocketTestAdapter));
WebSocketTestAdapter = webSocketTestAdapter;
}
/// <summary>
/// The adapter used to capture sockets accepted by the test server and provide them to the calling test.
/// </summary>
WebSocketTestAdapter WebSocketTestAdapter { get; }
/// <summary>
/// Mock Kubernetes API: port-forward for pod.
/// </summary>
/// <param name="kubeNamespace">
/// The target pod's containing namespace.
/// </param>
/// <param name="podName">
/// The target pod's name.
/// </param>
/// <param name="ports">
/// The port(s) to forward to the pod.
/// </param>
[Route("namespaces/{kubeNamespace}/pods/{podName}/portforward")]
public async Task<IActionResult> Exec(string kubeNamespace, string podName, IEnumerable<string> ports)
{
if (!HttpContext.WebSockets.IsWebSocketRequest)
return BadRequest("PortForward requires WebSockets");
WebSocket webSocket = await HttpContext.WebSockets.AcceptWebSocketAsync(
subProtocol: K8sProtocol.ChannelV1
);
WebSocketTestAdapter.AcceptedPodPortForwardV1Connection.AcceptServerSocket(webSocket);
await WebSocketTestAdapter.TestCompleted;
return Ok();
}
}
}

View File

@@ -0,0 +1,54 @@
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
namespace k8s.Tests.Mock.Server
{
/// <summary>
/// Startup logic for the KubeClient WebSockets test server.
/// </summary>
public class Startup
{
/// <summary>
/// Create a new <see cref="Startup"/>.
/// </summary>
public Startup()
{
}
/// <summary>
/// Configure application services.
/// </summary>
/// <param name="services">
/// The service collection to configure.
/// </param>
public void ConfigureServices(IServiceCollection services)
{
if (services == null)
throw new ArgumentNullException(nameof(services));
services.AddLogging(logging =>
{
logging.ClearProviders(); // Logger provider will be added by the calling test.
});
services.AddMvc();
}
/// <summary>
/// Configure the application pipeline.
/// </summary>
/// <param name="app">
/// The application pipeline builder.
/// </param>
public void Configure(IApplicationBuilder app)
{
app.UseWebSockets(new WebSocketOptions
{
KeepAliveInterval = TimeSpan.FromSeconds(5),
ReceiveBufferSize = 2048
});
app.UseMvc();
}
}
}

View File

@@ -0,0 +1,98 @@
using System;
using System.Net.WebSockets;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
namespace k8s.Tests.Mock.Server
{
/// <summary>
/// Adapter used to capture WebSockets accepted by the test server and provide them to calling test.
/// </summary>
/// <remarks>
/// Each AcceptedXXXConnection property returns an awaitable object that yields a the server-side WebSocket once a connection has been accepted.
///
/// All server-side WebSockets will be closed when <see cref="CompleteTest"/> is called.
/// </remarks>
public class WebSocketTestAdapter
{
/// <summary>
/// Completion source for the <see cref="TestCompleted"/> task.
/// </summary>
readonly TaskCompletionSource<object> _testCompletion = new TaskCompletionSource<object>();
/// <summary>
/// A <see cref="Task"/> that completes when the test is complete (providing <see cref="CompleteTest"/> is called).
/// </summary>
public Task TestCompleted => _testCompletion.Task;
/// <summary>
/// <c>await</c> server-side acceptance of a WebSocket connection for the exec-in-pod (v1) API.
/// </summary>
public ServerSocketAcceptance AcceptedPodExecV1Connection { get; } = new ServerSocketAcceptance();
/// <summary>
/// <c>await</c> server-side acceptance of a WebSocket connection for the pod-port-forward (v1) API.
/// </summary>
public ServerSocketAcceptance AcceptedPodPortForwardV1Connection { get; } = new ServerSocketAcceptance();
/// <summary>
/// Mark the current test as complete, closing all server-side sockets.
/// </summary>
public void CompleteTest() => _testCompletion.SetResult(true);
/// <summary>
/// An object that enables awaiting server-side acceptance of a WebSocket connection.
/// </summary>
/// <remarks>
/// Simply <c>await</c> this object to wait for the server socket to be accepted.
/// </remarks>
public class ServerSocketAcceptance
{
/// <summary>
/// Completion source for the <see cref="ServerSocketAccepted"/> task.
/// </summary>
readonly TaskCompletionSource<WebSocket> _completion = new TaskCompletionSource<WebSocket>();
/// <summary>
/// A <see cref="Task"/> that completes when the server accepts a WebSocket connection (i.e. when <see cref="AcceptServerSocket"/> or <see cref="RejectServerSocket"/> is called).
/// </summary>
public Task<WebSocket> Task => _completion.Task;
/// <summary>
/// Notify the calling test that the server has accepted a WebSocket connection.
/// </summary>
/// <param name="serverSocket">
/// The server-side <see cref="WebSocket"/>.
/// </param>
public void AcceptServerSocket(WebSocket serverSocket)
{
if (serverSocket == null)
throw new ArgumentNullException(nameof(serverSocket));
_completion.SetResult(serverSocket);
}
/// <summary>
/// Notify the calling test that the server has rejected a WebSocket connection.
/// </summary>
/// <param name="reason">
/// An <see cref="Exception"/> representing the reason that the connection was rejected.
/// </param>
public void RejectServerSocket(Exception reason)
{
if (reason == null)
throw new ArgumentNullException(nameof(reason));
_completion.SetException(reason);
}
/// <summary>
/// Get an awaiter for the socket-acceptance task.
/// </summary>
/// <returns>
/// The <see cref="TaskAwaiter{TResult}"/>.
/// </returns>
public TaskAwaiter<WebSocket> GetAwaiter() => Task.GetAwaiter();
}
}
}

43
tests/TaskAssert.cs Normal file
View File

@@ -0,0 +1,43 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
namespace k8s.Tests
{
static class TaskAssert
{
public static void NotCompleted(Task task,string message = "Task should not be completed")
{
Assert.False(task.IsCompleted, message);
}
public static async Task Completed(Task task, TimeSpan timeout, string message = "Task timed out")
{
var timeoutTask = Task.Delay(
TimeSpan.FromMilliseconds(1000)
);
var completedTask = await Task.WhenAny(task, timeoutTask);
Assert.True(ReferenceEquals(task, completedTask), message);
await completedTask;
}
public static async Task<T> Completed<T>(Task<T> task, TimeSpan timeout, string message = "Task timed out")
{
var timeoutTask =
Task.Delay(
TimeSpan.FromMilliseconds(1000)
)
.ContinueWith(
completedTimeoutTask => default(T) // Value is never returned, but we need a task of the same result type in order to use Task.WhenAny.
);
var completedTask = await Task.WhenAny(task, timeoutTask);
Assert.True(ReferenceEquals(task, completedTask), message);
return await completedTask;
}
}
}

131
tests/TestBase.cs Normal file
View File

@@ -0,0 +1,131 @@
using k8s.Tests.Logging;
using System;
using System.Reactive.Disposables;
using System.Reflection;
using System.Threading;
using Microsoft.Extensions.Logging;
using Xunit;
using Xunit.Abstractions;
namespace k8s.Tests
{
/// <summary>
/// The base class for test suites.
/// </summary>
public abstract class TestBase
: IDisposable
{
/// <summary>
/// Create a new test-suite.
/// </summary>
/// <param name="testOutput">
/// Output for the current test.
/// </param>
protected TestBase(ITestOutputHelper testOutput)
{
if (testOutput == null)
throw new ArgumentNullException(nameof(testOutput));
// We *must* have a synchronisation context for the test, or we'll see random deadlocks.
if (SynchronizationContext.Current == null)
{
SynchronizationContext.SetSynchronizationContext(
new SynchronizationContext()
);
}
TestOutput = testOutput;
LoggerFactory = new LoggerFactory().AddTestOutput(TestOutput, MinLogLevel);
Log = LoggerFactory.CreateLogger("CurrentTest");
// Ugly hack to get access to metadata for the current test.
CurrentTest = (ITest)
TestOutput.GetType()
.GetField("test", BindingFlags.NonPublic | BindingFlags.Instance)
.GetValue(TestOutput);
Assert.True(CurrentTest != null, "Cannot retrieve current test from ITestOutputHelper.");
Disposal.Add(
Log.BeginScope("CurrentTest", CurrentTest.DisplayName)
);
}
/// <summary>
/// Finaliser for <see cref="TestBase"/>.
/// </summary>
~TestBase()
{
Dispose(false);
}
/// <summary>
/// Dispose of resources being used by the test suite.
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
/// Dispose of resources being used by the test suite.
/// </summary>
/// <param name="disposing">
/// Explicit disposal?
/// </param>
protected virtual void Dispose(bool disposing)
{
if (disposing)
{
try
{
Disposal.Dispose();
}
finally
{
if (LoggerFactory is IDisposable loggerFactoryDisposal)
loggerFactoryDisposal.Dispose();
if (Log is IDisposable logDisposal)
logDisposal.Dispose();
}
}
}
/// <summary>
/// A <see cref="CompositeDisposable"/> representing resources used by the test.
/// </summary>
protected CompositeDisposable Disposal { get; } = new CompositeDisposable();
/// <summary>
/// Output for the current test.
/// </summary>
protected ITestOutputHelper TestOutput { get; }
/// <summary>
/// A <see cref="ITest"/> representing the current test.
/// </summary>
protected ITest CurrentTest { get; }
/// <summary>
/// The logger for the current test.
/// </summary>
protected ILogger Log { get; }
/// <summary>
/// The logger factory for the current test.
/// </summary>
protected ILoggerFactory LoggerFactory { get; }
/// <summary>
/// The logging level for the current test.
/// </summary>
protected virtual LogLevel MinLogLevel => LogLevel.Information;
/// <summary>
/// The test server logging level for the current test.
/// </summary>
protected virtual LogLevel MinServerLogLevel => LogLevel.Warning;
}
}

View File

@@ -2,11 +2,17 @@ using k8s.Models;
using k8s.Tests.Mock;
using Newtonsoft.Json;
using Xunit;
using Xunit.Abstractions;
namespace k8s.Tests
{
public class V1StatusObjectViewTests
{
public class V1StatusObjectViewTests
: TestBase
{
public V1StatusObjectViewTests(ITestOutputHelper testOutput) : base(testOutput)
{
}
[Fact]
public void ReturnStatus()
{
@@ -16,7 +22,7 @@ namespace k8s.Tests
Status = "test status"
};
using (var server = new MockKubeApiServer(resp: JsonConvert.SerializeObject(v1Status)))
using (var server = new MockKubeApiServer(TestOutput, resp: JsonConvert.SerializeObject(v1Status)))
{
var client = new Kubernetes(new KubernetesClientConfiguration
{
@@ -39,30 +45,30 @@ namespace k8s.Tests
Metadata = new V1ObjectMeta()
{
Name = "test name"
},
},
Status = new V1NamespaceStatus()
{
Phase = "test termating"
}
};
};
using (var server = new MockKubeApiServer(resp: JsonConvert.SerializeObject(corev1Namespace)))
using (var server = new MockKubeApiServer(TestOutput, resp: JsonConvert.SerializeObject(corev1Namespace)))
{
var client = new Kubernetes(new KubernetesClientConfiguration
{
Host = server.Uri.ToString()
});
var status = client.DeleteNamespace(new V1DeleteOptions(), "test");
var status = client.DeleteNamespace(new V1DeleteOptions(), "test");
Assert.True(status.HasObject);
var obj = status.ObjectView<V1Namespace>();
Assert.Equal(obj.Metadata.Name, corev1Namespace.Metadata.Name);
Assert.Equal(obj.Status.Phase, corev1Namespace.Status.Phase);
Assert.Equal(obj.Status.Phase, corev1Namespace.Status.Phase);
}
}
}
}
}

View File

@@ -1,332 +1,371 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using k8s.Exceptions;
using k8s.Models;
using k8s.Tests.Mock;
using Microsoft.AspNetCore.Http;
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
using Xunit;
namespace k8s.Tests
{
public class WatchTests
{
private static readonly string MockAddedEventStreamLine = BuildWatchEventStreamLine(WatchEventType.Added);
private static readonly string MockDeletedStreamLine = BuildWatchEventStreamLine(WatchEventType.Deleted);
private static readonly string MockModifiedStreamLine = BuildWatchEventStreamLine(WatchEventType.Modified);
private static readonly string MockErrorStreamLine = BuildWatchEventStreamLine(WatchEventType.Error);
private static readonly string MockBadStreamLine = "bad json";
private static string BuildWatchEventStreamLine(WatchEventType eventType)
{
var corev1PodList = JsonConvert.DeserializeObject<V1PodList>(MockKubeApiServer.MockPodResponse);
return JsonConvert.SerializeObject(new Watcher<V1Pod>.WatchEvent
{
Type = eventType,
Object = corev1PodList.Items.First()
}, new StringEnumConverter());
}
private static async Task WriteStreamLine(HttpContext httpContext, string reponseLine)
{
const string crlf = "\r\n";
await httpContext.Response.WriteAsync(reponseLine.Replace(crlf, ""));
await httpContext.Response.WriteAsync(crlf);
await httpContext.Response.Body.FlushAsync();
}
[Fact]
public void CannotWatch()
{
using (var server = new MockKubeApiServer())
{
var client = new Kubernetes(new KubernetesClientConfiguration
{
Host = server.Uri.ToString()
});
// did not pass watch param
{
var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default").Result;
Assert.ThrowsAny<KubernetesClientException>(() =>
{
listTask.Watch<V1Pod>((type, item) => { });
});
}
// server did not response line by line
{
Assert.ThrowsAny<Exception>(() =>
{
var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result;
// this line did not throw
// listTask.Watch<Corev1Pod>((type, item) => { });
});
}
}
}
[Fact]
public void SuriveBadLine()
{
using (var server = new MockKubeApiServer(async httpContext =>
{
httpContext.Response.StatusCode = (int) HttpStatusCode.OK;
httpContext.Response.ContentLength = null;
await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse);
await Task.Delay(TimeSpan.FromMilliseconds(100));
await WriteStreamLine(httpContext, MockBadStreamLine);
await Task.Delay(TimeSpan.FromMilliseconds(100));
await WriteStreamLine(httpContext, MockAddedEventStreamLine);
await Task.Delay(TimeSpan.FromMilliseconds(100));
await WriteStreamLine(httpContext, MockBadStreamLine);
await Task.Delay(TimeSpan.FromMilliseconds(100));
await WriteStreamLine(httpContext, MockModifiedStreamLine);
await Task.Delay(TimeSpan.FromMilliseconds(100));
// make server alive, cannot set to int.max as of it would block response
await Task.Delay(TimeSpan.FromDays(1));
return false;
}))
{
var client = new Kubernetes(new KubernetesClientConfiguration
{
Host = server.Uri.ToString()
});
var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result;
var events = new HashSet<WatchEventType>();
var errors = 0;
var watcher = listTask.Watch<V1Pod>(
(type, item) => { events.Add(type); },
e => { errors += 1; }
);
// wait server yields all events
Thread.Sleep(TimeSpan.FromMilliseconds(1000));
Assert.Contains(WatchEventType.Added, events);
Assert.Contains(WatchEventType.Modified, events);
Assert.Equal(2, errors);
Assert.True(watcher.Watching);
// prevent from server down exception trigger
Thread.Sleep(TimeSpan.FromMilliseconds(1000));
}
}
[Fact]
public void DisposeWatch()
{
using (var server = new MockKubeApiServer(async httpContext =>
{
await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse);
await Task.Delay(TimeSpan.FromMilliseconds(100));
for (;;)
{
await WriteStreamLine(httpContext, MockAddedEventStreamLine);
await Task.Delay(TimeSpan.FromMilliseconds(100));
}
}))
{
var client = new Kubernetes(new KubernetesClientConfiguration
{
Host = server.Uri.ToString()
});
var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result;
var events = new HashSet<WatchEventType>();
var watcher = listTask.Watch<V1Pod>(
(type, item) => { events.Add(type); }
);
// wait at least an event
Thread.Sleep(TimeSpan.FromMilliseconds(1000));
Assert.NotEmpty(events);
Assert.True(watcher.Watching);
watcher.Dispose();
events.Clear();
// make sure wait event called
Thread.Sleep(TimeSpan.FromMilliseconds(1000));
Assert.Empty(events);
Assert.False(watcher.Watching);
}
}
[Fact]
public void WatchAllEvents()
{
using (var server = new MockKubeApiServer(async httpContext =>
{
await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse);
await Task.Delay(TimeSpan.FromMilliseconds(100));
await WriteStreamLine(httpContext, MockAddedEventStreamLine);
await Task.Delay(TimeSpan.FromMilliseconds(100));
await WriteStreamLine(httpContext, MockDeletedStreamLine);
await Task.Delay(TimeSpan.FromMilliseconds(100));
await WriteStreamLine(httpContext, MockModifiedStreamLine);
await Task.Delay(TimeSpan.FromMilliseconds(100));
await WriteStreamLine(httpContext, MockErrorStreamLine);
await Task.Delay(TimeSpan.FromMilliseconds(100));
// make server alive, cannot set to int.max as of it would block response
await Task.Delay(TimeSpan.FromDays(1));
return false;
}))
{
var client = new Kubernetes(new KubernetesClientConfiguration
{
Host = server.Uri.ToString()
});
var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result;
var events = new HashSet<WatchEventType>();
var errors = 0;
var watcher = listTask.Watch<V1Pod>(
(type, item) => { events.Add(type); },
e => { errors += 1; }
);
// wait server yields all events
Thread.Sleep(TimeSpan.FromMilliseconds(1000));
Assert.Contains(WatchEventType.Added, events);
Assert.Contains(WatchEventType.Deleted, events);
Assert.Contains(WatchEventType.Modified, events);
Assert.Contains(WatchEventType.Error, events);
Assert.Equal(0, errors);
Assert.True(watcher.Watching);
}
}
[Fact]
public void WatchServerDisconnect()
{
Watcher<V1Pod> watcher;
Exception exceptionCatched = null;
using (var server = new MockKubeApiServer(async httpContext =>
{
await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse);
// make sure watch success
await Task.Delay(TimeSpan.FromMilliseconds(200));
throw new IOException("server down");
}))
{
var client = new Kubernetes(new KubernetesClientConfiguration
{
Host = server.Uri.ToString()
});
var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result;
watcher = listTask.Watch<V1Pod>(
(type, item) => { },
e => { exceptionCatched = e; });
}
// wait server down
Thread.Sleep(TimeSpan.FromMilliseconds(1000));
Assert.False(watcher.Watching);
Assert.IsType<IOException>(exceptionCatched);
}
private class DummyHandler : DelegatingHandler
{
internal bool Called { get; private set; }
protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request,
CancellationToken cancellationToken)
{
Called = true;
return base.SendAsync(request, cancellationToken);
}
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using k8s.Exceptions;
using k8s.Models;
using k8s.Tests.Mock;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
using Xunit;
using Xunit.Abstractions;
namespace k8s.Tests
{
public class WatchTests
: TestBase
{
private static readonly string MockAddedEventStreamLine = BuildWatchEventStreamLine(WatchEventType.Added);
private static readonly string MockDeletedStreamLine = BuildWatchEventStreamLine(WatchEventType.Deleted);
private static readonly string MockModifiedStreamLine = BuildWatchEventStreamLine(WatchEventType.Modified);
private static readonly string MockErrorStreamLine = BuildWatchEventStreamLine(WatchEventType.Error);
private static readonly string MockBadStreamLine = "bad json";
public WatchTests(ITestOutputHelper testOutput) : base(testOutput)
{
}
[Fact]
public void TestWatchWithHandlers()
{
using (var server = new MockKubeApiServer(async httpContext =>
{
await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse);
await Task.Delay(TimeSpan.FromMilliseconds(100));
await WriteStreamLine(httpContext, MockAddedEventStreamLine);
private static string BuildWatchEventStreamLine(WatchEventType eventType)
{
var corev1PodList = JsonConvert.DeserializeObject<V1PodList>(MockKubeApiServer.MockPodResponse);
return JsonConvert.SerializeObject(new Watcher<V1Pod>.WatchEvent
{
Type = eventType,
Object = corev1PodList.Items.First()
}, new StringEnumConverter());
}
private static async Task WriteStreamLine(HttpContext httpContext, string reponseLine)
{
const string crlf = "\r\n";
await httpContext.Response.WriteAsync(reponseLine.Replace(crlf, ""));
await httpContext.Response.WriteAsync(crlf);
await httpContext.Response.Body.FlushAsync();
}
[Fact]
public void CannotWatch()
{
using (var server = new MockKubeApiServer(testOutput: TestOutput))
{
var client = new Kubernetes(new KubernetesClientConfiguration
{
Host = server.Uri.ToString()
});
// did not pass watch param
{
var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default").Result;
Assert.ThrowsAny<KubernetesClientException>(() =>
{
listTask.Watch<V1Pod>((type, item) => { });
});
}
// server did not response line by line
{
Assert.ThrowsAny<Exception>(() =>
{
var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result;
// this line did not throw
// listTask.Watch<Corev1Pod>((type, item) => { });
});
}
}
}
[Fact]
public void SuriveBadLine()
{
using (CountdownEvent eventsReceived = new CountdownEvent(4 /* first line of response is eaten by WatcherDelegatingHandler */))
using (var server = new MockKubeApiServer(TestOutput, async httpContext =>
{
httpContext.Response.StatusCode = (int) HttpStatusCode.OK;
httpContext.Response.ContentLength = null;
await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse);
await Task.Delay(TimeSpan.FromMilliseconds(100));
// make server alive, cannot set to int.max as of it would block response
await Task.Delay(TimeSpan.FromDays(1));
return false;
}))
{
var handler1 = new DummyHandler();
await WriteStreamLine(httpContext, MockBadStreamLine);
await Task.Delay(TimeSpan.FromMilliseconds(100));
await WriteStreamLine(httpContext, MockAddedEventStreamLine);
await Task.Delay(TimeSpan.FromMilliseconds(100));
await WriteStreamLine(httpContext, MockBadStreamLine);
await Task.Delay(TimeSpan.FromMilliseconds(100));
await WriteStreamLine(httpContext, MockModifiedStreamLine);
await Task.Delay(TimeSpan.FromMilliseconds(100));
// make server alive, cannot set to int.max as of it would block response
await Task.Delay(TimeSpan.FromDays(1));
return false;
}))
{
var client = new Kubernetes(new KubernetesClientConfiguration
{
Host = server.Uri.ToString()
});
var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result;
var events = new HashSet<WatchEventType>();
var errors = 0;
var watcher = listTask.Watch<V1Pod>(
(type, item) =>
{
Log.LogInformation("Watcher received '{EventType}' event.", type);
events.Add(type);
eventsReceived.Signal();
},
error =>
{
Log.LogInformation("Watcher received '{ErrorType}' error.", error.GetType().FullName);
errors += 1;
eventsReceived.Signal();
}
);
// wait server yields all events
Assert.True(
eventsReceived.Wait(TimeSpan.FromMilliseconds(3000)),
"Timed out waiting for all events / errors to be received."
);
Assert.Contains(WatchEventType.Added, events);
Assert.Contains(WatchEventType.Modified, events);
Assert.Equal(2, errors);
Assert.True(watcher.Watching);
// prevent from server down exception trigger
Thread.Sleep(TimeSpan.FromMilliseconds(1000));
}
}
[Fact]
public void DisposeWatch()
{
using (var server = new MockKubeApiServer(TestOutput, async httpContext =>
{
await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse);
await Task.Delay(TimeSpan.FromMilliseconds(100));
for (;;)
{
await WriteStreamLine(httpContext, MockAddedEventStreamLine);
await Task.Delay(TimeSpan.FromMilliseconds(100));
}
}))
{
var client = new Kubernetes(new KubernetesClientConfiguration
{
Host = server.Uri.ToString()
});
var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result;
var events = new HashSet<WatchEventType>();
var watcher = listTask.Watch<V1Pod>(
(type, item) => { events.Add(type); }
);
// wait at least an event
Thread.Sleep(TimeSpan.FromMilliseconds(1000));
Assert.NotEmpty(events);
Assert.True(watcher.Watching);
watcher.Dispose();
events.Clear();
// make sure wait event called
Thread.Sleep(TimeSpan.FromMilliseconds(1000));
Assert.Empty(events);
Assert.False(watcher.Watching);
}
}
[Fact]
public void WatchAllEvents()
{
using (CountdownEvent eventsReceived = new CountdownEvent(4 /* first line of response is eaten by WatcherDelegatingHandler */))
using (var server = new MockKubeApiServer(TestOutput, async httpContext =>
{
await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse);
await Task.Delay(TimeSpan.FromMilliseconds(100));
await WriteStreamLine(httpContext, MockAddedEventStreamLine);
await Task.Delay(TimeSpan.FromMilliseconds(100));
await WriteStreamLine(httpContext, MockDeletedStreamLine);
await Task.Delay(TimeSpan.FromMilliseconds(100));
await WriteStreamLine(httpContext, MockModifiedStreamLine);
await Task.Delay(TimeSpan.FromMilliseconds(100));
await WriteStreamLine(httpContext, MockErrorStreamLine);
await Task.Delay(TimeSpan.FromMilliseconds(100));
// make server alive, cannot set to int.max as of it would block response
await Task.Delay(TimeSpan.FromDays(1));
return false;
}))
{
var client = new Kubernetes(new KubernetesClientConfiguration
{
Host = server.Uri.ToString()
});
var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result;
var events = new HashSet<WatchEventType>();
var errors = 0;
var watcher = listTask.Watch<V1Pod>(
(type, item) =>
{
Log.LogInformation("Watcher received '{EventType}' event.", type);
events.Add(type);
eventsReceived.Signal();
},
error =>
{
Log.LogInformation("Watcher received '{ErrorType}' error.", error.GetType().FullName);
errors += 1;
eventsReceived.Signal();
}
);
// wait server yields all events
Assert.True(
eventsReceived.Wait(TimeSpan.FromMilliseconds(3000)),
"Timed out waiting for all events / errors to be received."
);
Assert.Contains(WatchEventType.Added, events);
Assert.Contains(WatchEventType.Deleted, events);
Assert.Contains(WatchEventType.Modified, events);
Assert.Contains(WatchEventType.Error, events);
Assert.Equal(0, errors);
Assert.True(watcher.Watching);
}
}
[Fact]
public void WatchServerDisconnect()
{
Watcher<V1Pod> watcher;
Exception exceptionCatched = null;
using (var server = new MockKubeApiServer(TestOutput, async httpContext =>
{
await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse);
// make sure watch success
await Task.Delay(TimeSpan.FromMilliseconds(200));
throw new IOException("server down");
}))
{
var client = new Kubernetes(new KubernetesClientConfiguration
{
Host = server.Uri.ToString()
});
var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result;
watcher = listTask.Watch<V1Pod>(
(type, item) => { },
e => { exceptionCatched = e; });
}
// wait server down
Thread.Sleep(TimeSpan.FromMilliseconds(1000));
Assert.False(watcher.Watching);
Assert.IsType<IOException>(exceptionCatched);
}
private class DummyHandler : DelegatingHandler
{
internal bool Called { get; private set; }
protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request,
CancellationToken cancellationToken)
{
Called = true;
return base.SendAsync(request, cancellationToken);
}
}
[Fact]
public void TestWatchWithHandlers()
{
using (var server = new MockKubeApiServer(TestOutput, async httpContext =>
{
await WriteStreamLine(httpContext, MockKubeApiServer.MockPodResponse);
await Task.Delay(TimeSpan.FromMilliseconds(100));
await WriteStreamLine(httpContext, MockAddedEventStreamLine);
await Task.Delay(TimeSpan.FromMilliseconds(100));
// make server alive, cannot set to int.max as of it would block response
await Task.Delay(TimeSpan.FromDays(1));
return false;
}))
{
var handler1 = new DummyHandler();
var handler2 = new DummyHandler();
var client = new Kubernetes(new KubernetesClientConfiguration
{
Host = server.Uri.ToString()
}, handler1, handler2);
Assert.False(handler1.Called);
var client = new Kubernetes(new KubernetesClientConfiguration
{
Host = server.Uri.ToString()
}, handler1, handler2);
Assert.False(handler1.Called);
Assert.False(handler2.Called);
var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result;
var events = new HashSet<WatchEventType>();
var watcher = listTask.Watch<V1Pod>(
(type, item) => { events.Add(type); }
);
// wait server yields all events
Thread.Sleep(TimeSpan.FromMilliseconds(500));
Assert.Contains(WatchEventType.Added, events);
var listTask = client.ListNamespacedPodWithHttpMessagesAsync("default", watch: true).Result;
Assert.True(handler1.Called);
var events = new HashSet<WatchEventType>();
var watcher = listTask.Watch<V1Pod>(
(type, item) => { events.Add(type); }
);
// wait server yields all events
Thread.Sleep(TimeSpan.FromMilliseconds(500));
Assert.Contains(WatchEventType.Added, events);
Assert.True(handler1.Called);
Assert.True(handler2.Called);
}
}
}
}
}
}

332
tests/WebSocketTestBase.cs Normal file
View File

@@ -0,0 +1,332 @@
using k8s.Tests.Logging;
using k8s.Tests.Mock.Server;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Rest;
using System;
using System.IO;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;
namespace k8s.Tests
{
/// <summary>
/// The base class for Kubernetes WebSocket test suites.
/// </summary>
public abstract class WebSocketTestBase
: TestBase
{
/// <summary>
/// The next server port to use.
/// </summary>
static int NextPort = 13255;
/// <summary>
/// Create a new <see cref="WebSocketTestBase"/>.
/// </summary>
/// <param name="testOutput">
/// Output for the current test.
/// </param>
protected WebSocketTestBase(ITestOutputHelper testOutput)
: base(testOutput)
{
int port = Interlocked.Increment(ref NextPort);
// Useful to diagnose test timeouts.
TestCancellation.Register(
() => Log.LogInformation("Test-level cancellation token has been canceled.")
);
ServerBaseAddress = new Uri($"http://localhost:{port}");
WebSocketBaseAddress = new Uri($"ws://localhost:{port}");
Host = WebHost.CreateDefaultBuilder()
.UseStartup<Startup>()
.ConfigureServices(ConfigureTestServerServices)
.ConfigureLogging(ConfigureTestServerLogging)
.UseUrls(ServerBaseAddress.AbsoluteUri)
.Build();
Disposal.Add(CancellationSource);
Disposal.Add(Host);
}
/// <summary>
/// The test server's base address (http://).
/// </summary>
protected Uri ServerBaseAddress { get; }
/// <summary>
/// The test server's base WebSockets address (ws://).
/// </summary>
protected Uri WebSocketBaseAddress { get; }
/// <summary>
/// The test server's web host.
/// </summary>
protected IWebHost Host { get; }
/// <summary>
/// Test adapter for accepting web sockets.
/// </summary>
protected WebSocketTestAdapter WebSocketTestAdapter { get; } = new WebSocketTestAdapter();
/// <summary>
/// The source for cancellation tokens used by the test.
/// </summary>
protected CancellationTokenSource CancellationSource { get; } = new CancellationTokenSource();
/// <summary>
/// A <see cref="System.Threading.CancellationToken"/> that can be used to cancel asynchronous operations.
/// </summary>
/// <seealso cref="CancellationSource"/>
protected CancellationToken TestCancellation => CancellationSource.Token;
/// <summary>
/// Configure services for the test server.
/// </summary>
/// <param name="services">
/// The service collection to configure.
/// </param>
protected virtual void ConfigureTestServerServices(IServiceCollection services)
{
if (services == null)
throw new ArgumentNullException(nameof(services));
// Inject WebSocketTestData.
services.AddSingleton(WebSocketTestAdapter);
}
/// <summary>
/// Configure logging for the test server.
/// </summary>
/// <param name="services">
/// The logger factory to configure.
/// </param>
protected virtual void ConfigureTestServerLogging(ILoggingBuilder logging)
{
if (logging == null)
throw new ArgumentNullException(nameof(logging));
logging.ClearProviders(); // Don't log to console.
logging.AddTestOutput(TestOutput, MinLogLevel);
}
/// <summary>
/// Create a Kubernetes client that uses the test server.
/// </summary>
/// <param name="credentials">
/// Optional <see cref="ServiceClientCredentials"/> to use for authentication (defaults to anonymous, i.e. no credentials).
/// </param>
/// <returns>
/// The configured client.
/// </returns>
protected virtual Kubernetes CreateTestClient(ServiceClientCredentials credentials = null)
{
return new Kubernetes(credentials ?? AnonymousClientCredentials.Instance)
{
BaseUri = ServerBaseAddress
};
}
/// <summary>
/// Asynchronously disconnect client and server WebSockets using the standard handshake.
/// </summary>
/// <param name="clientSocket">
/// The client-side <see cref="WebSocket"/>.
/// </param>
/// <param name="serverSocket">
/// The server-side <see cref="WebSocket"/>.
/// </param>
/// <param name="closeStatus">
/// An optional <see cref="WebSocketCloseStatus"/> value indicating the reason for disconnection.
///
/// Defaults to <see cref="WebSocketCloseStatus.NormalClosure"/>.
/// </param>
/// <param name="closeStatusDescription">
/// An optional textual description of the reason for disconnection.
///
/// Defaults to "Normal Closure".
/// </param>
/// <returns>
/// A <see cref="Task"/> representing the asynchronous operation.
/// </returns>
protected async Task Disconnect(WebSocket clientSocket, WebSocket serverSocket, WebSocketCloseStatus closeStatus = WebSocketCloseStatus.NormalClosure, string closeStatusDescription = "Normal Closure")
{
if (clientSocket == null)
throw new ArgumentNullException(nameof(clientSocket));
if (serverSocket == null)
throw new ArgumentNullException(nameof(serverSocket));
Log.LogInformation("Disconnecting...");
// Asynchronously perform the server's half of the handshake (the call to clientSocket.CloseAsync will block until it receives the server-side response).
ArraySegment<byte> receiveBuffer = new byte[1024];
Task closeServerSocket = serverSocket.ReceiveAsync(receiveBuffer, TestCancellation)
.ContinueWith(async received =>
{
if (received.IsFaulted)
Log.LogError(new EventId(0), received.Exception.Flatten().InnerExceptions[0], "Server socket operation to receive Close message failed.");
else if (received.IsCanceled)
Log.LogWarning("Server socket operation to receive Close message was canceled.");
else
{
Log.LogInformation("Received {MessageType} message from server socket (expecting {ExpectedMessageType}).",
received.Result.MessageType,
WebSocketMessageType.Close
);
if (received.Result.MessageType == WebSocketMessageType.Close)
{
Log.LogInformation("Closing server socket (with status {CloseStatus})...", received.Result.CloseStatus);
await serverSocket.CloseAsync(
received.Result.CloseStatus.Value,
received.Result.CloseStatusDescription,
TestCancellation
);
Log.LogInformation("Server socket closed.");
}
Assert.Equal(WebSocketMessageType.Close, received.Result.MessageType);
}
});
Log.LogInformation("Closing client socket...");
await clientSocket.CloseAsync(closeStatus, closeStatusDescription, TestCancellation).ConfigureAwait(false);
Log.LogInformation("Client socket closed.");
await closeServerSocket.ConfigureAwait(false);
Log.LogInformation("Disconnected.");
Assert.Equal(closeStatus, clientSocket.CloseStatus);
Assert.Equal(clientSocket.CloseStatus, serverSocket.CloseStatus);
Assert.Equal(closeStatusDescription, clientSocket.CloseStatusDescription);
Assert.Equal(clientSocket.CloseStatusDescription, serverSocket.CloseStatusDescription);
}
/// <summary>
/// Send text to a multiplexed substream over the specified WebSocket.
/// </summary>
/// <param name="webSocket">
/// The target <see cref="WebSocket"/>.
/// </param>
/// <param name="streamIndex">
/// The 0-based index of the target substream.
/// </param>
/// <param name="text">
/// The text to send.
/// </param>
/// <returns>
/// The number of bytes sent to the WebSocket.
/// </returns>
protected async Task<int> SendMultiplexed(WebSocket webSocket, byte streamIndex, string text)
{
if (webSocket == null)
throw new ArgumentNullException(nameof(webSocket));
if (text == null)
throw new ArgumentNullException(nameof(text));
byte[] payload = Encoding.ASCII.GetBytes(text);
byte[] sendBuffer = new byte[payload.Length + 1];
sendBuffer[0] = streamIndex;
Array.Copy(payload, 0, sendBuffer, 1, payload.Length);
await webSocket.SendAsync(sendBuffer, WebSocketMessageType.Binary,
endOfMessage: true,
cancellationToken: TestCancellation
);
return sendBuffer.Length;
}
/// <summary>
/// Receive text from a multiplexed substream over the specified WebSocket.
/// </summary>
/// <param name="webSocket">
/// The target <see cref="WebSocket"/>.
/// </param>
/// <param name="text">
/// The text to send.
/// </param>
/// <returns>
/// A tuple containing the received text, 0-based substream index, and total bytes received.
/// </returns>
protected async Task<(string text, byte streamIndex, int totalBytes)> ReceiveTextMultiplexed(WebSocket webSocket)
{
if (webSocket == null)
throw new ArgumentNullException(nameof(webSocket));
byte[] receivedData;
using (MemoryStream buffer = new MemoryStream())
{
byte[] receiveBuffer = new byte[1024];
WebSocketReceiveResult receiveResult = await webSocket.ReceiveAsync(receiveBuffer, TestCancellation);
if (receiveResult.MessageType != WebSocketMessageType.Binary)
throw new IOException($"Received unexpected WebSocket message of type '{receiveResult.MessageType}'.");
buffer.Write(receiveBuffer, 0, receiveResult.Count);
while (!receiveResult.EndOfMessage)
{
receiveResult = await webSocket.ReceiveAsync(receiveBuffer, TestCancellation);
buffer.Write(receiveBuffer, 0, receiveResult.Count);
}
buffer.Flush();
receivedData = buffer.ToArray();
}
return (
text: Encoding.ASCII.GetString(receivedData, 1, receivedData.Length - 1),
streamIndex: receivedData[0],
totalBytes: receivedData.Length
);
}
/// <summary>
/// A <see cref="ServiceClientCredentials"/> implementation representing no credentials (i.e. anonymous).
/// </summary>
protected class AnonymousClientCredentials
: ServiceClientCredentials
{
/// <summary>
/// The singleton instance of <see cref="AnonymousClientCredentials"/>.
/// </summary>
public static readonly AnonymousClientCredentials Instance = new AnonymousClientCredentials();
/// <summary>
/// Create new <see cref="AnonymousClientCredentials"/>.
/// </summary>
AnonymousClientCredentials()
{
}
}
/// <summary>
/// Event Id constants used in WebSocket tests.
/// </summary>
protected static class EventIds
{
/// <summary>
/// An error occurred while closing the server-side socket.
/// </summary>
static readonly EventId ErrorClosingServerSocket = new EventId(1000, nameof(ErrorClosingServerSocket));
}
}
}

View File

@@ -1,19 +1,35 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netcoreapp2.0</TargetFramework>
<IsPackable>false</IsPackable>
<RootNamespace>k8s.tests</RootNamespace>
<TargetFrameworks>netcoreapp2.0;netcoreapp2.1</TargetFrameworks>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore" Version="2.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.3.0" />
<PackageReference Include="xunit" Version="2.3.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.0" />
<DotNetCliToolReference Include="dotnet-xunit" Version="2.3.0" />
<PackageReference Include="Microsoft.AspNetCore.All" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.0.0" />
<PackageReference Include="System.Reactive" Version="3.1.1" />
</ItemGroup>
<ItemGroup Condition=" '$(TargetFramework)' == 'netcoreapp2.0' ">
<Compile Remove="Kubernetes.Exec.Tests.cs" />
</ItemGroup>
<ItemGroup Condition=" '$(TargetFramework)' == 'netcoreapp2.1' ">
<Compile Remove="Kubernetes.WebSockets.Tests.cs" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.7.0-preview-20180221-13" />
<PackageReference Include="xunit" Version="2.3.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.3.1" NoWarn="NU1701" />
<DotNetCliToolReference Include="dotnet-xunit" Version="2.3.1" />
</ItemGroup>
<ItemGroup>
<None Include="assets/*" CopyToOutputDirectory="Always" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\src\KubernetesClient.csproj" />
</ItemGroup>