From 041f4a3c1aaab437582b4a2c8857ce93ab0f34cf Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Fri, 18 Oct 2024 18:03:34 +0300 Subject: [PATCH 1/8] change DqJoin Flags from TCoAtomList into TCoNameValueTupleList --- .../yql/core/expr_nodes/yql_expr_nodes.json | 4 +- .../yql/core/type_ann/type_ann_join.cpp | 3 +- ydb/library/yql/core/yql_expr_constraint.cpp | 9 +-- .../yql/dq/expr_nodes/dq_expr_nodes.json | 2 +- ydb/library/yql/dq/opt/dq_opt_join.cpp | 71 +++++++++++-------- ydb/library/yql/dq/type_ann/dq_type_ann.cpp | 15 ++-- .../common/mkql/yql_provider_mkql.cpp | 4 +- .../provider/yql_dq_datasink_constraints.cpp | 9 +-- 8 files changed, 71 insertions(+), 46 deletions(-) diff --git a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json index cc4becb1e1a6..1fcf2eb1ac39 100644 --- a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json +++ b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json @@ -1626,7 +1626,7 @@ {"Index": 6, "Name": "RightRenames", "Type": "TCoAtomList"}, {"Index": 7, "Name": "LeftKeysColumnNames", "Type": "TCoAtomList"}, {"Index": 8, "Name": "RightKeysColumnNames", "Type": "TCoAtomList"}, - {"Index": 9, "Name": "Flags", "Type": "TCoAtomList"} + {"Index": 9, "Name": "Flags", "Type": "TCoNameValueTupleList"} ] }, { @@ -1642,7 +1642,7 @@ {"Index": 5, "Name": "RightRenames", "Type": "TCoAtomList"}, {"Index": 6, "Name": "LeftKeysColumnNames", "Type": "TCoAtomList"}, {"Index": 7, "Name": "RightKeysColumnNames", "Type": "TCoAtomList"}, - {"Index": 8, "Name": "Flags", "Type": "TCoAtomList"} + {"Index": 8, "Name": "Flags", "Type": "TCoNameValueTupleList"} ] }, { diff --git a/ydb/library/yql/core/type_ann/type_ann_join.cpp b/ydb/library/yql/core/type_ann/type_ann_join.cpp index 930938ece32a..faca9111d4c6 100644 --- a/ydb/library/yql/core/type_ann/type_ann_join.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_join.cpp @@ -617,7 +617,8 @@ namespace NTypeAnnImpl { } for (auto i = 0U; i < 
input->Tail().ChildrenSize(); ++i) { - if (const auto& flag = *input->Tail().Child(i); !flag.IsAtom({"LeftAny", "RightAny"})) { + const auto& flag = *input->Tail().Child(i)->Child(0); + if (!flag.IsAtom({"LeftAny","RightAny"})) { ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(flag.Pos()), TStringBuilder() << "Unsupported grace join option: " << flag.Content())); return IGraphTransformer::TStatus::Error; } diff --git a/ydb/library/yql/core/yql_expr_constraint.cpp b/ydb/library/yql/core/yql_expr_constraint.cpp index cb28ea1c8ec7..c899b034f70d 100644 --- a/ydb/library/yql/core/yql_expr_constraint.cpp +++ b/ydb/library/yql/core/yql_expr_constraint.cpp @@ -2266,12 +2266,13 @@ class TCallableConstraintTransformer : public TCallableTransformerBaseGetConstraint(); const TUniqueConstraintNode* rUnique = rightInput->GetConstraint(); diff --git a/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json b/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json index e594d028c15c..cc4dfa601cf0 100644 --- a/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json +++ b/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json @@ -44,7 +44,7 @@ "Match": {"Type": "Callable", "Name": "DqJoin"}, "Children": [ {"Index": 8, "Name": "JoinAlgo", "Type": "TCoAtom"}, - {"Index": 9, "Name": "Flags", "Type": "TCoAtomList", "Optional": true} + {"Index": 9, "Name": "Flags", "Type": "TCoNameValueTupleList", "Optional": true} ] }, { diff --git a/ydb/library/yql/dq/opt/dq_opt_join.cpp b/ydb/library/yql/dq/opt/dq_opt_join.cpp index 0f235ee41205..29184d55ea86 100644 --- a/ydb/library/yql/dq/opt/dq_opt_join.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_join.cpp @@ -237,11 +237,20 @@ TMaybe BuildDqJoin(const TCoEquiJoinTuple& joinTuple, .Done(); return TJoinInputDesc(Nothing(), dqJoin, std::move(resultKeys)); } else { - TExprNode::TListType flags; - if (leftAny) - flags.emplace_back(ctx.NewAtom(joinTuple.Pos(), "LeftAny", TNodeFlags::Default)); - if (rightAny) - flags.emplace_back(ctx.NewAtom(joinTuple.Pos(), "RightAny", 
TNodeFlags::Default)); + TVector flags; + + if (leftAny) { + flags.push_back( + Build(ctx, joinTuple.Pos()) + .Name().Build("LeftAny"sv) + .Done()); + } + if (rightAny) { + flags.push_back( + Build(ctx, joinTuple.Pos()) + .Name().Build("RightAny"sv) + .Done()); + } auto dqJoin = Build(ctx, joinTuple.Pos()) .LeftInput(BuildDqJoinInput(ctx, joinTuple.Pos(), left->Input, leftJoinKeys, false)) @@ -522,20 +531,26 @@ TExprBase DqRewriteRightJoinToLeft(const TExprBase node, TExprContext& ctx) { return node; } - TMaybeNode newFlags; - if (TMaybeNode flags = dqJoin.Flags()) { - auto flagsBuilder = Build(ctx, flags.Cast().Pos()); + TMaybeNode newFlags; + if (TMaybeNode flags = dqJoin.Flags()) { + TVector list; for (auto flag: flags.Cast()) { TStringBuf tail; - if( flag.Value().AfterPrefix("Left", tail)) { - flagsBuilder.Add().Value("Right" + TString(tail)).Build(); - } else if ( flag.Value().AfterPrefix("Right", tail)) { - flagsBuilder.Add().Value("Left" + TString(tail)).Build(); + if( flag.Name().Value().AfterPrefix("Left", tail)) { + list.push_back(Build(ctx, flag.Pos()) + .Name().Build("Right" + TString(tail)) + .Done()); + } else if ( flag.Name().Value().AfterPrefix("Right", tail)) { + list.push_back(Build(ctx, flag.Pos()) + .Name().Build("Left" + TString(tail)) + .Done()); } else { - flagsBuilder.Add(flag); + list.push_back(flag); } } - newFlags = flagsBuilder.Done(); + newFlags = Build(ctx, flags.Cast().Pos()) + .Add(list) + .Done(); } auto joinKeysBuilder = Build(ctx, dqJoin.Pos()); @@ -640,15 +655,13 @@ TExprBase DqBuildPhyJoin(const TDqJoin& join, bool pushLeftStage, TExprContext& return join; } - TExprNode::TListType flags; if (const auto maybeFlags = join.Flags()) { - flags = maybeFlags.Cast().Ref().ChildrenList(); - } - - for (auto& flag : flags) { - if (flag->IsAtom("LeftAny") || flag->IsAtom("RightAny")) { - ctx.AddError(TIssue(ctx.GetPosition(join.Ptr()->Pos()), "ANY join kind is not currently supported")); - return join; + for (const auto& flag : 
maybeFlags.Cast()) { + auto name = flag.Name().Value(); + if (name == "LeftAny"sv || name == "RightAny"sv) { + ctx.AddError(TIssue(ctx.GetPosition(join.Ptr()->Pos()), "ANY join kind is not currently supported")); + return join; + } } } @@ -1514,13 +1527,15 @@ TExprBase DqBuildHashJoin(const TDqJoin& join, EHashJoinMode mode, TExprContext& [[fallthrough]]; case EHashJoinMode::Dict: { bool leftAny = false, rightAny = false; - for (auto& flag : flags) { - if (flag->IsAtom("LeftAny")) { + TExprNode::TListType dictFlags; + for (const auto& flag : flags) { + const auto name = flag->Child(0)->Content(); + if (name == "LeftAny"sv) { leftAny = true; - flag = ctx.NewAtom(flag->Pos(), "LeftUnique", TNodeFlags::Default); - } else if (flag->IsAtom("RightAny")) { + dictFlags.push_back(ctx.NewAtom(flag->Pos(), "LeftUnique", TNodeFlags::Default)); + } else if (name == "RightAny"sv) { rightAny = true; - flag = ctx.NewAtom(flag->Pos(), "RightUnique", TNodeFlags::Default); + dictFlags.push_back(ctx.NewAtom(flag->Pos(), "RightUnique", TNodeFlags::Default)); } } @@ -1538,7 +1553,7 @@ TExprBase DqBuildHashJoin(const TDqJoin& join, EHashJoinMode mode, TExprContext& .Arg(0, "left") .Arg(1, "right") .Add(2, join.JoinType().Ptr()) - .List(3).Add(std::move(flags)).Seal() + .List(3).Add(std::move(dictFlags)).Seal() .Seal() .Seal() .Seal() diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp index 9241eb4c83ed..a102ca1a59b8 100644 --- a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp +++ b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp @@ -444,7 +444,8 @@ const TStructExprType* GetDqJoinResultType(const TExprNode::TPtr& input, bool st } } - if (!EnsureAtom(*input->Child(TDqJoin::idx_JoinType), ctx)) { + const auto& joinType = *input->Child(TDqJoin::idx_JoinType); + if (!EnsureAtom(joinType, ctx)) { return nullptr; } @@ -503,11 +504,17 @@ const TStructExprType* GetDqJoinResultType(const TExprNode::TPtr& input, bool st : TStringBuf(""); if 
(input->ChildrenSize() > 9U) { - for (auto i = 0U; i < input->Tail().ChildrenSize(); ++i) { - if (const auto& flag = *input->Tail().Child(i); !flag.IsAtom({"LeftAny", "RightAny"})) { - ctx.AddError(TIssue(ctx.GetPosition(flag.Pos()), TStringBuilder() << "Unsupported DQ join option: " << flag.Content())); + auto&& flags = *input->Child(TDqJoin::idx_Flags); + for (ui32 i = 0; i < flags.ChildrenSize(); ++i) { + auto&& flag = *input->Tail().Child(i); + if ( !EnsureTupleOfAtoms(flag, ctx) || !EnsureTupleMinSize(flag, 1, ctx)) return nullptr; + auto&& name = *flag.Child(TCoNameValueTuple::idx_Name); + if (name.IsAtom({"LeftAny", "RightAny"})) { + continue; } + ctx.AddError(TIssue(ctx.GetPosition(flag.Pos()), TStringBuilder() << "DqJoin: Unsupported DQ join option: " << name.Content())); + return nullptr; } } diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp index f2d22ea28d8d..9b849302a7ba 100644 --- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp +++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp @@ -1700,9 +1700,9 @@ TMkqlCommonCallableCompiler::TShared::TShared() { auto anyJoinSettings = EAnyJoinSettings::None; node.Tail().ForEachChild([&](const TExprNode& flag) { - if (flag.IsAtom("LeftAny")) + if (flag.Child(0)->IsAtom("LeftAny")) anyJoinSettings = EAnyJoinSettings::Right == anyJoinSettings ? EAnyJoinSettings::Both : EAnyJoinSettings::Left; - else if (flag.IsAtom("RightAny")) + else if (flag.Child(0)->IsAtom("RightAny")) anyJoinSettings = EAnyJoinSettings::Left == anyJoinSettings ? 
EAnyJoinSettings::Both : EAnyJoinSettings::Right; }); diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.cpp index 604a8ab8a5f9..b9acbd55588d 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasink_constraints.cpp @@ -144,12 +144,13 @@ class TDqDataSinkConstraintTransformer : public TVisitorTransformerBase { bool leftAny = false, rightAny = false; if (const auto maybeJoin = join.Maybe()) { if (const auto maybeFlags = maybeJoin.Cast().Flags()) { - maybeFlags.Cast().Ref().ForEachChild([&](const TExprNode& flag) { - if (flag.IsAtom("LeftAny")) + for (const auto &flag: maybeFlags.Cast()) { + const auto name = flag.Name().Value(); + if (name == "LeftAny"sv) leftAny = true; - else if (flag.IsAtom("RightAny")) + else if (name == "RightAny"sv) rightAny = true; - }); + } } } From ed8589d3e14e4f6bea13690704eccb4a0655d9ab Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Fri, 18 Oct 2024 18:07:03 +0300 Subject: [PATCH 2/8] pass streamlookup() join parameters --- ydb/library/yql/core/yql_join.cpp | 18 +++++++-- ydb/library/yql/core/yql_join.h | 1 + ydb/library/yql/dq/opt/dq_opt_join.cpp | 8 +++- ydb/library/yql/dq/type_ann/dq_type_ann.cpp | 12 ++++++ .../providers/dq/opt/physical_optimize.cpp | 38 +++++++++++++++++-- .../dq/planner/execution_planner.cpp | 6 ++- ydb/library/yql/sql/v1/join.cpp | 6 ++- ydb/library/yql/sql/v1/source.h | 1 + ydb/library/yql/sql/v1/sql_select.cpp | 1 + 9 files changed, 81 insertions(+), 10 deletions(-) diff --git a/ydb/library/yql/core/yql_join.cpp b/ydb/library/yql/core/yql_join.cpp index 044e49873e84..ed79383ecfe7 100644 --- a/ydb/library/yql/core/yql_join.cpp +++ b/ydb/library/yql/core/yql_join.cpp @@ -319,8 +319,16 @@ namespace { } } else if (option.IsAtom("forceSortedMerge") || option.IsAtom("forceStreamLookup")) { - if (!EnsureTupleSize(*child, 1, ctx)) { 
- return IGraphTransformer::TStatus::Error; + if (option.IsAtom("forceStreamLookup")) { + if (child->ChildrenSize() % 2 == 0) { + ctx.AddError(TIssue(ctx.GetPosition(option.Pos()), TStringBuilder() << + "Odd number of options, streamlookup() supports only KEY VALUE... pairs")); + return IGraphTransformer::TStatus::Error; + } + } else { + if (!EnsureTupleSize(*child, 1, ctx)) { + return IGraphTransformer::TStatus::Error; + } } if (hasJoinStrategyHint) { ctx.AddError(TIssue(ctx.GetPosition(option.Pos()), TStringBuilder() << @@ -1352,8 +1360,12 @@ TEquiJoinLinkSettings GetEquiJoinLinkSettings(const TExprNode& linkSettings) { result.ForceSortedMerge = HasSetting(linkSettings, "forceSortedMerge"); - if (HasSetting(linkSettings, "forceStreamLookup")) { + if(auto streamlookup = GetSetting(linkSettings, "forceStreamLookup")) { result.JoinAlgo = EJoinAlgoType::StreamLookupJoin; + auto size = streamlookup->ChildrenSize(); + for (decltype(size) i = 1; i < size; ++i) { + result.Options.push_back(TString(streamlookup->Child(i)->Content())); + } } if (HasSetting(linkSettings, "compact")) { diff --git a/ydb/library/yql/core/yql_join.h b/ydb/library/yql/core/yql_join.h index 81f57f2fbb71..0c3267e0750f 100644 --- a/ydb/library/yql/core/yql_join.h +++ b/ydb/library/yql/core/yql_join.h @@ -147,6 +147,7 @@ struct TEquiJoinLinkSettings { EJoinAlgoType JoinAlgo = EJoinAlgoType::Undefined; // JOIN implementation may ignore this flags if SortedMerge strategy is not supported bool ForceSortedMerge = false; + TVector Options; bool Compact = false; }; diff --git a/ydb/library/yql/dq/opt/dq_opt_join.cpp b/ydb/library/yql/dq/opt/dq_opt_join.cpp index 29184d55ea86..41f10ff2ddf9 100644 --- a/ydb/library/yql/dq/opt/dq_opt_join.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_join.cpp @@ -219,7 +219,7 @@ TMaybe BuildDqJoin(const TCoEquiJoinTuple& joinTuple, rightJoinKeyNames.emplace_back(rightColumnName); } - if (EHashJoinMode::Off == mode || EHashJoinMode::Map == mode || !(leftAny || rightAny)) { + if 
(EHashJoinMode::Off == mode || EHashJoinMode::Map == mode || !(leftAny || rightAny || !linkSettings.Options.empty())) { auto dqJoin = Build(ctx, joinTuple.Pos()) .LeftInput(BuildDqJoinInput(ctx, joinTuple.Pos(), left->Input, leftJoinKeys, leftAny)) .LeftLabel(leftTableLabel) @@ -251,6 +251,12 @@ TMaybe BuildDqJoin(const TCoEquiJoinTuple& joinTuple, .Name().Build("RightAny"sv) .Done()); } + for (ui32 i = 0; i + 1 < linkSettings.Options.size(); i += 2) + flags.push_back( + Build(ctx, joinTuple.Pos()) + .Name().Build(linkSettings.Options[i]) + .Value().Build(linkSettings.Options[i + 1]) + .Done()); auto dqJoin = Build(ctx, joinTuple.Pos()) .LeftInput(BuildDqJoinInput(ctx, joinTuple.Pos(), left->Input, leftJoinKeys, false)) diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp index a102ca1a59b8..b9d4c4f5ed14 100644 --- a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp +++ b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp @@ -502,6 +502,11 @@ const TStructExprType* GetDqJoinResultType(const TExprNode::TPtr& input, bool st auto rightTableLabel = join.RightLabel().Maybe() ? 
join.RightLabel().Cast().Value() : TStringBuf(""); + const auto& joinAlgo = *input->Child(TDqJoin::idx_JoinAlgo); + if (joinAlgo.IsAtom("StreamLooup") && !joinType.IsAtom("Left")) { + ctx.AddError(TIssue(ctx.GetPosition(joinType.Pos()), TStringBuilder() << "DqJoin: Unsupported DQ join type " << joinType.Content() << " for join algorithm " << joinAlgo.Content())); + return nullptr; + } if (input->ChildrenSize() > 9U) { auto&& flags = *input->Child(TDqJoin::idx_Flags); @@ -510,6 +515,13 @@ const TStructExprType* GetDqJoinResultType(const TExprNode::TPtr& input, bool st if ( !EnsureTupleOfAtoms(flag, ctx) || !EnsureTupleMinSize(flag, 1, ctx)) return nullptr; auto&& name = *flag.Child(TCoNameValueTuple::idx_Name); + if (name.IsAtom({"TTL", "MaxCachedRows", "MaxDelayedRows"})) { + if (joinAlgo.IsAtom("StreamLookup")) { + if (!EnsureTupleSize(flag, 2, ctx)) + return nullptr; + continue; + } + } else if (name.IsAtom({"LeftAny", "RightAny"})) { continue; } diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp index e5dbdb423845..fe772f77a4b0 100644 --- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp @@ -252,6 +252,38 @@ class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase { if (!left) { return node; } + TExprNode::TPtr ttl = nullptr; + TExprNode::TPtr maxCachedRows = nullptr; + TExprNode::TPtr maxDelayedRows = nullptr; + if (const auto maybeFlags = join.Flags()) { + for (auto&& flag: maybeFlags.Cast()) { + auto&& name = flag.Name().Value(); + if (name == "TTL"sv) { + YQL_ENSURE(flag.Ref().ChildrenSize() == 2); + ttl = flag.Ref().Child(1); + } else if (name == "MaxCachedRows"sv) { + YQL_ENSURE(flag.Ref().ChildrenSize() == 2); + maxCachedRows = flag.Ref().Child(1); + } else if (name == "MaxDelayedRows"sv) { + YQL_ENSURE(flag.Ref().ChildrenSize() == 2); + maxDelayedRows = flag.Ref().Child(1); + } else if (name == 
"RightAny"sv) { + YQL_ENSURE(flag.Ref().ChildrenSize() == 1); + continue; + } else { + Y_DEBUG_ABORT("Impossible flag"); // should have been checked by dq_type_ann + } + } + } + if (!ttl) { + ttl = ctx.NewAtom(pos, 300); + } + if (!maxCachedRows) { + maxCachedRows = ctx.NewAtom(pos, 1'000'000); + } + if (!maxDelayedRows) { + maxDelayedRows = ctx.NewAtom(pos, 1'000'000); + } auto cn = Build(ctx, pos) .Output(left.Output().Cast()) .LeftLabel(join.LeftLabel().Cast()) @@ -261,9 +293,9 @@ class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase { .JoinType(join.JoinType()) .LeftJoinKeyNames(join.LeftJoinKeyNames()) .RightJoinKeyNames(join.RightJoinKeyNames()) - .TTL(ctx.NewAtom(pos, 300)) //TODO configure me - .MaxCachedRows(ctx.NewAtom(pos, 1'000'000)) //TODO configure me - .MaxDelayedRows(ctx.NewAtom(pos, 1'000'000)) //Configure me + .TTL(ttl) + .MaxCachedRows(maxCachedRows) + .MaxDelayedRows(maxDelayedRows) .Done(); auto lambda = Build(ctx, pos) diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp index 6c611db479e1..166b5c78177e 100644 --- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp +++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp @@ -613,8 +613,10 @@ namespace NYql::NDqs { const auto narrowOutputRowType = GetSeqItemType(streamLookup.Ptr()->GetTypeAnn()); Y_ABORT_UNLESS(narrowOutputRowType->GetKind() == ETypeAnnotationKind::Struct); settings.SetNarrowOutputRowType(NYql::NCommon::GetSerializedTypeAnnotation(narrowOutputRowType)); - settings.SetCacheLimit(1'000'000); //TODO configure me - settings.SetCacheTtlSeconds(60); //TODO configure me + // FIXME RESOLVE BEFORE MERGE + settings.SetCacheLimit(atoll(streamLookup.MaxCachedRows().StringValue().c_str())); + settings.SetCacheTtlSeconds(atoll(streamLookup.TTL().StringValue().c_str())); + settings.SetMaxDelay(atoll(streamLookup.MaxDelay().StringValue().c_str())); const auto inputRowType = 
GetSeqItemType(streamLookup.Output().Stage().Program().Ref().GetTypeAnn()); const auto outputRowType = GetSeqItemType(stage.Program().Args().Arg(inputIndex).Ref().GetTypeAnn()); diff --git a/ydb/library/yql/sql/v1/join.cpp b/ydb/library/yql/sql/v1/join.cpp index b23771b3df15..fa0c281096f5 100644 --- a/ydb/library/yql/sql/v1/join.cpp +++ b/ydb/library/yql/sql/v1/join.cpp @@ -502,7 +502,11 @@ class TEquiJoin: public TJoinBase { if (TJoinLinkSettings::EStrategy::SortedMerge == descr.LinkSettings.Strategy) { linkOptions = L(linkOptions, Q(Y(Q("forceSortedMerge")))); } else if (TJoinLinkSettings::EStrategy::StreamLookup == descr.LinkSettings.Strategy) { - linkOptions = L(linkOptions, Q(Y(Q("forceStreamLookup")))); + auto streamlookup = Y(Q("forceStreamLookup")); + for (auto&& option: descr.LinkSettings.Values) { + streamlookup = L(streamlookup, Q(option)); + } + linkOptions = L(linkOptions, Q(streamlookup)); } else if (TJoinLinkSettings::EStrategy::ForceMap == descr.LinkSettings.Strategy) { linkOptions = L(linkOptions, Q(Y(Q("join_algo"), Q("MapJoin")))); } else if (TJoinLinkSettings::EStrategy::ForceGrace == descr.LinkSettings.Strategy) { diff --git a/ydb/library/yql/sql/v1/source.h b/ydb/library/yql/sql/v1/source.h index 35129fffbb83..7bcda0af7552 100644 --- a/ydb/library/yql/sql/v1/source.h +++ b/ydb/library/yql/sql/v1/source.h @@ -172,6 +172,7 @@ namespace NSQLTranslationV1 { ForceGrace }; EStrategy Strategy = EStrategy::Default; + TVector Values; bool Compact = false; }; diff --git a/ydb/library/yql/sql/v1/sql_select.cpp b/ydb/library/yql/sql/v1/sql_select.cpp index 4a06f8e51b14..c27aa8f126ab 100644 --- a/ydb/library/yql/sql/v1/sql_select.cpp +++ b/ydb/library/yql/sql/v1/sql_select.cpp @@ -43,6 +43,7 @@ bool CollectJoinLinkSettings(TPosition pos, TJoinLinkSettings& linkSettings, TCo if (TJoinLinkSettings::EStrategy::Default == linkSettings.Strategy) { linkSettings.Strategy = newStrategy; + linkSettings.Values = hint.Values; } else if (newStrategy == 
linkSettings.Strategy) { ctx.Error() << "Duplicate join strategy hint"; return false; From c124abd73b7da1ed255fe51086c4c0473858ead2 Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Fri, 18 Oct 2024 18:07:14 +0300 Subject: [PATCH 3/8] Reject unsupported streamlookup variants --- ydb/library/yql/dq/opt/dq_opt_join.cpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/ydb/library/yql/dq/opt/dq_opt_join.cpp b/ydb/library/yql/dq/opt/dq_opt_join.cpp index 41f10ff2ddf9..c85d51f5b1c4 100644 --- a/ydb/library/yql/dq/opt/dq_opt_join.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_join.cpp @@ -155,6 +155,13 @@ TMaybe BuildDqJoin(const TCoEquiJoinTuple& joinTuple, } TStringBuf joinType = joinTuple.Type().Value(); + + if (linkSettings.JoinAlgo == EJoinAlgoType::StreamLookupJoin) { + YQL_ENSURE(joinType == TStringBuf("Left"), "Streamlookup supports only LEFT JOIN ... ANY"); + YQL_ENSURE(!leftAny, "Streamlookup ANY LEFT join is not implemented"); + YQL_ENSURE(rightAny, "Streamlookup supports only LEFT JOIN ... 
ANY"); + } + TSet> resultKeys; if (joinType != TStringBuf("RightOnly") && joinType != TStringBuf("RightSemi")) { resultKeys.insert(left->Keys.begin(), left->Keys.end()); From 60f010a48d1255b600ba1439e123e2c0f575c759 Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Fri, 18 Oct 2024 18:32:08 +0300 Subject: [PATCH 4/8] fix rebase error --- ydb/library/yql/providers/dq/planner/execution_planner.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp index 166b5c78177e..1f29b2b5ede2 100644 --- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp +++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp @@ -616,7 +616,7 @@ namespace NYql::NDqs { // FIXME RESOLVE BEFORE MERGE settings.SetCacheLimit(atoll(streamLookup.MaxCachedRows().StringValue().c_str())); settings.SetCacheTtlSeconds(atoll(streamLookup.TTL().StringValue().c_str())); - settings.SetMaxDelay(atoll(streamLookup.MaxDelay().StringValue().c_str())); + settings.SetMaxDelayedRows(atoll(streamLookup.MaxDelayedRows().StringValue().c_str())); const auto inputRowType = GetSeqItemType(streamLookup.Output().Stage().Program().Ref().GetTypeAnn()); const auto outputRowType = GetSeqItemType(stage.Program().Args().Arg(inputIndex).Ref().GetTypeAnn()); From 62f0faa81b41b764e1c707299e5238e97d04290b Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Fri, 18 Oct 2024 18:41:18 +0300 Subject: [PATCH 5/8] fix typo --- ydb/library/yql/dq/type_ann/dq_type_ann.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp index b9d4c4f5ed14..ec04a85d02ee 100644 --- a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp +++ b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp @@ -516,7 +516,7 @@ const TStructExprType* GetDqJoinResultType(const TExprNode::TPtr& input, bool st return nullptr; auto&& name = 
*flag.Child(TCoNameValueTuple::idx_Name); if (name.IsAtom({"TTL", "MaxCachedRows", "MaxDelayedRows"})) { - if (joinAlgo.IsAtom("StreamLookup")) { + if (joinAlgo.IsAtom("StreamLookupJoin")) { if (!EnsureTupleSize(flag, 2, ctx)) return nullptr; continue; From 165c892742a1cb1826d37df9fba4b7acdd9e8c14 Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Fri, 18 Oct 2024 20:59:44 +0300 Subject: [PATCH 6/8] fix ut --- ydb/library/yql/core/ut/yql_expr_constraint_ut.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/library/yql/core/ut/yql_expr_constraint_ut.cpp b/ydb/library/yql/core/ut/yql_expr_constraint_ut.cpp index df3fdef8e5b3..253507ef1018 100644 --- a/ydb/library/yql/core/ut/yql_expr_constraint_ut.cpp +++ b/ydb/library/yql/core/ut/yql_expr_constraint_ut.cpp @@ -1939,7 +1939,7 @@ Y_UNIT_TEST_SUITE(TYqlExprConstraints) { (let flow1 (ExpandMap (ToFlow list1) (lambda '(item) (Member item 'key1) (Member item 'subkey1) (Member item 'value1)))) (let flow2 (ExpandMap (ToFlow list2) (lambda '(item) (Member item 'key2) (Member item 'subkey2) (Member item 'value2)))) - (let join (GraceJoinCore flow1 flow2 'Inner '('0 '1) '('0 '1) '('0 '0 '1 '1 '2 '2) '('0 '3 '1 '4 '2 '5) '() '() '('LeftAny 'RightAny))) + (let join (GraceJoinCore flow1 flow2 'Inner '('0 '1) '('0 '1) '('0 '0 '1 '1 '2 '2) '('0 '3 '1 '4 '2 '5) '() '() '('("LeftAny") '('"RightAny")))) (let list (Collect (NarrowMap join (lambda '(lk ls lv rk rs rv) (AsStruct '('lk lk) '('ls ls) '('lv lv) '('rk rk) '('rs rs) '('rv rv)))))) (let res_sink (DataSink 'yt (quote plato))) From 8415564abc110f0f11eb4ee1cacd61e4f81622ee Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Fri, 18 Oct 2024 21:00:26 +0300 Subject: [PATCH 7/8] fix index out of range Also correct the misspelled "StreamLooup" join-algo atom to "StreamLookupJoin", so that the check rejecting non-LEFT join types for that algorithm can actually match. --- ydb/library/yql/dq/type_ann/dq_type_ann.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp index ec04a85d02ee..f5207fde90d5 100644 --- 
a/ydb/library/yql/dq/type_ann/dq_type_ann.cpp +++ b/ydb/library/yql/dq/type_ann/dq_type_ann.cpp @@ -502,16 +502,17 @@ const TStructExprType* GetDqJoinResultType(const TExprNode::TPtr& input, bool st auto rightTableLabel = join.RightLabel().Maybe() ? join.RightLabel().Cast().Value() : TStringBuf(""); + if (input->ChildrenSize() > TDqJoin::idx_JoinAlgo) { const auto& joinAlgo = *input->Child(TDqJoin::idx_JoinAlgo); - if (joinAlgo.IsAtom("StreamLooup") && !joinType.IsAtom("Left")) { + if (joinAlgo.IsAtom("StreamLookupJoin") && !joinType.IsAtom("Left")) { ctx.AddError(TIssue(ctx.GetPosition(joinType.Pos()), TStringBuilder() << "DqJoin: Unsupported DQ join type " << joinType.Content() << " for join algorithm " << joinAlgo.Content())); return nullptr; } - if (input->ChildrenSize() > 9U) { - auto&& flags = *input->Child(TDqJoin::idx_Flags); + if (input->ChildrenSize() > TDqJoin::idx_Flags) { + auto&& flags = input->Tail(); for (ui32 i = 0; i < flags.ChildrenSize(); ++i) { - auto&& flag = *input->Tail().Child(i); + auto&& flag = *flags.Child(i); if ( !EnsureTupleOfAtoms(flag, ctx) || !EnsureTupleMinSize(flag, 1, ctx)) return nullptr; auto&& name = *flag.Child(TCoNameValueTuple::idx_Name); @@ -529,6 +530,7 @@ const TStructExprType* GetDqJoinResultType(const TExprNode::TPtr& input, bool st return nullptr; } } + } return GetDqJoinResultType(join.Pos(), *leftStructType, leftTableLabel, *rightStructType, rightTableLabel, join.JoinType(), join.JoinKeys(), ctx); From 67706a7b655519446f603140a7d4c44460562fcc Mon Sep 17 00:00:00 2001 From: Yuriy Kaminskiy Date: Sat, 19 Oct 2024 11:17:27 +0300 Subject: [PATCH 8/8] fix typo --- ydb/library/yql/core/ut/yql_expr_constraint_ut.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/library/yql/core/ut/yql_expr_constraint_ut.cpp b/ydb/library/yql/core/ut/yql_expr_constraint_ut.cpp index 253507ef1018..0446273745c1 100644 --- a/ydb/library/yql/core/ut/yql_expr_constraint_ut.cpp +++ b/ydb/library/yql/core/ut/yql_expr_constraint_ut.cpp @@ -1939,7 +1939,7 @@ Y_UNIT_TEST_SUITE(TYqlExprConstraints) { 
(let flow1 (ExpandMap (ToFlow list1) (lambda '(item) (Member item 'key1) (Member item 'subkey1) (Member item 'value1)))) (let flow2 (ExpandMap (ToFlow list2) (lambda '(item) (Member item 'key2) (Member item 'subkey2) (Member item 'value2)))) - (let join (GraceJoinCore flow1 flow2 'Inner '('0 '1) '('0 '1) '('0 '0 '1 '1 '2 '2) '('0 '3 '1 '4 '2 '5) '() '() '('("LeftAny") '('"RightAny")))) + (let join (GraceJoinCore flow1 flow2 'Inner '('0 '1) '('0 '1) '('0 '0 '1 '1 '2 '2) '('0 '3 '1 '4 '2 '5) '() '() '('('"LeftAny") '('"RightAny")))) (let list (Collect (NarrowMap join (lambda '(lk ls lv rk rs rv) (AsStruct '('lk lk) '('ls ls) '('lv lv) '('rk rk) '('rs rs) '('rv rv)))))) (let res_sink (DataSink 'yt (quote plato)))