diff --git a/redfish-exporter/.env b/redfish-exporter/.env index 355b003..e3be935 100644 --- a/redfish-exporter/.env +++ b/redfish-exporter/.env @@ -1,18 +1,20 @@ UPDATED="2024-09-24" DESCRIPTION="Redfish Event Listener/Exporter" -LISTENER_IP="0.0.0.0" -LISTENER_PORT="8080" +LISTENER_IP="10.11.18.55" +LISTENER_PORT="9003" METRICS_PORT="2112" USE_SSL="false" CERTFILE="path/to/certfile" KEYFILE="path/to/keyfile" -SLURM_USER="slurm user here" -SLURM_TOKEN="token string here, from secret when for real" -SLURM_CONTROL_NODE="slurm control node IP:Port" +SLURM_CONTROL_NODE="10.235.34.47" +SLURM_DRAIN_EXCLUDE_REASON_LIST="AMD|Pensando|RebootNeeded" +SLURM_SCONTROL_PATH="/usr/bin/scontrol" +TLS_TIMEOUT="15" TRIGGER_EVENTS="[\ -{\"Severity\":\"Fatal\",\"Action\":\"DrainNode\"},\ -{\"Severity\":\"Critical\",\"Action\":\"DrainNode\"} +{\"Severity\":\"Critical\",\"Message\":\"Image 'UBB_FPGA' is being verified at 'ERoT'|This is an e2e critical test event\",\"Action\":\"DrainNode\", \"DrainReasonPrefix\":\"RebootNeeded\"},\ +{\"Severity\":\"Info\",\"Message\":\"Image 'UBB_FPGA' is being verified at 'ERoT'\",\"Action\":\"DrainNode\", \"DrainReasonPrefix\":\"RebootNotNeeded\"},\ +{\"Severity\":\"Warning\",\"Message\":\"Image 'UBB_FPGA' is being verified at 'ERoT'|This is an e2e test event message\",\"Action\":\"DrainNode\", \"DrainReasonPrefix\":\"RebootNotNeeded\"} ]" # Subscription (v1.5+) @@ -28,11 +30,11 @@ TRIGGER_EVENTS="[\ # Deprecated \",\"password\":\"\"}" \ No newline at end of file diff --git a/redfish-exporter/config.go b/redfish-exporter/config.go index 7e941d3..b5bfb27 100644 --- a/redfish-exporter/config.go +++ b/redfish-exporter/config.go @@ -19,12 +19,15 @@ package main import ( "crypto/tls" "encoding/json" + "fmt" "log" + "net" "os" "strconv" "strings" "github.com/joho/godotenv" + "gopkg.in/yaml.v3" ) const ( @@ -32,6 +35,7 @@ const ( DefaultMetricsPort = "2112" DefaultUseSSL = "false" DefaultSeverityConfig = "Fatal,Critical,Informational" + NodeDrainPolicyFile = "nodeDrainPolicy.json" ) type Config struct { @@ -49,28 +53,56 @@ type Config struct { CertFile string KeyFile string } - SlurmToken string - SlurmControlNode string - SlurmUser string - SubscriptionPayload SubscriptionPayload - RedfishServers []RedfishServer - TriggerEvents []TriggerEvent - PrometheusConfig PrometheusConfig - context *tls.Config - eventCount int - dataBuffer []byte + SlurmToken string + SlurmControlNode string + SlurmUser string + SlurmScontrolPath string + SlurmDrainExcludeStr string + SubscriptionPayload SubscriptionPayload + RedfishServers []RedfishServer + TriggerEvents map[string]map[string][]EventInfo //map[Severity][MessageRegistry.MessageId][]EventInfo + PrometheusConfig PrometheusConfig + context *tls.Config + eventCount int + dataBuffer []byte + TlsTimeOut string +} + +type EventInfo struct { + UniqueString string + Category string + Subcategory string + DrainReasonPrefix string } type TriggerEvent struct { - Severity string `json:"Severity"` - Action string `json:"Action"` + Severity string `json:"Severity"` + Action string `json:"Action"` + Message string `json:"Message"` + DrainReasonPrefix string `json:"DrainReasonPrefix"` +} + +type TriggerEventsInfo struct { + Category string `json:"Category"` + Subcategory string `json:"Subcategory"` + MessageRegistry string `json:"MessageRegistry"` + MessageId string `json:"MessageId"` + UniqueString string `json:"UniqueString"` + Severity string `json:"Severity"` + DrainReasonPrefix string `json:"DrainReasonPrefix"` + Enable bool `json:"Enable"` } type PrometheusConfig struct { Severity []string `json:"Severity"` } -func setupConfig() Config { +type target struct { + Targets []string `yaml:"targets"` + Labels map[string]string `yaml:"labels"` +} + +func setupConfig(targetFile string) Config { // Load .env file err := godotenv.Load() if err != nil { @@ -119,20 +151,15 @@ func setupConfig() Config { AppConfig.SlurmToken = os.Getenv("SLURM_TOKEN") AppConfig.SlurmControlNode = os.Getenv("SLURM_CONTROL_NODE") AppConfig.SlurmUser = os.Getenv("SLURM_USER") + AppConfig.SlurmDrainExcludeStr = os.Getenv("SLURM_DRAIN_EXCLUDE_REASON_LIST") + AppConfig.SlurmScontrolPath = os.Getenv("SLURM_SCONTROL_PATH") + AppConfig.TlsTimeOut = os.Getenv("TLS_TIMEOUT") subscriptionPayloadJSON := os.Getenv("SUBSCRIPTION_PAYLOAD") if err := json.Unmarshal([]byte(subscriptionPayloadJSON), &AppConfig.SubscriptionPayload); err != nil { log.Fatalf("Failed to parse SUBSCRIPTION_PAYLOAD: %v", err) } - triggerEventsJSON := os.Getenv("TRIGGER_EVENTS") - if triggerEventsJSON != "" { - err = json.Unmarshal([]byte(triggerEventsJSON), &AppConfig.TriggerEvents) - if err != nil { - log.Fatalf("Failed to unmarshal TRIGGER_EVENTS: %v", err) - } - } - prometheusConfigJSON := os.Getenv("PROMETHEUS_CONFIG") if prometheusConfigJSON != "" { err = json.Unmarshal([]byte(prometheusConfigJSON), &AppConfig.PrometheusConfig) @@ -148,10 +175,113 @@ func setupConfig() Config { redfishServersJSON := os.Getenv("REDFISH_SERVERS") if redfishServersJSON == "" { log.Println("REDFISH_SERVERS environment variable is not set or is empty") + } else { + if err := json.Unmarshal([]byte(redfishServersJSON), &AppConfig.RedfishServers); err != nil { + log.Fatalf("Failed to parse REDFISH_SERVERS: %v", err) + } + } + + // Read the node drain policy config file + nodeDrainPolicyConfig, err := os.ReadFile(NodeDrainPolicyFile) + + if err != nil { + log.Fatalf("Failed to read: %v", NodeDrainPolicyFile) + } + + triggerEventsInfo := []TriggerEventsInfo{} + err = json.Unmarshal(nodeDrainPolicyConfig, &triggerEventsInfo) + if err != nil { + log.Fatalf("Failed to unmarshal file: %v | err: %v", NodeDrainPolicyFile, err) + } + + tInfoMap := map[string]map[string][]EventInfo{} + + for _, evt := range triggerEventsInfo { + fmt.Printf("Trigger Event: %+v\n", evt) + if evt.Enable != true { + continue + } + eInfo := EventInfo{} + eInfo.Category = evt.Category + eInfo.Subcategory = evt.Subcategory + eInfo.DrainReasonPrefix = evt.DrainReasonPrefix + eInfo.UniqueString = evt.UniqueString + key := "" + if evt.MessageRegistry == "" { + key = evt.MessageId + } else { + key = evt.MessageRegistry + "." + evt.MessageId + } + if ee, ok := tInfoMap[evt.Severity]; !ok { + eInfoMap := map[string][]EventInfo{} + eInfoMap[key] = []EventInfo{eInfo} + tInfoMap[evt.Severity] = eInfoMap + } else { + ee[key] = append(ee[key], eInfo) + } + } + + AppConfig.TriggerEvents = tInfoMap + + for kk, tt := range AppConfig.TriggerEvents { + fmt.Println("Severity: ", kk) + for kkk, ttt := range tt { + fmt.Println("key: ", kkk) + fmt.Printf("event: %+v\n", ttt) + } + } + + // Read and parse the REDFISH_SERVERS_COMMON_CONFIG environment variable + redfishServersCommonConfigJSON := os.Getenv("REDFISH_SERVERS_COMMON_CONFIG") + if redfishServersCommonConfigJSON == "" { + log.Println("redfishServersCommonConfigJSON environment variable is not set or is empty") return AppConfig } - if err := json.Unmarshal([]byte(redfishServersJSON), &AppConfig.RedfishServers); err != nil { - log.Fatalf("Failed to parse REDFISH_SERVERS: %v", err) + redfishServersCommonConfig := RedfishServersCommongConfig{} + if err := json.Unmarshal([]byte(redfishServersCommonConfigJSON), &redfishServersCommonConfig); err != nil { + log.Fatalf("Failed to parse REDFISH_SERVERS_COMMON_CONFIG: %v", err) + } + + if targetFile == "" { + log.Println("No target file provided") + return AppConfig + } + + targetYamlFile, err := os.ReadFile(targetFile) + + if err != nil { + log.Fatalf("Failed to read file: %v", targetFile) + } + + targets := []target{} + + err = yaml.Unmarshal(targetYamlFile, &targets) + + if err != nil { + log.Fatalf("Error parsing target file: %v | err: %v", targetFile, err) + } + + for _, t := range targets { + log.Println("target: ", t.Targets) + + for _, hostName := range t.Targets { + // add this target to Redfish servers + server := RedfishServer{} + bmcHost := fmt.Sprintf(hostName+".%v", redfishServersCommonConfig.HostSuffix) + ips, err := net.LookupIP(bmcHost) + if err != nil || len(ips) == 0 { + log.Printf("[error] Couldn't get the IP for host: %v | ips: %v | err: %v", bmcHost, ips, err) + continue + } + log.Println("IPs: ", ips) + + server.IP = fmt.Sprintf("https://%v", ips[0]) + server.LoginType = "Session" + server.Username = redfishServersCommonConfig.UserName + server.Password = redfishServersCommonConfig.Password + server.SlurmNode = hostName + AppConfig.RedfishServers = append(AppConfig.RedfishServers, server) + } } return AppConfig diff --git a/redfish-exporter/go.mod b/redfish-exporter/go.mod index 8c8370f..400dd6f 100644 --- a/redfish-exporter/go.mod +++ b/redfish-exporter/go.mod @@ -9,6 +9,7 @@ require ( github.com/nod-ai/ADA/redfish-exporter v0.0.0-20241002210630-2ef2d1070d90 github.com/prometheus/client_golang v1.20.4 github.com/stmcginnis/gofish v0.19.0 + gopkg.in/yaml.v3 v3.0.1 ) require ( diff --git a/redfish-exporter/go.sum b/redfish-exporter/go.sum index 0b9f2ce..b729e1a 100644 --- a/redfish-exporter/go.sum +++ b/redfish-exporter/go.sum @@ -26,3 +26,6 @@ golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/redfish-exporter/listener.go b/redfish-exporter/listener.go index c952682..e984d38 100644 --- a/redfish-exporter/listener.go +++ b/redfish-exporter/listener.go @@ -171,6 +171,37 @@ func (s *Server) handleConnection(AppConfig Config, conn net.Conn) { } } +func getDrainReasonPrefix(info EventInfo) string { + return info.DrainReasonPrefix + ": " + info.Category + ": " + info.Subcategory +} + +func isTriggerEvent(evt Event, config Config) (bool, string) { + tInfoMap := config.TriggerEvents + + if eInfoMap, ok := tInfoMap[evt.Severity]; !ok { + return false, "" + } else { + if eInfo, ok1 := eInfoMap[evt.MessageId]; !ok1 { + return false, "" + } else { + if len(eInfo) == 1 { + return true, getDrainReasonPrefix(eInfo[0]) + } else { + for _, info := range eInfo { + strs := strings.Split(info.UniqueString, "|") + for _, str := range strs { + if strings.Contains(evt.Message, str) == true { + return true, getDrainReasonPrefix(info) + } + } + + } + } + } + } + return false, "" +} + func (s *Server) processRequest(AppConfig Config, conn net.Conn, req *http.Request, eventCount *int, dataBuffer *[]byte) error { // Extract method, headers, and payload method := req.Method @@ -217,19 +248,26 @@ func (s *Server) processRequest(AppConfig Config, conn net.Conn, req *http.Reque log.Printf("Message ID: %s", messageId) log.Printf("Message Args: %v", messageArgs) log.Printf("Origin Of Condition: %s", originOfCondition) - for _, triggerEvent := range AppConfig.TriggerEvents { - if severity == triggerEvent.Severity { - log.Printf("Matched Trigger Event: %s with action %s", triggerEvent.Severity, triggerEvent.Action) - // Sending event belongs to redfish_utils. Each server may have different slurm node associated, and redfish_servers has the info/map. - if s.slurmQueue != nil { - redfishServerInfo := getServerInfoByIP(AppConfig.RedfishServers, ip) - if len(strings.TrimSpace(redfishServerInfo.SlurmNode)) == 0 { - log.Printf("failed to get the slurm node name, cannot perform action: %v", triggerEvent.Action) - break - } - s.slurmQueue.Add(redfishServerInfo.IP, redfishServerInfo.SlurmNode, triggerEvent.Severity, triggerEvent.Action) + + trigger, drainReason := isTriggerEvent(event, AppConfig) + if trigger == true { + log.Printf("Matched Trigger Event: %s | messageId: %s | message: %s", event.Severity, event.MessageId, event.Message) + // Sending event belongs to redfish_utils. Each server may have different slurm node associated, and redfish_servers has the info/map. + if s.slurmQueue != nil { + redfishServerInfo := getServerInfoByIP(AppConfig.RedfishServers, ip) + if len(strings.TrimSpace(redfishServerInfo.SlurmNode)) == 0 { + log.Println("failed to get the slurm node name, cannot perform drain action") + continue } - break + evt := slurm.AddEventReq{ + RedfishServerIP: redfishServerInfo.IP, + SlurmNodeName: redfishServerInfo.SlurmNode, + Severity: event.Severity, + DrainReason: drainReason, + ExcludeStr: AppConfig.SlurmDrainExcludeStr, + ScontrolPath: AppConfig.SlurmScontrolPath, + } + s.slurmQueue.Add(evt) } } } diff --git a/redfish-exporter/main.go b/redfish-exporter/main.go index 22704e7..cea722c 100644 --- a/redfish-exporter/main.go +++ b/redfish-exporter/main.go @@ -26,6 +26,7 @@ import ( "os/signal" "strconv" "strings" + "sync" "syscall" "time" @@ -36,15 +37,18 @@ import ( func main() { var ( - enableSlurm = flag.Bool("enable-slurm", false, "Enable slurm") + targetFile string + subscriptionMapLock sync.Mutex // to guard access to the map ) + + flag.StringVar(&targetFile, "target", "", "Path to the target file for host/slurm node names") flag.Parse() log.SetFlags(log.LstdFlags | log.Lshortfile) log.Println("Starting Redfish Event Listener/Exporter") // Setup configuration - AppConfig := setupConfig() + AppConfig := setupConfig(targetFile) // Log the initialized config log.Printf("Initialized Config: %+v", AppConfig) @@ -52,24 +56,14 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() var slurmQueue *slurm.SlurmQueue - if *enableSlurm { - if len(strings.TrimSpace(AppConfig.SlurmToken)) == 0 { - log.Fatalf("Provide slurm token to enable slurm") - } - if len(strings.TrimSpace(AppConfig.SlurmControlNode)) == 0 { - log.Fatalf("Provide slurm control node IP:Port to enable slurm") - } - _, err := slurm.NewClient(AppConfig.SlurmControlNode, AppConfig.SlurmUser, AppConfig.SlurmToken) - if err != nil { - log.Fatalf("failed to create slurm client, err: %+v", err) - } - slurmQueue = slurm.InitSlurmQueue(ctx) - go slurmQueue.ProcessEventActionQueue() - } + slurmQueue = slurm.InitSlurmQueue(ctx) + go slurmQueue.ProcessEventActionQueue() + + subscriptionMap := make(map[string]string) // Subscribe the listener to the event stream for all servers - subscriptionMap, err := CreateSubscriptionsForAllServers(AppConfig.RedfishServers, AppConfig.SubscriptionPayload) + err := CreateSubscriptionsForAllServers(AppConfig.RedfishServers, AppConfig.SubscriptionPayload, subscriptionMap, &subscriptionMapLock, AppConfig.TlsTimeOut) if err != nil { log.Fatal(err) } @@ -110,7 +104,9 @@ func main() { time.Sleep(time.Second) // Unsubscribe the listener from all servers + subscriptionMapLock.Lock() DeleteSubscriptionsFromAllServers(AppConfig.RedfishServers, subscriptionMap) + subscriptionMapLock.Unlock() cancel() diff --git a/redfish-exporter/redfish_utils.go b/redfish-exporter/redfish_utils.go index 8d70b86..ffefab7 100644 --- a/redfish-exporter/redfish_utils.go +++ b/redfish-exporter/redfish_utils.go @@ -19,12 +19,18 @@ package main import ( "fmt" "log" + "strconv" "sync" + "time" "github.com/stmcginnis/gofish" "github.com/stmcginnis/gofish/redfish" ) +const ( + PeriodicRetryTime = 30 +) + type RedfishServer struct { IP string `json:"ip"` Username string `json:"username"` @@ -33,6 +39,12 @@ type RedfishServer struct { SlurmNode string `json:"slurmNode"` } +type RedfishServersCommongConfig struct { + HostSuffix string `json:"hostSuffix"` + UserName string `json:"username"` + Password string `json:"password"` +} + type SubscriptionPayload struct { Destination string `json:"Destination,omitempty"` EventTypes []redfish.EventType `json:"EventTypes,omitempty"` @@ -45,13 +57,27 @@ type SubscriptionPayload struct { Context string `json:"Context,omitempty"` } +type RedfishSubsciptionFailedData struct { + server RedfishServer + payload SubscriptionPayload +} + // Create a new connection to a redfish server -func getRedfishClient(server RedfishServer) (*gofish.APIClient, error) { +func getRedfishClient(server RedfishServer, tlsTimeout string) (*gofish.APIClient, error) { + timeOut := 0 + if tlsTimeout != "" { + t, err := strconv.Atoi(tlsTimeout) + if err == nil { + timeOut = t + } + } + clientConfig := gofish.ClientConfig{ - Endpoint: server.IP, - Username: server.Username, - Password: server.Password, - Insecure: true, // TODO Set Based on login type + Endpoint: server.IP, + Username: server.Username, + Password: server.Password, + Insecure: true, // TODO Set Based on login type + TLSHandshakeTimeout: timeOut, } c, err := gofish.Connect(clientConfig) @@ -129,55 +155,55 @@ func createLegacySubscription(eventService *redfish.EventService, SubscriptionPa // Create subscriptions for all servers and return their URIs // Rollback if any subscription attempt fails -func CreateSubscriptionsForAllServers(redfishServers []RedfishServer, subscriptionPayload SubscriptionPayload) (map[string]string, error) { - var wg sync.WaitGroup - var mu sync.Mutex // to guard access to the map - - subscriptionMap := make(map[string]string) - - errChan := make(chan error, len(redfishServers)) - +func CreateSubscriptionsForAllServers(redfishServers []RedfishServer, subscriptionPayload SubscriptionPayload, subscriptionMap map[string]string, mu *sync.Mutex, tlsTimeout string) error { + failedSubsChan := make(chan RedfishSubsciptionFailedData) for _, server := range redfishServers { - wg.Add(1) - go func(server RedfishServer) { - defer wg.Done() + go doSubscription(server, subscriptionPayload, subscriptionMap, mu, failedSubsChan, tlsTimeout) + } - c, err := getRedfishClient(server) - if err != nil { - errChan <- fmt.Errorf("failed to connect to server %s: %v", server.IP, err) - return - } - defer c.Logout() + go periodicSubscriptionRetry(failedSubsChan, subscriptionMap, mu, tlsTimeout) + return nil +} - subscriptionURI, err := createSubscription(c, server, subscriptionPayload) - if err != nil { - errChan <- fmt.Errorf("subscription failed on server %s: %v", server.IP, err) - return - } - mu.Lock() - subscriptionMap[server.IP] = subscriptionURI - mu.Unlock() - log.Printf("Successfully created subscription on Redfish server %s: %s", server.IP, subscriptionURI) - }(server) - } +func periodicSubscriptionRetry(failedSubsChan chan RedfishSubsciptionFailedData, subscriptionMap map[string]string, mu *sync.Mutex, tlsTimeout string) { + failedSubsMap := map[string]RedfishSubsciptionFailedData{} - wg.Wait() - close(errChan) + ticker := time.NewTicker(PeriodicRetryTime * time.Second) + defer ticker.Stop() - // Any error that occurred during the subscription process - var allErrors []string - for err := range errChan { - if err != nil { - allErrors = append(allErrors, err.Error()) + for { + select { + case <-ticker.C: + for ip, data := range failedSubsMap { + log.Printf("Retrying subscription for: %v", ip) + go doSubscription(data.server, data.payload, subscriptionMap, mu, failedSubsChan, tlsTimeout) + delete(failedSubsMap, ip) + } + case data := <-failedSubsChan: + failedSubsMap[data.server.IP] = data } } +} - if len(allErrors) > 0 { - DeleteSubscriptionsFromAllServers(redfishServers, subscriptionMap) - return nil, fmt.Errorf("subscription process encountered errors: %s", allErrors) +func doSubscription(server RedfishServer, subscriptionPayload SubscriptionPayload, subscriptionMap map[string]string, mu *sync.Mutex, failedSubsChan chan RedfishSubsciptionFailedData, tlsTimeout string) { + c, err := getRedfishClient(server, tlsTimeout) + if err != nil { + log.Printf("[error] failed to connect to server %s: %v", server.IP, err) + failedSubsChan <- RedfishSubsciptionFailedData{server: server, payload: subscriptionPayload} + return } + defer c.Logout() - return subscriptionMap, nil + subscriptionURI, err := createSubscription(c, server, subscriptionPayload) + if err != nil { + log.Printf("[error] subscription failed on server %s: %v", server.IP, err) + failedSubsChan <- RedfishSubsciptionFailedData{server: server, payload: subscriptionPayload} + return + } + mu.Lock() + subscriptionMap[server.IP] = subscriptionURI + mu.Unlock() + log.Printf("Successfully created subscription on Redfish server %s: %s", server.IP, subscriptionURI) } // Delete all event subscriptions stored in the map @@ -192,7 +218,7 @@ func DeleteSubscriptionsFromAllServers(redfishServers []RedfishServer, subscript defer wg.Done() server := getServerInfo(redfishServers, serverIP) - c, err := getRedfishClient(server) + c, err := getRedfishClient(server, "") if err != nil { log.Printf("Failed to connect to server %s: %v", server.IP, err) return diff --git a/redfish-exporter/slurm/queue.go b/redfish-exporter/slurm/queue.go index cd40594..209259b 100644 --- a/redfish-exporter/slurm/queue.go +++ b/redfish-exporter/slurm/queue.go @@ -2,6 +2,7 @@ package slurm import ( "context" + "fmt" "log" "strings" @@ -9,14 +10,32 @@ import ( ) const ( - Drain = "DrainNode" + Drain = "DrainNode" + ExlcudeReasonSet = "DRAIN_EXCLUDE_REASON_SET" ) +type AddEventReq struct { + RedfishServerIP string + SlurmNodeName string + Severity string + Action string + DrainReason string + MessageId string + Message string + ExcludeStr string + ScontrolPath string +} + type eventsActionReq struct { redfishServerIP string slurmNodeName string severity string action string + drainReason string + messageId string + message string + excludeStr string + scontrolPath string } type SlurmQueue struct { @@ -28,12 +47,14 @@ func InitSlurmQueue(ctx context.Context) *SlurmQueue { return &SlurmQueue{ctx: ctx, queue: make(chan *eventsActionReq)} } -func (q *SlurmQueue) Add(redfishServerIP, slurmNodeName, severity, action string) { +func (q *SlurmQueue) Add(evt AddEventReq) { q.queue <- &eventsActionReq{ - redfishServerIP: redfishServerIP, - slurmNodeName: slurmNodeName, - severity: severity, - action: action, + redfishServerIP: evt.RedfishServerIP, + slurmNodeName: evt.SlurmNodeName, + severity: evt.Severity, + drainReason: evt.DrainReason, + excludeStr: evt.ExcludeStr, + scontrolPath: evt.ScontrolPath, } } @@ -52,35 +73,37 @@ func (q *SlurmQueue) ProcessEventActionQueue() { actionReq.redfishServerIP, actionReq.slurmNodeName, actionReq.severity, - actionReq.action).Inc() - return + "Drain").Inc() + } else { + metrics.SlurmAPISuccessMetric. + WithLabelValues( + actionReq.redfishServerIP, + actionReq.slurmNodeName, + actionReq.severity, + "Drain").Inc() } - metrics.SlurmAPISuccessMetric. - WithLabelValues( - actionReq.redfishServerIP, - actionReq.slurmNodeName, - actionReq.severity, - actionReq.action).Inc() } } } +func getDrainReasonString(prefix, msg, msgId, severity string) string { + ret := fmt.Sprintf("%s:redfishlistener:%s:%s:%s", prefix, severity, msgId, msg) + return ret +} + func (q *SlurmQueue) performEventAction(req *eventsActionReq) error { if len(strings.TrimSpace(req.slurmNodeName)) == 0 { return nil } - slurmClient := GetClient() - if slurmClient == nil { - return nil - } - - if req.action == Drain { - err := slurmClient.DrainNode(req.slurmNodeName) - if err != nil { - log.Printf("Error draining node: %v", err) - return err + err := DrainNodeWithScontrol(req.slurmNodeName, req.drainReason, req.excludeStr, req.scontrolPath) + if err != nil { + if strings.Contains(err.Error(), ExlcudeReasonSet) { + log.Printf("Node not drained: %v", err.Error()) + return nil } + log.Printf("Error draining node: %v", err) + return err } log.Printf("Performed action: %v on slurm node: %v successfully", req.action, req.slurmNodeName) diff --git a/redfish-exporter/slurm/slurm.go b/redfish-exporter/slurm/slurm.go index abbfd08..f2a6e49 100644 --- a/redfish-exporter/slurm/slurm.go +++ b/redfish-exporter/slurm/slurm.go @@ -2,10 +2,14 @@ package slurm import ( "context" + "encoding/json" "fmt" "log" "math" "net/http" + "os/exec" + "regexp" + "strings" "time" "github.com/nod-ai/ADA/redfish-exporter/api/generated/slurmrestdapi" @@ -67,6 +71,7 @@ func NewClient(slurmControlNode, slurmUser, slurmToken string) (*Client, error) c := &Client{apiClient: cl} log.Printf("[slurm] created slurm client for node: %v\n", slurmControlNode) + err := c.getConnectionStatus() if err != nil { log.Printf("[slurm] error in getting the connection status of the slurm node: %v, err: %+v\n", slurmControlNode, err) @@ -105,12 +110,14 @@ func (c *Client) ResumeNode(nodeName string) error { return nil } -func (c *Client) DrainNode(nodeName string) error { +func (c *Client) DrainNodeWithAPI(nodeName, reason, excludeStr, scontrolPath string) error { apiCall := func() (interface{}, *http.Response, error) { + uid := "0" ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) //slurm_v0039_update_node jreq := c.apiClient.SlurmAPI.SlurmV0039UpdateNode(ctx, nodeName) - req := slurmrestdapi.V0039UpdateNodeMsg{State: []string{"drain"}} + req := slurmrestdapi.V0039UpdateNodeMsg{State: []string{"drain"}, Reason: &reason, ReasonUid: &uid} + jreq = jreq.V0039UpdateNodeMsg(req) res, resp, err := c.apiClient.SlurmAPI.SlurmV0039UpdateNodeExecute(jreq) cancel() @@ -122,6 +129,14 @@ func (c *Client) DrainNode(nodeName string) error { return res, resp, nil } + curReason, err := c.GetNodeReasonWithAPI(nodeName) + if err != nil { + return err + } + log.Printf("node: %v, Reason: %v", nodeName, curReason) + if strings.Contains(curReason, excludeStr) { + return fmt.Errorf("%s: not draining node: %s | current reason: %s", ExlcudeReasonSet, nodeName, curReason) + } _, resp, err := CallWithRetry(apiCall, maxRetries, baseDelay) if err != nil { return err @@ -131,7 +146,33 @@ func (c *Client) DrainNode(nodeName string) error { return nil } -func (c *Client) GetNodes() ([]string, error) { +func DrainNodeWithScontrol(nodeName, reason, excludeStr, scontrolPath string) error { + + if excludeStr != "" { + curReason, err := GetNodeReasonWithScontrol(nodeName, scontrolPath) + if err != nil { + log.Printf("GetNodeReasonWithScontrol returned err: %v\n", err) + return err + } + + if curReason != "" { + re := regexp.MustCompile(excludeStr) + match := re.FindAllString(curReason, -1) + + if len(match) != 0 { + log.Printf("exlcudStr: %v, curReason: %v", excludeStr, curReason) + log.Printf("match: %v | len: %v", match, len(match)) + return fmt.Errorf("%s: not draining node: %s | current reason: %s", ExlcudeReasonSet, nodeName, curReason) + } + } + } + cmd := fmt.Sprintf("%s update NodeName=%s State=DRAIN Reason=\"%s\"", scontrolPath, nodeName, reason) + res := LocalCommandOutput(cmd) + log.Printf("Drain node result: %s", res) + return nil +} + +func (c *Client) GetNodesWithAPI() ([]string, error) { var nodes []string apiCall := func() (interface{}, *http.Response, error) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) @@ -160,6 +201,183 @@ func (c *Client) GetNodes() ([]string, error) { return nodes, nil } +func (c *Client) GetNodeReasonWithAPI(nodeName string) (string, error) { + var reason string + apiCall := func() (interface{}, *http.Response, error) { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + jreq := c.apiClient.SlurmAPI.SlurmV0039GetNode(ctx, nodeName) + res, resp, err := c.apiClient.SlurmAPI.SlurmV0039GetNodeExecute(jreq) + cancel() + if err != nil { + return res, resp, err + } else if resp.StatusCode != 200 { + return res, resp, fmt.Errorf("invalid status code: %v", resp.StatusCode) + } + return res, resp, nil + } + + res, resp, err := CallWithRetry(apiCall, maxRetries, baseDelay) + if err != nil { + return reason, err + } + defer resp.Body.Close() + + temp := res.(*slurmrestdapi.V0039NodesResponse) + nodes := temp.GetNodes() + if len(nodes) != 1 { + return reason, fmt.Errorf("GetNodeReason failed") + } + + reason = *nodes[0].Reason + log.Printf("[slurm] get node reasons(%s): %+v\n", nodeName, reason) + return reason, nil +} + +func GetNodeReasonWithScontrol(nodeName, scontrolPath string) (string, error) { + type scontrolShowNode struct { + Meta struct { + Plugins struct { + DataParser string `json:"data_parser"` + AccountingStorage string `json:"accounting_storage"` + } `json:"plugins"` + Command []string `json:"command"` + Slurm struct { + Version struct { + Major int `json:"major"` + Micro int `json:"micro"` + Minor int `json:"minor"` + } `json:"version"` + Release string `json:"release"` + } `json:"Slurm"` + } `json:"meta"` + Nodes []struct { + Architecture string `json:"architecture"` + BurstbufferNetworkAddress string `json:"burstbuffer_network_address"` + Boards int `json:"boards"` + BootTime int `json:"boot_time"` + ClusterName string `json:"cluster_name"` + Cores int `json:"cores"` + SpecializedCores int `json:"specialized_cores"` + CPUBinding int `json:"cpu_binding"` + CPULoad struct { + Set bool `json:"set"` + Infinite bool `json:"infinite"` + Number int `json:"number"` + } `json:"cpu_load"` + FreeMem struct { + Set bool `json:"set"` + Infinite bool `json:"infinite"` + Number int `json:"number"` + } `json:"free_mem"` + CPUs int `json:"cpus"` + EffectiveCPUs int `json:"effective_cpus"` + SpecializedCPUs string `json:"specialized_cpus"` + Energy struct { + AverageWatts int `json:"average_watts"` + BaseConsumedEnergy int `json:"base_consumed_energy"` + ConsumedEnergy int `json:"consumed_energy"` + CurrentWatts struct { + Set bool `json:"set"` + Infinite bool `json:"infinite"` + Number int `json:"number"` + } `json:"current_watts"` + PreviousConsumedEnergy int `json:"previous_consumed_energy"` + LastCollected int `json:"last_collected"` + } `json:"energy"` + ExternalSensors struct { + ConsumedEnergy struct { + Set bool `json:"set"` + Infinite bool `json:"infinite"` + Number int `json:"number"` + } `json:"consumed_energy"` + Temperature struct { + Set bool `json:"set"` + Infinite bool `json:"infinite"` + Number int `json:"number"` + } `json:"temperature"` + EnergyUpdateTime int `json:"energy_update_time"` + CurrentWatts int `json:"current_watts"` + } `json:"external_sensors"` + Extra string `json:"extra"` + Power struct { + MaximumWatts struct { + Set bool `json:"set"` + Infinite bool `json:"infinite"` + Number int `json:"number"` + } `json:"maximum_watts"` + CurrentWatts int `json:"current_watts"` + TotalEnergy int `json:"total_energy"` + NewMaximumWatts int `json:"new_maximum_watts"` + PeakWatts int `json:"peak_watts"` + LowestWatts int `json:"lowest_watts"` + NewJobTime int `json:"new_job_time"` + State int `json:"state"` + TimeStartDay int `json:"time_start_day"` + } `json:"power"` + Features []interface{} `json:"features"` + ActiveFeatures []interface{} `json:"active_features"` + Gres string `json:"gres"` + GresDrained string `json:"gres_drained"` + GresUsed string `json:"gres_used"` + LastBusy int `json:"last_busy"` + McsLabel string `json:"mcs_label"` + SpecializedMemory int `json:"specialized_memory"` + Name string `json:"name"` + NextStateAfterReboot []string `json:"next_state_after_reboot"` + Address string `json:"address"` + Hostname string `json:"hostname"` + State []string `json:"state"` + OperatingSystem string `json:"operating_system"` + Owner string `json:"owner"` + Partitions []string `json:"partitions"` + Port int `json:"port"` + RealMemory int `json:"real_memory"` + Comment string `json:"comment"` + Reason string `json:"reason"` + ReasonChangedAt int `json:"reason_changed_at"` + ReasonSetByUser string `json:"reason_set_by_user"` + ResumeAfter struct { + Set bool `json:"set"` + Infinite bool `json:"infinite"` + Number int `json:"number"` + } `json:"resume_after"` + Reservation string `json:"reservation"` + AllocMemory int `json:"alloc_memory"` + AllocCPUs int `json:"alloc_cpus"` + AllocIdleCPUs int `json:"alloc_idle_cpus"` + TresUsed string `json:"tres_used"` + TresWeighted float64 `json:"tres_weighted"` + SlurmdStartTime int `json:"slurmd_start_time"` + Sockets int `json:"sockets"` + Threads int `json:"threads"` + TemporaryDisk int `json:"temporary_disk"` + Weight int `json:"weight"` + Tres string `json:"tres"` + Version string `json:"version"` + } `json:"nodes"` + Warnings []interface{} `json:"warnings"` + Errors []interface{} `json:"errors"` + } + + cmd := fmt.Sprintf("%s show node %s --json", scontrolPath, nodeName) + ret := LocalCommandOutput(cmd) + + if ret == "" { + return "", fmt.Errorf("failed to get current node reason") + } + + res := scontrolShowNode{} + err := json.Unmarshal([]byte(ret), &res) + if err != nil { + return "", err + } + if len(res.Nodes) != 1 { + return "", fmt.Errorf("show node failed for %s", nodeName) + } + log.Printf("get node reasons(%s): %+v\n", nodeName, res.Nodes[0].Reason) + return res.Nodes[0].Reason, nil +} + func (c *Client) getConnectionStatus() error { apiCall := func() (interface{}, *http.Response, error) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) @@ -196,3 +414,10 @@ func createRestClient(c *SlurmServerConfig) *slurmrestdapi.APIClient { client := slurmrestdapi.NewAPIClient(cfg) return client } + +// LocalCommandOutput runs a command on a node and returns output in string format +func LocalCommandOutput(command string) string { + log.Printf("Running cmd: %s\n", command) + out, _ := exec.Command("bash", "-c", command).CombinedOutput() + return strings.TrimSpace(string(out)) +}