Skip to content

Commit

Permalink
Add import ACL from S3 for tables (#9181)
Browse files Browse the repository at this point in the history
  • Loading branch information
pixcc authored Sep 13, 2024
1 parent f405ee4 commit 3dce9fe
Show file tree
Hide file tree
Showing 18 changed files with 482 additions and 24 deletions.
6 changes: 6 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4380,6 +4380,12 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
item.Scheme = scheme;
}

if (rowset.HaveValue<Schema::ImportItems::Permissions>()) {
Ydb::Scheme::ModifyPermissionsRequest permissions;
Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(permissions, rowset.GetValue<Schema::ImportItems::Permissions>()));
item.Permissions = permissions;
}

item.State = static_cast<TImportInfo::EState>(rowset.GetValue<Schema::ImportItems::State>());
item.WaitTxId = rowset.GetValueOrDefault<Schema::ImportItems::WaitTxId>(InvalidTxId);
item.NextIndexIdx = rowset.GetValueOrDefault<Schema::ImportItems::NextIndexIdx>(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ TVector<ISubOperation::TPtr> CreateIndexedTable(TOperationId nextId, const TTxTr
if (tx.HasAlterUserAttributes()) {
scheme.MutableAlterUserAttributes()->CopyFrom(tx.GetAlterUserAttributes());
}
if (tx.HasModifyACL()) {
scheme.MutableModifyACL()->CopyFrom(tx.GetModifyACL());
}

result.push_back(CreateNewTable(NextPartId(nextId, result), scheme, sequences));
}
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_import.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ void TSchemeShard::PersistImportItemScheme(NIceDb::TNiceDb& db, const TImportInf
db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx).Update(
NIceDb::TUpdate<Schema::ImportItems::Scheme>(item.Scheme.SerializeAsString())
);
if (item.Permissions.Defined()) {
db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx).Update(
NIceDb::TUpdate<Schema::ImportItems::Permissions>(item.Permissions->SerializeAsString())
);
}
}

void TSchemeShard::PersistImportItemDstPathId(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx) {
Expand Down
14 changes: 10 additions & 4 deletions ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <ydb/core/base/path.h>
#include <ydb/core/ydb_convert/table_description.h>
#include <ydb/core/ydb_convert/ydb_convert.h>

namespace NKikimr {
namespace NSchemeShard {
Expand All @@ -20,10 +21,6 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateTablePropose(
auto propose = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(ui64(txId), ss->TabletID());
auto& record = propose->Record;

if (importInfo->UserSID) {
record.SetOwner(*importInfo->UserSID);
}

auto& modifyScheme = *record.AddTransaction();
modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateIndexedTable);
modifyScheme.SetInternal(true);
Expand Down Expand Up @@ -66,6 +63,15 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateTablePropose(
}
}

if (importInfo->UserSID) {
record.SetOwner(*importInfo->UserSID);
}
FillOwner(record, item.Permissions);

if (!FillACL(modifyScheme, item.Permissions, error)) {
return nullptr;
}

return propose;
}

Expand Down
124 changes: 109 additions & 15 deletions ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,29 @@ using namespace Aws::Client;
using namespace Aws::S3;
using namespace Aws;

// Downloads scheme-related objects from S3
class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
static TString SchemeKeyFromSettings(const Ydb::Import::ImportFromS3Settings& settings, ui32 itemIdx) {
Y_ABORT_UNLESS(itemIdx < (ui32)settings.items_size());
return TStringBuilder() << settings.items(itemIdx).source_prefix() << "/scheme.pb";
}

static TString PermissionsKeyFromSettings(const Ydb::Import::ImportFromS3Settings& settings, ui32 itemIdx) {
Y_ABORT_UNLESS(itemIdx < (ui32)settings.items_size());
return TStringBuilder() << settings.items(itemIdx).source_prefix() << "/permissions.pb";
}

void HeadObject(const TString& key) {
auto request = Model::HeadObjectRequest()
.WithKey(key);

Send(Client, new TEvExternalStorage::TEvHeadObjectRequest(request));
}

void Handle(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) {
void HandleScheme(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) {
const auto& result = ev->Get()->Result;

LOG_D("Handle TEvExternalStorage::TEvHeadObjectResponse"
LOG_D("HandleScheme TEvExternalStorage::TEvHeadObjectResponse"
<< ": self# " << SelfId()
<< ", result# " << result);

Expand All @@ -51,6 +57,25 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
GetObject(SchemeKey, std::make_pair(0, contentLength - 1));
}

void HandlePermissions(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) {
const auto& result = ev->Get()->Result;

LOG_D("HandlePermissions TEvExternalStorage::TEvHeadObjectResponse"
<< ": self# " << SelfId()
<< ", result# " << result);

if (result.GetError().GetErrorType() == S3Errors::RESOURCE_NOT_FOUND
|| result.GetError().GetErrorType() == S3Errors::NO_SUCH_KEY) {
Reply(); // permissions are optional
return;
} else if (!CheckResult(result, "HeadObject")) {
return;
}

const auto contentLength = result.GetResult().GetContentLength();
GetObject(PermissionsKey, std::make_pair(0, contentLength - 1));
}

void GetObject(const TString& key, const std::pair<ui64, ui64>& range) {
auto request = Model::GetObjectRequest()
.WithKey(key)
Expand All @@ -59,11 +84,11 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
Send(Client, new TEvExternalStorage::TEvGetObjectRequest(request));
}

void Handle(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) {
void HandleScheme(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) {
const auto& msg = *ev->Get();
const auto& result = msg.Result;

LOG_D("Handle TEvExternalStorage::TEvGetObjectResponse"
LOG_D("HandleScheme TEvExternalStorage::TEvGetObjectResponse"
<< ": self# " << SelfId()
<< ", result# " << result);

Expand All @@ -74,14 +99,46 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
Y_ABORT_UNLESS(ItemIdx < ImportInfo->Items.size());
auto& item = ImportInfo->Items.at(ItemIdx);

LOG_T("Trying to parse"
LOG_T("Trying to parse scheme"
<< ": self# " << SelfId()
<< ", body# " << SubstGlobalCopy(msg.Body, "\n", "\\n"));

if (!google::protobuf::TextFormat::ParseFromString(msg.Body, &item.Scheme)) {
return Reply(false, "Cannot parse scheme");
}

if (NeedDownloadPermissions) {
StartDownloadingPermissions();
} else {
Reply();
}
}

void HandlePermissions(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) {
const auto& msg = *ev->Get();
const auto& result = msg.Result;

LOG_D("HandlePermissions TEvExternalStorage::TEvGetObjectResponse"
<< ": self# " << SelfId()
<< ", result# " << result);

if (!CheckResult(result, "GetObject")) {
return;
}

Y_ABORT_UNLESS(ItemIdx < ImportInfo->Items.size());
auto& item = ImportInfo->Items.at(ItemIdx);

LOG_T("Trying to parse permissions"
<< ": self# " << SelfId()
<< ", body# " << SubstGlobalCopy(msg.Body, "\n", "\\n"));

Ydb::Scheme::ModifyPermissionsRequest permissions;
if (!google::protobuf::TextFormat::ParseFromString(msg.Body, &permissions)) {
return Reply(false, "Cannot parse permissions");
}
item.Permissions = std::move(permissions);

Reply();
}

Expand Down Expand Up @@ -123,33 +180,67 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
TActor::PassAway();
}

void Download(const TString& key) {
if (Client) {
Send(Client, new TEvents::TEvPoisonPill());
}
Client = RegisterWithSameMailbox(CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator()));

HeadObject(key);
}

void DownloadScheme() {
Download(SchemeKey);
}

void DownloadPermissions() {
Download(PermissionsKey);
}

void ResetRetries() {
Attempt = 0;
}

void StartDownloadingPermissions() {
ResetRetries();
DownloadPermissions();
Become(&TThis::StateDownloadPermissions);
}

public:
explicit TSchemeGetter(const TActorId& replyTo, TImportInfo::TPtr importInfo, ui32 itemIdx)
: ExternalStorageConfig(new NWrappers::NExternalStorage::TS3ExternalStorageConfig(importInfo->Settings))
, ReplyTo(replyTo)
, ImportInfo(importInfo)
, ItemIdx(itemIdx)
, SchemeKey(SchemeKeyFromSettings(importInfo->Settings, itemIdx))
, PermissionsKey(PermissionsKeyFromSettings(importInfo->Settings, itemIdx))
, Retries(importInfo->Settings.number_of_retries())
, NeedDownloadPermissions(!importInfo->Settings.no_acl())
{
}

void Bootstrap() {
if (Client) {
Send(Client, new TEvents::TEvPoisonPill());
}
Client = RegisterWithSameMailbox(CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator()));
DownloadScheme();
Become(&TThis::StateDownloadScheme);
}

STATEFN(StateDownloadScheme) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvExternalStorage::TEvHeadObjectResponse, HandleScheme);
hFunc(TEvExternalStorage::TEvGetObjectResponse, HandleScheme);

HeadObject(SchemeKey);
Become(&TThis::StateWork);
sFunc(TEvents::TEvWakeup, DownloadScheme);
sFunc(TEvents::TEvPoisonPill, PassAway);
}
}

STATEFN(StateWork) {
STATEFN(StateDownloadPermissions) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvExternalStorage::TEvHeadObjectResponse, Handle);
hFunc(TEvExternalStorage::TEvGetObjectResponse, Handle);
hFunc(TEvExternalStorage::TEvHeadObjectResponse, HandlePermissions);
hFunc(TEvExternalStorage::TEvGetObjectResponse, HandlePermissions);

sFunc(TEvents::TEvWakeup, Bootstrap);
sFunc(TEvents::TEvWakeup, DownloadPermissions);
sFunc(TEvents::TEvPoisonPill, PassAway);
}
}
Expand All @@ -161,13 +252,16 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
const ui32 ItemIdx;

const TString SchemeKey;
const TString PermissionsKey;

const ui32 Retries;
ui32 Attempt = 0;

TDuration Delay = TDuration::Minutes(1);
static constexpr TDuration MaxDelay = TDuration::Minutes(10);

const bool NeedDownloadPermissions = true;

TActorId Client;

}; // TSchemeGetter
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/schemeshard_info_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -2769,6 +2769,7 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> {
TString DstPathName;
TPathId DstPathId;
Ydb::Table::CreateTableRequest Scheme;
TMaybeFail<Ydb::Scheme::ModifyPermissionsRequest> Permissions;

EState State = EState::GetScheme;
ESubState SubState = ESubState::AllocateTxId;
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -1520,6 +1520,7 @@ struct Schema : NIceDb::Schema {
struct DstPathOwnerId : Column<4, NScheme::NTypeIds::Uint64> { using Type = TOwnerId; };
struct DstPathLocalId : Column<5, NScheme::NTypeIds::Uint64> { using Type = TLocalPathId; };
struct Scheme : Column<6, NScheme::NTypeIds::String> {};
struct Permissions : Column<11, NScheme::NTypeIds::String> {};

struct State : Column<7, NScheme::NTypeIds::Byte> {};
struct WaitTxId : Column<8, NScheme::NTypeIds::Uint64> { using Type = TTxId; };
Expand All @@ -1534,6 +1535,7 @@ struct Schema : NIceDb::Schema {
DstPathOwnerId,
DstPathLocalId,
Scheme,
Permissions,
State,
WaitTxId,
NextIndexIdx,
Expand Down
20 changes: 18 additions & 2 deletions ydb/core/tx/schemeshard/ut_helpers/ls_checks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1207,9 +1207,9 @@ TCheckFunc HasOwner(const TString& owner) {
};
}

void CheckEffectiveRight(const NKikimrScheme::TEvDescribeSchemeResult& record, const TString& right, bool mustHave) {
void CheckRight(const NKikimrScheme::TEvDescribeSchemeResult& record, const TString& right, bool mustHave, bool isEffective) {
const auto& self = record.GetPathDescription().GetSelf();
TSecurityObject src(self.GetOwner(), self.GetEffectiveACL(), false);
TSecurityObject src(self.GetOwner(), isEffective ? self.GetEffectiveACL() : self.GetACL(), false);

NACLib::TSecurityObject required;
required.FromString(right);
Expand All @@ -1233,6 +1233,22 @@ void CheckEffectiveRight(const NKikimrScheme::TEvDescribeSchemeResult& record, c
}
}

TCheckFunc HasRight(const TString& right) {
return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
CheckRight(record, right, true, true);
};
}

TCheckFunc HasNotRight(const TString& right) {
return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
CheckRight(record, right, false, true);
};
}

void CheckEffectiveRight(const NKikimrScheme::TEvDescribeSchemeResult& record, const TString& right, bool mustHave) {
CheckRight(record, right, mustHave, true);
}

TCheckFunc HasEffectiveRight(const TString& right) {
return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
CheckEffectiveRight(record, right, true);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/schemeshard/ut_helpers/ls_checks.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ namespace NLs {
TCheckFunc BackupHistoryCount(ui64 count);

TCheckFunc HasOwner(const TString& owner);
TCheckFunc HasRight(const TString& right);
TCheckFunc HasNotRight(const TString& right);
TCheckFunc HasEffectiveRight(const TString& right);
TCheckFunc HasNotEffectiveRight(const TString& right);

Expand Down
Loading

0 comments on commit 3dce9fe

Please sign in to comment.