Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
shnikd committed Jun 27, 2024
1 parent 0bcb6b0 commit 4a20d2f
Show file tree
Hide file tree
Showing 6 changed files with 1,007 additions and 0 deletions.
84 changes: 84 additions & 0 deletions ydb/tools/query_replay_yt/common_deps.inc
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
SET(YDB_REPLAY_SRCS
query_compiler.cpp
query_replay.cpp
query_replay.h
main.cpp
)

SET(YDB_REPLAY_PEERDIRS
ydb/library/actors/core
ydb/library/actors/interconnect
library/cpp/getopt
ydb/library/grpc/client
ydb/library/grpc/server
library/cpp/regex/pcre
ydb/core/actorlib_impl
ydb/core/base
ydb/core/blobstorage/pdisk
ydb/core/client
ydb/core/client/metadata
ydb/core/client/minikql_compile
ydb/core/client/scheme_cache_lib
ydb/core/cms/console
ydb/core/engine/minikql
ydb/core/fq/libs/init
ydb/core/fq/libs/mock
ydb/core/grpc_services
ydb/core/grpc_streaming
ydb/core/health_check
ydb/core/kesus/proxy
ydb/core/kesus/tablet
ydb/core/kqp
ydb/core/mind
ydb/core/mind/address_classification
ydb/core/mind/bscontroller
ydb/core/mind/hive
ydb/core/node_whiteboard
ydb/core/scheme
ydb/core/security
ydb/core/sys_view/processor
ydb/core/sys_view/service
ydb/core/tx/columnshard
ydb/core/tx/coordinator
ydb/core/tx/long_tx_service
ydb/core/tx/mediator
ydb/core/tx/time_cast
ydb/core/tx/tx_proxy
ydb/library/yql/core/services/mounts
ydb/library/yql/minikql/comp_nodes/llvm14
ydb/library/yql/providers/yt/codec/codegen
ydb/library/yql/providers/yt/comp_nodes/llvm14
ydb/library/yql/public/udf/service/exception_policy
ydb/public/api/protos
ydb/public/lib/base
ydb/public/lib/deprecated/kicli
ydb/public/sdk/cpp/client/ydb_table
ydb/services/cms
ydb/services/datastreams
ydb/services/discovery
ydb/services/kesus
ydb/services/persqueue_cluster_discovery
ydb/services/persqueue_v1
ydb/services/rate_limiter
ydb/services/ydb
ydb/library/yql/udfs/common/clickhouse/client
ydb/library/yql/udfs/common/datetime
ydb/library/yql/udfs/common/datetime2
ydb/library/yql/udfs/common/digest
ydb/library/yql/udfs/common/histogram
ydb/library/yql/udfs/common/hyperloglog
ydb/library/yql/udfs/common/hyperscan
ydb/library/yql/udfs/common/json
ydb/library/yql/udfs/common/json2
ydb/library/yql/udfs/common/math
ydb/library/yql/udfs/common/pire
ydb/library/yql/udfs/common/re2
ydb/library/yql/udfs/common/set
ydb/library/yql/udfs/common/stat
ydb/library/yql/udfs/common/string
ydb/library/yql/udfs/common/top
ydb/library/yql/udfs/common/topfreq
ydb/library/yql/udfs/common/yson2
ydb/library/yql/udfs/logs/dsv
ydb/library/yql/sql/pg_dummy
)
139 changes: 139 additions & 0 deletions ydb/tools/query_replay_yt/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
#include "query_replay.h"

#include <ydb/library/actors/core/actorsystem.h>
#include <ydb/core/client/minikql_compile/mkql_compile_service.h>
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>

#include <yt/cpp/mapreduce/interface/operation.h>
#include <yt/cpp/mapreduce/interface/client.h>
#include <yt/cpp/mapreduce/interface/config.h>

#include <yt/cpp/mapreduce/interface/logging/logger.h>

using namespace NActors;

class TQueryReplayMapper
: public NYT::IMapper<NYT::TTableReader<NYT::TNode>, NYT::TTableWriter<NYT::TNode>>
{

THolder<NActors::TActorSystem> ActorSystem;
THolder<NKikimr::TAppData> AppData;
TIntrusivePtr<NKikimr::NScheme::TKikimrTypeRegistry> TypeRegistry;
TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> FunctionRegistry;
TIntrusivePtr<NKikimr::NKqp::TModuleResolverState> ModuleResolverState;

TQueryReplayConfig Config;

TString GetFailReason(const TQueryReplayEvents::TCheckQueryPlanStatus& status) {
switch (status) {
case TQueryReplayEvents::CompileError:
return "compile_error";
case TQueryReplayEvents::CompileTimeout:
return "compile_timeout";
case TQueryReplayEvents::TableMissing:
return "table_missing";
case TQueryReplayEvents::ExtraReadingOldEngine:
return "extra_reading_old_engine";
case TQueryReplayEvents::ExtraReadingNewEngine:
return "extra_reading_new_engine";
case TQueryReplayEvents::ReadTypesMismatch:
return "read_types_mismatch";
case TQueryReplayEvents::ReadLimitsMismatch:
return "read_limits_mismatch";
case TQueryReplayEvents::ReadColumnsMismatch:
return "read_columns_mismatch";
case TQueryReplayEvents::ExtraWriting:
return "extra_writing";
case TQueryReplayEvents::WriteColumnsMismatch:
return "write_columns_mismatch";
case TQueryReplayEvents::UncategorizedPlanMismatch:
return "uncategorized_plan_mismatch";
default:
return "unspecified";
}
}

public:
TQueryReplayMapper() = default;
TQueryReplayMapper(const TQueryReplayConfig& config) : Config(config) {
}

void Start(NYT::TTableWriter<NYT::TNode>*) override {
TypeRegistry.Reset(new NKikimr::NScheme::TKikimrTypeRegistry());
FunctionRegistry.Reset(NKikimr::NMiniKQL::CreateFunctionRegistry(NKikimr::NMiniKQL::CreateBuiltinRegistry())->Clone());
NKikimr::NMiniKQL::FillStaticModules(*FunctionRegistry);
AppData.Reset(new NKikimr::TAppData(0, 0, 0, 0, {}, TypeRegistry.Get(), FunctionRegistry.Get(), nullptr, nullptr));
auto setup = BuildActorSystemSetup(Config.ActorSystemThreadsCount);
ActorSystem.Reset(new TActorSystem(setup, AppData.Get()));
ActorSystem->Start();
ActorSystem->Register(NKikimr::NKqp::CreateKqpResourceManagerActor({}, nullptr));
ModuleResolverState = MakeIntrusive<NKikimr::NKqp::TModuleResolverState>();
Y_ABORT_UNLESS(GetYqlDefaultModuleResolver(ModuleResolverState->ExprCtx, ModuleResolverState->ModuleResolver));
}

void Do(NYT::TTableReader<NYT::TNode>* in, NYT::TTableWriter<NYT::TNode>* out) override {
for (; in->IsValid(); in->Next()) {
const auto& row = in->GetRow();
NJson::TJsonValue json(NJson::JSON_MAP);
for (const auto& [key, child]: row.AsMap()) {
if (key == "_logfeller_timestamp")
continue;
json.InsertValue(key, NJson::TJsonValue(child.AsString()));
}

auto compileActorId = ActorSystem->Register(CreateQueryCompiler(ModuleResolverState, FunctionRegistry.Get()));

auto future = ActorSystem->Ask<TQueryReplayEvents::TEvCompileResponse>(
compileActorId,
THolder(new TQueryReplayEvents::TEvCompileRequest(std::move(json))),
TDuration::Seconds(100));

auto response = future.ExtractValueSync();
auto status = response.Get()->Status;

TString failReason = GetFailReason(status);

NYT::TNode result;
result = result("query_id", row["query_id"].AsString());
result = result("fail_reason", failReason);
result = result("extra_message", response.Get()->Message);

out->AddRow(result);
}
}

void Finish(NYT::TTableWriter<NYT::TNode>*) override {
ActorSystem->Stop();
}
};

static NYT::TTableSchema OutputSchema() {
NYT::TTableSchema schema;
schema.AddColumn(NYT::TColumnSchema().Name("query_id").Type(NYT::VT_STRING));
schema.AddColumn(NYT::TColumnSchema().Name("fail_reason").Type(NYT::VT_STRING));
schema.AddColumn(NYT::TColumnSchema().Name("extra_message").Type(NYT::VT_STRING));
return schema;
}

REGISTER_NAMED_MAPPER("Query replay mapper", TQueryReplayMapper);

int main(int argc, const char** argv) {
NYT::Initialize(argc, argv);

TQueryReplayConfig config;
config.ParseConfig(argc, argv);
OutputSchema();

auto client = NYT::CreateClient(config.Cluster);
NYT::SetLogger(NYT::CreateStdErrLogger(NYT::ILogger::ELevel::INFO));

NYT::TMapOperationSpec spec;
spec.AddInput<NYT::TNode>(config.SrcPath);
spec.AddOutput<NYT::TNode>(NYT::TRichYPath(config.DstPath).Schema(OutputSchema()));
spec.MapperSpec(NYT::TUserJobSpec().MemoryLimit(5_GB));

client->Map(spec, new TQueryReplayMapper(config));

return EXIT_SUCCESS;
}
Loading

0 comments on commit 4a20d2f

Please sign in to comment.