Skip to content

Commit

Permalink
Change logic flow of loadsnapshot
Browse files Browse the repository at this point in the history
Signed-off-by: char-1ee <[email protected]>
  • Loading branch information
char-1ee committed Mar 4, 2024
1 parent e0bef9a commit d3c914e
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 75 deletions.
3 changes: 2 additions & 1 deletion configs/firecracker-containerd/firecracker-runtime.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@
"kernel_args": "console=ttyS0 noapic reboot=k panic=1 pci=off nomodules ro systemd.journald.forward_to_console systemd.unit=firecracker.target init=/sbin/overlay-init",
"root_drive": "/var/lib/firecracker-containerd/runtime/default-rootfs.img",
"cpu_template": "T2",
"log_levels": ["info"]
"log_fifo": "/tmp/fc-logs.fifo",
"log_levels": ["debug"]
}
108 changes: 78 additions & 30 deletions ctriface/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"time"

"github.com/vhive-serverless/vhive/snapshotting"
"golang.org/x/sys/unix"

log "github.com/sirupsen/logrus"

Expand Down Expand Up @@ -65,6 +66,13 @@ type StartVMResponse struct {
GuestIP string
}

type GuestRegionUffdMapping struct {
BaseHostVirtAddr uint64 `json:"base_host_virt_addr"`
Size uint64 `json:"size"`
Offset uint64 `json:"offset"`
PageSizeKiB uint64 `json:"page_size_kib"`
}

const (
testImageName = "ghcr.io/ease-lab/helloworld:var_workload"
fileBackend = "File"
Expand Down Expand Up @@ -506,9 +514,6 @@ func (o *Orchestrator) LoadSnapshot(ctx context.Context, originVmID string, vmID
BackendPath: snap.GetMemFilePath(),
}

var sendfdConn *net.UnixConn
uffdListenerCh := make(chan struct{}, 1)

if o.GetUPFEnabled() {
logger.Debug("TEST: UPF is enabled")
conf.MemBackend.BackendType = uffdBackend
Expand All @@ -522,31 +527,9 @@ func (o *Orchestrator) LoadSnapshot(ctx context.Context, originVmID string, vmID
return nil, nil, err
}

logger.Debug("TEST: start listening to uffd socket")
if _, err := os.Stat(conf.MemBackend.BackendPath); err == nil {
os.Remove(conf.MemBackend.BackendPath)
}

go func() {
listener, err := net.Listen("unix", conf.MemBackend.BackendPath)
if err != nil {
logger.Error("failed to listen to uffd socket")
return
}
defer listener.Close()

logger.Debug("Listening ...")
conn, err := listener.Accept()
if err != nil {
logger.Error("failed to accept connection to uffd socket")
return
}
// ===========================

sendfdConn, _ = conn.(*net.UnixConn)
close(uffdListenerCh)
}()

time.Sleep(10 * time.Second) // TODO: sleep for 10 seconds to wait for the uffd socket to be ready
// ===========================
}

tStart = time.Now()
Expand Down Expand Up @@ -590,11 +573,76 @@ func (o *Orchestrator) LoadSnapshot(ctx context.Context, originVmID string, vmID
<-loadDone

if o.GetUPFEnabled() {
logger.Debug("TEST: start listening to uffd socket")
if _, err := os.Stat(conf.MemBackend.BackendPath); err == nil {
os.Remove(conf.MemBackend.BackendPath)
}

logger.Debug("TEST: Registering VM with snap with the memory manager")
var sendfdConn *net.UnixConn
uffdListenerCh := make(chan struct{}, 1)
var userfaultFD *os.File
var baseHostVirtAddr uint64

go func() {
defer close(uffdListenerCh)
listener, err := net.Listen("unix", conf.MemBackend.BackendPath) // TODO: find a better Listener API
if err != nil {
logger.Error("failed to listen to uffd socket")
return
}
defer listener.Close()

logger.Debug("Listening ...")
conn, err := listener.Accept() // blocking
if err != nil {
logger.Error("failed to accept connection to uffd socket")
return
}

sendfdConn, _ = conn.(*net.UnixConn)

// Parse mapping and fd
buff := make([]byte, 256) // set a maximum buffer size
oobBuff := make([]byte, unix.CmsgSpace(4))
n, oobn, _, _, err := sendfdConn.ReadMsgUnix(buff, oobBuff)
if err != nil {
logger.Error("failed to reading from uffd socket")
return
}
buff = buff[:n]

var fd int
if oobn > 0 {
scms, err := unix.ParseSocketControlMessage(oobBuff[:oobn])
if err != nil {
logger.Error("failed to parse socket control message")
return
}
for _, scm := range scms {
fds, err := unix.ParseUnixRights(&scm)
if err != nil {
logger.Error("failed to parse unix rights")
return
}
if len(fds) > 0 {
fd = fds[0] // Assuming only one fd is sent.
break
}
}
}
userfaultFD = os.NewFile(uintptr(fd), "userfaultfd")

var mapping []GuestRegionUffdMapping
if err := json.Unmarshal(buff, &mapping); err != nil {
logger.Error("failed to unmarshal data")
return
}
baseHostVirtAddr = mapping[0].BaseHostVirtAddr
}()

<-uffdListenerCh

logger.Debug("TEST: Registering VM with snap with the memory manager")
stateCfg := manager.SnapshotStateCfg{
VMID: vmID,
GuestMemPath: o.getMemoryFile(vmID),
Expand All @@ -609,8 +657,8 @@ func (o *Orchestrator) LoadSnapshot(ctx context.Context, originVmID string, vmID
logger.Error(err, "failed to register new VM with memory manager")
}

logger.Debug("TEST: activate VM in mm")
if activateErr = o.memoryManager.Activate(originVmID, sendfdConn); activateErr != nil {
logger.Debug("TEST: activate VM in mm") // TODO: pass too many params in Activate
if activateErr = o.memoryManager.Activate(originVmID, userfaultFD, baseHostVirtAddr); activateErr != nil {
logger.Warn("Failed to activate VM in the memory manager", activateErr)
}
}
Expand Down
14 changes: 8 additions & 6 deletions memory/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"encoding/csv"
"errors"
"fmt"
"net"
"os"
"strconv"
"sync"
Expand Down Expand Up @@ -149,7 +148,7 @@ func (m *MemoryManager) DeregisterVM(vmID string) error {
}

// Activate Creates an epoller to serve page faults for the VM
func (m *MemoryManager) Activate(vmID string, conn *net.UnixConn) error {
func (m *MemoryManager) Activate(vmID string, uffd *os.File, baseHostVirtAddr uint64) error {
logger := log.WithFields(log.Fields{"vmID": vmID})

logger.Debug("Activating instance in the memory manager")
Expand Down Expand Up @@ -184,10 +183,13 @@ func (m *MemoryManager) Activate(vmID string, conn *net.UnixConn) error {
return err
}

if err := state.getUFFD(conn); err != nil {
logger.Error("Failed to get uffd")
return err
}
// if err := state.getUFFD(); err != nil {
// logger.Error("Failed to get uffd")
// return err
// }

state.startAddress = baseHostVirtAddr
state.userFaultFD = uffd

state.setupStateOnActivate()

Expand Down
74 changes: 36 additions & 38 deletions memory/manager/snapshot_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ import "C"

import (
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"net"
"os"
"path/filepath"
"sort"
Expand Down Expand Up @@ -140,42 +138,42 @@ type GuestRegionUffdMapping struct {
PageSizeKiB uint64 `json:"page_size_kib"`
}

func (s *SnapshotState) getUFFD(sendfdConn *net.UnixConn) error {
buff := make([]byte, 256) // set a maximum buffer size
oobBuff := make([]byte, unix.CmsgSpace(4))

n, oobn, _, _, err := sendfdConn.ReadMsgUnix(buff, oobBuff)
if err != nil {
return fmt.Errorf("error reading message: %w", err)
}
buff = buff[:n]

var fd int
if oobn > 0 {
scms, err := unix.ParseSocketControlMessage(oobBuff[:oobn])
if err != nil {
return fmt.Errorf("error parsing socket control message: %w", err)
}
for _, scm := range scms {
fds, err := unix.ParseUnixRights(&scm)
if err != nil {
return fmt.Errorf("error parsing unix rights: %w", err)
}
if len(fds) > 0 {
fd = fds[0] // Assuming only one fd is sent.
break
}
}
}
userfaultFD := os.NewFile(uintptr(fd), "userfaultfd")

var mapping []GuestRegionUffdMapping
if err := json.Unmarshal(buff, &mapping); err != nil {
return fmt.Errorf("error unmarshaling data: %w", err)
}

s.startAddress = mapping[0].BaseHostVirtAddr
s.userFaultFD = userfaultFD
func (s *SnapshotState) getUFFD() error {
// buff := make([]byte, 256) // set a maximum buffer size
// oobBuff := make([]byte, unix.CmsgSpace(4))

// n, oobn, _, _, err := sendfdConn.ReadMsgUnix(buff, oobBuff)
// if err != nil {
// return fmt.Errorf("error reading message: %w", err)
// }
// buff = buff[:n]

// var fd int
// if oobn > 0 {
// scms, err := unix.ParseSocketControlMessage(oobBuff[:oobn])
// if err != nil {
// return fmt.Errorf("error parsing socket control message: %w", err)
// }
// for _, scm := range scms {
// fds, err := unix.ParseUnixRights(&scm)
// if err != nil {
// return fmt.Errorf("error parsing unix rights: %w", err)
// }
// if len(fds) > 0 {
// fd = fds[0] // Assuming only one fd is sent.
// break
// }
// }
// }
// userfaultFD := os.NewFile(uintptr(fd), "userfaultfd")

// var mapping []GuestRegionUffdMapping
// if err := json.Unmarshal(buff, &mapping); err != nil {
// return fmt.Errorf("error unmarshaling data: %w", err)
// }

// s.startAddress = baseHostVirtAddr
// s.userFaultFD = userfaultFD
return nil
}

Expand Down

0 comments on commit d3c914e

Please sign in to comment.