Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] support external blob #580

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ require (
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.9.0
github.com/urfave/cli/v2 v2.27.2
github.com/vmihailenco/msgpack/v5 v5.4.1
go.etcd.io/bbolt v1.3.10
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56
golang.org/x/net v0.27.0
Expand All @@ -61,6 +62,8 @@ require (
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8
)

require github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect

require (
github.com/AdamKorcz/go-118-fuzz-build v0.0.0-20230306123547-8075edf89bb0 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,10 @@ github.com/urfave/cli/v2 v2.27.2 h1:6e0H+AkS+zDckwPCUrZkKX38mRaau4nL2uipkJpbkcI=
github.com/urfave/cli/v2 v2.27.2/go.mod h1:g0+79LmHHATl7DAcHO99smiR/T7uGLw84w8Y42x+4eM=
github.com/vbatts/tar-split v0.11.5 h1:3bHCTIheBm1qFTcgh9oPu+nNBtX+XJIupG/vacinCts=
github.com/vbatts/tar-split v0.11.5/go.mod h1:yZbwRsSeGjusneWgA781EKej9HF8vme8okylkAeNKLk=
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo=
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
Expand Down
89 changes: 86 additions & 3 deletions pkg/converter/convert_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func seekFile(ra content.ReaderAt, targetName string, handle func(io.Reader, *ta
// provided by the caller.
//
// Important: the caller must check `io.WriteCloser.Close() == nil` to ensure
// the conversion workflow is finished.
// the conversion workflow is finished, when the returned `io.WriteCloser` is not nil.
func Pack(ctx context.Context, dest io.Writer, opt PackOption) (io.WriteCloser, error) {
if opt.FsVersion == "" {
opt.FsVersion = "6"
Expand Down Expand Up @@ -348,14 +348,97 @@ func Pack(ctx context.Context, dest io.Writer, opt PackOption) (io.WriteCloser,
return nil, fmt.Errorf("'--batch-size' can only be supported by fs version 6")
}

if opt.FromDir != "" {
return nil, packFromDirectory(ctx, dest, opt, builderPath, opt.FromDir)
}

if opt.features.Contains(tool.FeatureTar2Rafs) {
return packFromTar(ctx, dest, opt)
}

return packFromDirectory(ctx, dest, opt, builderPath)
return packFromUnpackedTar(ctx, dest, opt, builderPath)
}

// packFromDirectory builds a nydus blob directly from an unpacked filesystem
// directory (sourceDir), streaming the builder's output into `dest`. When
// opt.ExternalBlobWriter is set, the external blob emitted by the builder is
// streamed into it concurrently. The work directory is removed on any failure.
func packFromDirectory(ctx context.Context, dest io.Writer, opt PackOption, builderPath, sourceDir string) error {
	workDir, err := ensureWorkDir(opt.WorkDir)
	if err != nil {
		return errors.Wrap(err, "ensure work directory")
	}
	defer func() {
		// Clean up intermediate files only on failure; `err` holds the
		// function's final error, including the errgroup result below.
		if err != nil {
			os.RemoveAll(workDir)
		}
	}()

	// The builder writes the blob into a fifo which is drained concurrently
	// into `dest`, so no intermediate blob file is materialized on disk.
	blobPath := filepath.Join(workDir, "blob")
	blobFifo, err := fifo.OpenFifo(ctx, blobPath, syscall.O_CREAT|syscall.O_RDONLY|syscall.O_NONBLOCK, 0640)
	if err != nil {
		return errors.Wrapf(err, "create fifo file for blob")
	}
	defer blobFifo.Close()

	externalBlobPath := ""
	var externalBlobFifo io.ReadWriteCloser
	if opt.ExternalBlobWriter != nil {
		externalBlobPath = filepath.Join(workDir, "external-blob")
		// Assign to the outer `err` (no shadowing) so the deferred
		// workDir cleanup above also fires on this failure path.
		externalBlobFifo, err = fifo.OpenFifo(ctx, externalBlobPath, syscall.O_CREAT|syscall.O_RDONLY|syscall.O_NONBLOCK, 0640)
		if err != nil {
			return errors.Wrapf(err, "create fifo file for external blob")
		}
		defer externalBlobFifo.Close()
	}

	eg := errgroup.Group{}

	// Run the builder inside the errgroup so its error is propagated to the
	// caller instead of being dropped. On failure, close every fifo so the
	// copier goroutines below are unblocked rather than waiting forever.
	eg.Go(func() error {
		packErr := tool.Pack(tool.PackOption{
			BuilderPath: builderPath,

			BlobPath:         blobPath,
			ExternalBlobPath: externalBlobPath,
			FsVersion:        opt.FsVersion,
			SourcePath:       sourceDir,
			ChunkDictPath:    opt.ChunkDictPath,
			AttributesPath:   opt.AttributesPath,
			PrefetchPatterns: opt.PrefetchPatterns,
			AlignedChunk:     opt.AlignedChunk,
			ChunkSize:        opt.ChunkSize,
			BatchSize:        opt.BatchSize,
			Compressor:       opt.Compressor,
			Timeout:          opt.Timeout,
			Encrypt:          opt.Encrypt,

			Features: opt.features,
		})
		if packErr != nil {
			blobFifo.Close()
			if externalBlobFifo != nil {
				externalBlobFifo.Close()
			}
			return errors.Wrap(packErr, "build blob from directory")
		}
		return nil
	})

	eg.Go(func() error {
		buffer := bufPool.Get().(*[]byte)
		defer bufPool.Put(buffer)
		if _, err := io.CopyBuffer(dest, blobFifo, *buffer); err != nil {
			return errors.Wrap(err, "pack to nydus blob")
		}
		return nil
	})

	if opt.ExternalBlobWriter != nil {
		eg.Go(func() error {
			buffer := bufPool.Get().(*[]byte)
			defer bufPool.Put(buffer)
			if _, err := io.CopyBuffer(opt.ExternalBlobWriter, externalBlobFifo, *buffer); err != nil {
				return errors.Wrap(err, "pack to nydus external blob")
			}
			return nil
		})
	}

	// Assign to `err` so the deferred cleanup removes workDir when any of
	// the build/copy goroutines failed.
	err = eg.Wait()
	return err
}

func packFromDirectory(ctx context.Context, dest io.Writer, opt PackOption, builderPath string) (io.WriteCloser, error) {
func packFromUnpackedTar(ctx context.Context, dest io.Writer, opt PackOption, builderPath string) (io.WriteCloser, error) {
workDir, err := ensureWorkDir(opt.WorkDir)
if err != nil {
return nil, errors.Wrap(err, "ensure work directory")
Expand Down
37 changes: 31 additions & 6 deletions pkg/converter/tool/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ func isSignalKilled(err error) bool {
type PackOption struct {
BuilderPath string

BootstrapPath string
BlobPath string
ExternalBlobPath string
FsVersion string
SourcePath string
ChunkDictPath string
AttributesPath string
PrefetchPatterns string
Compressor string
OCIRef bool
Expand Down Expand Up @@ -75,7 +76,7 @@ type outputJSON struct {
Blobs []string
}

func buildPackArgs(option PackOption) []string {
func buildPackArgs(option PackOption) ([]string, error) {
if option.FsVersion == "" {
option.FsVersion = "6"
}
Expand All @@ -97,10 +98,25 @@ func buildPackArgs(option PackOption) []string {
if option.Features.Contains(FeatureTar2Rafs) {
args = append(
args,
"--type",
"tar-rafs",
"--blob-inline-meta",
)
info, err := os.Stat(option.SourcePath)
if err != nil {
return nil, err
}
if info.IsDir() {
args = append(
args,
"--type",
"dir-rafs",
)
} else {
args = append(
args,
"--type",
"tar-rafs",
)
}
if option.FsVersion == "6" {
args = append(
args,
Expand Down Expand Up @@ -140,9 +156,15 @@ func buildPackArgs(option PackOption) []string {
if option.Encrypt {
args = append(args, "--encrypt")
}
if option.AttributesPath != "" {
args = append(args, "--attributes", option.AttributesPath)
}
if option.ExternalBlobPath != "" {
args = append(args, "--external-blob", option.ExternalBlobPath)
}
args = append(args, option.SourcePath)

return args
return args, nil
}

func Pack(option PackOption) error {
Expand All @@ -157,7 +179,10 @@ func Pack(option PackOption) error {
defer cancel()
}

args := buildPackArgs(option)
args, err := buildPackArgs(option)
if err != nil {
return err
}
logrus.Debugf("\tCommand: %s %s", option.BuilderPath, strings.Join(args, " "))

cmd := exec.CommandContext(ctx, option.BuilderPath, args...)
Expand Down
7 changes: 7 additions & 0 deletions pkg/converter/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"context"
"errors"
"fmt"
"io"
"strings"
"time"

Expand Down Expand Up @@ -83,6 +84,12 @@ type PackOption struct {
Timeout *time.Duration
// Whether the generated Nydus blobs should be encrypted.
Encrypt bool
// Pack to nydus blob from a directory.
FromDir string
// Path to specified nydus attributes configuration file.
AttributesPath string
// Writer that receives the generated nydus external blob (the field is an io.Writer, not a file path).
ExternalBlobWriter io.Writer

// Features keeps a feature list supported by newer version of builder,
// It is detected automatically, so don't export it.
Expand Down
59 changes: 59 additions & 0 deletions pkg/external/backend/backend.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Package backend defines the interfaces and data types used to describe
// external storage backends for nydus blob chunks.
package backend

import "context"

const (
	// Default chunk size used when splitting a file into objects.
	DefaultFileChunkSize = 1024 * 1024 * 1 // 1 MB
	// File-size threshold for throttling — presumably files at or above
	// this size get throttled handling; TODO confirm against callers.
	DefaultThrottleFileSize = 1024 * 1024 * 2 // 2 MB
)

// Backend describes an external storage backend: a type identifier and an
// opaque string-keyed configuration, both serialized as JSON.
type Backend struct {
	Type string `json:"type"`
	Config map[string]string `json:"config"`
}

// Result aggregates the outcome of handling files against a backend:
// the produced chunks, the processed file names, and the backend used.
type Result struct {
	Chunks []Chunk
	Files []string
	Backend Backend
}

// File identifies an input file by its path relative to the source root
// together with its size in bytes.
type File struct {
	RelativePath string
	Size int64
}

// Handler is the interface for backend handler.
type Handler interface {
	// Backend returns the backend information.
	Backend(ctx context.Context) (*Backend, error)
	// Handle handles the file and returns the object information.
	Handle(ctx context.Context, file File) ([]Chunk, error)
}

// Chunk is one backend-specific piece of a file: the object it belongs to,
// the backend-defined content descriptor, and the offset within the object.
type Chunk interface {
	ObjectID() uint32
	ObjectContent() interface{}
	ObjectOffset() uint64
}

// SplitObjectOffsets returns the starting offset of every chunk-sized
// object needed to cover totalSize bytes: 0, chunkSize, 2*chunkSize, …
// up to (but excluding) totalSize. A trailing partial chunk gets its own
// offset. A non-positive chunkSize yields an empty (non-nil) slice.
func SplitObjectOffsets(totalSize, chunkSize int64) []uint64 {
	offsets := []uint64{}
	if chunkSize <= 0 {
		return offsets
	}

	// Step through the byte range one chunk at a time; the final step
	// covers any remainder smaller than chunkSize.
	for off := int64(0); off < totalSize; off += chunkSize {
		offsets = append(offsets, uint64(off))
	}

	return offsets
}
15 changes: 15 additions & 0 deletions pkg/external/backend/backend_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package backend

import (
	"testing"
	"unsafe"

	"github.com/stretchr/testify/require"
)

// TestLayout verifies that the on-disk layout structs keep their documented
// fixed sizes (Header: 4096 bytes; ChunkMeta/ObjectMeta: 256 bytes each).
// Sizes are compared numerically rather than via fmt.Sprintf string
// round-trips, which only obscured the comparison.
func TestLayout(t *testing.T) {
	require.Equal(t, uintptr(4096), unsafe.Sizeof(Header{}))
	require.Equal(t, uintptr(256), unsafe.Sizeof(ChunkMeta{}))
	require.Equal(t, uintptr(256), unsafe.Sizeof(ObjectMeta{}))
}
55 changes: 55 additions & 0 deletions pkg/external/backend/layout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package backend

// MetaMagic identifies the external-blob meta format on disk.
const MetaMagic uint32 = 0x0AF5_E1E2

// MetaVersion is the current version of the on-disk meta format.
const MetaVersion uint32 = 0x0000_0001

// Layout
//
// header: magic | version | chunk_meta_offset | object_meta_offset
// chunks: chunk_meta | chunk | chunk | ...
// objects: object_meta | [object_offsets] | object | object | ...

// Header is the fixed-size (4096 bytes) on-disk header; the reserved
// padding keeps the total at exactly 4096 bytes.
type Header struct {
	Magic uint32
	Version uint32

	ChunkMetaOffset uint32
	ObjectMetaOffset uint32

	Reserved2 [4080]byte
}

// ChunkMeta is the fixed-size (256 bytes) descriptor preceding the chunk
// entries: how many entries follow and how large each entry is.
type ChunkMeta struct {
	EntryCount uint32
	EntrySize uint32

	Reserved [248]byte
}

// ObjectMeta is the fixed-size (256 bytes) descriptor preceding the object
// entries.
type ObjectMeta struct {
	EntryCount uint32
	// = 0 means indeterminate entry size, and len(object_offsets) > 0.
	// > 0 means fixed entry size, and len(object_offsets) == 0.
	EntrySize uint32

	Reserved [248]byte
}

// ChunkOndisk is the 8-byte on-disk chunk entry: the index of the object a
// chunk belongs to and the chunk's offset within that object.
type ChunkOndisk struct {
	ObjectIndex uint32
	Reserved [4]byte
	ObjectOffset uint64
}

// ObjectOffset is a 4-byte offset of an object entry, used when entries
// have indeterminate size (ObjectMeta.EntrySize == 0).
type ObjectOffset uint32

// ObjectOndisk is an on-disk object entry; its size depends on the
// external backend implementation, hence the variable-length EncodedData.
type ObjectOndisk struct {
	EntrySize uint32
	EncodedData []byte
}
Loading
Loading