Skip to content

Commit

Permalink
[ntuple] Use CommitSealedPagesV in RNTupleMerger
Browse files Browse the repository at this point in the history
This uses a bit more memory because all sealed pages of a cluster
must be kept in memory, but is up to a factor of 2 faster.
  • Loading branch information
hahnjo committed Apr 10, 2024
1 parent 8a78465 commit 05203ae
Showing 1 changed file with 19 additions and 3 deletions.
22 changes: 19 additions & 3 deletions tree/ntuple/v7/src/RNTupleMerger.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include <TFile.h>
#include <TKey.h>

#include <deque>

Long64_t ROOT::Experimental::RNTuple::Merge(TCollection *inputs, TFileMergeInfo *mergeInfo)
{
// Check the inputs
Expand Down Expand Up @@ -193,6 +195,12 @@ void ROOT::Experimental::Internal::RNTupleMerger::Merge(std::span<RPageSource *>
while (clusterId != ROOT::Experimental::kInvalidDescriptorId) {
auto &cluster = descriptor->GetClusterDescriptor(clusterId);

std::vector<std::unique_ptr<unsigned char[]>> buffers;
// We use a std::deque so that references to the contained SealedPageSequence_t objects, and iterators into them,
// are never invalidated when further sequences are appended.
std::deque<RPageStorage::SealedPageSequence_t> sealedPagesV;
std::vector<RPageStorage::RSealedPageGroup> sealedPageGroups;

for (const auto &column : columns) {

// See if this cluster contains this column
Expand All @@ -206,6 +214,8 @@ void ROOT::Experimental::Internal::RNTupleMerger::Merge(std::span<RPageSource *>
const auto &pages = cluster.GetPageRange(columnId);
size_t idx{0};

RPageStorage::SealedPageSequence_t sealedPages;

// Loop over the pages
for (const auto &pageInfo : pages.fPageInfos) {

Expand All @@ -222,17 +232,23 @@ void ROOT::Experimental::Internal::RNTupleMerger::Merge(std::span<RPageSource *>
sealedPage.fBuffer = buffer.get();
source->LoadSealedPage(columnId, clusterIndex, sealedPage);

// Now commit this page to the output
// Can we do this w/ a CommitSealedPageV
destination.CommitSealedPage(column.fColumnOutputId, sealedPage);
buffers.push_back(std::move(buffer));
sealedPages.push_back(std::move(sealedPage));

// Move on to the next index
idx += pageInfo.fNElements;

} // end of loop over pages

sealedPagesV.push_back(std::move(sealedPages));
sealedPageGroups.emplace_back(column.fColumnOutputId, sealedPagesV.back().cbegin(),
sealedPagesV.back().cend());

} // end of loop over columns

// Now commit all pages to the output
destination.CommitSealedPageV(sealedPageGroups);

// Commit the clusters
destination.CommitCluster(cluster.GetNEntries());

Expand Down

0 comments on commit 05203ae

Please sign in to comment.