diff --git a/Makefile b/Makefile
index 8e4f6f655..7793ed1e0 100644
--- a/Makefile
+++ b/Makefile
@@ -24,11 +24,8 @@ SUBDIRS:=ctriface taps misc profile
 EXTRAGOARGS:=-v -race -cover
 EXTRAGOARGS_NORACE:=-v
 EXTRATESTFILES:=vhive_test.go stats.go vhive.go functions.go
-# User-level page faults are temporarily disabled (gh-807)
-# WITHUPF:=-upfTest
-# WITHLAZY:=-lazyTest
-WITHUPF:=
-WITHLAZY:=
+WITHUPF:=-upfTest
+WITHLAZY:=-lazyTest
 WITHSNAPSHOTS:=-snapshotsTest
 CTRDLOGDIR:=/tmp/ctrd-logs
@@ -45,6 +42,11 @@ test-all: test-subdirs test-orch
 test-orch: test test-man
 
+debug:
+	./scripts/clean_fcctr.sh
+	sudo mkdir -m777 -p $(CTRDLOGDIR) && sudo env "PATH=$(PATH)" /usr/local/bin/firecracker-containerd --config /etc/firecracker-containerd/config.toml 1>$(CTRDLOGDIR)/fccd_orch_upf_log.out 2>$(CTRDLOGDIR)/fccd_orch_upf_log.err &
+	sudo env "PATH=$(PATH)" go test $(EXTRATESTFILES) -short $(EXTRAGOARGS) -args $(WITHSNAPSHOTS) $(WITHUPF)
+
 test:
 	./scripts/clean_fcctr.sh
 	sudo mkdir -m777 -p $(CTRDLOGDIR) && sudo env "PATH=$(PATH)" /usr/local/bin/firecracker-containerd --config /etc/firecracker-containerd/config.toml 1>$(CTRDLOGDIR)/fccd_orch_noupf_log.out 2>$(CTRDLOGDIR)/fccd_orch_noupf_log.err &
diff --git a/bin/containerd-shim-aws-firecracker b/bin/containerd-shim-aws-firecracker
index 9aec8ccaa..d02c5f7be 100755
--- a/bin/containerd-shim-aws-firecracker
+++ b/bin/containerd-shim-aws-firecracker
@@ -1,3 +1,3 @@
 version https://git-lfs.github.com/spec/v1
-oid sha256:4208028fa44c5897563f67b4f1c56efd1400d9a145826b2556773c9d1876bd93
-size 33776240
+oid sha256:593f7fff0ae8512859e08bd129375da10c13088d836cbaeb1608847b72c1cf28
+size 36382664
diff --git a/bin/default-rootfs.img b/bin/default-rootfs.img
index 2a4d5e66a..5755b47a4 100644
--- a/bin/default-rootfs.img
+++ b/bin/default-rootfs.img
@@ -1,3 +1,3 @@
 version https://git-lfs.github.com/spec/v1
-oid sha256:58d378a908efd9b604da4659c64c81cb835932a00d382c10b0e2d0d8770fc7d7
-size 64409600
+oid sha256:f042d5db9797c16255db5a32d9644e8cad838a2ef46da9c08a6ae38a24111f97
+size 73342976
diff --git a/bin/firecracker b/bin/firecracker
index 750f24cfe..639384a49 100755
--- a/bin/firecracker
+++ b/bin/firecracker
@@ -1,3 +1,3 @@
 version https://git-lfs.github.com/spec/v1
-oid sha256:8060c35d1669a57197985e4589b3e98f4a221b334c6d1f102aee62a3f77822cd
+oid sha256:c44d9ea84a0ff0c5315ed0d3672494f77bafed6f6edaaf6b050a4b5e3425ebe1
 size 10012224
diff --git a/bin/firecracker-containerd b/bin/firecracker-containerd
index 7cb01c83b..464134b33 100755
--- a/bin/firecracker-containerd
+++ b/bin/firecracker-containerd
@@ -1,3 +1,3 @@
 version https://git-lfs.github.com/spec/v1
-oid sha256:1959d901c4a8a6bdf8394628d5c62c0d6eba23a976acfdf8f9173fc21bc26e68
-size 69041344
+oid sha256:257ee96d4b2cd4a04f61833eafc31e2d814bb923e42ee0386077873f082aaa07
+size 72481176
diff --git a/bin/firecracker-ctr b/bin/firecracker-ctr
index 40d64e16f..f171ddc4b 100755
--- a/bin/firecracker-ctr
+++ b/bin/firecracker-ctr
@@ -1,3 +1,3 @@
 version https://git-lfs.github.com/spec/v1
-oid sha256:bfc28b8f8092d10190a4f7fc74cea36be12a9ef9f4ec81a3b7b46cde7d7ea857
-size 33034096
+oid sha256:2f60c7f0984dfdba666c9159761fe93357d6558dbbe7c3326bc4959b3bdf72b6
+size 35333240
diff --git a/configs/firecracker-containerd/firecracker-runtime.json b/configs/firecracker-containerd/firecracker-runtime.json
index 0986eb94d..d6f2bf977 100644
--- a/configs/firecracker-containerd/firecracker-runtime.json
+++ b/configs/firecracker-containerd/firecracker-runtime.json
@@ -4,5 +4,6 @@
   "kernel_args": "console=ttyS0 noapic reboot=k panic=1 pci=off nomodules ro systemd.journald.forward_to_console systemd.unit=firecracker.target init=/sbin/overlay-init",
   "root_drive": "/var/lib/firecracker-containerd/runtime/default-rootfs.img",
   "cpu_template": "T2",
-  "log_levels": ["info"]
+  "log_levels": ["debug"],
+  "debug": true
 }
\ No newline at end of file
diff --git a/cri/firecracker/coordinator.go b/cri/firecracker/coordinator.go
index 9acfe3d3c..a8e5c6c60 100644
--- a/cri/firecracker/coordinator.go
+++ b/cri/firecracker/coordinator.go
@@ -26,13 +26,14 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"github.com/google/uuid"
-	"github.com/vhive-serverless/vhive/snapshotting"
 	"strconv"
 	"sync"
 	"sync/atomic"
 	"time"
 
+	"github.com/google/uuid"
+	"github.com/vhive-serverless/vhive/snapshotting"
+
 	log "github.com/sirupsen/logrus"
 	"github.com/vhive-serverless/vhive/ctriface"
 )
@@ -169,6 +170,7 @@ func (c *coordinator) orchStartVM(ctx context.Context, image, revision string, e
 func (c *coordinator) orchLoadInstance(ctx context.Context, snap *snapshotting.Snapshot) (*funcInstance, error) {
 	vmID := c.getVMID()
+	originVmID := vmID
 	logger := log.WithFields(
 		log.Fields{
 			"vmID": vmID,
@@ -181,7 +183,7 @@ func (c *coordinator) orchLoadInstance(ctx context.Context, snap *snapshotting.S
 	ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*30)
 	defer cancel()
 
-	resp, _, err := c.orch.LoadSnapshot(ctxTimeout, vmID, snap)
+	resp, _, err := c.orch.LoadSnapshot(ctxTimeout, originVmID, vmID, snap)
 	if err != nil {
 		logger.WithError(err).Error("failed to load VM")
 		return nil, err
diff --git a/ctriface/Makefile b/ctriface/Makefile
index bc8f6fcd3..7b57901f1 100644
--- a/ctriface/Makefile
+++ b/ctriface/Makefile
@@ -23,14 +23,17 @@ EXTRAGOARGS:=-v -race -cover
 EXTRATESTFILES:=iface_test.go iface.go orch_options.go orch.go
 BENCHFILES:=bench_test.go iface.go orch_options.go orch.go
-# User-level page faults are temporarily disabled (gh-807)
-# WITHUPF:=-upf
-# WITHLAZY:=-lazy
-WITHUPF:=
-WITHLAZY:=
+WITHUPF:=-upf
+WITHLAZY:=-lazy
 GOBENCH:=-v -timeout 1500s
 CTRDLOGDIR:=/tmp/ctrd-logs
 
+debug:
+	./../scripts/clean_fcctr.sh
+	sudo mkdir -m777 -p $(CTRDLOGDIR) && sudo env "PATH=$(PATH)" /usr/local/bin/firecracker-containerd --config /etc/firecracker-containerd/config.toml 1>$(CTRDLOGDIR)/ctriface_log.out 2>$(CTRDLOGDIR)/ctriface_log.err &
+	sudo env "PATH=$(PATH)" go test $(EXTRATESTFILES) $(EXTRAGOARGS) -args $(WITHUPF)
+	./../scripts/clean_fcctr.sh
+
 test:
 	./../scripts/clean_fcctr.sh
 	sudo mkdir -m777 -p $(CTRDLOGDIR) && sudo env "PATH=$(PATH)" /usr/local/bin/firecracker-containerd --config /etc/firecracker-containerd/config.toml 1>$(CTRDLOGDIR)/ctriface_log.out 2>$(CTRDLOGDIR)/ctriface_log.err &
diff --git a/ctriface/failing_test.go b/ctriface/failing_test.go
index 079684066..36cb30a61 100644
--- a/ctriface/failing_test.go
+++ b/ctriface/failing_test.go
@@ -22,62 +22,49 @@
 package ctriface
 
-import (
-	"context"
-	"os"
-	"testing"
-	"time"
+// func TestStartSnapStop(t *testing.T) {
+// 	// BROKEN BECAUSE StopVM does not work yet.
+// // t.Skip("skipping failing test") +// log.SetFormatter(&log.TextFormatter{ +// TimestampFormat: ctrdlog.RFC3339NanoFixed, +// FullTimestamp: true, +// }) +// //log.SetReportCaller(true) // FIXME: make sure it's false unless debugging - ctrdlog "github.com/containerd/containerd/log" - "github.com/containerd/containerd/namespaces" - log "github.com/sirupsen/logrus" - "github.com/stretchr/testify/require" - "github.com/vhive-serverless/vhive/snapshotting" -) +// log.SetOutput(os.Stdout) -func TestStartSnapStop(t *testing.T) { - // BROKEN BECAUSE StopVM does not work yet. - t.Skip("skipping failing test") - log.SetFormatter(&log.TextFormatter{ - TimestampFormat: ctrdlog.RFC3339NanoFixed, - FullTimestamp: true, - }) - //log.SetReportCaller(true) // FIXME: make sure it's false unless debugging +// log.SetLevel(log.DebugLevel) - log.SetOutput(os.Stdout) +// testTimeout := 120 * time.Second +// ctx, cancel := context.WithTimeout(namespaces.WithNamespace(context.Background(), namespaceName), testTimeout) +// defer cancel() - log.SetLevel(log.DebugLevel) +// orch := NewOrchestrator("devmapper", "", WithTestModeOn(true)) - testTimeout := 120 * time.Second - ctx, cancel := context.WithTimeout(namespaces.WithNamespace(context.Background(), namespaceName), testTimeout) - defer cancel() +// vmID := "2" - orch := NewOrchestrator("devmapper", "", WithTestModeOn(true)) +// _, _, err := orch.StartVM(ctx, vmID, testImageName) +// require.NoError(t, err, "Failed to start VM") - vmID := "2" +// err = orch.PauseVM(ctx, vmID) +// require.NoError(t, err, "Failed to pause VM") - _, _, err := orch.StartVM(ctx, vmID, testImageName) - require.NoError(t, err, "Failed to start VM") +// snap := snapshotting.NewSnapshot(vmID, "/fccd/snapshots", testImageName) +// err = orch.CreateSnapshot(ctx, vmID, snap) +// require.NoError(t, err, "Failed to create snapshot of VM") - err = orch.PauseVM(ctx, vmID) - require.NoError(t, err, "Failed to pause VM") +// err = orch.StopSingleVM(ctx, vmID) +// require.NoError(t, err, "Failed to stop VM") - snap := snapshotting.NewSnapshot(vmID, "/fccd/snapshots", testImageName) - err = orch.CreateSnapshot(ctx, vmID, snap) - require.NoError(t, err, "Failed to create snapshot of VM") +// _, _, err = orch.LoadSnapshot(ctx, "1", vmID, snap) +// require.NoError(t, err, "Failed to load snapshot of VM") - err = orch.StopSingleVM(ctx, vmID) - require.NoError(t, err, "Failed to stop VM") +// _, err = orch.ResumeVM(ctx, vmID) +// require.NoError(t, err, "Failed to resume VM") - _, _, err = orch.LoadSnapshot(ctx, vmID, snap) - require.NoError(t, err, "Failed to load snapshot of VM") +// err = orch.StopSingleVM(ctx, vmID) +// require.NoError(t, err, "Failed to stop VM") - _, err = orch.ResumeVM(ctx, vmID) - require.NoError(t, err, "Failed to resume VM") - - err = orch.StopSingleVM(ctx, vmID) - require.NoError(t, err, "Failed to stop VM") - - _ = snap.Cleanup() - orch.Cleanup() -} +// _ = snap.Cleanup() +// orch.Cleanup() +// } diff --git a/ctriface/iface.go b/ctriface/iface.go index d6e79a114..9271e1f04 100644 --- a/ctriface/iface.go +++ b/ctriface/iface.go @@ -24,7 +24,8 @@ package ctriface import ( "context" - "github.com/vhive-serverless/vhive/snapshotting" + "encoding/json" + "net" "os" "os/exec" "path/filepath" @@ -33,6 +34,8 @@ import ( "syscall" "time" + "github.com/vhive-serverless/vhive/snapshotting" + log "github.com/sirupsen/logrus" "github.com/containerd/containerd" @@ -64,6 +67,8 @@ type StartVMResponse struct { const ( testImageName = "ghcr.io/ease-lab/helloworld:var_workload" + 
fileBackend = "File" + uffdBackend = "Uffd" ) // StartVM Boots a VM if it does not exist @@ -205,17 +210,18 @@ func (o *Orchestrator) StartVMWithEnvironment(ctx context.Context, vmID, imageNa if o.GetUPFEnabled() { logger.Debug("Registering VM with the memory manager") + logger.Debugf("TEST (startWithEnv): current vmID used to registerVM is %v", vmID) stateCfg := manager.SnapshotStateCfg{ - VMID: vmID, - GuestMemPath: o.getMemoryFile(vmID), - BaseDir: o.getVMBaseDir(vmID), - GuestMemSize: int(conf.MachineCfg.MemSizeMib) * 1024 * 1024, - IsLazyMode: o.isLazyMode, - VMMStatePath: o.getSnapshotFile(vmID), - WorkingSetPath: o.getWorkingSetFile(vmID), - // FIXME (gh-807) - //InstanceSockAddr: resp.UPFSockPath, + VMID: vmID, + GuestMemPath: o.getMemoryFile(vmID), + BaseDir: o.getVMBaseDir(vmID), + GuestMemSize: int(conf.MachineCfg.MemSizeMib) * 1024 * 1024, + IsLazyMode: o.isLazyMode, + VMMStatePath: o.getSnapshotFile(vmID), + WorkingSetPath: o.getWorkingSetFile(vmID), + InstanceSockAddr: o.uffdSockAddr, } + logger.Debugf("TEST: show snapStat to be registered: %+v", stateCfg) if err := o.memoryManager.RegisterVM(stateCfg); err != nil { return nil, nil, errors.Wrap(err, "failed to register VM with memory manager") // NOTE (Plamen): Potentially need a defer(DeregisteVM) here if RegisterVM is not last to execute @@ -447,12 +453,12 @@ func (o *Orchestrator) CreateSnapshot(ctx context.Context, vmID string, snap *sn } // LoadSnapshot Loads a snapshot of a VM -func (o *Orchestrator) LoadSnapshot(ctx context.Context, vmID string, snap *snapshotting.Snapshot) (_ *StartVMResponse, _ *metrics.Metric, retErr error) { +func (o *Orchestrator) LoadSnapshot(ctx context.Context, originVmID string, vmID string, snap *snapshotting.Snapshot) (_ *StartVMResponse, _ *metrics.Metric, retErr error) { var ( loadSnapshotMetric *metrics.Metric = metrics.NewMetric() tStart time.Time loadErr, activateErr error - loadDone = make(chan int) + // loadDone = make(chan int) ) logger := log.WithFields(log.Fields{"vmID": vmID}) @@ -494,55 +500,130 @@ func (o *Orchestrator) LoadSnapshot(ctx context.Context, vmID string, snap *snap conf := o.getVMConfig(vm) conf.LoadSnapshot = true conf.SnapshotPath = snap.GetSnapshotFilePath() - conf.MemFilePath = snap.GetMemFilePath() conf.ContainerSnapshotPath = containerSnap.GetDevicePath() + conf.MemBackend = &proto.MemoryBackend{ + BackendType: fileBackend, + BackendPath: snap.GetMemFilePath(), + } + // conf.ResumeVM = true + // conf.EnableDiffSnapshots = false + + var sendfdConn *net.UnixConn + // uffdListenerCh := make(chan struct{}, 1) + var listener net.Listener if o.GetUPFEnabled() { - if err := o.memoryManager.FetchState(vmID); err != nil { + logger.Debug("TEST: UPF is enabled") + conf.MemBackend.BackendType = uffdBackend + conf.MemBackend.BackendPath = o.uffdSockAddr + logger.Debugf("TEST: the upf socket: %s", conf.MemBackend.BackendPath) + if err != nil { + return nil, nil, errors.Wrapf(err, "failed to get UPF socket path for uffd backend") + } + + if err := o.memoryManager.FetchState(originVmID); err != nil { return nil, nil, err } + + logger.Debug("TEST: start listening to uffd socket") + if _, err := os.Stat(conf.MemBackend.BackendPath); err == nil { + os.Remove(conf.MemBackend.BackendPath) + } + + // =============================================================== + listener, err = net.Listen("unix", conf.MemBackend.BackendPath) + if err != nil { + logger.Error("failed to listen to uffd socket") + return + } + // defer listener.Close() + + // logger.Debug("Listening ...") + // conn, 
err := listener.Accept() + // if err != nil { + // logger.Error("failed to accept connection to uffd socket") + // return + // } + + // sendfdConn, _ = conn.(*net.UnixConn) + // close(uffdListenerCh) + + // time.Sleep(10 * time.Second) // TODO: sleep for 10 seconds to wait for the uffd socket to be ready } tStart = time.Now() - go func() { - defer close(loadDone) + confStr, _ := json.Marshal(conf) + logger.Debugf("TEST: CreateVM request: %s", confStr) - if _, loadErr = o.fcClient.CreateVM(ctx, conf); loadErr != nil { - logger.Error("Failed to load snapshot of the VM: ", loadErr) - logger.Errorf("snapFilePath: %s, memFilePath: %s, newSnapshotPath: %s", snap.GetSnapshotFilePath(), snap.GetMemFilePath(), containerSnap.GetDevicePath()) - files, err := os.ReadDir(filepath.Dir(snap.GetSnapshotFilePath())) - if err != nil { - logger.Error(err) - } + if _, loadErr := o.fcClient.CreateVM(ctx, conf); loadErr != nil { + logger.Error("Failed to load snapshot of the VM: ", loadErr) + logger.Errorf("snapFilePath: %s, memFilePath: %s, newSnapshotPath: %s", snap.GetSnapshotFilePath(), snap.GetMemFilePath(), containerSnap.GetDevicePath()) + files, err := os.ReadDir(filepath.Dir(snap.GetSnapshotFilePath())) + if err != nil { + logger.Error(err) + } - snapFiles := "" - for _, f := range files { - snapFiles += f.Name() + ", " - } + snapFiles := "" + for _, f := range files { + snapFiles += f.Name() + ", " + } - logger.Error(snapFiles) + logger.Error(snapFiles) - files, _ = os.ReadDir(filepath.Dir(containerSnap.GetDevicePath())) - if err != nil { - logger.Error(err) - } + files, _ = os.ReadDir(filepath.Dir(containerSnap.GetDevicePath())) + if err != nil { + logger.Error(err) + } - snapFiles = "" - for _, f := range files { - snapFiles += f.Name() + ", " - } - logger.Error(snapFiles) + snapFiles = "" + for _, f := range files { + snapFiles += f.Name() + ", " } - }() + logger.Error(snapFiles) + } + logger.Debug("TEST: CreatVM request sent") + + // <-loadDone if o.GetUPFEnabled() { - if activateErr = o.memoryManager.Activate(vmID); activateErr != nil { + logger.Debug("Listening ...") + conn, err := listener.Accept() // TODO: a question, must accept() first before connect()? 
+ if err != nil { + logger.Error("failed to accept connection to uffd socket") + return + } + sendfdConn, _ = conn.(*net.UnixConn) + listener.Close() + // close(uffdListenerCh) + + logger.Debug("TEST: Registering VM with snap with the memory manager") + + // <-uffdListenerCh + + stateCfg := manager.SnapshotStateCfg{ + VMID: vmID, + GuestMemPath: o.getMemoryFile(vmID), + BaseDir: o.getVMBaseDir(vmID), + GuestMemSize: int(conf.MachineCfg.MemSizeMib) * 1024 * 1024, + IsLazyMode: o.isLazyMode, + VMMStatePath: o.getSnapshotFile(vmID), + WorkingSetPath: o.getWorkingSetFile(vmID), + InstanceSockAddr: o.uffdSockAddr, + } + if err := o.memoryManager.RegisterVMFromSnap(originVmID, stateCfg); err != nil { + logger.Error(err, "failed to register new VM with memory manager") + } + + logger.Debug("TEST: activate VM in mm") + if activateErr = o.memoryManager.Activate(originVmID, sendfdConn); activateErr != nil { logger.Warn("Failed to activate VM in the memory manager", activateErr) } + + // time.Sleep(30 * time.Minute) // pause to see fc logs } - <-loadDone + // <-loadDone loadSnapshotMetric.MetricMap[metrics.LoadVMM] = metrics.ToUS(time.Since(tStart)) diff --git a/ctriface/iface_test.go b/ctriface/iface_test.go index 7e678c6eb..cdfce8700 100644 --- a/ctriface/iface_test.go +++ b/ctriface/iface_test.go @@ -24,9 +24,7 @@ package ctriface import ( "context" "flag" - "fmt" "os" - "sync" "testing" "time" @@ -47,16 +45,13 @@ var ( func TestMain(m *testing.M) { flag.Parse() - - if *isUPFEnabled { - log.Error("User-level page faults are temporarily disabled (gh-807)") - os.Exit(-1) - } - os.Exit(m.Run()) } -func TestPauseSnapResume(t *testing.T) { +// Test for ctriface uffd feature +func TestStartSnapStop(t *testing.T) { + // BROKEN BECAUSE StopVM does not work yet. + // t.Skip("skipping failing test") log.SetFormatter(&log.TextFormatter{ TimestampFormat: ctrdlog.RFC3339NanoFixed, FullTimestamp: true, @@ -65,266 +60,334 @@ func TestPauseSnapResume(t *testing.T) { log.SetOutput(os.Stdout) - log.SetLevel(log.InfoLevel) + log.SetLevel(log.DebugLevel) testTimeout := 120 * time.Second ctx, cancel := context.WithTimeout(namespaces.WithNamespace(context.Background(), namespaceName), testTimeout) defer cancel() + // uffdSockDir := "/home/char/uffd" + // uffdSockAddr := uffdSockDir + "/uffd.sock" + uffdSockAddr := "/tmp/uffd.sock" orch := NewOrchestrator( "devmapper", "", WithTestModeOn(true), + WithSnapshots(true), WithUPF(*isUPFEnabled), - WithLazyMode(*isLazyMode), + WithUffdSockAddr(uffdSockAddr), ) - vmID := "4" - revision := "myrev-4" + vmID := "2" + log.Debug("STEP: StartVM") _, _, err := orch.StartVM(ctx, vmID, testImageName) require.NoError(t, err, "Failed to start VM") + log.Debug("STEP: PauseVM") err = orch.PauseVM(ctx, vmID) require.NoError(t, err, "Failed to pause VM") - snap := snapshotting.NewSnapshot(revision, "/fccd/snapshots", testImageName) - err = snap.CreateSnapDir() - require.NoError(t, err, "Failed to create snapshots directory") - + log.Debug("STEP: NewSnapshot and CreateSnapshot") + snap := snapshotting.NewSnapshot(vmID, "/fccd/snapshots", testImageName) err = orch.CreateSnapshot(ctx, vmID, snap) require.NoError(t, err, "Failed to create snapshot of VM") - _, err = orch.ResumeVM(ctx, vmID) - require.NoError(t, err, "Failed to resume VM") - + log.Debug("STEP: StopSingleVM") err = orch.StopSingleVM(ctx, vmID) require.NoError(t, err, "Failed to stop VM") - _ = snap.Cleanup() - orch.Cleanup() -} + originVmID := vmID + vmID = "3" -func TestStartStopSerial(t *testing.T) { - 
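The LoadSnapshot rework in ctriface/iface.go above creates the uffd listener before issuing CreateVM and only calls Accept afterwards, which also answers the accept-before-connect TODO left in the code: the peer's connect() completes against the listener's backlog as soon as net.Listen has run, so Accept can safely happen after CreateVM returns. A minimal standalone sketch of that ordering, with a stub in place of the real CreateVM call and an assumed /tmp/uffd.sock path (illustrative only, not taken from the repository):

package main

import (
	"fmt"
	"net"
	"os"
)

// fakeCreateVM stands in for the real CreateVM request; here it only plays the
// peer's role and connects to the uffd socket, the way the VMM is expected to
// during snapshot restore.
func fakeCreateVM(sockPath string) error {
	c, err := net.Dial("unix", sockPath)
	if err != nil {
		return err
	}
	defer c.Close()
	_, err = c.Write([]byte("uffd handshake placeholder"))
	return err
}

func main() {
	sock := "/tmp/uffd.sock" // assumed path, same default as the tests above
	_ = os.Remove(sock)      // drop a stale socket first, as the patch does

	// 1. Listen first, so the peer has something to connect to.
	ln, err := net.Listen("unix", sock)
	if err != nil {
		panic(err)
	}
	defer ln.Close()

	// 2. Issue VM creation; the peer's connect() is queued by the listener backlog.
	if err := fakeCreateVM(sock); err != nil {
		panic(err)
	}

	// 3. Accept afterwards; the queued connection is returned immediately.
	conn, err := ln.Accept()
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	buf := make([]byte, 64)
	n, _ := conn.Read(buf)
	fmt.Println("received:", string(buf[:n]))
}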
log.SetFormatter(&log.TextFormatter{ - TimestampFormat: ctrdlog.RFC3339NanoFixed, - FullTimestamp: true, - }) - //log.SetReportCaller(true) // FIXME: make sure it's false unless debugging - - log.SetOutput(os.Stdout) - - log.SetLevel(log.InfoLevel) - - testTimeout := 120 * time.Second - ctx, cancel := context.WithTimeout(namespaces.WithNamespace(context.Background(), namespaceName), testTimeout) - defer cancel() - - orch := NewOrchestrator( - "devmapper", - "", - WithTestModeOn(true), - WithUPF(*isUPFEnabled), - WithLazyMode(*isLazyMode), - ) + log.Debug("STEP: LoadSnapshot") + _, _, err = orch.LoadSnapshot(ctx, originVmID, vmID, snap) + require.NoError(t, err, "Failed to load snapshot of VM") - vmID := "5" + log.Debug("STEP: ResumeVM") + _, err = orch.ResumeVM(ctx, vmID) + require.NoError(t, err, "Failed to resume VM") - _, _, err := orch.StartVM(ctx, vmID, testImageName) - require.NoError(t, err, "Failed to start VM") + time.Sleep(30 * time.Second) + log.Debug("STEP: StopeSingleVM") err = orch.StopSingleVM(ctx, vmID) require.NoError(t, err, "Failed to stop VM") + log.Debug("STEP: Cleanup") + _ = snap.Cleanup() orch.Cleanup() } -func TestPauseResumeSerial(t *testing.T) { - log.SetFormatter(&log.TextFormatter{ - TimestampFormat: ctrdlog.RFC3339NanoFixed, - FullTimestamp: true, - }) - //log.SetReportCaller(true) // FIXME: make sure it's false unless debugging - - log.SetOutput(os.Stdout) - - log.SetLevel(log.InfoLevel) - - testTimeout := 120 * time.Second - ctx, cancel := context.WithTimeout(namespaces.WithNamespace(context.Background(), namespaceName), testTimeout) - defer cancel() - - orch := NewOrchestrator( - "devmapper", - "", - WithTestModeOn(true), - WithUPF(*isUPFEnabled), - WithLazyMode(*isLazyMode), - ) - - vmID := "6" +// func TestPauseSnapResume(t *testing.T) { +// log.SetFormatter(&log.TextFormatter{ +// TimestampFormat: ctrdlog.RFC3339NanoFixed, +// FullTimestamp: true, +// }) +// //log.SetReportCaller(true) // FIXME: make sure it's false unless debugging - _, _, err := orch.StartVM(ctx, vmID, testImageName) - require.NoError(t, err, "Failed to start VM") +// log.SetOutput(os.Stdout) - err = orch.PauseVM(ctx, vmID) - require.NoError(t, err, "Failed to pause VM") +// log.SetLevel(log.DebugLevel) - _, err = orch.ResumeVM(ctx, vmID) - require.NoError(t, err, "Failed to resume VM") +// testTimeout := 120 * time.Second +// ctx, cancel := context.WithTimeout(namespaces.WithNamespace(context.Background(), namespaceName), testTimeout) +// defer cancel() - err = orch.StopSingleVM(ctx, vmID) - require.NoError(t, err, "Failed to stop VM") +// orch := NewOrchestrator( +// "devmapper", +// "", +// WithTestModeOn(true), +// WithUPF(*isUPFEnabled), +// WithLazyMode(*isLazyMode), +// ) - orch.Cleanup() -} - -func TestStartStopParallel(t *testing.T) { - log.SetFormatter(&log.TextFormatter{ - TimestampFormat: ctrdlog.RFC3339NanoFixed, - FullTimestamp: true, - }) - //log.SetReportCaller(true) // FIXME: make sure it's false unless debugging - - log.SetOutput(os.Stdout) - - log.SetLevel(log.InfoLevel) - - testTimeout := 360 * time.Second - ctx, cancel := context.WithTimeout(namespaces.WithNamespace(context.Background(), namespaceName), testTimeout) - defer cancel() +// vmID := "4" +// revision := "myrev-4" - vmNum := 10 - vmIDBase := 7 +// _, _, err := orch.StartVM(ctx, vmID, testImageName) +// require.NoError(t, err, "Failed to start VM") - orch := NewOrchestrator( - "devmapper", - "", - WithTestModeOn(true), - WithUPF(*isUPFEnabled), - WithLazyMode(*isLazyMode), - ) +// err = orch.PauseVM(ctx, 
vmID) +// require.NoError(t, err, "Failed to pause VM") - // Pull image - _, err := orch.getImage(ctx, testImageName) - require.NoError(t, err, "Failed to pull image "+testImageName) - - { - var vmGroup sync.WaitGroup - for i := vmIDBase; i < vmNum; i++ { - vmGroup.Add(1) - go func(i int) { - defer vmGroup.Done() - vmID := fmt.Sprintf("%d", i) - _, _, err := orch.StartVM(ctx, vmID, testImageName) - require.NoError(t, err, "Failed to start VM "+vmID) - }(i) - } - vmGroup.Wait() - } - - { - var vmGroup sync.WaitGroup - for i := vmIDBase; i < vmNum; i++ { - vmGroup.Add(1) - go func(i int) { - defer vmGroup.Done() - vmID := fmt.Sprintf("%d", i) - err := orch.StopSingleVM(ctx, vmID) - require.NoError(t, err, "Failed to stop VM "+vmID) - }(i) - } - vmGroup.Wait() - } +// snap := snapshotting.NewSnapshot(revision, "/fccd/snapshots", testImageName) +// err = snap.CreateSnapDir() +// require.NoError(t, err, "Failed to create snapshots directory") - orch.Cleanup() -} +// err = orch.CreateSnapshot(ctx, vmID, snap) +// require.NoError(t, err, "Failed to create snapshot of VM") -func TestPauseResumeParallel(t *testing.T) { - log.SetFormatter(&log.TextFormatter{ - TimestampFormat: ctrdlog.RFC3339NanoFixed, - FullTimestamp: true, - }) - //log.SetReportCaller(true) // FIXME: make sure it's false unless debugging +// _, err = orch.ResumeVM(ctx, vmID) +// require.NoError(t, err, "Failed to resume VM") - log.SetOutput(os.Stdout) +// err = orch.StopSingleVM(ctx, vmID) +// require.NoError(t, err, "Failed to stop VM") - log.SetLevel(log.InfoLevel) +// _ = snap.Cleanup() +// orch.Cleanup() +// } - testTimeout := 120 * time.Second - ctx, cancel := context.WithTimeout(namespaces.WithNamespace(context.Background(), namespaceName), testTimeout) - defer cancel() +// func TestStartStopSerial(t *testing.T) { +// log.SetFormatter(&log.TextFormatter{ +// TimestampFormat: ctrdlog.RFC3339NanoFixed, +// FullTimestamp: true, +// }) +// //log.SetReportCaller(true) // FIXME: make sure it's false unless debugging - vmNum := 10 - vmIDBase := 17 +// log.SetOutput(os.Stdout) - orch := NewOrchestrator( - "devmapper", - "", - WithTestModeOn(true), - WithUPF(*isUPFEnabled), - WithLazyMode(*isLazyMode), - ) +// log.SetLevel(log.InfoLevel) - // Pull image - _, err := orch.getImage(ctx, testImageName) - require.NoError(t, err, "Failed to pull image "+testImageName) - - { - var vmGroup sync.WaitGroup - for i := vmIDBase; i < vmNum; i++ { - vmGroup.Add(1) - go func(i int) { - defer vmGroup.Done() - vmID := fmt.Sprintf("%d", i) - _, _, err := orch.StartVM(ctx, vmID, testImageName) - require.NoError(t, err, "Failed to start VM") - }(i) - } - vmGroup.Wait() - } - - { - var vmGroup sync.WaitGroup - for i := vmIDBase; i < vmNum; i++ { - vmGroup.Add(1) - go func(i int) { - defer vmGroup.Done() - vmID := fmt.Sprintf("%d", i) - err := orch.PauseVM(ctx, vmID) - require.NoError(t, err, "Failed to pause VM") - }(i) - } - vmGroup.Wait() - } - - { - var vmGroup sync.WaitGroup - for i := vmIDBase; i < vmNum; i++ { - vmGroup.Add(1) - go func(i int) { - defer vmGroup.Done() - vmID := fmt.Sprintf("%d", i) - _, err := orch.ResumeVM(ctx, vmID) - require.NoError(t, err, "Failed to resume VM") - }(i) - } - vmGroup.Wait() - } - - { - var vmGroup sync.WaitGroup - for i := vmIDBase; i < vmNum; i++ { - vmGroup.Add(1) - go func(i int) { - defer vmGroup.Done() - vmID := fmt.Sprintf("%d", i) - err := orch.StopSingleVM(ctx, vmID) - require.NoError(t, err, "Failed to stop VM") - }(i) - } - vmGroup.Wait() - } +// testTimeout := 120 * time.Second +// ctx, cancel := 
context.WithTimeout(namespaces.WithNamespace(context.Background(), namespaceName), testTimeout) +// defer cancel() - orch.Cleanup() -} +// orch := NewOrchestrator( +// "devmapper", +// "", +// WithTestModeOn(true), +// WithUPF(*isUPFEnabled), +// WithLazyMode(*isLazyMode), +// ) + +// vmID := "5" + +// _, _, err := orch.StartVM(ctx, vmID, testImageName) +// require.NoError(t, err, "Failed to start VM") + +// err = orch.StopSingleVM(ctx, vmID) +// require.NoError(t, err, "Failed to stop VM") + +// orch.Cleanup() +// } + +// func TestPauseResumeSerial(t *testing.T) { +// log.SetFormatter(&log.TextFormatter{ +// TimestampFormat: ctrdlog.RFC3339NanoFixed, +// FullTimestamp: true, +// }) +// //log.SetReportCaller(true) // FIXME: make sure it's false unless debugging + +// log.SetOutput(os.Stdout) + +// log.SetLevel(log.InfoLevel) + +// testTimeout := 120 * time.Second +// ctx, cancel := context.WithTimeout(namespaces.WithNamespace(context.Background(), namespaceName), testTimeout) +// defer cancel() + +// orch := NewOrchestrator( +// "devmapper", +// "", +// WithTestModeOn(true), +// WithUPF(*isUPFEnabled), +// WithLazyMode(*isLazyMode), +// ) + +// vmID := "6" + +// _, _, err := orch.StartVM(ctx, vmID, testImageName) +// require.NoError(t, err, "Failed to start VM") + +// err = orch.PauseVM(ctx, vmID) +// require.NoError(t, err, "Failed to pause VM") + +// _, err = orch.ResumeVM(ctx, vmID) +// require.NoError(t, err, "Failed to resume VM") + +// err = orch.StopSingleVM(ctx, vmID) +// require.NoError(t, err, "Failed to stop VM") + +// orch.Cleanup() +// } + +// func TestStartStopParallel(t *testing.T) { +// log.SetFormatter(&log.TextFormatter{ +// TimestampFormat: ctrdlog.RFC3339NanoFixed, +// FullTimestamp: true, +// }) +// //log.SetReportCaller(true) // FIXME: make sure it's false unless debugging + +// log.SetOutput(os.Stdout) + +// log.SetLevel(log.InfoLevel) + +// testTimeout := 360 * time.Second +// ctx, cancel := context.WithTimeout(namespaces.WithNamespace(context.Background(), namespaceName), testTimeout) +// defer cancel() + +// vmNum := 10 +// vmIDBase := 7 + +// orch := NewOrchestrator( +// "devmapper", +// "", +// WithTestModeOn(true), +// WithUPF(*isUPFEnabled), +// WithLazyMode(*isLazyMode), +// ) + +// // Pull image +// _, err := orch.getImage(ctx, testImageName) +// require.NoError(t, err, "Failed to pull image "+testImageName) + +// { +// var vmGroup sync.WaitGroup +// for i := vmIDBase; i < vmNum; i++ { +// vmGroup.Add(1) +// go func(i int) { +// defer vmGroup.Done() +// vmID := fmt.Sprintf("%d", i) +// _, _, err := orch.StartVM(ctx, vmID, testImageName) +// require.NoError(t, err, "Failed to start VM "+vmID) +// }(i) +// } +// vmGroup.Wait() +// } + +// { +// var vmGroup sync.WaitGroup +// for i := vmIDBase; i < vmNum; i++ { +// vmGroup.Add(1) +// go func(i int) { +// defer vmGroup.Done() +// vmID := fmt.Sprintf("%d", i) +// err := orch.StopSingleVM(ctx, vmID) +// require.NoError(t, err, "Failed to stop VM "+vmID) +// }(i) +// } +// vmGroup.Wait() +// } + +// orch.Cleanup() +// } + +// func TestPauseResumeParallel(t *testing.T) { +// log.SetFormatter(&log.TextFormatter{ +// TimestampFormat: ctrdlog.RFC3339NanoFixed, +// FullTimestamp: true, +// }) +// //log.SetReportCaller(true) // FIXME: make sure it's false unless debugging + +// log.SetOutput(os.Stdout) + +// log.SetLevel(log.InfoLevel) + +// testTimeout := 120 * time.Second +// ctx, cancel := context.WithTimeout(namespaces.WithNamespace(context.Background(), namespaceName), testTimeout) +// defer cancel() + +// vmNum := 
10 +// vmIDBase := 17 + +// orch := NewOrchestrator( +// "devmapper", +// "", +// WithTestModeOn(true), +// WithUPF(*isUPFEnabled), +// WithLazyMode(*isLazyMode), +// ) + +// // Pull image +// _, err := orch.getImage(ctx, testImageName) +// require.NoError(t, err, "Failed to pull image "+testImageName) + +// { +// var vmGroup sync.WaitGroup +// for i := vmIDBase; i < vmNum; i++ { +// vmGroup.Add(1) +// go func(i int) { +// defer vmGroup.Done() +// vmID := fmt.Sprintf("%d", i) +// _, _, err := orch.StartVM(ctx, vmID, testImageName) +// require.NoError(t, err, "Failed to start VM") +// }(i) +// } +// vmGroup.Wait() +// } + +// { +// var vmGroup sync.WaitGroup +// for i := vmIDBase; i < vmNum; i++ { +// vmGroup.Add(1) +// go func(i int) { +// defer vmGroup.Done() +// vmID := fmt.Sprintf("%d", i) +// err := orch.PauseVM(ctx, vmID) +// require.NoError(t, err, "Failed to pause VM") +// }(i) +// } +// vmGroup.Wait() +// } + +// { +// var vmGroup sync.WaitGroup +// for i := vmIDBase; i < vmNum; i++ { +// vmGroup.Add(1) +// go func(i int) { +// defer vmGroup.Done() +// vmID := fmt.Sprintf("%d", i) +// _, err := orch.ResumeVM(ctx, vmID) +// require.NoError(t, err, "Failed to resume VM") +// }(i) +// } +// vmGroup.Wait() +// } + +// { +// var vmGroup sync.WaitGroup +// for i := vmIDBase; i < vmNum; i++ { +// vmGroup.Add(1) +// go func(i int) { +// defer vmGroup.Done() +// vmID := fmt.Sprintf("%d", i) +// err := orch.StopSingleVM(ctx, vmID) +// require.NoError(t, err, "Failed to stop VM") +// }(i) +// } +// vmGroup.Wait() +// } + +// orch.Cleanup() +// } diff --git a/ctriface/manual_cleanup_test.go b/ctriface/manual_cleanup_test.go index 85ff312c1..505247421 100644 --- a/ctriface/manual_cleanup_test.go +++ b/ctriface/manual_cleanup_test.go @@ -90,8 +90,9 @@ func TestSnapLoad(t *testing.T) { require.NoError(t, err, "Failed to offload VM") vmID = "2" + originVmID := "1" - _, _, err = orch.LoadSnapshot(ctx, vmID, snap) + _, _, err = orch.LoadSnapshot(ctx, originVmID, vmID, snap) require.NoError(t, err, "Failed to load snapshot of VM") _, err = orch.ResumeVM(ctx, vmID) @@ -150,9 +151,10 @@ func TestSnapLoadMultiple(t *testing.T) { err = orch.StopSingleVM(ctx, vmID) require.NoError(t, err, "Failed to offload VM") + originVmID := vmID vmID = "4" - _, _, err = orch.LoadSnapshot(ctx, vmID, snap) + _, _, err = orch.LoadSnapshot(ctx, originVmID, vmID, snap) require.NoError(t, err, "Failed to load snapshot of VM") _, err = orch.ResumeVM(ctx, vmID) @@ -161,9 +163,10 @@ func TestSnapLoadMultiple(t *testing.T) { err = orch.StopSingleVM(ctx, vmID) require.NoError(t, err, "Failed to offload VM") + originVmID = vmID vmID = "5" - _, _, err = orch.LoadSnapshot(ctx, vmID, snap) + _, _, err = orch.LoadSnapshot(ctx, originVmID, vmID, snap) require.NoError(t, err, "Failed to load snapshot of VM") _, err = orch.ResumeVM(ctx, vmID) @@ -234,10 +237,11 @@ func TestParallelSnapLoad(t *testing.T) { err = orch.StopSingleVM(ctx, vmID) require.NoError(t, err, "Failed to offload VM, "+vmID) + originVmID := vmID vmIDInt, _ := strconv.Atoi(vmID) vmID = strconv.Itoa(vmIDInt + 1) - _, _, err = orch.LoadSnapshot(ctx, vmID, snap) + _, _, err = orch.LoadSnapshot(ctx, originVmID, vmID, snap) require.NoError(t, err, "Failed to load snapshot of VM, "+vmID) _, err = orch.ResumeVM(ctx, vmID) @@ -357,9 +361,10 @@ func TestParallelPhasedSnapLoad(t *testing.T) { defer vmGroup.Done() vmID := fmt.Sprintf("%d", i+vmIDBase) snap := snapshotting.NewSnapshot(vmID, "/fccd/snapshots", testImageName) + originVmID := vmID vmIDInt, _ := strconv.Atoi(vmID) 
vmID = strconv.Itoa(vmIDInt + 1) - _, _, err := orch.LoadSnapshot(ctx, vmID, snap) + _, _, err := orch.LoadSnapshot(ctx, originVmID, vmID, snap) require.NoError(t, err, "Failed to load snapshot of VM, "+vmID) }(i) } @@ -466,8 +471,9 @@ func TestRemoteSnapLoad(t *testing.T) { ctx, cancel := context.WithTimeout(namespaces.WithNamespace(context.Background(), namespaceName), testTimeout) defer cancel() - vmID := "37" revision := "myrev-37" + originVmID := "37" + vmID := "38" _, err := os.Stat(remoteSnapshotsDir) require.NoError(t, err, "Failed to stat remote snapshots directory") @@ -482,7 +488,7 @@ func TestRemoteSnapLoad(t *testing.T) { snap := snapshotting.NewSnapshot(revision, remoteSnapshotsDir, testImageName) - _, _, err = orch.LoadSnapshot(ctx, vmID, snap) + _, _, err = orch.LoadSnapshot(ctx, originVmID, vmID, snap) require.NoError(t, err, "Failed to load remote snapshot of VM") _, err = orch.ResumeVM(ctx, vmID) diff --git a/ctriface/orch.go b/ctriface/orch.go index d7dc5df7b..cf7618776 100644 --- a/ctriface/orch.go +++ b/ctriface/orch.go @@ -23,7 +23,6 @@ package ctriface import ( - "github.com/vhive-serverless/vhive/devmapper" "os" "os/signal" "path/filepath" @@ -32,6 +31,8 @@ import ( "syscall" "time" + "github.com/vhive-serverless/vhive/devmapper" + log "github.com/sirupsen/logrus" "github.com/containerd/containerd" @@ -88,6 +89,7 @@ type Orchestrator struct { isUPFEnabled bool isLazyMode bool snapshotsDir string + uffdSockAddr string isMetricsMode bool netPoolSize int @@ -121,10 +123,21 @@ func NewOrchestrator(snapshotter, hostIface string, opts ...OrchestratorOption) } if o.GetUPFEnabled() { + // file, err := os.Create(o.uffdSockAddr) + // if err != nil { + // log.Fatalf("Failed to create socket file: %v", err) + // } + // defer file.Close() + // lg.UniLogger.Println("TEST: created the uffd sock addr") + managerCfg := manager.MemoryManagerCfg{ MetricsModeOn: o.isMetricsMode, + UffdSockAddr: o.uffdSockAddr, } o.memoryManager = manager.NewMemoryManager(managerCfg) + + // lg.UniLogger.Println("TEST: created a new memory manager. 
Start listen uffd socket") + // go o.memoryManager.ListenUffdSocket(o.uffdSockAddr) } log.Info("Creating containerd client") @@ -208,6 +221,11 @@ func (o *Orchestrator) GetSnapshotsDir() string { return o.snapshotsDir } +// TODO: /tmp/uffd/firecracker-containerd#3-0/uffd.sock +func (o *Orchestrator) getUffdSockAddr(vmID string) string { + return filepath.Join(o.getVMBaseDir(vmID), "uffd.sock") +} + func (o *Orchestrator) getSnapshotFile(vmID string) string { return filepath.Join(o.getVMBaseDir(vmID), "snap_file") } diff --git a/ctriface/orch_options.go b/ctriface/orch_options.go index 8e9896d5f..0446d945d 100644 --- a/ctriface/orch_options.go +++ b/ctriface/orch_options.go @@ -49,6 +49,13 @@ func WithUPF(isUPFEnabled bool) OrchestratorOption { } } +// WithUffdSockAddr Sets the socket path for Uffd communication +func WithUffdSockAddr(uffdSockAddr string) OrchestratorOption { + return func(o *Orchestrator) { + o.uffdSockAddr = uffdSockAddr + } +} + // WithSnapshotsDir Sets the directory where // snapshots should be stored func WithSnapshotsDir(snapshotsDir string) OrchestratorOption { diff --git a/functions.go b/functions.go index 91146d34a..290815a84 100644 --- a/functions.go +++ b/functions.go @@ -25,7 +25,6 @@ package main import ( "context" "fmt" - "github.com/vhive-serverless/vhive/ctriface" "math/rand" "net" "os" @@ -35,6 +34,8 @@ import ( "syscall" "time" + "github.com/vhive-serverless/vhive/ctriface" + "golang.org/x/sync/semaphore" "google.golang.org/grpc" "google.golang.org/grpc/backoff" @@ -361,8 +362,12 @@ func (f *Function) AddInstance() *metrics.Metric { if f.isSnapshotReady { var resp *ctriface.StartVMResponse + // resp, metr = f.LoadInstance(f.getVMID()) + + originVmID := fmt.Sprintf("%s-%d", f.fID, f.lastInstanceID-1) + currVmID := f.getVMID() + resp, metr = f.LoadInstance(originVmID, currVmID) - resp, metr = f.LoadInstance(f.getVMID()) f.guestIP = resp.GuestIP f.vmID = f.getVMID() f.lastInstanceID++ @@ -479,7 +484,7 @@ func (f *Function) CreateInstanceSnapshot() { // LoadInstance Loads a new instance of the function from its snapshot and resumes it // The tap, the shim and the vmID remain the same -func (f *Function) LoadInstance(vmID string) (*ctriface.StartVMResponse, *metrics.Metric) { +func (f *Function) LoadInstance(originVmID string, vmID string) (*ctriface.StartVMResponse, *metrics.Metric) { logger := log.WithFields(log.Fields{"fID": f.fID}) logger.Debug("Loading instance") @@ -492,7 +497,7 @@ func (f *Function) LoadInstance(vmID string) (*ctriface.StartVMResponse, *metric log.Panic(err) } - resp, loadMetr, err := orch.LoadSnapshot(ctx, vmID, snap) + resp, loadMetr, err := orch.LoadSnapshot(ctx, originVmID, vmID, snap) if err != nil { log.Panic(err) } diff --git a/go.mod b/go.mod index 09fb95f9e..6f116b3d1 100644 --- a/go.mod +++ b/go.mod @@ -39,8 +39,12 @@ replace ( ) replace ( - github.com/firecracker-microvm/firecracker-containerd => github.com/vhive-serverless/firecracker-containerd v0.0.0-20230912063208-ad6383f05e45 + github.com/firecracker-microvm/firecracker-containerd => ../firecracker-containerd + github.com/vhive-serverless/vhive/ctriface => ./ctriface + + // github.com/firecracker-microvm/firecracker-containerd => github.com/char-1ee/firecracker-containerd v0.0.0-20231018191519-49cac5eea134 github.com/vhive-serverless/vhive/examples/protobuf/helloworld => ./examples/protobuf/helloworld + github.com/vhive-serverless/vhive/lg => ./lg ) require ( diff --git a/go.sum b/go.sum index c5ed066b7..20ae4a93e 100644 --- a/go.sum +++ b/go.sum @@ -1022,8 +1022,6 @@ 
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyC github.com/valyala/fasthttp v1.2.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk8LWSxF3s= github.com/valyala/quicktemplate v1.1.1/go.mod h1:EH+4AkTd43SvgIbQHYu59/cJyxDoOVRUAfrukLPuGJ4= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= -github.com/vhive-serverless/firecracker-containerd v0.0.0-20230912063208-ad6383f05e45 h1:B+2NmtrRoWgfYkaqqG9Dyqud5HRjfibFpB8wbqER/PQ= -github.com/vhive-serverless/firecracker-containerd v0.0.0-20230912063208-ad6383f05e45/go.mod h1:XC5a/4PWbzipD5Ron745odZxoVy/J6d8xFldwTZJbSU= github.com/vishvananda/netlink v0.0.0-20171020171820-b2de5d10e38e/go.mod h1:+SR5DhBJrl6ZM7CoCKvpw5BKroDKQ+PJqOg65H/2ktk= github.com/vishvananda/netlink v0.0.0-20181108222139-023a6dafdcdf/go.mod h1:+SR5DhBJrl6ZM7CoCKvpw5BKroDKQ+PJqOg65H/2ktk= github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE= diff --git a/lg/uni_logger.go b/lg/uni_logger.go new file mode 100644 index 000000000..f626cf650 --- /dev/null +++ b/lg/uni_logger.go @@ -0,0 +1,17 @@ +package lg + +import ( + "log" + "os" + // log "github.com/sirupsen/logrus" +) + +var UniLogger *log.Logger + +func init() { + file, err := os.OpenFile("uni_output.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if err != nil { + log.Fatalln("Failed to open log file:", err) + } + UniLogger = log.New(file, "DEBUG: ", log.Ldate|log.Ltime|log.Lshortfile) +} diff --git a/memory/manager/manager.go b/memory/manager/manager.go index c1f9464aa..b96acd5d9 100644 --- a/memory/manager/manager.go +++ b/memory/manager/manager.go @@ -26,6 +26,7 @@ import ( "encoding/csv" "errors" "fmt" + "net" "os" "strconv" "sync" @@ -46,13 +47,17 @@ const ( // MemoryManagerCfg Global config of the manager type MemoryManagerCfg struct { MetricsModeOn bool + UffdSockAddr string // it could not be appropriate to put sock here } // MemoryManager Serves page faults coming from VMs type MemoryManager struct { sync.Mutex MemoryManagerCfg - instances map[string]*SnapshotState // Indexed by vmID + instances map[string]*SnapshotState // Indexed by vmID + origins map[string]string // Track parent vm for vm loaded from snapshot + startEpollingCh chan struct{} + startEpollingOnce sync.Once } // NewMemoryManager Initializes a new memory manager @@ -61,7 +66,10 @@ func NewMemoryManager(cfg MemoryManagerCfg) *MemoryManager { m := new(MemoryManager) m.instances = make(map[string]*SnapshotState) + m.origins = make(map[string]string) + m.startEpollingCh = make(chan struct{}, 1) m.MemoryManagerCfg = cfg + m.startEpollingOnce = sync.Once{} return m } @@ -86,7 +94,31 @@ func (m *MemoryManager) RegisterVM(cfg SnapshotStateCfg) error { state := NewSnapshotState(cfg) m.instances[vmID] = state + return nil +} + +// RegisterVMFromSnap Registers a VM that is loaded from snapshot within the memory manager +func (m *MemoryManager) RegisterVMFromSnap(originVmID string, cfg SnapshotStateCfg) error { + m.Lock() + defer m.Unlock() + + vmID := cfg.VMID + + logger := log.WithFields(log.Fields{"vmID": vmID}) + logger.Debug("Registering the VM that loaded snapshot with the memory manager") + + if _, ok := m.instances[vmID]; ok { + logger.Error("VM already registered with the memory manager") + return errors.New("VM already registered with the memory manager") + } + + cfg.metricsModeOn = m.MetricsModeOn + state := NewSnapshotState(cfg) + // state := m.instances[originVmID] + + m.origins[vmID] = originVmID + m.instances[vmID] = 
state
 
 	return nil
 }
@@ -111,12 +143,13 @@
 	}
 
 	delete(m.instances, vmID)
+	delete(m.origins, vmID)
 
 	return nil
 }
 
 // Activate Creates an epoller to serve page faults for the VM
-func (m *MemoryManager) Activate(vmID string) error {
+func (m *MemoryManager) Activate(vmID string, conn *net.UnixConn) error {
 	logger := log.WithFields(log.Fields{"vmID": vmID})
 
 	logger.Debug("Activating instance in the memory manager")
@@ -129,7 +162,10 @@
 	m.Lock()
 
+	logger.Debug("TEST: Activate: fetch snapstate by vmID for UFFD")
+
 	state, ok = m.instances[vmID]
+
 	if !ok {
 		m.Unlock()
 		logger.Error("VM not registered with the memory manager")
@@ -148,7 +184,7 @@
 		return err
 	}
 
-	if err := state.getUFFD(); err != nil {
+	if err := state.getUFFD(conn); err != nil {
 		logger.Error("Failed to get uffd")
 		return err
 	}
@@ -180,7 +216,7 @@
 	state, ok = m.instances[vmID]
 	if !ok {
 		m.Unlock()
-		logger.Error("VM not registered with the memory manager")
+		logger.Error("TEST(fetch state): VM not registered with the memory manager")
 		return errors.New("VM not registered with the memory manager")
 	}
@@ -351,6 +387,42 @@
 	return state.latencyMetrics, nil
 }
 
+// Deprecated
+// func (m *MemoryManager) GetUPFSockPath(vmID string, isSnapshotReady bool) (string, error) {
+// 	logger := log.WithFields(log.Fields{"vmID": vmID})
+
+// 	logger.Debug("Get the path of firecracker unix domain socket")
+
+// 	m.Lock()
+
+// 	// id := ""
+// 	// if isSnapshotReady {
+// 	// 	logger.Debugf("TEST: to find originID by vmID %s", vmID)
+// 	// 	originID, ok := m.origins[vmID]
+// 	// 	if !ok {
+// 	// 		logger.Debug("TEST: not loaded from snapshot")
+// 	// 	}
+// 	// 	id = originID
+// 	// }
+// 	// state, ok := m.instances[id]
+
+// 	state, ok := m.instances[vmID]
+// 	if !ok {
+// 		m.Unlock()
+// 		logger.Error("VM not registered with the memory manager")
+// 		return "", errors.New("VM not registered with the memory manager")
+// 	}
+
+// 	m.Unlock()
+
+// 	if state.isActive {
+// 		logger.Error("Cannot get stats while VM is active")
+// 		return "", errors.New("Cannot get stats while VM is active")
+// 	}
+
+// 	return m.instances[vmID].SnapshotStateCfg.InstanceSockAddr, nil
+// }
+
 func getLazyHeaderStats(state *SnapshotState, functionName string) ([]string, []string) {
 	header := []string{
 		"FuncName",
diff --git a/memory/manager/snapshot_state.go b/memory/manager/snapshot_state.go
index 7cce3e2af..a0bb7d076 100644
--- a/memory/manager/snapshot_state.go
+++ b/memory/manager/snapshot_state.go
@@ -28,8 +28,8 @@ package manager
 import "C"
 
 import (
-	"context"
 	"encoding/binary"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"net"
@@ -40,15 +40,23 @@ import (
 	"syscall"
 	"time"
 
-	"github.com/ftrvxmtrx/fd"
 	log "github.com/sirupsen/logrus"
 	"golang.org/x/sys/unix"
 
+	"github.com/vhive-serverless/vhive/lg"
 	"github.com/vhive-serverless/vhive/metrics"
 
 	"unsafe"
 )
 
+// TODO: for test logging
+type TestPageFault struct {
+	src    uint64
+	dst    uint64
+	mode   uint64
+	offset uint64
+}
+
 // SnapshotStateCfg Config to initialize SnapshotState
 type SnapshotStateCfg struct {
 	VMID string
@@ -125,36 +133,50 @@ func (s *SnapshotState) setupStateOnActivate() {
 	}
 }
 
-func (s *SnapshotState) getUFFD() error {
-	var d net.Dialer
-	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
-	defer cancel()
-
-	for {
-		c, err := d.DialContext(ctx, "unix", s.InstanceSockAddr)
-		if err != nil {
-			if ctx.Err() != nil {
-				log.Error("Failed to dial within the context timeout")
-				return err
-			}
-			time.Sleep(1 * time.Millisecond)
-			continue
-		}
+type GuestRegionUffdMapping struct {
+	BaseHostVirtAddr uint64 `json:"base_host_virt_addr"`
+	Size             uint64 `json:"size"`
+	Offset           uint64 `json:"offset"`
+	PageSizeKiB      uint64 `json:"page_size_kib"`
+}
 
-		defer c.Close()
+func (s *SnapshotState) getUFFD(sendfdConn *net.UnixConn) error {
+	buff := make([]byte, 256) // set a maximum buffer size
+	oobBuff := make([]byte, unix.CmsgSpace(4))
 
-		sendfdConn := c.(*net.UnixConn)
+	n, oobn, _, _, err := sendfdConn.ReadMsgUnix(buff, oobBuff)
+	if err != nil {
+		return fmt.Errorf("error reading message: %w", err)
+	}
+	buff = buff[:n]
 
-		fs, err := fd.Get(sendfdConn, 1, []string{"a file"})
+	var fd int
+	if oobn > 0 {
+		scms, err := unix.ParseSocketControlMessage(oobBuff[:oobn])
 		if err != nil {
-			log.Error("Failed to receive the uffd")
-			return err
+			return fmt.Errorf("error parsing socket control message: %w", err)
 		}
+		for _, scm := range scms {
+			fds, err := unix.ParseUnixRights(&scm)
+			if err != nil {
+				return fmt.Errorf("error parsing unix rights: %w", err)
+			}
+			if len(fds) > 0 {
+				fd = fds[0] // Assuming only one fd is sent.
+				break
+			}
+		}
+	}
+	userfaultFD := os.NewFile(uintptr(fd), "userfaultfd")
 
-		s.userFaultFD = fs[0]
-
-		return nil
+	var mapping []GuestRegionUffdMapping
+	if err := json.Unmarshal(buff, &mapping); err != nil {
+		return fmt.Errorf("error unmarshaling data: %w", err)
 	}
+
+	s.startAddress = mapping[0].BaseHostVirtAddr
+	s.userFaultFD = userfaultFD
+	return nil
 }
 
 func (s *SnapshotState) processMetrics() {
@@ -279,7 +301,9 @@
 		logger.Fatalf("register_epoller: %v", err)
 	}
 
+	// TODO: config where the logger stream goes
 	logger.Debug("Starting polling loop")
+	lg.UniLogger.Println("Starting polling loop")
 
 	defer syscall.Close(s.epfd)
 
@@ -301,6 +325,7 @@
 			panic("Wrong number of events")
 		}
 
+		logger.Debugf("TEST: epoller found %d event", nevents)
 		for i := 0; i < nevents; i++ {
 			event := events[i]
 
@@ -374,14 +399,20 @@
 		workingSetInstalled bool
 	)
 
+	// log.SetOutput(os.Stdout)
+	// log.SetLevel(log.DebugLevel)
+
+	log.Debugf("TEST: servePageFault(fd: %d, address: %d)", fd, address)
+
 	s.firstPageFaultOnce.Do(
 		func() {
-			s.startAddress = address
+			log.Debugf("TEST: first page fault address %d", address)
 
 			if s.isRecordReady && !s.IsLazyMode {
 				if s.metricsModeOn {
 					tStart = time.Now()
 				}
+				log.Debug("TEST: first page fault once installation")
 				s.installWorkingSetPages(fd)
 				if s.metricsModeOn {
 					s.currentMetric.MetricMap[installWSMetric] = metrics.ToUS(time.Since(tStart))
@@ -401,6 +432,13 @@
 	dst := uint64(int64(address) & ^(int64(os.Getpagesize()) - 1))
 	mode := uint64(0)
 
+	testPF := TestPageFault{
+		src:    src,
+		dst:    dst,
+		mode:   mode,
+		offset: offset,
+	}
+
 	rec := Record{
 		offset: offset,
 	}
@@ -427,6 +465,7 @@
 		tStart = time.Now()
 	}
 
+	log.Debugf("TEST: install happen for %v", testPF)
 	err := installRegion(fd, src, dst, mode, 1)
 
 	if s.metricsModeOn {
diff --git a/run.sh b/run.sh
new file mode 100644
index 000000000..dbae4c2cb
--- /dev/null
+++ b/run.sh
@@ -0,0 +1,5 @@
+./scripts/clean_fcctr.sh
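The rewritten getUFFD above no longer dials Firecracker; it reads, from an already-accepted connection, a JSON array of guest memory regions plus the userfaultfd passed as an SCM_RIGHTS control message. A self-contained sketch of that receive path, with an assumed /tmp/uffd.sock path and illustrative names; the field tags are copied from the GuestRegionUffdMapping struct in the patch, and the snippet is not part of the patch itself:

package main

import (
	"encoding/json"
	"fmt"
	"net"
	"os"

	"golang.org/x/sys/unix"
)

// guestRegionUffdMapping mirrors the JSON payload expected alongside the
// userfaultfd; the field tags are taken from the diff above.
type guestRegionUffdMapping struct {
	BaseHostVirtAddr uint64 `json:"base_host_virt_addr"`
	Size             uint64 `json:"size"`
	Offset           uint64 `json:"offset"`
	PageSizeKiB      uint64 `json:"page_size_kib"`
}

// receiveUFFD accepts one connection on sockPath and returns the received
// userfaultfd plus the decoded memory mappings.
func receiveUFFD(sockPath string) (*os.File, []guestRegionUffdMapping, error) {
	ln, err := net.Listen("unix", sockPath)
	if err != nil {
		return nil, nil, err
	}
	defer ln.Close()

	conn, err := ln.Accept()
	if err != nil {
		return nil, nil, err
	}
	defer conn.Close()
	uc := conn.(*net.UnixConn)

	buf := make([]byte, 4096)
	oob := make([]byte, unix.CmsgSpace(4)) // room for one 4-byte fd
	n, oobn, _, _, err := uc.ReadMsgUnix(buf, oob)
	if err != nil {
		return nil, nil, err
	}

	// The fd travels out-of-band as an SCM_RIGHTS control message.
	scms, err := unix.ParseSocketControlMessage(oob[:oobn])
	if err != nil || len(scms) == 0 {
		return nil, nil, fmt.Errorf("no control message carrying an fd: %v", err)
	}
	fds, err := unix.ParseUnixRights(&scms[0])
	if err != nil || len(fds) == 0 {
		return nil, nil, fmt.Errorf("no fd in control message: %v", err)
	}

	// The in-band bytes carry the guest memory region mappings as JSON.
	var mappings []guestRegionUffdMapping
	if err := json.Unmarshal(buf[:n], &mappings); err != nil {
		return nil, nil, err
	}
	return os.NewFile(uintptr(fds[0]), "userfaultfd"), mappings, nil
}

func main() {
	uffd, maps, err := receiveUFFD("/tmp/uffd.sock") // assumed path, matches the tests
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	defer uffd.Close()
	if len(maps) > 0 {
		fmt.Printf("uffd fd=%d, first region base=0x%x size=%d\n", uffd.Fd(), maps[0].BaseHostVirtAddr, maps[0].Size)
	}
}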
+./scripts/cloudlab/setup_node.sh
+go build -race -v -a ./...
+make debug > output.log 2>&1
+code output.log
diff --git a/scripts/clean_fcctr.sh b/scripts/clean_fcctr.sh
index 01cc1f06c..61c30ea52 100755
--- a/scripts/clean_fcctr.sh
+++ b/scripts/clean_fcctr.sh
@@ -80,5 +80,8 @@ sudo rm /var/lib/cni/networks/fcnet*/19* || echo clean already
 echo Cleaning snapshots
 sudo rm -rf /fccd/snapshots/*
 
+echo Cleaning UFFD socket
+sudo rm -f /tmp/uffd.sock
+
 echo Creating a fresh devmapper
 source $DIR/create_devmapper.sh
diff --git a/vhive.go b/vhive.go
index 829b0eed0..d829f63be 100644
--- a/vhive.go
+++ b/vhive.go
@@ -91,11 +91,6 @@ func main() {
         return
     }
 
-    if *isUPFEnabled {
-        log.Error("User-level page faults are temporarily disabled (gh-807)")
-        return
-    }
-
     if *isUPFEnabled && !*isSnapshotsEnabled {
         log.Error("User-level page faults are not supported without snapshots")
         return
diff --git a/vhive_test.go b/vhive_test.go
index 9a7947f84..f76f543b6 100644
--- a/vhive_test.go
+++ b/vhive_test.go
@@ -26,8 +26,6 @@ import (
     "context"
     "flag"
     "os"
-    "strconv"
-    "sync"
     "testing"
 
     ctrdlog "github.com/containerd/containerd/log"
@@ -61,15 +59,10 @@ func TestMain(m *testing.M) {
 
     log.SetOutput(os.Stdout)
 
-    log.SetLevel(log.InfoLevel)
+    log.SetLevel(log.DebugLevel)
 
     flag.Parse()
 
-    if *isUPFEnabledTest {
-        log.Error("User-level page faults are temporarily disabled (gh-807)")
-        os.Exit(-1)
-    }
-
     log.Infof("Orchestrator snapshots enabled: %t", *isSnapshotsEnabledTest)
     log.Infof("Orchestrator UPF enabled: %t", *isUPFEnabledTest)
     log.Infof("Orchestrator lazy serving mode enabled: %t", *isLazyModeTest)
@@ -99,53 +92,53 @@ func TestMain(m *testing.M) {
     os.Exit(ret)
 }
 
-func TestSendToFunctionSerial(t *testing.T) {
-    fID := "1"
-    var (
-        servedTh      uint64
-        pinnedFuncNum int
-    )
-    funcPool = NewFuncPool(!isSaveMemoryConst, servedTh, pinnedFuncNum, isTestModeConst)
-
-    for i := 0; i < 2; i++ {
-        resp, _, err := funcPool.Serve(context.Background(), fID, testImageName, "world")
-        require.NoError(t, err, "Function returned error")
-        if i == 0 {
-            require.Equal(t, resp.IsColdStart, true)
-        }
-
-        require.Equal(t, resp.Payload, "Hello, world!")
-    }
-
-    message, err := funcPool.RemoveInstance(fID, testImageName, true)
-    require.NoError(t, err, "Function returned error, "+message)
-}
-
-func TestSendToFunctionParallel(t *testing.T) {
-    fID := "2"
-    var (
-        servedTh      uint64
-        pinnedFuncNum int
-    )
-    funcPool = NewFuncPool(!isSaveMemoryConst, servedTh, pinnedFuncNum, isTestModeConst)
-
-    var vmGroup sync.WaitGroup
-    for i := 0; i < 100; i++ {
-        vmGroup.Add(1)
-
-        go func(i int) {
-            defer vmGroup.Done()
-            resp, _, err := funcPool.Serve(context.Background(), fID, testImageName, "world")
-            require.NoError(t, err, "Function returned error")
-            require.Equal(t, resp.Payload, "Hello, world!")
-        }(i)
-
-    }
-    vmGroup.Wait()
-
-    message, err := funcPool.RemoveInstance(fID, testImageName, true)
-    require.NoError(t, err, "Function returned error, "+message)
-}
+// func TestSendToFunctionSerial(t *testing.T) {
+//     fID := "1"
+//     var (
+//         servedTh      uint64
+//         pinnedFuncNum int
+//     )
+//     funcPool = NewFuncPool(!isSaveMemoryConst, servedTh, pinnedFuncNum, isTestModeConst)
+
+//     for i := 0; i < 2; i++ {
+//         resp, _, err := funcPool.Serve(context.Background(), fID, testImageName, "world")
+//         require.NoError(t, err, "Function returned error")
+//         if i == 0 {
+//             require.Equal(t, resp.IsColdStart, true)
+//         }
+
+//         require.Equal(t, resp.Payload, "Hello, world!")
+//     }
+
+//     message, err := funcPool.RemoveInstance(fID, testImageName, true)
+//     require.NoError(t, err, "Function returned error, "+message)
+// }
+
+// func TestSendToFunctionParallel(t *testing.T) {
+//     fID := "2"
+//     var (
+//         servedTh      uint64
+//         pinnedFuncNum int
+//     )
+//     funcPool = NewFuncPool(!isSaveMemoryConst, servedTh, pinnedFuncNum, isTestModeConst)
+
+//     var vmGroup sync.WaitGroup
+//     for i := 0; i < 100; i++ {
+//         vmGroup.Add(1)
+
+//         go func(i int) {
+//             defer vmGroup.Done()
+//             resp, _, err := funcPool.Serve(context.Background(), fID, testImageName, "world")
+//             require.NoError(t, err, "Function returned error")
+//             require.Equal(t, resp.Payload, "Hello, world!")
+//         }(i)
+
+//     }
+//     vmGroup.Wait()
+
+//     message, err := funcPool.RemoveInstance(fID, testImageName, true)
+//     require.NoError(t, err, "Function returned error, "+message)
+// }
 
 func TestStartSendStopTwice(t *testing.T) {
     fID := "3"
@@ -172,171 +165,171 @@ func TestStartSendStopTwice(t *testing.T) {
     require.Equal(t, 2, int(startsGot), "Cold start (starts) stats are wrong")
 }
 
-func TestStatsNotNumericFunction(t *testing.T) {
-    fID := "not-cld"
-    var (
-        servedTh      uint64 = 1
-        pinnedFuncNum int    = 2
-    )
-    funcPool = NewFuncPool(isSaveMemoryConst, servedTh, pinnedFuncNum, isTestModeConst)
-
-    resp, _, err := funcPool.Serve(context.Background(), fID, testImageName, "world")
-    require.NoError(t, err, "Function returned error")
-    require.Equal(t, resp.Payload, "Hello, world!")
-
-    message, err := funcPool.RemoveInstance(fID, testImageName, true)
-    require.NoError(t, err, "Function returned error, "+message)
-
-    servedGot := funcPool.stats.statMap[fID].served
-    require.Equal(t, 1, int(servedGot), "Cold start (served) stats are wrong")
-    startsGot := funcPool.stats.statMap[fID].started
-    require.Equal(t, 1, int(startsGot), "Cold start (starts) stats are wrong")
-}
-
-func TestStatsNotColdFunction(t *testing.T) {
-    fID := "4"
-    var (
-        servedTh      uint64 = 1
-        pinnedFuncNum int    = 4
-    )
-    funcPool = NewFuncPool(isSaveMemoryConst, servedTh, pinnedFuncNum, isTestModeConst)
-
-    resp, _, err := funcPool.Serve(context.Background(), fID, testImageName, "world")
-    require.NoError(t, err, "Function returned error")
-    require.Equal(t, resp.Payload, "Hello, world!")
-
-    message, err := funcPool.RemoveInstance(fID, testImageName, true)
-    require.NoError(t, err, "Function returned error, "+message)
-
-    servedGot := funcPool.stats.statMap[fID].served
-    require.Equal(t, 1, int(servedGot), "Cold start (served) stats are wrong")
-    startsGot := funcPool.stats.statMap[fID].started
-    require.Equal(t, 1, int(startsGot), "Cold start (starts) stats are wrong")
-}
-
-func TestSaveMemorySerial(t *testing.T) {
-    fID := "5"
-    var (
-        servedTh      uint64 = 40
-        pinnedFuncNum int    = 2
-    )
-    funcPool = NewFuncPool(isSaveMemoryConst, servedTh, pinnedFuncNum, isTestModeConst)
-
-    for i := 0; i < 100; i++ {
-        resp, _, err := funcPool.Serve(context.Background(), fID, testImageName, "world")
-        require.NoError(t, err, "Function returned error")
-        require.Equal(t, resp.Payload, "Hello, world!")
-    }
-
-    startsGot := funcPool.stats.statMap[fID].started
-    require.Equal(t, 3, int(startsGot), "Cold start (starts) stats are wrong")
-
-    message, err := funcPool.RemoveInstance(fID, testImageName, true)
-    require.NoError(t, err, "Function returned error, "+message)
-}
-
-func TestSaveMemoryParallel(t *testing.T) {
-    fID := "6"
-    var (
-        servedTh      uint64 = 40
-        pinnedFuncNum int    = 2
-    )
-    funcPool = NewFuncPool(isSaveMemoryConst, servedTh, pinnedFuncNum, isTestModeConst)
-
-    var vmGroup sync.WaitGroup
-    for i := 0; i < 100; i++ {
-        vmGroup.Add(1)
-
-        go func(i int) {
-            defer vmGroup.Done()
-
-            resp, _, err := funcPool.Serve(context.Background(), fID, testImageName, "world")
-            require.NoError(t, err, "Function returned error")
-            require.Equal(t, resp.Payload, "Hello, world!")
-        }(i)
-
-    }
-    vmGroup.Wait()
-
-    startsGot := funcPool.stats.statMap[fID].started
-    require.Equal(t, 3, int(startsGot), "Cold start (starts) stats are wrong")
-
-    message, err := funcPool.RemoveInstance(fID, testImageName, true)
-    require.NoError(t, err, "Function returned error, "+message)
-}
-
-func TestDirectStartStopVM(t *testing.T) {
-    fID := "7"
-    var (
-        servedTh      uint64
-        pinnedFuncNum int
-    )
-    funcPool = NewFuncPool(!isSaveMemoryConst, servedTh, pinnedFuncNum, isTestModeConst)
-
-    message, err := funcPool.AddInstance(fID, testImageName)
-    require.NoError(t, err, "This error should never happen (addInstance())"+message)
-
-    resp, _, err := funcPool.Serve(context.Background(), fID, testImageName, "world")
-    require.NoError(t, err, "Function returned error")
-    require.Equal(t, resp.Payload, "Hello, world!")
-
-    message, err = funcPool.RemoveInstance(fID, testImageName, true)
-    require.NoError(t, err, "Function returned error, "+message)
-}
-
-func TestAllFunctions(t *testing.T) {
-
-    if testing.Short() {
-        t.Skip("skipping TestAllFunctions in non-nightly runs.")
-    }
-
-    images := []string{
-        "ghcr.io/ease-lab/helloworld:var_workload",
-        "ghcr.io/ease-lab/chameleon:var_workload",
-        "ghcr.io/ease-lab/pyaes:var_workload",
-        "ghcr.io/ease-lab/image_rotate:var_workload",
-        "ghcr.io/ease-lab/json_serdes:var_workload",
-        "ghcr.io/ease-lab/lr_serving:var_workload",
-        "ghcr.io/ease-lab/cnn_serving:var_workload",
-        "ghcr.io/ease-lab/rnn_serving:var_workload",
-        "ghcr.io/ease-lab/lr_training:var_workload",
-        "ghcr.io/ease-lab/springboot:var_workload",
-    }
-    var (
-        servedTh      uint64
-        pinnedFuncNum int
-    )
-    funcPool = NewFuncPool(!isSaveMemoryConst, servedTh, pinnedFuncNum, isTestModeConst)
-
-    for i := 0; i < 2; i++ {
-        var vmGroup sync.WaitGroup
-        for fID, imageName := range images {
-            reqs := []string{"world", "record", "replay"}
-            resps := []string{"world", "record_response", "replay_response"}
-            for k := 0; k < 3; k++ {
-                vmGroup.Add(1)
-                go func(fID int, imageName, request, response string) {
-                    defer vmGroup.Done()
-
-                    resp, _, err := funcPool.Serve(context.Background(), strconv.Itoa(8+fID), imageName, request)
-                    require.NoError(t, err, "Function returned error")
-
-                    require.Equal(t, resp.Payload, "Hello, "+response+"!")
-                }(fID, imageName, reqs[k], resps[k])
-            }
-            vmGroup.Wait()
-        }
-    }
-
-    var vmGroup sync.WaitGroup
-    for fID, imageName := range images {
-        vmGroup.Add(1)
-        go func(fID int, imageName string) {
-            defer vmGroup.Done()
-
-            message, err := funcPool.RemoveInstance(strconv.Itoa(8+fID), imageName, true)
-            require.NoError(t, err, "Function returned error, "+message)
-        }(fID, imageName)
-    }
-    vmGroup.Wait()
-}
+// func TestStatsNotNumericFunction(t *testing.T) {
+//     fID := "not-cld"
+//     var (
+//         servedTh      uint64 = 1
+//         pinnedFuncNum int    = 2
+//     )
+//     funcPool = NewFuncPool(isSaveMemoryConst, servedTh, pinnedFuncNum, isTestModeConst)
+
+//     resp, _, err := funcPool.Serve(context.Background(), fID, testImageName, "world")
+//     require.NoError(t, err, "Function returned error")
+//     require.Equal(t, resp.Payload, "Hello, world!")
+
+//     message, err := funcPool.RemoveInstance(fID, testImageName, true)
+//     require.NoError(t, err, "Function returned error, "+message)
+
+//     servedGot := funcPool.stats.statMap[fID].served
+//     require.Equal(t, 1, int(servedGot), "Cold start (served) stats are wrong")
+//     startsGot := funcPool.stats.statMap[fID].started
+//     require.Equal(t, 1, int(startsGot), "Cold start (starts) stats are wrong")
+// }
+
+// func TestStatsNotColdFunction(t *testing.T) {
+//     fID := "4"
+//     var (
+//         servedTh      uint64 = 1
+//         pinnedFuncNum int    = 4
+//     )
+//     funcPool = NewFuncPool(isSaveMemoryConst, servedTh, pinnedFuncNum, isTestModeConst)

+//     resp, _, err := funcPool.Serve(context.Background(), fID, testImageName, "world")
+//     require.NoError(t, err, "Function returned error")
+//     require.Equal(t, resp.Payload, "Hello, world!")

+//     message, err := funcPool.RemoveInstance(fID, testImageName, true)
+//     require.NoError(t, err, "Function returned error, "+message)

+//     servedGot := funcPool.stats.statMap[fID].served
+//     require.Equal(t, 1, int(servedGot), "Cold start (served) stats are wrong")
+//     startsGot := funcPool.stats.statMap[fID].started
+//     require.Equal(t, 1, int(startsGot), "Cold start (starts) stats are wrong")
+// }
+
+// func TestSaveMemorySerial(t *testing.T) {
+//     fID := "5"
+//     var (
+//         servedTh      uint64 = 40
+//         pinnedFuncNum int    = 2
+//     )
+//     funcPool = NewFuncPool(isSaveMemoryConst, servedTh, pinnedFuncNum, isTestModeConst)
+
+//     for i := 0; i < 100; i++ {
+//         resp, _, err := funcPool.Serve(context.Background(), fID, testImageName, "world")
+//         require.NoError(t, err, "Function returned error")
+//         require.Equal(t, resp.Payload, "Hello, world!")
+//     }
+
+//     startsGot := funcPool.stats.statMap[fID].started
+//     require.Equal(t, 3, int(startsGot), "Cold start (starts) stats are wrong")
+
+//     message, err := funcPool.RemoveInstance(fID, testImageName, true)
+//     require.NoError(t, err, "Function returned error, "+message)
+// }
+
+// func TestSaveMemoryParallel(t *testing.T) {
+//     fID := "6"
+//     var (
+//         servedTh      uint64 = 40
+//         pinnedFuncNum int    = 2
+//     )
+//     funcPool = NewFuncPool(isSaveMemoryConst, servedTh, pinnedFuncNum, isTestModeConst)
+
+//     var vmGroup sync.WaitGroup
+//     for i := 0; i < 100; i++ {
+//         vmGroup.Add(1)
+
+//         go func(i int) {
+//             defer vmGroup.Done()
+
+//             resp, _, err := funcPool.Serve(context.Background(), fID, testImageName, "world")
+//             require.NoError(t, err, "Function returned error")
+//             require.Equal(t, resp.Payload, "Hello, world!")
+//         }(i)
+
+//     }
+//     vmGroup.Wait()
+
+//     startsGot := funcPool.stats.statMap[fID].started
+//     require.Equal(t, 3, int(startsGot), "Cold start (starts) stats are wrong")
+
+//     message, err := funcPool.RemoveInstance(fID, testImageName, true)
+//     require.NoError(t, err, "Function returned error, "+message)
+// }
+
+// func TestDirectStartStopVM(t *testing.T) {
+//     fID := "7"
+//     var (
+//         servedTh      uint64
+//         pinnedFuncNum int
+//     )
+//     funcPool = NewFuncPool(!isSaveMemoryConst, servedTh, pinnedFuncNum, isTestModeConst)
+
+//     message, err := funcPool.AddInstance(fID, testImageName)
+//     require.NoError(t, err, "This error should never happen (addInstance())"+message)
+
+//     resp, _, err := funcPool.Serve(context.Background(), fID, testImageName, "world")
+//     require.NoError(t, err, "Function returned error")
+//     require.Equal(t, resp.Payload, "Hello, world!")
+
+//     message, err = funcPool.RemoveInstance(fID, testImageName, true)
+//     require.NoError(t, err, "Function returned error, "+message)
+// }
+
+// func TestAllFunctions(t *testing.T) {
+
+//     if testing.Short() {
+//         t.Skip("skipping TestAllFunctions in non-nightly runs.")
+//     }
+
+//     images := []string{
+//         "ghcr.io/ease-lab/helloworld:var_workload",
+//         "ghcr.io/ease-lab/chameleon:var_workload",
+//         "ghcr.io/ease-lab/pyaes:var_workload",
+//         "ghcr.io/ease-lab/image_rotate:var_workload",
+//         "ghcr.io/ease-lab/json_serdes:var_workload",
+//         "ghcr.io/ease-lab/lr_serving:var_workload",
+//         "ghcr.io/ease-lab/cnn_serving:var_workload",
+//         "ghcr.io/ease-lab/rnn_serving:var_workload",
+//         "ghcr.io/ease-lab/lr_training:var_workload",
+//         "ghcr.io/ease-lab/springboot:var_workload",
+//     }
+//     var (
+//         servedTh      uint64
+//         pinnedFuncNum int
+//     )
+//     funcPool = NewFuncPool(!isSaveMemoryConst, servedTh, pinnedFuncNum, isTestModeConst)
+
+//     for i := 0; i < 2; i++ {
+//         var vmGroup sync.WaitGroup
+//         for fID, imageName := range images {
+//             reqs := []string{"world", "record", "replay"}
+//             resps := []string{"world", "record_response", "replay_response"}
+//             for k := 0; k < 3; k++ {
+//                 vmGroup.Add(1)
+//                 go func(fID int, imageName, request, response string) {
+//                     defer vmGroup.Done()
+
+//                     resp, _, err := funcPool.Serve(context.Background(), strconv.Itoa(8+fID), imageName, request)
+//                     require.NoError(t, err, "Function returned error")
+
+//                     require.Equal(t, resp.Payload, "Hello, "+response+"!")
+//                 }(fID, imageName, reqs[k], resps[k])
+//             }
+//             vmGroup.Wait()
+//         }
+//     }
+
+//     var vmGroup sync.WaitGroup
+//     for fID, imageName := range images {
+//         vmGroup.Add(1)
+//         go func(fID int, imageName string) {
+//             defer vmGroup.Done()
+
+//             message, err := funcPool.RemoveInstance(strconv.Itoa(8+fID), imageName, true)
+//             require.NoError(t, err, "Function returned error, "+message)
+//         }(fID, imageName)
+//     }
+//     vmGroup.Wait()
+// }