Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

curvefs: support x-amz-storage-class #2926

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions curvefs/conf/tools.conf
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ s3.useVirtualAddressing=false
# s3 objectPrefix, if set 0, means no prefix, if set 1, means inode prefix
# if set 2 and other values mean hash prefix
s3.objectPrefix=0
# s3 storage class
# support NOT_SET|STANDARD|REDUCED_REDUNDANCY|STANDARD_IA|ONEZONE_IA|INTELLIGENT_TIERING|GLACIER|DEEP_ARCHIVE
s3.storageClass=NOT_SET

# statistic info in xattr, hardlink will not be supported when enable
enableSumInDir=false

Expand Down
12 changes: 12 additions & 0 deletions curvefs/proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ message S3Info {
required uint64 blockSize = 5;
required uint64 chunkSize = 6;
optional uint32 objectPrefix = 7;
optional StorageClass storageClass = 8;
}

enum PartitionStatus {
Expand Down Expand Up @@ -109,3 +110,14 @@ message CopysetInfo {
required uint32 poolId = 1;
required uint32 copysetId = 2;
}

enum StorageClass {
NOT_SET = 0;
STANDARD = 1;
REDUCED_REDUNDANCY = 2;
STANDARD_IA = 3;
ONEZONE_IA = 4;
INTELLIGENT_TIERING = 5;
GLACIER = 6;
DEEP_ARCHIVE = 7;
}
4 changes: 4 additions & 0 deletions curvefs/src/client/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ void SetFuseClientS3Option(FuseClientOption *clientOption,
clientOption->s3Opt.s3ClientAdaptorOpt.blockSize = fsS3Opt.blockSize;
clientOption->s3Opt.s3ClientAdaptorOpt.chunkSize = fsS3Opt.chunkSize;
clientOption->s3Opt.s3ClientAdaptorOpt.objectPrefix = fsS3Opt.objectPrefix;
clientOption->s3Opt.s3ClientAdaptorOpt.storageClass = fsS3Opt.storageClass;
clientOption->s3Opt.s3AdaptrOpt.s3Address = fsS3Opt.s3Address;
clientOption->s3Opt.s3AdaptrOpt.ak = fsS3Opt.ak;
clientOption->s3Opt.s3AdaptrOpt.sk = fsS3Opt.sk;
Expand All @@ -487,6 +488,9 @@ void S3Info2FsS3Option(const curvefs::common::S3Info& s3,
fsS3Opt->blockSize = s3.blocksize();
fsS3Opt->chunkSize = s3.chunksize();
fsS3Opt->objectPrefix = s3.has_objectprefix() ? s3.objectprefix() : 0;
fsS3Opt->storageClass =
s3.has_storageclass() ? Aws::S3::Model::StorageClass(s3.storageclass())
: Aws::S3::Model::StorageClass::NOT_SET;
}

} // namespace common
Expand Down
1 change: 1 addition & 0 deletions curvefs/src/client/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ struct S3ClientAdaptorOption {
uint32_t readRetryIntervalMs;
uint32_t objectPrefix;
DiskCacheOption diskCacheOpt;
Aws::S3::Model::StorageClass storageClass;
};

struct S3Option {
Expand Down
1 change: 1 addition & 0 deletions curvefs/src/client/s3/client_s3_adaptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ S3ClientAdaptorImpl::Init(
maxReadRetryIntervalMs_ = option.maxReadRetryIntervalMs;
readRetryIntervalMs_ = option.readRetryIntervalMs;
objectPrefix_ = option.objectPrefix;
storageClass_ = option.storageClass;
client_ = client;
inodeManager_ = inodeManager;
mdsClient_ = mdsClient;
Expand Down
5 changes: 5 additions & 0 deletions curvefs/src/client/s3/client_s3_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ class S3ClientAdaptorImpl : public S3ClientAdaptor {
return objectPrefix_;
}

Aws::S3::Model::StorageClass GetStorageClass() {
return storageClass_;
}

std::shared_ptr<FsCacheManager> GetFsCacheManager() {
return fsCacheManager_;
}
Expand Down Expand Up @@ -281,6 +285,7 @@ class S3ClientAdaptorImpl : public S3ClientAdaptor {
uint32_t maxReadRetryIntervalMs_;
uint32_t readRetryIntervalMs_;
uint32_t objectPrefix_;
Aws::S3::Model::StorageClass storageClass_;
Thread bgFlushThread_;
std::atomic<bool> toStop_;
std::mutex mtx_;
Expand Down
2 changes: 2 additions & 0 deletions curvefs/src/client/s3/client_s3_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2544,6 +2544,8 @@ void DataCache::FlushTaskExecute(
s3Tasks.begin(), s3Tasks.end(),
[&](const std::shared_ptr<PutObjectAsyncContext> &context) {
context->cb = s3cb;
context->putObjectOptions = curve::common::PutObjectOptions{
s3ClientAdaptor_->GetStorageClass()};
if (CachePolicy::WRCache == cachePolicy) {
context->type = curve::common::ContextType::Disk;
s3ClientAdaptor_->GetDiskCacheManager()->Enqueue(context);
Expand Down
4 changes: 2 additions & 2 deletions curvefs/src/client/s3/disk_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ int DiskCacheManager::Init(std::shared_ptr<S3Client> client,
const S3ClientAdaptorOption option) {
LOG(INFO) << "DiskCacheManager init start.";
client_ = client;

option_ = option;
FLAGS_diskTrimCheckIntervalSec = option.diskCacheOpt.trimCheckIntervalSec;
FLAGS_diskFullRatio = option.diskCacheOpt.fullRatio;
Expand All @@ -108,7 +107,8 @@ int DiskCacheManager::Init(std::shared_ptr<S3Client> client,
cmdTimeoutSec_ = option.diskCacheOpt.cmdTimeoutSec;
objectPrefix_ = option.objectPrefix;
cacheWrite_->Init(client_, posixWrapper_, cacheDir_, objectPrefix_,
option.diskCacheOpt.asyncLoadPeriodMs, cachedObjName_);
option.diskCacheOpt.asyncLoadPeriodMs, cachedObjName_,
option.storageClass);
cacheRead_->Init(posixWrapper_, cacheDir_, objectPrefix_);
int ret;
ret = CreateDir();
Expand Down
1 change: 1 addition & 0 deletions curvefs/src/client/s3/disk_cache_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ class DiskCacheManagerImpl {
std::shared_ptr<DiskCacheManager> diskCacheManager_;

bool forceFlush_;
Aws::S3::Model::StorageClass storageClass_;
std::shared_ptr<S3Client> client_;

int WriteClosure(std::shared_ptr<PutObjectAsyncContext> context);
Expand Down
7 changes: 6 additions & 1 deletion curvefs/src/client/s3/disk_cache_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ void DiskCacheWrite::Init(std::shared_ptr<S3Client> client,
uint32_t objectPrefix,
uint64_t asyncLoadPeriodMs,
std::shared_ptr<SglLRUCache<
std::string>> cachedObjName) {
std::string>> cachedObjName,
Aws::S3::Model::StorageClass storageClass) {
client_ = client;
posixWrapper_ = posixWrapper;
asyncLoadPeriodMs_ = asyncLoadPeriodMs;
cachedObjName_ = cachedObjName;
storageClass_ = storageClass;
DiskCacheBase::Init(posixWrapper, cacheDir, objectPrefix);
}

Expand Down Expand Up @@ -168,6 +170,7 @@ int DiskCacheWrite::UploadFile(const std::string &name,
};
auto context = std::make_shared<PutObjectAsyncContext>(
name, buffer, fileSize, cb, curve::common::ContextType::S3);
context->putObjectOptions = curve::common::PutObjectOptions{storageClass_};
client_->UploadAsync(context);
VLOG(9) << "async upload end, file = " << name;
return 0;
Expand Down Expand Up @@ -424,6 +427,8 @@ int DiskCacheWrite::UploadAllCacheWriteFile() {
auto context = std::make_shared<PutObjectAsyncContext>(
curvefs::common::s3util::GenPathByObjName(*iter, objectPrefix_),
buffer, fileSize, cb, curve::common::ContextType::S3);
context->putObjectOptions =
curve::common::PutObjectOptions{storageClass_};
client_->UploadAsync(context);
}
if (pendingReq.load(std::memory_order_seq_cst)) {
Expand Down
5 changes: 4 additions & 1 deletion curvefs/src/client/s3/disk_cache_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ class DiskCacheWrite : public DiskCacheBase {
std::shared_ptr<PosixWrapper> posixWrapper,
const std::string cacheDir, uint32_t objectPrefix,
uint64_t asyncLoadPeriodMs,
std::shared_ptr<SglLRUCache<std::string>> cachedObjName);
std::shared_ptr<SglLRUCache<std::string>> cachedObjName,
Aws::S3::Model::StorageClass storageClass =
Aws::S3::Model::StorageClass::NOT_SET);
/**
* @brief write obj to write cache disk
* @param[in] client S3Client
Expand Down Expand Up @@ -166,6 +168,7 @@ class DiskCacheWrite : public DiskCacheBase {
std::shared_ptr<S3Metric> s3Metric_;

std::shared_ptr<SglLRUCache<std::string>> cachedObjName_;
Aws::S3::Model::StorageClass storageClass_;
};

} // namespace client
Expand Down
17 changes: 16 additions & 1 deletion curvefs/src/metaserver/s3compact_inode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "curvefs/src/common/s3util.h"
#include "curvefs/src/metaserver/copyset/copyset_node_manager.h"
#include "curvefs/src/metaserver/copyset/meta_operator.h"
#include "curvefs/src/metaserver/mds/fsinfo_manager.h"

using curve::common::Configuration;
using curve::common::InitS3AdaptorOptionExceptS3InfoOption;
Expand Down Expand Up @@ -344,6 +345,19 @@ int CompactInodeJob::WriteFullChunk(
const auto& blockSize = ctx.blockSize;
const auto& chunkSize = ctx.chunkSize;
const auto& newOff = newChunkInfo.newOff;
FsInfo fsInfo;
auto ok = FsInfoManager::GetInstance().GetFsInfo(ctx.fsId, &fsInfo);
if (!ok) {
LOG(ERROR) << "WriteFullChunk failed, fsId "
<< ctx.fsId << " not exist";
return ok;
}
const auto& s3Info = fsInfo.detail().s3info();
curve::common::PutObjectOptions putObjectOption;
putObjectOption.storageClass =
s3Info.has_storageclass()
? Aws::S3::Model::StorageClass(s3Info.storageclass())
: Aws::S3::Model::StorageClass::NOT_SET;
uint64_t offRoundDown = newOff / chunkSize * chunkSize;
uint64_t startIndex = (newOff - newOff / chunkSize * chunkSize) / blockSize;
for (uint64_t index = startIndex;
Expand All @@ -361,7 +375,8 @@ int CompactInodeJob::WriteFullChunk(
<< s3objEnd << "]";
ret = ctx.s3adapter->PutObject(
aws_key,
fullChunk.substr(s3objBegin - newOff, s3objEnd - s3objBegin + 1));
fullChunk.substr(s3objBegin - newOff, s3objEnd - s3objBegin + 1),
putObjectOption);
if (ret != 0) {
LOG(WARNING) << "s3compact: put s3 object " << objName << " failed";
return ret;
Expand Down
90 changes: 52 additions & 38 deletions curvefs/src/tools/create/curvefs_create_fs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ DECLARE_string(s3_bucket_name);
DECLARE_uint64(s3_blocksize);
DECLARE_uint64(s3_chunksize);
DECLARE_uint32(s3_objectPrefix);
DECLARE_string(s3_storageClass);
DECLARE_uint32(rpcTimeoutMs);
DECLARE_uint32(rpcRetryTimes);
DECLARE_bool(enableSumInDir);
Expand All @@ -76,44 +77,48 @@ using ::curvefs::common::BitmapLocation_Parse;
void CreateFsTool::PrintHelp() {
CurvefsToolRpc::PrintHelp();
std::cout << " -fsName=" << FLAGS_fsName << " [-user=" << FLAGS_user
<< "] [-capacity=" << FLAGS_capacity
<< "] [-blockSize=" << FLAGS_blockSize
<< "] [-enableSumInDir=" << FLAGS_enableSumInDir
<< "] [-mdsAddr=" << FLAGS_mdsAddr
<< "] [-rpcTimeoutMs=" << FLAGS_rpcTimeoutMs
<< " -rpcRetryTimes=" << FLAGS_rpcRetryTimes << "]"
<< "] [recycleTimeHour=" << FLAGS_recycleTimeHour
<< "] \n[-fsType=volume -volumeBlockGroupSize="
<< FLAGS_volumeBlockGroupSize
<< " -volumeBlockSize=" << FLAGS_volumeBlockSize
<< " -volumeName=" << FLAGS_volumeName
<< " -volumeUser=" << FLAGS_volumeUser
<< " -volumePassword=" << FLAGS_volumePassword
<< " -volumeBitmapLocation=AtStart|AtEnd"
<< " -volumeAutoExtend=false|true"
<< " -volumeExtendFactor=" << FLAGS_volumeExtendFactor
<< " -volumeCluster=" << FLAGS_volumeCluster
<< "]\n[-fsType=s3 -s3_ak=" << FLAGS_s3_ak
<< " -s3_sk=" << FLAGS_s3_sk
<< " -s3_endpoint=" << FLAGS_s3_endpoint
<< " -s3_bucket_name=" << FLAGS_s3_bucket_name
<< " -s3_blocksize=" << FLAGS_s3_blocksize
<< " -s3_chunksize=" << FLAGS_s3_chunksize
<< " -s3_objectPrefix=" << FLAGS_s3_objectPrefix
<< "]\n[-fsType=hybrid -volumeBlockGroupSize="
<< FLAGS_volumeBlockGroupSize
<< " -volumeBlockSize=" << FLAGS_volumeBlockSize
<< " -volumeName=" << FLAGS_volumeName
<< " -volumeUser=" << FLAGS_volumeUser
<< " -volumePassword=" << FLAGS_volumePassword
<< " -volumeBitmapLocation=AtStart|AtEnd"
<< " -s3_ak=" << FLAGS_s3_ak << " -s3_sk=" << FLAGS_s3_sk
<< " -s3_endpoint=" << FLAGS_s3_endpoint
<< " -s3_bucket_name=" << FLAGS_s3_bucket_name
<< " -s3_blocksize=" << FLAGS_s3_blocksize
<< " -s3_chunksize=" << FLAGS_s3_chunksize
<< " -s3_objectPrefix=" << FLAGS_s3_objectPrefix
<< "]" << std::endl;
<< "] [-capacity=" << FLAGS_capacity
<< "] [-blockSize=" << FLAGS_blockSize
<< "] [-enableSumInDir=" << FLAGS_enableSumInDir
<< "] [-mdsAddr=" << FLAGS_mdsAddr
<< "] [-rpcTimeoutMs=" << FLAGS_rpcTimeoutMs
<< " -rpcRetryTimes=" << FLAGS_rpcRetryTimes << "]"
<< "] [recycleTimeHour=" << FLAGS_recycleTimeHour
<< "] \n[-fsType=volume -volumeBlockGroupSize="
<< FLAGS_volumeBlockGroupSize
<< " -volumeBlockSize=" << FLAGS_volumeBlockSize
<< " -volumeName=" << FLAGS_volumeName
<< " -volumeUser=" << FLAGS_volumeUser
<< " -volumePassword=" << FLAGS_volumePassword
<< " -volumeBitmapLocation=AtStart|AtEnd"
<< " -volumeAutoExtend=false|true"
<< " -volumeExtendFactor=" << FLAGS_volumeExtendFactor
<< " -volumeCluster=" << FLAGS_volumeCluster
<< "]\n[-fsType=s3 -s3_ak=" << FLAGS_s3_ak
<< " -s3_sk=" << FLAGS_s3_sk
<< " -s3_endpoint=" << FLAGS_s3_endpoint
<< " -s3_bucket_name=" << FLAGS_s3_bucket_name
<< " -s3_blocksize=" << FLAGS_s3_blocksize
<< " -s3_chunksize=" << FLAGS_s3_chunksize
<< " -s3_objectPrefix=" << FLAGS_s3_objectPrefix
<< " -s3_storageClass=NOT_SET|STANDARD|REDUCED_REDUNDANCY|STANDARD_IA|"
"ONEZONE_IA|INTELLIGENT_TIERING|GLACIER|DEEP_ARCHIVE"
<< "]\n[-fsType=hybrid -volumeBlockGroupSize="
<< FLAGS_volumeBlockGroupSize
<< " -volumeBlockSize=" << FLAGS_volumeBlockSize
<< " -volumeName=" << FLAGS_volumeName
<< " -volumeUser=" << FLAGS_volumeUser
<< " -volumePassword=" << FLAGS_volumePassword
<< " -volumeBitmapLocation=AtStart|AtEnd"
<< " -s3_ak=" << FLAGS_s3_ak << " -s3_sk=" << FLAGS_s3_sk
<< " -s3_endpoint=" << FLAGS_s3_endpoint
<< " -s3_bucket_name=" << FLAGS_s3_bucket_name
<< " -s3_blocksize=" << FLAGS_s3_blocksize
<< " -s3_chunksize=" << FLAGS_s3_chunksize
<< " -s3_objectPrefix=" << FLAGS_s3_objectPrefix
<< " -s3_storageClass=NOT_SET|STANDARD|REDUCED_REDUNDANCY|STANDARD_IA|"
"ONEZONE_IA|INTELLIGENT_TIERING|GLACIER|DEEP_ARCHIVE"
<< "]" << std::endl;
}

void CreateFsTool::AddUpdateFlags() {
Expand All @@ -134,6 +139,7 @@ void CreateFsTool::AddUpdateFlags() {
AddUpdateFlagsFunc(curvefs::tools::SetS3_blocksize);
AddUpdateFlagsFunc(curvefs::tools::SetS3_chunksize);
AddUpdateFlagsFunc(curvefs::tools::SetS3_objectPrefix);
AddUpdateFlagsFunc(curvefs::tools::SetS3_storageClass);
AddUpdateFlagsFunc(curvefs::tools::SetRpcTimeoutMs);
AddUpdateFlagsFunc(curvefs::tools::SetRpcRetryTimes);
AddUpdateFlagsFunc(curvefs::tools::SetEnableSumInDir);
Expand Down Expand Up @@ -179,6 +185,14 @@ int CreateFsTool::Init() {
s3->set_blocksize(FLAGS_s3_blocksize);
s3->set_chunksize(FLAGS_s3_chunksize);
s3->set_objectprefix(FLAGS_s3_objectPrefix);
curvefs::common::StorageClass storageClass;
if (!StorageClass_Parse(FLAGS_s3_storageClass, &storageClass)) {
std::cerr << "Parse storageClass error, only support "
"NOT_SET|STANDARD|REDUCED_REDUNDANCY|STANDARD_IA|"
"ONEZONE_IA|INTELLIGENT_TIERING|GLACIER|DEEP_ARCHIVE";
return -1;
}
s3->set_storageclass(storageClass);
request.mutable_fsdetail()->set_allocated_s3info(s3);
return 0;
};
Expand Down
7 changes: 7 additions & 0 deletions curvefs/src/tools/curvefs_tool_define.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ DEFINE_string(s3_bucket_name, "bucketname", "s3 bucket name");
DEFINE_uint64(s3_blocksize, 1048576, "s3 block size");
DEFINE_uint64(s3_chunksize, 4194304, "s3 chunk size");
DEFINE_uint32(s3_objectPrefix, 0, "object prefix");
DEFINE_string(s3_storageClass, "NOT_SET", "s3 storage classes");
DEFINE_bool(enableSumInDir, false, "statistic info in xattr");
DEFINE_uint64(capacity, (uint64_t)0,
"capacity of fs, unit is bytes, default 0 to disable quota");
Expand Down Expand Up @@ -256,6 +257,12 @@ std::function<void(curve::common::Configuration*, google::CommandLineFlagInfo*)>
std::placeholders::_2, "s3_objectPrefix", "s3.objectPrefix",
&FLAGS_s3_objectPrefix);

std::function<void(curve::common::Configuration*, google::CommandLineFlagInfo*)>
SetS3_storageClass =
std::bind(&SetDiffFlagInfo<fLS::clstring>, std::placeholders::_1,
std::placeholders::_2, "s3_storageClass", "s3.storageClass",
&FLAGS_s3_storageClass);

std::function<void(curve::common::Configuration*, google::CommandLineFlagInfo*)>
SetEnableSumInDir = std::bind(&SetFlagInfo<bool>, std::placeholders::_1,
std::placeholders::_2, "enableSumInDir",
Expand Down
4 changes: 4 additions & 0 deletions curvefs/src/tools/curvefs_tool_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,10 @@ extern std::function<void(curve::common::Configuration*,
extern std::function<void(curve::common::Configuration*,
google::CommandLineFlagInfo*)>
SetS3_objectPrefix;
extern std::function<void(curve::common::Configuration*,
google::CommandLineFlagInfo*)>
SetS3_storageClass;

extern std::function<void(curve::common::Configuration*,
google::CommandLineFlagInfo*)>
SetEnableSumInDir;
Expand Down
Loading