Add ChannelIndex enumeration, and some documentation (#142)

This commit is contained in:
Frederik Carlier
2018-04-27 06:14:38 +02:00
committed by Brendan Burns
parent d90289a094
commit 1c07966d72
3 changed files with 160 additions and 0 deletions

View File

@@ -0,0 +1,37 @@
namespace k8s
{
/// <summary>
/// These values identify the various channels which you can use when interacting with a process running in a container in a Kubernetes
/// pod.
/// </summary>
/// <seealso href="https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/server/remotecommand/websocket.go#L29"/>
public enum ChannelIndex : byte
{
/// <summary>
/// The standard input channel. Use this channel to send input to a process running in a container inside a Kubernetes pod.
/// </summary>
StdIn,
/// <summary>
/// The standard output channel. Use this channel to read standard output generated by a process running in a container in a Kubernetes pod.
/// </summary>
StdOut,
/// <summary>
/// The standard error channel. Use this channel to read the error output generated by a process running in a container in a Kubernetes pod.
/// </summary>
StdErr,
/// <summary>
/// The error channel. This channel is used by Kubernetes to send you error messages, including the exit code of the process.
/// </summary>
Error,
/// <summary>
/// The resize channel. Use this channel to resize the terminal. You need to send a JSON-formatted object over this channel, which
/// has a Width and Height property.
/// </summary>
/// <seealso href="https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/client-go/tools/remotecommand/resize.go"/>
Resize
}
}

View File

@@ -3,12 +3,27 @@ using System.IO;
namespace k8s
{
/// <summary>
/// A <see cref="Stream"/> which reads/writes from a specific channel using a <see cref="StreamDemuxer" />.
/// </summary>
public class MuxedStream : Stream
{
private ByteBuffer inputBuffer;
private byte? outputIndex;
private StreamDemuxer muxer;
/// <summary>
/// Initializes a new instance of the <see cref="MuxedStream"/> class.
/// </summary>
/// <param name="muxer">
/// The <see cref="StreamDemuxer"/> to use to read from/write to the underlying stream.
/// </param>
/// <param name="inputBuffer">
/// The <see cref="inputBuffer"/> to read from.
/// </param>
/// <param name="outputIndex">
/// The index of the channel to which to write.
/// </param>
public MuxedStream(StreamDemuxer muxer, ByteBuffer inputBuffer, byte? outputIndex)
{
this.inputBuffer = inputBuffer;
@@ -22,20 +37,26 @@ namespace k8s
this.muxer = muxer ?? throw new ArgumentNullException(nameof(muxer));
}
/// <inheritdoc/>
public override bool CanRead => this.inputBuffer != null;
/// <inheritdoc/>
public override bool CanSeek => false;
/// <inheritdoc/>
public override bool CanWrite => this.outputIndex != null;
/// <inheritdoc/>
public override long Length => throw new NotSupportedException();
/// <inheritdoc/>
public override long Position
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}
/// <inheritdoc/>
public override void Write(byte[] buffer, int offset, int count)
{
if (this.outputIndex == null)
@@ -48,6 +69,7 @@ namespace k8s
}
}
/// <inheritdoc/>
public override int Read(byte[] buffer, int offset, int count)
{
if (this.inputBuffer == null)
@@ -60,16 +82,19 @@ namespace k8s
}
}
/// <inheritdoc/>
public override void Flush()
{
throw new NotSupportedException();
}
/// <inheritdoc/>
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
/// <inheritdoc/>
public override void SetLength(long value)
{
throw new NotSupportedException();

View File

@@ -9,6 +9,18 @@ using System.Threading.Tasks;
namespace k8s
{
/// <summary>
/// <para>
/// The <see cref="StreamDemuxer"/> allows you to interact with processes running in a container in a Kubernetes pod. You can start an exec or attach command
/// by calling <see cref="Kubernetes.WebSocketNamespacedPodExecAsync(string, string, IEnumerable{string}, string, bool, bool, bool, bool, Dictionary{string, List{string}}, CancellationToken)"/>
/// or <see cref="Kubernetes.WebSocketNamespacedPodAttachAsync(string, string, string, bool, bool, bool, bool, Dictionary{string, List{string}}, CancellationToken)"/>. These methods
/// will return you a <see cref="WebSocket"/> connection.
/// </para>
/// <para>
/// Kubernetes 'multiplexes' multiple channels over this <see cref="WebSocket"/> connection, such as standard input, standard output and standard error. The <see cref="StreamDemuxer"/>
/// allows you to extract individual <see cref="Stream"/>s from this <see cref="WebSocket"/> class. You can then use these streams to send/receive data from that process.
/// </para>
/// </summary>
public class StreamDemuxer : IDisposable
{
private readonly WebSocket webSocket;
@@ -16,6 +28,12 @@ namespace k8s
private readonly CancellationTokenSource cts = new CancellationTokenSource();
private Task runLoop;
/// <summary>
/// Initializes a new instance of the <see cref="StreamDemuxer"/> class.
/// </summary>
/// <param name="webSocket">
/// A <see cref="WebSocket"/> which contains a multiplexed stream, such as the <see cref="WebSocket"/> returned by the exec or attach commands.
/// </param>
public StreamDemuxer(WebSocket webSocket)
{
this.webSocket = webSocket ?? throw new ArgumentNullException(nameof(webSocket));
@@ -23,11 +41,15 @@ namespace k8s
public event EventHandler ConnectionClosed;
/// <summary>
/// Starts reading the data sent by the server.
/// </summary>
public void Start()
{
this.runLoop = this.RunLoop(this.cts.Token);
}
/// <inheritdoc/>
public void Dispose()
{
try
@@ -45,6 +67,35 @@ namespace k8s
}
}
/// <summary>
/// Gets a <see cref="Stream"/> which allows you to read to and/or write from a remote channel.
/// </summary>
/// <param name="inputIndex">
/// The index of the channel from which to read.
/// </param>
/// <param name="outputIndex">
/// The index of the channel to which to write.
/// </param>
/// <returns>
/// A <see cref="Stream"/> which allows you to read/write to the requested channels.
/// </returns>
public Stream GetStream(ChannelIndex? inputIndex, ChannelIndex? outputIndex)
{
return GetStream((byte?)inputIndex, (byte?)outputIndex);
}
/// <summary>
/// Gets a <see cref="Stream"/> which allows you to read to and/or write from a remote channel.
/// </summary>
/// <param name="inputIndex">
/// The index of the channel from which to read.
/// </param>
/// <param name="outputIndex">
/// The index of the channel to which to write.
/// </param>
/// <returns>
/// A <see cref="Stream"/> which allows you to read/write to the requested channels.
/// </returns>
public Stream GetStream(byte? inputIndex, byte? outputIndex)
{
if (inputIndex != null && !this.buffers.ContainsKey(inputIndex.Value))
@@ -60,6 +111,53 @@ namespace k8s
return new MuxedStream(this, inputBuffer, outputIndex);
}
/// <summary>
/// Directly writes data to a channel.
/// </summary>
/// <param name="index">
/// The index of the channel to which to write.
/// </param>
/// <param name="buffer">
/// The buffer from which to read data.
/// </param>
/// <param name="offset">
/// The offset at which to start reading.
/// </param>
/// <param name="count">
/// The number of bytes to read.
/// </param>
/// <param name="cancellationToken">
/// A <see cref="CancellationToken"/> which can be used to cancel the asynchronous operation.
/// </param>
/// <returns>
/// A <see cref="Task"/> which represents the asynchronous operation.
/// </returns>
public Task Write(ChannelIndex index, byte[] buffer, int offset, int count, CancellationToken cancellationToken = default(CancellationToken))
{
return Write((byte)index, buffer, offset, count, cancellationToken);
}
/// <summary>
/// Directly writes data to a channel.
/// </summary>
/// <param name="index">
/// The index of the channel to which to write.
/// </param>
/// <param name="buffer">
/// The buffer from which to read data.
/// </param>
/// <param name="offset">
/// The offset at which to start reading.
/// </param>
/// <param name="count">
/// The number of bytes to read.
/// </param>
/// <param name="cancellationToken">
/// A <see cref="CancellationToken"/> which can be used to cancel the asynchronous operation.
/// </param>
/// <returns>
/// A <see cref="Task"/> which represents the asynchronous operation.
/// </returns>
public async Task Write(byte index, byte[] buffer, int offset, int count, CancellationToken cancellationToken = default(CancellationToken))
{
byte[] writeBuffer = ArrayPool<byte>.Shared.Rent(count + 1);