using System;
using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
namespace k8s
{
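/// <summary>
/// <para>
/// The <see cref="StreamDemuxer"/> allows you to interact with processes running in a container in a Kubernetes pod. You can start an exec or attach command
/// by calling the WebSocket-based exec or attach methods of the Kubernetes client. These methods
/// will return you a <see cref="WebSocket"/> connection.
/// </para>
/// <para>
/// Kubernetes 'multiplexes' multiple channels over this <see cref="WebSocket"/> connection, such as standard input, standard output and standard error. The <see cref="StreamDemuxer"/>
/// allows you to extract individual <see cref="Stream"/>s from this <see cref="WebSocket"/> connection. You can then use these streams to send/receive data from that process.
/// </para>
/// </summary>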
public class StreamDemuxer : IStreamDemuxer
{
// The multiplexed connection that every channel is read from and written to.
private readonly WebSocket webSocket;

// Receive buffers keyed by channel index. Entries are added by GetStream and
// filled by RunLoop; use lock (buffers) when touching the dictionary.
private readonly Dictionary<byte, ByteBuffer> buffers = new Dictionary<byte, ByteBuffer>();

// Cancels the receive loop when this instance is disposed.
private readonly CancellationTokenSource cts = new CancellationTokenSource();

// Selects the framing rules applied by RunLoop (port-forward streams skip 2 leading bytes per channel).
private readonly StreamType streamType;

// True when Dispose should also dispose the web socket.
private readonly bool ownsSocket;

// The running receive loop; set by Start and cleared by RunLoop when it exits.
private Task runLoop;

// Set once Dispose(bool) has run, so that a second call is a no-op.
private bool disposedValue;
/// <summary>
/// Initializes a new instance of the <see cref="StreamDemuxer"/> class.
/// </summary>
/// <param name="webSocket">
/// A <see cref="WebSocket"/> which contains a multiplexed stream, such as the <see cref="WebSocket"/> returned by the exec or attach commands.
/// </param>
/// <param name="streamType">
/// A <see cref="StreamType"/> which specifies the type of the stream.
/// </param>
/// <param name="ownsSocket">
/// A value indicating whether this instance of the <see cref="StreamDemuxer"/> owns the underlying <see cref="WebSocket"/>,
/// and should dispose of it when this instance is disposed of.
/// </param>
/// <exception cref="ArgumentNullException">
/// <paramref name="webSocket"/> is <see langword="null"/>.
/// </exception>
public StreamDemuxer(WebSocket webSocket, StreamType streamType = StreamType.RemoteCommand,
    bool ownsSocket = false)
{
    if (webSocket == null)
    {
        throw new ArgumentNullException(nameof(webSocket));
    }

    this.webSocket = webSocket;
    this.streamType = streamType;
    this.ownsSocket = ownsSocket;
}
/// <summary>
/// Raised by the receive loop when it exits, after the channel buffers have been marked as ended.
/// The sender is this instance and the arguments are <see cref="EventArgs.Empty"/>.
/// </summary>
public event EventHandler ConnectionClosed;
/// <summary>
/// Starts reading the data sent by the server by running the receive loop on the thread pool.
/// </summary>
public void Start()
{
    // Task.Run unwraps the Task returned by RunLoop, so no async lambda is needed.
    runLoop = Task.Run(() => RunLoop(cts.Token));
}
/// <summary>
/// Gets a <see cref="Stream"/> which allows you to read from and/or write to a remote channel.
/// </summary>
/// <param name="inputIndex">
/// The index of the channel from which to read, or <see langword="null"/> for none.
/// </param>
/// <param name="outputIndex">
/// The index of the channel to which to write, or <see langword="null"/> for none.
/// </param>
/// <returns>
/// A <see cref="Stream"/> which allows you to read/write to the requested channels.
/// </returns>
public Stream GetStream(ChannelIndex? inputIndex, ChannelIndex? outputIndex)
    => GetStream((byte?)inputIndex, (byte?)outputIndex);
/// <summary>
/// Gets a <see cref="Stream"/> which allows you to read from and/or write to a remote channel.
/// </summary>
/// <param name="inputIndex">
/// The index of the channel from which to read. A receive buffer is created for this
/// channel on first use and shared by later calls with the same index. When
/// <see langword="null"/>, the returned stream is created without an input buffer.
/// </param>
/// <param name="outputIndex">
/// The index of the channel to which to write, or <see langword="null"/> for none.
/// </param>
/// <returns>
/// A <see cref="Stream"/> which allows you to read/write to the requested channels.
/// </returns>
public Stream GetStream(byte? inputIndex, byte? outputIndex)
{
    ByteBuffer inputBuffer = null;

    if (inputIndex.HasValue)
    {
        // Look up and (if needed) create the buffer under one lock, so the dictionary
        // is never read while another caller is adding an entry to it.
        lock (buffers)
        {
            if (!buffers.TryGetValue(inputIndex.Value, out inputBuffer))
            {
                inputBuffer = new ByteBuffer();
                buffers.Add(inputIndex.Value, inputBuffer);
            }
        }
    }

    return new MuxedStream(this, inputBuffer, outputIndex);
}
/// <summary>
/// Directly writes data to a channel.
/// </summary>
/// <param name="index">
/// The index of the channel to which to write.
/// </param>
/// <param name="buffer">
/// The buffer from which to read data.
/// </param>
/// <param name="offset">
/// The offset in <paramref name="buffer"/> at which to start reading.
/// </param>
/// <param name="count">
/// The number of bytes to read from <paramref name="buffer"/>.
/// </param>
/// <param name="cancellationToken">
/// A <see cref="CancellationToken"/> which can be used to cancel the asynchronous operation.
/// </param>
/// <returns>
/// A <see cref="Task"/> which represents the asynchronous operation.
/// </returns>
public Task Write(ChannelIndex index, byte[] buffer, int offset, int count,
    CancellationToken cancellationToken = default)
    => Write((byte)index, buffer, offset, count, cancellationToken);
/// <summary>
/// Directly writes data to a channel. The payload is sent as a single binary web socket
/// frame whose first byte is the channel index.
/// </summary>
/// <param name="index">
/// The index of the channel to which to write.
/// </param>
/// <param name="buffer">
/// The buffer from which to read data.
/// </param>
/// <param name="offset">
/// The offset in <paramref name="buffer"/> at which to start reading.
/// </param>
/// <param name="count">
/// The number of bytes to read from <paramref name="buffer"/>.
/// </param>
/// <param name="cancellationToken">
/// A <see cref="CancellationToken"/> which can be used to cancel the asynchronous operation.
/// </param>
/// <returns>
/// A <see cref="Task"/> which represents the asynchronous operation. Argument errors are
/// reported through this task as well, because the method is asynchronous.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="buffer"/> is <see langword="null"/>.
/// </exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="count"/> is negative.
/// </exception>
public async Task Write(byte index, byte[] buffer, int offset, int count,
    CancellationToken cancellationToken = default)
{
    // Validate before renting so that bad arguments fail with a meaningful exception
    // (count == -1 used to surface as an IndexOutOfRangeException on the rented array).
    if (buffer == null)
    {
        throw new ArgumentNullException(nameof(buffer));
    }

    if (count < 0)
    {
        throw new ArgumentOutOfRangeException(nameof(count));
    }

    // One extra byte for the channel index prefix. The pool may hand back a larger
    // array, so the segment below is limited to the bytes actually used.
    var writeBuffer = ArrayPool<byte>.Shared.Rent(count + 1);
    try
    {
        writeBuffer[0] = index;
        Array.Copy(buffer, offset, writeBuffer, 1, count);
        var segment = new ArraySegment<byte>(writeBuffer, 0, count + 1);

        // NOTE(review): endOfMessage is false, so each write is sent as a non-final
        // fragment. Kept unchanged; confirm this is what the server side expects.
        await webSocket.SendAsync(segment, WebSocketMessageType.Binary, false, cancellationToken)
            .ConfigureAwait(false);
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(writeBuffer);
    }
}
/// <summary>
/// Receives frames from the web socket until cancellation is requested or the socket
/// reports a close status, and copies each payload into the buffer registered for its
/// channel. Payloads for channels without a registered buffer are dropped.
/// When the loop exits, every registered buffer is marked as ended and
/// <see cref="ConnectionClosed"/> is raised.
/// </summary>
/// <param name="cancellationToken">
/// A <see cref="CancellationToken"/> which stops the loop.
/// </param>
/// <returns>
/// A <see cref="Task"/> which represents the receive loop.
/// </returns>
protected async Task RunLoop(CancellationToken cancellationToken)
{
    // Rent a receive buffer of at least 1 MiB from the shared pool.
    var buffer = ArrayPool<byte>.Shared.Rent(1024 * 1024);

    // Remembers, per channel, how many leading bytes still have to be skipped.
    var streamBytesToSkipMap = new Dictionary<byte, int>();

    try
    {
        var segment = new ArraySegment<byte>(buffer);

        while (!cancellationToken.IsCancellationRequested && webSocket.CloseStatus == null)
        {
            // We always get data in this format:
            // [stream index] (1 for stdout, 2 for stderr)
            // [payload]
            var result = await webSocket.ReceiveAsync(segment, cancellationToken).ConfigureAwait(false);

            // Ignore frames that carry no payload after the stream index byte.
            if (result.Count < 2)
            {
                continue;
            }

            var streamIndex = buffer[0];

            // Number of bytes at the start of the current frame that are not payload.
            // The first frame of a message starts with the stream index byte.
            var extraByteCount = 1;

            while (true)
            {
                if (!streamBytesToSkipMap.TryGetValue(streamIndex, out var bytesToSkip))
                {
                    // When used in port-forwarding, the first 2 bytes from the web socket is port bytes, skip.
                    // https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/server/portforward/websocket.go
                    bytesToSkip = streamType == StreamType.PortForward ? 2 : 0;
                }

                var bytesCount = result.Count - extraByteCount;

                if (bytesToSkip > 0 && bytesToSkip >= bytesCount)
                {
                    // The whole frame is consumed by the skip; carry the remainder over.
                    bytesToSkip -= bytesCount;
                }
                else
                {
                    bytesCount -= bytesToSkip;
                    extraByteCount += bytesToSkip;
                    bytesToSkip = 0;

                    // GetStream adds entries under this lock from other threads, so the
                    // lookup takes it too. The write itself happens outside the lock.
                    ByteBuffer target;
                    lock (buffers)
                    {
                        buffers.TryGetValue(streamIndex, out target);
                    }

                    target?.Write(buffer, extraByteCount, bytesCount);
                }

                streamBytesToSkipMap[streamIndex] = bytesToSkip;

                if (result.EndOfMessage)
                {
                    break;
                }

                // Continuation frames of the same message carry payload only.
                extraByteCount = 0;
                result = await webSocket.ReceiveAsync(segment, cancellationToken).ConfigureAwait(false);
            }
        }
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(buffer);
        runLoop = null;

        // Snapshot under the lock: enumerating the dictionary while GetStream adds an
        // entry would throw here and prevent ConnectionClosed from being raised.
        ByteBuffer[] registeredBuffers;
        lock (buffers)
        {
            registeredBuffers = new ByteBuffer[buffers.Count];
            buffers.Values.CopyTo(registeredBuffers, 0);
        }

        foreach (var b in registeredBuffers)
        {
            b.WriteEnd();
        }

        ConnectionClosed?.Invoke(this, EventArgs.Empty);
    }
}
/// <summary>
/// Stops the receive loop and releases the resources held by this instance.
/// Calls after the first one have no effect.
/// </summary>
/// <param name="disposing">
/// <see langword="true"/> to cancel and wait for the receive loop, dispose the
/// cancellation source and, when this instance owns it, dispose the web socket;
/// <see langword="false"/> to only mark the instance as disposed.
/// </param>
protected virtual void Dispose(bool disposing)
{
    if (disposedValue)
    {
        return;
    }

    if (disposing)
    {
        try
        {
            // RunLoop sets the field to null when it exits, so take a snapshot instead
            // of checking the field and then dereferencing it again.
            var loop = runLoop;

            cts.Cancel();
            loop?.Wait();
        }
        catch (Exception ex)
        {
            // Dispose methods can never throw.
            Debug.Write(ex);
        }
        finally
        {
            // Dispose the source only after the loop has stopped using its token, and
            // do so even when the loop was never started.
            cts.Dispose();
        }

        if (ownsSocket)
        {
            webSocket.Dispose();
        }
    }

    disposedValue = true;
}
// // TODO: override finalizer only if 'Dispose(bool disposing)' has code to free unmanaged resources
// ~StreamDemuxer()
// {
// // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
// Dispose(disposing: false);
// }
/// <summary>
/// Releases the resources held by this instance and suppresses finalization.
/// </summary>
public void Dispose()
{
    // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
    Dispose(disposing: true);
    GC.SuppressFinalize(this);
}
}
}