Inline watcher (#681)

* ignore more files

* inline watcher

* remove watcher handler

* generated files

* mark LineSeparatedHttpContent internal

* method removed

* fix warning
This commit is contained in:
Boshi Lian
2021-09-14 11:20:05 -07:00
committed by GitHub
parent d66c914c86
commit b9032960e9
10 changed files with 1965 additions and 813 deletions

3
.gitignore vendored
View File

@@ -13,3 +13,6 @@ bin/
# JetBrains Rider # JetBrains Rider
.idea/ .idea/
*.sln.iml *.sln.iml
launchSettings.json
*.DotSettings

View File

@@ -29,8 +29,7 @@ namespace httpClientFactory
serviceProvider.GetRequiredService<KubernetesClientConfiguration>(), serviceProvider.GetRequiredService<KubernetesClientConfiguration>(),
httpClient); httpClient);
}) })
.ConfigurePrimaryHttpMessageHandler(config.CreateDefaultHttpClientHandler) .ConfigurePrimaryHttpMessageHandler(config.CreateDefaultHttpClientHandler);
.AddHttpMessageHandler(KubernetesClientConfiguration.CreateWatchHandler);
// Add the class that uses the client // Add the class that uses the client
services.AddHostedService<PodListHostedService>(); services.AddHostedService<PodListHostedService>();

View File

@@ -366,6 +366,7 @@ namespace k8s
HttpResponseMessage _httpResponse = null; HttpResponseMessage _httpResponse = null;
_httpRequest.Method = HttpMethod.{{Method}}; _httpRequest.Method = HttpMethod.{{Method}};
_httpRequest.RequestUri = new System.Uri(_url); _httpRequest.RequestUri = new System.Uri(_url);
_httpRequest.Version = HttpVersion.Version20;
// Set Headers // Set Headers
@@ -403,7 +404,7 @@ namespace k8s
ServiceClientTracing.SendRequest(_invocationId, _httpRequest); ServiceClientTracing.SendRequest(_invocationId, _httpRequest);
} }
cancellationToken.ThrowIfCancellationRequested(); cancellationToken.ThrowIfCancellationRequested();
_httpResponse = await HttpClient.SendAsync(_httpRequest, cancellationToken).ConfigureAwait(false); _httpResponse = await HttpClient.SendAsync(_httpRequest, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
if (_shouldTrace) if (_shouldTrace)
{ {
ServiceClientTracing.ReceiveResponse(_invocationId, _httpResponse); ServiceClientTracing.ReceiveResponse(_invocationId, _httpResponse);
@@ -445,6 +446,12 @@ namespace k8s
_result.Body = await _httpResponse.Content.ReadAsStreamAsync().ConfigureAwait(false); _result.Body = await _httpResponse.Content.ReadAsStreamAsync().ConfigureAwait(false);
{{/IfReturnType operation "stream"}} {{/IfReturnType operation "stream"}}
{{#IfReturnType operation "obj"}} {{#IfReturnType operation "obj"}}
{{#IfParamCotains operation "watch"}}
if (watch == true)
{
_httpResponse.Content = new LineSeparatedHttpContent(_httpResponse.Content, cancellationToken);
}
{{/IfParamCotains operation "watch"}}
_responseContent = await _httpResponse.Content.ReadAsStringAsync().ConfigureAwait(false); _responseContent = await _httpResponse.Content.ReadAsStringAsync().ConfigureAwait(false);
try try
{ {

View File

@@ -4,15 +4,11 @@ using NSwag;
using Nustache.Core; using Nustache.Core;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Data.Common;
using System.Globalization;
using System.IO; using System.IO;
using System.Linq; using System.Linq;
using System.Reflection.Metadata;
using System.Runtime.CompilerServices; using System.Runtime.CompilerServices;
using System.Security; using System.Security;
using System.Text.RegularExpressions; using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace KubernetesWatchGenerator namespace KubernetesWatchGenerator
@@ -117,6 +113,7 @@ namespace KubernetesWatchGenerator
Helpers.Register(nameof(GetRequestMethod), GetRequestMethod); Helpers.Register(nameof(GetRequestMethod), GetRequestMethod);
Helpers.Register(nameof(EscapeDataString), EscapeDataString); Helpers.Register(nameof(EscapeDataString), EscapeDataString);
Helpers.Register(nameof(IfReturnType), IfReturnType); Helpers.Register(nameof(IfReturnType), IfReturnType);
Helpers.Register(nameof(IfParamCotains), IfParamCotains);
Helpers.Register(nameof(GetModelCtorParam), GetModelCtorParam); Helpers.Register(nameof(GetModelCtorParam), GetModelCtorParam);
Helpers.Register(nameof(IfType), IfType); Helpers.Register(nameof(IfType), IfType);
@@ -642,7 +639,6 @@ namespace KubernetesWatchGenerator
{ {
return GetDotNetType(schema.Type, parent.Name, parent.IsRequired, schema.Format); return GetDotNetType(schema.Type, parent.Name, parent.IsRequired, schema.Format);
} }
} }
return GetDotNetType(parent.Type, parent.Name, parent.IsRequired, parent.Format); return GetDotNetType(parent.Type, parent.Name, parent.IsRequired, parent.Format);
@@ -683,7 +679,6 @@ namespace KubernetesWatchGenerator
{ {
context.Write($" = null"); context.Write($" = null");
} }
} }
else if (arguments != null && arguments.Count > 0 && arguments[0] != null && arguments[0] is string) else if (arguments != null && arguments.Count > 0 && arguments[0] != null && arguments[0] is string)
{ {
@@ -763,7 +758,6 @@ namespace KubernetesWatchGenerator
break; break;
case "field": case "field":
return GetDotNetName(jsonName, "fieldctor").ToPascalCase(); return GetDotNetName(jsonName, "fieldctor").ToPascalCase();
} }
return jsonName.ToCamelCase(); return jsonName.ToCamelCase();
@@ -973,6 +967,36 @@ namespace KubernetesWatchGenerator
} }
} }
private static void IfParamCotains(RenderContext context, IList<object> arguments, IDictionary<string, object> options,
RenderBlock fn, RenderBlock inverse)
{
var operation = arguments?.FirstOrDefault() as SwaggerOperation;
if (operation != null)
{
string name = null;
if (arguments.Count > 1)
{
name = arguments[1] as string;
}
bool found = false;
foreach (var param in operation.Parameters)
{
if (param.Name == name)
{
found = true;
break;
}
}
if (found)
{
fn(null);
}
}
}
private static void EscapeDataString(RenderContext context, IList<object> arguments, IDictionary<string, object> options, private static void EscapeDataString(RenderContext context, IList<object> arguments, IDictionary<string, object> options,
RenderBlock fn, RenderBlock inverse) RenderBlock fn, RenderBlock inverse)
{ {

View File

@@ -1,8 +0,0 @@
{
"profiles": {
"KubernetesGenerator": {
"commandName": "Project",
"commandLineArgs": "x D:\\workspace\\k8scsharp\\src\\KubernetesClient\\generated"
}
}
}

View File

@@ -247,7 +247,6 @@ namespace k8s
} }
} }
AppendDelegatingHandler<WatcherDelegatingHandler>();
HttpClient = new HttpClient(FirstMessageHandler, false); HttpClient = new HttpClient(FirstMessageHandler, false);
} }

View File

@@ -51,7 +51,5 @@ namespace k8s
handler.ClientCertificates.Add(cert); handler.ClientCertificates.Add(cert);
} }
} }
public static DelegatingHandler CreateWatchHandler() => new WatcherDelegatingHandler();
} }
} }

View File

@@ -8,31 +8,39 @@ using System.Threading.Tasks;
namespace k8s namespace k8s
{ {
/// <summary> internal class LineSeparatedHttpContent : HttpContent
/// This HttpDelegatingHandler is to rewrite the response and return first line to autorest client
/// then use WatchExt to create a watch object which interact with the replaced http response to get watch works.
/// </summary>
internal class WatcherDelegatingHandler : DelegatingHandler
{ {
protected override async Task<HttpResponseMessage> SendAsync( private readonly HttpContent _originContent;
HttpRequestMessage request, private readonly CancellationToken _cancellationToken;
CancellationToken cancellationToken) private Stream _originStream;
public LineSeparatedHttpContent(HttpContent originContent, CancellationToken cancellationToken)
{ {
request.Version = HttpVersion.Version20; _originContent = originContent;
var originResponse = await base.SendAsync(request, cancellationToken).ConfigureAwait(false); _cancellationToken = cancellationToken;
}
// all watches are GETs, so we can ignore others public TextReader StreamReader { get; private set; }
if (originResponse.IsSuccessStatusCode && request.Method == HttpMethod.Get)
{
var query = request.RequestUri.Query;
var index = query.IndexOf("watch=true", StringComparison.InvariantCulture);
if (index > 0 && (query[index - 1] == '&' || query[index - 1] == '?'))
{
originResponse.Content = new LineSeparatedHttpContent(originResponse.Content, cancellationToken);
}
}
return originResponse; protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
{
_originStream = await _originContent.ReadAsStreamAsync().ConfigureAwait(false);
var reader = new PeekableStreamReader(new CancelableStream(_originStream, _cancellationToken));
StreamReader = reader;
var firstLine = await reader.PeekLineAsync().ConfigureAwait(false);
var writer = new StreamWriter(stream);
await writer.WriteAsync(firstLine).ConfigureAwait(false);
await writer.FlushAsync().ConfigureAwait(false);
}
protected override bool TryComputeLength(out long length)
{
length = 0;
return false;
} }
internal class CancelableStream : Stream internal class CancelableStream : Stream
@@ -143,42 +151,6 @@ namespace k8s
} }
} }
public class LineSeparatedHttpContent : HttpContent
{
private readonly HttpContent _originContent;
private readonly CancellationToken _cancellationToken;
private Stream _originStream;
public LineSeparatedHttpContent(HttpContent originContent, CancellationToken cancellationToken)
{
_originContent = originContent;
_cancellationToken = cancellationToken;
}
public TextReader StreamReader { get; private set; }
protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
{
_originStream = await _originContent.ReadAsStreamAsync().ConfigureAwait(false);
var reader = new PeekableStreamReader(new CancelableStream(_originStream, _cancellationToken));
StreamReader = reader;
var firstLine = await reader.PeekLineAsync().ConfigureAwait(false);
var writer = new StreamWriter(stream);
await writer.WriteAsync(firstLine).ConfigureAwait(false);
await writer.FlushAsync().ConfigureAwait(false);
}
protected override bool TryComputeLength(out long length)
{
length = 0;
return false;
}
}
internal class PeekableStreamReader : TextReader internal class PeekableStreamReader : TextReader
{ {
private readonly Queue<string> _buffer; private readonly Queue<string> _buffer;

View File

@@ -30,7 +30,7 @@ namespace k8s
{ {
var response = await responseTask.ConfigureAwait(false); var response = await responseTask.ConfigureAwait(false);
if (!(response.Response.Content is WatcherDelegatingHandler.LineSeparatedHttpContent content)) if (!(response.Response.Content is LineSeparatedHttpContent content))
{ {
throw new KubernetesClientException("not a watchable request or failed response"); throw new KubernetesClientException("not a watchable request or failed response");
} }

File diff suppressed because it is too large Load Diff