Skip to content

Commit a1bb489

Browse files
authored
[KDB-403] Support async reads & writes in chunks
[KDB-403] Support async reads & writes in chunks
2 parents d2ba515 + c381807 commit a1bb489

File tree

5 files changed

+124
-56
lines changed

5 files changed

+124
-56
lines changed

src/EventStore.Plugins/Transforms/ChunkDataReadStream.cs

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,32 @@
11
// Copyright (c) Event Store Ltd and/or licensed to Event Store Ltd under one or more agreements.
22
// Event Store Ltd licenses this file to you under the Event Store License v2 (see LICENSE.md).
33

4-
using System;
5-
using System.IO;
6-
74
namespace EventStore.Plugins.Transforms;
85

9-
public class ChunkDataReadStream(Stream chunkFileStream) : Stream {
10-
public Stream ChunkFileStream => chunkFileStream;
6+
public class ChunkDataReadStream(Stream chunkFileStream) : ChunkDataStream(chunkFileStream) {
117

128
public sealed override bool CanRead => true;
139
public sealed override bool CanSeek => true;
1410
public sealed override bool CanWrite => false;
15-
public sealed override void Write(byte[] buffer, int offset, int count) => throw new InvalidOperationException();
16-
public sealed override void Flush() => throw new InvalidOperationException();
17-
public sealed override void SetLength(long value) => throw new InvalidOperationException();
18-
public override long Length => throw new NotSupportedException();
1911

20-
// reads must always return exactly `count` bytes as we never read past the (flushed) writer checkpoint
21-
public override int Read(byte[] buffer, int offset, int count) => ChunkFileStream.Read(buffer, offset, count);
12+
public override bool CanTimeout => true;
13+
14+
public override int Read(Span<byte> buffer) => ChunkFileStream.Read(buffer);
15+
16+
public sealed override void Write(ReadOnlySpan<byte> buffer)
17+
=> throw new NotSupportedException();
18+
19+
public sealed override void Flush() => throw new NotSupportedException();
20+
public sealed override Task FlushAsync(CancellationToken cancellationToken)
21+
=> Task.FromException(new NotSupportedException());
22+
23+
public sealed override void SetLength(long value) => throw new NotSupportedException();
24+
25+
public override long Length => throw new NotSupportedException();
2226

2327
// seeks need to support only `SeekOrigin.Begin`
2428
public override long Seek(long offset, SeekOrigin origin) {
25-
if (origin != SeekOrigin.Begin)
29+
if (origin is not SeekOrigin.Begin)
2630
throw new NotSupportedException();
2731

2832
return ChunkFileStream.Seek(offset, origin);
@@ -38,7 +42,7 @@ protected override void Dispose(bool disposing) {
3842
if (!disposing)
3943
return;
4044

41-
chunkFileStream.Dispose();
45+
ChunkFileStream.Dispose();
4246
} finally {
4347
base.Dispose(disposing);
4448
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
// Copyright (c) Event Store Ltd and/or licensed to Event Store Ltd under one or more agreements.
2+
// Event Store Ltd licenses this file to you under the Event Store License v2 (see LICENSE.md).
3+
4+
using System.Runtime.CompilerServices;
5+
6+
namespace EventStore.Plugins.Transforms;
7+
8+
public abstract class ChunkDataStream(Stream chunkFileStream) : Stream {
9+
protected Stream ChunkFileStream => chunkFileStream;
10+
11+
public override int Read(Span<byte> buffer) => chunkFileStream.Read(buffer);
12+
13+
public sealed override int Read(byte[] buffer, int offset, int count) {
14+
ValidateBufferArguments(buffer, offset, count);
15+
16+
return Read(buffer.AsSpan(offset, count));
17+
}
18+
19+
public sealed override int ReadByte() {
20+
Unsafe.SkipInit(out byte b);
21+
22+
return Read(new(ref b)) > 0 ? b : -1;
23+
}
24+
25+
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken token = default) =>
26+
chunkFileStream.ReadAsync(buffer, token);
27+
28+
public sealed override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken token)
29+
=> ReadAsync(buffer.AsMemory(offset, count), token).AsTask();
30+
31+
public override void Write(ReadOnlySpan<byte> buffer) => ChunkFileStream.Write(buffer);
32+
33+
public sealed override void Write(byte[] buffer, int offset, int count) {
34+
ValidateBufferArguments(buffer, offset, count);
35+
36+
Write(new(buffer, offset, count));
37+
}
38+
39+
public sealed override void WriteByte(byte value) => Write(new(ref value));
40+
41+
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken token = default)
42+
=> chunkFileStream.WriteAsync(buffer, token);
43+
44+
public sealed override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken token)
45+
=> WriteAsync(buffer.AsMemory(offset, count), token).AsTask();
46+
47+
protected override void Dispose(bool disposing) {
48+
if (disposing)
49+
ChunkFileStream.Dispose();
50+
51+
base.Dispose(disposing);
52+
}
53+
}
Lines changed: 43 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,64 +1,74 @@
11
// Copyright (c) Event Store Ltd and/or licensed to Event Store Ltd under one or more agreements.
22
// Event Store Ltd licenses this file to you under the Event Store License v2 (see LICENSE.md).
33

4-
using System;
5-
using System.IO;
4+
using System.Buffers;
5+
using System.Diagnostics;
6+
using System.Runtime.CompilerServices;
67
using System.Security.Cryptography;
78

89
namespace EventStore.Plugins.Transforms;
910

10-
public class ChunkDataWriteStream(Stream chunkFileStream, HashAlgorithm checksumAlgorithm) : Stream {
11-
public Stream ChunkFileStream => chunkFileStream;
12-
public HashAlgorithm ChecksumAlgorithm => checksumAlgorithm;
11+
public class ChunkDataWriteStream(Stream chunkFileStream, IncrementalHash checksumAlgorithm) : ChunkDataStream(chunkFileStream) {
12+
private long? _positionToHash;
1313

1414
public sealed override bool CanRead => false;
1515
public sealed override bool CanSeek => false;
1616
public sealed override bool CanWrite => true;
17-
public sealed override int Read(byte[] buffer, int offset, int count) => throw new InvalidOperationException();
17+
18+
public sealed override int Read(Span<byte> buffer) => throw new NotSupportedException();
19+
20+
public override void Write(ReadOnlySpan<byte> buffer) => ChunkFileStream.Write(buffer);
21+
22+
public override void Flush() => ChunkFileStream.Flush();
23+
1824
public sealed override long Seek(long offset, SeekOrigin origin) => throw new InvalidOperationException();
1925

20-
public override void Write(byte[] buffer, int offset, int count) {
21-
ChunkFileStream.Write(buffer, offset, count);
22-
ChecksumAlgorithm.TransformBlock(buffer, 0, count, null, 0);
26+
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken token = default)
27+
=> _positionToHash is { } count ? WriteAndChecksumAsync(count, buffer, token) : WriteWithoutChecksumAsync(buffer, token);
28+
29+
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
30+
private async ValueTask WriteWithoutChecksumAsync(ReadOnlyMemory<byte> buffer, CancellationToken token) {
31+
await ChunkFileStream.WriteAsync(buffer, token);
32+
checksumAlgorithm.AppendData(buffer.Span);
2333
}
2434

25-
public override void Flush() => ChunkFileStream.Flush();
35+
private async ValueTask WriteAndChecksumAsync(long count, ReadOnlyMemory<byte> buffer, CancellationToken token) {
36+
await ReadAndChecksumAsync(count, token);
37+
38+
Debug.Assert(ChunkFileStream.Position == count);
39+
await ChunkFileStream.WriteAsync(buffer, token);
40+
checksumAlgorithm.AppendData(buffer.Span);
41+
_positionToHash = null;
42+
}
43+
44+
public override Task FlushAsync(CancellationToken ct) => ChunkFileStream.FlushAsync(ct);
2645
public override void SetLength(long value) => ChunkFileStream.SetLength(value);
2746
public override long Length => ChunkFileStream.Length;
2847
public override long Position {
29-
get => ChunkFileStream.Position;
48+
get => _positionToHash ?? ChunkFileStream.Position;
3049
set {
31-
if (ChunkFileStream.Position != 0)
50+
if (ChunkFileStream.Position is not 0L)
3251
throw new InvalidOperationException("Writer's position can only be moved from 0 to a higher value.");
3352

34-
ReadAndChecksum(value);
35-
36-
if (ChunkFileStream.Position != value)
37-
throw new Exception($"Writer's position ({ChunkFileStream.Position:N0}) is not at the expected position ({value:N0})");
53+
if (value is not 0L)
54+
_positionToHash = value;
3855
}
3956
}
4057

41-
private void ReadAndChecksum(long count) {
42-
var buffer = new byte[4096];
43-
long toRead = count;
44-
while (toRead > 0) {
45-
int read = ChunkFileStream.Read(buffer, 0, (int)Math.Min(toRead, buffer.Length));
46-
if (read == 0)
47-
break;
48-
49-
ChecksumAlgorithm.TransformBlock(buffer, 0, read, null, 0);
50-
toRead -= read;
51-
}
52-
}
58+
private async ValueTask ReadAndChecksumAsync(long count, CancellationToken token) {
59+
var buffer = ArrayPool<byte>.Shared.Rent(4096);
5360

54-
protected override void Dispose(bool disposing) {
5561
try {
56-
if (!disposing)
57-
return;
62+
for (int bytesRead; count > 0L; count -= bytesRead) {
63+
bytesRead = await ChunkFileStream.ReadAsync(buffer.AsMemory(0, (int)long.Min(count, buffer.Length)),
64+
token);
65+
if (bytesRead is 0)
66+
break;
5867

59-
chunkFileStream.Dispose();
68+
checksumAlgorithm.AppendData(new ReadOnlySpan<byte>(buffer, 0, bytesRead));
69+
}
6070
} finally {
61-
base.Dispose(disposing);
71+
ArrayPool<byte>.Shared.Return(buffer);
6272
}
6373
}
6474
}
Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
11
// Copyright (c) Event Store Ltd and/or licensed to Event Store Ltd under one or more agreements.
22
// Event Store Ltd licenses this file to you under the Event Store License v2 (see LICENSE.md).
33

4-
using System;
5-
using System.IO;
6-
74
namespace EventStore.Plugins.Transforms;
85

96
public interface IChunkTransformFactory {
107
TransformType Type { get; }
8+
119
int TransformDataPosition(int dataPosition);
12-
ReadOnlyMemory<byte> CreateTransformHeader();
13-
ReadOnlyMemory<byte> ReadTransformHeader(Stream stream);
14-
IChunkTransform CreateTransform(ReadOnlyMemory<byte> transformHeader);
10+
11+
void CreateTransformHeader(Span<byte> transformHeader);
12+
13+
ValueTask ReadTransformHeader(Stream stream, Memory<byte> transformHeader, CancellationToken token = default);
14+
15+
IChunkTransform CreateTransform(ReadOnlySpan<byte> transformHeader);
16+
17+
int TransformHeaderLength { get; }
1518
}
Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
// Copyright (c) Event Store Ltd and/or licensed to Event Store Ltd under one or more agreements.
22
// Event Store Ltd licenses this file to you under the Event Store License v2 (see LICENSE.md).
33

4-
using System;
5-
64
namespace EventStore.Plugins.Transforms;
75

86
public interface IChunkWriteTransform {
97
ChunkDataWriteStream TransformData(ChunkDataWriteStream stream);
10-
void CompleteData(int footerSize, int alignmentSize);
11-
void WriteFooter(ReadOnlySpan<byte> footer, out int fileSize);
8+
ValueTask CompleteData(int footerSize, int alignmentSize, CancellationToken token = default);
9+
ValueTask<int> WriteFooter(ReadOnlyMemory<byte> footer, CancellationToken token = default);
1210
}

0 commit comments

Comments
 (0)