
Commit

[feature] Count newlines during decompression to make it work with non-seekable input
mxmlnkn committed May 26, 2024
1 parent 71e97bd commit 17e2eb7
Showing 4 changed files with 147 additions and 56 deletions.
25 changes: 23 additions & 2 deletions src/rapidgzip/ChunkData.hpp
@@ -102,6 +102,7 @@ struct ChunkData :
bool crc32Enabled{ true };
std::optional<CompressionType> windowCompressionType;
bool windowSparsity{ true };
bool countNewlines{ false };
};

struct Subchunk
@@ -113,6 +114,7 @@ struct ChunkData :
&& ( decodedOffset == other.decodedOffset )
&& ( encodedSize == other.encodedSize )
&& ( decodedSize == other.decodedSize )
&& ( newlineCount == other.newlineCount )
&& ( static_cast<bool>( window ) == static_cast<bool>( other.window ) )
&& ( !static_cast<bool>( window ) || !static_cast<bool>( other.window )
|| ( *window == *other.window ) );
@@ -123,6 +125,7 @@ struct ChunkData :
size_t decodedOffset{ 0 };
size_t encodedSize{ 0 };
size_t decodedSize{ 0 };
std::optional<size_t> newlineCount{};
SharedWindow window{};
std::vector<bool> usedWindowSymbols{};
};
@@ -169,6 +172,7 @@ struct ChunkData :
fileType( configuration.fileType ),
splitChunkSize( configuration.splitChunkSize ),
windowSparsity( configuration.windowSparsity ),
countNewlines( configuration.countNewlines ),
m_windowCompressionType( configuration.windowCompressionType )
{
setCRC32Enabled( configuration.crc32Enabled );
@@ -337,9 +341,23 @@
}
subchunk.usedWindowSymbols = std::vector<bool>(); // Free memory!
subchunk.window = std::make_shared<Window>( std::move( subchunkWindow ), windowCompressionType );

/* Count lines if requested. */
if ( countNewlines && !subchunk.newlineCount ) {
size_t newlineCount = 0;
using rapidgzip::deflate::DecodedData;
for ( auto it = DecodedData::Iterator( *this, subchunk.decodedOffset, subchunk.decodedSize );
static_cast<bool>( it ); ++it )
{
const auto& [buffer, size] = *it;
newlineCount += ::countNewlines( { reinterpret_cast<const char*>( buffer ), size } );
}
subchunk.newlineCount = newlineCount;
}
}
statistics.compressWindowDuration += duration( tWindowCompressionStart );

/* Check that it counts as fully post-processed from here on. */
if ( !hasBeenPostProcessed() ) {
std::stringstream message;
message << "[Info] Chunk is not recognized as post-processed even though it has been!\n";
@@ -449,8 +467,10 @@ struct ChunkData :
hasBeenPostProcessed() const
{
const auto subchunkHasBeenProcessed =
- [] ( const auto& subchunk ) {
- return static_cast<bool>( subchunk.window ) && subchunk.usedWindowSymbols.empty();
+ [this] ( const auto& subchunk ) {
+ return static_cast<bool>( subchunk.window )
+ && subchunk.usedWindowSymbols.empty()
+ && ( subchunk.newlineCount.has_value() || !countNewlines );
};
return !m_subchunks.empty() && !containsMarkers()
&& std::all_of( m_subchunks.begin(), m_subchunks.end(), subchunkHasBeenProcessed );
@@ -523,6 +543,7 @@ struct ChunkData :
bool stoppedPreemptively{ false };

bool windowSparsity{ true };
bool countNewlines{ false };

protected:
/**
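
The counting loop added above walks the decoded buffers of each subchunk via DecodedData::Iterator and passes every slice to a countNewlines helper. A minimal sketch of such a helper over a std::string_view, assuming a plain byte scan (the helper actually used by rapidgzip may be implemented or optimized differently), could look like this:

#include <algorithm>
#include <cstddef>
#include <string_view>

/* Count '\n' bytes in one decoded buffer slice. The loop above calls a helper
 * like this once per slice returned by DecodedData::Iterator and sums the
 * per-slice counts into Subchunk::newlineCount. */
[[nodiscard]] inline std::size_t
countNewlinesSketch( const std::string_view buffer )
{
    return static_cast<std::size_t>( std::count( buffer.begin(), buffer.end(), '\n' ) );
}

Because the count is stored per subchunk, it only has to be computed once during post-processing and can later be combined into cumulative totals without touching the decompressed data again.
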
59 changes: 53 additions & 6 deletions src/rapidgzip/GzipChunkFetcher.hpp
@@ -29,6 +29,13 @@

namespace rapidgzip
{
struct NewlineOffset
{
uint64_t lineOffset{ 0 };
uint64_t uncompressedOffsetInBytes{ 0 };
};


template<typename T_FetchingStrategy,
typename T_ChunkData = ChunkData>
class GzipChunkFetcher final :
@@ -46,6 +53,7 @@ class GzipChunkFetcher final :
using BlockFinder = typename BaseType::BlockFinder;
using PostProcessingFutures = std::map</* block offset */ size_t, std::future<void> >;
using UniqueSharedFileReader = std::unique_ptr<SharedFileReader>;
using SharedNewlineOffsets = std::shared_ptr<std::vector<NewlineOffset> >;

static constexpr bool REPLACE_MARKERS_IN_PARALLEL = true;

@@ -75,12 +83,14 @@ class GzipChunkFetcher final :
std::shared_ptr<BlockFinder> blockFinder,
std::shared_ptr<BlockMap> blockMap,
std::shared_ptr<WindowMap> windowMap,
SharedNewlineOffsets newlineOffsets,
size_t parallelization ) :
BaseType( blockFinder, parallelization ),
m_sharedFileReader( std::move( sharedFileReader ) ),
m_blockFinder( std::move( blockFinder ) ),
m_blockMap( std::move( blockMap ) ),
m_windowMap( std::move( windowMap ) ),
m_newlineOffsets( std::move( newlineOffsets ) ),
m_isBgzfFile( m_blockFinder->fileType() == FileType::BGZF )
{
if ( !m_sharedFileReader ) {
@@ -199,6 +209,7 @@ class GzipChunkFetcher final :
? toString( *m_windowCompressionType )
: std::string( "Default" ) ) << "\n";
out << " Window sparsity : " << m_windowSparsity << "\n";
out << " Count newlines : " << m_countNewlines << "\n";

std::cerr << std::move( out ).str();
}
@@ -257,6 +268,12 @@
m_windowSparsity = useSparseWindows;
}

void
setCountNewlines( bool countNewlines )
{
m_countNewlines = countNewlines;
}

private:
[[nodiscard]] std::pair</* decoded offset */ size_t, std::shared_ptr<ChunkData> >
getIndexedChunk( const size_t offset,
@@ -373,9 +390,9 @@
const std::vector<typename ChunkData::Subchunk>& subchunks,
const FasterVector<uint8_t>& lastWindow )
{
- for ( const auto& boundary : subchunks ) {
- m_blockMap->push( boundary.encodedOffset, boundary.encodedSize, boundary.decodedSize );
- m_blockFinder->insert( boundary.encodedOffset + boundary.encodedSize );
+ for ( const auto& subchunk : subchunks ) {
+ m_blockMap->push( subchunk.encodedOffset, subchunk.encodedSize, subchunk.decodedSize );
+ m_blockFinder->insert( subchunk.encodedOffset + subchunk.encodedSize );
}

if ( subchunks.size() > 1 ) {
@@ -387,10 +404,10 @@
const auto lookupKey = !BaseType::test( chunkOffset ) && BaseType::test( partitionOffset )
? partitionOffset
: chunkOffset;
- for ( const auto& boundary : subchunks ) {
+ for ( const auto& subchunk : subchunks ) {
/* This condition could be removed but makes the map slightly smaller. */
- if ( boundary.encodedOffset != chunkOffset ) {
- m_unsplitBlocks.emplace( boundary.encodedOffset, lookupKey );
+ if ( subchunk.encodedOffset != chunkOffset ) {
+ m_unsplitBlocks.emplace( subchunk.encodedOffset, lookupKey );
}
}
}
@@ -456,6 +473,33 @@
#endif
}
}

if ( subchunk.newlineCount ) {
const auto blockInfo = m_blockMap->getEncodedOffset( subchunk.encodedOffset );
if ( !blockInfo ) {
std::stringstream message;
message << "Failed to find subchunk offset: " << formatBits( subchunk.encodedOffset )
<< "even though it should have been inserted at the top of this method!";
throw std::logic_error( std::move( message ).str() );
}

if ( m_newlineOffsets->empty() ) {
m_newlineOffsets->emplace_back( NewlineOffset{ 0, 0 } );
}

const auto& lastLineCount = m_newlineOffsets->back();
if ( lastLineCount.uncompressedOffsetInBytes != blockInfo->decodedOffsetInBytes ) {
std::stringstream message;
message << "Did not find line count for preceding decompressed offset: "
<< formatBytes( blockInfo->decodedOffsetInBytes );
throw std::logic_error( std::move( message ).str() );
}

m_newlineOffsets->emplace_back( NewlineOffset{
lastLineCount.lineOffset + *subchunk.newlineCount,
blockInfo->decodedOffsetInBytes + subchunk.decodedSize
} );
}
}

m_statistics.queuePostProcessingDuration += duration( t0 );
@@ -698,6 +742,7 @@
chunkDataConfiguration.splitChunkSize = m_blockFinder->spacingInBits() / 8U;
chunkDataConfiguration.windowCompressionType = m_windowCompressionType;
chunkDataConfiguration.windowSparsity = m_windowSparsity;
chunkDataConfiguration.countNewlines = m_countNewlines;

/* If we are a BGZF file and we have not imported an index, then we can assume the
* window to be empty because we should only get offsets at gzip stream starts.
@@ -763,6 +808,7 @@
std::shared_ptr<BlockFinder> const m_blockFinder;
std::shared_ptr<BlockMap> const m_blockMap;
std::shared_ptr<WindowMap> const m_windowMap;
std::shared_ptr<std::vector<NewlineOffset> > const m_newlineOffsets;

const bool m_isBgzfFile;
std::atomic<size_t> m_maxDecompressedChunkSize{ std::numeric_limits<size_t>::max() };
@@ -777,5 +823,6 @@
PostProcessingFutures m_markersBeingReplaced;
std::optional<CompressionType> m_windowCompressionType;
bool m_windowSparsity{ true };
bool m_countNewlines{ false };
};
} // namespace rapidgzip
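
When a subchunk carries a newline count, the code above appends a cumulative { lineOffset, uncompressedOffsetInBytes } pair to the shared NewlineOffset vector, so the table is sorted by both fields and always starts at { 0, 0 }. This commit only builds the table; as an illustration of how it could be consumed, a hypothetical lookup that maps a 0-based line number to an uncompressed byte offset at or before the start of that line might look like the following sketch (assuming the rapidgzip::NewlineOffset struct added above is visible; the function name and signature are not part of this commit):

#include <algorithm>
#include <cstdint>
#include <iterator>
#include <optional>
#include <vector>

/* Hypothetical consumer of the table built above: return an uncompressed byte
 * offset that lies at or before the start of the given 0-based line, so that
 * decompression can resume there and scan forward. Returns std::nullopt for an
 * empty table. */
[[nodiscard]] inline std::optional<std::uint64_t>
findOffsetForLine( const std::vector<rapidgzip::NewlineOffset>& newlineOffsets,
                   const std::uint64_t                          targetLine )
{
    /* First entry whose cumulative newline count reaches the target line. */
    auto it = std::lower_bound(
        newlineOffsets.begin(), newlineOffsets.end(), targetLine,
        [] ( const rapidgzip::NewlineOffset& entry, std::uint64_t line ) {
            return entry.lineOffset < line;
        } );
    /* Step back one entry because a boundary with lineOffset == targetLine may
     * fall in the middle of that line; the preceding boundary is always safe. */
    if ( it != newlineOffsets.begin() ) {
        it = std::prev( it );
    }
    if ( it == newlineOffsets.end() ) {
        return std::nullopt;
    }
    return it->uncompressedOffsetInBytes;
}

Keeping only one cumulative count per subchunk keeps the table tiny compared to a full line index while still bounding how much data has to be re-scanned for any line-based access.
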