From 7a6e9b3a5d68199fddc85c8c2b880adc94b7412c Mon Sep 17 00:00:00 2001 From: kt Date: Fri, 23 Feb 2024 11:21:08 -0500 Subject: [PATCH] Set VM metadata outside of VM creation VM metadata is now set by the machine manager after the VM has been created and started, instead of inside createAndStartVM. This commit also handles some small cleanup items to make things more consistent: the agent's handshake method is unexported, the node replies to the handshake with a JSON HandshakeResponse, and a VM's subscriptions are drained before the undeploy request is sent. --- agent/agent.go | 13 ++++++-- internal/agent-api/types.go | 3 ++ internal/node/machine_mgr.go | 59 +++++++++++++++++++++++++----------- internal/node/running_vm.go | 29 +++++++++--------- 4 files changed, 68 insertions(+), 36 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 77edb27d..543649f4 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -136,7 +136,7 @@ func (a *Agent) Start() { // Request a handshake with the host indicating the agent is "all the way" up // NOTE: the agent process will request a VM shutdown if this fails -func (a *Agent) RequestHandshake() error { +func (a *Agent) requestHandshake() error { msg := agentapi.HandshakeRequest{ MachineID: a.md.VmID, StartTime: a.started, @@ -144,12 +144,19 @@ func (a *Agent) RequestHandshake() error { } raw, _ := json.Marshal(msg) - _, err := a.nc.Request(agentapi.NexAgentSubjectHandshake, raw, time.Millisecond*defaultAgentHandshakeTimeoutMillis) + resp, err := a.nc.Request(agentapi.NexAgentSubjectHandshake, raw, time.Millisecond*defaultAgentHandshakeTimeoutMillis) if err != nil { a.LogError(fmt.Sprintf("Agent failed to request initial sync message: %s", err)) return err } + var handshakeResponse *agentapi.HandshakeResponse + err = json.Unmarshal(resp.Data, &handshakeResponse) + if err != nil { + a.LogError(fmt.Sprintf("Failed to parse handshake response: %s", err)) + return err + } + a.LogInfo("Agent is up") return nil } @@ -304,7 +311,7 @@ func (a *Agent) handleHealthz(w http.ResponseWriter, req *http.Request) { } func (a *Agent) init() error { - err := a.RequestHandshake() + err := a.requestHandshake() if err != nil { a.LogError(fmt.Sprintf("Failed to handshake with node: %s", err)) return err diff --git a/internal/agent-api/types.go
b/internal/agent-api/types.go index 78a183ff..6a54242a 100644 --- a/internal/agent-api/types.go +++ b/internal/agent-api/types.go @@ -141,6 +141,9 @@ type HandshakeRequest struct { Message *string `json:"message,omitempty"` } +type HandshakeResponse struct { +} + type HostServicesKeyValueRequest struct { Key *string `json:"key"` Value *json.RawMessage `json:"value,omitempty"` diff --git a/internal/node/machine_mgr.go b/internal/node/machine_mgr.go index 9620d931..ce652e75 100644 --- a/internal/node/machine_mgr.go +++ b/internal/node/machine_mgr.go @@ -157,6 +157,12 @@ func (m *MachineManager) Start() { continue } + err = m.setMetadata(vm) + if err != nil { + m.log.Warn("Failed to set metadata on VM for warming pool.", slog.Any("err", err)) + continue + } + go m.awaitHandshake(vm.vmmID) m.allVMs[vm.vmmID] = vm @@ -276,6 +282,15 @@ func (m *MachineManager) StopMachine(vmID string, undeploy bool) error { m.log.Debug("Attempting to stop virtual machine", slog.String("vmid", vmID), slog.Bool("undeploy", undeploy)) + for _, sub := range m.vmsubz[vmID] { + err := sub.Drain() + if err != nil { + m.log.Warn(fmt.Sprintf("failed to drain subscription to subject %s associated with vm %s: %s", sub.Subject, vmID, err.Error())) + } + + m.log.Debug(fmt.Sprintf("drained subscription to subject %s associated with vm %s", sub.Subject, vmID)) + } + if vm.deployRequest != nil && undeploy { // we do a request here to allow graceful shutdown of the workload being undeployed subject := fmt.Sprintf("agentint.%s.undeploy", vm.vmmID) @@ -286,15 +301,6 @@ func (m *MachineManager) StopMachine(vmID string, undeploy bool) error { } } - for _, sub := range m.vmsubz[vmID] { - err := sub.Drain() - if err != nil { - m.log.Warn(fmt.Sprintf("failed to drain subscription to subject %s associated with vm %s: %s", sub.Subject, vmID, err.Error())) - } - - m.log.Debug(fmt.Sprintf("drained subscription to subject %s associated with vm %s", sub.Subject, vmID)) - } - vm.shutdown() delete(m.allVMs, vmID) 
delete(m.stopMutex, vmID) @@ -335,8 +341,6 @@ func (m *MachineManager) awaitHandshake(vmid string) { if time.Now().UTC().After(timeoutAt) { m.log.Error("Did not receive NATS handshake from agent within timeout.", slog.String("vmid", vmid)) return - // _ = m.Stop() - // FIXME!!! os.Exit(1) // FIXME } _, handshakeOk = m.handshakes[vmid] @@ -474,21 +478,31 @@ func (m *MachineManager) handleAgentEvent(msg *nats.Msg) { // This handshake uses the request pattern to force a full round trip to ensure connectivity is working properly as // fire-and-forget publishes from inside the firecracker VM could potentially be lost func (m *MachineManager) handleHandshake(msg *nats.Msg) { - var shake agentapi.HandshakeRequest - err := json.Unmarshal(msg.Data, &shake) + var req agentapi.HandshakeRequest + err := json.Unmarshal(msg.Data, &req) if err != nil { - m.log.Error("Failed to handle agent handshake", slog.String("vmid", *shake.MachineID), slog.String("message", *shake.Message)) + m.log.Error("Failed to handle agent handshake", slog.String("vmid", *req.MachineID), slog.String("message", *req.Message)) return } - now := time.Now().UTC() - m.handshakes[*shake.MachineID] = now.Format(time.RFC3339) + m.log.Info("Received agent handshake", slog.String("vmid", *req.MachineID), slog.String("message", *req.Message)) - m.log.Info("Received agent handshake", slog.String("vmid", *shake.MachineID), slog.String("message", *shake.Message)) - err = msg.Respond([]byte("OK")) + _, ok := m.allVMs[*req.MachineID] + if !ok { + m.log.Warn("Received agent handshake attempt from a VM we don't know about.") + return + } + + resp, _ := json.Marshal(&agentapi.HandshakeResponse{}) + + err = msg.Respond(resp) if err != nil { m.log.Error("Failed to reply to agent handshake", slog.Any("err", err)) + return } + + now := time.Now().UTC() + m.handshakes[*req.MachineID] = now.Format(time.RFC3339) } func (m *MachineManager) resetCNI() error { @@ -723,6 +737,15 @@ func (m *MachineManager) publishMachineStopped(vm 
*runningFirecracker) error { return nil } +func (m *MachineManager) setMetadata(vm *runningFirecracker) error { + return vm.setMetadata(&agentapi.MachineMetadata{ + Message: agentapi.StringOrNil("Host-supplied metadata"), + NodeNatsHost: vm.config.InternalNodeHost, + NodeNatsPort: vm.config.InternalNodePort, + VmID: &vm.vmmID, + }) +} + func (m *MachineManager) stopping() bool { return (atomic.LoadUint32(&m.closing) > 0) } diff --git a/internal/node/running_vm.go b/internal/node/running_vm.go index 07d5db43..d875ad79 100644 --- a/internal/node/running_vm.go +++ b/internal/node/running_vm.go @@ -29,6 +29,7 @@ type runningFirecracker struct { vmmID string closing uint32 + config *NodeConfiguration deployRequest *agentapi.DeployRequest ip net.IP log *slog.Logger @@ -42,6 +43,16 @@ func (vm *runningFirecracker) isEssential() bool { return vm.deployRequest != nil && vm.deployRequest.Essential != nil && *vm.deployRequest.Essential } +func (vm *runningFirecracker) setMetadata(metadata *agentapi.MachineMetadata) error { + err := vm.machine.SetMetadata(vm.vmmCtx, metadata) + if err != nil { + vm.vmmCancel() + return fmt.Errorf("failed to set machine metadata: %s", err) + } + + return nil +} + func (vm *runningFirecracker) shutdown() { if atomic.AddUint32(&vm.closing, 1) == 1 { vm.log.Info("Machine stopping", @@ -137,24 +148,11 @@ func createAndStartVM(ctx context.Context, config *NodeConfiguration, log *slog. 
return nil, fmt.Errorf("failed creating machine: %s", err) } - md := agentapi.MachineMetadata{ - VmID: &vmmID, - NodeNatsHost: config.InternalNodeHost, - NodeNatsPort: config.InternalNodePort, - Message: agentapi.StringOrNil("Host-supplied metadata"), - } - if err := m.Start(vmmCtx); err != nil { vmmCancel() return nil, fmt.Errorf("failed to start machine: %v", err) } - err = m.SetMetadata(vmmCtx, md) - if err != nil { - vmmCancel() - return nil, fmt.Errorf("failed to set machine metadata: %s", err) - } - gw := m.Cfg.NetworkInterfaces[0].StaticConfiguration.IPConfiguration.Gateway ip := m.Cfg.NetworkInterfaces[0].StaticConfiguration.IPConfiguration.IPAddr.IP hosttap := m.Cfg.NetworkInterfaces[0].StaticConfiguration.HostDevName @@ -166,11 +164,12 @@ func createAndStartVM(ctx context.Context, config *NodeConfiguration, log *slog. slog.Any("gateway", gw), slog.String("netmask", mask.String()), slog.String("hosttap", hosttap), - slog.String("nats_host", *md.NodeNatsHost), - slog.Int("nats_port", *md.NodeNatsPort), + slog.String("nats_host", *config.InternalNodeHost), + slog.Int("nats_port", *config.InternalNodePort), ) return &runningFirecracker{ + config: config, ip: ip, log: log, machine: m,