introduce watch api

This commit is contained in:
Boshi Lian
2017-10-17 03:59:30 +08:00
parent eb77c23bb0
commit b5b95dfc4e
3 changed files with 163 additions and 1 deletions

View File

@@ -45,7 +45,7 @@ namespace k8s
// set credentails for the kubernernet client
this.SetCredentials(config, handler);
this.InitializeHttpClient(handler);
this.InitializeHttpClient(handler, new DelegatingHandler[]{new WatcherDelegatingHandler()});
}
private X509Certificate2 CaCert { get; set; }
@@ -78,6 +78,7 @@ namespace k8s
!string.IsNullOrWhiteSpace(config.ClientKey)))
{
var cert = Utils.GeneratePfx(config);
handler.ClientCertificates.Add(cert);
}
}

100
src/Watcher.cs Normal file
View File

@@ -0,0 +1,100 @@
using System;
using System.IO;
using System.Runtime.Serialization;
using System.Threading;
using System.Threading.Tasks;
using k8s.Exceptions;
using Microsoft.Rest;
using Microsoft.Rest.Serialization;
namespace k8s
{
public enum WatchEventType
{
[EnumMember(Value = "ADDED")] Added,
[EnumMember(Value = "MODIFIED")] Modified,
[EnumMember(Value = "DELETED")] Deleted,
[EnumMember(Value = "ERROR")] Error
}
public class Watcher<T> : IDisposable
{
private readonly StreamReader _streamReader;
private readonly CancellationTokenSource _cts;
public Watcher(StreamReader streamReader, Action<WatchEventType, T> onEvent, Action<Exception> onError)
{
_streamReader = streamReader;
OnEvent += onEvent;
OnError += onError;
_cts = new CancellationTokenSource();
var token = _cts.Token;
Task.Run(async () =>
{
while (!streamReader.EndOfStream)
{
if (token.IsCancellationRequested)
{
return;
}
try
{
var line = await streamReader.ReadLineAsync();
var @event = SafeJsonConvert.DeserializeObject<WatchEvent>(line);
OnEvent?.Invoke(@event.Type, @event.Object);
}
catch (Exception e)
{
OnError?.Invoke(e);
}
}
}, token);
}
public void Dispose()
{
_cts.Cancel();
_streamReader.Dispose();
}
public event Action<WatchEventType, T> OnEvent;
public event Action<Exception> OnError;
public class WatchEvent
{
public WatchEventType Type { get; set; }
public T Object { get; set; }
}
}
public static class WatcherExt
{
public static Watcher<T> Watch<T>(this HttpOperationResponse response,
Action<WatchEventType, T> onEvent,
Action<Exception> onError = null)
{
if (!(response.Response.Content is WatcherDelegatingHandler.LineSeparatedHttpContent content))
{
throw new KubernetesClientException("not a watchable request or failed response");
}
return new Watcher<T>(content.StreamReader, onEvent, onError);
}
public static Watcher<T> Watch<T>(this HttpOperationResponse<T> response,
Action<WatchEventType, T> onEvent,
Action<Exception> onError = null)
{
return Watch((HttpOperationResponse) response, onEvent, onError);
}
}
}

View File

@@ -0,0 +1,61 @@
using System.IO;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
namespace k8s
{
internal class WatcherDelegatingHandler : DelegatingHandler
{
protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request,
CancellationToken cancellationToken)
{
var originResponse = await base.SendAsync(request, cancellationToken);
if (originResponse.IsSuccessStatusCode)
{
if ($"{request.RequestUri.Query}".Contains("watch=true"))
{
originResponse.Content = new LineSeparatedHttpContent(originResponse.Content);
}
}
return originResponse;
}
internal class LineSeparatedHttpContent : HttpContent
{
private readonly HttpContent _originContent;
private Stream _originStream;
public LineSeparatedHttpContent(HttpContent originContent)
{
_originContent = originContent;
}
internal StreamReader StreamReader { get; private set; }
protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
{
_originStream = await _originContent.ReadAsStreamAsync();
StreamReader = new StreamReader(_originStream);
var firstLine = await StreamReader.ReadLineAsync();
var writer = new StreamWriter(stream);
// using (writer) // leave open
{
await writer.WriteAsync(firstLine);
await writer.FlushAsync();
}
}
protected override bool TryComputeLength(out long length)
{
length = 0;
return false;
}
}
}
}