diff --git a/README.md b/README.md index a76fdbb..fa4e096 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ * [select](#pyreindexer.rx_connector.RxConnector.select) * [new\_transaction](#pyreindexer.rx_connector.RxConnector.new_transaction) * [new\_query](#pyreindexer.rx_connector.RxConnector.new_query) + * [with\_timeout](#pyreindexer.rx_connector.RxConnector.with_timeout) * [pyreindexer.query\_results](#pyreindexer.query_results) * [QueryResults](#pyreindexer.query_results.QueryResults) * [status](#pyreindexer.query_results.QueryResults.status) @@ -465,6 +466,22 @@ Creates a new query and return the query object to processing #### Raises: Exception: Raises with an error message when Reindexer instance is not initialized yet + + +### RxConnector.with\_timeout + +```python +def with_timeout(timeout: int) -> RxConnector +``` + +Add execution timeout to the next query + +#### Arguments: + timeout (int): Optional server-side execution timeout for each subquery [milliseconds] + +#### Returns: + (:obj:`RxConnector`): RxConnector object for further customizations + # pyreindexer.query\_results diff --git a/pyreindexer/lib/src/rawpyreindexer.cc b/pyreindexer/lib/src/rawpyreindexer.cc index 9cec0bf..d4af914 100644 --- a/pyreindexer/lib/src/rawpyreindexer.cc +++ b/pyreindexer/lib/src/rawpyreindexer.cc @@ -138,8 +138,9 @@ static PyObject* WithTimeout(PyObject* self, PyObject* args) { return nullptr; } - auto err = getWrapper(rx)->WithTimeout(std::chrono::milliseconds(timeout)); - return pyErr(err); + getWrapper(rx)->WithTimeout(std::chrono::milliseconds(timeout)); + + Py_RETURN_NONE; } // namespace ---------------------------------------------------------------------------------------------------------- diff --git a/pyreindexer/lib/src/reindexerinterface.cc b/pyreindexer/lib/src/reindexerinterface.cc index 3b7856c..7fd32d2 100644 --- a/pyreindexer/lib/src/reindexerinterface.cc +++ b/pyreindexer/lib/src/reindexerinterface.cc @@ -40,15 +40,14 @@ class GenericCommand : public ICommand { template <> ReindexerInterface::ReindexerInterface(const ReindexerConfig& cfg) - : db_(std::make_unique(reindexer::ReindexerConfig() - .WithUpdatesSize(cfg.maxReplUpdatesSize) - .WithAllocatorCacheLimits(cfg.allocatorCacheLimit, cfg.allocatorCachePart))) + : db_(reindexer::ReindexerConfig().WithUpdatesSize(cfg.maxReplUpdatesSize) + .WithAllocatorCacheLimits(cfg.allocatorCacheLimit, cfg.allocatorCachePart)) { } template <> ReindexerInterface::ReindexerInterface(const ReindexerConfig& cfg) - : db_(std::make_unique(reindexer::client::ReindexerConfig(4, 1, cfg.fetchAmount, - 0, cfg.connectTimeout, cfg.requestTimeout, cfg.enableCompression, cfg.requestDedicatedThread, cfg.appName))) + : db_(reindexer::client::ReindexerConfig(4, 1, cfg.fetchAmount, 0, + cfg.connectTimeout, cfg.requestTimeout, cfg.enableCompression, cfg.requestDedicatedThread, cfg.appName)) { std::atomic_bool running{false}; executionThr_ = std::thread([this, &running] { @@ -101,15 +100,6 @@ Error ReindexerInterface::Select(const std::string& query, QueryResultsWrap }); } -template -Error ReindexerInterface::WithTimeout(std::chrono::milliseconds timeout) { - return execute([this, timeout] { - auto db = db_->WithTimeout(timeout); - db_ = std::make_unique(std::move(db)); - return errOK; - }); -} - template Error ReindexerInterface::FetchResults(QueryResultsWrapper& result) { return execute([&result] { @@ -128,6 +118,125 @@ Error ReindexerInterface::StartTransaction(std::string_view ns, Transaction }); } +template +Error ReindexerInterface::openNamespace(std::string_view ns) { + auto err = db_.WithTimeout(timeout_).OpenNamespace({ns.data(), ns.size()}); + timeout_ = std::chrono::milliseconds{0}; + return err; +} + +template +Error ReindexerInterface::closeNamespace(std::string_view ns) { + auto err = db_.WithTimeout(timeout_).CloseNamespace({ns.data(), ns.size()}); + timeout_ = std::chrono::milliseconds{0}; + return err; +} + +template +Error ReindexerInterface::dropNamespace(std::string_view ns) { + auto err = db_.WithTimeout(timeout_).DropNamespace(ns); + timeout_ = std::chrono::milliseconds{0}; + return err; +} + +template +Error ReindexerInterface::addIndex(std::string_view ns, const IndexDef& idx) { + auto err = db_.WithTimeout(timeout_).AddIndex(ns, idx); + timeout_ = std::chrono::milliseconds{0}; + return err; +} + +template +Error ReindexerInterface::updateIndex(std::string_view ns, const IndexDef& idx) { + auto err = db_.WithTimeout(timeout_).UpdateIndex(ns, idx); + timeout_ = std::chrono::milliseconds{0}; + return err; +} + +template +Error ReindexerInterface::dropIndex(std::string_view ns, const IndexDef& idx) { + auto err = db_.WithTimeout(timeout_).DropIndex({ns.data(), ns.size()}, idx); + timeout_ = std::chrono::milliseconds{0}; + return err; +} + +template +typename DBT::ItemT ReindexerInterface::newItem(std::string_view ns) { + auto item = db_.WithTimeout(timeout_).NewItem({ns.data(), ns.size()}); + timeout_ = std::chrono::milliseconds{0}; + return item; +} + +template +Error ReindexerInterface::insert(std::string_view ns, typename DBT::ItemT& item) { + auto err = db_.WithTimeout(timeout_).Insert({ns.data(), ns.size()}, item); + timeout_ = std::chrono::milliseconds{0}; + return err; +} + +template +Error ReindexerInterface::upsert(std::string_view ns, typename DBT::ItemT& item) { + auto err = db_.WithTimeout(timeout_).Upsert({ns.data(), ns.size()}, item); + timeout_ = std::chrono::milliseconds{0}; + return err; +} + +template +Error ReindexerInterface::update(std::string_view ns, typename DBT::ItemT& item) { + auto err = db_.WithTimeout(timeout_).Update({ns.data(), ns.size()}, item); + timeout_ = std::chrono::milliseconds{0}; + return err; +} + +template +Error ReindexerInterface::deleteItem(std::string_view ns, typename DBT::ItemT& item) { + auto err = db_.WithTimeout(timeout_).Delete({ns.data(), ns.size()}, item); + timeout_ = std::chrono::milliseconds{0}; + return err; +} + +template +Error ReindexerInterface::putMeta(std::string_view ns, const std::string& key, std::string_view data) { + auto err = db_.WithTimeout(timeout_).PutMeta(ns, key, data); + timeout_ = std::chrono::milliseconds{0}; + return err; +} + +template +Error ReindexerInterface::getMeta(std::string_view ns, const std::string& key, std::string& data) { + auto err = db_.WithTimeout(timeout_).GetMeta(ns, key, data); + timeout_ = std::chrono::milliseconds{0}; + return err; +} + +template +Error ReindexerInterface::deleteMeta(std::string_view ns, const std::string& key) { + auto err = db_.WithTimeout(timeout_).DeleteMeta(ns, key); + timeout_ = std::chrono::milliseconds{0}; + return err; +} + +template +Error ReindexerInterface::enumMeta(std::string_view ns, std::vector& keys) { + auto err = db_.WithTimeout(timeout_).EnumMeta(ns, keys); + timeout_ = std::chrono::milliseconds{0}; + return err; +} + +template +Error ReindexerInterface::select(const std::string& query, typename DBT::QueryResultsT& result) { + auto err = db_.WithTimeout(timeout_).Select(query, result); + timeout_ = std::chrono::milliseconds{0}; + return err; +} + +template +Error ReindexerInterface::enumNamespaces(std::vector& defs, EnumNamespacesOpts opts) { + auto err = db_.WithTimeout(timeout_).EnumNamespaces(defs, opts); + timeout_ = std::chrono::milliseconds{0}; + return err; +} + template <> Error ReindexerInterface::modify(reindexer::Transaction& transaction, reindexer::Item&& item, ItemModifyMode mode) { @@ -140,18 +249,34 @@ Error ReindexerInterface::modify(reindexer::cl return transaction.Modify(std::move(item), mode); } +template +typename DBT::TransactionT ReindexerInterface::startTransaction(std::string_view ns) { + auto transaction = db_.WithTimeout(timeout_).NewTransaction(ns); + timeout_ = std::chrono::milliseconds{0}; + return transaction; +} + template Error ReindexerInterface::commitTransaction(typename DBT::TransactionT& transaction, size_t& count) { typename DBT::QueryResultsT qres(QRESULTS_FLAGS); - auto err = db_->CommitTransaction(transaction, qres); + auto err = db_.WithTimeout(timeout_).CommitTransaction(transaction, qres); + timeout_ = std::chrono::milliseconds{0}; count = qres.Count(); return err; } +template +Error ReindexerInterface::rollbackTransaction(typename DBT::TransactionT& tr) { + auto err = db_.WithTimeout(timeout_).RollBackTransaction(tr); + timeout_ = std::chrono::milliseconds{0}; + return err; +} + template Error ReindexerInterface::selectQuery(const reindexer::Query& query, QueryResultsWrapper& result) { typename DBT::QueryResultsT qres(QRESULTS_FLAGS); - auto err = db_->Select(query, qres); + auto err = db_.WithTimeout(timeout_).Select(query, qres); + timeout_ = std::chrono::milliseconds{0}; result.Wrap(std::move(qres)); return err; } @@ -159,7 +284,8 @@ Error ReindexerInterface::selectQuery(const reindexer::Query& query, QueryR template Error ReindexerInterface::deleteQuery(const reindexer::Query& query, size_t& count) { typename DBT::QueryResultsT qres; - auto err = db_->Delete(query, qres); + auto err = db_.WithTimeout(timeout_).Delete(query, qres); + timeout_ = std::chrono::milliseconds{0}; count = qres.Count(); return err; } @@ -167,7 +293,8 @@ Error ReindexerInterface::deleteQuery(const reindexer::Query& query, size_t template Error ReindexerInterface::updateQuery(const reindexer::Query& query, QueryResultsWrapper& result) { typename DBT::QueryResultsT qres(QRESULTS_FLAGS); - auto err = db_->Update(query, qres); + auto err = db_.WithTimeout(timeout_).Update(query, qres); + timeout_ = std::chrono::milliseconds{0}; result.Wrap(std::move(qres)); return err; } @@ -190,12 +317,16 @@ Error ReindexerInterface::execute(std::functio template <> Error ReindexerInterface::connect(const std::string& dsn) { - return db_->Connect(dsn); + auto err = db_.WithTimeout(timeout_).Connect(dsn); + timeout_ = std::chrono::milliseconds{0}; + return err; } template <> Error ReindexerInterface::connect(const std::string& dsn) { - return db_->Connect(dsn, loop_, reindexer::client::ConnectOpts().CreateDBIfMissing()); + auto err = db_.WithTimeout(timeout_).Connect(dsn, loop_, reindexer::client::ConnectOpts().CreateDBIfMissing()); + timeout_ = std::chrono::milliseconds{0}; + return err; } template <> @@ -205,7 +336,7 @@ Error ReindexerInterface::stop() { template <> Error ReindexerInterface::stop() { - db_->Stop(); + db_.Stop(); stopCh_.close(); return errOK; } diff --git a/pyreindexer/lib/src/reindexerinterface.h b/pyreindexer/lib/src/reindexerinterface.h index 4118c5e..7b4754f 100644 --- a/pyreindexer/lib/src/reindexerinterface.h +++ b/pyreindexer/lib/src/reindexerinterface.h @@ -95,7 +95,7 @@ class ReindexerInterface { return execute([this, ns, &keys] { return enumMeta(ns, keys); }); } Error Select(const std::string& query, QueryResultsWrapper& result); - Error WithTimeout(std::chrono::milliseconds timeout); + void WithTimeout(std::chrono::milliseconds timeout) { timeout_ = timeout; } Error EnumNamespaces(std::vector& defs, EnumNamespacesOpts opts) { return execute([this, &defs, &opts] { return enumNamespaces(defs, opts); }); } @@ -132,50 +132,34 @@ class ReindexerInterface { Error execute(std::function f); Error connect(const std::string& dsn); - Error openNamespace(std::string_view ns) { return db_->OpenNamespace({ns.data(), ns.size()}); } - Error closeNamespace(std::string_view ns) { return db_->CloseNamespace({ns.data(), ns.size()}); } - Error dropNamespace(std::string_view ns) { return db_->DropNamespace({ns.data(), ns.size()}); } - Error addIndex(std::string_view ns, const IndexDef& idx) { return db_->AddIndex({ns.data(), ns.size()}, idx); } - Error updateIndex(std::string_view ns, const IndexDef& idx) { - return db_->UpdateIndex({ns.data(), ns.size()}, idx); - } - Error dropIndex(std::string_view ns, const IndexDef& idx) { return db_->DropIndex({ns.data(), ns.size()}, idx); } - typename DBT::ItemT newItem(std::string_view ns) { return db_->NewItem({ns.data(), ns.size()}); } - Error insert(std::string_view ns, typename DBT::ItemT& item) { return db_->Insert({ns.data(), ns.size()}, item); } - Error upsert(std::string_view ns, typename DBT::ItemT& item) { return db_->Upsert({ns.data(), ns.size()}, item); } - Error update(std::string_view ns, typename DBT::ItemT& item) { return db_->Update({ns.data(), ns.size()}, item); } - Error deleteItem(std::string_view ns, typename DBT::ItemT& item) { - return db_->Delete({ns.data(), ns.size()}, item); - } - Error putMeta(std::string_view ns, const std::string& key, std::string_view data) { - return db_->PutMeta({ns.data(), ns.size()}, key, {data.data(), data.size()}); - } - Error getMeta(std::string_view ns, const std::string& key, std::string& data) { - return db_->GetMeta({ns.data(), ns.size()}, key, data); - } - Error deleteMeta(std::string_view ns, const std::string& key) { - return db_->DeleteMeta({ns.data(), ns.size()}, key); - } - Error enumMeta(std::string_view ns, std::vector& keys) { - return db_->EnumMeta({ns.data(), ns.size()}, keys); - } - Error select(const std::string& query, typename DBT::QueryResultsT& result) { return db_->Select(query, result); } - Error enumNamespaces(std::vector& defs, EnumNamespacesOpts opts) { - return db_->EnumNamespaces(defs, opts); - } - typename DBT::TransactionT startTransaction(std::string_view ns) { - return db_->NewTransaction({ns.data(), ns.size()}); - } + Error openNamespace(std::string_view ns); + Error closeNamespace(std::string_view ns); + Error dropNamespace(std::string_view ns); + Error addIndex(std::string_view ns, const IndexDef& idx); + Error updateIndex(std::string_view ns, const IndexDef& idx); + Error dropIndex(std::string_view ns, const IndexDef& idx); + typename DBT::ItemT newItem(std::string_view ns); + Error insert(std::string_view ns, typename DBT::ItemT& item); + Error upsert(std::string_view ns, typename DBT::ItemT& item); + Error update(std::string_view ns, typename DBT::ItemT& item); + Error deleteItem(std::string_view ns, typename DBT::ItemT& item); + Error putMeta(std::string_view ns, const std::string& key, std::string_view data); + Error getMeta(std::string_view ns, const std::string& key, std::string& data); + Error deleteMeta(std::string_view ns, const std::string& key); + Error enumMeta(std::string_view ns, std::vector& keys); + Error select(const std::string& query, typename DBT::QueryResultsT& result); + Error enumNamespaces(std::vector& defs, EnumNamespacesOpts opts); + typename DBT::TransactionT startTransaction(std::string_view ns); typename DBT::ItemT newItem(typename DBT::TransactionT& tr) { return tr.NewItem(); } Error modify(typename DBT::TransactionT& tr, typename DBT::ItemT&& item, ItemModifyMode mode); Error commitTransaction(typename DBT::TransactionT& transaction, size_t& count); - Error rollbackTransaction(typename DBT::TransactionT& tr) { return db_->RollBackTransaction(tr); } + Error rollbackTransaction(typename DBT::TransactionT& tr); Error selectQuery(const Query& query, QueryResultsWrapper& result); Error deleteQuery(const Query& query, size_t& count); Error updateQuery(const Query& query, QueryResultsWrapper& result); Error stop(); - std::unique_ptr db_; + DBT db_; std::thread executionThr_; reindexer::net::ev::dynamic_loop loop_; ICommand* curCmd_{nullptr}; @@ -183,6 +167,7 @@ class ReindexerInterface { std::mutex mtx_; std::condition_variable condVar_; reindexer::coroutine::channel stopCh_; + std::chrono::milliseconds timeout_{0}; }; } // namespace pyreindexer diff --git a/pyreindexer/rx_connector.py b/pyreindexer/rx_connector.py index d6c1d31..ba9d7e5 100644 --- a/pyreindexer/rx_connector.py +++ b/pyreindexer/rx_connector.py @@ -395,7 +395,6 @@ def new_query(self, namespace: str) -> Query: self.err_code, self.err_msg, query_wrapper_ptr = self.api.create_query(self.rx, namespace) return Query(self.api, query_wrapper_ptr) - @raise_if_error def with_timeout(self, timeout: int) -> RxConnector: """Add execution timeout to the next query