diff --git a/src/EventStore.Plugins/Transforms/ChunkDataReadStream.cs b/src/EventStore.Plugins/Transforms/ChunkDataReadStream.cs index 31d5f23..0c10bee 100644 --- a/src/EventStore.Plugins/Transforms/ChunkDataReadStream.cs +++ b/src/EventStore.Plugins/Transforms/ChunkDataReadStream.cs @@ -1,28 +1,32 @@ // Copyright (c) Event Store Ltd and/or licensed to Event Store Ltd under one or more agreements. // Event Store Ltd licenses this file to you under the Event Store License v2 (see LICENSE.md). -using System; -using System.IO; - namespace EventStore.Plugins.Transforms; -public class ChunkDataReadStream(Stream chunkFileStream) : Stream { - public Stream ChunkFileStream => chunkFileStream; +public class ChunkDataReadStream(Stream chunkFileStream) : ChunkDataStream(chunkFileStream) { public sealed override bool CanRead => true; public sealed override bool CanSeek => true; public sealed override bool CanWrite => false; - public sealed override void Write(byte[] buffer, int offset, int count) => throw new InvalidOperationException(); - public sealed override void Flush() => throw new InvalidOperationException(); - public sealed override void SetLength(long value) => throw new InvalidOperationException(); - public override long Length => throw new NotSupportedException(); - // reads must always return exactly `count` bytes as we never read past the (flushed) writer checkpoint - public override int Read(byte[] buffer, int offset, int count) => ChunkFileStream.Read(buffer, offset, count); + public override bool CanTimeout => true; + + public override int Read(Span buffer) => ChunkFileStream.Read(buffer); + + public sealed override void Write(ReadOnlySpan buffer) + => throw new NotSupportedException(); + + public sealed override void Flush() => throw new NotSupportedException(); + public sealed override Task FlushAsync(CancellationToken cancellationToken) + => Task.FromException(new NotSupportedException()); + + public sealed override void SetLength(long value) => throw new NotSupportedException(); + + public override long Length => throw new NotSupportedException(); // seeks need to support only `SeekOrigin.Begin` public override long Seek(long offset, SeekOrigin origin) { - if (origin != SeekOrigin.Begin) + if (origin is not SeekOrigin.Begin) throw new NotSupportedException(); return ChunkFileStream.Seek(offset, origin); @@ -38,7 +42,7 @@ protected override void Dispose(bool disposing) { if (!disposing) return; - chunkFileStream.Dispose(); + ChunkFileStream.Dispose(); } finally { base.Dispose(disposing); } diff --git a/src/EventStore.Plugins/Transforms/ChunkDataStream.cs b/src/EventStore.Plugins/Transforms/ChunkDataStream.cs new file mode 100644 index 0000000..6e95128 --- /dev/null +++ b/src/EventStore.Plugins/Transforms/ChunkDataStream.cs @@ -0,0 +1,53 @@ +// Copyright (c) Event Store Ltd and/or licensed to Event Store Ltd under one or more agreements. +// Event Store Ltd licenses this file to you under the Event Store License v2 (see LICENSE.md). + +using System.Runtime.CompilerServices; + +namespace EventStore.Plugins.Transforms; + +public abstract class ChunkDataStream(Stream chunkFileStream) : Stream { + protected Stream ChunkFileStream => chunkFileStream; + + public override int Read(Span buffer) => chunkFileStream.Read(buffer); + + public sealed override int Read(byte[] buffer, int offset, int count) { + ValidateBufferArguments(buffer, offset, count); + + return Read(buffer.AsSpan(offset, count)); + } + + public sealed override int ReadByte() { + Unsafe.SkipInit(out byte b); + + return Read(new(ref b)) > 0 ? b : -1; + } + + public override ValueTask ReadAsync(Memory buffer, CancellationToken token = default) => + chunkFileStream.ReadAsync(buffer, token); + + public sealed override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken token) + => ReadAsync(buffer.AsMemory(offset, count), token).AsTask(); + + public override void Write(ReadOnlySpan buffer) => ChunkFileStream.Write(buffer); + + public sealed override void Write(byte[] buffer, int offset, int count) { + ValidateBufferArguments(buffer, offset, count); + + Write(new(buffer, offset, count)); + } + + public sealed override void WriteByte(byte value) => Write(new(ref value)); + + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken token = default) + => chunkFileStream.WriteAsync(buffer, token); + + public sealed override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken token) + => WriteAsync(buffer.AsMemory(offset, count), token).AsTask(); + + protected override void Dispose(bool disposing) { + if (disposing) + ChunkFileStream.Dispose(); + + base.Dispose(disposing); + } +} diff --git a/src/EventStore.Plugins/Transforms/ChunkDataWriteStream.cs b/src/EventStore.Plugins/Transforms/ChunkDataWriteStream.cs index 47ad52b..c7e95c2 100644 --- a/src/EventStore.Plugins/Transforms/ChunkDataWriteStream.cs +++ b/src/EventStore.Plugins/Transforms/ChunkDataWriteStream.cs @@ -1,64 +1,74 @@ // Copyright (c) Event Store Ltd and/or licensed to Event Store Ltd under one or more agreements. // Event Store Ltd licenses this file to you under the Event Store License v2 (see LICENSE.md). -using System; -using System.IO; +using System.Buffers; +using System.Diagnostics; +using System.Runtime.CompilerServices; using System.Security.Cryptography; namespace EventStore.Plugins.Transforms; -public class ChunkDataWriteStream(Stream chunkFileStream, HashAlgorithm checksumAlgorithm) : Stream { - public Stream ChunkFileStream => chunkFileStream; - public HashAlgorithm ChecksumAlgorithm => checksumAlgorithm; +public class ChunkDataWriteStream(Stream chunkFileStream, IncrementalHash checksumAlgorithm) : ChunkDataStream(chunkFileStream) { + private long? _positionToHash; public sealed override bool CanRead => false; public sealed override bool CanSeek => false; public sealed override bool CanWrite => true; - public sealed override int Read(byte[] buffer, int offset, int count) => throw new InvalidOperationException(); + + public sealed override int Read(Span buffer) => throw new NotSupportedException(); + + public override void Write(ReadOnlySpan buffer) => ChunkFileStream.Write(buffer); + + public override void Flush() => ChunkFileStream.Flush(); + public sealed override long Seek(long offset, SeekOrigin origin) => throw new InvalidOperationException(); - public override void Write(byte[] buffer, int offset, int count) { - ChunkFileStream.Write(buffer, offset, count); - ChecksumAlgorithm.TransformBlock(buffer, 0, count, null, 0); + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken token = default) + => _positionToHash is { } count ? WriteAndChecksumAsync(count, buffer, token) : WriteWithoutChecksumAsync(buffer, token); + + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))] + private async ValueTask WriteWithoutChecksumAsync(ReadOnlyMemory buffer, CancellationToken token) { + await ChunkFileStream.WriteAsync(buffer, token); + checksumAlgorithm.AppendData(buffer.Span); } - public override void Flush() => ChunkFileStream.Flush(); + private async ValueTask WriteAndChecksumAsync(long count, ReadOnlyMemory buffer, CancellationToken token) { + await ReadAndChecksumAsync(count, token); + + Debug.Assert(ChunkFileStream.Position == count); + await ChunkFileStream.WriteAsync(buffer, token); + checksumAlgorithm.AppendData(buffer.Span); + _positionToHash = null; + } + + public override Task FlushAsync(CancellationToken ct) => ChunkFileStream.FlushAsync(ct); public override void SetLength(long value) => ChunkFileStream.SetLength(value); public override long Length => ChunkFileStream.Length; public override long Position { - get => ChunkFileStream.Position; + get => _positionToHash ?? ChunkFileStream.Position; set { - if (ChunkFileStream.Position != 0) + if (ChunkFileStream.Position is not 0L) throw new InvalidOperationException("Writer's position can only be moved from 0 to a higher value."); - ReadAndChecksum(value); - - if (ChunkFileStream.Position != value) - throw new Exception($"Writer's position ({ChunkFileStream.Position:N0}) is not at the expected position ({value:N0})"); + if (value is not 0L) + _positionToHash = value; } } - private void ReadAndChecksum(long count) { - var buffer = new byte[4096]; - long toRead = count; - while (toRead > 0) { - int read = ChunkFileStream.Read(buffer, 0, (int)Math.Min(toRead, buffer.Length)); - if (read == 0) - break; - - ChecksumAlgorithm.TransformBlock(buffer, 0, read, null, 0); - toRead -= read; - } - } + private async ValueTask ReadAndChecksumAsync(long count, CancellationToken token) { + var buffer = ArrayPool.Shared.Rent(4096); - protected override void Dispose(bool disposing) { try { - if (!disposing) - return; + for (int bytesRead; count > 0L; count -= bytesRead) { + bytesRead = await ChunkFileStream.ReadAsync(buffer.AsMemory(0, (int)long.Min(count, buffer.Length)), + token); + if (bytesRead is 0) + break; - chunkFileStream.Dispose(); + checksumAlgorithm.AppendData(new ReadOnlySpan(buffer, 0, bytesRead)); + } } finally { - base.Dispose(disposing); + ArrayPool.Shared.Return(buffer); } } } diff --git a/src/EventStore.Plugins/Transforms/IChunkTransformFactory.cs b/src/EventStore.Plugins/Transforms/IChunkTransformFactory.cs index eb331c5..7461e04 100644 --- a/src/EventStore.Plugins/Transforms/IChunkTransformFactory.cs +++ b/src/EventStore.Plugins/Transforms/IChunkTransformFactory.cs @@ -1,15 +1,18 @@ // Copyright (c) Event Store Ltd and/or licensed to Event Store Ltd under one or more agreements. // Event Store Ltd licenses this file to you under the Event Store License v2 (see LICENSE.md). -using System; -using System.IO; - namespace EventStore.Plugins.Transforms; public interface IChunkTransformFactory { TransformType Type { get; } + int TransformDataPosition(int dataPosition); - ReadOnlyMemory CreateTransformHeader(); - ReadOnlyMemory ReadTransformHeader(Stream stream); - IChunkTransform CreateTransform(ReadOnlyMemory transformHeader); + + void CreateTransformHeader(Span transformHeader); + + ValueTask ReadTransformHeader(Stream stream, Memory transformHeader, CancellationToken token = default); + + IChunkTransform CreateTransform(ReadOnlySpan transformHeader); + + int TransformHeaderLength { get; } } diff --git a/src/EventStore.Plugins/Transforms/IChunkWriteTransform.cs b/src/EventStore.Plugins/Transforms/IChunkWriteTransform.cs index b354d47..d5bdcc3 100644 --- a/src/EventStore.Plugins/Transforms/IChunkWriteTransform.cs +++ b/src/EventStore.Plugins/Transforms/IChunkWriteTransform.cs @@ -1,12 +1,10 @@ // Copyright (c) Event Store Ltd and/or licensed to Event Store Ltd under one or more agreements. // Event Store Ltd licenses this file to you under the Event Store License v2 (see LICENSE.md). -using System; - namespace EventStore.Plugins.Transforms; public interface IChunkWriteTransform { ChunkDataWriteStream TransformData(ChunkDataWriteStream stream); - void CompleteData(int footerSize, int alignmentSize); - void WriteFooter(ReadOnlySpan footer, out int fileSize); + ValueTask CompleteData(int footerSize, int alignmentSize, CancellationToken token = default); + ValueTask WriteFooter(ReadOnlyMemory footer, CancellationToken token = default); }