Skip to content

Commit

Permalink
Fixed session per file in CLI ydb import file (ydb-platform#7785)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gazizonoki authored Aug 14, 2024
1 parent 79898e6 commit 0982055
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 100 deletions.
176 changes: 81 additions & 95 deletions ydb/public/lib/ydb_cli/import/import.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,43 @@ TStatus WaitForQueue(const size_t maxQueueSize, std::vector<TAsyncStatus>& inFli
return MakeStatus();
}

void InitCsvParser(TCsvParser& parser,
bool& removeLastDelimiter,
TString&& defaultHeader,
const TImportFileSettings& settings,
const std::map<TString, TType>* columnTypes,
const NTable::TTableDescription* dbTableInfo) {
if (settings.Header_ || settings.HeaderRow_) {
TString headerRow;
if (settings.Header_) {
headerRow = std::move(defaultHeader);
}
if (settings.HeaderRow_) {
headerRow = settings.HeaderRow_;
}
if (headerRow.EndsWith("\r\n")) {
headerRow.erase(headerRow.Size() - 2);
}
if (headerRow.EndsWith("\n")) {
headerRow.erase(headerRow.Size() - 1);
}
if (headerRow.EndsWith(settings.Delimiter_)) {
removeLastDelimiter = true;
headerRow.erase(headerRow.Size() - settings.Delimiter_.Size());
}
parser = TCsvParser(std::move(headerRow), settings.Delimiter_[0], settings.NullValue_, columnTypes);
return;
}

TVector<TString> columns;
Y_ENSURE_BT(dbTableInfo);
for (const auto& column : dbTableInfo->GetColumns()) {
columns.push_back(column.Name);
}
parser = TCsvParser(std::move(columns), settings.Delimiter_[0], settings.NullValue_, columnTypes);
return;
}

FHANDLE GetStdinFileno() {
#if defined(_win32_)
return GetStdHandle(STD_INPUT_HANDLE);
Expand Down Expand Up @@ -222,9 +259,8 @@ class TCsvFileReader {
} // namespace

TImportFileClient::TImportFileClient(const TDriver& driver, const TClientCommand::TConfig& rootConfig)
: OperationClient(std::make_shared<NOperation::TOperationClient>(driver))
: TableClient(std::make_shared<NTable::TTableClient>(driver))
, SchemeClient(std::make_shared<NScheme::TSchemeClient>(driver))
, TableClient(std::make_shared<NTable::TTableClient>(driver))
{
RetrySettings
.MaxRetries(TImportFileSettings::MaxRetries)
Expand All @@ -239,11 +275,25 @@ TStatus TImportFileClient::Import(const TVector<TString>& filePaths, const TStri
TStringBuilder() << "Illegal delimiter for TSV format, only tab is allowed");
}

auto result = NDump::DescribePath(*SchemeClient, dbPath);
auto resultStatus = result.GetStatus();
if (resultStatus != EStatus::SUCCESS) {
return MakeStatus(EStatus::SCHEME_ERROR,
TStringBuilder() << result.GetIssues().ToString() << dbPath);
auto resultStatus = TableClient->RetryOperationSync(
[this, dbPath](NTable::TSession session) {
auto result = session.DescribeTable(dbPath).ExtractValueSync();
if (result.IsSuccess()) {
DbTableInfo = std::make_unique<const NTable::TTableDescription>(result.GetTableDescription());
}
return result;
}, NTable::TRetryOperationSettings{RetrySettings}.MaxRetries(10));

if (!resultStatus.IsSuccess()) {
/// TODO: Remove this after server fix: https://github.com/ydb-platform/ydb/issues/7791
if (resultStatus.GetStatus() == EStatus::SCHEME_ERROR) {
auto describePathResult = NDump::DescribePath(*SchemeClient, dbPath);
if (describePathResult.GetStatus() != EStatus::SUCCESS) {
return MakeStatus(EStatus::SCHEME_ERROR,
TStringBuilder() << describePathResult.GetIssues().ToString() << dbPath);
}
}
return resultStatus;
}

UpsertSettings
Expand Down Expand Up @@ -374,45 +424,13 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath,

TCountingInput countInput(&input);
NCsvFormat::TLinesSplitter splitter(countInput);
TCsvParser parser;
bool RemoveLastDelimiter = false;

NTable::TCreateSessionResult sessionResult = TableClient->GetSession(NTable::TCreateSessionSettings()).GetValueSync();
if (!sessionResult.IsSuccess())
return sessionResult;
NTable::TDescribeTableResult tableResult = sessionResult.GetSession().DescribeTable(dbPath).GetValueSync();
if (!tableResult.IsSuccess())
return tableResult;
auto columnTypes = GetColumnTypes();
ValidateTValueUpsertTable();

auto columnTypes = GetColumnTypes(tableResult.GetTableDescription());
ValidateTable(tableResult.GetTableDescription());

if (settings.Header_ || settings.HeaderRow_) {
TString headerRow;
if (settings.Header_) {
headerRow = splitter.ConsumeLine();
}
if (settings.HeaderRow_) {
headerRow = settings.HeaderRow_;
}
if (headerRow.EndsWith("\r\n")) {
headerRow.erase(headerRow.Size() - 2);
}
if (headerRow.EndsWith("\n")) {
headerRow.erase(headerRow.Size() - 1);
}
if (headerRow.EndsWith(settings.Delimiter_)) {
RemoveLastDelimiter = true;
headerRow.erase(headerRow.Size() - settings.Delimiter_.Size());
}
parser = TCsvParser(std::move(headerRow), settings.Delimiter_[0], settings.NullValue_, &columnTypes);
} else {
TVector<TString> columns;
for (const auto& column : tableResult.GetTableDescription().GetColumns()) {
columns.push_back(column.Name);
}
parser = TCsvParser(std::move(columns), settings.Delimiter_[0], settings.NullValue_, &columnTypes);
}
TCsvParser parser;
bool removeLastDelimiter = false;
InitCsvParser(parser, removeLastDelimiter, splitter.ConsumeLine(), settings, &columnTypes, DbTableInfo.get());

for (ui32 i = 0; i < settings.SkipRows_; ++i) {
splitter.ConsumeLine();
Expand Down Expand Up @@ -450,7 +468,7 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath,
readBytes += line.Size();
batchBytes += line.Size();

if (RemoveLastDelimiter) {
if (removeLastDelimiter) {
if (!line.EndsWith(settings.Delimiter_)) {
return MakeStatus(EStatus::BAD_REQUEST,
"According to the header, lines should end with a delimiter");
Expand Down Expand Up @@ -498,42 +516,14 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath,
TStatus TImportFileClient::UpsertCsvByBlocks(const TString& filePath, const TString& dbPath, const TImportFileSettings& settings) {
TMaxInflightGetter inFlightGetter(settings.MaxInFlightRequests_, FilesCount);
TString headerRow;
TCsvParser parser;
TCsvFileReader splitter(filePath, settings, headerRow, inFlightGetter);
bool RemoveLastDelimiter = false;

NTable::TCreateSessionResult sessionResult = TableClient->GetSession(NTable::TCreateSessionSettings()).GetValueSync();
if (!sessionResult.IsSuccess())
return sessionResult;
NTable::TDescribeTableResult tableResult = sessionResult.GetSession().DescribeTable(dbPath).GetValueSync();
if (!tableResult.IsSuccess())
return tableResult;

auto columnTypes = GetColumnTypes(tableResult.GetTableDescription());
ValidateTable(tableResult.GetTableDescription());
auto columnTypes = GetColumnTypes();
ValidateTValueUpsertTable();

if (settings.Header_ || settings.HeaderRow_) {
if (settings.HeaderRow_) {
headerRow = settings.HeaderRow_;
}
if (headerRow.EndsWith("\r\n")) {
headerRow.erase(headerRow.Size() - 2);
}
if (headerRow.EndsWith("\n")) {
headerRow.erase(headerRow.Size() - 1);
}
if (headerRow.EndsWith(settings.Delimiter_)) {
RemoveLastDelimiter = true;
headerRow.erase(headerRow.Size() - settings.Delimiter_.Size());
}
parser = TCsvParser(std::move(headerRow), settings.Delimiter_[0], settings.NullValue_, &columnTypes);
} else {
TVector<TString> columns;
for (const auto& column : tableResult.GetTableDescription().GetColumns()) {
columns.push_back(column.Name);
}
parser = TCsvParser(std::move(columns), settings.Delimiter_[0], settings.NullValue_, &columnTypes);
}
TCsvParser parser;
bool removeLastDelimiter = false;
InitCsvParser(parser, removeLastDelimiter, std::move(headerRow), settings, &columnTypes, DbTableInfo.get());

TType lineType = parser.GetColumnsType();

Expand Down Expand Up @@ -565,7 +555,7 @@ TStatus TImportFileClient::UpsertCsvByBlocks(const TString& filePath, const TStr
}
readBytes += line.size();
batchBytes += line.size();
if (RemoveLastDelimiter) {
if (removeLastDelimiter) {
if (!line.EndsWith(settings.Delimiter_)) {
return MakeStatus(EStatus::BAD_REQUEST,
"According to the header, lines should end with a delimiter");
Expand Down Expand Up @@ -611,15 +601,8 @@ TStatus TImportFileClient::UpsertCsvByBlocks(const TString& filePath, const TStr

TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings,
std::optional<ui64> inputSizeHint, ProgressCallbackFunc & progressCallback) {
NTable::TCreateSessionResult sessionResult = TableClient->GetSession(NTable::TCreateSessionSettings()).GetValueSync();
if (!sessionResult.IsSuccess())
return sessionResult;
NTable::TDescribeTableResult tableResult = sessionResult.GetSession().DescribeTable(dbPath).GetValueSync();
if (!tableResult.IsSuccess())
return tableResult;

const TType tableType = GetTableType(tableResult.GetTableDescription());
ValidateTable(tableResult.GetTableDescription());
const TType tableType = GetTableType();
ValidateTValueUpsertTable();
const NYdb::EBinaryStringEncoding stringEncoding =
(settings.Format_ == EOutputFormat::JsonBase64) ? NYdb::EBinaryStringEncoding::Base64 :
NYdb::EBinaryStringEncoding::Unicode;
Expand Down Expand Up @@ -818,36 +801,39 @@ TAsyncStatus TImportFileClient::UpsertParquetBuffer(const TString& dbPath, const
return TableClient->RetryOperation(upsert, RetrySettings);
}

TType TImportFileClient::GetTableType(const NTable::TTableDescription& tableDescription) {
TType TImportFileClient::GetTableType() {
TTypeBuilder typeBuilder;
typeBuilder.BeginStruct();
const auto& columns = tableDescription.GetTableColumns();
Y_ENSURE_BT(DbTableInfo);
const auto& columns = DbTableInfo->GetTableColumns();
for (auto it = columns.begin(); it != columns.end(); it++) {
typeBuilder.AddMember((*it).Name, (*it).Type);
}
typeBuilder.EndStruct();
return typeBuilder.Build();
}

std::map<TString, TType> TImportFileClient::GetColumnTypes(const NTable::TTableDescription& tableDescription) {
std::map<TString, TType> TImportFileClient::GetColumnTypes() {
std::map<TString, TType> columnTypes;
const auto& columns = tableDescription.GetTableColumns();
Y_ENSURE_BT(DbTableInfo);
const auto& columns = DbTableInfo->GetTableColumns();
for (auto it = columns.begin(); it != columns.end(); it++) {
columnTypes.insert({(*it).Name, (*it).Type});
}
return columnTypes;
}

void TImportFileClient::ValidateTable(const NTable::TTableDescription& tableDescription) {
auto columnTypes = GetColumnTypes(tableDescription);
void TImportFileClient::ValidateTValueUpsertTable() {
auto columnTypes = GetColumnTypes();
bool hasPgType = false;
for (const auto& [_, type] : columnTypes) {
if (TTypeParser(type).GetKind() == TTypeParser::ETypeKind::Pg) {
hasPgType = true;
break;
}
}
if (tableDescription.GetStoreType() == NTable::EStoreType::Column && hasPgType) {
Y_ENSURE_BT(DbTableInfo);
if (DbTableInfo->GetStoreType() == NTable::EStoreType::Column && hasPgType) {
throw TMisuseException() << "Import into column table with Pg type columns in not supported";
}
}
Expand Down
12 changes: 7 additions & 5 deletions ydb/public/lib/ydb_cli/import/import.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,14 @@ class TImportFileClient {
TStatus Import(const TVector<TString>& fsPaths, const TString& dbPath, const TImportFileSettings& settings = {});

private:
std::shared_ptr<NOperation::TOperationClient> OperationClient;
std::shared_ptr<NScheme::TSchemeClient> SchemeClient;
std::shared_ptr<NTable::TTableClient> TableClient;
std::shared_ptr<NScheme::TSchemeClient> SchemeClient;

NTable::TBulkUpsertSettings UpsertSettings;
NTable::TRetryOperationSettings RetrySettings;

std::unique_ptr<const NTable::TTableDescription> DbTableInfo;

std::atomic<ui64> FilesCount;

static constexpr ui32 VerboseModeReadSize = 1 << 27; // 100 MB
Expand All @@ -86,13 +87,14 @@ class TImportFileClient {

TStatus UpsertJson(IInputStream &input, const TString &dbPath, const TImportFileSettings &settings,
std::optional<ui64> inputSizeHint, ProgressCallbackFunc & progressCallback);
TType GetTableType(const NTable::TTableDescription& tableDescription);
std::map<TString, TType> GetColumnTypes(const NTable::TTableDescription& tableDescription);
void ValidateTable(const NTable::TTableDescription& tableDescription);

TStatus UpsertParquet(const TString& filename, const TString& dbPath, const TImportFileSettings& settings,
ProgressCallbackFunc & progressCallback);
TAsyncStatus UpsertParquetBuffer(const TString& dbPath, const TString& buffer, const TString& strSchema);

TType GetTableType();
std::map<TString, TType> GetColumnTypes();
void ValidateTValueUpsertTable();
};

}
Expand Down

0 comments on commit 0982055

Please sign in to comment.