Skip to content

Commit

Permalink
Add with_timeout func. Part I
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander.A,Utkin committed Dec 4, 2024
1 parent db70321 commit fb805a3
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 40 deletions.
3 changes: 2 additions & 1 deletion pyreindexer/example/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ def create_items_example(db, namespace):
def select_item_query_example(db, namespace):
item_name_for_lookup = 'item_0'

return db.select("SELECT * FROM " + namespace + " WHERE name='" + item_name_for_lookup + "'")
return (db.with_timeout(1000)
.select("SELECT * FROM " + namespace + " WHERE name='" + item_name_for_lookup + "'"))


def transaction_example(db, namespace, items_in_base):
Expand Down
15 changes: 15 additions & 0 deletions pyreindexer/lib/src/rawpyreindexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ PyObject* queryResultsWrapperIterate(uintptr_t qresWrapperAddr) {
static PyObject* Init(PyObject* self, PyObject* args) {
ReindexerConfig cfg;
char* clientName = nullptr;
int connectTimeout = 0;
int requestTimeout = 0;
unsigned enableCompression = 0;
unsigned startSpecialThread = 0;
unsigned maxReplUpdatesSize = 0;
Expand All @@ -69,6 +71,8 @@ static PyObject* Init(PyObject* self, PyObject* args) {
return nullptr;
}

cfg.connectTimeout = std::chrono::seconds(connectTimeout);
cfg.requestTimeout = std::chrono::seconds(requestTimeout);
cfg.enableCompression = (enableCompression != 0);
cfg.requestDedicatedThread = (startSpecialThread != 0);
cfg.appName = clientName;
Expand Down Expand Up @@ -127,6 +131,17 @@ static PyObject* Select(PyObject* self, PyObject* args) {
reinterpret_cast<uintptr_t>(qresWrapper.release()), count, totalCount);
}

static PyObject* WithTimeout(PyObject* self, PyObject* args) {
uintptr_t rx = 0;
unsigned timeout = 0;
if (!PyArg_ParseTuple(args, "kI", &rx, &timeout)) {
return nullptr;
}

auto err = getWrapper<DBInterface>(rx)->WithTimeout(std::chrono::milliseconds(timeout));
return pyErr(err);
}

// namespace ----------------------------------------------------------------------------------------------------------

static PyObject* NamespaceOpen(PyObject* self, PyObject* args) {
Expand Down
2 changes: 2 additions & 0 deletions pyreindexer/lib/src/rawpyreindexer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ static PyObject* Init(PyObject* self, PyObject* args);
static PyObject* Destroy(PyObject* self, PyObject* args);
static PyObject* Connect(PyObject* self, PyObject* args);
static PyObject* Select(PyObject* self, PyObject* args);
static PyObject* WithTimeout(PyObject* self, PyObject* args);
// namespace
static PyObject* NamespaceOpen(PyObject* self, PyObject* args);
static PyObject* NamespaceClose(PyObject* self, PyObject* args);
Expand Down Expand Up @@ -104,6 +105,7 @@ static PyMethodDef module_methods[] = {
{"destroy", Destroy, METH_VARARGS, "destroy reindexer instance"},
{"connect", Connect, METH_VARARGS, "connect to reindexer database"},
{"select", Select, METH_VARARGS, "select query"},
{"with_timeout", WithTimeout, METH_VARARGS, "Add execution timeout"},
// namespace
{"namespace_open", NamespaceOpen, METH_VARARGS, "open namespace"},
{"namespace_close", NamespaceClose, METH_VARARGS, "close namespace"},
Expand Down
34 changes: 21 additions & 13 deletions pyreindexer/lib/src/reindexerinterface.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#include "reindexerinterface.h"
#include <chrono>
#include "client/cororeindexer.h"
#include "core/reindexer.h"
#include "core/type_consts.h"
Expand Down Expand Up @@ -41,15 +40,15 @@ class GenericCommand : public ICommand {

template <>
ReindexerInterface<reindexer::Reindexer>::ReindexerInterface(const ReindexerConfig& cfg)
: db_(reindexer::ReindexerConfig()
.WithUpdatesSize(cfg.maxReplUpdatesSize)
.WithAllocatorCacheLimits(cfg.allocatorCacheLimit, cfg.allocatorCachePart))
: db_(std::make_unique<reindexer::Reindexer>(reindexer::ReindexerConfig()
.WithUpdatesSize(cfg.maxReplUpdatesSize)
.WithAllocatorCacheLimits(cfg.allocatorCacheLimit, cfg.allocatorCachePart)))
{ }

template <>
ReindexerInterface<reindexer::client::CoroReindexer>::ReindexerInterface(const ReindexerConfig& cfg)
: db_(reindexer::client::ReindexerConfig(4, 1, cfg.fetchAmount, 0, std::chrono::seconds(cfg.connectTimeout),
std::chrono::seconds(cfg.requestTimeout), cfg.enableCompression, cfg.requestDedicatedThread, cfg.appName))
: db_(std::make_unique<reindexer::client::CoroReindexer>(reindexer::client::ReindexerConfig(4, 1, cfg.fetchAmount,
0, cfg.connectTimeout, cfg.requestTimeout, cfg.enableCompression, cfg.requestDedicatedThread, cfg.appName)))
{
std::atomic_bool running{false};
executionThr_ = std::thread([this, &running] {
Expand Down Expand Up @@ -102,6 +101,15 @@ Error ReindexerInterface<DBT>::Select(const std::string& query, QueryResultsWrap
});
}

template <typename DBT>
Error ReindexerInterface<DBT>::WithTimeout(std::chrono::milliseconds timeout) {
return execute([this, timeout] {
auto db = db_->WithTimeout(timeout);
db_ = std::make_unique<DBT>(std::move(db));
return errOK;
});
}

template <typename DBT>
Error ReindexerInterface<DBT>::FetchResults(QueryResultsWrapper& result) {
return execute([&result] {
Expand Down Expand Up @@ -135,31 +143,31 @@ Error ReindexerInterface<reindexer::client::CoroReindexer>::modify(reindexer::cl
template <typename DBT>
Error ReindexerInterface<DBT>::commitTransaction(typename DBT::TransactionT& transaction, size_t& count) {
typename DBT::QueryResultsT qres(QRESULTS_FLAGS);
auto err = db_.CommitTransaction(transaction, qres);
auto err = db_->CommitTransaction(transaction, qres);
count = qres.Count();
return err;
}

template <typename DBT>
Error ReindexerInterface<DBT>::selectQuery(const reindexer::Query& query, QueryResultsWrapper& result) {
typename DBT::QueryResultsT qres(QRESULTS_FLAGS);
auto err = db_.Select(query, qres);
auto err = db_->Select(query, qres);
result.Wrap(std::move(qres));
return err;
}

template <typename DBT>
Error ReindexerInterface<DBT>::deleteQuery(const reindexer::Query& query, size_t& count) {
typename DBT::QueryResultsT qres;
auto err = db_.Delete(query, qres);
auto err = db_->Delete(query, qres);
count = qres.Count();
return err;
}

template <typename DBT>
Error ReindexerInterface<DBT>::updateQuery(const reindexer::Query& query, QueryResultsWrapper& result) {
typename DBT::QueryResultsT qres(QRESULTS_FLAGS);
auto err = db_.Update(query, qres);
auto err = db_->Update(query, qres);
result.Wrap(std::move(qres));
return err;
}
Expand All @@ -182,12 +190,12 @@ Error ReindexerInterface<reindexer::client::CoroReindexer>::execute(std::functio

template <>
Error ReindexerInterface<reindexer::Reindexer>::connect(const std::string& dsn) {
return db_.Connect(dsn);
return db_->Connect(dsn);
}

template <>
Error ReindexerInterface<reindexer::client::CoroReindexer>::connect(const std::string& dsn) {
return db_.Connect(dsn, loop_, reindexer::client::ConnectOpts().CreateDBIfMissing());
return db_->Connect(dsn, loop_, reindexer::client::ConnectOpts().CreateDBIfMissing());
}

template <>
Expand All @@ -197,7 +205,7 @@ Error ReindexerInterface<reindexer::Reindexer>::stop() {

template <>
Error ReindexerInterface<reindexer::client::CoroReindexer>::stop() {
db_.Stop();
db_->Stop();
stopCh_.close();
return errOK;
}
Expand Down
64 changes: 41 additions & 23 deletions pyreindexer/lib/src/reindexerinterface.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <chrono>
#include <condition_variable>
#include <thread>
#include "core/query/query.h"
Expand All @@ -23,8 +24,8 @@ class ICommand;

struct ReindexerConfig {
int fetchAmount{1000};
int connectTimeout{0};
int requestTimeout{0};
std::chrono::seconds connectTimeout{0};
std::chrono::seconds requestTimeout{0};
bool enableCompression{false};
bool requestDedicatedThread{false};
std::string appName;
Expand Down Expand Up @@ -79,7 +80,7 @@ class ReindexerInterface {
return execute([this, ns, &item] { return update(ns, item); });
}
Error Delete(std::string_view ns, typename DBT::ItemT& item) {
return execute([this, ns, &item] { return deleteImpl(ns, item); });
return execute([this, ns, &item] { return deleteItem(ns, item); });
}
Error PutMeta(std::string_view ns, const std::string& key, std::string_view data) {
return execute([this, ns, &key, data] { return putMeta(ns, key, data); });
Expand All @@ -94,6 +95,7 @@ class ReindexerInterface {
return execute([this, ns, &keys] { return enumMeta(ns, keys); });
}
Error Select(const std::string& query, QueryResultsWrapper& result);
Error WithTimeout(std::chrono::milliseconds timeout);
Error EnumNamespaces(std::vector<NamespaceDef>& defs, EnumNamespacesOpts opts) {
return execute([this, &defs, &opts] { return enumNamespaces(defs, opts); });
}
Expand Down Expand Up @@ -130,34 +132,50 @@ class ReindexerInterface {
Error execute(std::function<Error()> f);

Error connect(const std::string& dsn);
Error openNamespace(std::string_view ns) { return db_.OpenNamespace({ns.data(), ns.size()}); }
Error closeNamespace(std::string_view ns) { return db_.CloseNamespace({ns.data(), ns.size()}); }
Error dropNamespace(std::string_view ns) { return db_.DropNamespace({ns.data(), ns.size()}); }
Error addIndex(std::string_view ns, const IndexDef& idx) { return db_.AddIndex({ns.data(), ns.size()}, idx); }
Error updateIndex(std::string_view ns, const IndexDef& idx) { return db_.UpdateIndex({ns.data(), ns.size()}, idx); }
Error dropIndex(std::string_view ns, const IndexDef& idx) { return db_.DropIndex({ns.data(), ns.size()}, idx); }
typename DBT::ItemT newItem(std::string_view ns) { return db_.NewItem({ns.data(), ns.size()}); }
Error insert(std::string_view ns, typename DBT::ItemT& item) { return db_.Insert({ns.data(), ns.size()}, item); }
Error upsert(std::string_view ns, typename DBT::ItemT& item) { return db_.Upsert({ns.data(), ns.size()}, item); }
Error update(std::string_view ns, typename DBT::ItemT& item) { return db_.Update({ns.data(), ns.size()}, item); }
Error deleteImpl(std::string_view ns, typename DBT::ItemT& item) { return db_.Delete({ns.data(), ns.size()}, item); }
Error putMeta(std::string_view ns, const std::string& key, std::string_view data) { return db_.PutMeta({ns.data(), ns.size()}, key, {data.data(), data.size()}); }
Error getMeta(std::string_view ns, const std::string& key, std::string& data) { return db_.GetMeta({ns.data(), ns.size()}, key, data); }
Error deleteMeta(std::string_view ns, const std::string& key) { return db_.DeleteMeta({ns.data(), ns.size()}, key); }
Error enumMeta(std::string_view ns, std::vector<std::string>& keys) { return db_.EnumMeta({ns.data(), ns.size()}, keys); }
Error select(const std::string& query, typename DBT::QueryResultsT& result) { return db_.Select(query, result); }
Error enumNamespaces(std::vector<NamespaceDef>& defs, EnumNamespacesOpts opts) { return db_.EnumNamespaces(defs, opts); }
typename DBT::TransactionT startTransaction(std::string_view ns) { return db_.NewTransaction({ns.data(), ns.size()}); }
Error openNamespace(std::string_view ns) { return db_->OpenNamespace({ns.data(), ns.size()}); }
Error closeNamespace(std::string_view ns) { return db_->CloseNamespace({ns.data(), ns.size()}); }
Error dropNamespace(std::string_view ns) { return db_->DropNamespace({ns.data(), ns.size()}); }
Error addIndex(std::string_view ns, const IndexDef& idx) { return db_->AddIndex({ns.data(), ns.size()}, idx); }
Error updateIndex(std::string_view ns, const IndexDef& idx) {
return db_->UpdateIndex({ns.data(), ns.size()}, idx);
}
Error dropIndex(std::string_view ns, const IndexDef& idx) { return db_->DropIndex({ns.data(), ns.size()}, idx); }
typename DBT::ItemT newItem(std::string_view ns) { return db_->NewItem({ns.data(), ns.size()}); }
Error insert(std::string_view ns, typename DBT::ItemT& item) { return db_->Insert({ns.data(), ns.size()}, item); }
Error upsert(std::string_view ns, typename DBT::ItemT& item) { return db_->Upsert({ns.data(), ns.size()}, item); }
Error update(std::string_view ns, typename DBT::ItemT& item) { return db_->Update({ns.data(), ns.size()}, item); }
Error deleteItem(std::string_view ns, typename DBT::ItemT& item) {
return db_->Delete({ns.data(), ns.size()}, item);
}
Error putMeta(std::string_view ns, const std::string& key, std::string_view data) {
return db_->PutMeta({ns.data(), ns.size()}, key, {data.data(), data.size()});
}
Error getMeta(std::string_view ns, const std::string& key, std::string& data) {
return db_->GetMeta({ns.data(), ns.size()}, key, data);
}
Error deleteMeta(std::string_view ns, const std::string& key) {
return db_->DeleteMeta({ns.data(), ns.size()}, key);
}
Error enumMeta(std::string_view ns, std::vector<std::string>& keys) {
return db_->EnumMeta({ns.data(), ns.size()}, keys);
}
Error select(const std::string& query, typename DBT::QueryResultsT& result) { return db_->Select(query, result); }
Error enumNamespaces(std::vector<NamespaceDef>& defs, EnumNamespacesOpts opts) {
return db_->EnumNamespaces(defs, opts);
}
typename DBT::TransactionT startTransaction(std::string_view ns) {
return db_->NewTransaction({ns.data(), ns.size()});
}
typename DBT::ItemT newItem(typename DBT::TransactionT& tr) { return tr.NewItem(); }
Error modify(typename DBT::TransactionT& tr, typename DBT::ItemT&& item, ItemModifyMode mode);
Error commitTransaction(typename DBT::TransactionT& transaction, size_t& count);
Error rollbackTransaction(typename DBT::TransactionT& tr) { return db_.RollBackTransaction(tr); }
Error rollbackTransaction(typename DBT::TransactionT& tr) { return db_->RollBackTransaction(tr); }
Error selectQuery(const Query& query, QueryResultsWrapper& result);
Error deleteQuery(const Query& query, size_t& count);
Error updateQuery(const Query& query, QueryResultsWrapper& result);
Error stop();

DBT db_;
std::unique_ptr<DBT> db_;
std::thread executionThr_;
reindexer::net::ev::dynamic_loop loop_;
ICommand* curCmd_{nullptr};
Expand Down
23 changes: 20 additions & 3 deletions pyreindexer/rx_connector.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from __future__ import annotations

from typing import Dict, List

from pyreindexer.query import Query
Expand Down Expand Up @@ -37,12 +39,12 @@ def __init__(self, dsn: str, *,
#### Arguments:
dsn (string): The connection string which contains a protocol
Examples: 'builtin:///tmp/pyrx', 'cproto://127.0.0.1:6534/pyrx
Examples: 'builtin:///tmp/pyrx', 'cproto://127.0.0.1:6534/pyrx'
cproto options:
fetch_amount (int): The number of items that will be fetched by one operation
connect_timeout (int): Connection and database login timeout value
request_timeout (int): Request execution timeout value
connect_timeout (int): Connection and database login timeout value [seconds]
request_timeout (int): Request execution timeout value [seconds]
enable_compression (bool): Flag enable/disable traffic compression
start_special_thread (bool): Determines whether to request a special thread of execution
on the server for this connection
Expand Down Expand Up @@ -393,6 +395,21 @@ def new_query(self, namespace: str) -> Query:
self.err_code, self.err_msg, query_wrapper_ptr = self.api.create_query(self.rx, namespace)
return Query(self.api, query_wrapper_ptr)

@raise_if_error
def with_timeout(self, timeout: int) -> RxConnector:
"""Add execution timeout to the next query
#### Arguments:
timeout (int): Optional server-side execution timeout for each subquery [milliseconds]
#### Returns:
(:obj:`RxConnector`): RxConnector object for further customizations
"""

self.api.with_timeout(self.rx, timeout)
return self

def _api_import(self, dsn):
"""Imports an API dynamically depending on protocol specified in dsn
Expand Down

0 comments on commit fb805a3

Please sign in to comment.