Fix an issue where growing the ByteBuffer would fail (#167)

* Fix an issue where growing a buffer when the read water mark = 0 would fail

* Simplify fix
This commit is contained in:
Frederik Carlier
2018-06-01 20:26:49 +02:00
committed by Brendan Burns
parent 86abfc1b7c
commit 6f38b7299e
2 changed files with 163 additions and 1 deletions

View File

@@ -67,6 +67,14 @@ namespace k8s
get { return this.buffer.Length; } get { return this.buffer.Length; }
} }
/// <summary>
/// Gets the maximum allowed size of the buffer.
/// </summary>
public int MaximumSize
{
get { return this.maximumSize; }
}
/// <summary> /// <summary>
/// Gets the offset from which the next byte will be read. Increased every time a caller reads data. /// Gets the offset from which the next byte will be read. Increased every time a caller reads data.
/// </summary> /// </summary>
@@ -259,6 +267,11 @@ namespace k8s
return toRead; return toRead;
} }
/// <summary>
/// The event which is raised when the buffer is resized.
/// </summary>
public event EventHandler OnResize;
/// <summary> /// <summary>
/// Increases the buffer size. Any call to this method must be protected with a lock. /// Increases the buffer size. Any call to this method must be protected with a lock.
/// </summary> /// </summary>
@@ -274,7 +287,7 @@ namespace k8s
var newBuffer = ArrayPool<byte>.Shared.Rent(size); var newBuffer = ArrayPool<byte>.Shared.Rent(size);
if (this.WriteWaterMark <= this.ReadWaterMark) if (this.WriteWaterMark < this.ReadWaterMark)
{ {
// Copy the data at the start // Copy the data at the start
Array.Copy(this.buffer, 0, newBuffer, 0, this.WriteWaterMark); Array.Copy(this.buffer, 0, newBuffer, 0, this.WriteWaterMark);
@@ -298,6 +311,7 @@ namespace k8s
this.buffer = newBuffer; this.buffer = newBuffer;
Debug.Assert(this.bytesRead + this.AvailableReadableBytes == this.bytesWritten); Debug.Assert(this.bytesRead + this.AvailableReadableBytes == this.bytesWritten);
this.OnResize?.Invoke(this, EventArgs.Empty);
} }
} }
} }

View File

@@ -1,4 +1,6 @@
using Nito.AsyncEx;
using System; using System;
using System.Security.Cryptography;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Xunit; using Xunit;
@@ -341,5 +343,151 @@ namespace k8s.Tests
read = buffer.Read(readData, 0, 0x10); read = buffer.Read(readData, 0, 0x10);
Assert.Equal(0, read); Assert.Equal(0, read);
} }
/// <summary>
/// Tests growing the byte buffer on the first write. This is a special case where
/// ReadWaterMark = WriteWaterMark = 0
/// </summary>
[Fact]
public void GrowOnFirstWriteTest()
{
// In the current implementation, the minimum size of the buffer will be 16 bytes,
// but that's not guaranteed.
ByteBuffer buffer = new ByteBuffer(1, 128);
byte[] data = new byte[buffer.Size + 1];
RandomNumberGenerator.Create().GetBytes(data);
byte[] output = new byte[buffer.Size + 1];
buffer.Write(data, 0, data.Length);
Assert.Equal(data.Length, buffer.AvailableReadableBytes);
buffer.Read(output, 0, output.Length);
Assert.Equal(data, output);
}
/// <summary>
/// Tests growing the byte buffer on the second write.
/// </summary>
[Fact]
public void GrowOnSecondFirstWriteTest()
{
// In the current implementation, the minimum size of the buffer will be 16 bytes,
// but that's not guaranteed.
ByteBuffer buffer = new ByteBuffer(1, 128);
byte[] data = new byte[buffer.Size + 1];
RandomNumberGenerator.Create().GetBytes(data);
byte[] output = new byte[buffer.Size + 1];
buffer.Write(data, 0, 1);
buffer.Write(data, 0, data.Length);
Assert.Equal(data.Length + 1, buffer.AvailableReadableBytes);
buffer.Read(output, 0, 1);
buffer.Read(output, 0, output.Length);
Assert.Equal(data, output);
}
/// <summary>
/// Tests reading from the buffer before data has been written, and makes sure
/// data is read correctly.
/// </summary>
/// <returns>
/// A <see cref="Task"/> which represents the asynchronous operation.
/// </returns>
[Fact]
public async Task ReadFirstTest()
{
ByteBuffer buffer = new ByteBuffer(1, 128);
byte[] data = new byte[buffer.Size + 1];
RandomNumberGenerator.Create().GetBytes(data);
byte[] output = new byte[buffer.Size + 1];
var readTask = Task.Run(() => buffer.Read(output, 0, output.Length));
await Task.Delay(TimeSpan.FromSeconds(1));
buffer.Write(data, 0, data.Length);
await readTask;
}
#if NETCOREAPP2_0
/// <summary>
/// A simpel test which will use a random number generator to write lots of data to
/// the buffer, and read that data using another thread. Makes sure the hashes of the
/// data written and read matches.
/// </summary>
/// <returns>
/// A <see cref="Task"/> which represents the asynchronous operation.
/// </returns>
[Fact]
public async Task RandomReadWriteTest()
{
ByteBuffer buffer = new ByteBuffer(1, 1024 * 1024);
var generatorTask = Task.Run(() => this.Generate(buffer, SHA256.Create()));
var consumerTask = Task.Run(() => this.Consume(buffer, SHA256.Create()));
await Task.WhenAll(generatorTask, consumerTask);
var generatorHash = await generatorTask;
var consumerHash = await consumerTask;
Assert.Equal(generatorHash, consumerHash);
}
private byte[] Generate(ByteBuffer buffer, HashAlgorithm hash)
{
RandomNumberGenerator g = RandomNumberGenerator.Create();
byte[] next = new byte[32];
int iterations = 0;
while (buffer.Size < buffer.MaximumSize)
{
iterations++;
g.GetBytes(next);
buffer.Write(next, 0, next.Length);
hash.TransformBlock(next, 0, next.Length, null, 0);
}
buffer.WriteEnd();
hash.TransformFinalBlock(next, 0, 0);
return hash.Hash;
}
private byte[] Consume(ByteBuffer buffer, HashAlgorithm hash)
{
byte[] data = new byte[32];
AsyncAutoResetEvent onBufferResized = new AsyncAutoResetEvent();
buffer.OnResize += (sender, e) => onBufferResized.Set();
int read;
int iterations = 0;
while ((read = buffer.Read(data, 0, data.Length)) > 0)
{
iterations++;
hash.TransformBlock(data, 0, read, null, 0);
// The reader task is probably much faster than the writer, as the writer should also generate
// random data. Wait at specific intervals for the writer to catch up and to force a resize
// of the buffer.
if (iterations % 1024 == 0 && buffer.Size < buffer.MaximumSize)
{
onBufferResized.Wait();
}
}
hash.TransformFinalBlock(data, 0, 0);
return hash.Hash;
}
#endif
} }
} }