Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clp): Add the write path for single-file archives. #646

Open
wants to merge 23 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions components/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,9 @@ set(SOURCE_FILES_unitTest
src/clp/streaming_archive/reader/Segment.hpp
src/clp/streaming_archive/reader/SegmentManager.cpp
src/clp/streaming_archive/reader/SegmentManager.hpp
src/clp/streaming_archive/single_file_archive/Defs.hpp
src/clp/streaming_archive/single_file_archive/writer.cpp
src/clp/streaming_archive/single_file_archive/writer.hpp
Comment on lines +508 to +510
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Missing test coverage for the new single-file archive implementation

The new single-file archive files have been properly added to the CMake configuration, but I don't see any corresponding test files for this functionality. While there are several test files in the codebase:

  • There's only one test file that mentions "Archive": test-clp_s-end_to_end.cpp
  • None of the existing test files appear to specifically cover the new single-file archive implementation

Recommended actions:

  • Create unit tests for the new single-file archive writer implementation
  • Consider adding tests in these areas:
    • Basic writer functionality in writer.cpp
    • Edge cases and error handling
    • Integration with the existing archive system
🔗 Analysis chain

LGTM! The new single-file archive source files are properly integrated.

The files are correctly placed in a dedicated directory and follow the project's naming conventions.

Let's verify the test coverage for these new files:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for corresponding test files and test coverage for the new SFA implementation.

# Look for test files related to single-file archive
rg -l "single_file_archive" tests/

# Check for test cases using the new writer
rg -A 5 "TEST.*single.*file.*archive" tests/

Length of output: 175


Script:

#!/bin/bash
# Let's first find the test directory location
fd -t d "test" .

# Then search for single file archive related files and tests
rg -l "single_file_archive" .

# Look for test cases specifically
rg -A 5 "TEST.*single.*file.*archive" .

Length of output: 1270


Script:

#!/bin/bash
# Let's check the test files in the core component
fd -t f ".*test.*\.(cpp|hpp)$" components/core/tests/

# Look for any test files that might be testing the archive functionality
rg -l "Archive" components/core/tests/

# Check if there are any test files that include "writer" in their name or content
rg -l "writer" components/core/tests/

Length of output: 1828

src/clp/streaming_archive/writer/Archive.cpp
src/clp/streaming_archive/writer/Archive.hpp
src/clp/streaming_archive/writer/File.cpp
Expand Down
3 changes: 3 additions & 0 deletions components/core/src/clp/clp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ set(
../streaming_archive/reader/Segment.hpp
../streaming_archive/reader/SegmentManager.cpp
../streaming_archive/reader/SegmentManager.hpp
../streaming_archive/single_file_archive/Defs.hpp
../streaming_archive/single_file_archive/writer.cpp
../streaming_archive/single_file_archive/writer.hpp
../streaming_archive/writer/Archive.cpp
../streaming_archive/writer/Archive.hpp
../streaming_archive/writer/File.cpp
Expand Down
4 changes: 4 additions & 0 deletions components/core/src/clp/clp/CommandLineArguments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,10 @@ CommandLineArguments::parse_arguments(int argc, char const* argv[]) {
->default_value(m_schema_file_path),
"Path to a schema file. If not specified, heuristics are used to determine "
"dictionary variables. See README-Schema.md for details."
)(
"single-file-archive",
po::bool_switch(&m_single_file_archive),
"Output archive as a single-file archive"
);

po::options_description all_compression_options;
Expand Down
4 changes: 4 additions & 0 deletions components/core/src/clp/clp/CommandLineArguments.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class CommandLineArguments : public CommandLineArgumentsBase {
explicit CommandLineArguments(std::string const& program_name)
: CommandLineArgumentsBase(program_name),
m_show_progress(false),
m_single_file_archive(false),
m_sort_input_files(true),
m_print_archive_stats_progress(false),
m_target_segment_uncompressed_size(1L * 1024 * 1024 * 1024),
Expand All @@ -45,6 +46,8 @@ class CommandLineArguments : public CommandLineArgumentsBase {

bool show_progress() const { return m_show_progress; }

[[nodiscard]] auto single_file_archive() const -> bool { return m_single_file_archive; }

bool sort_input_files() const { return m_sort_input_files; }

bool print_archive_stats_progress() const { return m_print_archive_stats_progress; }
Expand Down Expand Up @@ -92,6 +95,7 @@ class CommandLineArguments : public CommandLineArgumentsBase {
std::string m_output_dir;
std::string m_schema_file_path;
bool m_show_progress;
bool m_single_file_archive;
bool m_print_archive_stats_progress;
size_t m_target_encoded_file_size;
size_t m_target_segment_uncompressed_size;
Expand Down
12 changes: 9 additions & 3 deletions components/core/src/clp/clp/FileCompressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,9 @@ void FileCompressor::parse_and_encode_with_heuristic(

// Parse content from file
while (m_message_parser.parse_next_message(true, reader, m_parsed_message)) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dicts) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dicts
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thinking, would it be cleaner if we add a new method with name like "should_split" to archive_writer, and embed this if logic into the method.

Now the same if statements have been duplicated at multiple places, which is inefficient and error prone since one change requires you to update multiple places

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will wait on making this change until I get feedback from kirk. I may want to remove no split functionality even though it is done in private branch.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good

&& false == archive_writer.get_use_single_file_archive())
{
split_file_and_archive(
archive_user_config,
path_for_compression,
Expand Down Expand Up @@ -337,7 +339,9 @@ bool FileCompressor::try_compressing_as_archive(
parent_directories.emplace(file_parent_path);
}

if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dicts) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dicts
&& false == archive_writer.get_use_single_file_archive())
{
split_archive(archive_user_config, archive_writer);
}

Expand Down Expand Up @@ -537,7 +541,9 @@ std::error_code FileCompressor::compress_ir_stream_by_encoding(
}

// Split archive/encoded file if necessary before writing the new event
if (archive.get_data_size_of_dictionaries() >= target_data_size_of_dicts) {
if (archive.get_data_size_of_dictionaries() >= target_data_size_of_dicts
&& false == archive.get_use_single_file_archive())
{
split_file_and_archive(
archive_user_config,
path,
Expand Down
9 changes: 7 additions & 2 deletions components/core/src/clp/clp/compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ bool compress(
archive_user_config.global_metadata_db = global_metadata_db.get();
archive_user_config.print_archive_stats_progress
= command_line_args.print_archive_stats_progress();
archive_user_config.use_single_file_archive = command_line_args.single_file_archive();

// Open Archive
streaming_archive::writer::Archive archive_writer;
Expand Down Expand Up @@ -135,7 +136,9 @@ bool compress(
);
}
for (auto it = files_to_compress.cbegin(); it != files_to_compress.cend(); ++it) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dictionaries) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dictionaries
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBD

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto for this

&& false == archive_writer.get_use_single_file_archive())
{
split_archive(archive_user_config, archive_writer);
}
if (false
Expand Down Expand Up @@ -163,7 +166,9 @@ bool compress(
file_group_id_comparator);
// Compress grouped files
for (auto const& file_to_compress : grouped_files_to_compress) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dictionaries) {
if (archive_writer.get_data_size_of_dictionaries() >= target_data_size_of_dictionaries
&& false == archive_writer.get_use_single_file_archive())
{
split_archive(archive_user_config, archive_writer);
}
if (false
Expand Down
37 changes: 37 additions & 0 deletions components/core/src/clp/streaming_archive/ArchiveMetadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@
#include <cstdint>

#include "../Defs.h"
#include "../ffi/encoding_methods.hpp"
#include "../FileReader.hpp"
#include "../FileWriter.hpp"
#include "Constants.hpp"
#include "msgpack.hpp"

namespace clp::streaming_archive {

static constexpr std::string_view cCompressionTypeZstd = "ZSTD";

/**
* A class to encapsulate metadata directly relating to an archive.
*/
Expand Down Expand Up @@ -79,6 +84,18 @@ class ArchiveMetadata {

[[nodiscard]] auto get_end_timestamp() const { return m_end_timestamp; }

[[nodiscard]] auto get_variable_encoding_methods_version() const -> std::string const& {
return m_variable_encoding_methods_version;
}

[[nodiscard]] auto get_variables_schema_version() const -> std::string const& {
return m_variables_schema_version;
}

[[nodiscard]] auto get_compression_type() const -> std::string const& {
return m_compression_type;
}

/**
* Expands the archive's time range based to encompass the given time range
* @param begin_timestamp
Expand All @@ -88,6 +105,20 @@ class ArchiveMetadata {

void write_to_file(FileWriter& file_writer) const;

// MsgPack serialization used for single-file archive format. Variables are renamed when
// serialized to match single-file archive specification.
MSGPACK_DEFINE_MAP(
MSGPACK_NVP("archive_format_version", m_archive_format_version),
MSGPACK_NVP("variable_encoding_methods_version", m_variable_encoding_methods_version),
MSGPACK_NVP("variables_schema_version", m_variables_schema_version),
MSGPACK_NVP("compression_type", m_compression_type),
MSGPACK_NVP("creator_id", m_creator_id),
MSGPACK_NVP("begin_timestamp", m_begin_timestamp),
MSGPACK_NVP("end_timestamp", m_end_timestamp),
MSGPACK_NVP("uncompressed_size", m_uncompressed_size),
MSGPACK_NVP("compressed_size", m_compressed_size)
);

private:
// Variables
archive_format_version_t m_archive_format_version{cArchiveFormatVersion};
Expand All @@ -102,6 +133,12 @@ class ArchiveMetadata {
// The size of the archive
uint64_t m_compressed_size{0};
uint64_t m_dynamic_compressed_size{0};
// TODO: The following fields are used in single-file archive; however, they are not
// currently part of multi-file archive metadata. Modifying multi-file archive metadata
// disk format is potentially a breaking change and not currently required.
std::string m_variable_encoding_methods_version{ffi::cVariableEncodingMethodsVersion};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess a high level question is. how are those constants related to multi-file-archive?

Technically, since they are not used by anything else, they can also be directly defined under struct "single_archive_metadata"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally these fields are written to multi-file archive metadata. However, since this is actually changing multi-file archives, I didn't want to make the change until we here from @kirkrodrigues. If there is no plan to add these to the multi-file metadata, perhaps we can move them into single file archive files

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I didn't fully answer your question, but I asked kirk about these fields earlier and supposedly they are also relevant to multi file archive

std::string m_variables_schema_version{ffi::cVariablesSchemaVersion};
std::string m_compression_type{cCompressionTypeZstd};
};
} // namespace clp::streaming_archive

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#ifndef CLP_STREAMING_ARCHIVE_SINGLE_FILE_ARCHIVE_DEFS_HPP
#define CLP_STREAMING_ARCHIVE_SINGLE_FILE_ARCHIVE_DEFS_HPP

#include <cstdint>
#include <string>

#include "../../Defs.h"
#include "../ArchiveMetadata.hpp"
#include "../Constants.hpp"
#include "msgpack.hpp"

namespace clp::streaming_archive::single_file_archive {

using single_file_archive_format_version_t = uint32_t;

// Single file archive version.
constexpr uint8_t cArchiveMajorVersion{0};
constexpr uint8_t cArchiveMinorVersion{1};
constexpr uint16_t cArchivePatchVersion{1};
constexpr single_file_archive_format_version_t cArchiveVersion{
cArchiveMajorVersion << 24 | cArchiveMinorVersion << 16 | cArchivePatchVersion
};

static constexpr size_t cNumMagicNumberChars{4};
static constexpr std::array<uint8_t, cNumMagicNumberChars>
cUnstructuredSfaMagicNumber{'Y', 'C', 'L', 'P'};
static constexpr std::string_view cUnstructuredSfaExtension{".clp"};
static constexpr size_t cFileSizeWarningThreshold{100L * 1024 * 1024};

static constexpr size_t cNumStaticFiles{5};
constexpr std::array<char const*, cNumStaticFiles> cStaticArchiveFileNames{
cMetadataDBFileName,
cLogTypeDictFilename,
cLogTypeSegmentIndexFilename,
cVarDictFilename,
cVarSegmentIndexFilename
};

static constexpr size_t cNumUnused{6};

struct __attribute__((packed)) SingleFileArchiveHeader {
std::array<uint8_t, cNumMagicNumberChars> magic;
single_file_archive_format_version_t version;
uint64_t metadata_size;
std::array<uint64_t, cNumUnused> unused;
};

struct FileInfo {
davemarco marked this conversation as resolved.
Show resolved Hide resolved
std::string name;
uint64_t offset;
// Variables are renamed when serialized to match single-file archive specification.
MSGPACK_DEFINE_MAP(MSGPACK_NVP("n", name), MSGPACK_NVP("o", offset));
};

struct SingleFileArchiveMetadata {
std::vector<FileInfo> archive_files;
ArchiveMetadata archive_metadata;
uint64_t num_segments;
MSGPACK_DEFINE_MAP(archive_files, archive_metadata, num_segments);
};
} // namespace clp::streaming_archive::single_file_archive

#endif // CLP_STREAMING_ARCHIVE_SINGLE_FILE_ARCHIVE_DEFS_HPP
Loading
Loading