Skip to content

Commit

Permalink
S3 Stream Read Actor Back Pressure
Browse files Browse the repository at this point in the history
  • Loading branch information
Hor911 committed Dec 26, 2022
1 parent 658ffdd commit f089705
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 62 deletions.
1 change: 1 addition & 0 deletions ydb/core/yq/libs/config/protos/read_actors_factory.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ message TS3ReadActorFactoryConfig {
NYql.NS3.TRetryConfig RetryConfig = 1;
uint64 RowsInBatch = 2; // Default = 1000
uint64 MaxInflight = 3; // Default = 20
uint64 DataInflight = 4; // Default = 1 GB
}

message TPqReadActorFactoryConfig {
Expand Down
22 changes: 13 additions & 9 deletions ydb/core/yq/libs/init/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,29 +71,31 @@ void Init(
{
Y_VERIFY(iyqSharedResources, "No YQ shared resources created");
TYqSharedResources::TPtr yqSharedResources = TYqSharedResources::Cast(iyqSharedResources);
const auto clientCounters = appData->Counters->GetSubgroup("counters", "yq")->GetSubgroup("subsystem", "ClientMetrics");

auto yqCounters = appData->Counters->GetSubgroup("counters", "yq");
const auto clientCounters = yqCounters->GetSubgroup("subsystem", "ClientMetrics");

if (protoConfig.GetControlPlaneStorage().GetEnabled()) {
auto controlPlaneStorage = protoConfig.GetControlPlaneStorage().GetUseInMemory()
? NYq::CreateInMemoryControlPlaneStorageServiceActor(protoConfig.GetControlPlaneStorage())
: NYq::CreateYdbControlPlaneStorageServiceActor(
protoConfig.GetControlPlaneStorage(),
protoConfig.GetCommon(),
appData->Counters->GetSubgroup("counters", "yq")->GetSubgroup("subsystem", "ControlPlaneStorage"),
yqCounters->GetSubgroup("subsystem", "ControlPlaneStorage"),
yqSharedResources,
credentialsProviderFactory,
tenant);
actorRegistrator(NYq::ControlPlaneStorageServiceActorId(), controlPlaneStorage);

actorRegistrator(NYq::ControlPlaneConfigActorId(),
CreateControlPlaneConfigActor(yqSharedResources, credentialsProviderFactory, protoConfig.GetControlPlaneStorage(),
appData->Counters->GetSubgroup("counters", "yq")->GetSubgroup("subsystem", "ControlPlaneConfig"))
yqCounters->GetSubgroup("subsystem", "ControlPlaneConfig"))
);
}

if (protoConfig.GetControlPlaneProxy().GetEnabled()) {
auto controlPlaneProxy = NYq::CreateControlPlaneProxyActor(protoConfig.GetControlPlaneProxy(),
appData->Counters->GetSubgroup("counters", "yq")->GetSubgroup("subsystem", "ControlPlaneProxy"), protoConfig.GetQuotasManager().GetEnabled());
yqCounters->GetSubgroup("subsystem", "ControlPlaneProxy"), protoConfig.GetQuotasManager().GetEnabled());
actorRegistrator(NYq::ControlPlaneProxyActorId(), controlPlaneProxy);
}

Expand All @@ -110,7 +112,7 @@ void Init(
if (protoConfig.GetAudit().GetEnabled()) {
auto* auditSerive = auditServiceFactory(
protoConfig.GetAudit(),
appData->Counters->GetSubgroup("counters", "yq")->GetSubgroup("subsystem", "audit"));
yqCounters->GetSubgroup("subsystem", "audit"));
actorRegistrator(NYq::YqAuditServiceActorId(), auditSerive);
}

Expand All @@ -125,7 +127,6 @@ void Init(
actorRegistrator(NYql::NDq::MakeCheckpointStorageID(), checkpointStorage.release());
}

auto yqCounters = appData->Counters->GetSubgroup("counters", "yq");
auto workerManagerCounters = NYql::NDqs::TWorkerManagerCounters(yqCounters->GetSubgroup("subsystem", "worker_manager"));

TVector<NKikimr::NMiniKQL::TComputationNodeFactory> compNodeFactories = {
Expand Down Expand Up @@ -170,10 +171,13 @@ void Init(
if (const ui64 maxInflight = s3readConfig.GetMaxInflight()) {
readActorFactoryCfg.MaxInflight = maxInflight;
}
if (const ui64 dataInflight = s3readConfig.GetDataInflight()) {
readActorFactoryCfg.DataInflight = dataInflight;
}
RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode());
RegisterYdbReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory,
httpGateway, s3HttpRetryPolicy, readActorFactoryCfg);
RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, s3HttpRetryPolicy, readActorFactoryCfg,
yqCounters->GetSubgroup("subsystem", "S3ReadActor"), appData->Counters->GetSubgroup("counters", "dq_tasks"));
RegisterS3WriteActorFactory(*asyncIoFactory, credentialsFactory,
httpGateway, s3HttpRetryPolicy);
RegisterClickHouseReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway);
Expand Down Expand Up @@ -262,7 +266,7 @@ void Init(
pqCmConnections,
appData->FunctionRegistry,
httpGateway,
appData->Counters->GetSubgroup("counters", "yq")->GetSubgroup("subsystem", "TestConnection"));
yqCounters->GetSubgroup("subsystem", "TestConnection"));
actorRegistrator(NYq::TestConnectionActorId(), testConnection);
}

Expand Down
Loading

0 comments on commit f089705

Please sign in to comment.