chunk websocket frame when buff > 15M (#981)
* ws is now chunked * remove chunk in example * fix build
This commit is contained in:
@@ -73,17 +73,7 @@ namespace cp
|
|||||||
|
|
||||||
memoryStream.Position = 0;
|
memoryStream.Position = 0;
|
||||||
|
|
||||||
const int bufferSize = 31 * 1024 * 1024; // must be lower than 32 * 1024 * 1024
|
await memoryStream.CopyToAsync(stdIn);
|
||||||
byte[] localBuffer = new byte[bufferSize];
|
|
||||||
while (true)
|
|
||||||
{
|
|
||||||
int numRead = await memoryStream.ReadAsync(localBuffer, 0, localBuffer.Length);
|
|
||||||
if (numRead <= 0)
|
|
||||||
{
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
await stdIn.WriteAsync(localBuffer, 0, numRead);
|
|
||||||
}
|
|
||||||
await stdIn.FlushAsync();
|
await stdIn.FlushAsync();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -71,6 +71,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "webApiDependencyInjection",
|
|||||||
EndProject
|
EndProject
|
||||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "workerServiceDependencyInjection", "examples\workerServiceDependencyInjection\workerServiceDependencyInjection.csproj", "{05DC8884-AC54-4603-AC25-AE9D9F24E7AE}"
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "workerServiceDependencyInjection", "examples\workerServiceDependencyInjection\workerServiceDependencyInjection.csproj", "{05DC8884-AC54-4603-AC25-AE9D9F24E7AE}"
|
||||||
EndProject
|
EndProject
|
||||||
|
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "cp", "examples\cp\cp.csproj", "{CC41E248-2139-427E-8DD4-B047A8924FD2}"
|
||||||
|
EndProject
|
||||||
Global
|
Global
|
||||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||||
Debug|Any CPU = Debug|Any CPU
|
Debug|Any CPU = Debug|Any CPU
|
||||||
@@ -453,6 +455,18 @@ Global
|
|||||||
{05DC8884-AC54-4603-AC25-AE9D9F24E7AE}.Release|x64.Build.0 = Release|Any CPU
|
{05DC8884-AC54-4603-AC25-AE9D9F24E7AE}.Release|x64.Build.0 = Release|Any CPU
|
||||||
{05DC8884-AC54-4603-AC25-AE9D9F24E7AE}.Release|x86.ActiveCfg = Release|Any CPU
|
{05DC8884-AC54-4603-AC25-AE9D9F24E7AE}.Release|x86.ActiveCfg = Release|Any CPU
|
||||||
{05DC8884-AC54-4603-AC25-AE9D9F24E7AE}.Release|x86.Build.0 = Release|Any CPU
|
{05DC8884-AC54-4603-AC25-AE9D9F24E7AE}.Release|x86.Build.0 = Release|Any CPU
|
||||||
|
{CC41E248-2139-427E-8DD4-B047A8924FD2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||||
|
{CC41E248-2139-427E-8DD4-B047A8924FD2}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||||
|
{CC41E248-2139-427E-8DD4-B047A8924FD2}.Debug|x64.ActiveCfg = Debug|Any CPU
|
||||||
|
{CC41E248-2139-427E-8DD4-B047A8924FD2}.Debug|x64.Build.0 = Debug|Any CPU
|
||||||
|
{CC41E248-2139-427E-8DD4-B047A8924FD2}.Debug|x86.ActiveCfg = Debug|Any CPU
|
||||||
|
{CC41E248-2139-427E-8DD4-B047A8924FD2}.Debug|x86.Build.0 = Debug|Any CPU
|
||||||
|
{CC41E248-2139-427E-8DD4-B047A8924FD2}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||||
|
{CC41E248-2139-427E-8DD4-B047A8924FD2}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||||
|
{CC41E248-2139-427E-8DD4-B047A8924FD2}.Release|x64.ActiveCfg = Release|Any CPU
|
||||||
|
{CC41E248-2139-427E-8DD4-B047A8924FD2}.Release|x64.Build.0 = Release|Any CPU
|
||||||
|
{CC41E248-2139-427E-8DD4-B047A8924FD2}.Release|x86.ActiveCfg = Release|Any CPU
|
||||||
|
{CC41E248-2139-427E-8DD4-B047A8924FD2}.Release|x86.Build.0 = Release|Any CPU
|
||||||
EndGlobalSection
|
EndGlobalSection
|
||||||
GlobalSection(SolutionProperties) = preSolution
|
GlobalSection(SolutionProperties) = preSolution
|
||||||
HideSolutionNode = FALSE
|
HideSolutionNode = FALSE
|
||||||
@@ -489,6 +503,7 @@ Global
|
|||||||
{8E266190-AE6E-44A8-948D-BD974AA82428} = {B70AFB57-57C9-46DC-84BE-11B7DDD34B40}
|
{8E266190-AE6E-44A8-948D-BD974AA82428} = {B70AFB57-57C9-46DC-84BE-11B7DDD34B40}
|
||||||
{C0759F88-A010-4DEF-BD3B-E183D3328FFC} = {B70AFB57-57C9-46DC-84BE-11B7DDD34B40}
|
{C0759F88-A010-4DEF-BD3B-E183D3328FFC} = {B70AFB57-57C9-46DC-84BE-11B7DDD34B40}
|
||||||
{05DC8884-AC54-4603-AC25-AE9D9F24E7AE} = {B70AFB57-57C9-46DC-84BE-11B7DDD34B40}
|
{05DC8884-AC54-4603-AC25-AE9D9F24E7AE} = {B70AFB57-57C9-46DC-84BE-11B7DDD34B40}
|
||||||
|
{CC41E248-2139-427E-8DD4-B047A8924FD2} = {B70AFB57-57C9-46DC-84BE-11B7DDD34B40}
|
||||||
EndGlobalSection
|
EndGlobalSection
|
||||||
GlobalSection(ExtensibilityGlobals) = postSolution
|
GlobalSection(ExtensibilityGlobals) = postSolution
|
||||||
SolutionGuid = {049A763A-C891-4E8D-80CF-89DD3E22ADC7}
|
SolutionGuid = {049A763A-C891-4E8D-80CF-89DD3E22ADC7}
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ namespace k8s
|
|||||||
/// </summary>
|
/// </summary>
|
||||||
public class StreamDemuxer : IStreamDemuxer
|
public class StreamDemuxer : IStreamDemuxer
|
||||||
{
|
{
|
||||||
|
private const int MAXFRAMESIZE = 15 * 1024 * 1024; // 15MB
|
||||||
private readonly WebSocket webSocket;
|
private readonly WebSocket webSocket;
|
||||||
private readonly Dictionary<byte, ByteBuffer> buffers = new Dictionary<byte, ByteBuffer>();
|
private readonly Dictionary<byte, ByteBuffer> buffers = new Dictionary<byte, ByteBuffer>();
|
||||||
private readonly CancellationTokenSource cts = new CancellationTokenSource();
|
private readonly CancellationTokenSource cts = new CancellationTokenSource();
|
||||||
@@ -156,15 +157,19 @@ namespace k8s
|
|||||||
public async Task Write(byte index, byte[] buffer, int offset, int count,
|
public async Task Write(byte index, byte[] buffer, int offset, int count,
|
||||||
CancellationToken cancellationToken = default)
|
CancellationToken cancellationToken = default)
|
||||||
{
|
{
|
||||||
var writeBuffer = ArrayPool<byte>.Shared.Rent(count + 1);
|
var writeBuffer = ArrayPool<byte>.Shared.Rent(Math.Min(count, MAXFRAMESIZE) + 1);
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
writeBuffer[0] = (byte)index;
|
writeBuffer[0] = index;
|
||||||
Array.Copy(buffer, offset, writeBuffer, 1, count);
|
for (var i = 0; i < count; i += MAXFRAMESIZE)
|
||||||
var segment = new ArraySegment<byte>(writeBuffer, 0, count + 1);
|
{
|
||||||
await webSocket.SendAsync(segment, WebSocketMessageType.Binary, false, cancellationToken)
|
var c = Math.Min(count - i, MAXFRAMESIZE);
|
||||||
.ConfigureAwait(false);
|
Buffer.BlockCopy(buffer, offset + i, writeBuffer, 1, c);
|
||||||
|
var segment = new ArraySegment<byte>(writeBuffer, 0, c + 1);
|
||||||
|
await webSocket.SendAsync(segment, WebSocketMessageType.Binary, false, cancellationToken)
|
||||||
|
.ConfigureAwait(false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
<Project Sdk="Microsoft.NET.Sdk">
|
<Project Sdk="Microsoft.NET.Sdk">
|
||||||
<PropertyGroup>
|
<PropertyGroup>
|
||||||
<IsPackable>false</IsPackable>
|
<IsPackable>false</IsPackable>
|
||||||
<SignAssembly>true</SignAssembly>
|
<SignAssembly>true</SignAssembly>
|
||||||
@@ -9,6 +9,7 @@
|
|||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
|
||||||
<PackageReference Include="JsonPatch.Net" Version="2.0.3" />
|
<PackageReference Include="JsonPatch.Net" Version="2.0.3" />
|
||||||
|
<PackageReference Include="SharpZipLib" Version="1.3.3" />
|
||||||
|
|
||||||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.3.0" />
|
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.3.0" />
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Collections.ObjectModel;
|
using System.Collections.ObjectModel;
|
||||||
using System.Diagnostics;
|
using System.Diagnostics;
|
||||||
@@ -14,6 +14,9 @@ using k8s.Models;
|
|||||||
using k8s.Autorest;
|
using k8s.Autorest;
|
||||||
using Nito.AsyncEx;
|
using Nito.AsyncEx;
|
||||||
using Xunit;
|
using Xunit;
|
||||||
|
using ICSharpCode.SharpZipLib.Tar;
|
||||||
|
using System.Text;
|
||||||
|
using System.Security.Cryptography;
|
||||||
|
|
||||||
namespace k8s.E2E
|
namespace k8s.E2E
|
||||||
{
|
{
|
||||||
@@ -398,7 +401,7 @@ namespace k8s.E2E
|
|||||||
async Task<V1Pod> Pod()
|
async Task<V1Pod> Pod()
|
||||||
{
|
{
|
||||||
var pods = client.CoreV1.ListNamespacedPod(namespaceParameter);
|
var pods = client.CoreV1.ListNamespacedPod(namespaceParameter);
|
||||||
var pod = pods.Items.First();
|
var pod = pods.Items.First(p => p.Metadata.Name == podName);
|
||||||
while (pod.Status.Phase != "Running")
|
while (pod.Status.Phase != "Running")
|
||||||
{
|
{
|
||||||
await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
|
await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
|
||||||
@@ -548,6 +551,181 @@ namespace k8s.E2E
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
[MinikubeFact]
|
||||||
|
public async Task CopyToPodTestAsync()
|
||||||
|
{
|
||||||
|
var namespaceParameter = "default";
|
||||||
|
var podName = "k8scsharp-e2e-cp-pod";
|
||||||
|
|
||||||
|
var client = CreateClient();
|
||||||
|
|
||||||
|
async Task<int> CopyFileToPodAsync(string name, string @namespace, string container, Stream inputFileStream, string destinationFilePath, CancellationToken cancellationToken = default(CancellationToken))
|
||||||
|
{
|
||||||
|
// The callback which processes the standard input, standard output and standard error of exec method
|
||||||
|
var handler = new ExecAsyncCallback(async (stdIn, stdOut, stdError) =>
|
||||||
|
{
|
||||||
|
var fileInfo = new FileInfo(destinationFilePath);
|
||||||
|
try
|
||||||
|
{
|
||||||
|
using (var memoryStream = new MemoryStream())
|
||||||
|
{
|
||||||
|
using (var tarOutputStream = new TarOutputStream(memoryStream, Encoding.Default))
|
||||||
|
{
|
||||||
|
tarOutputStream.IsStreamOwner = false;
|
||||||
|
|
||||||
|
var fileSize = inputFileStream.Length;
|
||||||
|
var entry = TarEntry.CreateTarEntry(fileInfo.Name);
|
||||||
|
|
||||||
|
entry.Size = fileSize;
|
||||||
|
|
||||||
|
tarOutputStream.PutNextEntry(entry);
|
||||||
|
await inputFileStream.CopyToAsync(tarOutputStream).ConfigureAwait(false);
|
||||||
|
tarOutputStream.CloseEntry();
|
||||||
|
}
|
||||||
|
|
||||||
|
memoryStream.Position = 0;
|
||||||
|
|
||||||
|
await memoryStream.CopyToAsync(stdIn).ConfigureAwait(false);
|
||||||
|
await memoryStream.FlushAsync().ConfigureAwait(false);
|
||||||
|
stdIn.Close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
throw new IOException($"Copy command failed: {ex.Message}");
|
||||||
|
}
|
||||||
|
|
||||||
|
using StreamReader streamReader = new StreamReader(stdError);
|
||||||
|
while (streamReader.EndOfStream == false)
|
||||||
|
{
|
||||||
|
string error = await streamReader.ReadToEndAsync().ConfigureAwait(false);
|
||||||
|
throw new IOException($"Copy command failed: {error}");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
string destinationFolder = Path.GetDirectoryName(destinationFilePath).Replace("\\", "/");
|
||||||
|
|
||||||
|
return await client.NamespacedPodExecAsync(
|
||||||
|
name,
|
||||||
|
@namespace,
|
||||||
|
container,
|
||||||
|
new string[] { "tar", "-xmf", "-", "-C", destinationFolder },
|
||||||
|
false,
|
||||||
|
handler,
|
||||||
|
cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void Cleanup()
|
||||||
|
{
|
||||||
|
var pods = client.CoreV1.ListNamespacedPod(namespaceParameter);
|
||||||
|
while (pods.Items.Any(p => p.Metadata.Name == podName))
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
client.CoreV1.DeleteNamespacedPod(podName, namespaceParameter);
|
||||||
|
}
|
||||||
|
catch (HttpOperationException e)
|
||||||
|
{
|
||||||
|
if (e.Response.StatusCode == System.Net.HttpStatusCode.NotFound)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
Cleanup();
|
||||||
|
|
||||||
|
client.CoreV1.CreateNamespacedPod(
|
||||||
|
new V1Pod()
|
||||||
|
{
|
||||||
|
Metadata = new V1ObjectMeta { Name = podName, },
|
||||||
|
Spec = new V1PodSpec
|
||||||
|
{
|
||||||
|
Containers = new[]
|
||||||
|
{
|
||||||
|
new V1Container()
|
||||||
|
{
|
||||||
|
Name = "container",
|
||||||
|
Image = "ubuntu",
|
||||||
|
// Image = "busybox", // TODO not work with busybox
|
||||||
|
Command = new[] { "sleep" },
|
||||||
|
Args = new[] { "infinity" },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
namespaceParameter);
|
||||||
|
|
||||||
|
var lines = new List<string>();
|
||||||
|
var started = new ManualResetEvent(false);
|
||||||
|
|
||||||
|
async Task<V1Pod> Pod()
|
||||||
|
{
|
||||||
|
var pods = client.CoreV1.ListNamespacedPod(namespaceParameter);
|
||||||
|
var pod = pods.Items.First(p => p.Metadata.Name == podName);
|
||||||
|
while (pod.Status.Phase != "Running")
|
||||||
|
{
|
||||||
|
await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
|
||||||
|
return await Pod().ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
return pod;
|
||||||
|
}
|
||||||
|
|
||||||
|
var pod = await Pod().ConfigureAwait(false);
|
||||||
|
|
||||||
|
|
||||||
|
async Task AssertMd5sumAsync(string file, byte[] orig)
|
||||||
|
{
|
||||||
|
var ws = await client.WebSocketNamespacedPodExecAsync(
|
||||||
|
pod.Metadata.Name,
|
||||||
|
pod.Metadata.NamespaceProperty,
|
||||||
|
new string[] { "md5sum", file },
|
||||||
|
"container").ConfigureAwait(false);
|
||||||
|
|
||||||
|
var demux = new StreamDemuxer(ws);
|
||||||
|
demux.Start();
|
||||||
|
|
||||||
|
var buff = new byte[4096];
|
||||||
|
var stream = demux.GetStream(1, 1);
|
||||||
|
var read = stream.Read(buff, 0, 4096);
|
||||||
|
var remotemd5 = Encoding.Default.GetString(buff);
|
||||||
|
remotemd5 = remotemd5.Substring(0, 32);
|
||||||
|
|
||||||
|
var md5 = MD5.Create().ComputeHash(orig);
|
||||||
|
var localmd5 = BitConverter.ToString(md5).Replace("-", string.Empty).ToLower();
|
||||||
|
|
||||||
|
Assert.Equal(localmd5, remotemd5);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
//
|
||||||
|
{
|
||||||
|
// small
|
||||||
|
var content = new byte[1 * 1024 * 1024];
|
||||||
|
new Random().NextBytes(content);
|
||||||
|
await CopyFileToPodAsync(pod.Metadata.Name, pod.Metadata.NamespaceProperty, "container", new MemoryStream(content), "/tmp/test").ConfigureAwait(false);
|
||||||
|
await AssertMd5sumAsync("/tmp/test", content).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
// big
|
||||||
|
var content = new byte[40 * 1024 * 1024];
|
||||||
|
new Random().NextBytes(content);
|
||||||
|
await CopyFileToPodAsync(pod.Metadata.Name, pod.Metadata.NamespaceProperty, "container", new MemoryStream(content), "/tmp/test").ConfigureAwait(false);
|
||||||
|
await AssertMd5sumAsync("/tmp/test", content).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
Cleanup();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static IKubernetes CreateClient()
|
public static IKubernetes CreateClient()
|
||||||
{
|
{
|
||||||
return new Kubernetes(KubernetesClientConfiguration.BuildDefaultConfig());
|
return new Kubernetes(KubernetesClientConfiguration.BuildDefaultConfig());
|
||||||
|
|||||||
Reference in New Issue
Block a user