Skip to content

Commit

Permalink
Mark mutable_id for YT writes (ydb-platform#1744)
Browse files Browse the repository at this point in the history
* init

* fix

* canonize

* canonize
  • Loading branch information
vitstn authored Feb 10, 2024
1 parent b1f681e commit 590c21c
Show file tree
Hide file tree
Showing 69 changed files with 3,121 additions and 3,076 deletions.
7 changes: 7 additions & 0 deletions ydb/library/yql/providers/yt/provider/yql_yt_datasink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,11 @@ class TYtDataSink : public TDataProviderBase {
ctx);
res = ctx.ChangeChild(*res, TYtWriteTable::idx_Settings, std::move(settings));
}
auto mutationId = ++NextMutationId_;
res = ctx.ChangeChild(*res, TYtWriteTable::idx_Settings,
NYql::AddSetting(*res->Child(TYtWriteTable::idx_Settings),
EYtSettingType::MutationId,
ctx.NewAtom(res->Child(TYtWriteTable::idx_Settings)->Pos(), ToString(mutationId)), ctx));
if (State_->Configuration->UseSystemColumns.Get().GetOrElse(DEFAULT_USE_SYS_COLUMNS)) {
res = ctx.ChangeChild(*res, TYtWriteTable::idx_Content,
ctx.Builder(node->Pos())
Expand All @@ -268,6 +273,7 @@ class TYtDataSink : public TDataProviderBase {
void Reset() final {
TDataProviderBase::Reset();
State_->Reset();
NextMutationId_ = 0;
}

bool CanExecute(const TExprNode& node) override {
Expand Down Expand Up @@ -567,6 +573,7 @@ class TYtDataSink : public TDataProviderBase {
}

private:
ui32 NextMutationId_ = 0;
TYtState::TPtr State_;
TLazyInitHolder<IGraphTransformer> IntentDeterminationTransformer_;
TLazyInitHolder<TVisitorTransformerBase> TypeAnnotationTransformer_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1435,6 +1435,7 @@ class TYtDataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
| EYtSettingType::PrimaryMedium
| EYtSettingType::Expiration
| EYtSettingType::MonotonicKeys
| EYtSettingType::MutationId
, ctx))
{
return TStatus::Error;
Expand Down Expand Up @@ -1712,6 +1713,7 @@ class TYtDataSinkTypeAnnotationTransformer : public TVisitorTransformerBase {
| EYtSettingType::PrimaryMedium
| EYtSettingType::Expiration
| EYtSettingType::MonotonicKeys
| EYtSettingType::MutationId
, ctx))
{
return TStatus::Error;
Expand Down
14 changes: 14 additions & 0 deletions ydb/library/yql/providers/yt/provider/yql_yt_op_settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,20 @@ bool ValidateSettings(const TExprNode& settingsNode, EYtSettingTypes accepted, T
return false;
}
break;
case EYtSettingType::MutationId:
if (!EnsureTupleSize(*setting, 2, ctx)) {
return false;
}
if (!EnsureAtom(*setting->Child(1), ctx)) {
return false;
}
ui32 mutationId;
if (!TryFromString(setting->Child(1)->Content(), mutationId)) {
ctx.AddError(TIssue(ctx.GetPosition(setting->Child(1)->Pos()), TStringBuilder()
<< "Expected a number, but got: " << TString{setting->Child(1)->Content()}.Quote()));
return false;
}
break;
}
}

Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/yt/provider/yql_yt_op_settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ enum class EYtSettingType: ui64 {
PrimaryMedium = 1ull << 59 /* "primary_medium", "primarymedium" */,
KeepMeta = 1ull << 60 /* "keep_meta", "keepmeta" */,
MonotonicKeys = 1ull << 61 /* "monotonic_keys", "monotonickeys" */,
MutationId = 1ull << 62 /* "mutationid", "mutation_id" */,
};

Y_DECLARE_FLAGS(EYtSettingTypes, EYtSettingType);
Expand Down
10 changes: 10 additions & 0 deletions ydb/library/yql/sql/v1/sql_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,11 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
break;
}
case TRule_sql_stmt_core::kAltSqlStmtCore8: {
if (Ctx.ParallelModeCount > 0) {
Error() << humanStatementName << " statement is not supported in parallel mode";
return false;
}

Ctx.BodyPart();
const auto& rule = core.GetAlt_sql_stmt_core8().GetRule_commit_stmt1();
Token(rule.GetToken1());
Expand All @@ -324,6 +329,11 @@ bool TSqlQuery::Statement(TVector<TNodePtr>& blocks, const TRule_sql_stmt_core&
break;
}
case TRule_sql_stmt_core::kAltSqlStmtCore11: {
if (Ctx.ParallelModeCount > 0) {
Error() << humanStatementName << " statement is not supported in parallel mode";
return false;
}

Ctx.BodyPart();
const auto& rule = core.GetAlt_sql_stmt_core11().GetRule_rollback_stmt1();
Token(rule.GetToken1());
Expand Down

Large diffs are not rendered by default.

Loading

0 comments on commit 590c21c

Please sign in to comment.