Skip to content

Commit

Permalink
Introduce a properties struct for saving object metadata (#42)
Browse files Browse the repository at this point in the history
* Introduce a properties struct for saving objects

This allows us to set properties like cache control when we save objects

* fix tests

* change to a pointer
  • Loading branch information
mjh1 authored Jun 13, 2023
1 parent 153c5e9 commit e131f40
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 32 deletions.
11 changes: 8 additions & 3 deletions drivers/drivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ type FileInfoReader struct {
ContentRange string
}

type FileProperties struct {
Metadata map[string]string
CacheControl string
}

var AvailableDrivers = []OSDriver{
&FSOS{},
&GsOS{},
Expand Down Expand Up @@ -125,7 +130,7 @@ const (
type OSSession interface {
OS() OSDriver

SaveData(ctx context.Context, name string, data io.Reader, meta map[string]string, timeout time.Duration) (string, error)
SaveData(ctx context.Context, name string, data io.Reader, fields *FileProperties, timeout time.Duration) (string, error)
EndSession()

// Info in order to have this session used via RPC
Expand Down Expand Up @@ -301,14 +306,14 @@ func ParseOSURL(input string, useFullAPI bool) (OSDriver, error) {
}

// SaveRetried tries to SaveData specified number of times
func SaveRetried(ctx context.Context, sess OSSession, name string, data []byte, meta map[string]string, retryCount int) (string, error) {
func SaveRetried(ctx context.Context, sess OSSession, name string, data []byte, fields *FileProperties, retryCount int) (string, error) {
if retryCount < 1 {
return "", fmt.Errorf("invalid retry count %d", retryCount)
}
var uri string
var err error
for i := 0; i < retryCount; i++ {
uri, err = sess.SaveData(ctx, name, bytes.NewReader(data), meta, 0)
uri, err = sess.SaveData(ctx, name, bytes.NewReader(data), fields, 0)
if err == nil {
return uri, err
}
Expand Down
4 changes: 3 additions & 1 deletion drivers/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type FSOS struct {
lock sync.RWMutex
}

var _ OSSession = (*FSSession)(nil)

type FSSession struct {
os *FSOS
path string
Expand Down Expand Up @@ -172,7 +174,7 @@ func (ostore *FSSession) GetInfo() *OSInfo {
return nil
}

func (ostore *FSSession) SaveData(ctx context.Context, name string, data io.Reader, meta map[string]string, timeout time.Duration) (string, error) {
func (ostore *FSSession) SaveData(ctx context.Context, name string, data io.Reader, fields *FileProperties, timeout time.Duration) (string, error) {
fullPath := ostore.getAbsoluteURI(name)
dir, name := path.Split(fullPath)
err := os.MkdirAll(dir, os.ModePerm)
Expand Down
18 changes: 11 additions & 7 deletions drivers/gs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"google.golang.org/api/option"
)

var _ OSSession = (*gsSession)(nil)

type (
gsKeyJSON struct {
Type string `json:"type,omitempty"`
Expand Down Expand Up @@ -174,7 +176,7 @@ func (os *gsSession) DeleteFile(ctx context.Context, name string) error {
Delete(ctx)
}

func (os *gsSession) SaveData(ctx context.Context, name string, data io.Reader, meta map[string]string, timeout time.Duration) (string, error) {
func (os *gsSession) SaveData(ctx context.Context, name string, data io.Reader, fields *FileProperties, timeout time.Duration) (string, error) {
if os.useFullAPI {
if os.client == nil {
if err := os.createClient(); err != nil {
Expand All @@ -189,11 +191,13 @@ func (os *gsSession) SaveData(ctx context.Context, name string, data io.Reader,
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
wr := objh.NewWriter(ctx)
if len(meta) > 0 && wr.Metadata == nil {
wr.Metadata = make(map[string]string, len(meta))
}
for k, v := range meta {
wr.Metadata[k] = v
if fields != nil {
if len(fields.Metadata) > 0 && wr.Metadata == nil {
wr.Metadata = make(map[string]string, len(fields.Metadata))
}
for k, v := range fields.Metadata {
wr.Metadata[k] = v
}
}
data, contentType, err := os.peekContentType(name, data)
if err != nil {
Expand All @@ -211,7 +215,7 @@ func (os *gsSession) SaveData(ctx context.Context, name string, data io.Reader,
uri := os.getAbsURL(keyname)
return uri, err
}
return os.s3Session.SaveData(ctx, name, data, meta, timeout)
return os.s3Session.SaveData(ctx, name, data, fields, timeout)
}

type gsPageInfo struct {
Expand Down
4 changes: 3 additions & 1 deletion drivers/ipfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ type IpfsOS struct {
secret string
}

var _ OSSession = (*IpfsSession)(nil)

type IpfsSession struct {
os *IpfsOS
filename string
Expand Down Expand Up @@ -123,7 +125,7 @@ func (ostore *IpfsSession) DeleteFile(ctx context.Context, name string) error {
return ErrNotSupported
}

func (session *IpfsSession) SaveData(ctx context.Context, name string, data io.Reader, meta map[string]string, timeout time.Duration) (string, error) {
func (session *IpfsSession) SaveData(ctx context.Context, name string, data io.Reader, fields *FileProperties, timeout time.Duration) (string, error) {
// concatenate filename with name argument to get full filename, both may be empty
fullPath := session.getAbsolutePath(name)
if fullPath == "" {
Expand Down
4 changes: 3 additions & 1 deletion drivers/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type MemoryOS struct {
lock sync.RWMutex
}

var _ OSSession = (*MemorySession)(nil)

type MemorySession struct {
os *MemoryOS
path string
Expand Down Expand Up @@ -220,7 +222,7 @@ func (ostore *MemoryOS) Description() string {
return "Memory driver."
}

func (ostore *MemorySession) SaveData(ctx context.Context, name string, data io.Reader, meta map[string]string, timeout time.Duration) (string, error) {
func (ostore *MemorySession) SaveData(ctx context.Context, name string, data io.Reader, fields *FileProperties, timeout time.Duration) (string, error) {
path, file := path.Split(ostore.getAbsolutePath(name))

ostore.dLock.Lock()
Expand Down
18 changes: 9 additions & 9 deletions drivers/overwrite_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ func TestOverwriteQueueShouldCallSave(t *testing.T) {

data1 := []byte("data01")

var meta map[string]string
mos.On("SaveData", "f1", dataReader(data1), meta, timeout).Return("not used", nil).Once()
var fields *FileProperties
mos.On("SaveData", "f1", dataReader(data1), fields, timeout).Return("not used", nil).Once()

oq.Save(data1)
oq.waitForQueueToClear(5 * time.Second)
Expand All @@ -51,10 +51,10 @@ func TestOverwriteQueueShouldRetry(t *testing.T) {

data1 := []byte("data01")

var meta map[string]string
mos.On("SaveData", "f1", dataReader(data1), meta, timeout).Return("not used", errors.New("no1")).Once()
var fields *FileProperties
mos.On("SaveData", "f1", dataReader(data1), fields, timeout).Return("not used", errors.New("no1")).Once()
timeout = time.Duration(float64(timeout) * timeoutMultiplier)
mos.On("SaveData", "f1", dataReader(data1), meta, timeout).Return("not used", nil).Once()
mos.On("SaveData", "f1", dataReader(data1), fields, timeout).Return("not used", nil).Once()

oq.Save(data1)
oq.waitForQueueToClear(5 * time.Second)
Expand All @@ -72,9 +72,9 @@ func TestOverwriteQueueShouldUseLastValue(t *testing.T) {
data2 := []byte("data02")
data3 := []byte("data03")

var meta map[string]string
mos.On("SaveData", "f1", dataReader(dataw1), meta, timeout).Return("not used", nil).Once()
mos.On("SaveData", "f1", dataReader(data3), meta, timeout).Return("not used", nil).Once()
var fields *FileProperties
mos.On("SaveData", "f1", dataReader(dataw1), fields, timeout).Return("not used", nil).Once()
mos.On("SaveData", "f1", dataReader(data3), fields, timeout).Return("not used", nil).Once()

mos.waitForCh = true
oq.Save(dataw1)
Expand All @@ -85,7 +85,7 @@ func TestOverwriteQueueShouldUseLastValue(t *testing.T) {

oq.waitForQueueToClear(5 * time.Second)
mos.AssertExpectations(t)
mos.AssertNotCalled(t, "SaveData", "f1", data2, meta, timeout)
mos.AssertNotCalled(t, "SaveData", "f1", data2, fields, timeout)
oq.StopAfter(0)
time.Sleep(10 * time.Millisecond)
}
Expand Down
19 changes: 12 additions & 7 deletions drivers/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const (
defaultIgnoredRegion = "ignored"
)

var _ OSSession = (*s3Session)(nil)

// S3OS S3 backed object storage driver. For own storage access key and access key secret
// should be specified. To give to other nodes access to own S3 storage so called 'POST' policy
// is created. This policy is valid for S3_POLICY_EXPIRE_IN_HOURS hours.
Expand Down Expand Up @@ -372,13 +374,13 @@ func (os *s3Session) ReadDataRange(ctx context.Context, name, byteRange string)
return res, nil
}

func (os *s3Session) saveDataPut(ctx context.Context, name string, data io.Reader, meta map[string]string, timeout time.Duration) (string, error) {
func (os *s3Session) saveDataPut(ctx context.Context, name string, data io.Reader, fields *FileProperties, timeout time.Duration) (string, error) {
bucket := aws.String(os.bucket)
keyname := aws.String(path.Join(os.key, name))
var metadata map[string]*string
if len(meta) > 0 {
if fields != nil && len(fields.Metadata) > 0 {
metadata = make(map[string]*string)
for k, v := range meta {
for k, v := range fields.Metadata {
metadata[k] = aws.String(v)
}
}
Expand All @@ -399,6 +401,9 @@ func (os *s3Session) saveDataPut(ctx context.Context, name string, data io.Reade
Body: data,
ContentType: aws.String(contentType),
}
if fields != nil {
params.CacheControl = &fields.CacheControl
}
if timeout == 0 {
timeout = defaultSaveTimeout
}
Expand Down Expand Up @@ -428,12 +433,12 @@ func (os *s3Session) DeleteFile(ctx context.Context, name string) error {
return err
}

func (os *s3Session) SaveData(ctx context.Context, name string, data io.Reader, meta map[string]string, timeout time.Duration) (string, error) {
func (os *s3Session) SaveData(ctx context.Context, name string, data io.Reader, fields *FileProperties, timeout time.Duration) (string, error) {
if os.s3svc != nil {
return os.saveDataPut(ctx, name, data, meta, timeout)
return os.saveDataPut(ctx, name, data, fields, timeout)
}
_ = path.Join(os.host, os.key, name)
path, err := os.postData(ctx, name, data, meta, timeout)
path, err := os.postData(ctx, name, data, fields, timeout)
if err != nil {
// handle error
return "", err
Expand Down Expand Up @@ -480,7 +485,7 @@ func (os *s3Session) peekContentType(fileName string, data io.Reader) (*bufio.Re
}

// if s3 storage is not our own, we are saving data into it using POST request
func (os *s3Session) postData(ctx context.Context, fileName string, data io.Reader, meta map[string]string, timeout time.Duration) (string, error) {
func (os *s3Session) postData(ctx context.Context, fileName string, data io.Reader, props *FileProperties, timeout time.Duration) (string, error) {
data, fileType, err := os.peekContentType(fileName, data)
if err != nil {
return "", err
Expand Down
4 changes: 2 additions & 2 deletions drivers/session_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ func NewMockOSSession() *MockOSSession {
}
}

func (s *MockOSSession) SaveData(ctx context.Context, name string, data io.Reader, meta map[string]string, timeout time.Duration) (string, error) {
args := s.Called(name, data, meta, timeout)
func (s *MockOSSession) SaveData(ctx context.Context, name string, data io.Reader, fields *FileProperties, timeout time.Duration) (string, error) {
args := s.Called(name, data, fields, timeout)
if s.waitForCh {
s.back <- struct{}{}
<-s.waitCh
Expand Down
4 changes: 3 additions & 1 deletion drivers/w3s.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type W3sOS struct {
pubId string
}

var _ OSSession = (*W3sSession)(nil)

type W3sSession struct {
os *W3sOS
}
Expand Down Expand Up @@ -134,7 +136,7 @@ func (session *W3sSession) DeleteFile(ctx context.Context, name string) error {
return ErrNotSupported
}

func (session *W3sSession) SaveData(ctx context.Context, name string, data io.Reader, meta map[string]string, timeout time.Duration) (string, error) {
func (session *W3sSession) SaveData(ctx context.Context, name string, data io.Reader, fields *FileProperties, timeout time.Duration) (string, error) {
if timeout <= 0 {
timeout = w3SDefaultSaveTimeout
}
Expand Down

0 comments on commit e131f40

Please sign in to comment.