Skip to content

Commit

Permalink
Introduce cache similar requests
Browse files Browse the repository at this point in the history
  • Loading branch information
tvorogme committed Sep 9, 2024
1 parent 062440b commit e23e891
Showing 1 changed file with 79 additions and 25 deletions.
104 changes: 79 additions & 25 deletions lite-server-daemon/adnl-lite-proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@
using ValidUntil = td::int64;
using RateLimit = td::int32;

ton::Bits256 compute_file_hash(td::Slice data) {
ton::Bits256 data_hash;
td::sha256(data, td::MutableSlice{data_hash.data(), 32});
return data_hash;
}

static std::string string_block_id(ton::tl_object_ptr<ton::lite_api::tonNode_blockIdExt> &B) {
return "(" + std::to_string(B->workchain_) + ":" + std::to_string(B->shard_) + ":" + std::to_string(B->seqno_) +
Expand Down Expand Up @@ -872,17 +877,26 @@ namespace ton::liteserver {
}
}

void process_cache(td::BufferSlice data, td::BufferSlice result, const std::string &compiled_query = "",
td::Timer elapsed = {}) {
std::string data_hash = compute_file_hash(data).to_hex();

auto it = cache_similar.find(data_hash);

if (it != cache_similar.end()) {
for (auto &promise: it->second) {
LOG(INFO) << "Found cache for request: " << data_hash << " query: " << compiled_query;
promise.set_value(result.clone());
}

cache_similar.erase(it);
}
}

void check_ext_answer(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, td::BufferSlice data,
td::Promise<td::BufferSlice> promise, int refire, td::Result<td::BufferSlice> result,
std::time_t started_at, td::Timer elapsed, adnl::AdnlNodeIdShort server_adnl,
const std::string &compiled_query = "") {
if (refire > allowed_refire) {
// td::actor::send_closure(actor_id(this), &LiteProxy::publish_call, dst, std::move(data), started_at,
// elapsed);
promise.set_result(std::move(result));
return;
}

if (result.is_ok()) {
auto res = result.move_as_ok();
auto lite_error = ton::fetch_tl_object<ton::lite_api::liteServer_error>(res.clone(), true);
Expand All @@ -893,49 +907,88 @@ namespace ton::liteserver {
for (const auto &substring: {"cannot compute block with specified transaction",
"cannot load block", "seqno not in db", "block not found"}) {
if (error->message_.find(substring) != std::string::npos) {
LOG(ERROR) << "Refire on cannot load block: " << compiled_query << " Elapsed: " << started_at;
td::actor::send_closure(actor_id(this), &LiteProxy::check_ext_query, src, dst,
std::move(data), std::move(promise), refire + 1);
if (refire + 1 > allowed_refire) {
LOG(ERROR) << "Too deep refire";
auto res = create_serialize_tl_object<lite_api::liteServer_error>(228, "Too deep refire");
process_cache(std::move(data), res.clone(), compiled_query, elapsed);
promise.set_value(std::move(res));
return;
} else {
LOG(ERROR) << "Refire on cannot load block: " << compiled_query << " Elapsed: " << elapsed;
td::actor::send_closure(actor_id(this), &LiteProxy::check_ext_query, src, dst,
std::move(data), std::move(promise), refire + 1);
}

return;
}
}

LOG(ERROR)
<< "Got unexpected error for refire: " << error->message_ << " Query: " << compiled_query << " Elapsed: "
<< started_at;
if (refire < allowed_refire) {
refire = allowed_refire - 1;
}

// td::actor::send_closure(actor_id(this), &LiteProxy::publish_call, dst, std::move(data), started_at,
// elapsed);
promise.set_value(std::move(res));
if (refire + 1 > allowed_refire) {
LOG(ERROR) << "Too deep refire";
auto res = create_serialize_tl_object<lite_api::liteServer_error>(228, "Too deep refire");
process_cache(std::move(data), res.clone(), compiled_query, elapsed);
promise.set_value(std::move(res));
return;
} else {
LOG(ERROR)
<< "Got unexpected error for refire: " << error->message_ << " Query: " << compiled_query
<< " Elapsed: "
<< elapsed;
td::actor::send_closure(actor_id(this), &LiteProxy::check_ext_query, src, dst,
std::move(data), std::move(promise), refire + 1);
}
return;

} else {
// td::actor::send_closure(actor_id(this), &LiteProxy::publish_call, dst, std::move(data), started_at,
// elapsed);
LOG(INFO)
<< "Query to: " << server_adnl << " success, Query: " << compiled_query << " Elapsed: " << elapsed;
process_cache(std::move(data), res.clone(), compiled_query, elapsed);
promise.set_value(std::move(res));
return;
}
} else {
auto error = result.move_as_error();

if (refire < allowed_refire) {
refire = allowed_refire;
refire = allowed_refire - 1;
}

LOG(ERROR)
<< "Got unexpected error for refire: " << error.message() << " server: " << server_adnl << " Query: "
<< compiled_query << " Elapsed: " << started_at;

td::actor::send_closure(actor_id(this), &LiteProxy::check_ext_query, src, dst,
std::move(data), std::move(promise), refire);
if (refire + 1 > allowed_refire) {
LOG(ERROR) << "Too deep refire";
auto res = create_serialize_tl_object<lite_api::liteServer_error>(228, "Too deep refire");
process_cache(std::move(data), res.clone(), compiled_query, elapsed);
promise.set_value(std::move(res));
return;
} else {
LOG(ERROR)
<< "Got unexpected error for refire: " << error.message() << " server: " << server_adnl << " Query: "
<< compiled_query << " Elapsed: " << elapsed;

td::actor::send_closure(actor_id(this), &LiteProxy::check_ext_query, src, dst,
std::move(data), std::move(promise), refire + 1);
}
}
}

void process_ext_query(adnl::AdnlNodeIdShort src, adnl::AdnlNodeIdShort dst, td::BufferSlice data,
td::Promise<td::BufferSlice> promise, int refire = 0, std::string query_compiled = "") {
std::string data_hash = compute_file_hash(data).to_hex();

auto it = cache_similar.find(data_hash);

if (it != cache_similar.end()) {
LOG(INFO) << "Found similar request, store: " << data_hash;
it->second.push_back(std::move(promise));
return;
} else {
cache_similar[data_hash] = std::vector<td::Promise<td::BufferSlice>>();
}


if (mode_ == 0) {
LOG(WARNING) << "New proxy query: " << dst.bits256_value().to_hex() << " size: " << data.size();

Expand Down Expand Up @@ -1644,6 +1697,7 @@ namespace ton::liteserver {
std::vector<adnl::AdnlNodeIdShort> uptodate_private_ls{};
ton::PublicKeyHash default_dht_node_ = ton::PublicKeyHash::zero();
std::map<ton::adnl::AdnlNodeIdShort, std::tuple<ValidUntil, RateLimit>> limits;
std::map<std::string, std::vector<td::Promise<td::BufferSlice>>> cache_similar;
std::map<ton::adnl::AdnlNodeIdShort, int> usage;
long long rps;
std::map<BlockSeqno, WaitList<td::actor::Actor, td::Unit>> shard_client_waiters_;
Expand Down

0 comments on commit e23e891

Please sign in to comment.