Skip to content

Commit

Permalink
feat(core-clp): Defer each archive's global metadata updates until it…
Browse files Browse the repository at this point in the history
… has been completed (resolves #685). (#705)
  • Loading branch information
davemarco authored Feb 14, 2025
1 parent 3e523d9 commit 2aa5c5c
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 27 deletions.
54 changes: 32 additions & 22 deletions components/core/src/clp/streaming_archive/writer/Archive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,6 @@ void Archive::open(UserConfig const& user_config) {

m_global_metadata_db = user_config.global_metadata_db;

m_global_metadata_db->open();
m_global_metadata_db->add_archive(m_id_as_string, *m_local_metadata);
m_global_metadata_db->close();

m_file = nullptr;

// Open log-type dictionary
Expand Down Expand Up @@ -238,8 +234,18 @@ void Archive::close() {

m_metadata_file_writer.close();

update_global_metadata();
m_global_metadata_db = nullptr;

for (auto* file : m_file_metadata_for_global_update) {
delete file;
}
m_file_metadata_for_global_update.clear();

if (m_print_archive_stats_progress) {
print_archive_stats_progress();
}

m_metadata_db.close();

m_creator_id_as_string.clear();
Expand Down Expand Up @@ -557,8 +563,6 @@ void Archive::persist_file_metadata(vector<File*> const& files) {

m_metadata_db.update_files(files);

m_global_metadata_db->update_metadata_for_files(m_id_as_string, files);

// Mark files' metadata as clean
for (auto file : files) {
file->mark_metadata_as_clean();
Expand Down Expand Up @@ -593,16 +597,12 @@ void Archive::close_segment_and_persist_file_metadata(

for (auto file : files) {
file->mark_as_in_committed_segment();
m_file_metadata_for_global_update.emplace_back(file);
}

m_global_metadata_db->open();
persist_file_metadata(files);
update_metadata();
m_global_metadata_db->close();
update_local_metadata();

for (auto file : files) {
delete file;
}
files.clear();
}

Expand All @@ -628,23 +628,33 @@ uint64_t Archive::get_dynamic_compressed_size() {
return on_disk_size;
}

void Archive::update_metadata() {
auto Archive::print_archive_stats_progress() -> void {
nlohmann::json json_msg;
json_msg["id"] = m_id_as_string;
json_msg["uncompressed_size"] = m_local_metadata->get_uncompressed_size_bytes();
json_msg["size"] = m_local_metadata->get_compressed_size_bytes();
std::cout << json_msg.dump(-1, ' ', true, nlohmann::json::error_handler_t::ignore) << std::endl;
}

void Archive::update_local_metadata() {
m_local_metadata->set_dynamic_uncompressed_size(0);
m_local_metadata->set_dynamic_compressed_size(get_dynamic_compressed_size());
// Rewrite (overwrite) the metadata file
m_metadata_file_writer.seek_from_begin(0);
m_local_metadata->write_to_file(m_metadata_file_writer);
}

m_global_metadata_db->update_archive_metadata(m_id_as_string, *m_local_metadata);

if (m_print_archive_stats_progress) {
nlohmann::json json_msg;
json_msg["id"] = m_id_as_string;
json_msg["uncompressed_size"] = m_local_metadata->get_uncompressed_size_bytes();
json_msg["size"] = m_local_metadata->get_compressed_size_bytes();
std::cout << json_msg.dump(-1, ' ', true, nlohmann::json::error_handler_t::ignore)
<< std::endl;
auto Archive::update_global_metadata() -> void {
m_global_metadata_db->open();
if (false == m_local_metadata.has_value()) {
throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__);
}
m_global_metadata_db->add_archive(m_id_as_string, m_local_metadata.value());
m_global_metadata_db->update_metadata_for_files(
m_id_as_string,
m_file_metadata_for_global_update
);
m_global_metadata_db->close();
}

// Explicitly declare template specializations so that we can define the template methods in this
Expand Down
25 changes: 20 additions & 5 deletions components/core/src/clp/streaming_archive/writer/Archive.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,16 +245,16 @@ class Archive {
std::vector<File*>& files_in_segment
);
/**
* Writes the given files' metadata to the database using bulk writes
* Writes the given files' metadata to the local database using bulk writes.
* @param files
* @throw streaming_archive::writer::Archive::OperationFailed if failed to replace old
* metadata for any file
* @throw mongocxx::logic_error if invalid database operation is created
*/
void persist_file_metadata(std::vector<File*> const& files);
/**
* Closes a given segment, persists the metadata of the files in the segment, and cleans up
* any data remaining outside the segment
* Closes a given segment, persists metadata to local database for files in the segment, and
* cleans up any data remaining outside the segment.
* @param segment
* @param files
* @param segment_logtype_ids
Expand All @@ -274,10 +274,21 @@ class Archive {
* is closed
*/
uint64_t get_dynamic_compressed_size();

/**
* Prints archive progress statistics as a JSON object.
*/
auto print_archive_stats_progress() -> void;

/**
* Updates the archive's metadata
* Updates the archive's metadata in the local metadata database.
*/
void update_metadata();
void update_local_metadata();

/**
* Updates the archive's metadata in the global metadata database.
*/
auto update_global_metadata() -> void;

// Variables
boost::uuids::uuid m_id;
Expand Down Expand Up @@ -316,6 +327,10 @@ class Archive {
std::vector<File*> m_files_with_timestamps_in_segment;
std::vector<File*> m_files_without_timestamps_in_segment;

// Collection of `File`s which have been written to archive and deallocated, but whose
// metadata has not yet been persisted globally.
std::vector<File*> m_file_metadata_for_global_update;

size_t m_target_segment_uncompressed_size;
Segment m_segment_for_files_with_timestamps;
ArrayBackedPosIntSet<logtype_dictionary_id_t>
Expand Down

0 comments on commit 2aa5c5c

Please sign in to comment.