Skip to content

Commit

Permalink
Test
Browse files Browse the repository at this point in the history
  • Loading branch information
char-1ee committed Feb 28, 2024
1 parent e8092e4 commit e78c5fd
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 107 deletions.
47 changes: 46 additions & 1 deletion ctriface/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package ctriface
import (
"context"
"encoding/json"
"net"
"os"
"os/exec"
"path/filepath"
Expand Down Expand Up @@ -505,6 +506,9 @@ func (o *Orchestrator) LoadSnapshot(ctx context.Context, originVmID string, vmID
BackendPath: snap.GetMemFilePath(),
}

var sendfdConn *net.UnixConn
uffdListenerCh := make(chan struct{}, 1)

if o.GetUPFEnabled() {
logger.Debug("TEST: UPF is enabled")
conf.MemBackend.BackendType = uffdBackend
Expand All @@ -517,6 +521,44 @@ func (o *Orchestrator) LoadSnapshot(ctx context.Context, originVmID string, vmID
if err := o.memoryManager.FetchState(originVmID); err != nil {
return nil, nil, err
}

logger.Debug("TEST: start listening to uffd socket")
if _, err := os.Stat(conf.MemBackend.BackendPath); err == nil {
os.Remove(conf.MemBackend.BackendPath)
}

// connChan := make(chan *net.UnixConn, 1)
errChan := make(chan error, 1)
go func() {
listener, err := net.Listen("unix", conf.MemBackend.BackendPath)
if err != nil {
errChan <- errors.Wrapf(err, "failed to listen to uffd socket")
return
// return nil, nil, errors.Wrapf(err, "failed to listen to uffd socket")
}
defer listener.Close()

logger.Debug("Listening ...")
conn, err := listener.Accept()
if err != nil {
errChan <- errors.Wrapf(err, "failed to accept connection")
return
// return nil, nil, errors.Wrapf(err, "failed to accept connection")
}

sendfdConn, _ = conn.(*net.UnixConn)
close(uffdListenerCh)

// connChan <- sendfdConn
}()

// select {
// case sendfdConn = <-connChan:
// logger.Debug("Connection accepted and type-asserted to *net.UnixConn")
// case err := <-errChan:
// logger.Errorf("Error occurred: %v\n", err)
// }
time.Sleep(10 * time.Second)
}

tStart = time.Now()
Expand Down Expand Up @@ -556,12 +598,15 @@ func (o *Orchestrator) LoadSnapshot(ctx context.Context, originVmID string, vmID
}()

logger.Debug("TEST: CreatVM request sent")

<-loadDone

if o.GetUPFEnabled() {

logger.Debug("TEST: Registering VM with snap with the memory manager")

<-uffdListenerCh

stateCfg := manager.SnapshotStateCfg{
VMID: vmID,
GuestMemPath: o.getMemoryFile(vmID),
Expand All @@ -577,7 +622,7 @@ func (o *Orchestrator) LoadSnapshot(ctx context.Context, originVmID string, vmID
}

logger.Debug("TEST: activate VM in mm")
if activateErr = o.memoryManager.Activate(vmID); activateErr != nil {
if activateErr = o.memoryManager.Activate(vmID, sendfdConn); activateErr != nil {
logger.Warn("Failed to activate VM in the memory manager", activateErr)
}
}
Expand Down
15 changes: 8 additions & 7 deletions ctriface/iface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ import (
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"github.com/vhive-serverless/vhive/snapshotting"

"github.com/vhive-serverless/vhive/lg"
)

// TODO: Make it impossible to use lazy mode without UPF
Expand Down Expand Up @@ -82,38 +80,41 @@ func TestStartSnapStop(t *testing.T) {

vmID := "2"

log.Debug("STEP: StartVM")
_, _, err := orch.StartVM(ctx, vmID, testImageName)
require.NoError(t, err, "Failed to start VM")

log.Debug("STEP: PauseVM")
err = orch.PauseVM(ctx, vmID)
require.NoError(t, err, "Failed to pause VM")

log.Debug("STEP: NewSnapshot and CreateSnapshot")
snap := snapshotting.NewSnapshot(vmID, "/fccd/snapshots", testImageName)
err = orch.CreateSnapshot(ctx, vmID, snap)
require.NoError(t, err, "Failed to create snapshot of VM")

// _, err = orch.ResumeVM(ctx, vmID)
// require.NoError(t, err, "Failed to resume VM after created snapshot")

log.Debug("STEP: StopSingleVM")
err = orch.StopSingleVM(ctx, vmID)
require.NoError(t, err, "Failed to stop VM")

originVmID := vmID
vmID = "3"

log.Debug("STEP: LoadSnapshot")
_, _, err = orch.LoadSnapshot(ctx, originVmID, vmID, snap)
require.NoError(t, err, "Failed to load snapshot of VM")

log.Debug("TEST: LoadSnapshot completed")
lg.UniLogger.Println("This is a test")
log.Debug("STEP: ResumeVM")
_, err = orch.ResumeVM(ctx, vmID)
require.NoError(t, err, "Failed to resume VM")

time.Sleep(30 * time.Second)

log.Debug("STEP: StopeSingleVM")
err = orch.StopSingleVM(ctx, vmID)
require.NoError(t, err, "Failed to stop VM")

log.Debug("STEP: Cleanup")
_ = snap.Cleanup()
orch.Cleanup()
}
Expand Down
15 changes: 9 additions & 6 deletions ctriface/orch.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,18 +123,21 @@ func NewOrchestrator(snapshotter, hostIface string, opts ...OrchestratorOption)
}

if o.GetUPFEnabled() {
file, err := os.Create(o.uffdSockAddr)
if err != nil {
log.Fatalf("Failed to create socket file: %v", err)
}
defer file.Close()
// file, err := os.Create(o.uffdSockAddr)
// if err != nil {
// log.Fatalf("Failed to create socket file: %v", err)
// }
// defer file.Close()
// lg.UniLogger.Println("TEST: created the uffd sock addr")

managerCfg := manager.MemoryManagerCfg{
MetricsModeOn: o.isMetricsMode,
UffdSockAddr: o.uffdSockAddr,
}
o.memoryManager = manager.NewMemoryManager(managerCfg)
go o.memoryManager.ListenUffdSocket(o.uffdSockAddr)

// lg.UniLogger.Println("TEST: created a new memory manager. Start listen uffd socket")
// go o.memoryManager.ListenUffdSocket(o.uffdSockAddr)
}

log.Info("Creating containerd client")
Expand Down
2 changes: 1 addition & 1 deletion lg/uni_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
var UniLogger *log.Logger

func init() {
file, err := os.OpenFile("output.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
file, err := os.OpenFile("uni_output.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
log.Fatalln("Failed to open log file:", err)
}
Expand Down
79 changes: 12 additions & 67 deletions memory/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (m *MemoryManager) DeregisterVM(vmID string) error {
}

// Activate Creates an epoller to serve page faults for the VM
func (m *MemoryManager) Activate(vmID string) error {
func (m *MemoryManager) Activate(vmID string, conn *net.UnixConn) error {
logger := log.WithFields(log.Fields{"vmID": vmID})

logger.Debug("Activating instance in the memory manager")
Expand All @@ -164,14 +164,6 @@ func (m *MemoryManager) Activate(vmID string) error {

logger.Debug("TEST: Activate: fetch snapstate by vmID for UFFD")

// originID, ok := m.origins[vmID]

// if !ok {
// logger.Debug("TEST: not loaded from snapshot")
// }

// state, ok = m.instances[originID]

state, ok = m.instances[vmID]

if !ok {
Expand All @@ -187,29 +179,21 @@ func (m *MemoryManager) Activate(vmID string) error {
return errors.New("VM already active")
}

select {
case <-m.startEpollingCh:
if err := state.mapGuestMemory(); err != nil {
logger.Error("Failed to map guest memory")
return err
}

if err := state.getUFFD(); err != nil {
logger.Error("Failed to get uffd")
return err
}
if err := state.mapGuestMemory(); err != nil {
logger.Error("Failed to map guest memory")
return err
}

state.setupStateOnActivate()
if err := state.getUFFD(conn); err != nil {
logger.Error("Failed to get uffd")
return err
}

go state.pollUserPageFaults(readyCh)
state.setupStateOnActivate()

<-readyCh
go state.pollUserPageFaults(readyCh)

case <-time.After(100 * time.Second):
return errors.New("Uffd connection to firecracker timeout")
default:
return errors.New("Failed to start epoller")
}
<-readyCh

return nil
}
Expand All @@ -229,12 +213,6 @@ func (m *MemoryManager) FetchState(vmID string) error {

m.Lock()

// originID, ok := m.origins[vmID]
// if !ok {
// logger.Debug("TEST: not loaded from snapshot")
// }
// state, ok = m.instances[originID]

state, ok = m.instances[vmID]
if !ok {
m.Unlock()
Expand Down Expand Up @@ -409,39 +387,6 @@ func (m *MemoryManager) GetUPFLatencyStats(vmID string) ([]*metrics.Metric, erro
return state.latencyMetrics, nil
}

func (m *MemoryManager) ListenUffdSocket(uffdSockAddr string) error {
log.Debug("Start listening to uffd socket")

m.startEpollingOnce.Do(func() {
m.startEpollingCh = make(chan struct{})
})

ln, err := net.Listen("unix", uffdSockAddr)
if err != nil {
log.Errorf("Failed to listen on uffd socket: %v", err)
return errors.New("Failed to listen on uffd socket")
}
defer ln.Close()

for {
conn, err := ln.Accept()
if err != nil {
log.Printf("Failed to accept connection on uffd socket: %v", err)
continue
}
go func(conn net.Conn) {
defer conn.Close()
if err := ln.Close(); err != nil {
log.Printf("Failed to close uffd socket listener: %v", err)
}
close(m.startEpollingCh)
}(conn)
break
}

return nil
}

// Deprecated
// func (m *MemoryManager) GetUPFSockPath(vmID string, isSnapshotReady bool) (string, error) {
// logger := log.WithFields(log.Fields{"vmID": vmID})
Expand Down
57 changes: 32 additions & 25 deletions memory/manager/snapshot_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ package manager
import "C"

Check failure on line 28 in memory/manager/snapshot_state.go

View workflow job for this annotation

GitHub Actions / Build and check code quality (1.19)

could not import C (cgo preprocessing failed) (typecheck)

Check failure on line 28 in memory/manager/snapshot_state.go

View workflow job for this annotation

GitHub Actions / Build and check code quality (1.18)

could not import C (cgo preprocessing failed) (typecheck)

import (
"context"
"encoding/binary"
"errors"
"fmt"
Expand Down Expand Up @@ -134,37 +133,45 @@ func (s *SnapshotState) setupStateOnActivate() {
}
}

func (s *SnapshotState) getUFFD() error {
var d net.Dialer
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
func (s *SnapshotState) getUFFD(sendfdConn *net.UnixConn) error {
// var d net.Dialer
// ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
// defer cancel()

for {
c, err := d.DialContext(ctx, "unix", s.InstanceSockAddr)
if err != nil {
if ctx.Err() != nil {
log.Error("Failed to dial within the context timeout")
return err
}
time.Sleep(1 * time.Millisecond)
continue
}
log.Debugf("TEST: Dial uffd socket done: %s", s.InstanceSockAddr)
// for {
// c, err := d.DialContext(ctx, "unix", s.InstanceSockAddr)
// if err != nil {
// if ctx.Err() != nil {
// log.Error("Failed to dial within the context timeout")
// return err
// }
// time.Sleep(1 * time.Millisecond)
// continue
// }

defer c.Close()
// defer c.Close()

sendfdConn := c.(*net.UnixConn)
// sendfdConn := c.(*net.UnixConn)

fs, err := fd.Get(sendfdConn, 1, []string{"a file"})
if err != nil {
log.Error("Failed to receive the uffd")
return err
}
// fs, err := fd.Get(sendfdConn, 1, []string{"a file"})
// if err != nil {
// log.Error("Failed to receive the uffd")
// return err
// }

s.userFaultFD = fs[0]
// s.userFaultFD = fs[0]

return nil
// return nil
// }

fs, err := fd.Get(sendfdConn, 1, []string{"a file"})
if err != nil {
log.Error("Failed to receive the uffd")
return err
}

s.userFaultFD = fs[0]
return nil
}

func (s *SnapshotState) processMetrics() {
Expand Down
3 changes: 3 additions & 0 deletions scripts/clean_fcctr.sh
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,8 @@ sudo rm /var/lib/cni/networks/fcnet*/19* || echo clean already
echo Cleaning snapshots
sudo rm -rf /fccd/snapshots/*

echo Cleaning UFFD socket
sudo rm -f /tmp/uffd.sock

echo Creating a fresh devmapper
source $DIR/create_devmapper.sh

0 comments on commit e78c5fd

Please sign in to comment.