Skip to content

Commit

Permalink
PerformGets/Puts only when absolutely necessary
Browse files Browse the repository at this point in the history
  • Loading branch information
franzpoeschel committed Nov 20, 2023
1 parent 89e962f commit dc764c9
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 18 deletions.
6 changes: 3 additions & 3 deletions include/openPMD/IO/ADIOS/ADIOS2IOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -751,23 +751,23 @@ namespace detail
BufferedAction &operator=(BufferedAction const &other) = delete;
BufferedAction &operator=(BufferedAction &&other) = default;

virtual void run(BufferedActions &) = 0;
virtual bool run(BufferedActions &) = 0;
};

struct BufferedGet : BufferedAction
{
std::string name;
Parameter<Operation::READ_DATASET> param;

void run(BufferedActions &) override;
bool run(BufferedActions &) override;
};

struct BufferedPut : BufferedAction
{
std::string name;
Parameter<Operation::WRITE_DATASET> param;

void run(BufferedActions &) override;
bool run(BufferedActions &) override;
};

struct BufferedUniquePtrPut
Expand Down
112 changes: 97 additions & 15 deletions src/IO/ADIOS/ADIOS2IOHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2088,15 +2088,17 @@ namespace detail
// variable has not been found, so we don't fill in any blocks
}

void BufferedGet::run(BufferedActions &ba)
bool BufferedGet::run(BufferedActions &ba)
{
switchAdios2VariableType<detail::DatasetReader>(
param.dtype, ba.m_impl, *this, ba.m_IO, ba.getEngine(), ba.m_file);
return true;
}

void BufferedPut::run(BufferedActions &ba)
bool BufferedPut::run(BufferedActions &ba)
{
switchAdios2VariableType<detail::WriteDataset>(param.dtype, ba, *this);
return true;
}

struct RunUniquePtrPut
Expand Down Expand Up @@ -3025,9 +3027,11 @@ namespace detail
requireActiveStep();
}
}

bool needsFlush = flushUnconditionally;
for (auto &ba : m_buffer)
{
ba->run(*this);
needsFlush |= ba->run(*this);
}

if (!initializedDefaults)
Expand All @@ -3042,6 +3046,7 @@ namespace detail
for (auto &entry : m_uniquePtrPuts)
{
entry.run(*this);
needsFlush = true;
}
}

Expand All @@ -3053,7 +3058,10 @@ namespace detail
switch (level)
{
case FlushLevel::UserFlush:
performPutGets(*this, eng);
if (needsFlush)
{
performPutGets(*this, eng);
}
m_updateSpans.clear();
m_buffer.clear();
m_alreadyEnqueued.clear();
Expand Down Expand Up @@ -3087,14 +3095,39 @@ namespace detail
}
}

namespace
{
enum class WhichAPICall
{
PerformGets,
PerformPuts,
PerformDataWrite
};
}

void BufferedActions::flush_impl(
ADIOS2FlushParams flushParams, bool writeLatePuts)
{
auto decideFlushAPICall = [this, flushTarget = flushParams.flushTarget](
adios2::Engine &engine) {
#if ADIOS2_VERSION_MAJOR * 1000000000 + ADIOS2_VERSION_MINOR * 100000000 + \
ADIOS2_VERSION_PATCH * 1000000 + ADIOS2_VERSION_TWEAK >= \
2701001223

auto whichAPICall =
[this, flushTarget = flushParams.flushTarget]() -> WhichAPICall {
switch (m_mode)
{
case adios2::Mode::Read:
case adios2::Mode::ReadRandomAccess:
return WhichAPICall::PerformGets;
case adios2::Mode::Write:
case adios2::Mode::Append:
// continue below
break;
default:
throw error::Internal("[ADIOS2] Unexpected access mode.");
break;
}

bool performDataWrite{};
switch (flushTarget)
{
Expand All @@ -3109,8 +3142,20 @@ namespace detail
}
performDataWrite = performDataWrite && m_engineType == "bp5";

if (performDataWrite)
return performDataWrite ? WhichAPICall::PerformDataWrite
: WhichAPICall::PerformPuts;
};

auto decideFlushAPICall = [this, whichAPICall](adios2::Engine &engine) {
switch (whichAPICall())
{
case WhichAPICall::PerformPuts:
engine.PerformPuts();
break;
case WhichAPICall::PerformGets:
engine.PerformGets();
break;
case WhichAPICall::PerformDataWrite:
/*
* Deliberately don't write buffered attributes now since
* readers won't be able to see them before EndStep anyway,
Expand All @@ -3126,18 +3171,47 @@ namespace detail
}
engine.PerformDataWrite();
m_uniquePtrPuts.clear();
break;
}
else
{
engine.PerformPuts();
}
};

#else
(void)this;
(void)flushTarget;
engine.PerformPuts();
auto whichAPICall = [this,
flushTarget = flushParams.flushTarget]() -> bool {
switch (m_mode)
{
case adios2::Mode::Read:
#if HAS_ADIOS_2_8
case adios2::Mode::ReadRandomAccess:
#endif
return WhichAPICall::PerformGets;
case adios2::Mode::Write:
case adios2::Mode::Append:
return WhichAPICall::PerformPuts;
default:
throw error::Internal("[ADIOS2] Unexpected access mode.");
break;
}
};

auto decideFlushAPICall =
[this, performDataWrites](adios2::Engine &engine, whichAPICall) {
(void)this;
(void)flushTarget;
switch (whichAPICall())
{
case WhichAPICall::PerformPuts:
engine.PerformPuts();
break;
case WhichAPICall::PerformGets:
engine.PerformGets();
break;
case WhichAPICall::PerformDataWrite:
throw std::runtime_error("");
}
};
#endif

flush_impl(
flushParams,
[decideFlushAPICall = std::move(decideFlushAPICall)](
Expand All @@ -3152,7 +3226,15 @@ namespace detail
}
},
writeLatePuts,
/* flushUnconditionally = */ false);
/*
* performDataWrites() is collective, so flushes should always run
* in that case. Otherwise, they can be skipped.
* Since EndStep and BeginStep are collective, we need only pay
* attention to this while a step is active.
*/
/* flushUnconditionally = */ whichAPICall() ==
WhichAPICall::PerformDataWrite &&
streamStatus != StreamStatus::OutsideOfStep);
}

AdvanceStatus
Expand Down

0 comments on commit dc764c9

Please sign in to comment.