Skip to content

Commit

Permalink
Documentation.
Browse files Browse the repository at this point in the history
  • Loading branch information
jtv committed Jan 30, 2020
1 parent a3f3b78 commit 872c23e
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 33 deletions.
55 changes: 28 additions & 27 deletions include/pqxx/pipeline.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,19 @@

namespace pqxx
{
/// Processes several queries in FIFO manner, optimized for high throughput
/** Use a pipeline if you want to execute queries without always sitting still
* while they execute. Result retrieval is decoupled from execution request;
* queries "go in at the front" and results "come out the back." Actually
* results may be retrieved in any order, if you want.
/// Processes several queries in FIFO manner, optimized for high throughput.
/** Use a pipeline if you want to keep doing useful work while your queries are
* executing. Result retrieval is decoupled from execution request; queries
* "go in at the front" and results "come out the back."
*
* Feel free to pump as many queries into the pipeline as possible, even if
* they were generated after looking at a result from the same pipeline. To
* get the best possible throughput, try to make insertion of queries run as
* far ahead of results retrieval as possible; issue each query as early as
* possible and retrieve their results as late as possible, so the pipeline has
* as many ongoing queries as possible at any given time. In other words, keep
* it busy!
* Actually, you can retrieve the results in any order if you want, but it may
* lead to surprising "time travel" effects if any of the queries fails. In
* particular, syntax errors in the queries can confuse things and show up too
* early in the stream of results.
*
* One warning: if any of the queries you insert leads to a syntactic error,
* the error may be returned as if it were generated by an older query. Future
* versions may try to work around this if working in a nontransaction.
* Generally, if any of the queries fails, it will throw an exception at the
* point where you request its result. But it may happen earlier, especially
* if you request results out of chronological order.
*/
class PQXX_LIBEXPORT pipeline : public internal::transactionfocus
{
Expand Down Expand Up @@ -79,19 +75,24 @@ public:
~pipeline() noexcept;

/// Add query to the pipeline.
/** Queries are accumulated in the pipeline and sent to the backend in a
* concatenated format, separated by semicolons. The queries you insert must
* not use this construct themselves, or the pipeline will get hopelessly
* confused!
* @return Identifier for this query, unique only within this pipeline
/** Queries accumulate in the pipeline, which sends them to the backend in a
* batch separated by semicolons. The queries you insert must not use this
* trick themselves, or the pipeline will get hopelessly confused!
*
* @return Identifier for this query, unique only within this pipeline.
*/
query_id insert(std::string_view);

/// Wait for all ongoing or pending operations to complete.
/** Detaches from the transaction when done. */
/// Wait for all ongoing or pending operations to complete, and detach.
/** Detaches from the transaction when done.
*
* This does not produce the queries' results, so it may not report any
* errors which may have occurred in their execution. To be sure that your
* statements succeeded, call @c retrieve() until the pipeline is empty.
*/
void complete();

/// Forget all ongoing or pending operations and retrieved results
/// Forget all ongoing or pending operations and retrieved results.
/** Queries already sent to the backend may still be completed, depending
* on implementation and timing.
*
Expand Down Expand Up @@ -129,14 +130,14 @@ public:
return retrieve(m_queries.find(qid)).second;
}

/// Retrieve oldest unretrieved result (possibly wait for one)
/** @return The query's identifier and its result set */
/// Retrieve oldest unretrieved result (possibly wait for one).
/** @return The query's identifier and its result set. */
std::pair<query_id, result> retrieve();

[[nodiscard]] bool empty() const noexcept { return m_queries.empty(); }

/// Set maximum number of queries to retain before issuing them to the
/// backend
/// backend.
/** The pipeline will perform better if multiple queries are issued at once,
* but retaining queries until the results are needed (as opposed to issuing
* them to the backend immediately) may negate any performance benefits the
Expand All @@ -151,7 +152,7 @@ public:
int retain(int retain_max = 2);


/// Resume retained query emission (harmless when not needed)
/// Resume retained query emission. Harmless when not needed.
void resume();

private:
Expand Down
13 changes: 7 additions & 6 deletions src/pipeline.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ bool pqxx::pipeline::obtain_result(bool expect_none)
"Got more results from pipeline than there were queries."};
}

// Must be the result for the oldest pending query
// Must be the result for the oldest pending query.
if (not m_issuedrange.first->second.get_result().empty())
internal_error("Multiple results for one query.");

Expand Down Expand Up @@ -294,6 +294,8 @@ void pqxx::pipeline::obtain_dummy()
return;
}

// XXX: Do we actually know that the queries did not execute?
// XXX: Can we actually re-issue statements after a failure?
/* Since none of the queries in the batch were actually executed, we can
* afford to replay them one by one until we find the exact query that
* caused the error. This gives us not only a more specific error message
Expand All @@ -310,7 +312,6 @@ void pqxx::pipeline::obtain_dummy()
// Retrieve that null result for the last query, if needed
obtain_result(true);


// Reset internal state to forget botched batch attempt
m_num_waiting += check_cast<int>(
std::distance(m_issuedrange.first, stop), "pipeline obtain_dummy()");
Expand Down Expand Up @@ -351,7 +352,7 @@ pqxx::pipeline::retrieve(pipeline::QueryMap::iterator q)
throw std::runtime_error{
"Could not complete query in pipeline due to error in earlier query."};

// If query hasn't issued yet, do it now
// If query hasn't issued yet, do it now.
if (
m_issuedrange.second != m_queries.end() and
(q->first >= m_issuedrange.second->first))
Expand All @@ -362,7 +363,7 @@ pqxx::pipeline::retrieve(pipeline::QueryMap::iterator q)
issue();
}

// If result not in yet, get it; else get at least whatever's convenient
// If result not in yet, get it; else get at least whatever's convenient.
if (have_pending())
{
if (q->first >= m_issuedrange.first->first)
Expand All @@ -381,7 +382,7 @@ pqxx::pipeline::retrieve(pipeline::QueryMap::iterator q)
throw std::runtime_error{
"Could not complete query in pipeline due to error in earlier query."};

// Don't leave the backend idle if there are queries waiting to be issued
// Don't leave the backend idle if there are queries waiting to be issued.
if (m_num_waiting and not have_pending() and (m_error == qid_limit()))
issue();

Expand Down Expand Up @@ -428,7 +429,7 @@ void pqxx::pipeline::receive(pipeline::QueryMap::const_iterator stop)
QueryMap::const_iterator{m_issuedrange.first} != stop)
;

// Also haul in any remaining "targets of opportunity"
// Also haul in any remaining "targets of opportunity".
if (QueryMap::const_iterator{m_issuedrange.first} == stop)
get_further_available_results();
}

0 comments on commit 872c23e

Please sign in to comment.