Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc committed Jun 25, 2024
1 parent 9694925 commit 0f70f9b
Showing 1 changed file with 30 additions and 37 deletions.
67 changes: 30 additions & 37 deletions src/puffin/src/puffin_manager/cached_puffin_manager/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,23 +123,8 @@ where
.context(BlobNotFoundSnafu { blob: key })?;
let reader = file.blob_reader(blob_metadata)?;

let size = match blob_metadata.compression_codec {
Some(CompressionCodec::Lz4) => {
return UnsupportedDecompressionSnafu {
decompression: "lz4",
}
.fail();
}
Some(CompressionCodec::Zstd) => {
let reader = ZstdDecoder::new(BufReader::new(reader));
futures::io::copy(reader, &mut writer)
.await
.context(WriteSnafu)?
}
None => futures::io::copy(reader, &mut writer)
.await
.context(WriteSnafu)?,
};
let compression = blob_metadata.compression_codec;
let size = Self::handle_decompress(reader, &mut writer, compression).await?;

Ok(size)
})
Expand Down Expand Up @@ -185,29 +170,37 @@ where
);

let reader = file.blob_reader(blob_meta)?;
let mut writer = writer_provider.writer(&file_meta.relative_path).await?;
match blob_meta.compression_codec {
Some(CompressionCodec::Lz4) => {
UnsupportedDecompressionSnafu {
decompression: "lz4",
}
.fail()?;
}
Some(CompressionCodec::Zstd) => {
let reader = ZstdDecoder::new(BufReader::new(reader));
size += futures::io::copy(reader, &mut writer)
.await
.context(WriteSnafu)?;
}
None => {
size += futures::io::copy(reader, &mut writer)
.await
.context(WriteSnafu)?;
}
}
let writer = writer_provider.writer(&file_meta.relative_path).await?;

let compression = blob_meta.compression_codec;
size += Self::handle_decompress(reader, writer, compression).await?;
}

Ok(size)
})
}

/// Handles the decompression of the reader and writes the decompressed data to the writer.
/// Returns the number of bytes written.
async fn handle_decompress(
reader: impl AsyncRead,
mut writer: impl AsyncWrite + Unpin,
compression: Option<CompressionCodec>,
) -> Result<u64> {
match compression {
Some(CompressionCodec::Lz4) => UnsupportedDecompressionSnafu {
decompression: "lz4",
}
.fail(),
Some(CompressionCodec::Zstd) => {
let reader = ZstdDecoder::new(BufReader::new(reader));
futures::io::copy(reader, &mut writer)
.await
.context(WriteSnafu)
}
None => futures::io::copy(reader, &mut writer)
.await
.context(WriteSnafu),
}
}
}

0 comments on commit 0f70f9b

Please sign in to comment.