diff --git a/src/FileIO/WithPipeLines.cs b/src/FileIO/WithPipeLines.cs index 5a49eb4..a33a0e7 100644 --- a/src/FileIO/WithPipeLines.cs +++ b/src/FileIO/WithPipeLines.cs @@ -23,6 +23,25 @@ public async Task ProcessFileAsync(string filePath, Employee[] employeeReco if (!File.Exists(filePath)) return position; await using var fileStream = File.OpenRead(filePath); var pipeReader = PipeReader.Create(fileStream); + return await ReadFromPipe(pipeReader, employeeRecords, position); + } + + public async Task ProcessWithFullPipeAsync(string filePath, Employee[] employeeRecords) + { + var position = 0; + if (!File.Exists(filePath)) return position; + await using var fileStream = File.OpenRead(filePath); + Pipe p = new(); + var fillPipe = FillPipe(fileStream, p.Writer); + var readPipe = ReadFromPipe(p.Reader, employeeRecords, position); + await Task.WhenAll(fillPipe, readPipe); + + return await readPipe; + } + + private static async Task ReadFromPipe(PipeReader pipeReader, Employee[] employeeRecords, int position) + { + int pos = position; while (true) { var fileData = await pipeReader.ReadAsync(); @@ -30,7 +49,7 @@ public async Task ProcessFileAsync(string filePath, Employee[] employeeReco // convert to Buffer var fileDataBuffer = fileData.Buffer; - var sequencePosition = ParseLines(employeeRecords, fileDataBuffer, ref position); + var sequencePosition = ParseLines(employeeRecords, fileDataBuffer, ref pos); pipeReader.AdvanceTo(sequencePosition, fileDataBuffer.End); @@ -40,8 +59,14 @@ public async Task ProcessFileAsync(string filePath, Employee[] employeeReco } } - await pipeReader.CompleteAsync(); // marking pipereader as Completed - return position; + await pipeReader.CompleteAsync(); // marking pipe reader as Completed + return pos; + } + + private async Task FillPipe(FileStream fileStream, PipeWriter writer) + { + await fileStream.CopyToAsync(writer.AsStream()); + await writer.CompleteAsync(); } private static SequencePosition ParseLines(Employee[] employeeRecords, in ReadOnlySequence buffer, ref int position) diff --git a/tests/FileIO.Benchmarks/FileIOTest.cs b/tests/FileIO.Benchmarks/FileIOTest.cs index 2d55254..4b97fc5 100644 --- a/tests/FileIO.Benchmarks/FileIOTest.cs +++ b/tests/FileIO.Benchmarks/FileIOTest.cs @@ -39,6 +39,23 @@ public async Task PipeLines() } } + [Benchmark] + public async Task FullPipe() + { + var pool = ArrayPool.Shared; + var employeeRecords = pool.Rent(100000); + var pipeLinesTest = new WithPipeLines(); + + try + { + await pipeLinesTest.ProcessWithFullPipeAsync(_filePath, employeeRecords); + } + finally + { + pool.Return(employeeRecords, clearArray: true); + } + } + [Benchmark] public async Task> AsyncStream() {