From 6838aecca686af1087cde0d2b8edda32d6077023 Mon Sep 17 00:00:00 2001 From: Fred Rolland Date: Tue, 10 Oct 2023 17:15:28 +0300 Subject: [PATCH] Add events for config daemon K8s Events on SriovNetworkNodeStates objects will be created at the following stages: - Status change of SriovNetworkNodeStates - Start of config daemon - Node reboot started - Node drain started/completed/failed Signed-off-by: Fred Rolland --- cmd/sriov-network-config-daemon/start.go | 8 +++- pkg/daemon/daemon.go | 8 ++++ pkg/daemon/daemon_test.go | 3 ++ pkg/daemon/event_recorder.go | 52 ++++++++++++++++++++++++ pkg/daemon/writer.go | 24 +++++++++-- 5 files changed, 91 insertions(+), 4 deletions(-) create mode 100644 pkg/daemon/event_recorder.go diff --git a/cmd/sriov-network-config-daemon/start.go b/cmd/sriov-network-config-daemon/start.go index b9ee61ac5..52e4536fa 100644 --- a/cmd/sriov-network-config-daemon/start.go +++ b/cmd/sriov-network-config-daemon/start.go @@ -159,8 +159,11 @@ func runStartCmd(cmd *cobra.Command, args []string) { glog.V(0).Info("dev mode enabled") } + eventRecorder := daemon.NewEventRecorder(writerclient, startOpts.nodeName, kubeclient) + defer eventRecorder.Shutdown() + glog.V(0).Info("starting node writer") - nodeWriter := daemon.NewNodeStateStatusWriter(writerclient, startOpts.nodeName, closeAllConns, devMode) + nodeWriter := daemon.NewNodeStateStatusWriter(writerclient, startOpts.nodeName, closeAllConns, eventRecorder, devMode) destdir := os.Getenv("DEST_DIR") if destdir == "" { @@ -187,6 +190,8 @@ func runStartCmd(cmd *cobra.Command, args []string) { panic(err.Error()) } + eventRecorder.SendEvent("ConfigDaemonStart", "Config Daemon starting") + // block the deamon process until nodeWriter finish first its run err = nodeWriter.RunOnce(destdir, platformType) if err != nil { @@ -207,6 +212,7 @@ func runStartCmd(cmd *cobra.Command, args []string) { refreshCh, platformType, startOpts.systemd, + eventRecorder, devMode, ).Run(stopCh, exitCh) if err != nil { diff --git 
a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index 5c9f6f7bd..5feb37e12 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -112,6 +112,8 @@ type Daemon struct { storeManager utils.StoreManagerInterface hostManager host.HostManagerInterface + + eventRecorder *EventRecorder } const ( @@ -149,6 +151,7 @@ func New( refreshCh chan<- Message, platformType utils.PlatformType, useSystemdService bool, + er *EventRecorder, devMode bool, ) *Daemon { return &Daemon{ @@ -186,6 +189,7 @@ func New( workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter( &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(updateDelay), 1)}, workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, maxUpdateBackoff)), "SriovNetworkNodeState"), + eventRecorder: er, } } @@ -656,6 +660,7 @@ func (dn *Daemon) nodeStateSyncHandler() error { if reqReboot { glog.Info("nodeStateSyncHandler(): reboot node") + dn.eventRecorder.SendEvent("RebootNode", "Reboot node has been initiated") rebootNode() return nil } @@ -1030,6 +1035,7 @@ func (dn *Daemon) drainNode() error { var lastErr error glog.Info("drainNode(): Start draining") + dn.eventRecorder.SendEvent("DrainNode", "Drain node has been initiated") if err = wait.ExponentialBackoff(backoff, func() (bool, error) { err := drain.RunCordonOrUncordon(dn.drainer, dn.node, true) if err != nil { @@ -1048,9 +1054,11 @@ func (dn *Daemon) drainNode() error { if err == wait.ErrWaitTimeout { glog.Errorf("drainNode(): failed to drain node (%d tries): %v :%v", backoff.Steps, err, lastErr) } + dn.eventRecorder.SendEvent("DrainNode", "Drain node failed") glog.Errorf("drainNode(): failed to drain node: %v", err) return err } + dn.eventRecorder.SendEvent("DrainNode", "Drain node completed") glog.Info("drainNode(): drain complete") return nil } diff --git a/pkg/daemon/daemon_test.go b/pkg/daemon/daemon_test.go index 973882c4a..5300a1a65 100644 --- a/pkg/daemon/daemon_test.go +++ b/pkg/daemon/daemon_test.go @@ -104,6 +104,8 @@ 
var _ = Describe("Config Daemon", func() { err = sriovnetworkv1.InitNicIDMapFromConfigMap(kubeClient, namespace) Expect(err).ToNot(HaveOccurred()) + er := NewEventRecorder(client, "test-node", kubeClient) + sut = New("test-node", client, kubeClient, @@ -114,6 +116,7 @@ var _ = Describe("Config Daemon", func() { refreshCh, utils.Baremetal, false, + er, false, ) diff --git a/pkg/daemon/event_recorder.go b/pkg/daemon/event_recorder.go new file mode 100644 index 000000000..9f3b42cfb --- /dev/null +++ b/pkg/daemon/event_recorder.go @@ -0,0 +1,52 @@ +package daemon + +import ( + "context" + + "github.com/golang/glog" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + typedv1core "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/record" + + snclientset "github.com/k8snetworkplumbingwg/sriov-network-operator/pkg/client/clientset/versioned" +) + +type EventRecorder struct { + client snclientset.Interface + node string + eventRecorder record.EventRecorder + eventBroadcaster record.EventBroadcaster +} + +// NewEventRecorder Create a new EventRecorder +func NewEventRecorder(c snclientset.Interface, n string, kubeclient kubernetes.Interface) *EventRecorder { + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartStructuredLogging(4) + eventBroadcaster.StartRecordingToSink(&typedv1core.EventSinkImpl{Interface: kubeclient.CoreV1().Events("")}) + eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "config-daemon"}) + return &EventRecorder{ + client: c, + node: n, + eventRecorder: eventRecorder, + eventBroadcaster: eventBroadcaster, + } +} + +// SendEvent Send an Event on the NodeState object +func (e *EventRecorder) SendEvent(eventType string, msg string) { + nodeState, err := e.client.SriovnetworkV1().SriovNetworkNodeStates(namespace).Get(context.Background(), e.node, metav1.GetOptions{}) + if err != nil { 
+ glog.Warningf("SendEvent(): Failed to fetch node state %s (%v); skip SendEvent", e.node, err) + return + } + e.eventRecorder.Event(nodeState, corev1.EventTypeNormal, eventType, msg) +} + +// Shutdown Close the EventBroadcaster +func (e *EventRecorder) Shutdown() { + e.eventBroadcaster.Shutdown() +} diff --git a/pkg/daemon/writer.go b/pkg/daemon/writer.go index 89abd8137..96ea2558e 100644 --- a/pkg/daemon/writer.go +++ b/pkg/daemon/writer.go @@ -21,6 +21,7 @@ import ( const ( CheckpointFileName = "sno-initial-node-state.json" + Unknown = "Unknown" ) type NodeStateStatusWriter struct { @@ -31,14 +32,16 @@ type NodeStateStatusWriter struct { openStackDevicesInfo utils.OSPDevicesInfo withUnsupportedDevices bool storeManager utils.StoreManagerInterface + eventRecorder *EventRecorder } // NewNodeStateStatusWriter Create a new NodeStateStatusWriter -func NewNodeStateStatusWriter(c snclientset.Interface, n string, f func(), devMode bool) *NodeStateStatusWriter { +func NewNodeStateStatusWriter(c snclientset.Interface, n string, f func(), er *EventRecorder, devMode bool) *NodeStateStatusWriter { return &NodeStateStatusWriter{ client: c, node: n, OnHeartbeatFailure: f, + eventRecorder: er, withUnsupportedDevices: devMode, } } @@ -170,9 +173,24 @@ func (w *NodeStateStatusWriter) setNodeStateStatus(msg Message) (*sriovnetworkv1 // clear lastSyncError when sync Succeeded nodeState.Status.LastSyncError = msg.lastSyncError } - nodeState.Status.SyncStatus = msg.syncStatus - + oldStatus := nodeState.Status.SyncStatus + newStatus := msg.syncStatus + nodeState.Status.SyncStatus = newStatus glog.V(0).Infof("setNodeStateStatus(): syncStatus: %s, lastSyncError: %s", nodeState.Status.SyncStatus, nodeState.Status.LastSyncError) + + if oldStatus != newStatus { + if oldStatus == "" { + oldStatus = Unknown + } + if newStatus == "" { + newStatus = Unknown + } + eventMsg := fmt.Sprintf("Status changed from: %s to: %s", oldStatus, newStatus) + if nodeState.Status.LastSyncError != "" { + 
eventMsg = fmt.Sprintf("%s. Last Error: %s", eventMsg, nodeState.Status.LastSyncError) + } + w.eventRecorder.SendEvent("SyncStatusChanged", eventMsg) + } }) if err != nil { return nil, err