Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove DumpFile operations #616

Merged
merged 1 commit into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 17 additions & 21 deletions config/daemonconfig/daemonconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ package daemonconfig

import (
"encoding/json"
"os"
"reflect"
"strings"
"sync"

"github.com/pkg/errors"

Expand All @@ -36,7 +36,6 @@ type DaemonConfig interface {
StorageBackend() (StorageBackendType, *BackendConfig)
UpdateMirrors(mirrorsConfigDir, registryHost string) error
DumpString() (string, error)
DumpFile(path string) error
}

// Daemon configurations factory
Expand Down Expand Up @@ -122,19 +121,14 @@ type DeviceConfig struct {
} `json:"cache"`
}

// For nydusd as FUSE daemon. Serialize Daemon info and persist to a json file
// We don't have to persist configuration file for fscache since its configuration
// is passed through HTTP API.
func DumpConfigFile(c interface{}, path string) error {
if config.IsBackendSourceEnabled() {
c = serializeWithSecretFilter(c)
}
b, err := json.Marshal(c)
if err != nil {
return errors.Wrapf(err, "marshal config")
}
var configRWMutex sync.RWMutex

return os.WriteFile(path, b, 0600)
type SupplementInfoInterface interface {
GetImageID() string
GetSnapshotID() string
IsVPCRegistry() bool
GetLabels() map[string]string
GetParams() map[string]string
}

func DumpConfigString(c interface{}) (string, error) {
Expand All @@ -143,20 +137,22 @@ func DumpConfigString(c interface{}) (string, error) {
}

// Achieve a daemon configuration from template or snapshotter's configuration
func SupplementDaemonConfig(c DaemonConfig, imageID, snapshotID string,
vpcRegistry bool, labels map[string]string, params map[string]string) error {
func SupplementDaemonConfig(c DaemonConfig, info SupplementInfoInterface) error {

configRWMutex.Lock()
defer configRWMutex.Unlock()

image, err := registry.ParseImage(imageID)
image, err := registry.ParseImage(info.GetImageID())
if err != nil {
return errors.Wrapf(err, "parse image %s", imageID)
return errors.Wrapf(err, "parse image %s", info.GetImageID())
}

backendType, _ := c.StorageBackend()

switch backendType {
case backendTypeRegistry:
registryHost := image.Host
if vpcRegistry {
if info.IsVPCRegistry() {
registryHost = registry.ConvertToVPCHost(registryHost)
} else if registryHost == "docker.io" {
// For docker.io images, we should use index.docker.io
Expand All @@ -170,8 +166,8 @@ func SupplementDaemonConfig(c DaemonConfig, imageID, snapshotID string,
// If no auth is provided, don't touch auth from provided nydusd configuration file.
// We don't validate the original nydusd auth from configuration file since it can be empty
// when repository is public.
keyChain := auth.GetRegistryKeyChain(registryHost, imageID, labels)
c.Supplement(registryHost, image.Repo, snapshotID, params)
keyChain := auth.GetRegistryKeyChain(registryHost, info.GetImageID(), info.GetLabels())
c.Supplement(registryHost, image.Repo, info.GetSnapshotID(), info.GetParams())
c.FillAuth(keyChain)

// Localfs and OSS backends don't need any update,
Expand Down
9 changes: 0 additions & 9 deletions config/daemonconfig/fscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package daemonconfig
import (
"encoding/json"
"os"
"path"

"github.com/containerd/log"
"github.com/containerd/nydus-snapshotter/pkg/auth"
Expand Down Expand Up @@ -121,11 +120,3 @@ func (c *FscacheDaemonConfig) FillAuth(kc *auth.PassKeyChain) {
func (c *FscacheDaemonConfig) DumpString() (string, error) {
return DumpConfigString(c)
}

func (c *FscacheDaemonConfig) DumpFile(f string) error {
if err := os.MkdirAll(path.Dir(f), 0755); err != nil {
return err
}

return DumpConfigFile(c, f)
}
10 changes: 2 additions & 8 deletions config/daemonconfig/fuse.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package daemonconfig
import (
"encoding/json"
"os"
"path"

"github.com/pkg/errors"

Expand Down Expand Up @@ -92,12 +91,7 @@ func (c *FuseDaemonConfig) StorageBackend() (string, *BackendConfig) {
}

func (c *FuseDaemonConfig) DumpString() (string, error) {
configRWMutex.Lock()
defer configRWMutex.Unlock()
return DumpConfigString(c)
}

func (c *FuseDaemonConfig) DumpFile(f string) error {
if err := os.MkdirAll(path.Dir(f), 0755); err != nil {
return err
}
return DumpConfigFile(c, f)
}
2 changes: 1 addition & 1 deletion misc/snapshotter/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ FROM base AS kubectl-sourcer
ARG TARGETARCH

RUN apk add -q --no-cache curl && \
curl -fsSL -o /usr/bin/kubectl https://storage.googleapis.com/kubernetes-release/release/"$(curl -L -s https://dl.k8s.io/release/stable.txt)"/bin/linux/"$TARGETARCH"/kubectl && \
curl -fsSL -o /usr/bin/kubectl https://dl.k8s.io/release/"$(curl -L -s https://dl.k8s.io/release/stable.txt)"/bin/linux/"$TARGETARCH"/kubectl && \
chmod +x /usr/bin/kubectl

FROM base
Expand Down
55 changes: 43 additions & 12 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,21 @@ type Daemon struct {
state types.DaemonState
}

type NydusdSupplementInfo struct {
DaemonState ConfigState
ImageID string
SnapshotID string
Vpc bool
Labels map[string]string
Params map[string]string
}

func (s *NydusdSupplementInfo) GetImageID() string { return s.ImageID }
func (s *NydusdSupplementInfo) GetSnapshotID() string { return s.SnapshotID }
func (s *NydusdSupplementInfo) IsVPCRegistry() bool { return s.Vpc }
func (s *NydusdSupplementInfo) GetLabels() map[string]string { return s.Labels }
func (s *NydusdSupplementInfo) GetParams() map[string]string { return s.Params }

func (d *Daemon) Lock() {
d.mu.Lock()
}
Expand Down Expand Up @@ -250,12 +265,7 @@ func (d *Daemon) sharedFusedevMount(rafs *rafs.Rafs) error {
return err
}

c, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(rafs.SnapshotID))
if err != nil {
return errors.Wrapf(err, "Failed to reload instance configuration %s",
d.ConfigFile(rafs.SnapshotID))
}

c := d.Config
cfg, err := c.DumpString()
if err != nil {
return errors.Wrap(err, "dump instance configuration")
Expand All @@ -280,12 +290,7 @@ func (d *Daemon) sharedErofsMount(ra *rafs.Rafs) error {
return errors.Wrapf(err, "failed to create fscache work dir %s", ra.FscacheWorkDir())
}

c, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(ra.SnapshotID))
if err != nil {
log.L.Errorf("Failed to reload daemon configuration %s, %s", d.ConfigFile(ra.SnapshotID), err)
return err
}

c := d.Config
cfgStr, err := c.DumpString()
if err != nil {
return err
Expand Down Expand Up @@ -650,3 +655,29 @@ func NewDaemon(opt ...NewDaemonOpt) (*Daemon, error) {

return d, nil
}

func (d *Daemon) MountByAPI() error {
rafs := d.RafsCache.Head()
if rafs == nil {
return errors.Wrapf(errdefs.ErrNotFound, "daemon %s no rafs instance associated", d.ID())
}
client, err := d.GetClient()
if err != nil {
return errors.Wrapf(err, "mount instance %s", rafs.SnapshotID)
}
bootstrap, err := rafs.BootstrapFile()
if err != nil {
return err
}
c := d.Config
cfg, err := c.DumpString()
if err != nil {
return errors.Wrap(err, "dump instance configuration")
}
err = client.Mount("/", bootstrap, cfg)
if err != nil {
return errors.Wrapf(err, "mount rafs instance MountByAPI()")
}
return nil

}
53 changes: 26 additions & 27 deletions pkg/filesystem/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,19 @@ func NewFileSystem(ctx context.Context, opt ...NewFSOpt) (*Filesystem, error) {
if err != nil {
return errors.Wrapf(err, "get filesystem manager for daemon %s", d.States.ID)
}

supplementInfo, err := fsManager.GetInfo(d.ID())
if err != nil {
return errors.Wrap(err, "GetInfo failed")
}

cfg := d.Config
err = daemonconfig.SupplementDaemonConfig(cfg, supplementInfo)
if err != nil {
return errors.Wrap(err, "supplement configuration")
}
d.Config = cfg

if err := fsManager.StartDaemon(d); err != nil {
return errors.Wrapf(err, "start daemon %s", d.ID())
}
Expand Down Expand Up @@ -232,7 +245,6 @@ func (fs *Filesystem) Mount(ctx context.Context, snapshotID string, labels map[s
// Instance already exists, how could this happen? Can containerd handle this case?
return nil
}

fsDriver := config.GetFsDriver()
if label.IsTarfsDataLayer(labels) {
fsDriver = config.FsDriverBlockdev
Expand Down Expand Up @@ -302,34 +314,25 @@ func (fs *Filesystem) Mount(ctx context.Context, snapshotID string, labels map[s
daemonconfig.WorkDir: workDir,
daemonconfig.CacheDir: cacheDir,
}
supplementInfo := &daemon.NydusdSupplementInfo{
DaemonState: d.States,
ImageID: imageID,
SnapshotID: snapshotID,
Vpc: false,
Labels: labels,
Params: params,
}
cfg := deepcopy.Copy(*fsManager.DaemonConfig).(daemonconfig.DaemonConfig)
err = daemonconfig.SupplementDaemonConfig(cfg, imageID, snapshotID, false, labels, params)
err = daemonconfig.SupplementDaemonConfig(cfg, supplementInfo)
if err != nil {
return errors.Wrap(err, "supplement configuration")
}

if errs := fsManager.AddSupplementInfo(supplementInfo); errs != nil {
return errors.Wrapf(err, "AddSupplementInfo failed %s", d.States.ID)
}
// TODO: How to manage rafs configurations on-disk? separated json config file or DB record?
// In order to recover erofs mount, the configuration file has to be persisted.
var configSubDir string
if useSharedDaemon {
configSubDir = snapshotID
} else {
// Associate daemon config object when creating a new daemon object to avoid
// reading disk file again and again.
// For shared daemon, each rafs instance has its own configuration, so we don't
// attach a config interface to daemon in this case.
d.Config = cfg
}

err = cfg.DumpFile(d.ConfigFile(configSubDir))
if err != nil {
if errors.Is(err, errdefs.ErrAlreadyExists) {
log.L.Debugf("Configuration file %s already exits", d.ConfigFile(configSubDir))
} else {
return errors.Wrap(err, "dump daemon configuration file")
}
}

d.Config = cfg
d.AddRafsInstance(rafs)

// if publicKey is not empty we should verify bootstrap file of image
Expand Down Expand Up @@ -596,10 +599,6 @@ func (fs *Filesystem) initSharedDaemon(fsManager *manager.Manager) (err error) {
// it is loaded when requesting mount api
// Dump the configuration file since it is reloaded when recovering the nydusd
d.Config = *fsManager.DaemonConfig
err = d.Config.DumpFile(d.ConfigFile(""))
if err != nil && !errors.Is(err, errdefs.ErrAlreadyExists) {
return errors.Wrapf(err, "dump configuration file %s", d.ConfigFile(""))
}

if err := fsManager.StartDaemon(d); err != nil {
return errors.Wrap(err, "start shared daemon")
Expand Down
14 changes: 10 additions & 4 deletions pkg/manager/daemon_adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ func (m *Manager) StartDaemon(d *daemon.Daemon) error {
if err := cmd.Start(); err != nil {
return err
}
fsDriver := config.GetFsDriver()
isSharedFusedev := fsDriver == config.FsDriverFusedev && config.GetDaemonMode() == config.DaemonModeShared
useSharedDaemon := fsDriver == config.FsDriverFscache || isSharedFusedev

if !useSharedDaemon {
errs := d.MountByAPI()
if errs != nil {
return errors.Wrapf(err, "failed to mount")
}
}

d.Lock()
defer d.Unlock()
Expand Down Expand Up @@ -155,10 +165,6 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool)
return nil, errors.Wrapf(err, "locate bootstrap %s", bootstrap)
}

cmdOpts = append(cmdOpts,
command.WithConfig(d.ConfigFile("")),
command.WithBootstrap(bootstrap),
)
if config.IsBackendSourceEnabled() {
configAPIPath := fmt.Sprintf(endpointGetBackend, d.States.ID)
cmdOpts = append(cmdOpts,
Expand Down
25 changes: 18 additions & 7 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,23 @@ func (m *Manager) AddDaemon(daemon *daemon.Daemon) error {
return nil
}

func (m *Manager) AddSupplementInfo(supplementInfo *daemon.NydusdSupplementInfo) error {
m.mu.Lock()
defer m.mu.Unlock()
if err := m.store.AddInfo(supplementInfo); err != nil {
return errors.Wrapf(err, "add supplementInfo %s", supplementInfo.DaemonState.ID)
}
return nil
}

func (m *Manager) GetInfo(daemonID string) (*daemon.NydusdSupplementInfo, error) {
info, err := m.store.GetInfo(daemonID)
if err != nil {
return nil, errors.Wrapf(err, "add supplementInfo %s", daemonID)
}
return info, nil
}

func (m *Manager) UpdateDaemon(daemon *daemon.Daemon) error {
m.mu.Lock()
defer m.mu.Unlock()
Expand Down Expand Up @@ -322,13 +339,7 @@ func (m *Manager) recoverDaemons(ctx context.Context,
}

if d.States.FsDriver == config.FsDriverFusedev {
cfg, err := daemonconfig.NewDaemonConfig(d.States.FsDriver, d.ConfigFile(""))
if err != nil {
log.L.Errorf("Failed to reload daemon configuration %s, %s", d.ConfigFile(""), err)
return err
}

d.Config = cfg
d.Config = *m.DaemonConfig
}

state, err := d.GetState()
Expand Down
3 changes: 3 additions & 0 deletions pkg/manager/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type Store interface {
WalkRafsInstances(ctx context.Context, cb func(*rafs.Rafs) error) error

NextInstanceSeq() (uint64, error)

AddInfo(supplementInfo *daemon.NydusdSupplementInfo) error
GetInfo(daemonID string) (*daemon.NydusdSupplementInfo, error)
}

var _ Store = &store.DaemonRafsStore{}
8 changes: 8 additions & 0 deletions pkg/store/daemonstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ func (s *DaemonRafsStore) AddDaemon(d *daemon.Daemon) error {
return s.db.SaveDaemon(context.TODO(), d)
}

func (s *DaemonRafsStore) AddInfo(supplementInfo *daemon.NydusdSupplementInfo) error {
return s.db.SaveInfo(context.TODO(), supplementInfo)
}

func (s *DaemonRafsStore) GetInfo(imageID string) (*daemon.NydusdSupplementInfo, error) {
return s.db.GetSupplementInfo(context.TODO(), imageID)
}

func (s *DaemonRafsStore) UpdateDaemon(d *daemon.Daemon) error {
return s.db.UpdateDaemon(context.TODO(), d)
}
Expand Down
Loading