diff --git a/examples/cp/Cp.cs b/examples/cp/Cp.cs index 896310b..c6b44b9 100644 --- a/examples/cp/Cp.cs +++ b/examples/cp/Cp.cs @@ -73,17 +73,7 @@ namespace cp memoryStream.Position = 0; - const int bufferSize = 31 * 1024 * 1024; // must be lower than 32 * 1024 * 1024 - byte[] localBuffer = new byte[bufferSize]; - while (true) - { - int numRead = await memoryStream.ReadAsync(localBuffer, 0, localBuffer.Length); - if (numRead <= 0) - { - break; - } - await stdIn.WriteAsync(localBuffer, 0, numRead); - } + await memoryStream.CopyToAsync(stdIn); await stdIn.FlushAsync(); } diff --git a/kubernetes-client.sln b/kubernetes-client.sln index 488798e..83884ea 100644 --- a/kubernetes-client.sln +++ b/kubernetes-client.sln @@ -71,6 +71,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "webApiDependencyInjection", EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "workerServiceDependencyInjection", "examples\workerServiceDependencyInjection\workerServiceDependencyInjection.csproj", "{05DC8884-AC54-4603-AC25-AE9D9F24E7AE}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "cp", "examples\cp\cp.csproj", "{CC41E248-2139-427E-8DD4-B047A8924FD2}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -453,6 +455,18 @@ Global {05DC8884-AC54-4603-AC25-AE9D9F24E7AE}.Release|x64.Build.0 = Release|Any CPU {05DC8884-AC54-4603-AC25-AE9D9F24E7AE}.Release|x86.ActiveCfg = Release|Any CPU {05DC8884-AC54-4603-AC25-AE9D9F24E7AE}.Release|x86.Build.0 = Release|Any CPU + {CC41E248-2139-427E-8DD4-B047A8924FD2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {CC41E248-2139-427E-8DD4-B047A8924FD2}.Debug|Any CPU.Build.0 = Debug|Any CPU + {CC41E248-2139-427E-8DD4-B047A8924FD2}.Debug|x64.ActiveCfg = Debug|Any CPU + {CC41E248-2139-427E-8DD4-B047A8924FD2}.Debug|x64.Build.0 = Debug|Any CPU + {CC41E248-2139-427E-8DD4-B047A8924FD2}.Debug|x86.ActiveCfg = Debug|Any CPU + {CC41E248-2139-427E-8DD4-B047A8924FD2}.Debug|x86.Build.0 = Debug|Any CPU + {CC41E248-2139-427E-8DD4-B047A8924FD2}.Release|Any CPU.ActiveCfg = Release|Any CPU + {CC41E248-2139-427E-8DD4-B047A8924FD2}.Release|Any CPU.Build.0 = Release|Any CPU + {CC41E248-2139-427E-8DD4-B047A8924FD2}.Release|x64.ActiveCfg = Release|Any CPU + {CC41E248-2139-427E-8DD4-B047A8924FD2}.Release|x64.Build.0 = Release|Any CPU + {CC41E248-2139-427E-8DD4-B047A8924FD2}.Release|x86.ActiveCfg = Release|Any CPU + {CC41E248-2139-427E-8DD4-B047A8924FD2}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -489,6 +503,7 @@ Global {8E266190-AE6E-44A8-948D-BD974AA82428} = {B70AFB57-57C9-46DC-84BE-11B7DDD34B40} {C0759F88-A010-4DEF-BD3B-E183D3328FFC} = {B70AFB57-57C9-46DC-84BE-11B7DDD34B40} {05DC8884-AC54-4603-AC25-AE9D9F24E7AE} = {B70AFB57-57C9-46DC-84BE-11B7DDD34B40} + {CC41E248-2139-427E-8DD4-B047A8924FD2} = {B70AFB57-57C9-46DC-84BE-11B7DDD34B40} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {049A763A-C891-4E8D-80CF-89DD3E22ADC7} diff --git a/src/KubernetesClient/StreamDemuxer.cs b/src/KubernetesClient/StreamDemuxer.cs index 90c8912..1d4ac74 100644 --- a/src/KubernetesClient/StreamDemuxer.cs +++ b/src/KubernetesClient/StreamDemuxer.cs @@ -21,6 +21,7 @@ namespace k8s /// public class StreamDemuxer : IStreamDemuxer { + private const int MAXFRAMESIZE = 15 * 1024 * 1024; // 15MB private readonly WebSocket webSocket; private readonly Dictionary buffers = new Dictionary(); private readonly CancellationTokenSource cts = new CancellationTokenSource(); @@ -156,15 +157,19 @@ namespace k8s public async Task Write(byte index, byte[] buffer, int offset, int count, CancellationToken cancellationToken = default) { - var writeBuffer = ArrayPool.Shared.Rent(count + 1); + var writeBuffer = ArrayPool.Shared.Rent(Math.Min(count, MAXFRAMESIZE) + 1); try { - writeBuffer[0] = (byte)index; - Array.Copy(buffer, offset, writeBuffer, 1, count); - var segment = new ArraySegment(writeBuffer, 0, count + 1); - await webSocket.SendAsync(segment, WebSocketMessageType.Binary, false, cancellationToken) - .ConfigureAwait(false); + writeBuffer[0] = index; + for (var i = 0; i < count; i += MAXFRAMESIZE) + { + var c = Math.Min(count - i, MAXFRAMESIZE); + Buffer.BlockCopy(buffer, offset + i, writeBuffer, 1, c); + var segment = new ArraySegment(writeBuffer, 0, c + 1); + await webSocket.SendAsync(segment, WebSocketMessageType.Binary, false, cancellationToken) + .ConfigureAwait(false); + } } finally { diff --git a/tests/E2E.Tests/E2E.Tests.csproj b/tests/E2E.Tests/E2E.Tests.csproj index 27b6d42..b2fcd7a 100644 --- a/tests/E2E.Tests/E2E.Tests.csproj +++ b/tests/E2E.Tests/E2E.Tests.csproj @@ -1,4 +1,4 @@ - + false true @@ -9,6 +9,7 @@ + diff --git a/tests/E2E.Tests/MinikubeTests.cs b/tests/E2E.Tests/MinikubeTests.cs index ec2cef1..bee75e8 100644 --- a/tests/E2E.Tests/MinikubeTests.cs +++ b/tests/E2E.Tests/MinikubeTests.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Collections.ObjectModel; using System.Diagnostics; @@ -14,6 +14,9 @@ using k8s.Models; using k8s.Autorest; using Nito.AsyncEx; using Xunit; +using ICSharpCode.SharpZipLib.Tar; +using System.Text; +using System.Security.Cryptography; namespace k8s.E2E { @@ -398,7 +401,7 @@ namespace k8s.E2E async Task Pod() { var pods = client.CoreV1.ListNamespacedPod(namespaceParameter); - var pod = pods.Items.First(); + var pod = pods.Items.First(p => p.Metadata.Name == podName); while (pod.Status.Phase != "Running") { await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false); @@ -548,6 +551,181 @@ namespace k8s.E2E } + [MinikubeFact] + public async Task CopyToPodTestAsync() + { + var namespaceParameter = "default"; + var podName = "k8scsharp-e2e-cp-pod"; + + var client = CreateClient(); + + async Task CopyFileToPodAsync(string name, string @namespace, string container, Stream inputFileStream, string destinationFilePath, CancellationToken cancellationToken = default(CancellationToken)) + { + // The callback which processes the standard input, standard output and standard error of exec method + var handler = new ExecAsyncCallback(async (stdIn, stdOut, stdError) => + { + var fileInfo = new FileInfo(destinationFilePath); + try + { + using (var memoryStream = new MemoryStream()) + { + using (var tarOutputStream = new TarOutputStream(memoryStream, Encoding.Default)) + { + tarOutputStream.IsStreamOwner = false; + + var fileSize = inputFileStream.Length; + var entry = TarEntry.CreateTarEntry(fileInfo.Name); + + entry.Size = fileSize; + + tarOutputStream.PutNextEntry(entry); + await inputFileStream.CopyToAsync(tarOutputStream).ConfigureAwait(false); + tarOutputStream.CloseEntry(); + } + + memoryStream.Position = 0; + + await memoryStream.CopyToAsync(stdIn).ConfigureAwait(false); + await memoryStream.FlushAsync().ConfigureAwait(false); + stdIn.Close(); + } + } + catch (Exception ex) + { + throw new IOException($"Copy command failed: {ex.Message}"); + } + + using StreamReader streamReader = new StreamReader(stdError); + while (streamReader.EndOfStream == false) + { + string error = await streamReader.ReadToEndAsync().ConfigureAwait(false); + throw new IOException($"Copy command failed: {error}"); + } + }); + + string destinationFolder = Path.GetDirectoryName(destinationFilePath).Replace("\\", "/"); + + return await client.NamespacedPodExecAsync( + name, + @namespace, + container, + new string[] { "tar", "-xmf", "-", "-C", destinationFolder }, + false, + handler, + cancellationToken).ConfigureAwait(false); + } + + + void Cleanup() + { + var pods = client.CoreV1.ListNamespacedPod(namespaceParameter); + while (pods.Items.Any(p => p.Metadata.Name == podName)) + { + try + { + client.CoreV1.DeleteNamespacedPod(podName, namespaceParameter); + } + catch (HttpOperationException e) + { + if (e.Response.StatusCode == System.Net.HttpStatusCode.NotFound) + { + return; + } + } + } + } + + try + { + Cleanup(); + + client.CoreV1.CreateNamespacedPod( + new V1Pod() + { + Metadata = new V1ObjectMeta { Name = podName, }, + Spec = new V1PodSpec + { + Containers = new[] + { + new V1Container() + { + Name = "container", + Image = "ubuntu", + // Image = "busybox", // TODO not work with busybox + Command = new[] { "sleep" }, + Args = new[] { "infinity" }, + }, + }, + }, + }, + namespaceParameter); + + var lines = new List(); + var started = new ManualResetEvent(false); + + async Task Pod() + { + var pods = client.CoreV1.ListNamespacedPod(namespaceParameter); + var pod = pods.Items.First(p => p.Metadata.Name == podName); + while (pod.Status.Phase != "Running") + { + await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false); + return await Pod().ConfigureAwait(false); + } + + return pod; + } + + var pod = await Pod().ConfigureAwait(false); + + + async Task AssertMd5sumAsync(string file, byte[] orig) + { + var ws = await client.WebSocketNamespacedPodExecAsync( + pod.Metadata.Name, + pod.Metadata.NamespaceProperty, + new string[] { "md5sum", file }, + "container").ConfigureAwait(false); + + var demux = new StreamDemuxer(ws); + demux.Start(); + + var buff = new byte[4096]; + var stream = demux.GetStream(1, 1); + var read = stream.Read(buff, 0, 4096); + var remotemd5 = Encoding.Default.GetString(buff); + remotemd5 = remotemd5.Substring(0, 32); + + var md5 = MD5.Create().ComputeHash(orig); + var localmd5 = BitConverter.ToString(md5).Replace("-", string.Empty).ToLower(); + + Assert.Equal(localmd5, remotemd5); + } + + + // + { + // small + var content = new byte[1 * 1024 * 1024]; + new Random().NextBytes(content); + await CopyFileToPodAsync(pod.Metadata.Name, pod.Metadata.NamespaceProperty, "container", new MemoryStream(content), "/tmp/test").ConfigureAwait(false); + await AssertMd5sumAsync("/tmp/test", content).ConfigureAwait(false); + } + + { + // big + var content = new byte[40 * 1024 * 1024]; + new Random().NextBytes(content); + await CopyFileToPodAsync(pod.Metadata.Name, pod.Metadata.NamespaceProperty, "container", new MemoryStream(content), "/tmp/test").ConfigureAwait(false); + await AssertMd5sumAsync("/tmp/test", content).ConfigureAwait(false); + } + } + finally + { + Cleanup(); + } + } + public static IKubernetes CreateClient() { return new Kubernetes(KubernetesClientConfiguration.BuildDefaultConfig());