diff --git a/go.mod b/go.mod index 2c15d9c88..972268c09 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/google/uuid v1.3.0 github.com/gorilla/handlers v1.5.1 github.com/gorilla/mux v1.8.0 - github.com/longhorn/backupstore v0.0.0-20230830075002-fa25b1a97ffd + github.com/longhorn/backupstore v0.0.0-20230917124937-b5235e5ee814 github.com/longhorn/go-iscsi-helper v0.0.0-20230802055236-4ec8edae3fad github.com/longhorn/sparse-tools v0.0.0-20230408015858-c849def39d3c github.com/moby/moby v23.0.2+incompatible diff --git a/go.sum b/go.sum index 842dc3c43..7b93c0a3d 100644 --- a/go.sum +++ b/go.sum @@ -67,8 +67,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/longhorn/backupstore v0.0.0-20230830075002-fa25b1a97ffd h1:kMPcZ7n6NImlL0vay6onviYz1ncDk9U9JdNxqviRvUw= -github.com/longhorn/backupstore v0.0.0-20230830075002-fa25b1a97ffd/go.mod h1:wiEYTbvxEAIUxAAY1DmvMeuFuGqwWmJTzfVhZiBKlNo= +github.com/longhorn/backupstore v0.0.0-20230917124937-b5235e5ee814 h1:Kz/mCeQvHUWCemdds1EDsL+AairEmOFZwpZmA8t/y/g= +github.com/longhorn/backupstore v0.0.0-20230917124937-b5235e5ee814/go.mod h1:wiEYTbvxEAIUxAAY1DmvMeuFuGqwWmJTzfVhZiBKlNo= github.com/longhorn/go-iscsi-helper v0.0.0-20230802055236-4ec8edae3fad h1:1eZBx9ZSopMM969E/BTTUIhM0O2klBOgh8qnmGnXftA= github.com/longhorn/go-iscsi-helper v0.0.0-20230802055236-4ec8edae3fad/go.mod h1:hxy8Ra38KtX4MFmXZRAZUpJZSYcaI1pmnWmKA3ICA2c= github.com/longhorn/nsfilelock v0.0.0-20200723175406-fa7c83ad0003 h1:Jw9uANsGcHTxp6HcC++/vN17LfeuDmozHI2j6DoZf5E= diff --git a/vendor/github.com/longhorn/backupstore/backupstore.go b/vendor/github.com/longhorn/backupstore/backupstore.go index debaf3d6e..ef73f9467 100644 --- a/vendor/github.com/longhorn/backupstore/backupstore.go +++ b/vendor/github.com/longhorn/backupstore/backupstore.go @@ -5,8 +5,9 @@ import ( "net/url" "sync" - "github.com/longhorn/backupstore/util" "github.com/pkg/errors" + + "github.com/longhorn/backupstore/util" ) type Volume struct { @@ -21,6 +22,7 @@ type Volume struct { BackingImageChecksum string `json:",string"` CompressionMethod string `json:",string"` StorageClassName string `json:",string"` + BackendStoreDriver string `json:",string"` } type Snapshot struct { diff --git a/vendor/github.com/longhorn/backupstore/config.go b/vendor/github.com/longhorn/backupstore/config.go index 883e11106..3694922fa 100644 --- a/vendor/github.com/longhorn/backupstore/config.go +++ b/vendor/github.com/longhorn/backupstore/config.go @@ -269,6 +269,9 @@ func loadVolume(driver BackupStoreDriver, volumeName string) (*Volume, error) { log.Infof("Falling back compression method to %v for volume %v", LEGACY_COMPRESSION_METHOD, v.Name) v.CompressionMethod = LEGACY_COMPRESSION_METHOD } + if v.BackendStoreDriver == "" { + v.BackendStoreDriver = string(BackendStoreDriverV1) + } return v, nil } diff --git a/vendor/github.com/longhorn/backupstore/deltablock.go b/vendor/github.com/longhorn/backupstore/deltablock.go index e0eedc5e4..4812078a8 100644 --- a/vendor/github.com/longhorn/backupstore/deltablock.go +++ b/vendor/github.com/longhorn/backupstore/deltablock.go @@ -41,6 +41,13 @@ type BlockMapping struct { BlockChecksum string } +type Block struct { + offset int64 + blockChecksum string + compressionMethod string + isZeroBlock bool +} + type BlockInfo struct { checksum string path string @@ -81,6 +88,16 @@ func (r backupRequest) getBackupType() string { return "full" } +type progress struct { + sync.Mutex + + totalBlockCounts int64 + processedBlockCounts int64 + newBlockCounts int64 + + progress int +} + type DeltaBlockBackupOperations interface { HasSnapshot(id, volumeID string) bool CompareSnapshot(id, compareID, volumeID string) (*types.Mappings, error) @@ -91,26 +108,19 @@ type DeltaBlockBackupOperations interface { } type DeltaRestoreOperations interface { + OpenVolumeDev(volDevName string) (*os.File, string, error) + CloseVolumeDev(volDev *os.File) error UpdateRestoreStatus(snapshot string, restoreProgress int, err error) + Stop() + GetStopChan() chan struct{} } -const ( - DEFAULT_BLOCK_SIZE = 2 * 1024 * 1024 - LEGACY_COMPRESSION_METHOD = "gzip" - - BLOCKS_DIRECTORY = "blocks" - BLOCK_SEPARATE_LAYER1 = 2 - BLOCK_SEPARATE_LAYER2 = 4 - BLK_SUFFIX = ".blk" - - PROGRESS_PERCENTAGE_BACKUP_SNAPSHOT = 95 - PROGRESS_PERCENTAGE_BACKUP_TOTAL = 100 -) - +// CreateDeltaBlockBackup creates a delta block backup for the given volume and snapshot. func CreateDeltaBlockBackup(backupName string, config *DeltaBackupConfig) (isIncremental bool, err error) { if config == nil { return false, fmt.Errorf("BUG: invalid empty config for backup") } + volume := config.Volume snapshot := config.Snapshot destURL := config.DestURL @@ -158,6 +168,7 @@ func CreateDeltaBlockBackup(backupName string, config *DeltaBackupConfig) (isInc } config.Volume.CompressionMethod = volume.CompressionMethod + config.Volume.BackendStoreDriver = volume.BackendStoreDriver if err := deltaOps.OpenSnapshot(snapshot.Name, volume.Name); err != nil { return false, err @@ -362,7 +373,7 @@ func backupBlock(bsDriver BackupStoreDriver, config *DeltaBackupConfig, return nil } - log.Debugf("Creating new block file at %v", blkFile) + log.Tracef("Creating new block file at %v", blkFile) newBlock = true rs, err := util.CompressData(deltaBackup.CompressionMethod, block) if err != nil { @@ -382,7 +393,7 @@ func backupMapping(bsDriver BackupStoreDriver, config *DeltaBackupConfig, blkCounts := mapping.Size / blockSize for i := int64(0); i < blkCounts; i++ { - log.Debugf("Backup for %v: segment %+v, blocks %v/%v", snapshot.Name, mapping, i+1, blkCounts) + log.Tracef("Backup for %v: segment %+v, blocks %v/%v", snapshot.Name, mapping, i+1, blkCounts) offset := mapping.Offset + i*blockSize if err := deltaOps.ReadSnapshot(snapshot.Name, volume.Name, offset, block); err != nil { logrus.WithError(err).Errorf("Failed to read volume %v snapshot %v block at offset %v size %v", @@ -426,16 +437,6 @@ func backupMappings(ctx context.Context, bsDriver BackupStoreDriver, config *Del return errChan } -type progress struct { - sync.Mutex - - totalBlockCounts int64 - processedBlockCounts int64 - newBlockCounts int64 - - progress int -} - func getTotalBackupBlockCounts(delta *types.Mappings) (int64, error) { totalBlockCounts := int64(0) for _, d := range delta.Mappings { @@ -448,38 +449,6 @@ func getTotalBackupBlockCounts(delta *types.Mappings) (int64, error) { return totalBlockCounts, nil } -// mergeErrorChannels will merge all error channels into a single error out channel. -// the error out channel will be closed once the ctx is done or all error channels are closed -// if there is an error on one of the incoming channels the error will be relayed. -func mergeErrorChannels(ctx context.Context, channels ...<-chan error) <-chan error { - var wg sync.WaitGroup - wg.Add(len(channels)) - - out := make(chan error, len(channels)) - output := func(c <-chan error) { - defer wg.Done() - select { - case err, ok := <-c: - if ok { - out <- err - } - return - case <-ctx.Done(): - return - } - } - - for _, c := range channels { - go output(c) - } - - go func() { - wg.Wait() - close(out) - }() - return out -} - func sortBackupBlocks(blocks []BlockMapping, volumeSize, blockSize int64) []BlockMapping { sortedBlocks := make([]string, volumeSize/blockSize) for _, block := range blocks { @@ -584,6 +553,7 @@ func performBackup(bsDriver BackupStoreDriver, config *DeltaBackupConfig, delta volume.BackingImageChecksum = config.Volume.BackingImageChecksum volume.CompressionMethod = config.Volume.CompressionMethod volume.StorageClassName = config.Volume.StorageClassName + volume.BackendStoreDriver = config.Volume.BackendStoreDriver if err := saveVolume(bsDriver, volume); err != nil { return progress.progress, "", err @@ -636,6 +606,7 @@ func mergeSnapshotMap(deltaBackup, lastBackup *Backup) *Backup { return backup } +// RestoreDeltaBlockBackup restores a delta block backup for the given configuration func RestoreDeltaBlockBackup(config *DeltaRestoreConfig) error { if config == nil { return fmt.Errorf("invalid empty config for restore") @@ -678,24 +649,16 @@ func RestoreDeltaBlockBackup(config *DeltaRestoreConfig) error { } if vol.Size == 0 || vol.Size%DEFAULT_BLOCK_SIZE != 0 { - return fmt.Errorf("read invalid volume size %v", vol.Size) + return fmt.Errorf("invalid volume size %v", vol.Size) } - if _, err := os.Stat(volDevName); err == nil { - logrus.Warnf("File %s for the restore exists, will remove and re-create it", volDevName) - if err := os.RemoveAll(volDevName); err != nil { - return errors.Wrapf(err, "failed to clean up the existing file %v before restore", volDevName) - } - } - - volDev, err := os.Create(volDevName) + volDev, volDevPath, err := deltaOps.OpenVolumeDev(volDevName) if err != nil { - return err + return errors.Wrapf(err, "failed to open volume device %v", volDevName) } defer func() { - // make sure to close the device if err != nil { - _ = volDev.Close() + _ = deltaOps.CloseVolumeDev(volDev) } }() @@ -724,8 +687,14 @@ func RestoreDeltaBlockBackup(config *DeltaRestoreConfig) error { return err } go func() { - defer volDev.Close() - defer lock.Unlock() + var err error + currentProgress := 0 + + defer func() { + _ = deltaOps.CloseVolumeDev(volDev) + deltaOps.UpdateRestoreStatus(volDevName, currentProgress, err) + lock.Unlock() + }() progress := &progress{ totalBlockCounts: int64(len(backup.Blocks)), @@ -736,10 +705,10 @@ func RestoreDeltaBlockBackup(config *DeltaRestoreConfig) error { // closed. // https://github.com/longhorn/longhorn/issues/2503 // We want to truncate regular files, but not device - if stat.Mode()&os.ModeType == 0 { - log.Debugf("Truncate %v to size %v", volDevName, vol.Size) - if err := volDev.Truncate(vol.Size); err != nil { - deltaOps.UpdateRestoreStatus(volDevName, progress.progress, err) + if stat.Mode().IsRegular() { + log.Infof("Truncate %v to size %v", volDevName, vol.Size) + err = volDev.Truncate(vol.Size) + if err != nil { return } } @@ -751,18 +720,17 @@ func RestoreDeltaBlockBackup(config *DeltaRestoreConfig) error { errorChans := []<-chan error{errChan} for i := 0; i < int(concurrentLimit); i++ { - errorChans = append(errorChans, restoreBlocks(ctx, bsDriver, config, srcVolumeName, blockChan, progress)) + errorChans = append(errorChans, restoreBlocks(ctx, bsDriver, config.DeltaOps, volDevPath, srcVolumeName, blockChan, progress)) } mergedErrChan := mergeErrorChannels(ctx, errorChans...) err = <-mergedErrChan if err != nil { + currentProgress = progress.progress logrus.WithError(err).Errorf("Failed to delta restore volume %v backup %v", srcVolumeName, backup.Name) - deltaOps.UpdateRestoreStatus(volDevName, progress.progress, err) return } - - deltaOps.UpdateRestoreStatus(volDevName, PROGRESS_PERCENTAGE_BACKUP_TOTAL, nil) + currentProgress = PROGRESS_PERCENTAGE_BACKUP_TOTAL }() return nil @@ -908,13 +876,6 @@ func RestoreDeltaBlockBackupIncrementally(config *DeltaRestoreConfig) error { return nil } -type Block struct { - offset int64 - blockChecksum string - compressionMethod string - isZeroBlock bool -} - func populateBlocksForIncrementalRestore(bsDriver BackupStoreDriver, lastBackup, backup *Backup) (<-chan *Block, <-chan error) { blockChan := make(chan *Block, 10) errChan := make(chan error, 1) @@ -994,10 +955,7 @@ func populateBlocksForFullRestore(bsDriver BackupStoreDriver, backup *Backup) (< return blockChan, errChan } -func restoreBlock(bsDriver BackupStoreDriver, config *DeltaRestoreConfig, - volumeName string, volDev *os.File, block *Block, progress *progress) error { - deltaOps := config.DeltaOps - +func restoreBlock(bsDriver BackupStoreDriver, deltaOps DeltaRestoreOperations, volumeName string, volDev *os.File, block *Block, progress *progress) error { defer func() { progress.Lock() defer progress.Unlock() @@ -1018,32 +976,41 @@ func restoreBlock(bsDriver BackupStoreDriver, config *DeltaRestoreConfig, }) } -func restoreBlocks(ctx context.Context, bsDriver BackupStoreDriver, config *DeltaRestoreConfig, - volumeName string, in <-chan *Block, progress *progress) <-chan error { +func restoreBlocks(ctx context.Context, bsDriver BackupStoreDriver, deltaOps DeltaRestoreOperations, volDevPath, volumeName string, in <-chan *Block, progress *progress) <-chan error { errChan := make(chan error, 1) go func() { + var err error defer close(errChan) - volDevName := config.Filename - volDev, err := os.OpenFile(volDevName, os.O_RDWR, 0666) + volDev, err := os.OpenFile(volDevPath, os.O_RDWR, 0666) if err != nil { errChan <- err return } - defer volDev.Close() + defer func() { + volDev.Close() + if err != nil { + errChan <- err + } + }() for { select { case <-ctx.Done(): + logrus.Infof("Closing goroutine for restoring blocks for volume %v", volumeName) + return + case <-deltaOps.GetStopChan(): + logrus.Infof("Closing goroutine for restoring blocks for %v since received stop signal", volumeName) + err = fmt.Errorf("restoration is cancelled since received stop signal") return case block, open := <-in: if !open { return } - if err := restoreBlock(bsDriver, config, volumeName, volDev, block, progress); err != nil { - errChan <- err + err = restoreBlock(bsDriver, deltaOps, volumeName, volDev, block, progress) + if err != nil { return } } @@ -1069,12 +1036,11 @@ func performIncrementalRestore(bsDriver BackupStoreDriver, config *DeltaRestoreC errorChans := []<-chan error{errChan} for i := 0; i < int(concurrentLimit); i++ { - errorChans = append(errorChans, restoreBlocks(ctx, bsDriver, config, srcVolumeName, blockChan, progress)) + errorChans = append(errorChans, restoreBlocks(ctx, bsDriver, config.DeltaOps, config.Filename, srcVolumeName, blockChan, progress)) } mergedErrChan := mergeErrorChannels(ctx, errorChans...) err = <-mergedErrChan - if err != nil { logrus.WithError(err).Errorf("Failed to incrementally restore volume %v backup %v", srcVolumeName, backup.Name) } @@ -1330,16 +1296,3 @@ func getBlockNamesForVolume(driver BackupStoreDriver, volumeName string) ([]stri return util.ExtractNames(names, "", BLK_SUFFIX), nil } - -func getBlockPath(volumeName string) string { - return filepath.Join(getVolumePath(volumeName), BLOCKS_DIRECTORY) + "/" -} - -func getBlockFilePath(volumeName, checksum string) string { - blockSubDirLayer1 := checksum[0:BLOCK_SEPARATE_LAYER1] - blockSubDirLayer2 := checksum[BLOCK_SEPARATE_LAYER1:BLOCK_SEPARATE_LAYER2] - path := filepath.Join(getBlockPath(volumeName), blockSubDirLayer1, blockSubDirLayer2) - fileName := checksum + BLK_SUFFIX - - return filepath.Join(path, fileName) -} diff --git a/vendor/github.com/longhorn/backupstore/inspect.go b/vendor/github.com/longhorn/backupstore/inspect.go index 85bc55b93..4bddac84f 100644 --- a/vendor/github.com/longhorn/backupstore/inspect.go +++ b/vendor/github.com/longhorn/backupstore/inspect.go @@ -76,6 +76,7 @@ func fillVolumeInfo(volume *Volume) *VolumeInfo { BackingImageName: volume.BackingImageName, BackingImageChecksum: volume.BackingImageChecksum, StorageClassname: volume.StorageClassName, + BackendStoreDriver: volume.BackendStoreDriver, } } diff --git a/vendor/github.com/longhorn/backupstore/list.go b/vendor/github.com/longhorn/backupstore/list.go index 3ec2a7cf2..10ba29289 100644 --- a/vendor/github.com/longhorn/backupstore/list.go +++ b/vendor/github.com/longhorn/backupstore/list.go @@ -28,6 +28,7 @@ type VolumeInfo struct { BackingImageName string BackingImageChecksum string StorageClassname string + BackendStoreDriver string } type BackupInfo struct { diff --git a/vendor/github.com/longhorn/backupstore/types.go b/vendor/github.com/longhorn/backupstore/types.go new file mode 100644 index 000000000..023f48d28 --- /dev/null +++ b/vendor/github.com/longhorn/backupstore/types.go @@ -0,0 +1,21 @@ +package backupstore + +const ( + DEFAULT_BLOCK_SIZE = 2 * 1024 * 1024 + LEGACY_COMPRESSION_METHOD = "gzip" + + BLOCKS_DIRECTORY = "blocks" + BLOCK_SEPARATE_LAYER1 = 2 + BLOCK_SEPARATE_LAYER2 = 4 + BLK_SUFFIX = ".blk" + + PROGRESS_PERCENTAGE_BACKUP_SNAPSHOT = 95 + PROGRESS_PERCENTAGE_BACKUP_TOTAL = 100 +) + +type BackendStoreDriver string + +const ( + BackendStoreDriverV1 = BackendStoreDriver("v1") + BackendStoreDriverV2 = BackendStoreDriver("v2") +) diff --git a/vendor/github.com/longhorn/backupstore/util.go b/vendor/github.com/longhorn/backupstore/util.go new file mode 100644 index 000000000..3e6ddbe26 --- /dev/null +++ b/vendor/github.com/longhorn/backupstore/util.go @@ -0,0 +1,52 @@ +package backupstore + +import ( + "context" + "path/filepath" + "sync" +) + +func getBlockPath(volumeName string) string { + return filepath.Join(getVolumePath(volumeName), BLOCKS_DIRECTORY) + "/" +} + +func getBlockFilePath(volumeName, checksum string) string { + blockSubDirLayer1 := checksum[0:BLOCK_SEPARATE_LAYER1] + blockSubDirLayer2 := checksum[BLOCK_SEPARATE_LAYER1:BLOCK_SEPARATE_LAYER2] + path := filepath.Join(getBlockPath(volumeName), blockSubDirLayer1, blockSubDirLayer2) + fileName := checksum + BLK_SUFFIX + + return filepath.Join(path, fileName) +} + +// mergeErrorChannels will merge all error channels into a single error out channel. +// the error out channel will be closed once the ctx is done or all error channels are closed +// if there is an error on one of the incoming channels the error will be relayed. +func mergeErrorChannels(ctx context.Context, channels ...<-chan error) <-chan error { + var wg sync.WaitGroup + wg.Add(len(channels)) + + out := make(chan error, len(channels)) + output := func(c <-chan error) { + defer wg.Done() + select { + case err, ok := <-c: + if ok { + out <- err + } + return + case <-ctx.Done(): + return + } + } + + for _, c := range channels { + go output(c) + } + + go func() { + wg.Wait() + close(out) + }() + return out +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 42781ccde..e954f7a39 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -130,7 +130,7 @@ github.com/kr/pretty # github.com/kr/text v0.2.0 ## explicit github.com/kr/text -# github.com/longhorn/backupstore v0.0.0-20230830075002-fa25b1a97ffd +# github.com/longhorn/backupstore v0.0.0-20230917124937-b5235e5ee814 ## explicit; go 1.17 github.com/longhorn/backupstore github.com/longhorn/backupstore/azblob