Skip to content

Memorify DeflateManagedStream #47389

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,60 +156,48 @@ public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, Asy
public override int EndRead(IAsyncResult asyncResult) =>
TaskToApm.End<int>(asyncResult);

public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
private ValueTask<int> ReadAsyncInternal(Memory<byte> buffer, CancellationToken cancellationToken)
{
// We use this checking order for compat to earlier versions:
if (_asyncOperations != 0)
throw new InvalidOperationException(SR.InvalidBeginCall);

ValidateBufferArguments(buffer, offset, count);
EnsureNotDisposed();

if (cancellationToken.IsCancellationRequested)
{
return Task.FromCanceled<int>(cancellationToken);
return ValueTask.FromCanceled<int>(cancellationToken);
}

Interlocked.Increment(ref _asyncOperations);
Task<int>? readTask = null;
bool startedAsyncOperation = false;

try
{
// Try to read decompressed data in output buffer
int bytesRead = _inflater.Inflate(buffer, offset, count);
int bytesRead = _inflater.Inflate(buffer);
if (bytesRead != 0)
{
// If decompression output buffer is not empty, return immediately.
return Task.FromResult(bytesRead);
return ValueTask.FromResult(bytesRead);
}

if (_inflater.Finished())
{
// end of compression stream
return Task.FromResult(0);
return ValueTask.FromResult(0);
}

// If there is no data on the output buffer and we are not at
// the end of the stream, we need to get more data from the base stream
readTask = _stream!.ReadAsync(_buffer, 0, _buffer.Length, cancellationToken);
if (readTask == null)
{
throw new InvalidOperationException(SR.NotSupported_UnreadableStream);
}

return ReadAsyncCore(readTask, buffer, offset, count, cancellationToken);
startedAsyncOperation = true;
return ReadAsyncCore(_stream!.ReadAsync(_buffer.AsMemory(), cancellationToken), buffer, cancellationToken);
}
finally
{
// if we haven't started any async work, decrement the counter to end the transaction
if (readTask == null)
if (!startedAsyncOperation)
{
Interlocked.Decrement(ref _asyncOperations);
}
}
}

private async Task<int> ReadAsyncCore(Task<int> readTask, byte[] buffer, int offset, int count, CancellationToken cancellationToken)
private async ValueTask<int> ReadAsyncCore(ValueTask<int> readTask, Memory<byte> buffer, CancellationToken cancellationToken)
{
try
{
Expand All @@ -234,17 +222,13 @@ private async Task<int> ReadAsyncCore(Task<int> readTask, byte[] buffer, int off

// Feed the data from base stream into decompression engine
_inflater.SetInput(_buffer, 0, bytesRead);
bytesRead = _inflater.Inflate(buffer, offset, count);
bytesRead = _inflater.Inflate(buffer);

if (bytesRead == 0 && !_inflater.Finished())
{
// We could have read in head information and didn't get any data.
// Read from the base stream again.
readTask = _stream!.ReadAsync(_buffer, 0, _buffer.Length, cancellationToken);
if (readTask == null)
{
throw new InvalidOperationException(SR.NotSupported_UnreadableStream);
}
readTask = _stream!.ReadAsync(_buffer.AsMemory(), cancellationToken);
}
else
{
Expand All @@ -258,6 +242,29 @@ private async Task<int> ReadAsyncCore(Task<int> readTask, byte[] buffer, int off
}
}

/// <summary>
/// Array-based asynchronous read. Performs the state and argument checks, then
/// forwards the requested range to <c>ReadAsyncInternal</c> as a <see cref="Memory{T}"/>.
/// </summary>
/// <exception cref="InvalidOperationException">Another asynchronous operation is already in progress.</exception>
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
    // The order of these checks is kept for compatibility with earlier versions:
    // pending-operation check first, then argument validation, then the disposed check.
    if (_asyncOperations != 0)
    {
        throw new InvalidOperationException(SR.InvalidBeginCall);
    }

    ValidateBufferArguments(buffer, offset, count);
    EnsureNotDisposed();

    Memory<byte> destination = buffer.AsMemory(offset, count);
    ValueTask<int> pending = ReadAsyncInternal(destination, cancellationToken);
    return pending.AsTask();
}

/// <summary>
/// Memory-based asynchronous read. Performs the state checks, then forwards
/// the destination buffer to <c>ReadAsyncInternal</c>.
/// </summary>
/// <exception cref="InvalidOperationException">Another asynchronous operation is already in progress.</exception>
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
    // The order of these checks is kept for compatibility with earlier versions:
    // pending-operation check first, then the disposed check.
    if (_asyncOperations != 0)
    {
        throw new InvalidOperationException(SR.InvalidBeginCall);
    }

    EnsureNotDisposed();

    ValueTask<int> pending = ReadAsyncInternal(buffer, cancellationToken);
    return pending;
}

public override void Write(byte[] buffer, int offset, int count)
{
throw new InvalidOperationException(SR.CannotWriteToDeflateStream);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Buffers;
using System.Diagnostics;
using System.Runtime.InteropServices;

namespace System.IO.Compression
{
Expand Down Expand Up @@ -108,7 +110,7 @@ public void SetInput(byte[] inputBytes, int offset, int length) =>

public int AvailableOutput => _output.AvailableBytes;

public int Inflate(byte[] bytes, int offset, int length)
public int Inflate(Memory<byte> bytes)
{
// copy bytes from output to outputbytes if we have available bytes
// if the buffer is not filled up, keep decoding until no input is available
Expand All @@ -119,14 +121,14 @@ public int Inflate(byte[] bytes, int offset, int length)
int copied = 0;
if (_uncompressedSize == -1)
{
copied = _output.CopyTo(bytes, offset, length);
copied = _output.CopyTo(bytes);
}
else
{
if (_uncompressedSize > _currentInflatedCount)
{
length = (int)Math.Min(length, _uncompressedSize - _currentInflatedCount);
copied = _output.CopyTo(bytes, offset, length);
bytes = bytes.Slice(0, (int)Math.Min(bytes.Length, _uncompressedSize - _currentInflatedCount));
copied = _output.CopyTo(bytes);
_currentInflatedCount += copied;
}
else
Expand All @@ -140,16 +142,27 @@ public int Inflate(byte[] bytes, int offset, int length)
if (_hasFormatReader)
{
Debug.Assert(_formatReader != null);
_formatReader.UpdateWithBytesRead(bytes, offset, copied);
// Skip the copy if bytes is actually an array.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is all dead code. _formatReader is always going to be null.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's awesome! I think what I'm going to do is wait until #47408 is merged, and then I'll redo this pull-request. That way we'll avoid merge conflicts.

if (MemoryMarshal.TryGetArray(bytes, out ArraySegment<byte> segment))
{
_formatReader.UpdateWithBytesRead(segment.Array!, segment.Offset, segment.Count);
}
else
{
byte[] rentedBuffer = ArrayPool<byte>.Shared.Rent(copied);
bytes.Slice(0, copied).CopyTo(rentedBuffer.AsMemory());
_formatReader.UpdateWithBytesRead(rentedBuffer, 0, copied);
ArrayPool<byte>.Shared.Return(rentedBuffer);
}
}

offset += copied;
bytes = bytes.Slice(copied);
count += copied;
length -= copied;
}

if (length == 0)
{ // filled in the bytes array
if (bytes.IsEmpty)
{
// Filled in the buffer.
break;
}
// Decode will return false when more input is needed
Expand All @@ -169,6 +182,11 @@ public int Inflate(byte[] bytes, int offset, int length)
return count;
}

/// <summary>
/// Array overload: wraps the requested range of <paramref name="bytes"/> in a
/// <see cref="Memory{T}"/> and delegates to the Memory-based <c>Inflate</c>.
/// </summary>
/// <returns>The value returned by the Memory-based overload.</returns>
public int Inflate(byte[] bytes, int offset, int length) =>
    Inflate(bytes.AsMemory(offset, length));

//Each block of compressed data begins with 3 header bits
// containing the following data:
// first bit BFINAL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,15 @@ namespace System.IO.Compression

internal sealed class InputBuffer
{
private byte[]? _buffer; // byte array to store input
private int _start; // start poisition of the buffer
private int _end; // end position of the buffer
private Memory<byte> _buffer; // byte array to store input
private uint _bitBuffer; // store the bits here, we can quickly shift in this buffer
private int _bitsInBuffer; // number of bits available in bitBuffer

/// <summary>Total bits available in the input buffer.</summary>
public int AvailableBits => _bitsInBuffer;

/// <summary>Total bytes available in the input buffer.</summary>
public int AvailableBytes => (_end - _start) + (_bitsInBuffer / 8);
public int AvailableBytes => _buffer.Length + (_bitsInBuffer / 8);

/// <summary>Ensure that count bits are in the bit buffer.</summary>
/// <param name="count">Can be up to 16.</param>
Expand All @@ -43,9 +41,9 @@ public bool EnsureBitsAvailable(int count)
{
return false;
}
Debug.Assert(_buffer != null);
// insert a byte to bitbuffer
_bitBuffer |= (uint)_buffer[_start++] << _bitsInBuffer;
_bitBuffer |= (uint)_buffer.Span[0] << _bitsInBuffer;
_buffer = _buffer.Slice(1);
_bitsInBuffer += 8;

if (_bitsInBuffer < count)
Expand All @@ -55,7 +53,8 @@ public bool EnsureBitsAvailable(int count)
return false;
}
// insert a byte to bitbuffer
_bitBuffer |= (uint)_buffer[_start++] << _bitsInBuffer;
_bitBuffer |= (uint)_buffer.Span[0] << _bitsInBuffer;
_buffer = _buffer.Slice(1);
_bitsInBuffer += 8;
}
}
Expand All @@ -72,26 +71,28 @@ public bool EnsureBitsAvailable(int count)
/// </summary>
public uint TryLoad16Bits()
{
Debug.Assert(_buffer != null);
if (_bitsInBuffer < 8)
{
if (_start < _end)
if (!_buffer.IsEmpty)
{
_bitBuffer |= (uint)_buffer[_start++] << _bitsInBuffer;
_bitBuffer |= (uint)_buffer.Span[0] << _bitsInBuffer;
_buffer = _buffer.Slice(1);
_bitsInBuffer += 8;
}

if (_start < _end)
if (!_buffer.IsEmpty)
{
_bitBuffer |= (uint)_buffer[_start++] << _bitsInBuffer;
_bitBuffer |= (uint)_buffer.Span[0] << _bitsInBuffer;
_buffer = _buffer.Slice(1);
_bitsInBuffer += 8;
}
}
else if (_bitsInBuffer < 16)
{
if (_start < _end)
if (!_buffer.IsEmpty)
{
_bitBuffer |= (uint)_buffer[_start++] << _bitsInBuffer;
_bitBuffer |= (uint)_buffer.Span[0] << _bitsInBuffer;
_buffer = _buffer.Slice(1);
_bitsInBuffer += 8;
}
}
Expand Down Expand Up @@ -147,23 +148,36 @@ public int CopyTo(byte[] output, int offset, int length)
return bytesFromBitBuffer;
}

int avail = _end - _start;
if (length > avail)
if (length > _buffer.Length)
{
length = avail;
length = _buffer.Length;
}

Debug.Assert(_buffer != null);
Array.Copy(_buffer, _start, output, offset, length);
_start += length;
_buffer.CopyTo(output.AsMemory(offset, length));
_buffer = _buffer.Slice(length);
return bytesFromBitBuffer + length;
}

/// <summary>
/// Return true if all input bytes are used.
/// This means the caller can call SetInput to add more input.
/// </summary>
public bool NeedsInput() => _start == _end;
public bool NeedsInput() => _buffer.IsEmpty;

/// <summary>
/// Supplies the next buffer of input to be processed.
/// Any bits still held in the bit buffer are processed before the new bytes.
/// The buffer is not cloned, because cloning is expensive; the caller must
/// therefore leave it unmodified until this method is called again.
/// The new buffer is accepted only when the current input buffer is empty;
/// otherwise the call has no effect.
/// </summary>
public void SetInput(Memory<byte> buffer)
{
    // Input that has not been consumed yet is kept; the new buffer is ignored.
    if (!_buffer.IsEmpty)
    {
        return;
    }

    _buffer = buffer;
}

/// <summary>
/// Set the byte array to be processed.
Expand All @@ -179,12 +193,7 @@ public void SetInput(byte[] buffer, int offset, int length)
Debug.Assert(length >= 0);
Debug.Assert(offset <= buffer.Length - length);

if (_start == _end)
{
_buffer = buffer;
_start = offset;
_end = offset + length;
}
SetInput(buffer.AsMemory(offset, length));
}

/// <summary>Skip n bits in the buffer.</summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,38 +117,45 @@ public int CopyFrom(InputBuffer input, int length)
/// <summary>Bytes not consumed in output window.</summary>
public int AvailableBytes => _bytesUsed;

/// <summary>Copy the decompressed bytes to output array.</summary>
public int CopyTo(byte[] output, int offset, int length)
/// <summary>Copy the decompressed bytes to output buffer.</summary>
public int CopyTo(Memory<byte> output)
{
int copy_end;

if (length > _bytesUsed)
if (output.Length > _bytesUsed)
{
// we can copy all the decompressed bytes out
copy_end = _end;
length = _bytesUsed;
output = output.Slice(0, _bytesUsed);
}
else
{
copy_end = (_end - _bytesUsed + length) & WindowMask; // copy length of bytes
copy_end = (_end - _bytesUsed + output.Length) & WindowMask; // copy length of bytes
}

int copied = length;
int copied = output.Length;

int tailLen = length - copy_end;
int tailLen = output.Length - copy_end;
int length = output.Length;
if (tailLen > 0)
{
// this means we need to copy two parts separately
// copy tailLen bytes from the end of output window
Array.Copy(_window, WindowSize - tailLen,
output, offset, tailLen);
offset += tailLen;
// copy tailLen bytes from the end of the output window
_window.AsMemory(WindowSize - tailLen, tailLen).CopyTo(output);

output = output.Slice(tailLen);
length = copy_end;
}
Array.Copy(_window, copy_end - length, output, offset, length);
_window.AsMemory(copy_end - length, length).CopyTo(output);
_bytesUsed -= copied;
Debug.Assert(_bytesUsed >= 0, "check this function and find why we copied more bytes than we have");
Debug.Assert(_bytesUsed >= 0, "check this function and find out why we copied more bytes than we have");
return copied;
}

/// <summary>
/// Array overload: copies decompressed bytes into the given range of
/// <paramref name="output"/> by delegating to the Memory-based <c>CopyTo</c>.
/// </summary>
/// <returns>The value returned by the Memory-based overload.</returns>
public int CopyTo(byte[] output, int offset, int length) =>
    CopyTo(output.AsMemory(offset, length));
}
}