From fcff1417142c5e2f80b3ef0fbde3baf711cd3905 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Mon, 22 Jul 2024 12:13:30 +0200 Subject: [PATCH] Change Serf communication from local to http calls (#1330) --- api/http.go | 9 ++- api/http_internal.go | 14 +++-- cluster/cluster.go | 27 +++++--- config/cli.go | 1 + handlers/events.go | 78 ++++++++++++++++++++++-- handlers/events_test.go | 65 +++++++++++++++++++- handlers/geolocation/geolocation.go | 25 ++++++-- handlers/geolocation/geolocation_test.go | 24 +++++--- main.go | 69 +++++++-------------- mapic/mistapiconnector_app.go | 2 + 10 files changed, 228 insertions(+), 86 deletions(-) diff --git a/api/http.go b/api/http.go index ede34709a..4631a3e36 100644 --- a/api/http.go +++ b/api/http.go @@ -8,7 +8,6 @@ import ( "github.com/golang/glog" "github.com/julienschmidt/httprouter" "github.com/livepeer/catalyst-api/balancer" - "github.com/livepeer/catalyst-api/cluster" "github.com/livepeer/catalyst-api/config" "github.com/livepeer/catalyst-api/handlers" "github.com/livepeer/catalyst-api/handlers/analytics" @@ -21,8 +20,8 @@ import ( "github.com/livepeer/go-api-client" ) -func ListenAndServe(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, c cluster.Cluster, mapic mistapiconnector.IMac) error { - router := NewCatalystAPIRouter(cli, vodEngine, bal, c, mapic) +func ListenAndServe(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, mapic mistapiconnector.IMac) error { + router := NewCatalystAPIRouter(cli, vodEngine, bal, mapic) server := http.Server{Addr: cli.HTTPAddress, Handler: router} ctx, cancel := context.WithCancel(ctx) @@ -48,7 +47,7 @@ func ListenAndServe(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coo return server.Shutdown(ctx) } -func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, c cluster.Cluster, mapic mistapiconnector.IMac) *httprouter.Router { +func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, mapic mistapiconnector.IMac) *httprouter.Router { router := httprouter.New() withLogging := middleware.LogRequest() withCORS := middleware.AllowCORS() @@ -59,7 +58,7 @@ func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal b AccessToken: cli.APIToken, }) catalystApiHandlers := &handlers.CatalystAPIHandlersCollection{VODEngine: vodEngine} - geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, c, cli, lapi) + geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, cli, lapi) router.GET("/ok", withLogging(catalystApiHandlers.Ok())) router.GET("/healthcheck", withLogging(catalystApiHandlers.Healthcheck())) diff --git a/api/http_internal.go b/api/http_internal.go index e975c3474..db6c62ccc 100644 --- a/api/http_internal.go +++ b/api/http_internal.go @@ -68,12 +68,12 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato Server: cli.APIServer, AccessToken: cli.APIToken, }) - geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, c, cli, lapi) + geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, cli, lapi) spkiPublicKey, _ := crypto.ConvertToSpki(cli.VodDecryptPublicKey) catalystApiHandlers := &handlers.CatalystAPIHandlersCollection{VODEngine: vodEngine} - eventsHandler := &handlers.EventsHandlersCollection{Cluster: c} + eventsHandler := handlers.NewEventsHandlersCollection(c, mapic, bal) ffmpegSegmentingHandlers := &ffmpeg.HandlersCollection{VODEngine: vodEngine} accessControlHandlers := accesscontrol.NewAccessControlHandlersCollection(cli, mapic) analyticsHandlers := analytics.NewAnalyticsHandler(metricsDB) @@ -109,9 +109,14 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato ), ) - // Public handler to propagate an event to all Catalyst nodes + // Handler to get members Catalyst API => Catalyst + router.GET("/api/serf/members", withLogging(adminHandlers.MembersHandler())) + // Public handler to propagate an event to all Catalyst nodes, execute from Studio API => Catalyst router.POST("/api/events", withLogging(eventsHandler.Events())) + // Handler to forward the user event from Catalyst => Catalyst API + router.POST("/api/serf/receiveUserEvent", withLogging(eventsHandler.ReceiveUserEvent())) + // Public GET handler to retrieve the public key for vod encryption router.GET("/api/pubkey", withLogging(encryptionHandlers.PublicKeyHandler())) @@ -130,9 +135,6 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato // Endpoint to receive segments and manifests that ffmpeg produces router.POST("/api/ffmpeg/:id/:filename", withLogging(ffmpegSegmentingHandlers.NewFile())) - // Temporary endpoint for admin queries - router.GET("/admin/members", withLogging(adminHandlers.MembersHandler())) - return router } diff --git a/cluster/cluster.go b/cluster/cluster.go index f9d83dc68..e83173383 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -182,10 +182,25 @@ func (c *ClusterImpl) retryJoin(ctx context.Context) { } func (c *ClusterImpl) MembersFiltered(filter map[string]string, status, name string) ([]Member, error) { - all := c.serf.Members() - nodes := []Member{} + return FilterMembers(toClusterMembers(c.serf.Members()), filter, status, name) +} + +func toClusterMembers(members []serf.Member) []Member { + var nodes []Member + for _, member := range members { + nodes = append(nodes, Member{ + Name: member.Name, + Tags: member.Tags, + Status: member.Status.String(), + }) + } + return nodes +} + +func FilterMembers(all []Member, filter map[string]string, status string, name string) ([]Member, error) { + var nodes []Member for _, member := range all { - if status != "" && status != member.Status.String() { + if status != "" && status != member.Status { continue } if name != "" && name != member.Name { @@ -200,11 +215,7 @@ func (c *ClusterImpl) MembersFiltered(filter map[string]string, status, name str } } if matches { - nodes = append(nodes, Member{ - Name: member.Name, - Tags: member.Tags, - Status: member.Status.String(), - }) + nodes = append(nodes, member) } } return nodes, nil diff --git a/config/cli.go b/config/cli.go index a968ce80f..418812fea 100644 --- a/config/cli.go +++ b/config/cli.go @@ -71,6 +71,7 @@ type Cli struct { KafkaUser string KafkaPassword string AnalyticsKafkaTopic string + SerfMembersEndpoint string // mapping playbackId to value between 0.0 to 100.0 CdnRedirectPlaybackPct map[string]float64 diff --git a/handlers/events.go b/handlers/events.go index 7efac8eb2..a7d78abd3 100644 --- a/handlers/events.go +++ b/handlers/events.go @@ -3,25 +3,43 @@ package handlers import ( "encoding/json" "fmt" + "github.com/golang/glog" "github.com/hashicorp/serf/serf" "github.com/julienschmidt/httprouter" + "github.com/livepeer/catalyst-api/balancer" "github.com/livepeer/catalyst-api/cluster" "github.com/livepeer/catalyst-api/errors" + "github.com/livepeer/catalyst-api/events" + mistapiconnector "github.com/livepeer/catalyst-api/mapic" "github.com/xeipuuv/gojsonschema" "io" "net/http" + "strings" ) type EventsHandlersCollection struct { - Cluster cluster.Cluster + cluster cluster.Cluster + + mapic mistapiconnector.IMac + bal balancer.Balancer } -func (d *EventsHandlersCollection) Events() httprouter.Handle { - type Event struct { - Resource string `json:"resource"` - PlaybackID string `json:"playback_id"` +type Event struct { + Resource string `json:"resource"` + PlaybackID string `json:"playback_id"` +} + +func NewEventsHandlersCollection(cluster cluster.Cluster, mapic mistapiconnector.IMac, bal balancer.Balancer) *EventsHandlersCollection { + return &EventsHandlersCollection{ + cluster: cluster, + mapic: mapic, + bal: bal, } +} +// Events is a handler called by Studio API to send an event, e.g., to refresh a stream or nuke a stream. +// This event is then propagated to all Serf nodes and then forwarded to catalyst-api and handled by ReceiveUserEvent(). +func (d *EventsHandlersCollection) Events() httprouter.Handle { schema := inputSchemasCompiled["Event"] return func(w http.ResponseWriter, req *http.Request, _ httprouter.Params) { payload, err := io.ReadAll(req.Body) @@ -44,7 +62,7 @@ func (d *EventsHandlersCollection) Events() httprouter.Handle { return } - err = d.Cluster.BroadcastEvent(serf.UserEvent{ + err = d.cluster.BroadcastEvent(serf.UserEvent{ Name: fmt.Sprintf("%s-%s", event.Resource, event.PlaybackID), Payload: payload, Coalesce: true, @@ -56,3 +74,51 @@ func (d *EventsHandlersCollection) Events() httprouter.Handle { } } } + +// ReceiveUserEvent is a handler to receive Serf events from Catalyst. +// The idea is that: +// 1. Studio API sends an event to Catalyst (received by Events() handler) +// 2. Events() handler propagates the event to all Serf nodes +// 3. Each Serf node sends tne event to its corresponding catalyst-api instance (to the ReceiveUserEvent() handler) +// 4. ReceiveUserEvent() handler processes the event +func (c *EventsHandlersCollection) ReceiveUserEvent() httprouter.Handle { + return func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { + userEventPayload, err := io.ReadAll(r.Body) + if err != nil { + glog.Errorf("cannot read payload: %s", err) + return + } + e, err := events.Unmarshal(userEventPayload) + if err != nil { + glog.Errorf("cannot unmarshal received serf event %v: %s", userEventPayload, err) + return + } + switch event := e.(type) { + case *events.StreamEvent: + glog.V(5).Infof("received serf StreamEvent: %v", event.PlaybackID) + c.mapic.RefreshStreamIfNeeded(event.PlaybackID) + case *events.NukeEvent: + glog.V(5).Infof("received serf NukeEvent: %v", event.PlaybackID) + c.mapic.NukeStream(event.PlaybackID) + return + case *events.StopSessionsEvent: + glog.V(5).Infof("received serf StopSessionsEvent: %v", event.PlaybackID) + c.mapic.StopSessions(event.PlaybackID) + return + case *events.NodeUpdateEvent: + if glog.V(5) { + glog.Infof("received serf NodeUpdateEvent. Node: %s. Length: %d bytes. Ingest Streams: %v. Non-Ingest Streams: %v", event.NodeID, len(userEventPayload), strings.Join(event.GetIngestStreams(), ","), strings.Join(event.GetStreams(), ",")) + } + + c.bal.UpdateNodes(event.NodeID, event.NodeMetrics) + for _, stream := range event.GetStreams() { + c.bal.UpdateStreams(event.NodeID, stream, false) + } + for _, stream := range event.GetIngestStreams() { + c.bal.UpdateStreams(event.NodeID, stream, true) + } + default: + glog.Errorf("unsupported serf event: %v", e) + } + } +} diff --git a/handlers/events_test.go b/handlers/events_test.go index 797bc0f3a..57e31f519 100644 --- a/handlers/events_test.go +++ b/handlers/events_test.go @@ -5,6 +5,7 @@ import ( "github.com/hashicorp/serf/serf" "github.com/julienschmidt/httprouter" mockcluster "github.com/livepeer/catalyst-api/mocks/cluster" + mock_mistapiconnector "github.com/livepeer/catalyst-api/mocks/mistapiconnector" "github.com/stretchr/testify/require" "net/http" "net/http/httptest" @@ -62,7 +63,7 @@ func TestEventHandler(t *testing.T) { return nil }).AnyTimes() - catalystApiHandlers := EventsHandlersCollection{Cluster: mc} + catalystApiHandlers := NewEventsHandlersCollection(mc, nil, nil) router := httprouter.New() router.POST("/events", catalystApiHandlers.Events()) @@ -74,3 +75,65 @@ func TestEventHandler(t *testing.T) { require.Equal(rr.Result().StatusCode, tt.wantHttpCode) } } + +func TestReceiveUserEventHandler(t *testing.T) { + require := require.New(t) + playbackId := "123456789" + + tests := []struct { + name string + requestBody string + functionCalled string + }{ + { + name: "Refresh Stream", + requestBody: `{ + "resource": "stream", + "playback_id": "123456789" + }`, + functionCalled: "RefreshStreamIfNeeded", + }, + { + name: "Nuke Stream", + requestBody: `{ + "resource": "nuke", + "playback_id": "123456789" + }`, + functionCalled: "NukeStream", + }, + { + name: "Stop Sessions", + requestBody: `{ + "resource": "stopSessions", + "playback_id": "123456789" + }`, + functionCalled: "StopSessions", + }, + } + + ctrl := gomock.NewController(t) + mac := mock_mistapiconnector.NewMockIMac(ctrl) + + catalystApiHandlers := NewEventsHandlersCollection(nil, mac, nil) + router := httprouter.New() + router.POST("/receiveUserEvent", catalystApiHandlers.ReceiveUserEvent()) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + switch tt.functionCalled { + case "RefreshStreamIfNeeded": + mac.EXPECT().RefreshStreamIfNeeded(playbackId).Times(1) + case "NukeStream": + mac.EXPECT().NukeStream(playbackId).Times(1) + case "StopSessions": + mac.EXPECT().StopSessions(playbackId).Times(1) + } + + req, _ := http.NewRequest("POST", "/receiveUserEvent", strings.NewReader(tt.requestBody)) + rr := httptest.NewRecorder() + router.ServeHTTP(rr, req) + + require.Equal(rr.Result().StatusCode, 200) + }) + } +} diff --git a/handlers/geolocation/geolocation.go b/handlers/geolocation/geolocation.go index b6ce64f4d..fcbba6957 100644 --- a/handlers/geolocation/geolocation.go +++ b/handlers/geolocation/geolocation.go @@ -67,16 +67,14 @@ func (l *streamPullRateLimit) mark(playbackID string) { type GeolocationHandlersCollection struct { Balancer balancer.Balancer - Cluster cluster.Cluster Config config.Cli Lapi *api.Client streamPullRateLimit *streamPullRateLimit } -func NewGeolocationHandlersCollection(balancer balancer.Balancer, cluster cluster.Cluster, config config.Cli, lapi *api.Client) *GeolocationHandlersCollection { +func NewGeolocationHandlersCollection(balancer balancer.Balancer, config config.Cli, lapi *api.Client) *GeolocationHandlersCollection { return &GeolocationHandlersCollection{ Balancer: balancer, - Cluster: cluster, Config: config, Lapi: lapi, streamPullRateLimit: newStreamPullRateLimit(streamSourceRetryInterval), @@ -244,7 +242,7 @@ func (c *GeolocationHandlersCollection) resolveNodeURL(streamURL string) (string } func (c *GeolocationHandlersCollection) clusterMember(filter map[string]string, status, name string) (cluster.Member, error) { - members, err := c.Cluster.MembersFiltered(filter, "", name) + members, err := c.membersFiltered(filter, "", name) if err != nil { return cluster.Member{}, err } @@ -404,7 +402,7 @@ func (c *GeolocationHandlersCollection) getStreamPull(playbackID string, retryCo } func (c *GeolocationHandlersCollection) sendPlaybackRequestAsync(playbackID string, region string) { - members, err := c.Cluster.MembersFiltered(map[string]string{"region": region}, "", "") + members, err := c.membersFiltered(map[string]string{"region": region}, "", "") if err != nil || len(members) == 0 { glog.Errorf("Error fetching member list: %v", err) return @@ -422,6 +420,23 @@ func (c *GeolocationHandlersCollection) sendPlaybackRequestAsync(playbackID stri }() } +func (c *GeolocationHandlersCollection) membersFiltered(filter map[string]string, status, name string) ([]cluster.Member, error) { + resp, err := http.Get(c.Config.SerfMembersEndpoint) + if err != nil { + return []cluster.Member{}, err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return []cluster.Member{}, fmt.Errorf("failed to get members: %s", resp.Status) + } + var members []cluster.Member + if err := json.NewDecoder(resp.Body).Decode(&members); err != nil { + return []cluster.Member{}, err + } + + return cluster.FilterMembers(members, filter, status, name) +} + func parsePlus(plusString string) (string, string) { slice := strings.Split(plusString, "+") prefix := "" diff --git a/handlers/geolocation/geolocation_test.go b/handlers/geolocation/geolocation_test.go index da5578d68..94115bdea 100644 --- a/handlers/geolocation/geolocation_test.go +++ b/handlers/geolocation/geolocation_test.go @@ -2,6 +2,7 @@ package geolocation import ( "context" + "encoding/json" "errors" "fmt" "math/rand" @@ -17,7 +18,6 @@ import ( "github.com/livepeer/catalyst-api/config" "github.com/livepeer/catalyst-api/metrics" mockbalancer "github.com/livepeer/catalyst-api/mocks/balancer" - mockcluster "github.com/livepeer/catalyst-api/mocks/cluster" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" ) @@ -30,7 +30,7 @@ const ( ) var fakeSerfMember = cluster.Member{ - Name: "fake-serf-member", + Name: "someurl.com", Tags: map[string]string{ "http": fmt.Sprintf("http://%s", closestNodeAddr), "https": fmt.Sprintf("https://%s", closestNodeAddr), @@ -142,7 +142,6 @@ func getHLSURLsWithSeg(proto, host, seg, query string) []string { func mockHandlers(t *testing.T) *GeolocationHandlersCollection { ctrl := gomock.NewController(t) mb := mockbalancer.NewMockBalancer(ctrl) - mc := mockcluster.NewMockCluster(ctrl) mb.EXPECT(). GetBestNode(context.Background(), prefixes[:], playbackID, "", "", "", gomock.Any()). AnyTimes(). @@ -158,16 +157,23 @@ func mockHandlers(t *testing.T) *GeolocationHandlersCollection { AnyTimes(). Return("", "", errors.New("")) - mc.EXPECT(). - MembersFiltered(map[string]string{}, gomock.Any(), closestNodeAddr). - AnyTimes(). - Return([]cluster.Member{fakeSerfMember}, nil) + router := httprouter.New() + router.GET("/api/serf/members", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { + w.Header().Set("Content-Type", "application/json") + res, err := json.Marshal([]cluster.Member{fakeSerfMember}) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.Write(res) // nolint:errcheck + }) + testServer := httptest.NewServer(router) coll := GeolocationHandlersCollection{ Balancer: mb, - Cluster: mc, Config: config.Cli{ - RedirectPrefixes: prefixes[:], + RedirectPrefixes: prefixes[:], + SerfMembersEndpoint: fmt.Sprintf("%s/api/serf/members", testServer.URL), }, } return &coll diff --git a/main.go b/main.go index 7a70b0b1f..ca30baef9 100644 --- a/main.go +++ b/main.go @@ -1,15 +1,16 @@ package main import ( + "bytes" "context" "crypto/rsa" "database/sql" "flag" "fmt" "log" + "net/http" "os" "os/signal" - "strings" "syscall" "time" @@ -127,6 +128,7 @@ func main() { fs.StringVar(&cli.KafkaUser, "kafka-user", "", "Kafka Username") fs.StringVar(&cli.KafkaPassword, "kafka-password", "", "Kafka Password") fs.StringVar(&cli.AnalyticsKafkaTopic, "analytics-kafka-topic", "", "Kafka Topic used to send analytics logs") + fs.StringVar(&cli.SerfMembersEndpoint, "serf-members-endpoint", "http://127.0.0.1:7979/api/serf/members", "Endpoint to get the current members in the cluster") pprofPort := fs.Int("pprof-port", 6061, "Pprof listen port") fs.String("send-audio", "", "[DEPRECATED] ignored, will be removed") @@ -306,7 +308,7 @@ func main() { }) group.Go(func() error { - return api.ListenAndServe(ctx, cli, vodEngine, bal, c, mapic) + return api.ListenAndServe(ctx, cli, vodEngine, bal, mapic) }) group.Go(func() error { @@ -333,7 +335,8 @@ func main() { }) group.Go(func() error { - return handleClusterEvents(ctx, mapic, bal, c) + serfUserEventCallbackEndpoint := fmt.Sprintf("%s/api/serf/receiveUserEvent", cli.OwnInternalURL()) + return handleClusterEvents(ctx, serfUserEventCallbackEndpoint, c) }) err = group.Wait() @@ -343,75 +346,49 @@ func main() { // Eventually this will be the main loop of the state machine, but we just have one variable right now. func reconcileBalancer(ctx context.Context, bal balancer.Balancer, c cluster.Cluster) error { memberCh := c.MemberChan() - ticker := time.NewTicker(1 * time.Minute) for { - var members []cluster.Member - var err error select { case <-ctx.Done(): return nil - case <-ticker.C: - members, err = c.MembersFiltered(cluster.MediaFilter, "alive", "") + case list := <-memberCh: + err := bal.UpdateMembers(ctx, list) if err != nil { - glog.Errorf("Error getting serf members: %v", err) - continue + return fmt.Errorf("failed to update load balancer from member list: %w", err) } - case members = <-memberCh: - } - err = bal.UpdateMembers(ctx, members) - if err != nil { - glog.Errorf("Failed to update load balancer from member list: %v", err) - continue } } } -func handleClusterEvents(ctx context.Context, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster) error { +func handleClusterEvents(ctx context.Context, callbackEndpoint string, c cluster.Cluster) error { eventCh := c.EventChan() for { select { case <-ctx.Done(): return nil case e := <-eventCh: - processClusterEvent(mapic, bal, e) + processClusterEvent(callbackEndpoint, e) } } } -func processClusterEvent(mapic mistapiconnector.IMac, bal balancer.Balancer, userEvent serf.UserEvent) { +func processClusterEvent(callbackEndpoint string, userEvent serf.UserEvent) { + client := &http.Client{} + glog.V(5).Infof("received serf user event, propagating to %s, event=%s", callbackEndpoint, userEvent.String()) + go func() { - e, err := events.Unmarshal(userEvent.Payload) + req, err := http.NewRequest("POST", callbackEndpoint, bytes.NewBuffer(userEvent.Payload)) if err != nil { - glog.Errorf("cannot unmarshal received serf event %v: %s", userEvent, err) + glog.Errorf("error creating request: %v", err) return } - switch event := e.(type) { - case *events.StreamEvent: - glog.V(5).Infof("received serf StreamEvent: %v", event.PlaybackID) - mapic.RefreshStreamIfNeeded(event.PlaybackID) - case *events.NukeEvent: - glog.V(5).Infof("received serf NukeEvent: %v", event.PlaybackID) - mapic.NukeStream(event.PlaybackID) - return - case *events.StopSessionsEvent: - glog.V(5).Infof("received serf StopSessionsEvent: %v", event.PlaybackID) - mapic.StopSessions(event.PlaybackID) + resp, err := client.Do(req) + if err != nil { + glog.Errorf("error sending request: %v", err) return - case *events.NodeUpdateEvent: - if glog.V(5) { - glog.Infof("received serf NodeUpdateEvent. Node: %s. Length: %d bytes. Ingest Streams: %v. Non-Ingest Streams: %v", event.NodeID, len(userEvent.Payload), strings.Join(event.GetIngestStreams(), ","), strings.Join(event.GetStreams(), ",")) - } - - bal.UpdateNodes(event.NodeID, event.NodeMetrics) - for _, stream := range event.GetStreams() { - bal.UpdateStreams(event.NodeID, stream, false) - } - for _, stream := range event.GetIngestStreams() { - bal.UpdateStreams(event.NodeID, stream, true) - } - default: - glog.Errorf("unsupported serf event: %v", e) } + defer resp.Body.Close() + + glog.V(5).Infof("propagated serf user event to %s, event=%s", callbackEndpoint, userEvent.String()) }() } diff --git a/mapic/mistapiconnector_app.go b/mapic/mistapiconnector_app.go index feec9887b..ac7cc1857 100644 --- a/mapic/mistapiconnector_app.go +++ b/mapic/mistapiconnector_app.go @@ -1,6 +1,8 @@ //nolint:all package mistapiconnector +//go:generate mockgen -source=./mistapiconnector_app.go -destination=../mocks/mistapiconnector/mistapiconnector_app.go + import ( "context" "errors"