diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp index cfdc89571c1d..e4417a2e0943 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp +++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp @@ -123,6 +123,7 @@ TKikimrRunner::TKikimrRunner(const TKikimrSettings& settings) { appConfig.MutableTableServiceConfig()->SetEnableRowsDuplicationCheck(true); ServerSettings->SetAppConfig(appConfig); ServerSettings->SetFeatureFlags(settings.FeatureFlags); + ServerSettings->FeatureFlags.SetEnableImmediateWritingOnBulkUpsert(true); ServerSettings->SetNodeCount(settings.NodeCount); ServerSettings->SetEnableKqpSpilling(enableSpilling); ServerSettings->SetEnableDataColumnForIndexTable(true); diff --git a/ydb/core/kqp/ut/olap/indexes_ut.cpp b/ydb/core/kqp/ut/olap/indexes_ut.cpp index 13e98b57e9b6..593fe44a5d80 100644 --- a/ydb/core/kqp/ut/olap/indexes_ut.cpp +++ b/ydb/core/kqp/ut/olap/indexes_ut.cpp @@ -80,7 +80,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) { { auto alterQuery = TStringBuilder() << - "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, SCHEME_NEED_ACTUALIZATION=`true`, EXTERNAL_GUARANTEE_EXCLUSIVE_PK=`true`);"; + "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, SCHEME_NEED_ACTUALIZATION=`true`);"; auto session = tableClient.CreateSession().GetValueSync().GetSession(); auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString()); @@ -336,13 +336,6 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) { auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString()); } - { - auto alterQuery = TStringBuilder() << - "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, EXTERNAL_GUARANTEE_EXCLUSIVE_PK=`true`);"; - auto session = tableClient.CreateSession().GetValueSync().GetSession(); - auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString()); - } std::vector uids; std::vector resourceIds; diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index afcec4eb2062..62e345ba8258 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -8172,7 +8172,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) { csController->WaitCompactions(TDuration::Seconds(5)); } - testHelper.ReadData("SELECT value FROM `/Root/ColumnTableTest`", "[[#];[#];[[42u]];[[43u]]]"); + testHelper.ReadData("SELECT value FROM `/Root/ColumnTableTest` ORDER BY value", "[[#];[#];[[42u]];[[43u]]]"); } Y_UNIT_TEST(DropThenAddColumn) { diff --git a/ydb/core/protos/data_events.proto b/ydb/core/protos/data_events.proto index 0887a11b969c..804665fd1211 100644 --- a/ydb/core/protos/data_events.proto +++ b/ydb/core/protos/data_events.proto @@ -95,6 +95,7 @@ message TEvWrite { repeated uint32 ColumnIds = 3 [packed = true]; optional uint64 PayloadIndex = 4; optional EDataFormat PayloadFormat = 5; + optional string PayloadSchema = 6; } // Transaction operations diff --git a/ydb/core/protos/feature_flags.proto b/ydb/core/protos/feature_flags.proto index d3159e74524c..52b2398d7f02 100644 --- a/ydb/core/protos/feature_flags.proto +++ b/ydb/core/protos/feature_flags.proto @@ -162,4 +162,5 @@ message TFeatureFlags { optional bool EnableExternalDataSourcesOnServerless = 143 [default = true]; optional bool EnableSparsedColumns = 144 [default = false]; optional bool EnableParameterizedDecimal = 145 [default = false]; + optional bool EnableImmediateWritingOnBulkUpsert = 146 [default = false]; } diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp index 96a5cf794190..579a2e5eaa14 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp @@ -6,28 +6,11 @@ namespace NKikimr::NColumnShard { bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TInsertWriteId writeId) { - NKikimrTxColumnShard::TLogicalMetadata meta; - meta.SetNumRows(batch->GetRowsCount()); - meta.SetRawBytes(batch->GetRawBytes()); - meta.SetDirtyWriteTimeSeconds(batch.GetStartInstant().Seconds()); - meta.SetSpecialKeysRawData(batch->GetSpecialKeysFullSafe()); - meta.SetSpecialKeysPayloadData(batch->GetSpecialKeysPayloadSafe()); - - const auto& blobRange = batch.GetRange(); - Y_ABORT_UNLESS(blobRange.GetBlobId().IsValid()); + auto userData = batch.BuildInsertionUserData(*Self); + NOlap::TInsertedData insertData(writeId, userData); - // First write wins TBlobGroupSelector dsGroupSelector(Self->Info()); NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector); - - const auto& writeMeta = batch.GetAggregation().GetWriteMeta(); - meta.SetModificationType(TEnumOperator::SerializeToProto(writeMeta.GetModificationType())); - *meta.MutableSchemaSubset() = batch.GetAggregation().GetSchemaSubset().SerializeToProto(); - auto schemeVersion = batch.GetAggregation().GetSchemaVersion(); - auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaVerified(schemeVersion); - - auto userData = std::make_shared(writeMeta.GetTableId(), blobRange, meta, tableSchema->GetVersion(), batch->GetData()); - NOlap::TInsertedData insertData(writeId, userData); bool ok = Self->InsertTable->Insert(dbTable, std::move(insertData)); if (ok) { Self->UpdateInsertTableCounters(); @@ -36,6 +19,18 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSeriali return false; } +bool TTxWrite::CommitOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TInsertWriteId writeId) { + auto userData = batch.BuildInsertionUserData(*Self); + TBlobGroupSelector dsGroupSelector(Self->Info()); + NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector); + NOlap::TCommittedData commitData(userData, Self->GetLastPlannedSnapshot(), Self->Generation(), writeId); + if (Self->TablesManager.HasTable(userData->GetPathId())) { + Self->InsertTable->CommitEphemeral(dbTable, std::move(commitData)); + } + Self->UpdateInsertTableCounters(); + return true; +} + bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { TMemoryProfileGuard mpg("TTxWrite::Execute"); NActors::TLogContextGuard logGuard = @@ -65,10 +60,17 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { operation = Self->OperationsManager->GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId()); Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started); for (auto&& i : aggr->GetSplittedBlobs()) { - const TInsertWriteId insertWriteId = Self->InsertTable->BuildNextWriteId(txc); - aggr->AddInsertWriteId(insertWriteId); - AFL_VERIFY(InsertOneBlob(txc, i, insertWriteId))("write_id", writeMeta.GetWriteId())("insert_write_id", insertWriteId)( - "size", aggr->GetSplittedBlobs().size()); + if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) { + static TAtomicCounter Counter = 0; + const TInsertWriteId insertWriteId = (TInsertWriteId)Counter.Inc(); + AFL_VERIFY(CommitOneBlob(txc, i, insertWriteId))("write_id", writeMeta.GetWriteId())("insert_write_id", insertWriteId)( + "size", aggr->GetSplittedBlobs().size()); + } else { + const TInsertWriteId insertWriteId = Self->InsertTable->BuildNextWriteId(txc); + aggr->AddInsertWriteId(insertWriteId); + AFL_VERIFY(InsertOneBlob(txc, i, insertWriteId))("write_id", writeMeta.GetWriteId())("insert_write_id", insertWriteId)( + "size", aggr->GetSplittedBlobs().size()); + } } } } @@ -92,8 +94,6 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) { auto ev = NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID()); Results.emplace_back(std::move(ev), writeMeta.GetSource(), operation->GetCookie()); - Self->OperationsManager->AddTemporaryTxLink(operation->GetLockId()); - Self->OperationsManager->CommitTransactionOnExecute(*Self, operation->GetLockId(), txc, Self->GetLastTxSnapshot()); } else if (operation->GetBehaviour() == EOperationBehaviour::InTxWrite) { NKikimrTxColumnShard::TCommitWriteTxBody proto; proto.SetLockId(operation->GetLockId()); @@ -156,6 +156,7 @@ void TTxWrite::Complete(const TActorContext& ctx) { Self->GetOperationsManager().AddEventForLock(*Self, op->GetLockId(), evWrite); } if (op->GetBehaviour() == EOperationBehaviour::NoTxWrite) { + Self->OperationsManager->AddTemporaryTxLink(op->GetLockId()); Self->OperationsManager->CommitTransactionOnComplete(*Self, op->GetLockId(), Self->GetLastTxSnapshot()); } } @@ -163,6 +164,7 @@ void TTxWrite::Complete(const TActorContext& ctx) { Self->Counters.GetCSCounters().OnSuccessWriteResponse(); } Self->Counters.GetTabletCounters()->IncCounter(COUNTER_IMMEDIATE_TX_COMPLETED); + Self->SetupIndexation(); } } // namespace NKikimr::NColumnShard diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h index 84ffbe7a9005..aacaac22384f 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h @@ -20,6 +20,9 @@ class TTxWrite : public NTabletFlatExecutor::TTransactionBase { TEvPrivate::TEvWriteBlobsResult::TPtr PutBlobResult; const ui32 TabletTxNo; + bool CommitOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TInsertWriteId writeId); + bool InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TInsertWriteId writeId); + class TReplyInfo { private: std::unique_ptr Event; @@ -43,8 +46,6 @@ class TTxWrite : public NTabletFlatExecutor::TTransactionBase { std::vector> ResultOperators; - bool InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TInsertWriteId writeId); - TStringBuilder TxPrefix() const { return TStringBuilder() << "TxWrite[" << ToString(TabletTxNo) << "] "; } diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 2d41e3d05165..aa6ef920ece7 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -544,6 +544,10 @@ class TColumnShard public: ui64 TabletTxCounter = 0; + const TTablesManager& GetTablesManager() const { + return TablesManager; + } + bool HasLongTxWrites(const TInsertWriteId insertWriteId) const { return LongTxWrites.contains(insertWriteId); } diff --git a/ydb/core/tx/columnshard/engines/insert_table/committed.h b/ydb/core/tx/columnshard/engines/insert_table/committed.h index bd633647b5ec..b075d9b8a390 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/committed.h +++ b/ydb/core/tx/columnshard/engines/insert_table/committed.h @@ -25,10 +25,10 @@ class TCommittedData: public TUserDataContainer { , DedupId(dedupId) { } - TCommittedData(const std::shared_ptr& userData, const TSnapshot& ss, const TInsertWriteId insertWriteId) + TCommittedData(const std::shared_ptr& userData, const TSnapshot& ss, const ui64 generation, const TInsertWriteId ephemeralWriteId) : TBase(userData) , Snapshot(ss) - , DedupId(ToString(ss.GetPlanStep()) + ":" + ToString((ui64)insertWriteId)) { + , DedupId(ToString(generation) + ":" + ToString(ephemeralWriteId)) { } void SetRemove() { diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp index 980882ad8eca..7b8bd9334ac4 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp @@ -57,6 +57,22 @@ TInsertionSummary::TCounters TInsertTable::Commit( return counters; } +TInsertionSummary::TCounters TInsertTable::CommitEphemeral(IDbWrapper& dbTable, TCommittedData&& data) { + TInsertionSummary::TCounters counters; + counters.Rows += data.GetMeta().GetNumRows(); + counters.RawBytes += data.GetMeta().GetRawBytes(); + counters.Bytes += data.BlobSize(); + + AddBlobLink(data.GetBlobRange().BlobId); + const ui64 pathId = data.GetPathId(); + auto& pathInfo = Summary.GetPathInfo(pathId); + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "commit_insertion")("path_id", pathId)("blob_range", data.GetBlobRange().ToString()); + dbTable.Commit(data); + pathInfo.AddCommitted(std::move(data)); + + return counters; +} + void TInsertTable::Abort(IDbWrapper& dbTable, const THashSet& writeIds) { Y_ABORT_UNLESS(!writeIds.empty()); diff --git a/ydb/core/tx/columnshard/engines/insert_table/insert_table.h b/ydb/core/tx/columnshard/engines/insert_table/insert_table.h index 4f7544e10184..1eb6835053b7 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/insert_table.h +++ b/ydb/core/tx/columnshard/engines/insert_table/insert_table.h @@ -98,6 +98,7 @@ class TInsertTable: public TInsertTableAccessor { bool Insert(IDbWrapper& dbTable, TInsertedData&& data); TInsertionSummary::TCounters Commit( IDbWrapper& dbTable, ui64 planStep, ui64 txId, const THashSet& writeIds, std::function pathExists); + TInsertionSummary::TCounters CommitEphemeral(IDbWrapper& dbTable, TCommittedData&& data); void Abort(IDbWrapper& dbTable, const THashSet& writeIds); void MarkAsNotAbortable(const TInsertWriteId writeId) { Summary.MarkAsNotAbortable(writeId); diff --git a/ydb/core/tx/columnshard/engines/insert_table/inserted.cpp b/ydb/core/tx/columnshard/engines/insert_table/inserted.cpp index 2986fc0b4c35..abf866eaae82 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/inserted.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table/inserted.cpp @@ -5,7 +5,7 @@ namespace NKikimr::NOlap { -TCommittedData TInsertedData::Commit(const ui64 planStep, const ui64 txId) { +TCommittedData TInsertedData::Commit(const ui64 planStep, const ui64 txId) const { return TCommittedData(UserData, planStep, txId, InsertWriteId); } diff --git a/ydb/core/tx/columnshard/engines/insert_table/inserted.h b/ydb/core/tx/columnshard/engines/insert_table/inserted.h index e124edeb57e5..74750297b87c 100644 --- a/ydb/core/tx/columnshard/engines/insert_table/inserted.h +++ b/ydb/core/tx/columnshard/engines/insert_table/inserted.h @@ -29,7 +29,7 @@ class TInsertedData: public TUserDataContainer { /// One of them wins and becomes committed. Original DedupId would be lost then. /// After commit we use original Initiator:WriteId as DedupId of inserted blob inside {PlanStep, TxId}. /// pathId, initiator, {writeId}, {dedupId} -> pathId, planStep, txId, {dedupId} - [[nodiscard]] TCommittedData Commit(const ui64 planStep, const ui64 txId); + [[nodiscard]] TCommittedData Commit(const ui64 planStep, const ui64 txId) const; }; } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp index 1981faaa4314..edea4214e298 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp @@ -85,7 +85,8 @@ TConclusionStatus TStartMergeTask::DoExecuteImpl() { break; } } - if (MergingContext->IsExclusiveInterval() && sourcesInMemory) { + if ((MergingContext->IsExclusiveInterval() || Context->GetCommonContext()->GetReadMetadata()->HasGuaranteeExclusivePK()) && + sourcesInMemory) { TMemoryProfileGuard mGuard("SCAN_PROFILE::MERGE::EXCLUSIVE", IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY)); auto& container = Sources.begin()->second->GetStageResult().GetBatch(); if (container && container->num_rows()) { diff --git a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp index ecde3aa56673..379644d20864 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp @@ -451,8 +451,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { engine.Load(db); std::vector dataToIndex = { - TCommittedData(TUserData::Build(paths[0], blobRanges[0], TLocalHelper::GetMetaProto(), 0, {}), TSnapshot(1, 2), (TInsertWriteId)2), - TCommittedData(TUserData::Build(paths[0], blobRanges[1], TLocalHelper::GetMetaProto(), 0, {}), TSnapshot(2, 1), (TInsertWriteId)1) + TCommittedData(TUserData::Build(paths[0], blobRanges[0], TLocalHelper::GetMetaProto(), 0, {}), TSnapshot(1, 2), 0, (TInsertWriteId)2), + TCommittedData(TUserData::Build(paths[0], blobRanges[1], TLocalHelper::GetMetaProto(), 0, {}), TSnapshot(2, 1), 0, (TInsertWriteId)1) }; // write @@ -553,7 +553,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { std::vector dataToIndex; TSnapshot ss(planStep, txId); dataToIndex.push_back( - TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, (TInsertWriteId)txId)); + TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, 0, (TInsertWriteId)txId)); bool ok = Insert(engine, db, ss, std::move(dataToIndex), blobs, step); UNIT_ASSERT(ok); @@ -651,7 +651,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] std::vector dataToIndex; TSnapshot ss(planStep, txId); - dataToIndex.push_back(TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, (TInsertWriteId)txId)); + dataToIndex.push_back( + TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, 0, (TInsertWriteId)txId)); bool ok = Insert(engine, db, ss, std::move(dataToIndex), blobs, step); blobsAll.Merge(std::move(blobs)); @@ -682,7 +683,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata] std::vector dataToIndex; TSnapshot ss(planStep, txId); - dataToIndex.push_back(TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, TInsertWriteId(txId))); + dataToIndex.push_back( + TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, 0, TInsertWriteId(txId))); bool ok = Insert(engine, db, ss, std::move(dataToIndex), blobs, step); UNIT_ASSERT(ok); @@ -730,7 +732,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { TSnapshot ss(planStep, txId); std::vector dataToIndex; dataToIndex.push_back( - TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, TInsertWriteId(txId))); + TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, 0, TInsertWriteId(txId))); bool ok = Insert(engine, db, ss, std::move(dataToIndex), blobs, step); UNIT_ASSERT(ok); diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp index ceacecf155b6..99d1a3e1fd59 100644 --- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp +++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp @@ -1,16 +1,16 @@ #include "indexed_blob_constructor.h" -#include #include +#include #include - +#include namespace NKikimr::NOlap { -TIndexedWriteController::TIndexedWriteController(const TActorId& dstActor, const std::shared_ptr& action, std::vector>&& aggregations) +TIndexedWriteController::TIndexedWriteController( + const TActorId& dstActor, const std::shared_ptr& action, std::vector>&& aggregations) : Buffer(action, std::move(aggregations)) - , DstActor(dstActor) -{ + , DstActor(dstActor) { auto blobs = Buffer.GroupIntoBlobs(); for (auto&& b : blobs) { auto& task = AddWriteTask(TBlobWriteInfo::BuildWriteTask(b.ExtractBlobData(), action)); @@ -33,6 +33,26 @@ void TWideSerializedBatch::InitBlobId(const TUnifiedBlobId& id) { Range.BlobId = id; } +std::shared_ptr TWideSerializedBatch::BuildInsertionUserData(const NColumnShard::TColumnShard& owner) const { + NKikimrTxColumnShard::TLogicalMetadata meta; + meta.SetNumRows(SplittedBlobs.GetRowsCount()); + meta.SetRawBytes(SplittedBlobs.GetRawBytes()); + meta.SetDirtyWriteTimeSeconds(GetStartInstant().Seconds()); + meta.SetSpecialKeysRawData(SplittedBlobs.GetSpecialKeysFullSafe()); + meta.SetSpecialKeysPayloadData(SplittedBlobs.GetSpecialKeysPayloadSafe()); + + const auto& blobRange = Range; + Y_ABORT_UNLESS(blobRange.GetBlobId().IsValid()); + + const auto& writeMeta = GetAggregation().GetWriteMeta(); + meta.SetModificationType(TEnumOperator::SerializeToProto(writeMeta.GetModificationType())); + *meta.MutableSchemaSubset() = GetAggregation().GetSchemaSubset().SerializeToProto(); + auto schemeVersion = GetAggregation().GetSchemaVersion(); + auto tableSchema = owner.GetTablesManager().GetPrimaryIndex()->GetVersionedIndex().GetSchemaVerified(schemeVersion); + + return std::make_shared(writeMeta.GetTableId(), blobRange, meta, tableSchema->GetVersion(), SplittedBlobs.GetData()); +} + void TWritingBuffer::InitReadyInstant(const TMonotonic instant) { for (auto&& aggr : Aggregations) { aggr->MutableWriteMeta().SetWriteMiddle5StartInstant(instant); @@ -89,4 +109,4 @@ TString TWritingBlob::ExtractBlobData() { return result; } -} +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h index 92e59e9b197c..cca506a29dbe 100644 --- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h +++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h @@ -3,14 +3,18 @@ #include "blob_constructor.h" #include "write_controller.h" -#include +#include #include #include +#include #include +#include #include -#include -#include +#include +namespace NKikimr::NColumnShard { +class TColumnShard; +} namespace NKikimr::NOlap { @@ -22,7 +26,9 @@ class TWideSerializedBatch { YDB_ACCESSOR_DEF(TBlobRange, Range); YDB_READONLY(TInstant, StartInstant, AppDataVerified().TimeProvider->Now()); TWriteAggregation* ParentAggregation; + public: + std::shared_ptr BuildInsertionUserData(const NColumnShard::TColumnShard& owner) const; void InitBlobId(const TUnifiedBlobId& id); const NArrow::TSerializedBatch& GetSplittedBlobs() const { @@ -43,9 +49,7 @@ class TWideSerializedBatch { TWideSerializedBatch(NArrow::TSerializedBatch&& splitted, TWriteAggregation& parentAggregation) : SplittedBlobs(std::move(splitted)) - , ParentAggregation(&parentAggregation) - { - + , ParentAggregation(&parentAggregation) { } }; @@ -55,6 +59,7 @@ class TWritingBlob { std::vector BlobData; ui64 BlobSize = 0; bool Extracted = false; + public: TWritingBlob() = default; bool AddData(TWideSerializedBatch& batch) { @@ -114,14 +119,14 @@ class TWriteAggregation { InsertWriteIds.emplace_back(id); } - TWriteAggregation(const NEvWrite::TWriteData& writeData, std::vector&& splittedBlobs, const std::shared_ptr& batch) + TWriteAggregation(const NEvWrite::TWriteData& writeData, std::vector&& splittedBlobs, + const std::shared_ptr& batch) : WriteMeta(writeData.GetWriteMeta()) , SchemaVersion(writeData.GetData()->GetSchemaVersion()) , Size(writeData.GetSize()) , BlobsAction(writeData.GetBlobsAction()) , SchemaSubset(writeData.GetSchemaSubsetVerified()) - , RecordBatch(batch) - { + , RecordBatch(batch) { for (auto&& s : splittedBlobs) { SplittedBlobs.emplace_back(std::move(s), *this); } @@ -132,7 +137,7 @@ class TWriteAggregation { TWriteAggregation(const NEvWrite::TWriteData& writeData) : WriteMeta(writeData.GetWriteMeta()) - , SchemaVersion(writeData.GetData()->GetSchemaVersion()) + , SchemaVersion(writeData.GetData()->GetSchemaVersion()) , Size(writeData.GetSize()) , BlobsAction(writeData.GetBlobsAction()) { AFL_VERIFY(!writeData.GetSchemaSubset()); @@ -145,12 +150,12 @@ class TWritingBuffer: public TMoveOnly { std::shared_ptr DeclareRemoveAction; YDB_READONLY_DEF(std::vector>, Aggregations); YDB_READONLY(ui64, SumSize, 0); + public: TWritingBuffer() = default; TWritingBuffer(const std::shared_ptr& action, std::vector>&& aggregations) : BlobsAction(action) - , Aggregations(std::move(aggregations)) - { + , Aggregations(std::move(aggregations)) { AFL_VERIFY(BlobsAction); for (auto&& aggr : Aggregations) { SumSize += aggr->GetSize(); @@ -187,12 +192,12 @@ class TWritingBuffer: public TMoveOnly { } std::vector> GetAddActions() const { - return {BlobsAction}; + return { BlobsAction }; } std::vector> GetRemoveActions() const { if (DeclareRemoveAction) { - return {DeclareRemoveAction}; + return { DeclareRemoveAction }; } else { return {}; } @@ -205,7 +210,8 @@ class TWritingBuffer: public TMoveOnly { std::vector GroupIntoBlobs(); }; -class TIndexedWriteController : public NColumnShard::IWriteController, public NColumnShard::TMonitoringObjectsCounter { +class TIndexedWriteController: public NColumnShard::IWriteController, + public NColumnShard::TMonitoringObjectsCounter { private: TWritingBuffer Buffer; TActorId DstActor; @@ -213,8 +219,8 @@ class TIndexedWriteController : public NColumnShard::IWriteController, public NC virtual void DoOnStartSending() override; public: - TIndexedWriteController(const TActorId& dstActor, const std::shared_ptr& action, std::vector>&& aggregations); - + TIndexedWriteController(const TActorId& dstActor, const std::shared_ptr& action, + std::vector>&& aggregations); }; -} +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/operations/write.cpp b/ydb/core/tx/columnshard/operations/write.cpp index 06b7701cd7bd..1068e7167413 100644 --- a/ydb/core/tx/columnshard/operations/write.cpp +++ b/ydb/core/tx/columnshard/operations/write.cpp @@ -35,7 +35,7 @@ void TWriteOperation::Start(TColumnShard& owner, const ui64 tableId, const NEvWr NEvWrite::TWriteData(writeMeta, data, owner.TablesManager.GetPrimaryIndex()->GetReplaceKey(), owner.StoragesManager->GetInsertOperator()->StartWritingAction(NOlap::NBlobOperations::EConsumer::WRITING_OPERATOR)), schema, owner.GetLastTxSnapshot(), owner.Counters.GetCSCounters().WritingCounters); - NConveyor::TCompServiceOperator::SendTaskToExecute(task); + NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task); Status = EOperationStatus::Started; } diff --git a/ydb/core/tx/columnshard/operations/write_data.cpp b/ydb/core/tx/columnshard/operations/write_data.cpp index 56a0ad5e16cb..0f7440aaf5e4 100644 --- a/ydb/core/tx/columnshard/operations/write_data.cpp +++ b/ydb/core/tx/columnshard/operations/write_data.cpp @@ -12,26 +12,43 @@ bool TArrowData::Parse(const NKikimrDataEvents::TEvWrite_TOperation& proto, cons } IncomingData = payload.GetDataFromPayload(proto.GetPayloadIndex()); if (proto.HasType()) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "invalid_modification_type")("proto", proto.DebugString()); auto type = TEnumOperator::DeserializeFromProto(proto.GetType()); if (!type) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "invalid_modification_type")("proto", proto.DebugString()); return false; } ModificationType = *type; } - std::vector columns; - for (auto&& columnId : proto.GetColumnIds()) { - columns.emplace_back(columnId); + if (proto.HasPayloadSchema()) { + PayloadSchema = NArrow::DeserializeSchema(proto.GetPayloadSchema()); + } else { + std::vector columns; + for (auto&& columnId : proto.GetColumnIds()) { + columns.emplace_back(columnId); + } + if (columns.empty()) { + BatchSchema = IndexSchema; + } else { + BatchSchema = std::make_shared(IndexSchema, columns); + } + if (BatchSchema->GetColumnsCount() != columns.size()) { + return false; + } } - BatchSchema = std::make_shared(IndexSchema, columns); OriginalDataSize = IncomingData.size(); - return BatchSchema->GetColumnsCount() == columns.size() && !IncomingData.empty(); + return !!IncomingData; } TConclusion> TArrowData::ExtractBatch() { Y_ABORT_UNLESS(!!IncomingData); - auto result = NArrow::DeserializeBatch(IncomingData, std::make_shared(BatchSchema->GetSchema()->fields())); + std::shared_ptr result; + if (PayloadSchema) { + result = NArrow::DeserializeBatch(IncomingData, PayloadSchema); + } else { + result = NArrow::DeserializeBatch(IncomingData, std::make_shared(BatchSchema->GetSchema()->fields())); + } + IncomingData = ""; return result; } diff --git a/ydb/core/tx/columnshard/operations/write_data.h b/ydb/core/tx/columnshard/operations/write_data.h index f1c95285abd8..2b0599c4e92f 100644 --- a/ydb/core/tx/columnshard/operations/write_data.h +++ b/ydb/core/tx/columnshard/operations/write_data.h @@ -30,6 +30,7 @@ class TArrowData : public NEvWrite::IDataContainer { private: NOlap::ISnapshotSchema::TPtr IndexSchema; NOlap::ISnapshotSchema::TPtr BatchSchema; + std::shared_ptr PayloadSchema; TString IncomingData; NEvWrite::EModificationType ModificationType = NEvWrite::EModificationType::Upsert; }; diff --git a/ydb/core/tx/data_events/columnshard_splitter.h b/ydb/core/tx/data_events/columnshard_splitter.h index 5225ac6c88dd..2d0feda6b3a7 100644 --- a/ydb/core/tx/data_events/columnshard_splitter.h +++ b/ydb/core/tx/data_events/columnshard_splitter.h @@ -1,6 +1,8 @@ #pragma once +#include "events.h" #include "shards_splitter.h" +#include "payload_helper.h" #include #include @@ -8,11 +10,10 @@ #include #include - namespace NKikimr::NEvWrite { -class TColumnShardShardsSplitter : public IShardsSplitter { - class TShardInfo : public IShardInfo { +class TColumnShardShardsSplitter: public IShardsSplitter { + class TShardInfo: public IShardInfo { private: const TString SchemaData; const TString Data; @@ -23,25 +24,38 @@ class TColumnShardShardsSplitter : public IShardsSplitter { : SchemaData(schemaData) , Data(data) , RowsCount(rowsCount) - , GranuleShardingVersion(granuleShardingVersion) - {} + , GranuleShardingVersion(granuleShardingVersion) { + } - ui64 GetBytes() const override { + virtual ui64 GetBytes() const override { return Data.size(); } - ui32 GetRowsCount() const override { + virtual ui32 GetRowsCount() const override { return RowsCount; } - const TString& GetData() const override { + virtual const TString& GetData() const override { return Data; } - void Serialize(TEvWrite& evWrite) const override { + virtual void Serialize(TEvColumnShard::TEvWrite& evWrite) const override { evWrite.SetArrowData(SchemaData, Data); evWrite.Record.SetGranuleShardingVersion(GranuleShardingVersion); } + virtual void Serialize(NEvents::TDataEvents::TEvWrite& evWrite, const ui64 tableId, const ui64 schemaVersion) const override { + TPayloadWriter writer(evWrite); + TString data = Data; + writer.AddDataToPayload(std::move(data)); + + auto* operation = evWrite.Record.AddOperations(); + operation->SetPayloadSchema(SchemaData); + operation->SetType(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE); + operation->SetPayloadFormat(NKikimrDataEvents::FORMAT_ARROW); + operation->SetPayloadIndex(0); + operation->MutableTableId()->SetTableId(tableId); + operation->MutableTableId()->SetSchemaVersion(schemaVersion); + } }; private: diff --git a/ydb/core/tx/data_events/shard_writer.cpp b/ydb/core/tx/data_events/shard_writer.cpp index 7f935f5270cc..e09a1bbc657a 100644 --- a/ydb/core/tx/data_events/shard_writer.cpp +++ b/ydb/core/tx/data_events/shard_writer.cpp @@ -7,9 +7,10 @@ namespace NKikimr::NEvWrite { - TWritersController::TWritersController(const ui32 writesCount, const NActors::TActorIdentity& longTxActorId, const NLongTxService::TLongTxId& longTxId) + TWritersController::TWritersController(const ui32 writesCount, const NActors::TActorIdentity& longTxActorId, const NLongTxService::TLongTxId& longTxId, const bool immediateWrite) : WritesCount(writesCount) , LongTxActorId(longTxActorId) + , ImmediateWrite(immediateWrite) , LongTxId(longTxId) { Y_ABORT_UNLESS(writesCount); @@ -39,28 +40,62 @@ namespace NKikimr::NEvWrite { } } - TShardWriter::TShardWriter(const ui64 shardId, const ui64 tableId, const TString& dedupId, const IShardInfo::TPtr& data, - const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx, const EModificationType mType) + TShardWriter::TShardWriter(const ui64 shardId, const ui64 tableId, const ui64 schemaVersion, const TString& dedupId, const IShardInfo::TPtr& data, + const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx, const EModificationType mType, const bool immediateWrite) : ShardId(shardId) , WritePartIdx(writePartIdx) , TableId(tableId) + , SchemaVersion(schemaVersion) , DedupId(dedupId) , DataForShard(data) , ExternalController(externalController) , LeaderPipeCache(MakePipePerNodeCacheID(false)) , ActorSpan(parentSpan.BuildChildrenSpan("ShardWriter")) , ModificationType(mType) + , ImmediateWrite(immediateWrite) { } + void TShardWriter::SendWriteRequest() { + if (ImmediateWrite) { + auto ev = MakeHolder(NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE); + DataForShard->Serialize(*ev, TableId, SchemaVersion); + SendToTablet(std::move(ev)); + } else { + auto ev = MakeHolder(SelfId(), ExternalController->GetLongTxId(), TableId, DedupId, "", WritePartIdx, ModificationType); + DataForShard->Serialize(*ev); + SendToTablet(std::move(ev)); + } + } + void TShardWriter::Bootstrap() { - auto ev = MakeHolder(SelfId(), ExternalController->GetLongTxId(), TableId, DedupId, "", WritePartIdx, ModificationType); - DataForShard->Serialize(*ev); - SendToTablet(std::move(ev)); + SendWriteRequest(); Become(&TShardWriter::StateMain); } - void TShardWriter::Handle(TEvWriteResult::TPtr& ev) { + void TShardWriter::Handle(NEvents::TDataEvents::TEvWriteResult::TPtr& ev) { + const auto* msg = ev->Get(); + Y_ABORT_UNLESS(msg->Record.GetOrigin() == ShardId); + + const auto ydbStatus = msg->GetStatus(); + if (ydbStatus == NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED) { + if (RetryWriteRequest(true)) { + return; + } + } + + auto gPassAway = PassAwayGuard(); + if (ydbStatus != NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED) { + ExternalController->OnFail(Ydb::StatusIds::GENERIC_ERROR, + TStringBuilder() << "Cannot write data into shard " << ShardId << " in longTx " << + ExternalController->GetLongTxId().ToString()); + return; + } + + ExternalController->OnSuccess(ShardId, 0, WritePartIdx); + } + + void TShardWriter::Handle(TEvColumnShard::TEvWriteResult::TPtr& ev) { const auto* msg = ev->Get(); Y_ABORT_UNLESS(msg->Record.GetOrigin() == ShardId); @@ -113,9 +148,7 @@ namespace NKikimr::NEvWrite { Schedule(OverloadTimeout(), new TEvents::TEvWakeup()); } else { ++NumRetries; - auto ev = MakeHolder(SelfId(), ExternalController->GetLongTxId(), TableId, DedupId, "", WritePartIdx, ModificationType); - DataForShard->Serialize(*ev); - SendToTablet(std::move(ev)); + SendWriteRequest(); } return true; } diff --git a/ydb/core/tx/data_events/shard_writer.h b/ydb/core/tx/data_events/shard_writer.h index 0a649a4dd3b9..323bffa5056f 100644 --- a/ydb/core/tx/data_events/shard_writer.h +++ b/ydb/core/tx/data_events/shard_writer.h @@ -1,7 +1,8 @@ #pragma once -#include "shards_splitter.h" #include "common/modification_type.h" +#include "events.h" +#include "shards_splitter.h" #include #include @@ -89,6 +90,7 @@ class TWritersController { NActors::TActorIdentity LongTxActorId; std::vector WriteIds; const TMonotonic StartInstant = TMonotonic::Now(); + const bool ImmediateWrite = false; YDB_READONLY_DEF(NLongTxService::TLongTxId, LongTxId); YDB_READONLY(std::shared_ptr, Counters, std::make_shared()); void SendReply() { @@ -96,6 +98,9 @@ class TWritersController { Counters->OnFailedFullReply(TMonotonic::Now() - StartInstant); AFL_VERIFY(Code); LongTxActorId.Send(LongTxActorId, new TEvPrivate::TEvShardsWriteResult(*Code, Issues)); + } else if (ImmediateWrite) { + Counters->OnSucceedFullReply(TMonotonic::Now() - StartInstant); + LongTxActorId.Send(LongTxActorId, new TEvPrivate::TEvShardsWriteResult(Ydb::StatusIds::SUCCESS)); } else { Counters->OnSucceedFullReply(TMonotonic::Now() - StartInstant); auto req = MakeHolder(LongTxId); @@ -129,7 +134,7 @@ class TWritersController { }; - TWritersController(const ui32 writesCount, const NActors::TActorIdentity& longTxActorId, const NLongTxService::TLongTxId& longTxId); + TWritersController(const ui32 writesCount, const NActors::TActorIdentity& longTxActorId, const NLongTxService::TLongTxId& longTxId, const bool immediateWrite); void OnSuccess(const ui64 shardId, const ui64 writeId, const ui32 writePartId); void OnFail(const Ydb::StatusIds::StatusCode code, const TString& message); }; @@ -144,6 +149,7 @@ class TShardWriter: public NActors::TActorBootstrapped { const ui64 ShardId; const ui64 WritePartIdx; const ui64 TableId; + const ui64 SchemaVersion; const TString DedupId; const IShardInfo::TPtr DataForShard; ui32 NumRetries = 0; @@ -151,7 +157,9 @@ class TShardWriter: public NActors::TActorBootstrapped { const TActorId LeaderPipeCache; NWilson::TProfileSpan ActorSpan; EModificationType ModificationType; + const bool ImmediateWrite = false; + void SendWriteRequest(); static TDuration OverloadTimeout() { return TDuration::MilliSeconds(OverloadedDelayMs); } @@ -164,21 +172,24 @@ class TShardWriter: public NActors::TActorBootstrapped { TBase::PassAway(); } public: - TShardWriter(const ui64 shardId, const ui64 tableId, const TString& dedupId, const IShardInfo::TPtr& data, - const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx, const EModificationType mType); + TShardWriter(const ui64 shardId, const ui64 tableId, const ui64 schemaVersion, const TString& dedupId, const IShardInfo::TPtr& data, + const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx, + const EModificationType mType, const bool immediateWrite); STFUNC(StateMain) { switch (ev->GetTypeRewrite()) { - hFunc(TEvWriteResult, Handle); + hFunc(TEvColumnShard::TEvWriteResult, Handle); hFunc(TEvPipeCache::TEvDeliveryProblem, Handle); + hFunc(NEvents::TDataEvents::TEvWriteResult, Handle); CFunc(TEvents::TSystem::Wakeup, HandleTimeout); } } void Bootstrap(); - void Handle(TEvWriteResult::TPtr& ev); + void Handle(TEvColumnShard::TEvWriteResult::TPtr& ev); void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev); + void Handle(NEvents::TDataEvents::TEvWriteResult::TPtr& ev); void HandleTimeout(const TActorContext& ctx); private: bool RetryWriteRequest(const bool delayed = true); diff --git a/ydb/core/tx/data_events/shards_splitter.h b/ydb/core/tx/data_events/shards_splitter.h index 77c04e1160e9..bde0656ccb91 100644 --- a/ydb/core/tx/data_events/shards_splitter.h +++ b/ydb/core/tx/data_events/shards_splitter.h @@ -11,9 +11,6 @@ namespace NKikimr::NEvWrite { -using TEvWrite = TEvColumnShard::TEvWrite; -using TEvWriteResult = TEvColumnShard::TEvWriteResult; - class IShardsSplitter { public: using TPtr = std::shared_ptr; @@ -43,7 +40,8 @@ class IShardsSplitter { using TPtr = std::shared_ptr; virtual ~IShardInfo() {} - virtual void Serialize(TEvWrite& evWrite) const = 0; + virtual void Serialize(TEvColumnShard::TEvWrite& evWrite) const = 0; + virtual void Serialize(NEvents::TDataEvents::TEvWrite& evWrite, const ui64 tableId, const ui64 schemaVersion) const = 0; virtual ui64 GetBytes() const = 0; virtual ui32 GetRowsCount() const = 0; virtual const TString& GetData() const = 0; @@ -68,6 +66,10 @@ class IShardsSplitter { TYdbConclusionStatus SplitData(const NSchemeCache::TSchemeCacheNavigate::TEntry& schemeEntry, const IEvWriteDataAccessor& data) { TableId = schemeEntry.TableId.PathId.LocalPathId; + AFL_VERIFY(schemeEntry.ColumnTableInfo); + AFL_VERIFY(schemeEntry.ColumnTableInfo->Description.HasSchema()); + SchemaVersion = schemeEntry.ColumnTableInfo->Description.GetSchema().GetVersion(); + AFL_VERIFY(SchemaVersion); return DoSplitData(schemeEntry, data); } @@ -75,6 +77,10 @@ class IShardsSplitter { return TableId; } + ui64 GetSchemaVersion() const { + return SchemaVersion; + } + const TFullSplitData& GetSplitData() const { Y_ABORT_UNLESS(FullSplitData); return *FullSplitData; @@ -86,6 +92,7 @@ class IShardsSplitter { virtual TYdbConclusionStatus DoSplitData(const NSchemeCache::TSchemeCacheNavigate::TEntry& schemeEntry, const IEvWriteDataAccessor& data) = 0; ui64 TableId = 0; + ui64 SchemaVersion = 0; protected: std::optional FullSplitData; }; diff --git a/ydb/core/tx/tx_proxy/rpc_long_tx.cpp b/ydb/core/tx/tx_proxy/rpc_long_tx.cpp index 557cf13c14cb..11650948fe93 100644 --- a/ydb/core/tx/tx_proxy/rpc_long_tx.cpp +++ b/ydb/core/tx/tx_proxy/rpc_long_tx.cpp @@ -27,14 +27,18 @@ class TLongTxWriteBase: public TActorBootstrapped { protected: using TThis = typename TBase::TThis; + const bool NoTxWrite = false; public: - TLongTxWriteBase(const TString& databaseName, const TString& path, const TString& token, const TLongTxId& longTxId, const TString& dedupId) - : DatabaseName(databaseName) + TLongTxWriteBase(const TString& databaseName, const TString& path, const TString& token, const TLongTxId& longTxId, const TString& dedupId, + const bool noTxWrite) + : NoTxWrite(noTxWrite) + , DatabaseName(databaseName) , Path(path) , DedupId(dedupId) , LongTxId(longTxId) - , ActorSpan(0, NWilson::TTraceId::NewTraceId(0, Max()), "TLongTxWriteBase") { + , ActorSpan(0, NWilson::TTraceId::NewTraceId(0, Max()), "TLongTxWriteBase") + { if (token) { UserToken.emplace(token); } @@ -91,7 +95,7 @@ class TLongTxWriteBase: public TActorBootstrapped { accessor.reset(); const auto& splittedData = shardsSplitter->GetSplitData(); - InternalController = std::make_shared(splittedData.GetShardRequestsCount(), this->SelfId(), LongTxId); + InternalController = std::make_shared(splittedData.GetShardRequestsCount(), this->SelfId(), LongTxId, NoTxWrite); ui32 sumBytes = 0; ui32 rowsCount = 0; ui32 writeIdx = 0; @@ -100,8 +104,9 @@ class TLongTxWriteBase: public TActorBootstrapped { InternalController->GetCounters()->OnRequest(shardInfo->GetRowsCount(), shardInfo->GetBytes()); sumBytes += shardInfo->GetBytes(); rowsCount += shardInfo->GetRowsCount(); - this->Register(new NEvWrite::TShardWriter(shard, shardsSplitter->GetTableId(), DedupId, shardInfo, ActorSpan, InternalController, - ++writeIdx, NEvWrite::EModificationType::Replace)); + this->Register(new NEvWrite::TShardWriter(shard, shardsSplitter->GetTableId(), shardsSplitter->GetSchemaVersion(), DedupId, shardInfo, + ActorSpan, InternalController, + ++writeIdx, NEvWrite::EModificationType::Replace, NoTxWrite)); } } pSpan.Attribute("affected_shards_count", (long)splittedData.GetShardsInfo().size()); @@ -125,11 +130,19 @@ class TLongTxWriteBase: public TActorBootstrapped { void Handle(NEvWrite::TWritersController::TEvPrivate::TEvShardsWriteResult::TPtr& ev) { NWilson::TProfileSpan pSpan(0, ActorSpan.GetTraceId(), "ShardsWriteResult"); const auto* msg = ev->Get(); - Y_ABORT_UNLESS(msg->Status != Ydb::StatusIds::SUCCESS); - for (auto& issue : msg->Issues) { - RaiseIssue(issue); + if (msg->Status == Ydb::StatusIds::SUCCESS) { + if (IndexReady) { + ReplySuccess(); + } else { + ColumnShardReady = true; + } + } else { + Y_ABORT_UNLESS(msg->Status != Ydb::StatusIds::SUCCESS); + for (auto& issue : msg->Issues) { + RaiseIssue(issue); + } + ReplyError(msg->Status); } - ReplyError(msg->Status); } void Handle(TEvLongTxService::TEvAttachColumnShardWritesResult::TPtr& ev) { @@ -217,12 +230,13 @@ class TLongTxWriteInternal: public TLongTxWriteBase { public: explicit TLongTxWriteInternal(const TActorId& replyTo, const TLongTxId& longTxId, const TString& dedupId, const TString& databaseName, const TString& path, std::shared_ptr navigateResult, std::shared_ptr batch, - std::shared_ptr issues) - : TBase(databaseName, path, TString(), longTxId, dedupId) + std::shared_ptr issues, const bool noTxWrite) + : TBase(databaseName, path, TString(), longTxId, dedupId, noTxWrite) , ReplyTo(replyTo) , NavigateResult(navigateResult) , Batch(batch) - , Issues(issues) { + , Issues(issues) + { Y_ABORT_UNLESS(Issues); DataAccessor = std::make_unique(Batch); } @@ -265,8 +279,9 @@ class TLongTxWriteInternal: public TLongTxWriteBase { TActorId DoLongTxWriteSameMailbox(const TActorContext& ctx, const TActorId& replyTo, const NLongTxService::TLongTxId& longTxId, const TString& dedupId, const TString& databaseName, const TString& path, std::shared_ptr navigateResult, std::shared_ptr batch, - std::shared_ptr issues) { - return ctx.RegisterWithSameMailbox(new TLongTxWriteInternal(replyTo, longTxId, dedupId, databaseName, path, navigateResult, batch, issues)); + std::shared_ptr issues, const bool noTxWrite) { + return ctx.RegisterWithSameMailbox( + new TLongTxWriteInternal(replyTo, longTxId, dedupId, databaseName, path, navigateResult, batch, issues, noTxWrite)); } // diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h index c1ae351e5dde..b1a023227a4f 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h +++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h @@ -181,8 +181,8 @@ namespace NTxProxy { TActorId DoLongTxWriteSameMailbox(const TActorContext& ctx, const TActorId& replyTo, const NLongTxService::TLongTxId& longTxId, const TString& dedupId, const TString& databaseName, const TString& path, - std::shared_ptr navigateResult, - std::shared_ptr batch, std::shared_ptr issues); + std::shared_ptr navigateResult, std::shared_ptr batch, + std::shared_ptr issues, const bool noTxWrite); template class TUploadRowsBase : public TActorBootstrapped> { @@ -224,6 +224,8 @@ class TUploadRowsBase : public TActorBootstrappedNow(); + ImmediateWrite = AppData(ctx)->FeatureFlags.GetEnableImmediateWritingOnBulkUpsert(); OnBeforeStart(ctx); ResolveTable(GetTable(), ctx); } @@ -921,8 +924,8 @@ class TUploadRowsBase : public TActorBootstrapped