Skip to content

Commit

Permalink
Added local file path backend option.
Browse files Browse the repository at this point in the history
  • Loading branch information
someone1 committed Jul 17, 2017
1 parent e196bda commit 35a3c4c
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 8 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ This project was inspired by the [duplicity project](http://duplicity.nongnu.org
- Auth details: https://godoc.org/github.com/aws/aws-sdk-go/aws/session#hdr-Environment_Variables
* Any S3 Compatible Storage Provider (e.g. Minio, StorageMadeEasy, Ceph, etc.)
- Set the AWS_S3_CUSTOM_ENDPOINT environmental variable to the compatible target API URI
* Local file path (file://[relative|/absolute]/local/path)


### Compression:
Expand Down
3 changes: 3 additions & 0 deletions backends/backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,16 @@ func GetBackendForPrefix(prefix string) (Backend, error) {
return &GoogleCloudStorageBackend{}, nil
case AWSS3BackendPrefix:
return &AWSS3Backend{}, nil
case FileBackendPrefix:
return &FileBackend{}, nil
default:
return nil, ErrInvalidPrefix
}
}

type uploadWrapper func(vol *helpers.VolumeInfo) func() error

// Helper function to cancel uploads and do an exponential backoff when retrying an upload
func uploader(ctx context.Context, u uploadWrapper, prefix string, b backoff.BackOff, in <-chan *helpers.VolumeInfo, out chan<- *helpers.VolumeInfo) {
for vol := range in {
select {
Expand Down
186 changes: 186 additions & 0 deletions backends/file_backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
// Copyright © 2016 Prateek Malhotra ([email protected])
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package backends

import (
"context"
"io"
"os"
"path/filepath"
"strings"
"sync"

"github.com/someone1/zfsbackup-go/helpers"
)

// FileBackendPrefix is the URI prefix used for the FileBackend.
const FileBackendPrefix = "file"

// FileBackend provides a local destination storage option.
type FileBackend struct {
conf *BackendConfig
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
localPath string
}

// Init will initialize the FileBackend and verify the provided URI is valid/exists.
func (f *FileBackend) Init(ctx context.Context, conf *BackendConfig) error {
f.conf = conf
f.ctx, f.cancel = context.WithCancel(ctx)

cleanPrefix := strings.TrimPrefix(f.conf.TargetURI, "file://")
if cleanPrefix == f.conf.TargetURI {
return ErrInvalidURI
}

absLocalPath, err := filepath.Abs(cleanPrefix)
if err != nil {
helpers.AppLogger.Errorf("file backend: Error while verifying path %s - %v", cleanPrefix, err)
return ErrInvalidURI
}

fi, err := os.Stat(absLocalPath)
if err != nil {
helpers.AppLogger.Errorf("file backend: Error while verifying path %s - %v", absLocalPath, err)
return ErrInvalidURI
}

if !fi.IsDir() {
helpers.AppLogger.Errorf("file backend: Provided path is not a directory!")
return ErrInvalidURI
}

f.localPath = absLocalPath
return nil
}

// StartUpload will begin the file copy workers
func (f *FileBackend) StartUpload(in <-chan *helpers.VolumeInfo) <-chan *helpers.VolumeInfo {
out := make(chan *helpers.VolumeInfo)
f.wg.Add(f.conf.MaxParallelUploads)
for i := 0; i < f.conf.MaxParallelUploads; i++ {
go func() {
uploader(f.ctx, f.uploadWrapper, "file", f.conf.getExpBackoff(), in, out)
f.wg.Done()
}()
}

go func() {
f.Wait()
close(out)
}()

return out
}

func (f *FileBackend) uploadWrapper(vol *helpers.VolumeInfo) func() error {
return func() error {
select {
case <-f.ctx.Done():
return nil
default:
f.conf.MaxParallelUploadBuffer <- true
defer func() {
<-f.conf.MaxParallelUploadBuffer
}()

if err := vol.OpenVolume(); err != nil {
helpers.AppLogger.Debugf("file backend: Error while opening volume %s - %v", vol.ObjectName, err)
return err
}
defer vol.Close()
destinationPath := filepath.Join(f.localPath, vol.ObjectName)
destinationDir := filepath.Dir(destinationPath)

if err := os.MkdirAll(destinationDir, os.ModePerm); err != nil {
helpers.AppLogger.Debugf("file backend: Could not create path %s due to error - %v", destinationDir, err)
return nil
}

w, err := os.Create(destinationPath)
if err != nil {
helpers.AppLogger.Debugf("file backend: Could not create file %s due to error - %v", destinationPath, err)
return nil
}

_, err = io.Copy(w, vol)
if err != nil {
helpers.AppLogger.Debugf("file backend: Error while copying volume %s - %v", vol.ObjectName, err)
// Check if the context was canceled, and if so, don't let the backoff function retry
select {
case <-f.ctx.Done():
return nil
default:
}
}
return err
}
}
}

// Delete will delete the given object from the provided path
func (f *FileBackend) Delete(filename string) error {
return os.Remove(filepath.Join(f.localPath, filename))
}

// PreDownload does nothing on this backend.
func (f *FileBackend) PreDownload(objects []string) error {
return nil
}

// Get will open the file for reading
func (f *FileBackend) Get(filename string) (io.ReadCloser, error) {
return os.Open(filepath.Join(f.localPath, filename))
}

// Wait will wait until all volumes have been processed from the incoming
// channel.
func (f *FileBackend) Wait() {
f.wg.Wait()
}

// Close will signal the upload workers to stop processing the contents of the incoming channel
// and to close the outgoing channel. It will also cancel any ongoing requests.
func (f *FileBackend) Close() error {
f.cancel()
f.Wait()
return nil
}

// List will return a list of all files matching the provided prefix
func (f *FileBackend) List(prefix string) ([]string, error) {
l := make([]string, 0, 1000)
err := filepath.Walk(f.localPath, func(path string, fi os.FileInfo, werr error) error {
if werr != nil {
return werr
}

trimmedPath := strings.TrimPrefix(path, f.localPath+string(filepath.Separator))
if !fi.IsDir() && strings.HasPrefix(trimmedPath, prefix) {
l = append(l, trimmedPath)
}
return nil
})

return l, err
}
9 changes: 5 additions & 4 deletions backup/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,13 @@ func prepareBackend(ctx context.Context, j *helpers.JobInfo, backendURI string,
MaxRetryTime: j.MaxRetryTime,
}

prefix := backendURI[:2]
if backendURI == backends.DeleteBackendPrefix {
prefix = backendURI
prefix := strings.Split(backendURI, "://")
if len(prefix) < 1 {
helpers.AppLogger.Errorf("Invalid destination URI provided: %s.", backendURI)
panic(helpers.Exit{Code: 5})
}

backend, err := backends.GetBackendForPrefix(prefix)
backend, err := backends.GetBackendForPrefix(prefix[0])
if err != nil {
helpers.AppLogger.Errorf("Could not get backend for prefix %s.", prefix)
panic(helpers.Exit{Code: 6})
Expand Down
9 changes: 7 additions & 2 deletions cmd/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,14 @@ func updateReceiveJobInfo(args []string) {
}

for _, destination := range jobInfo.Destinations {
_, err := backends.GetBackendForPrefix(destination[:2])
prefix := strings.Split(destination, "://")
if len(prefix) < 2 {
helpers.AppLogger.Errorf("Invalid destination URI provided: %s.", destination)
panic(helpers.Exit{Code: 10})
}
_, err := backends.GetBackendForPrefix(prefix[0])
if err == backends.ErrInvalidPrefix {
helpers.AppLogger.Errorf("Unsupported prefix provided in destination URI, was given %s", destination[:2])
helpers.AppLogger.Errorf("Unsupported prefix provided in destination URI, was given %s", prefix[0])
panic(helpers.Exit{Code: 10})
}
}
Expand Down
9 changes: 7 additions & 2 deletions cmd/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,14 @@ func updateJobInfo(args []string) {
}

for _, destination := range jobInfo.Destinations {
_, err = backends.GetBackendForPrefix(destination[:2])
prefix := strings.Split(destination, "://")
if len(prefix) < 2 {
helpers.AppLogger.Errorf("Invalid destination URI provided: %s.", destination)
panic(helpers.Exit{Code: 10})
}
_, err := backends.GetBackendForPrefix(prefix[0])
if err == backends.ErrInvalidPrefix {
helpers.AppLogger.Errorf("Unsupported prefix provided in destination URI, was given %s", destination[:2])
helpers.AppLogger.Errorf("Unsupported prefix provided in destination URI, was given %s", prefix[0])
panic(helpers.Exit{Code: 10})
}
}
Expand Down

0 comments on commit 35a3c4c

Please sign in to comment.