diff --git a/cmd/pitr/collector/collector.go b/cmd/pitr/collector/collector.go index c35cf41a0d..7e4b60edc8 100644 --- a/cmd/pitr/collector/collector.go +++ b/cmd/pitr/collector/collector.go @@ -57,6 +57,8 @@ type BackupAzure struct { StorageClass string `env:"AZURE_STORAGE_CLASS"` AccountName string `env:"AZURE_STORAGE_ACCOUNT,required"` AccountKey string `env:"AZURE_ACCESS_KEY,required"` + BlockSize int64 `env:"AZURE_BLOCK_SIZE"` + Concurrency int `env:"AZURE_CONCURRENCY"` } const ( @@ -85,7 +87,7 @@ func New(ctx context.Context, c Config) (*Collector, error) { if prefix != "" { prefix += "/" } - s, err = storage.NewAzure(c.BackupStorageAzure.AccountName, c.BackupStorageAzure.AccountKey, c.BackupStorageAzure.Endpoint, container, prefix) + s, err = storage.NewAzure(c.BackupStorageAzure.AccountName, c.BackupStorageAzure.AccountKey, c.BackupStorageAzure.Endpoint, container, prefix, c.BackupStorageAzure.BlockSize, c.BackupStorageAzure.Concurrency) if err != nil { return nil, errors.Wrap(err, "new azure storage") } diff --git a/cmd/pitr/recoverer/recoverer.go b/cmd/pitr/recoverer/recoverer.go index 9c7621c28b..ecbea8512b 100644 --- a/cmd/pitr/recoverer/recoverer.go +++ b/cmd/pitr/recoverer/recoverer.go @@ -76,11 +76,11 @@ func (c Config) storages(ctx context.Context) (storage.Storage, storage.Storage, case "azure": var err error container, prefix := getContainerAndPrefix(c.BinlogStorageAzure.ContainerPath) - binlogStorage, err = storage.NewAzure(c.BinlogStorageAzure.AccountName, c.BinlogStorageAzure.AccountKey, c.BinlogStorageAzure.Endpoint, container, prefix) + binlogStorage, err = storage.NewAzure(c.BinlogStorageAzure.AccountName, c.BinlogStorageAzure.AccountKey, c.BinlogStorageAzure.Endpoint, container, prefix, c.BinlogStorageAzure.BlockSize, c.BinlogStorageAzure.Concurrency) if err != nil { return nil, nil, errors.Wrap(err, "new azure storage") } - defaultStorage, err = storage.NewAzure(c.BackupStorageAzure.AccountName, c.BackupStorageAzure.AccountKey, c.BackupStorageAzure.Endpoint, c.BackupStorageAzure.ContainerName, c.BackupStorageAzure.BackupDest+".sst_info/") + defaultStorage, err = storage.NewAzure(c.BackupStorageAzure.AccountName, c.BackupStorageAzure.AccountKey, c.BackupStorageAzure.Endpoint, c.BackupStorageAzure.ContainerName, c.BackupStorageAzure.BackupDest+".sst_info/", c.BackupStorageAzure.BlockSize, c.BackupStorageAzure.Concurrency) if err != nil { return nil, nil, errors.Wrap(err, "new azure storage") } @@ -105,6 +105,8 @@ type BackupAzure struct { AccountName string `env:"AZURE_STORAGE_ACCOUNT,required"` AccountKey string `env:"AZURE_ACCESS_KEY,required"` BackupDest string `env:"BACKUP_PATH,required"` + BlockSize int64 `env:"AZURE_BLOCK_SIZE"` + Concurrency int `env:"AZURE_CONCURRENCY"` } type BinlogS3 struct { @@ -121,6 +123,8 @@ type BinlogAzure struct { StorageClass string `env:"BINLOG_AZURE_STORAGE_CLASS"` AccountName string `env:"BINLOG_AZURE_STORAGE_ACCOUNT,required"` AccountKey string `env:"BINLOG_AZURE_ACCESS_KEY,required"` + BlockSize int64 `env:"BINLOG_AZURE_BLOCK_SIZE"` + Concurrency int `env:"BINLOG_AZURE_CONCURRENCY"` } func (c *Config) Verify() { diff --git a/config/crd/bases/pxc.percona.com_perconaxtradbclusterbackups.yaml b/config/crd/bases/pxc.percona.com_perconaxtradbclusterbackups.yaml index a8e745cb8c..a6e717520a 100644 --- a/config/crd/bases/pxc.percona.com_perconaxtradbclusterbackups.yaml +++ b/config/crd/bases/pxc.percona.com_perconaxtradbclusterbackups.yaml @@ -161,6 +161,11 @@ spec: type: string storageClass: type: string + blockSize: + format: int64 + type: integer + concurrency: + type: integer type: object completed: format: date-time diff --git a/config/crd/bases/pxc.percona.com_perconaxtradbclusterrestores.yaml b/config/crd/bases/pxc.percona.com_perconaxtradbclusterrestores.yaml index 50e42bb776..ac8e66e702 100644 --- a/config/crd/bases/pxc.percona.com_perconaxtradbclusterrestores.yaml +++ b/config/crd/bases/pxc.percona.com_perconaxtradbclusterrestores.yaml @@ -59,6 +59,11 @@ spec: type: string storageClass: type: string + blockSize: + format: int64 + type: integer + concurrency: + type: integer type: object completed: format: date-time @@ -233,6 +238,11 @@ spec: type: string storageClass: type: string + blockSize: + format: int64 + type: integer + concurrency: + type: integer type: object completed: format: date-time diff --git a/config/crd/bases/pxc.percona.com_perconaxtradbclusters.yaml b/config/crd/bases/pxc.percona.com_perconaxtradbclusters.yaml index 27296db3a6..e4767daa27 100644 --- a/config/crd/bases/pxc.percona.com_perconaxtradbclusters.yaml +++ b/config/crd/bases/pxc.percona.com_perconaxtradbclusters.yaml @@ -599,6 +599,11 @@ spec: type: string storageClass: type: string + blockSize: + format: int64 + type: integer + concurrency: + type: integer type: object containerOptions: properties: diff --git a/deploy/bundle.yaml b/deploy/bundle.yaml index 433a815c78..48cfacbf03 100644 --- a/deploy/bundle.yaml +++ b/deploy/bundle.yaml @@ -160,6 +160,11 @@ spec: type: string storageClass: type: string + blockSize: + format: int64 + type: integer + concurrency: + type: integer type: object completed: format: date-time @@ -302,6 +307,11 @@ spec: type: string storageClass: type: string + blockSize: + format: int64 + type: integer + concurrency: + type: integer type: object completed: format: date-time @@ -476,6 +486,11 @@ spec: type: string storageClass: type: string + blockSize: + format: int64 + type: integer + concurrency: + type: integer type: object completed: format: date-time @@ -1504,6 +1519,11 @@ spec: type: string storageClass: type: string + blockSize: + format: int64 + type: integer + concurrency: + type: integer type: object containerOptions: properties: diff --git a/deploy/cr.yaml b/deploy/cr.yaml index e009d2421e..7278d66f99 100644 --- a/deploy/cr.yaml +++ b/deploy/cr.yaml @@ -672,6 +672,8 @@ spec: container: test # endpointUrl: https://accountName.blob.core.windows.net # storageClass: Hot +# blockSize: 4194304 +# concurrency: 4 fs-pvc: type: filesystem # nodeSelector: diff --git a/deploy/crd.yaml b/deploy/crd.yaml index 98e6861e79..ea2b5fe71a 100644 --- a/deploy/crd.yaml +++ b/deploy/crd.yaml @@ -160,6 +160,11 @@ spec: type: string storageClass: type: string + blockSize: + format: int64 + type: integer + concurrency: + type: integer type: object completed: format: date-time @@ -302,6 +307,11 @@ spec: type: string storageClass: type: string + blockSize: + format: int64 + type: integer + concurrency: + type: integer type: object completed: format: date-time @@ -476,6 +486,11 @@ spec: type: string storageClass: type: string + blockSize: + format: int64 + type: integer + concurrency: + type: integer type: object completed: format: date-time @@ -1504,6 +1519,11 @@ spec: type: string storageClass: type: string + blockSize: + format: int64 + type: integer + concurrency: + type: integer type: object containerOptions: properties: diff --git a/deploy/cw-bundle.yaml b/deploy/cw-bundle.yaml index 588c3c63c2..94d9623bc6 100644 --- a/deploy/cw-bundle.yaml +++ b/deploy/cw-bundle.yaml @@ -160,6 +160,11 @@ spec: type: string storageClass: type: string + blockSize: + format: int64 + type: integer + concurrency: + type: integer type: object completed: format: date-time @@ -302,6 +307,11 @@ spec: type: string storageClass: type: string + blockSize: + format: int64 + type: integer + concurrency: + type: integer type: object completed: format: date-time @@ -476,6 +486,11 @@ spec: type: string storageClass: type: string + blockSize: + format: int64 + type: integer + concurrency: + type: integer type: object completed: format: date-time @@ -1504,6 +1519,11 @@ spec: type: string storageClass: type: string + blockSize: + format: int64 + type: integer + concurrency: + type: integer type: object containerOptions: properties: diff --git a/pkg/apis/pxc/v1/pxc_types.go b/pkg/apis/pxc/v1/pxc_types.go index 160e62b394..65f689f757 100644 --- a/pkg/apis/pxc/v1/pxc_types.go +++ b/pkg/apis/pxc/v1/pxc_types.go @@ -725,6 +725,8 @@ type BackupStorageAzureSpec struct { ContainerPath string `json:"container"` Endpoint string `json:"endpointUrl"` StorageClass string `json:"storageClass"` + BlockSize int64 `json:"blockSize"` + Concurrency int `json:"concurrency"` } const ( diff --git a/pkg/pxc/backup/storage/options.go b/pkg/pxc/backup/storage/options.go index b10bc1081b..b8df12a9ce 100644 --- a/pkg/pxc/backup/storage/options.go +++ b/pkg/pxc/backup/storage/options.go @@ -52,6 +52,8 @@ func getAzureOptions(ctx context.Context, cl client.Client, backup *api.PerconaX Endpoint: backup.Status.Azure.Endpoint, Container: container, Prefix: prefix, + BlockSize: backup.Status.Azure.BlockSize, + Concurrency: backup.Status.Azure.Concurrency, }, nil } @@ -127,6 +129,8 @@ type AzureOptions struct { Endpoint string Container string Prefix string + BlockSize int64 + Concurrency int } func (o *AzureOptions) Type() api.BackupStorageType { diff --git a/pkg/pxc/backup/storage/storage.go b/pkg/pxc/backup/storage/storage.go index 8768894450..efb2667648 100644 --- a/pkg/pxc/backup/storage/storage.go +++ b/pkg/pxc/backup/storage/storage.go @@ -45,7 +45,7 @@ func NewClient(ctx context.Context, opts Options) (Storage, error) { if !ok { return nil, errors.New("invalid options type") } - return NewAzure(opts.StorageAccount, opts.AccessKey, opts.Endpoint, opts.Container, opts.Prefix) + return NewAzure(opts.StorageAccount, opts.AccessKey, opts.Endpoint, opts.Container, opts.Prefix, opts.BlockSize, opts.Concurrency) } return nil, errors.New("invalid storage type") } @@ -188,12 +188,14 @@ func (s *S3) DeleteObject(ctx context.Context, objectName string) error { // Azure is a type for working with Azure Blob storages type Azure struct { - client *azblob.Client // azure client for work with storage - container string - prefix string + client *azblob.Client // azure client for work with storage + container string + prefix string + blockSize int64 + concurrency int } -func NewAzure(storageAccount, accessKey, endpoint, container, prefix string) (Storage, error) { +func NewAzure(storageAccount, accessKey, endpoint, container, prefix string, blockSize int64, concurrency int) (Storage, error) { credential, err := azblob.NewSharedKeyCredential(storageAccount, accessKey) if err != nil { return nil, errors.Wrap(err, "new credentials") @@ -207,9 +209,11 @@ func NewAzure(storageAccount, accessKey, endpoint, container, prefix string) (St } return &Azure{ - client: cli, - container: container, - prefix: prefix, + client: cli, + container: container, + prefix: prefix, + blockSize: blockSize, + concurrency: concurrency, }, nil } @@ -227,7 +231,14 @@ func (a *Azure) GetObject(ctx context.Context, name string) (io.ReadCloser, erro func (a *Azure) PutObject(ctx context.Context, name string, data io.Reader, _ int64) error { objPath := path.Join(a.prefix, name) - _, err := a.client.UploadStream(ctx, a.container, objPath, data, nil) + uploadOptions := azblob.UploadStreamOptions{} + if a.blockSize > 0 { + uploadOptions.BlockSize = a.blockSize + } + if a.concurrency > 0 { + uploadOptions.Concurrency = a.concurrency + } + _, err := a.client.UploadStream(ctx, a.container, objPath, data, &uploadOptions) if err != nil { return errors.Wrapf(err, "upload stream: %s", objPath) }