using Nito.AsyncEx;
using System;
using System.Collections.Concurrent;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;

namespace k8s.Tests.Mock
{
    /// <summary>
    /// An in-memory <see cref="WebSocket"/> for tests. Incoming traffic is simulated by
    /// queueing messages with <see cref="InvokeReceiveAsync"/>; outgoing traffic is surfaced
    /// through the <see cref="MessageSent"/> handler instead of being written anywhere.
    /// </summary>
    public class MockWebSocket : WebSocket
    {
        // Messages waiting to be handed out by ReceiveAsync, in arrival order.
        private readonly ConcurrentQueue<MessageData> receiveBuffers = new ConcurrentQueue<MessageData>();

        // Signaled whenever a message is queued (or the socket is disposed) to wake a pending ReceiveAsync.
        private readonly AsyncAutoResetEvent receiveEvent = new AsyncAutoResetEvent(false);

        private readonly string subProtocol;
        private WebSocketCloseStatus? closeStatus = null;
        private string closeStatusDescription;
        private WebSocketState state;

        // Written by Dispose and read by ReceiveAsync, possibly on different threads.
        private volatile bool disposedValue;

        /// <summary>
        /// Initializes a new instance of the <see cref="MockWebSocket"/> class.
        /// </summary>
        /// <param name="subProtocol">The value to report through <see cref="SubProtocol"/>.</param>
        public MockWebSocket(string subProtocol = null)
        {
            this.subProtocol = subProtocol;
        }

        /// <summary>
        /// Gets or sets the handler invoked synchronously by <see cref="SendAsync"/> for every frame sent.
        /// </summary>
        public EventHandler<MessageDataEventArgs> MessageSent { get; set; }

        /// <inheritdoc/>
        public override WebSocketCloseStatus? CloseStatus => this.closeStatus;

        /// <inheritdoc/>
        public override string CloseStatusDescription => this.closeStatusDescription;

        /// <inheritdoc/>
        public override WebSocketState State => this.state;

        /// <inheritdoc/>
        public override string SubProtocol => this.subProtocol;

        /// <summary>
        /// Overrides the value reported by <see cref="State"/>. Nothing else in this mock changes the state.
        /// </summary>
        /// <param name="state">The state to report.</param>
        public void SetState(WebSocketState state)
        {
            this.state = state;
        }

        /// <summary>
        /// Queues a message so that a current or future <see cref="ReceiveAsync"/> call returns it.
        /// </summary>
        /// <param name="buffer">The payload. It is stored by reference, not copied.</param>
        /// <param name="messageType">The message type to report to the receiver.</param>
        /// <param name="endOfMessage">Whether this frame completes the message.</param>
        /// <returns>An already-completed task.</returns>
        public Task InvokeReceiveAsync(ArraySegment<byte> buffer, WebSocketMessageType messageType, bool endOfMessage)
        {
            this.receiveBuffers.Enqueue(new MessageData()
            {
                Buffer = buffer,
                MessageType = messageType,
                EndOfMessage = endOfMessage,
            });

            this.receiveEvent.Set();
            return Task.CompletedTask;
        }

        /// <summary>
        /// Not supported by this mock.
        /// </summary>
        /// <exception cref="NotImplementedException">Always thrown.</exception>
        public override void Abort()
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// Records the close status and description and queues an empty
        /// <see cref="WebSocketMessageType.Close"/> message for the receiver.
        /// <see cref="State"/> is not modified; use <see cref="SetState"/> for that.
        /// </summary>
        /// <param name="closeStatus">The status to report through <see cref="CloseStatus"/>.</param>
        /// <param name="statusDescription">The text to report through <see cref="CloseStatusDescription"/>.</param>
        /// <param name="cancellationToken">Unused.</param>
        /// <returns>An already-completed task.</returns>
        public override Task CloseAsync(WebSocketCloseStatus closeStatus, string statusDescription, CancellationToken cancellationToken)
        {
            this.closeStatus = closeStatus;
            this.closeStatusDescription = statusDescription;

            this.receiveBuffers.Enqueue(new MessageData()
            {
                Buffer = new ArraySegment<byte>(new byte[] { }),
                EndOfMessage = true,
                MessageType = WebSocketMessageType.Close,
            });

            this.receiveEvent.Set();
            return Task.CompletedTask;
        }

        /// <summary>
        /// Not supported by this mock.
        /// </summary>
        /// <exception cref="NotImplementedException">Always thrown.</exception>
        public override Task CloseOutputAsync(WebSocketCloseStatus closeStatus, string statusDescription, CancellationToken cancellationToken)
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// Returns the next queued message, waiting until one is available. A message larger
        /// than <paramref name="buffer"/> is delivered in pieces across successive calls.
        /// </summary>
        /// <param name="buffer">The destination for the received bytes.</param>
        /// <param name="cancellationToken">Cancels the wait for a message.</param>
        /// <returns>
        /// The number of bytes copied, the message type and the end-of-message flag. If the
        /// socket has been disposed and nothing is queued, an empty
        /// <see cref="WebSocketMessageType.Close"/> result is returned.
        /// </returns>
        public override async Task<WebSocketReceiveResult> ReceiveAsync(ArraySegment<byte> buffer, CancellationToken cancellationToken)
        {
            MessageData received;

            // The event is set once per enqueue but only consumed when we actually wait, so it
            // can still be signaled after the queue has been drained. Re-check the queue after
            // every wake-up instead of assuming a message is present.
            while (!this.receiveBuffers.TryPeek(out received))
            {
                if (this.disposedValue)
                {
                    return new WebSocketReceiveResult(0, WebSocketMessageType.Close, true);
                }

                await this.receiveEvent.WaitAsync(cancellationToken).ConfigureAwait(false);
            }

            if (received.Buffer.Count <= buffer.Count)
            {
                // The whole (remaining) message fits: consume it.
                this.receiveBuffers.TryDequeue(out received);
                received.Buffer.CopyTo(buffer);
                return new WebSocketReceiveResult(received.Buffer.Count, received.MessageType, received.EndOfMessage);
            }

            // Only part of the message fits: hand out what we can and leave the remainder
            // at the head of the queue for the next call.
            received.Buffer.Slice(0, buffer.Count).CopyTo(buffer);
            received.Buffer = received.Buffer.Slice(buffer.Count);
            return new WebSocketReceiveResult(buffer.Count, received.MessageType, false);
        }

        /// <summary>
        /// Reports the frame to the <see cref="MessageSent"/> handler, if one is set. Nothing is transmitted.
        /// </summary>
        /// <param name="buffer">The payload being sent.</param>
        /// <param name="messageType">The type of the message.</param>
        /// <param name="endOfMessage">Whether this frame completes the message.</param>
        /// <param name="cancellationToken">Unused.</param>
        /// <returns>An already-completed task.</returns>
        public override Task SendAsync(ArraySegment<byte> buffer, WebSocketMessageType messageType, bool endOfMessage, CancellationToken cancellationToken)
        {
            this.MessageSent?.Invoke(
                this,
                new MessageDataEventArgs()
                {
                    Data = new MessageData()
                    {
                        Buffer = buffer,
                        MessageType = messageType,
                        EndOfMessage = endOfMessage,
                    },
                });

            return Task.CompletedTask;
        }

        /// <summary>
        /// Discards queued messages and wakes any pending <see cref="ReceiveAsync"/> call.
        /// </summary>
        public override void Dispose()
        {
            this.Dispose(disposing: true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Releases the resources held by this instance. Only the first call has any effect.
        /// </summary>
        /// <param name="disposing">
        /// <see langword="true"/> when called from <see cref="Dispose()"/>.
        /// </param>
        protected virtual void Dispose(bool disposing)
        {
            if (this.disposedValue)
            {
                return;
            }

            // Publish the flag before signaling so that a receiver woken by Set() below
            // observes the disposal instead of going back to waiting.
            this.disposedValue = true;

            if (disposing)
            {
                this.receiveBuffers.Clear();
                this.receiveEvent.Set();
            }
        }

        /// <summary>
        /// A single frame, either queued for receipt or reported as sent.
        /// </summary>
        public class MessageData
        {
            /// <summary>
            /// Gets or sets the payload. <see cref="ReceiveAsync"/> replaces this with the
            /// unread remainder when a message is delivered in pieces.
            /// </summary>
            public ArraySegment<byte> Buffer { get; set; }

            /// <summary>
            /// Gets or sets the message type.
            /// </summary>
            public WebSocketMessageType MessageType { get; set; }

            /// <summary>
            /// Gets or sets a value indicating whether this frame completes the message.
            /// </summary>
            public bool EndOfMessage { get; set; }
        }

        /// <summary>
        /// Event data passed to the <see cref="MessageSent"/> handler.
        /// </summary>
        public class MessageDataEventArgs : EventArgs
        {
            /// <summary>
            /// Gets or sets the frame that was sent.
            /// </summary>
            public MessageData Data { get; set; }
        }
    }
}