add directio writer for payload files
Signed-off-by: Matthias Bertschy <[email protected]>
matthyx committed Sep 28, 2024
1 parent 9018758 commit 15c0fe2
Showing 2 changed files with 73 additions and 14 deletions.
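
For context (not part of this commit): the payload files are now opened with O_DIRECT, which bypasses the kernel page cache and, on Linux, requires buffers aligned to directio.AlignSize and writes sized in multiples of directio.BlockSize. Below is a minimal standalone sketch of that constraint using the github.com/ncw/directio helpers the commit relies on; the file name and payload are made up.

package main

import (
	"log"
	"os"

	"github.com/ncw/directio"
)

func main() {
	// directio.OpenFile sets the platform-specific direct-I/O flag (O_DIRECT on Linux).
	f, err := directio.OpenFile("payload.bin", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// AlignedBlock returns a buffer aligned to directio.AlignSize and sized to
	// directio.BlockSize, which is what the kernel expects for direct I/O.
	block := directio.AlignedBlock(directio.BlockSize)
	data := []byte("some payload bytes")
	copy(block, data)

	// A partial or misaligned write typically fails with EINVAL under O_DIRECT,
	// so the whole block is written and the file is then truncated back to the
	// logical size -- the same dance DirectIOWriter automates in this commit.
	if _, err := f.Write(block); err != nil {
		log.Fatal(err)
	}
	if err := f.Truncate(int64(len(data))); err != nil {
		log.Fatal(err)
	}
}
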
50 changes: 50 additions & 0 deletions pkg/registry/file/directio.go
@@ -5,6 +5,7 @@ import (
"io"

"github.com/ncw/directio"
"github.com/spf13/afero"
)

// DirectIOReader is a reader that reads data from the underlying reader using direct I/O.
@@ -45,3 +46,52 @@ func (d *DirectIOReader) Read(p []byte) (int, error) {
func (d *DirectIOReader) ReadByte() (byte, error) {
panic("ReadByte not implemented, gob.Decode should not be using this")
}

// DirectIOWriter is a writer that writes data to the underlying writer using direct I/O.
type DirectIOWriter struct {
buf []byte
fileSize int64
off int
wr afero.File
}

var _ io.Writer = (*DirectIOWriter)(nil)

func NewDirectIOWriter(wr afero.File) *DirectIOWriter {
return &DirectIOWriter{
buf: directio.AlignedBlock(directio.BlockSize),
wr: wr,
}
}

func (d *DirectIOWriter) Close() error {
_, err := d.wr.Write(d.buf)
if err != nil {
return err
}
return d.wr.Truncate(d.fileSize + int64(d.off))
}

func (d *DirectIOWriter) Write(p []byte) (int, error) {
pointer := 0
for pointer < len(p) {
// copy data to the buffer
var n int
if len(p)-pointer > directio.BlockSize-d.off {
// too big, copy only the fitting data to the buffer
n = copy(d.buf[d.off:], p[pointer:pointer+directio.BlockSize-d.off])
// write data to the underlying writer
_, err := d.wr.Write(d.buf)
if err != nil {
return pointer, err
}
d.off = 0
} else {
n = copy(d.buf[d.off:], p[pointer:])
d.off += n
}
pointer += n
}
d.fileSize += int64(pointer)
return pointer, nil
}
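
Not part of the diff: a sketch of how the new writer is meant to be driven, mirroring the payload path in the writeFiles change below. It is written as if it lived in the same package; the helper name writePayload is made up.

package file

import (
	"encoding/gob"
	"os"
	"syscall"

	"github.com/spf13/afero"
)

// writePayload is a hypothetical helper mirroring writeFiles: open the file
// with O_DIRECT, wrap it in a DirectIOWriter, gob-encode into the writer, then
// Close the writer so the last partial block is flushed and the file is
// truncated to the logical payload size. Close errors are ignored here, as in
// the deferred calls in writeFiles.
func writePayload(appFs afero.Fs, path string, obj any) error {
	payloadFile, err := appFs.OpenFile(path, syscall.O_DIRECT|os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
	if err != nil {
		return err
	}
	w := NewDirectIOWriter(payloadFile)
	defer func() {
		_ = w.Close()
		_ = payloadFile.Close()
	}()
	return gob.NewEncoder(w).Encode(obj)
}
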
37 changes: 23 additions & 14 deletions pkg/registry/file/storage.go
@@ -146,17 +146,25 @@ func (s *StorageImpl) writeFiles(key string, obj runtime.Object, metaOut runtime
return fmt.Errorf("mkdir: %w", err)
}
// prepare payload file
payloadFile, err := s.appFs.OpenFile(makePayloadPath(p), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
payloadFile, err := s.appFs.OpenFile(makePayloadPath(p), syscall.O_DIRECT|os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil {
return fmt.Errorf("open payload file: %w", err)
}
directIOWriter := NewDirectIOWriter(payloadFile)
defer func() {
_ = directIOWriter.Close()
_ = payloadFile.Close()
}()
// prepare metadata file
metadataFile, err := s.appFs.OpenFile(makeMetadataPath(p), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil {
return fmt.Errorf("open metadata file: %w", err)
}
defer func() {
_ = metadataFile.Close()
}()
// write payload
payloadEncoder := gob.NewEncoder(payloadFile)
payloadEncoder := gob.NewEncoder(directIOWriter)
if err := payloadEncoder.Encode(obj); err != nil {
return fmt.Errorf("encode payload: %w", err)
}
@@ -226,16 +234,19 @@ func (s *StorageImpl) Delete(ctx context.Context, key string, metaOut runtime.Ob
spanLock.End()
p := filepath.Join(s.root, key)
// read metadata file
file, err := s.appFs.Open(makeMetadataPath(p))
metadataFile, err := s.appFs.Open(makeMetadataPath(p))
if err != nil {
if errors.Is(err, afero.ErrFileNotFound) {
return storage.NewKeyNotFoundError(key, 0)
}
logger.L().Ctx(ctx).Error("Delete - read file failed", helpers.Error(err), helpers.String("key", key))
return err
}
defer func() {
_ = metadataFile.Close()
}()
// try to fill metaOut
decoder := json.NewDecoder(file)
decoder := json.NewDecoder(metadataFile)
err = decoder.Decode(metaOut)
if err != nil {
logger.L().Ctx(ctx).Error("Delete - json unmarshal failed", helpers.Error(err), helpers.String("key", key))
@@ -303,6 +314,9 @@ func (s *StorageImpl) get(ctx context.Context, key string, opts storage.GetOptio
logger.L().Ctx(ctx).Error("Get - read file failed", helpers.Error(err), helpers.String("key", key))
return err
}
defer func() {
_ = payloadFile.Close()
}()
decoder := gob.NewDecoder(NewDirectIOReader(payloadFile))
err = decoder.Decode(objPtr)
if err != nil {
@@ -728,13 +742,16 @@ func (s *StorageImpl) appendObjectFromFile(path string, v reflect.Value) error {
key := s.keyFromPath(path)
s.locks.RLock(key)
defer s.locks.RUnlock(key)
file, err := s.appFs.Open(path)
metadataFile, err := s.appFs.Open(path)
if err != nil {
// skip if file is not readable, maybe it was deleted
return nil
}
defer func() {
_ = metadataFile.Close()
}()

obj, err := getUnmarshaledRuntimeObject(v, file)
obj, err := getUnmarshaledRuntimeObject(v, metadataFile)
if err != nil {
return nil
}
@@ -748,14 +765,6 @@ func getUnmarshaledRuntimeObject(v reflect.Value, file afero.File) (runtime.Obje
elem := v.Type().Elem()
obj := reflect.New(elem).Interface().(runtime.Object)

if strings.HasSuffix(file.Name(), GobExt) {
decoder := gob.NewDecoder(file)
if err := decoder.Decode(obj); err != nil {
return nil, err
}
return obj, nil
}

decoder := json.NewDecoder(file)
if err := decoder.Decode(obj); err != nil {
return nil, err
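
Not part of the diff: reading a payload back goes through the existing DirectIOReader, as the get() hunk above shows. A hypothetical read-side counterpart to writePayload is sketched below; the helper name and the O_DIRECT flag on the read open are assumptions, since the opening of payloadFile is outside the hunk shown.

package file

import (
	"encoding/gob"
	"os"
	"syscall"

	"github.com/spf13/afero"
	"k8s.io/apimachinery/pkg/runtime"
)

// readPayload is a hypothetical helper mirroring get(): the payload file is
// decoded through DirectIOReader, which feeds gob from block-aligned reads.
func readPayload(appFs afero.Fs, path string, objPtr runtime.Object) error {
	payloadFile, err := appFs.OpenFile(path, syscall.O_DIRECT|os.O_RDONLY, 0)
	if err != nil {
		return err
	}
	defer func() {
		_ = payloadFile.Close()
	}()
	return gob.NewDecoder(NewDirectIOReader(payloadFile)).Decode(objPtr)
}
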
