Skip to content

Commit

Permalink
Add with_timeout func. Part II
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander.A,Utkin committed Dec 5, 2024
1 parent fb805a3 commit c7c3618
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 61 deletions.
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
* [select](#pyreindexer.rx_connector.RxConnector.select)
* [new\_transaction](#pyreindexer.rx_connector.RxConnector.new_transaction)
* [new\_query](#pyreindexer.rx_connector.RxConnector.new_query)
* [with\_timeout](#pyreindexer.rx_connector.RxConnector.with_timeout)
* [pyreindexer.query\_results](#pyreindexer.query_results)
* [QueryResults](#pyreindexer.query_results.QueryResults)
* [status](#pyreindexer.query_results.QueryResults.status)
Expand Down Expand Up @@ -465,6 +466,22 @@ Creates a new query and return the query object to processing
#### Raises:
Exception: Raises with an error message when Reindexer instance is not initialized yet

<a id="pyreindexer.rx_connector.RxConnector.with_timeout"></a>

### RxConnector.with\_timeout

```python
def with_timeout(timeout: int) -> RxConnector
```

Add execution timeout to the next query

#### Arguments:
timeout (int): Optional server-side execution timeout for each subquery [milliseconds]

#### Returns:
(:obj:`RxConnector`): RxConnector object for further customizations

<a id="pyreindexer.query_results"></a>

# pyreindexer.query\_results
Expand Down
5 changes: 3 additions & 2 deletions pyreindexer/lib/src/rawpyreindexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,9 @@ static PyObject* WithTimeout(PyObject* self, PyObject* args) {
return nullptr;
}

auto err = getWrapper<DBInterface>(rx)->WithTimeout(std::chrono::milliseconds(timeout));
return pyErr(err);
getWrapper<DBInterface>(rx)->WithTimeout(std::chrono::milliseconds(timeout));

Py_RETURN_NONE;
}

// namespace ----------------------------------------------------------------------------------------------------------
Expand Down
173 changes: 152 additions & 21 deletions pyreindexer/lib/src/reindexerinterface.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,14 @@ class GenericCommand : public ICommand {

template <>
ReindexerInterface<reindexer::Reindexer>::ReindexerInterface(const ReindexerConfig& cfg)
: db_(std::make_unique<reindexer::Reindexer>(reindexer::ReindexerConfig()
.WithUpdatesSize(cfg.maxReplUpdatesSize)
.WithAllocatorCacheLimits(cfg.allocatorCacheLimit, cfg.allocatorCachePart)))
: db_(reindexer::ReindexerConfig().WithUpdatesSize(cfg.maxReplUpdatesSize)
.WithAllocatorCacheLimits(cfg.allocatorCacheLimit, cfg.allocatorCachePart))
{ }

template <>
ReindexerInterface<reindexer::client::CoroReindexer>::ReindexerInterface(const ReindexerConfig& cfg)
: db_(std::make_unique<reindexer::client::CoroReindexer>(reindexer::client::ReindexerConfig(4, 1, cfg.fetchAmount,
0, cfg.connectTimeout, cfg.requestTimeout, cfg.enableCompression, cfg.requestDedicatedThread, cfg.appName)))
: db_(reindexer::client::ReindexerConfig(4, 1, cfg.fetchAmount, 0,
cfg.connectTimeout, cfg.requestTimeout, cfg.enableCompression, cfg.requestDedicatedThread, cfg.appName))
{
std::atomic_bool running{false};
executionThr_ = std::thread([this, &running] {
Expand Down Expand Up @@ -101,15 +100,6 @@ Error ReindexerInterface<DBT>::Select(const std::string& query, QueryResultsWrap
});
}

template <typename DBT>
Error ReindexerInterface<DBT>::WithTimeout(std::chrono::milliseconds timeout) {
return execute([this, timeout] {
auto db = db_->WithTimeout(timeout);
db_ = std::make_unique<DBT>(std::move(db));
return errOK;
});
}

template <typename DBT>
Error ReindexerInterface<DBT>::FetchResults(QueryResultsWrapper& result) {
return execute([&result] {
Expand All @@ -128,6 +118,125 @@ Error ReindexerInterface<DBT>::StartTransaction(std::string_view ns, Transaction
});
}

template <typename DBT>
Error ReindexerInterface<DBT>::openNamespace(std::string_view ns) {
auto err = db_.WithTimeout(timeout_).OpenNamespace({ns.data(), ns.size()});
timeout_ = std::chrono::milliseconds{0};
return err;
}

template <typename DBT>
Error ReindexerInterface<DBT>::closeNamespace(std::string_view ns) {
auto err = db_.WithTimeout(timeout_).CloseNamespace({ns.data(), ns.size()});
timeout_ = std::chrono::milliseconds{0};
return err;
}

template <typename DBT>
Error ReindexerInterface<DBT>::dropNamespace(std::string_view ns) {
auto err = db_.WithTimeout(timeout_).DropNamespace(ns);
timeout_ = std::chrono::milliseconds{0};
return err;
}

template <typename DBT>
Error ReindexerInterface<DBT>::addIndex(std::string_view ns, const IndexDef& idx) {
auto err = db_.WithTimeout(timeout_).AddIndex(ns, idx);
timeout_ = std::chrono::milliseconds{0};
return err;
}

template <typename DBT>
Error ReindexerInterface<DBT>::updateIndex(std::string_view ns, const IndexDef& idx) {
auto err = db_.WithTimeout(timeout_).UpdateIndex(ns, idx);
timeout_ = std::chrono::milliseconds{0};
return err;
}

template <typename DBT>
Error ReindexerInterface<DBT>::dropIndex(std::string_view ns, const IndexDef& idx) {
auto err = db_.WithTimeout(timeout_).DropIndex({ns.data(), ns.size()}, idx);
timeout_ = std::chrono::milliseconds{0};
return err;
}

template <typename DBT>
typename DBT::ItemT ReindexerInterface<DBT>::newItem(std::string_view ns) {
auto item = db_.WithTimeout(timeout_).NewItem({ns.data(), ns.size()});
timeout_ = std::chrono::milliseconds{0};
return item;
}

template <typename DBT>
Error ReindexerInterface<DBT>::insert(std::string_view ns, typename DBT::ItemT& item) {
auto err = db_.WithTimeout(timeout_).Insert({ns.data(), ns.size()}, item);
timeout_ = std::chrono::milliseconds{0};
return err;
}

template <typename DBT>
Error ReindexerInterface<DBT>::upsert(std::string_view ns, typename DBT::ItemT& item) {
auto err = db_.WithTimeout(timeout_).Upsert({ns.data(), ns.size()}, item);
timeout_ = std::chrono::milliseconds{0};
return err;
}

template <typename DBT>
Error ReindexerInterface<DBT>::update(std::string_view ns, typename DBT::ItemT& item) {
auto err = db_.WithTimeout(timeout_).Update({ns.data(), ns.size()}, item);
timeout_ = std::chrono::milliseconds{0};
return err;
}

template <typename DBT>
Error ReindexerInterface<DBT>::deleteItem(std::string_view ns, typename DBT::ItemT& item) {
auto err = db_.WithTimeout(timeout_).Delete({ns.data(), ns.size()}, item);
timeout_ = std::chrono::milliseconds{0};
return err;
}

template <typename DBT>
Error ReindexerInterface<DBT>::putMeta(std::string_view ns, const std::string& key, std::string_view data) {
auto err = db_.WithTimeout(timeout_).PutMeta(ns, key, data);
timeout_ = std::chrono::milliseconds{0};
return err;
}

template <typename DBT>
Error ReindexerInterface<DBT>::getMeta(std::string_view ns, const std::string& key, std::string& data) {
auto err = db_.WithTimeout(timeout_).GetMeta(ns, key, data);
timeout_ = std::chrono::milliseconds{0};
return err;
}

template <typename DBT>
Error ReindexerInterface<DBT>::deleteMeta(std::string_view ns, const std::string& key) {
auto err = db_.WithTimeout(timeout_).DeleteMeta(ns, key);
timeout_ = std::chrono::milliseconds{0};
return err;
}

template <typename DBT>
Error ReindexerInterface<DBT>::enumMeta(std::string_view ns, std::vector<std::string>& keys) {
auto err = db_.WithTimeout(timeout_).EnumMeta(ns, keys);
timeout_ = std::chrono::milliseconds{0};
return err;
}

template <typename DBT>
Error ReindexerInterface<DBT>::select(const std::string& query, typename DBT::QueryResultsT& result) {
auto err = db_.WithTimeout(timeout_).Select(query, result);
timeout_ = std::chrono::milliseconds{0};
return err;
}

template <typename DBT>
Error ReindexerInterface<DBT>::enumNamespaces(std::vector<NamespaceDef>& defs, EnumNamespacesOpts opts) {
auto err = db_.WithTimeout(timeout_).EnumNamespaces(defs, opts);
timeout_ = std::chrono::milliseconds{0};
return err;
}

template <>
Error ReindexerInterface<reindexer::Reindexer>::modify(reindexer::Transaction& transaction,
reindexer::Item&& item, ItemModifyMode mode) {
Expand All @@ -140,34 +249,52 @@ Error ReindexerInterface<reindexer::client::CoroReindexer>::modify(reindexer::cl
return transaction.Modify(std::move(item), mode);
}

template <typename DBT>
typename DBT::TransactionT ReindexerInterface<DBT>::startTransaction(std::string_view ns) {
auto transaction = db_.WithTimeout(timeout_).NewTransaction(ns);
timeout_ = std::chrono::milliseconds{0};
return transaction;
}

template <typename DBT>
Error ReindexerInterface<DBT>::commitTransaction(typename DBT::TransactionT& transaction, size_t& count) {
typename DBT::QueryResultsT qres(QRESULTS_FLAGS);
auto err = db_->CommitTransaction(transaction, qres);
auto err = db_.WithTimeout(timeout_).CommitTransaction(transaction, qres);
timeout_ = std::chrono::milliseconds{0};
count = qres.Count();
return err;
}

template <typename DBT>
Error ReindexerInterface<DBT>::rollbackTransaction(typename DBT::TransactionT& tr) {
auto err = db_.WithTimeout(timeout_).RollBackTransaction(tr);
timeout_ = std::chrono::milliseconds{0};
return err;
}

template <typename DBT>
Error ReindexerInterface<DBT>::selectQuery(const reindexer::Query& query, QueryResultsWrapper& result) {
typename DBT::QueryResultsT qres(QRESULTS_FLAGS);
auto err = db_->Select(query, qres);
auto err = db_.WithTimeout(timeout_).Select(query, qres);
timeout_ = std::chrono::milliseconds{0};
result.Wrap(std::move(qres));
return err;
}

template <typename DBT>
Error ReindexerInterface<DBT>::deleteQuery(const reindexer::Query& query, size_t& count) {
typename DBT::QueryResultsT qres;
auto err = db_->Delete(query, qres);
auto err = db_.WithTimeout(timeout_).Delete(query, qres);
timeout_ = std::chrono::milliseconds{0};
count = qres.Count();
return err;
}

template <typename DBT>
Error ReindexerInterface<DBT>::updateQuery(const reindexer::Query& query, QueryResultsWrapper& result) {
typename DBT::QueryResultsT qres(QRESULTS_FLAGS);
auto err = db_->Update(query, qres);
auto err = db_.WithTimeout(timeout_).Update(query, qres);
timeout_ = std::chrono::milliseconds{0};
result.Wrap(std::move(qres));
return err;
}
Expand All @@ -190,12 +317,16 @@ Error ReindexerInterface<reindexer::client::CoroReindexer>::execute(std::functio

template <>
Error ReindexerInterface<reindexer::Reindexer>::connect(const std::string& dsn) {
return db_->Connect(dsn);
auto err = db_.WithTimeout(timeout_).Connect(dsn);
timeout_ = std::chrono::milliseconds{0};
return err;
}

template <>
Error ReindexerInterface<reindexer::client::CoroReindexer>::connect(const std::string& dsn) {
return db_->Connect(dsn, loop_, reindexer::client::ConnectOpts().CreateDBIfMissing());
auto err = db_.WithTimeout(timeout_).Connect(dsn, loop_, reindexer::client::ConnectOpts().CreateDBIfMissing());
timeout_ = std::chrono::milliseconds{0};
return err;
}

template <>
Expand All @@ -205,7 +336,7 @@ Error ReindexerInterface<reindexer::Reindexer>::stop() {

template <>
Error ReindexerInterface<reindexer::client::CoroReindexer>::stop() {
db_->Stop();
db_.Stop();
stopCh_.close();
return errOK;
}
Expand Down
59 changes: 22 additions & 37 deletions pyreindexer/lib/src/reindexerinterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class ReindexerInterface {
return execute([this, ns, &keys] { return enumMeta(ns, keys); });
}
Error Select(const std::string& query, QueryResultsWrapper& result);
Error WithTimeout(std::chrono::milliseconds timeout);
void WithTimeout(std::chrono::milliseconds timeout) { timeout_ = timeout; }
Error EnumNamespaces(std::vector<NamespaceDef>& defs, EnumNamespacesOpts opts) {
return execute([this, &defs, &opts] { return enumNamespaces(defs, opts); });
}
Expand Down Expand Up @@ -132,57 +132,42 @@ class ReindexerInterface {
Error execute(std::function<Error()> f);

Error connect(const std::string& dsn);
Error openNamespace(std::string_view ns) { return db_->OpenNamespace({ns.data(), ns.size()}); }
Error closeNamespace(std::string_view ns) { return db_->CloseNamespace({ns.data(), ns.size()}); }
Error dropNamespace(std::string_view ns) { return db_->DropNamespace({ns.data(), ns.size()}); }
Error addIndex(std::string_view ns, const IndexDef& idx) { return db_->AddIndex({ns.data(), ns.size()}, idx); }
Error updateIndex(std::string_view ns, const IndexDef& idx) {
return db_->UpdateIndex({ns.data(), ns.size()}, idx);
}
Error dropIndex(std::string_view ns, const IndexDef& idx) { return db_->DropIndex({ns.data(), ns.size()}, idx); }
typename DBT::ItemT newItem(std::string_view ns) { return db_->NewItem({ns.data(), ns.size()}); }
Error insert(std::string_view ns, typename DBT::ItemT& item) { return db_->Insert({ns.data(), ns.size()}, item); }
Error upsert(std::string_view ns, typename DBT::ItemT& item) { return db_->Upsert({ns.data(), ns.size()}, item); }
Error update(std::string_view ns, typename DBT::ItemT& item) { return db_->Update({ns.data(), ns.size()}, item); }
Error deleteItem(std::string_view ns, typename DBT::ItemT& item) {
return db_->Delete({ns.data(), ns.size()}, item);
}
Error putMeta(std::string_view ns, const std::string& key, std::string_view data) {
return db_->PutMeta({ns.data(), ns.size()}, key, {data.data(), data.size()});
}
Error getMeta(std::string_view ns, const std::string& key, std::string& data) {
return db_->GetMeta({ns.data(), ns.size()}, key, data);
}
Error deleteMeta(std::string_view ns, const std::string& key) {
return db_->DeleteMeta({ns.data(), ns.size()}, key);
}
Error enumMeta(std::string_view ns, std::vector<std::string>& keys) {
return db_->EnumMeta({ns.data(), ns.size()}, keys);
}
Error select(const std::string& query, typename DBT::QueryResultsT& result) { return db_->Select(query, result); }
Error enumNamespaces(std::vector<NamespaceDef>& defs, EnumNamespacesOpts opts) {
return db_->EnumNamespaces(defs, opts);
}
typename DBT::TransactionT startTransaction(std::string_view ns) {
return db_->NewTransaction({ns.data(), ns.size()});
}
Error openNamespace(std::string_view ns);
Error closeNamespace(std::string_view ns);
Error dropNamespace(std::string_view ns);
Error addIndex(std::string_view ns, const IndexDef& idx);
Error updateIndex(std::string_view ns, const IndexDef& idx);
Error dropIndex(std::string_view ns, const IndexDef& idx);
typename DBT::ItemT newItem(std::string_view ns);
Error insert(std::string_view ns, typename DBT::ItemT& item);
Error upsert(std::string_view ns, typename DBT::ItemT& item);
Error update(std::string_view ns, typename DBT::ItemT& item);
Error deleteItem(std::string_view ns, typename DBT::ItemT& item);
Error putMeta(std::string_view ns, const std::string& key, std::string_view data);
Error getMeta(std::string_view ns, const std::string& key, std::string& data);
Error deleteMeta(std::string_view ns, const std::string& key);
Error enumMeta(std::string_view ns, std::vector<std::string>& keys);
Error select(const std::string& query, typename DBT::QueryResultsT& result);
Error enumNamespaces(std::vector<NamespaceDef>& defs, EnumNamespacesOpts opts);
typename DBT::TransactionT startTransaction(std::string_view ns);
typename DBT::ItemT newItem(typename DBT::TransactionT& tr) { return tr.NewItem(); }
Error modify(typename DBT::TransactionT& tr, typename DBT::ItemT&& item, ItemModifyMode mode);
Error commitTransaction(typename DBT::TransactionT& transaction, size_t& count);
Error rollbackTransaction(typename DBT::TransactionT& tr) { return db_->RollBackTransaction(tr); }
Error rollbackTransaction(typename DBT::TransactionT& tr);
Error selectQuery(const Query& query, QueryResultsWrapper& result);
Error deleteQuery(const Query& query, size_t& count);
Error updateQuery(const Query& query, QueryResultsWrapper& result);
Error stop();

std::unique_ptr<DBT> db_;
DBT db_;
std::thread executionThr_;
reindexer::net::ev::dynamic_loop loop_;
ICommand* curCmd_{nullptr};
reindexer::net::ev::async cmdAsync_;
std::mutex mtx_;
std::condition_variable condVar_;
reindexer::coroutine::channel<bool> stopCh_;
std::chrono::milliseconds timeout_{0};
};

} // namespace pyreindexer
1 change: 0 additions & 1 deletion pyreindexer/rx_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,6 @@ def new_query(self, namespace: str) -> Query:
self.err_code, self.err_msg, query_wrapper_ptr = self.api.create_query(self.rx, namespace)
return Query(self.api, query_wrapper_ptr)

@raise_if_error
def with_timeout(self, timeout: int) -> RxConnector:
"""Add execution timeout to the next query
Expand Down

0 comments on commit c7c3618

Please sign in to comment.