From 93d0971560d08716a41ad063a6513cfd8c892aed Mon Sep 17 00:00:00 2001 From: shaan1337 Date: Tue, 5 Nov 2024 10:13:25 +0400 Subject: [PATCH 1/4] ChunkDataReadStream: Support async reads & remove support for sync reads --- src/EventStore.Plugins/Transforms/ChunkDataReadStream.cs | 4 +++- src/EventStore.Plugins/Transforms/ChunkDataWriteStream.cs | 2 ++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/EventStore.Plugins/Transforms/ChunkDataReadStream.cs b/src/EventStore.Plugins/Transforms/ChunkDataReadStream.cs index 31d5f23..5d327e6 100644 --- a/src/EventStore.Plugins/Transforms/ChunkDataReadStream.cs +++ b/src/EventStore.Plugins/Transforms/ChunkDataReadStream.cs @@ -12,13 +12,15 @@ public class ChunkDataReadStream(Stream chunkFileStream) : Stream { public sealed override bool CanRead => true; public sealed override bool CanSeek => true; public sealed override bool CanWrite => false; + public sealed override int Read(byte[] buffer, int offset, int count) => throw new InvalidOperationException("use ReadAsync"); public sealed override void Write(byte[] buffer, int offset, int count) => throw new InvalidOperationException(); public sealed override void Flush() => throw new InvalidOperationException(); public sealed override void SetLength(long value) => throw new InvalidOperationException(); public override long Length => throw new NotSupportedException(); // reads must always return exactly `count` bytes as we never read past the (flushed) writer checkpoint - public override int Read(byte[] buffer, int offset, int count) => ChunkFileStream.Read(buffer, offset, count); + public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) => + ChunkFileStream.ReadAsync(buffer, cancellationToken); // seeks need to support only `SeekOrigin.Begin` public override long Seek(long offset, SeekOrigin origin) { diff --git a/src/EventStore.Plugins/Transforms/ChunkDataWriteStream.cs 
b/src/EventStore.Plugins/Transforms/ChunkDataWriteStream.cs index 47ad52b..3563a36 100644 --- a/src/EventStore.Plugins/Transforms/ChunkDataWriteStream.cs +++ b/src/EventStore.Plugins/Transforms/ChunkDataWriteStream.cs @@ -15,6 +15,8 @@ public class ChunkDataWriteStream(Stream chunkFileStream, HashAlgorithm checksum public sealed override bool CanSeek => false; public sealed override bool CanWrite => true; public sealed override int Read(byte[] buffer, int offset, int count) => throw new InvalidOperationException(); + public sealed override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) => + throw new InvalidOperationException(); public sealed override long Seek(long offset, SeekOrigin origin) => throw new InvalidOperationException(); public override void Write(byte[] buffer, int offset, int count) { From 492071492d065388782df6451a6e618d59a2cf37 Mon Sep 17 00:00:00 2001 From: shaan1337 Date: Tue, 5 Nov 2024 11:12:50 +0400 Subject: [PATCH 2/4] ChunkDataWriteStream / IChunkWriteTransform: Support async writes & remove support for sync writes --- .../Transforms/ChunkDataReadStream.cs | 4 +++ .../Transforms/ChunkDataWriteStream.cs | 28 +++++++++++++++---- .../Transforms/IChunkWriteTransform.cs | 4 +-- 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/src/EventStore.Plugins/Transforms/ChunkDataReadStream.cs b/src/EventStore.Plugins/Transforms/ChunkDataReadStream.cs index 5d327e6..91600f4 100644 --- a/src/EventStore.Plugins/Transforms/ChunkDataReadStream.cs +++ b/src/EventStore.Plugins/Transforms/ChunkDataReadStream.cs @@ -14,7 +14,11 @@ public class ChunkDataReadStream(Stream chunkFileStream) : Stream { public sealed override bool CanWrite => false; public sealed override int Read(byte[] buffer, int offset, int count) => throw new InvalidOperationException("use ReadAsync"); public sealed override void Write(byte[] buffer, int offset, int count) => throw new InvalidOperationException(); + public override ValueTask 
WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) => + throw new InvalidOperationException(); public sealed override void Flush() => throw new InvalidOperationException(); + public sealed override Task FlushAsync(CancellationToken cancellationToken) => + throw new InvalidOperationException(); public sealed override void SetLength(long value) => throw new InvalidOperationException(); public override long Length => throw new NotSupportedException(); diff --git a/src/EventStore.Plugins/Transforms/ChunkDataWriteStream.cs b/src/EventStore.Plugins/Transforms/ChunkDataWriteStream.cs index 3563a36..9fbeb0a 100644 --- a/src/EventStore.Plugins/Transforms/ChunkDataWriteStream.cs +++ b/src/EventStore.Plugins/Transforms/ChunkDataWriteStream.cs @@ -1,8 +1,7 @@ // Copyright (c) Event Store Ltd and/or licensed to Event Store Ltd under one or more agreements. // Event Store Ltd licenses this file to you under the Event Store License v2 (see LICENSE.md). -using System; -using System.IO; +using System.Buffers; using System.Security.Cryptography; namespace EventStore.Plugins.Transforms; @@ -17,14 +16,19 @@ public class ChunkDataWriteStream(Stream chunkFileStream, HashAlgorithm checksum public sealed override int Read(byte[] buffer, int offset, int count) => throw new InvalidOperationException(); public sealed override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) => throw new InvalidOperationException(); + public sealed override void Write(byte[] buffer, int offset, int count) => + throw new InvalidOperationException("use WriteAsync"); + public sealed override void Flush() => + throw new InvalidOperationException("use FlushAsync"); + public sealed override long Seek(long offset, SeekOrigin origin) => throw new InvalidOperationException(); - public override void Write(byte[] buffer, int offset, int count) { - ChunkFileStream.Write(buffer, offset, count); - ChecksumAlgorithm.TransformBlock(buffer, 0, count, null, 0); + 
public override async ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) { + await ChunkFileStream.WriteAsync(buffer, cancellationToken); + Checksum(buffer); } - public override void Flush() => ChunkFileStream.Flush(); + public override Task FlushAsync(CancellationToken ct) => ChunkFileStream.FlushAsync(ct); public override void SetLength(long value) => ChunkFileStream.SetLength(value); public override long Length => ChunkFileStream.Length; public override long Position { @@ -40,6 +44,18 @@ public override long Position { } } + public void Checksum(ReadOnlyMemory data) { + // HashAlgorithm.TransformBlock() doesn't support span/memory, so we need to rent a byte array from the pool + byte[] tmp = ArrayPool.Shared.Rent(data.Length); + try { + data.CopyTo(tmp.AsMemory()); + ChecksumAlgorithm.TransformBlock(tmp, 0, data.Length, null, 0); + Array.Clear(tmp, 0, data.Length); + } finally { + ArrayPool.Shared.Return(tmp); + } + } + private void ReadAndChecksum(long count) { var buffer = new byte[4096]; long toRead = count; diff --git a/src/EventStore.Plugins/Transforms/IChunkWriteTransform.cs b/src/EventStore.Plugins/Transforms/IChunkWriteTransform.cs index b354d47..f52169d 100644 --- a/src/EventStore.Plugins/Transforms/IChunkWriteTransform.cs +++ b/src/EventStore.Plugins/Transforms/IChunkWriteTransform.cs @@ -7,6 +7,6 @@ namespace EventStore.Plugins.Transforms; public interface IChunkWriteTransform { ChunkDataWriteStream TransformData(ChunkDataWriteStream stream); - void CompleteData(int footerSize, int alignmentSize); - void WriteFooter(ReadOnlySpan footer, out int fileSize); + ValueTask CompleteData(int footerSize, int alignmentSize, CancellationToken cancellationToken = default); + ValueTask WriteFooter(ReadOnlyMemory footer, CancellationToken cancellationToken = default); } From 9e6748ec724b949734966bc5429beb72b587b2a2 Mon Sep 17 00:00:00 2001 From: Roman Sakno Date: Wed, 11 Dec 2024 14:35:07 +0200 Subject: [PATCH 3/4] Make 
transformations async compatible --- .../Transforms/ChunkDataReadStream.cs | 36 ++++---- .../Transforms/ChunkDataStream.cs | 53 +++++++++++ .../Transforms/ChunkDataWriteStream.cs | 88 +++++++++---------- .../Transforms/IChunkTransformFactory.cs | 15 ++-- .../Transforms/IChunkWriteTransform.cs | 6 +- 5 files changed, 121 insertions(+), 77 deletions(-) create mode 100644 src/EventStore.Plugins/Transforms/ChunkDataStream.cs diff --git a/src/EventStore.Plugins/Transforms/ChunkDataReadStream.cs b/src/EventStore.Plugins/Transforms/ChunkDataReadStream.cs index 91600f4..0c10bee 100644 --- a/src/EventStore.Plugins/Transforms/ChunkDataReadStream.cs +++ b/src/EventStore.Plugins/Transforms/ChunkDataReadStream.cs @@ -1,34 +1,32 @@ // Copyright (c) Event Store Ltd and/or licensed to Event Store Ltd under one or more agreements. // Event Store Ltd licenses this file to you under the Event Store License v2 (see LICENSE.md). -using System; -using System.IO; - namespace EventStore.Plugins.Transforms; -public class ChunkDataReadStream(Stream chunkFileStream) : Stream { - public Stream ChunkFileStream => chunkFileStream; +public class ChunkDataReadStream(Stream chunkFileStream) : ChunkDataStream(chunkFileStream) { public sealed override bool CanRead => true; public sealed override bool CanSeek => true; public sealed override bool CanWrite => false; - public sealed override int Read(byte[] buffer, int offset, int count) => throw new InvalidOperationException("use ReadAsync"); - public sealed override void Write(byte[] buffer, int offset, int count) => throw new InvalidOperationException(); - public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) => - throw new InvalidOperationException(); - public sealed override void Flush() => throw new InvalidOperationException(); - public sealed override Task FlushAsync(CancellationToken cancellationToken) => - throw new InvalidOperationException(); - public sealed override void SetLength(long 
value) => throw new InvalidOperationException(); - public override long Length => throw new NotSupportedException(); - // reads must always return exactly `count` bytes as we never read past the (flushed) writer checkpoint - public override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) => - ChunkFileStream.ReadAsync(buffer, cancellationToken); + public override bool CanTimeout => true; + + public override int Read(Span buffer) => ChunkFileStream.Read(buffer); + + public sealed override void Write(ReadOnlySpan buffer) + => throw new NotSupportedException(); + + public sealed override void Flush() => throw new NotSupportedException(); + public sealed override Task FlushAsync(CancellationToken cancellationToken) + => Task.FromException(new NotSupportedException()); + + public sealed override void SetLength(long value) => throw new NotSupportedException(); + + public override long Length => throw new NotSupportedException(); // seeks need to support only `SeekOrigin.Begin` public override long Seek(long offset, SeekOrigin origin) { - if (origin != SeekOrigin.Begin) + if (origin is not SeekOrigin.Begin) throw new NotSupportedException(); return ChunkFileStream.Seek(offset, origin); @@ -44,7 +42,7 @@ protected override void Dispose(bool disposing) { if (!disposing) return; - chunkFileStream.Dispose(); + ChunkFileStream.Dispose(); } finally { base.Dispose(disposing); } diff --git a/src/EventStore.Plugins/Transforms/ChunkDataStream.cs b/src/EventStore.Plugins/Transforms/ChunkDataStream.cs new file mode 100644 index 0000000..6e95128 --- /dev/null +++ b/src/EventStore.Plugins/Transforms/ChunkDataStream.cs @@ -0,0 +1,53 @@ +// Copyright (c) Event Store Ltd and/or licensed to Event Store Ltd under one or more agreements. +// Event Store Ltd licenses this file to you under the Event Store License v2 (see LICENSE.md). 
+ +using System.Runtime.CompilerServices; + +namespace EventStore.Plugins.Transforms; + +public abstract class ChunkDataStream(Stream chunkFileStream) : Stream { + protected Stream ChunkFileStream => chunkFileStream; + + public override int Read(Span buffer) => chunkFileStream.Read(buffer); + + public sealed override int Read(byte[] buffer, int offset, int count) { + ValidateBufferArguments(buffer, offset, count); + + return Read(buffer.AsSpan(offset, count)); + } + + public sealed override int ReadByte() { + Unsafe.SkipInit(out byte b); + + return Read(new(ref b)) > 0 ? b : -1; + } + + public override ValueTask ReadAsync(Memory buffer, CancellationToken token = default) => + chunkFileStream.ReadAsync(buffer, token); + + public sealed override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken token) + => ReadAsync(buffer.AsMemory(offset, count), token).AsTask(); + + public override void Write(ReadOnlySpan buffer) => ChunkFileStream.Write(buffer); + + public sealed override void Write(byte[] buffer, int offset, int count) { + ValidateBufferArguments(buffer, offset, count); + + Write(new(buffer, offset, count)); + } + + public sealed override void WriteByte(byte value) => Write(new(ref value)); + + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken token = default) + => chunkFileStream.WriteAsync(buffer, token); + + public sealed override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken token) + => WriteAsync(buffer.AsMemory(offset, count), token).AsTask(); + + protected override void Dispose(bool disposing) { + if (disposing) + ChunkFileStream.Dispose(); + + base.Dispose(disposing); + } +} diff --git a/src/EventStore.Plugins/Transforms/ChunkDataWriteStream.cs b/src/EventStore.Plugins/Transforms/ChunkDataWriteStream.cs index 9fbeb0a..c7e95c2 100644 --- a/src/EventStore.Plugins/Transforms/ChunkDataWriteStream.cs +++ b/src/EventStore.Plugins/Transforms/ChunkDataWriteStream.cs @@ -2,81 +2,73 @@ // 
Event Store Ltd licenses this file to you under the Event Store License v2 (see LICENSE.md). using System.Buffers; +using System.Diagnostics; +using System.Runtime.CompilerServices; using System.Security.Cryptography; namespace EventStore.Plugins.Transforms; -public class ChunkDataWriteStream(Stream chunkFileStream, HashAlgorithm checksumAlgorithm) : Stream { - public Stream ChunkFileStream => chunkFileStream; - public HashAlgorithm ChecksumAlgorithm => checksumAlgorithm; +public class ChunkDataWriteStream(Stream chunkFileStream, IncrementalHash checksumAlgorithm) : ChunkDataStream(chunkFileStream) { + private long? _positionToHash; public sealed override bool CanRead => false; public sealed override bool CanSeek => false; public sealed override bool CanWrite => true; - public sealed override int Read(byte[] buffer, int offset, int count) => throw new InvalidOperationException(); - public sealed override ValueTask ReadAsync(Memory buffer, CancellationToken cancellationToken = default) => - throw new InvalidOperationException(); - public sealed override void Write(byte[] buffer, int offset, int count) => - throw new InvalidOperationException("use WriteAsync"); - public sealed override void Flush() => - throw new InvalidOperationException("use FlushAsync"); + + public sealed override int Read(Span buffer) => throw new NotSupportedException(); + + public override void Write(ReadOnlySpan buffer) => ChunkFileStream.Write(buffer); + + public override void Flush() => ChunkFileStream.Flush(); public sealed override long Seek(long offset, SeekOrigin origin) => throw new InvalidOperationException(); - public override async ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) { - await ChunkFileStream.WriteAsync(buffer, cancellationToken); - Checksum(buffer); + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken token = default) + => _positionToHash is { } count ? 
WriteAndChecksumAsync(count, buffer, token) : WriteWithoutChecksumAsync(buffer, token); + + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] + private async ValueTask WriteWithoutChecksumAsync(ReadOnlyMemory buffer, CancellationToken token) { + await ChunkFileStream.WriteAsync(buffer, token); + checksumAlgorithm.AppendData(buffer.Span); + } + + private async ValueTask WriteAndChecksumAsync(long count, ReadOnlyMemory buffer, CancellationToken token) { + await ReadAndChecksumAsync(count, token); + + Debug.Assert(ChunkFileStream.Position == count); + await ChunkFileStream.WriteAsync(buffer, token); + checksumAlgorithm.AppendData(buffer.Span); + _positionToHash = null; } public override Task FlushAsync(CancellationToken ct) => ChunkFileStream.FlushAsync(ct); public override void SetLength(long value) => ChunkFileStream.SetLength(value); public override long Length => ChunkFileStream.Length; public override long Position { - get => ChunkFileStream.Position; + get => _positionToHash ?? 
ChunkFileStream.Position; set { - if (ChunkFileStream.Position != 0) + if (ChunkFileStream.Position is not 0L) throw new InvalidOperationException("Writer's position can only be moved from 0 to a higher value."); - ReadAndChecksum(value); - - if (ChunkFileStream.Position != value) - throw new Exception($"Writer's position ({ChunkFileStream.Position:N0}) is not at the expected position ({value:N0})"); + if (value is not 0L) + _positionToHash = value; } } - public void Checksum(ReadOnlyMemory data) { - // HashAlgorithm.TransformBlock() doesn't support span/memory, so we need to rent a byte array from the pool - byte[] tmp = ArrayPool.Shared.Rent(data.Length); - try { - data.CopyTo(tmp.AsMemory()); - ChecksumAlgorithm.TransformBlock(tmp, 0, data.Length, null, 0); - Array.Clear(tmp, 0, data.Length); - } finally { - ArrayPool.Shared.Return(tmp); - } - } - - private void ReadAndChecksum(long count) { - var buffer = new byte[4096]; - long toRead = count; - while (toRead > 0) { - int read = ChunkFileStream.Read(buffer, 0, (int)Math.Min(toRead, buffer.Length)); - if (read == 0) - break; - - ChecksumAlgorithm.TransformBlock(buffer, 0, read, null, 0); - toRead -= read; - } - } + private async ValueTask ReadAndChecksumAsync(long count, CancellationToken token) { + var buffer = ArrayPool.Shared.Rent(4096); - protected override void Dispose(bool disposing) { try { - if (!disposing) - return; + for (int bytesRead; count > 0L; count -= bytesRead) { + bytesRead = await ChunkFileStream.ReadAsync(buffer.AsMemory(0, (int)long.Min(count, buffer.Length)), + token); + if (bytesRead is 0) + break; - chunkFileStream.Dispose(); + checksumAlgorithm.AppendData(new ReadOnlySpan(buffer, 0, bytesRead)); + } } finally { - base.Dispose(disposing); + ArrayPool.Shared.Return(buffer); } } } diff --git a/src/EventStore.Plugins/Transforms/IChunkTransformFactory.cs b/src/EventStore.Plugins/Transforms/IChunkTransformFactory.cs index eb331c5..59ce67b 100644 --- 
a/src/EventStore.Plugins/Transforms/IChunkTransformFactory.cs +++ b/src/EventStore.Plugins/Transforms/IChunkTransformFactory.cs @@ -1,15 +1,18 @@ // Copyright (c) Event Store Ltd and/or licensed to Event Store Ltd under one or more agreements. // Event Store Ltd licenses this file to you under the Event Store License v2 (see LICENSE.md). -using System; -using System.IO; - namespace EventStore.Plugins.Transforms; public interface IChunkTransformFactory { TransformType Type { get; } + int TransformDataPosition(int dataPosition); - ReadOnlyMemory CreateTransformHeader(); - ReadOnlyMemory ReadTransformHeader(Stream stream); - IChunkTransform CreateTransform(ReadOnlyMemory transformHeader); + + int CreateTransformHeader(Span transformHeader); + + ValueTask ReadTransformHeader(Stream stream, Memory transformHeader, CancellationToken token = default); + + IChunkTransform CreateTransform(ReadOnlySpan transformHeader); + + int TransformHeaderLength { get; } } diff --git a/src/EventStore.Plugins/Transforms/IChunkWriteTransform.cs b/src/EventStore.Plugins/Transforms/IChunkWriteTransform.cs index f52169d..d5bdcc3 100644 --- a/src/EventStore.Plugins/Transforms/IChunkWriteTransform.cs +++ b/src/EventStore.Plugins/Transforms/IChunkWriteTransform.cs @@ -1,12 +1,10 @@ // Copyright (c) Event Store Ltd and/or licensed to Event Store Ltd under one or more agreements. // Event Store Ltd licenses this file to you under the Event Store License v2 (see LICENSE.md). 
-using System; - namespace EventStore.Plugins.Transforms; public interface IChunkWriteTransform { ChunkDataWriteStream TransformData(ChunkDataWriteStream stream); - ValueTask CompleteData(int footerSize, int alignmentSize, CancellationToken cancellationToken = default); - ValueTask WriteFooter(ReadOnlyMemory footer, CancellationToken cancellationToken = default); + ValueTask CompleteData(int footerSize, int alignmentSize, CancellationToken token = default); + ValueTask WriteFooter(ReadOnlyMemory footer, CancellationToken token = default); } From c38180709ec7ebb2a685ae4b6845b1332dac09c3 Mon Sep 17 00:00:00 2001 From: Roman Sakno Date: Wed, 11 Dec 2024 14:37:46 +0200 Subject: [PATCH 4/4] Fixed return type because transform header length is known through a separate property --- src/EventStore.Plugins/Transforms/IChunkTransformFactory.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/EventStore.Plugins/Transforms/IChunkTransformFactory.cs b/src/EventStore.Plugins/Transforms/IChunkTransformFactory.cs index 59ce67b..7461e04 100644 --- a/src/EventStore.Plugins/Transforms/IChunkTransformFactory.cs +++ b/src/EventStore.Plugins/Transforms/IChunkTransformFactory.cs @@ -8,7 +8,7 @@ public interface IChunkTransformFactory { int TransformDataPosition(int dataPosition); - int CreateTransformHeader(Span transformHeader); + void CreateTransformHeader(Span transformHeader); ValueTask ReadTransformHeader(Stream stream, Memory transformHeader, CancellationToken token = default);