Skip to content

Commit

Permalink
Set VM metadata outside of VM creation
Browse files Browse the repository at this point in the history
This commit handles some small cleanup items to make things more
consistent.
  • Loading branch information
kthomas committed Feb 23, 2024
1 parent 6c862b5 commit 7a6e9b3
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 36 deletions.
13 changes: 10 additions & 3 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,20 +136,27 @@ func (a *Agent) Start() {

// Request a handshake with the host indicating the agent is "all the way" up
// NOTE: the agent process will request a VM shutdown if this fails
func (a *Agent) RequestHandshake() error {
func (a *Agent) requestHandshake() error {
msg := agentapi.HandshakeRequest{
MachineID: a.md.VmID,
StartTime: a.started,
Message: a.md.Message,
}
raw, _ := json.Marshal(msg)

_, err := a.nc.Request(agentapi.NexAgentSubjectHandshake, raw, time.Millisecond*defaultAgentHandshakeTimeoutMillis)
resp, err := a.nc.Request(agentapi.NexAgentSubjectHandshake, raw, time.Millisecond*defaultAgentHandshakeTimeoutMillis)
if err != nil {
a.LogError(fmt.Sprintf("Agent failed to request initial sync message: %s", err))
return err
}

var handshakeResponse *agentapi.HandshakeResponse
err = json.Unmarshal(resp.Data, &handshakeResponse)
if err != nil {
a.LogError(fmt.Sprintf("Failed to parse handshake response: %s", err))
return err
}

a.LogInfo("Agent is up")
return nil
}
Expand Down Expand Up @@ -304,7 +311,7 @@ func (a *Agent) handleHealthz(w http.ResponseWriter, req *http.Request) {
}

func (a *Agent) init() error {
err := a.RequestHandshake()
err := a.requestHandshake()
if err != nil {
a.LogError(fmt.Sprintf("Failed to handshake with node: %s", err))
return err
Expand Down
3 changes: 3 additions & 0 deletions internal/agent-api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ type HandshakeRequest struct {
Message *string `json:"message,omitempty"`
}

// HandshakeResponse is the reply payload for a HandshakeRequest. It declares
// no fields, so it marshals to an empty JSON object.
type HandshakeResponse struct{}

type HostServicesKeyValueRequest struct {
Key *string `json:"key"`
Value *json.RawMessage `json:"value,omitempty"`
Expand Down
59 changes: 41 additions & 18 deletions internal/node/machine_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ func (m *MachineManager) Start() {
continue
}

err = m.setMetadata(vm)
if err != nil {
m.log.Warn("Failed to set metadata on VM for warming pool.", slog.Any("err", err))
continue
}

go m.awaitHandshake(vm.vmmID)

m.allVMs[vm.vmmID] = vm
Expand Down Expand Up @@ -276,6 +282,15 @@ func (m *MachineManager) StopMachine(vmID string, undeploy bool) error {

m.log.Debug("Attempting to stop virtual machine", slog.String("vmid", vmID), slog.Bool("undeploy", undeploy))

for _, sub := range m.vmsubz[vmID] {
err := sub.Drain()
if err != nil {
m.log.Warn(fmt.Sprintf("failed to drain subscription to subject %s associated with vm %s: %s", sub.Subject, vmID, err.Error()))
}

m.log.Debug(fmt.Sprintf("drained subscription to subject %s associated with vm %s", sub.Subject, vmID))
}

if vm.deployRequest != nil && undeploy {
// we do a request here to allow graceful shutdown of the workload being undeployed
subject := fmt.Sprintf("agentint.%s.undeploy", vm.vmmID)
Expand All @@ -286,15 +301,6 @@ func (m *MachineManager) StopMachine(vmID string, undeploy bool) error {
}
}

for _, sub := range m.vmsubz[vmID] {
err := sub.Drain()
if err != nil {
m.log.Warn(fmt.Sprintf("failed to drain subscription to subject %s associated with vm %s: %s", sub.Subject, vmID, err.Error()))
}

m.log.Debug(fmt.Sprintf("drained subscription to subject %s associated with vm %s", sub.Subject, vmID))
}

vm.shutdown()
delete(m.allVMs, vmID)
delete(m.stopMutex, vmID)
Expand Down Expand Up @@ -335,8 +341,6 @@ func (m *MachineManager) awaitHandshake(vmid string) {
if time.Now().UTC().After(timeoutAt) {
m.log.Error("Did not receive NATS handshake from agent within timeout.", slog.String("vmid", vmid))
return
// _ = m.Stop()
// FIXME!!! os.Exit(1) // FIXME
}

_, handshakeOk = m.handshakes[vmid]
Expand Down Expand Up @@ -474,21 +478,31 @@ func (m *MachineManager) handleAgentEvent(msg *nats.Msg) {
// This handshake uses the request pattern to force a full round trip to ensure connectivity is working properly as
// fire-and-forget publishes from inside the firecracker VM could potentially be lost
func (m *MachineManager) handleHandshake(msg *nats.Msg) {
var shake agentapi.HandshakeRequest
err := json.Unmarshal(msg.Data, &shake)
var req agentapi.HandshakeRequest
err := json.Unmarshal(msg.Data, &req)
if err != nil {
m.log.Error("Failed to handle agent handshake", slog.String("vmid", *shake.MachineID), slog.String("message", *shake.Message))
m.log.Error("Failed to handle agent handshake", slog.String("vmid", *req.MachineID), slog.String("message", *req.Message))
return
}

now := time.Now().UTC()
m.handshakes[*shake.MachineID] = now.Format(time.RFC3339)
m.log.Info("Received agent handshake", slog.String("vmid", *req.MachineID), slog.String("message", *req.Message))

m.log.Info("Received agent handshake", slog.String("vmid", *shake.MachineID), slog.String("message", *shake.Message))
err = msg.Respond([]byte("OK"))
_, ok := m.allVMs[*req.MachineID]
if !ok {
m.log.Warn("Received agent handshake attempt from a VM we don't know about.")
return
}

resp, _ := json.Marshal(&agentapi.HandshakeResponse{})

err = msg.Respond(resp)
if err != nil {
m.log.Error("Failed to reply to agent handshake", slog.Any("err", err))
return
}

now := time.Now().UTC()
m.handshakes[*req.MachineID] = now.Format(time.RFC3339)
}

func (m *MachineManager) resetCNI() error {
Expand Down Expand Up @@ -723,6 +737,15 @@ func (m *MachineManager) publishMachineStopped(vm *runningFirecracker) error {
return nil
}

// setMetadata builds the host-supplied machine metadata for vm (its VM ID,
// the internal node host and port taken from the VM's config, and a fixed
// message) and applies it through vm.setMetadata, returning that call's error.
func (m *MachineManager) setMetadata(vm *runningFirecracker) error {
	metadata := &agentapi.MachineMetadata{
		VmID:         &vm.vmmID,
		NodeNatsHost: vm.config.InternalNodeHost,
		NodeNatsPort: vm.config.InternalNodePort,
		Message:      agentapi.StringOrNil("Host-supplied metadata"),
	}

	return vm.setMetadata(metadata)
}

// stopping reports whether the manager's closing counter, read atomically,
// is non-zero.
func (m *MachineManager) stopping() bool {
	closing := atomic.LoadUint32(&m.closing)
	return closing != 0
}
Expand Down
29 changes: 14 additions & 15 deletions internal/node/running_vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type runningFirecracker struct {
vmmID string

closing uint32
config *NodeConfiguration
deployRequest *agentapi.DeployRequest
ip net.IP
log *slog.Logger
Expand All @@ -42,6 +43,16 @@ func (vm *runningFirecracker) isEssential() bool {
return vm.deployRequest != nil && vm.deployRequest.Essential != nil && *vm.deployRequest.Essential
}

// setMetadata applies metadata to the VM's underlying machine using the VM's
// context. On failure it invokes vm.vmmCancel and returns the error wrapped
// with %w, so callers can still inspect the underlying cause with errors.Is
// or errors.As. It returns nil on success.
func (vm *runningFirecracker) setMetadata(metadata *agentapi.MachineMetadata) error {
	if err := vm.machine.SetMetadata(vm.vmmCtx, metadata); err != nil {
		// Cancel before returning, as the original failure path did.
		vm.vmmCancel()
		return fmt.Errorf("failed to set machine metadata: %w", err)
	}

	return nil
}

func (vm *runningFirecracker) shutdown() {
if atomic.AddUint32(&vm.closing, 1) == 1 {
vm.log.Info("Machine stopping",
Expand Down Expand Up @@ -137,24 +148,11 @@ func createAndStartVM(ctx context.Context, config *NodeConfiguration, log *slog.
return nil, fmt.Errorf("failed creating machine: %s", err)
}

md := agentapi.MachineMetadata{
VmID: &vmmID,
NodeNatsHost: config.InternalNodeHost,
NodeNatsPort: config.InternalNodePort,
Message: agentapi.StringOrNil("Host-supplied metadata"),
}

if err := m.Start(vmmCtx); err != nil {
vmmCancel()
return nil, fmt.Errorf("failed to start machine: %v", err)
}

err = m.SetMetadata(vmmCtx, md)
if err != nil {
vmmCancel()
return nil, fmt.Errorf("failed to set machine metadata: %s", err)
}

gw := m.Cfg.NetworkInterfaces[0].StaticConfiguration.IPConfiguration.Gateway
ip := m.Cfg.NetworkInterfaces[0].StaticConfiguration.IPConfiguration.IPAddr.IP
hosttap := m.Cfg.NetworkInterfaces[0].StaticConfiguration.HostDevName
Expand All @@ -166,11 +164,12 @@ func createAndStartVM(ctx context.Context, config *NodeConfiguration, log *slog.
slog.Any("gateway", gw),
slog.String("netmask", mask.String()),
slog.String("hosttap", hosttap),
slog.String("nats_host", *md.NodeNatsHost),
slog.Int("nats_port", *md.NodeNatsPort),
slog.String("nats_host", *config.InternalNodeHost),
slog.Int("nats_port", *config.InternalNodePort),
)

return &runningFirecracker{
config: config,
ip: ip,
log: log,
machine: m,
Expand Down

0 comments on commit 7a6e9b3

Please sign in to comment.