Skip to content

Commit

Permalink
Change Serf communication from local to http calls (#1330)
Browse files Browse the repository at this point in the history
  • Loading branch information
leszko authored Jul 22, 2024
1 parent 775ce78 commit fcff141
Show file tree
Hide file tree
Showing 10 changed files with 228 additions and 86 deletions.
9 changes: 4 additions & 5 deletions api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/golang/glog"
"github.com/julienschmidt/httprouter"
"github.com/livepeer/catalyst-api/balancer"
"github.com/livepeer/catalyst-api/cluster"
"github.com/livepeer/catalyst-api/config"
"github.com/livepeer/catalyst-api/handlers"
"github.com/livepeer/catalyst-api/handlers/analytics"
Expand All @@ -21,8 +20,8 @@ import (
"github.com/livepeer/go-api-client"
)

func ListenAndServe(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, c cluster.Cluster, mapic mistapiconnector.IMac) error {
router := NewCatalystAPIRouter(cli, vodEngine, bal, c, mapic)
func ListenAndServe(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, mapic mistapiconnector.IMac) error {
router := NewCatalystAPIRouter(cli, vodEngine, bal, mapic)
server := http.Server{Addr: cli.HTTPAddress, Handler: router}
ctx, cancel := context.WithCancel(ctx)

Expand All @@ -48,7 +47,7 @@ func ListenAndServe(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coo
return server.Shutdown(ctx)
}

func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, c cluster.Cluster, mapic mistapiconnector.IMac) *httprouter.Router {
func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, mapic mistapiconnector.IMac) *httprouter.Router {
router := httprouter.New()
withLogging := middleware.LogRequest()
withCORS := middleware.AllowCORS()
Expand All @@ -59,7 +58,7 @@ func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal b
AccessToken: cli.APIToken,
})
catalystApiHandlers := &handlers.CatalystAPIHandlersCollection{VODEngine: vodEngine}
geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, c, cli, lapi)
geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, cli, lapi)

router.GET("/ok", withLogging(catalystApiHandlers.Ok()))
router.GET("/healthcheck", withLogging(catalystApiHandlers.Healthcheck()))
Expand Down
14 changes: 8 additions & 6 deletions api/http_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato
Server: cli.APIServer,
AccessToken: cli.APIToken,
})
geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, c, cli, lapi)
geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, cli, lapi)

spkiPublicKey, _ := crypto.ConvertToSpki(cli.VodDecryptPublicKey)

catalystApiHandlers := &handlers.CatalystAPIHandlersCollection{VODEngine: vodEngine}
eventsHandler := &handlers.EventsHandlersCollection{Cluster: c}
eventsHandler := handlers.NewEventsHandlersCollection(c, mapic, bal)
ffmpegSegmentingHandlers := &ffmpeg.HandlersCollection{VODEngine: vodEngine}
accessControlHandlers := accesscontrol.NewAccessControlHandlersCollection(cli, mapic)
analyticsHandlers := analytics.NewAnalyticsHandler(metricsDB)
Expand Down Expand Up @@ -109,9 +109,14 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato
),
)

// Public handler to propagate an event to all Catalyst nodes
// Handler to get members Catalyst API => Catalyst
router.GET("/api/serf/members", withLogging(adminHandlers.MembersHandler()))
// Public handler to propagate an event to all Catalyst nodes, execute from Studio API => Catalyst
router.POST("/api/events", withLogging(eventsHandler.Events()))

// Handler to forward the user event from Catalyst => Catalyst API
router.POST("/api/serf/receiveUserEvent", withLogging(eventsHandler.ReceiveUserEvent()))

// Public GET handler to retrieve the public key for vod encryption
router.GET("/api/pubkey", withLogging(encryptionHandlers.PublicKeyHandler()))

Expand All @@ -130,9 +135,6 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato
// Endpoint to receive segments and manifests that ffmpeg produces
router.POST("/api/ffmpeg/:id/:filename", withLogging(ffmpegSegmentingHandlers.NewFile()))

// Temporary endpoint for admin queries
router.GET("/admin/members", withLogging(adminHandlers.MembersHandler()))

return router
}

Expand Down
27 changes: 19 additions & 8 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,25 @@ func (c *ClusterImpl) retryJoin(ctx context.Context) {
}

func (c *ClusterImpl) MembersFiltered(filter map[string]string, status, name string) ([]Member, error) {
all := c.serf.Members()
nodes := []Member{}
return FilterMembers(toClusterMembers(c.serf.Members()), filter, status, name)
}

func toClusterMembers(members []serf.Member) []Member {
var nodes []Member
for _, member := range members {
nodes = append(nodes, Member{
Name: member.Name,
Tags: member.Tags,
Status: member.Status.String(),
})
}
return nodes
}

func FilterMembers(all []Member, filter map[string]string, status string, name string) ([]Member, error) {
var nodes []Member
for _, member := range all {
if status != "" && status != member.Status.String() {
if status != "" && status != member.Status {
continue
}
if name != "" && name != member.Name {
Expand All @@ -200,11 +215,7 @@ func (c *ClusterImpl) MembersFiltered(filter map[string]string, status, name str
}
}
if matches {
nodes = append(nodes, Member{
Name: member.Name,
Tags: member.Tags,
Status: member.Status.String(),
})
nodes = append(nodes, member)
}
}
return nodes, nil
Expand Down
1 change: 1 addition & 0 deletions config/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type Cli struct {
KafkaUser string
KafkaPassword string
AnalyticsKafkaTopic string
SerfMembersEndpoint string

// mapping playbackId to value between 0.0 to 100.0
CdnRedirectPlaybackPct map[string]float64
Expand Down
78 changes: 72 additions & 6 deletions handlers/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,43 @@ package handlers
import (
"encoding/json"
"fmt"
"github.com/golang/glog"
"github.com/hashicorp/serf/serf"
"github.com/julienschmidt/httprouter"
"github.com/livepeer/catalyst-api/balancer"
"github.com/livepeer/catalyst-api/cluster"
"github.com/livepeer/catalyst-api/errors"
"github.com/livepeer/catalyst-api/events"
mistapiconnector "github.com/livepeer/catalyst-api/mapic"
"github.com/xeipuuv/gojsonschema"
"io"
"net/http"
"strings"
)

type EventsHandlersCollection struct {
Cluster cluster.Cluster
cluster cluster.Cluster

mapic mistapiconnector.IMac
bal balancer.Balancer
}

func (d *EventsHandlersCollection) Events() httprouter.Handle {
type Event struct {
Resource string `json:"resource"`
PlaybackID string `json:"playback_id"`
type Event struct {
Resource string `json:"resource"`
PlaybackID string `json:"playback_id"`
}

func NewEventsHandlersCollection(cluster cluster.Cluster, mapic mistapiconnector.IMac, bal balancer.Balancer) *EventsHandlersCollection {
return &EventsHandlersCollection{
cluster: cluster,
mapic: mapic,
bal: bal,
}
}

// Events is a handler called by Studio API to send an event, e.g., to refresh a stream or nuke a stream.
// This event is then propagated to all Serf nodes and then forwarded to catalyst-api and handled by ReceiveUserEvent().
func (d *EventsHandlersCollection) Events() httprouter.Handle {
schema := inputSchemasCompiled["Event"]
return func(w http.ResponseWriter, req *http.Request, _ httprouter.Params) {
payload, err := io.ReadAll(req.Body)
Expand All @@ -44,7 +62,7 @@ func (d *EventsHandlersCollection) Events() httprouter.Handle {
return
}

err = d.Cluster.BroadcastEvent(serf.UserEvent{
err = d.cluster.BroadcastEvent(serf.UserEvent{
Name: fmt.Sprintf("%s-%s", event.Resource, event.PlaybackID),
Payload: payload,
Coalesce: true,
Expand All @@ -56,3 +74,51 @@ func (d *EventsHandlersCollection) Events() httprouter.Handle {
}
}
}

// ReceiveUserEvent is a handler to receive Serf events from Catalyst.
// The idea is that:
// 1. Studio API sends an event to Catalyst (received by Events() handler)
// 2. Events() handler propagates the event to all Serf nodes
// 3. Each Serf node sends tne event to its corresponding catalyst-api instance (to the ReceiveUserEvent() handler)
// 4. ReceiveUserEvent() handler processes the event
func (c *EventsHandlersCollection) ReceiveUserEvent() httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
userEventPayload, err := io.ReadAll(r.Body)
if err != nil {
glog.Errorf("cannot read payload: %s", err)
return
}
e, err := events.Unmarshal(userEventPayload)
if err != nil {
glog.Errorf("cannot unmarshal received serf event %v: %s", userEventPayload, err)
return
}
switch event := e.(type) {
case *events.StreamEvent:
glog.V(5).Infof("received serf StreamEvent: %v", event.PlaybackID)
c.mapic.RefreshStreamIfNeeded(event.PlaybackID)
case *events.NukeEvent:
glog.V(5).Infof("received serf NukeEvent: %v", event.PlaybackID)
c.mapic.NukeStream(event.PlaybackID)
return
case *events.StopSessionsEvent:
glog.V(5).Infof("received serf StopSessionsEvent: %v", event.PlaybackID)
c.mapic.StopSessions(event.PlaybackID)
return
case *events.NodeUpdateEvent:
if glog.V(5) {
glog.Infof("received serf NodeUpdateEvent. Node: %s. Length: %d bytes. Ingest Streams: %v. Non-Ingest Streams: %v", event.NodeID, len(userEventPayload), strings.Join(event.GetIngestStreams(), ","), strings.Join(event.GetStreams(), ","))
}

c.bal.UpdateNodes(event.NodeID, event.NodeMetrics)
for _, stream := range event.GetStreams() {
c.bal.UpdateStreams(event.NodeID, stream, false)
}
for _, stream := range event.GetIngestStreams() {
c.bal.UpdateStreams(event.NodeID, stream, true)
}
default:
glog.Errorf("unsupported serf event: %v", e)
}
}
}
65 changes: 64 additions & 1 deletion handlers/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/hashicorp/serf/serf"
"github.com/julienschmidt/httprouter"
mockcluster "github.com/livepeer/catalyst-api/mocks/cluster"
mock_mistapiconnector "github.com/livepeer/catalyst-api/mocks/mistapiconnector"
"github.com/stretchr/testify/require"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -62,7 +63,7 @@ func TestEventHandler(t *testing.T) {
return nil
}).AnyTimes()

catalystApiHandlers := EventsHandlersCollection{Cluster: mc}
catalystApiHandlers := NewEventsHandlersCollection(mc, nil, nil)
router := httprouter.New()
router.POST("/events", catalystApiHandlers.Events())

Expand All @@ -74,3 +75,65 @@ func TestEventHandler(t *testing.T) {
require.Equal(rr.Result().StatusCode, tt.wantHttpCode)
}
}

func TestReceiveUserEventHandler(t *testing.T) {
require := require.New(t)
playbackId := "123456789"

tests := []struct {
name string
requestBody string
functionCalled string
}{
{
name: "Refresh Stream",
requestBody: `{
"resource": "stream",
"playback_id": "123456789"
}`,
functionCalled: "RefreshStreamIfNeeded",
},
{
name: "Nuke Stream",
requestBody: `{
"resource": "nuke",
"playback_id": "123456789"
}`,
functionCalled: "NukeStream",
},
{
name: "Stop Sessions",
requestBody: `{
"resource": "stopSessions",
"playback_id": "123456789"
}`,
functionCalled: "StopSessions",
},
}

ctrl := gomock.NewController(t)
mac := mock_mistapiconnector.NewMockIMac(ctrl)

catalystApiHandlers := NewEventsHandlersCollection(nil, mac, nil)
router := httprouter.New()
router.POST("/receiveUserEvent", catalystApiHandlers.ReceiveUserEvent())

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
switch tt.functionCalled {
case "RefreshStreamIfNeeded":
mac.EXPECT().RefreshStreamIfNeeded(playbackId).Times(1)
case "NukeStream":
mac.EXPECT().NukeStream(playbackId).Times(1)
case "StopSessions":
mac.EXPECT().StopSessions(playbackId).Times(1)
}

req, _ := http.NewRequest("POST", "/receiveUserEvent", strings.NewReader(tt.requestBody))
rr := httptest.NewRecorder()
router.ServeHTTP(rr, req)

require.Equal(rr.Result().StatusCode, 200)
})
}
}
25 changes: 20 additions & 5 deletions handlers/geolocation/geolocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,14 @@ func (l *streamPullRateLimit) mark(playbackID string) {

type GeolocationHandlersCollection struct {
Balancer balancer.Balancer
Cluster cluster.Cluster
Config config.Cli
Lapi *api.Client
streamPullRateLimit *streamPullRateLimit
}

func NewGeolocationHandlersCollection(balancer balancer.Balancer, cluster cluster.Cluster, config config.Cli, lapi *api.Client) *GeolocationHandlersCollection {
func NewGeolocationHandlersCollection(balancer balancer.Balancer, config config.Cli, lapi *api.Client) *GeolocationHandlersCollection {
return &GeolocationHandlersCollection{
Balancer: balancer,
Cluster: cluster,
Config: config,
Lapi: lapi,
streamPullRateLimit: newStreamPullRateLimit(streamSourceRetryInterval),
Expand Down Expand Up @@ -244,7 +242,7 @@ func (c *GeolocationHandlersCollection) resolveNodeURL(streamURL string) (string
}

func (c *GeolocationHandlersCollection) clusterMember(filter map[string]string, status, name string) (cluster.Member, error) {
members, err := c.Cluster.MembersFiltered(filter, "", name)
members, err := c.membersFiltered(filter, "", name)
if err != nil {
return cluster.Member{}, err
}
Expand Down Expand Up @@ -404,7 +402,7 @@ func (c *GeolocationHandlersCollection) getStreamPull(playbackID string, retryCo
}

func (c *GeolocationHandlersCollection) sendPlaybackRequestAsync(playbackID string, region string) {
members, err := c.Cluster.MembersFiltered(map[string]string{"region": region}, "", "")
members, err := c.membersFiltered(map[string]string{"region": region}, "", "")
if err != nil || len(members) == 0 {
glog.Errorf("Error fetching member list: %v", err)
return
Expand All @@ -422,6 +420,23 @@ func (c *GeolocationHandlersCollection) sendPlaybackRequestAsync(playbackID stri
}()
}

func (c *GeolocationHandlersCollection) membersFiltered(filter map[string]string, status, name string) ([]cluster.Member, error) {
resp, err := http.Get(c.Config.SerfMembersEndpoint)
if err != nil {
return []cluster.Member{}, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return []cluster.Member{}, fmt.Errorf("failed to get members: %s", resp.Status)
}
var members []cluster.Member
if err := json.NewDecoder(resp.Body).Decode(&members); err != nil {
return []cluster.Member{}, err
}

return cluster.FilterMembers(members, filter, status, name)
}

func parsePlus(plusString string) (string, string) {
slice := strings.Split(plusString, "+")
prefix := ""
Expand Down
Loading

0 comments on commit fcff141

Please sign in to comment.