Skip to content

Commit

Permalink
immediate write for bulk upsert (ydb-platform#9489)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Sep 20, 2024
1 parent 51770b3 commit 076a801
Show file tree
Hide file tree
Showing 26 changed files with 282 additions and 130 deletions.
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/common/kqp_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ TKikimrRunner::TKikimrRunner(const TKikimrSettings& settings) {
appConfig.MutableTableServiceConfig()->SetEnableRowsDuplicationCheck(true);
ServerSettings->SetAppConfig(appConfig);
ServerSettings->SetFeatureFlags(settings.FeatureFlags);
ServerSettings->FeatureFlags.SetEnableImmediateWritingOnBulkUpsert(true);
ServerSettings->SetNodeCount(settings.NodeCount);
ServerSettings->SetEnableKqpSpilling(enableSpilling);
ServerSettings->SetEnableDataColumnForIndexTable(true);
Expand Down
9 changes: 1 addition & 8 deletions ydb/core/kqp/ut/olap/indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {

{
auto alterQuery = TStringBuilder() <<
"ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, SCHEME_NEED_ACTUALIZATION=`true`, EXTERNAL_GUARANTEE_EXCLUSIVE_PK=`true`);";
"ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, SCHEME_NEED_ACTUALIZATION=`true`);";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
Expand Down Expand Up @@ -336,13 +336,6 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
{
auto alterQuery = TStringBuilder() <<
"ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, EXTERNAL_GUARANTEE_EXCLUSIVE_PK=`true`);";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}

std::vector<TString> uids;
std::vector<TString> resourceIds;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8172,7 +8172,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
csController->WaitCompactions(TDuration::Seconds(5));
}

testHelper.ReadData("SELECT value FROM `/Root/ColumnTableTest`", "[[#];[#];[[42u]];[[43u]]]");
testHelper.ReadData("SELECT value FROM `/Root/ColumnTableTest` ORDER BY value", "[[#];[#];[[42u]];[[43u]]]");
}

Y_UNIT_TEST(DropThenAddColumn) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/data_events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ message TEvWrite {
repeated uint32 ColumnIds = 3 [packed = true];
optional uint64 PayloadIndex = 4;
optional EDataFormat PayloadFormat = 5;
optional string PayloadSchema = 6;
}

// Transaction operations
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/feature_flags.proto
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,5 @@ message TFeatureFlags {
optional bool EnableExternalDataSourcesOnServerless = 143 [default = true];
optional bool EnableSparsedColumns = 144 [default = false];
optional bool EnableParameterizedDecimal = 145 [default = false];
optional bool EnableImmediateWritingOnBulkUpsert = 146 [default = false];
}
52 changes: 27 additions & 25 deletions ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,11 @@
namespace NKikimr::NColumnShard {

bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TInsertWriteId writeId) {
NKikimrTxColumnShard::TLogicalMetadata meta;
meta.SetNumRows(batch->GetRowsCount());
meta.SetRawBytes(batch->GetRawBytes());
meta.SetDirtyWriteTimeSeconds(batch.GetStartInstant().Seconds());
meta.SetSpecialKeysRawData(batch->GetSpecialKeysFullSafe());
meta.SetSpecialKeysPayloadData(batch->GetSpecialKeysPayloadSafe());

const auto& blobRange = batch.GetRange();
Y_ABORT_UNLESS(blobRange.GetBlobId().IsValid());
auto userData = batch.BuildInsertionUserData(*Self);
NOlap::TInsertedData insertData(writeId, userData);

// First write wins
TBlobGroupSelector dsGroupSelector(Self->Info());
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);

const auto& writeMeta = batch.GetAggregation().GetWriteMeta();
meta.SetModificationType(TEnumOperator<NEvWrite::EModificationType>::SerializeToProto(writeMeta.GetModificationType()));
*meta.MutableSchemaSubset() = batch.GetAggregation().GetSchemaSubset().SerializeToProto();
auto schemeVersion = batch.GetAggregation().GetSchemaVersion();
auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaVerified(schemeVersion);

auto userData = std::make_shared<NOlap::TUserData>(writeMeta.GetTableId(), blobRange, meta, tableSchema->GetVersion(), batch->GetData());
NOlap::TInsertedData insertData(writeId, userData);
bool ok = Self->InsertTable->Insert(dbTable, std::move(insertData));
if (ok) {
Self->UpdateInsertTableCounters();
Expand All @@ -36,6 +19,18 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSeriali
return false;
}

bool TTxWrite::CommitOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TInsertWriteId writeId) {
auto userData = batch.BuildInsertionUserData(*Self);
TBlobGroupSelector dsGroupSelector(Self->Info());
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
NOlap::TCommittedData commitData(userData, Self->GetLastPlannedSnapshot(), Self->Generation(), writeId);
if (Self->TablesManager.HasTable(userData->GetPathId())) {
Self->InsertTable->CommitEphemeral(dbTable, std::move(commitData));
}
Self->UpdateInsertTableCounters();
return true;
}

bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
TMemoryProfileGuard mpg("TTxWrite::Execute");
NActors::TLogContextGuard logGuard =
Expand Down Expand Up @@ -65,10 +60,17 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
operation = Self->OperationsManager->GetOperationVerified((TOperationWriteId)writeMeta.GetWriteId());
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);
for (auto&& i : aggr->GetSplittedBlobs()) {
const TInsertWriteId insertWriteId = Self->InsertTable->BuildNextWriteId(txc);
aggr->AddInsertWriteId(insertWriteId);
AFL_VERIFY(InsertOneBlob(txc, i, insertWriteId))("write_id", writeMeta.GetWriteId())("insert_write_id", insertWriteId)(
"size", aggr->GetSplittedBlobs().size());
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
static TAtomicCounter Counter = 0;
const TInsertWriteId insertWriteId = (TInsertWriteId)Counter.Inc();
AFL_VERIFY(CommitOneBlob(txc, i, insertWriteId))("write_id", writeMeta.GetWriteId())("insert_write_id", insertWriteId)(
"size", aggr->GetSplittedBlobs().size());
} else {
const TInsertWriteId insertWriteId = Self->InsertTable->BuildNextWriteId(txc);
aggr->AddInsertWriteId(insertWriteId);
AFL_VERIFY(InsertOneBlob(txc, i, insertWriteId))("write_id", writeMeta.GetWriteId())("insert_write_id", insertWriteId)(
"size", aggr->GetSplittedBlobs().size());
}
}
}
}
Expand All @@ -92,8 +94,6 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
auto ev = NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID());
Results.emplace_back(std::move(ev), writeMeta.GetSource(), operation->GetCookie());
Self->OperationsManager->AddTemporaryTxLink(operation->GetLockId());
Self->OperationsManager->CommitTransactionOnExecute(*Self, operation->GetLockId(), txc, Self->GetLastTxSnapshot());
} else if (operation->GetBehaviour() == EOperationBehaviour::InTxWrite) {
NKikimrTxColumnShard::TCommitWriteTxBody proto;
proto.SetLockId(operation->GetLockId());
Expand Down Expand Up @@ -156,13 +156,15 @@ void TTxWrite::Complete(const TActorContext& ctx) {
Self->GetOperationsManager().AddEventForLock(*Self, op->GetLockId(), evWrite);
}
if (op->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
Self->OperationsManager->AddTemporaryTxLink(op->GetLockId());
Self->OperationsManager->CommitTransactionOnComplete(*Self, op->GetLockId(), Self->GetLastTxSnapshot());
}
}
Self->Counters.GetCSCounters().OnWriteTxComplete(now - writeMeta.GetWriteStartInstant());
Self->Counters.GetCSCounters().OnSuccessWriteResponse();
}
Self->Counters.GetTabletCounters()->IncCounter(COUNTER_IMMEDIATE_TX_COMPLETED);
Self->SetupIndexation();
}

} // namespace NKikimr::NColumnShard
5 changes: 3 additions & 2 deletions ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ class TTxWrite : public NTabletFlatExecutor::TTransactionBase<TColumnShard> {
TEvPrivate::TEvWriteBlobsResult::TPtr PutBlobResult;
const ui32 TabletTxNo;

bool CommitOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TInsertWriteId writeId);
bool InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TInsertWriteId writeId);

class TReplyInfo {
private:
std::unique_ptr<NActors::IEventBase> Event;
Expand All @@ -43,8 +46,6 @@ class TTxWrite : public NTabletFlatExecutor::TTransactionBase<TColumnShard> {
std::vector<std::shared_ptr<TTxController::ITransactionOperator>> ResultOperators;


bool InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TInsertWriteId writeId);

TStringBuilder TxPrefix() const {
return TStringBuilder() << "TxWrite[" << ToString(TabletTxNo) << "] ";
}
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,10 @@ class TColumnShard
public:
ui64 TabletTxCounter = 0;

const TTablesManager& GetTablesManager() const {
return TablesManager;
}

bool HasLongTxWrites(const TInsertWriteId insertWriteId) const {
return LongTxWrites.contains(insertWriteId);
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/insert_table/committed.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ class TCommittedData: public TUserDataContainer {
, DedupId(dedupId) {
}

TCommittedData(const std::shared_ptr<TUserData>& userData, const TSnapshot& ss, const TInsertWriteId insertWriteId)
TCommittedData(const std::shared_ptr<TUserData>& userData, const TSnapshot& ss, const ui64 generation, const TInsertWriteId ephemeralWriteId)
: TBase(userData)
, Snapshot(ss)
, DedupId(ToString(ss.GetPlanStep()) + ":" + ToString((ui64)insertWriteId)) {
, DedupId(ToString(generation) + ":" + ToString(ephemeralWriteId)) {
}

void SetRemove() {
Expand Down
16 changes: 16 additions & 0 deletions ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,22 @@ TInsertionSummary::TCounters TInsertTable::Commit(
return counters;
}

TInsertionSummary::TCounters TInsertTable::CommitEphemeral(IDbWrapper& dbTable, TCommittedData&& data) {
TInsertionSummary::TCounters counters;
counters.Rows += data.GetMeta().GetNumRows();
counters.RawBytes += data.GetMeta().GetRawBytes();
counters.Bytes += data.BlobSize();

AddBlobLink(data.GetBlobRange().BlobId);
const ui64 pathId = data.GetPathId();
auto& pathInfo = Summary.GetPathInfo(pathId);
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "commit_insertion")("path_id", pathId)("blob_range", data.GetBlobRange().ToString());
dbTable.Commit(data);
pathInfo.AddCommitted(std::move(data));

return counters;
}

void TInsertTable::Abort(IDbWrapper& dbTable, const THashSet<TInsertWriteId>& writeIds) {
Y_ABORT_UNLESS(!writeIds.empty());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class TInsertTable: public TInsertTableAccessor {
bool Insert(IDbWrapper& dbTable, TInsertedData&& data);
TInsertionSummary::TCounters Commit(
IDbWrapper& dbTable, ui64 planStep, ui64 txId, const THashSet<TInsertWriteId>& writeIds, std::function<bool(ui64)> pathExists);
TInsertionSummary::TCounters CommitEphemeral(IDbWrapper& dbTable, TCommittedData&& data);
void Abort(IDbWrapper& dbTable, const THashSet<TInsertWriteId>& writeIds);
void MarkAsNotAbortable(const TInsertWriteId writeId) {
Summary.MarkAsNotAbortable(writeId);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/insert_table/inserted.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

namespace NKikimr::NOlap {

TCommittedData TInsertedData::Commit(const ui64 planStep, const ui64 txId) {
TCommittedData TInsertedData::Commit(const ui64 planStep, const ui64 txId) const {
return TCommittedData(UserData, planStep, txId, InsertWriteId);
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/insert_table/inserted.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class TInsertedData: public TUserDataContainer {
/// One of them wins and becomes committed. Original DedupId would be lost then.
/// After commit we use original Initiator:WriteId as DedupId of inserted blob inside {PlanStep, TxId}.
/// pathId, initiator, {writeId}, {dedupId} -> pathId, planStep, txId, {dedupId}
[[nodiscard]] TCommittedData Commit(const ui64 planStep, const ui64 txId);
[[nodiscard]] TCommittedData Commit(const ui64 planStep, const ui64 txId) const;
};

} // namespace NKikimr::NOlap
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ TConclusionStatus TStartMergeTask::DoExecuteImpl() {
break;
}
}
if (MergingContext->IsExclusiveInterval() && sourcesInMemory) {
if ((MergingContext->IsExclusiveInterval() || Context->GetCommonContext()->GetReadMetadata()->HasGuaranteeExclusivePK()) &&
sourcesInMemory) {
TMemoryProfileGuard mGuard("SCAN_PROFILE::MERGE::EXCLUSIVE", IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY));
auto& container = Sources.begin()->second->GetStageResult().GetBatch();
if (container && container->num_rows()) {
Expand Down
14 changes: 8 additions & 6 deletions ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -451,8 +451,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
engine.Load(db);

std::vector<TCommittedData> dataToIndex = {
TCommittedData(TUserData::Build(paths[0], blobRanges[0], TLocalHelper::GetMetaProto(), 0, {}), TSnapshot(1, 2), (TInsertWriteId)2),
TCommittedData(TUserData::Build(paths[0], blobRanges[1], TLocalHelper::GetMetaProto(), 0, {}), TSnapshot(2, 1), (TInsertWriteId)1)
TCommittedData(TUserData::Build(paths[0], blobRanges[0], TLocalHelper::GetMetaProto(), 0, {}), TSnapshot(1, 2), 0, (TInsertWriteId)2),
TCommittedData(TUserData::Build(paths[0], blobRanges[1], TLocalHelper::GetMetaProto(), 0, {}), TSnapshot(2, 1), 0, (TInsertWriteId)1)
};

// write
Expand Down Expand Up @@ -553,7 +553,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
std::vector<TCommittedData> dataToIndex;
TSnapshot ss(planStep, txId);
dataToIndex.push_back(
TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, (TInsertWriteId)txId));
TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, 0, (TInsertWriteId)txId));

bool ok = Insert(engine, db, ss, std::move(dataToIndex), blobs, step);
UNIT_ASSERT(ok);
Expand Down Expand Up @@ -651,7 +651,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
std::vector<TCommittedData> dataToIndex;
TSnapshot ss(planStep, txId);
dataToIndex.push_back(TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, (TInsertWriteId)txId));
dataToIndex.push_back(
TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, 0, (TInsertWriteId)txId));

bool ok = Insert(engine, db, ss, std::move(dataToIndex), blobs, step);
blobsAll.Merge(std::move(blobs));
Expand Down Expand Up @@ -682,7 +683,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// PlanStep, TxId, PathId, DedupId, BlobId, Data, [Metadata]
std::vector<TCommittedData> dataToIndex;
TSnapshot ss(planStep, txId);
dataToIndex.push_back(TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, TInsertWriteId(txId)));
dataToIndex.push_back(
TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, 0, TInsertWriteId(txId)));

bool ok = Insert(engine, db, ss, std::move(dataToIndex), blobs, step);
UNIT_ASSERT(ok);
Expand Down Expand Up @@ -730,7 +732,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
TSnapshot ss(planStep, txId);
std::vector<TCommittedData> dataToIndex;
dataToIndex.push_back(
TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, TInsertWriteId(txId)));
TCommittedData(TUserData::Build(pathId, blobRange, TLocalHelper::GetMetaProto(), 0, {}), ss, 0, TInsertWriteId(txId)));

bool ok = Insert(engine, db, ss, std::move(dataToIndex), blobs, step);
UNIT_ASSERT(ok);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
#include "indexed_blob_constructor.h"

#include <ydb/core/tx/columnshard/defs.h>
#include <ydb/core/tx/columnshard/blob.h>
#include <ydb/core/tx/columnshard/columnshard_impl.h>
#include <ydb/core/tx/columnshard/columnshard_private_events.h>

#include <ydb/core/tx/columnshard/defs.h>

namespace NKikimr::NOlap {

TIndexedWriteController::TIndexedWriteController(const TActorId& dstActor, const std::shared_ptr<IBlobsWritingAction>& action, std::vector<std::shared_ptr<TWriteAggregation>>&& aggregations)
TIndexedWriteController::TIndexedWriteController(
const TActorId& dstActor, const std::shared_ptr<IBlobsWritingAction>& action, std::vector<std::shared_ptr<TWriteAggregation>>&& aggregations)
: Buffer(action, std::move(aggregations))
, DstActor(dstActor)
{
, DstActor(dstActor) {
auto blobs = Buffer.GroupIntoBlobs();
for (auto&& b : blobs) {
auto& task = AddWriteTask(TBlobWriteInfo::BuildWriteTask(b.ExtractBlobData(), action));
Expand All @@ -33,6 +33,26 @@ void TWideSerializedBatch::InitBlobId(const TUnifiedBlobId& id) {
Range.BlobId = id;
}

std::shared_ptr<NKikimr::NOlap::TUserData> TWideSerializedBatch::BuildInsertionUserData(const NColumnShard::TColumnShard& owner) const {
NKikimrTxColumnShard::TLogicalMetadata meta;
meta.SetNumRows(SplittedBlobs.GetRowsCount());
meta.SetRawBytes(SplittedBlobs.GetRawBytes());
meta.SetDirtyWriteTimeSeconds(GetStartInstant().Seconds());
meta.SetSpecialKeysRawData(SplittedBlobs.GetSpecialKeysFullSafe());
meta.SetSpecialKeysPayloadData(SplittedBlobs.GetSpecialKeysPayloadSafe());

const auto& blobRange = Range;
Y_ABORT_UNLESS(blobRange.GetBlobId().IsValid());

const auto& writeMeta = GetAggregation().GetWriteMeta();
meta.SetModificationType(TEnumOperator<NEvWrite::EModificationType>::SerializeToProto(writeMeta.GetModificationType()));
*meta.MutableSchemaSubset() = GetAggregation().GetSchemaSubset().SerializeToProto();
auto schemeVersion = GetAggregation().GetSchemaVersion();
auto tableSchema = owner.GetTablesManager().GetPrimaryIndex()->GetVersionedIndex().GetSchemaVerified(schemeVersion);

return std::make_shared<NOlap::TUserData>(writeMeta.GetTableId(), blobRange, meta, tableSchema->GetVersion(), SplittedBlobs.GetData());
}

void TWritingBuffer::InitReadyInstant(const TMonotonic instant) {
for (auto&& aggr : Aggregations) {
aggr->MutableWriteMeta().SetWriteMiddle5StartInstant(instant);
Expand Down Expand Up @@ -89,4 +109,4 @@ TString TWritingBlob::ExtractBlobData() {
return result;
}

}
} // namespace NKikimr::NOlap
Loading

0 comments on commit 076a801

Please sign in to comment.