Skip to content

Commit

Permalink
mapic: Make it more kappa style
Browse files Browse the repository at this point in the history
  • Loading branch information
victorges committed Oct 16, 2023
1 parent cf4b2fe commit 684533c
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 38 deletions.
9 changes: 9 additions & 0 deletions clients/mist_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type MistAPIClient interface {
PushAutoRemove(streamParams []interface{}) error
PushStop(id int64) error
DeleteStream(streamName string) error
NukeStream(streamName string) error
AddTrigger(streamName []string, triggerName string, sync bool) error
DeleteTrigger(streamName []string, triggerName string) error
GetStreamInfo(streamName string) (MistStreamInfo, error)
Expand Down Expand Up @@ -253,6 +254,14 @@ func (mc *MistClient) DeleteStream(streamName string) error {
return nil
}

func (mc *MistClient) NukeStream(streamName string) error {
c := commandNukeStream(streamName)
if err := validateNukeStream(mc.sendCommand(c)); err != nil {
return err
}
return nil

Check warning on line 262 in clients/mist_client.go

View check run for this annotation

Codecov / codecov/patch

clients/mist_client.go#L257-L262

Added lines #L257 - L262 were not covered by tests
}

// AddTrigger adds a trigger `triggerName` for the stream `streamName`.
// Note that Mist API supports only overriding the whole trigger configuration, therefore this function needs to:
// 1. Acquire a lock
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/hashicorp/serf v0.10.1
github.com/julienschmidt/httprouter v1.3.0
github.com/lib/pq v1.10.9
github.com/livepeer/go-api-client v0.4.9
github.com/livepeer/go-api-client v0.4.10-0.20231016161852-adc198420ea1
github.com/livepeer/go-tools v0.3.2
github.com/livepeer/joy4 v0.1.1
github.com/livepeer/livepeer-data v0.7.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -415,8 +415,8 @@ github.com/libp2p/go-msgio v0.2.0 h1:W6shmB+FeynDrUVl2dgFQvzfBZcXiyqY4VmpQLu9FqU
github.com/libp2p/go-nat v0.1.0 h1:MfVsH6DLcpa04Xr+p8hmVRG4juse0s3J8HyNWYHffXg=
github.com/libp2p/go-netroute v0.2.0 h1:0FpsbsvuSnAhXFnCY0VLFbJOzaK0VnP0r1QT/o4nWRE=
github.com/libp2p/go-openssl v0.1.0 h1:LBkKEcUv6vtZIQLVTegAil8jbNpJErQ9AnT+bWV+Ooo=
github.com/livepeer/go-api-client v0.4.9 h1:3unGUWHtRuye9lb1wc+5C4+OJxsaTnRNvEGbBDycUM0=
github.com/livepeer/go-api-client v0.4.9/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw=
github.com/livepeer/go-api-client v0.4.10-0.20231016161852-adc198420ea1 h1:37vDY3affAfGfWc25IiNij7TVbRRnqVC53w1qQ0wkII=
github.com/livepeer/go-api-client v0.4.10-0.20231016161852-adc198420ea1/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw=
github.com/livepeer/go-tools v0.3.2 h1:5pOUrOmkkGbbcWnpCt2yrSD6cD85G4GcpO4B25NpMJM=
github.com/livepeer/go-tools v0.3.2/go.mod h1:qs31y68b3PQPmSr8nR8l5WQiIWI623z6pqOccqebjos=
github.com/livepeer/joy4 v0.1.1 h1:Tz7gVcmvpG/nfUKHU+XJn6Qke/k32mTWMiH9qB0bhnM=
Expand Down
5 changes: 1 addition & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,10 +334,7 @@ func processClusterEvent(mapic mistapiconnector.IMac, e serf.UserEvent) {
}
switch eventPayload.Resource {
case "stream":
mapic.RefreshMultistreamIfNeeded(eventPayload.PlaybackID)
return
case "nuke":
mapic.NukeStream(eventPayload.PlaybackID)
mapic.RefreshStreamIfNeeded(eventPayload.PlaybackID)
return
default:
glog.Errorf("unsupported serf event: %v", e)
Expand Down
101 changes: 70 additions & 31 deletions mapic/mistapiconnector_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ type (
IMac interface {
Start(ctx context.Context) error
MetricsHandler() http.Handler
RefreshMultistreamIfNeeded(playbackID string)
NukeStream(playbackID string)
RefreshStreamIfNeeded(playbackID string)
}

pushStatus struct {
Expand Down Expand Up @@ -101,7 +100,7 @@ type (
config *config.Cli
broker misttriggers.TriggerBroker
mist clients.MistAPIClient
multistreamUpdated chan struct{}
streamUpdated chan struct{}
metricsCollector *metricsCollector
}
)
Expand Down Expand Up @@ -154,7 +153,7 @@ func (mc *mac) Start(ctx context.Context) error {
mc.metricsCollector = createMetricsCollector(mc.nodeID, mc.ownRegion, mc.mist, lapi, producer, ownExchangeName, mc)
}

mc.multistreamUpdated = make(chan struct{}, 1)
mc.streamUpdated = make(chan struct{}, 1)

Check warning on line 156 in mapic/mistapiconnector_app.go

View check run for this annotation

Codecov / codecov/patch

mapic/mistapiconnector_app.go#L156

Added line #L156 was not covered by tests
go func() {
mc.reconcileLoop(ctx)
}()
Expand All @@ -167,28 +166,21 @@ func (mc *mac) MetricsHandler() http.Handler {
return metrics.Exporter
}

func (mc *mac) RefreshMultistreamIfNeeded(playbackID string) {
if mc.streamExists(playbackID) {
mc.refreshMultistream(playbackID)
}
}

func (mc *mac) NukeStream(playbackID string) {
// ignore the base name from the input, in case someone happens to send one in the playbackId
if strings.Contains(playbackID, "+") {
playbackID = strings.Split(playbackID, "+")[1]
func (mc *mac) RefreshStreamIfNeeded(playbackID string) {
if !mc.streamExists(playbackID) {
// Ignore streams that aren't already in memory. This is to avoid a surge of
// requests to the API on any event. For streams not synced to mapic memory,
// it will be reconciled on the next loop anyway (30s) and get fixed soon.
return
}

streamNames := []string{
mc.wildcardPlaybackID(&api.Stream{PlaybackID: playbackID}), // not recorded
mc.wildcardPlaybackID(&api.Stream{PlaybackID: playbackID, Record: true}), // recorded
si, err := mc.refreshStream(playbackID)
if err != nil {
glog.Errorf("Error refreshing stream playbackID=%s", playbackID)
return

Check warning on line 179 in mapic/mistapiconnector_app.go

View check run for this annotation

Codecov / codecov/patch

mapic/mistapiconnector_app.go#L169-L179

Added lines #L169 - L179 were not covered by tests
}

for _, streamName := range streamNames {
if err := mc.mist.DeleteStream(streamName); err != nil {
glog.Errorf("Error nuking stream playbackId=%s streamName=%s err=%q", streamName, playbackID, err)
}
}
// trigger an immediate stream reconcile to already nuke it if needed
mc.reconcileSingleStream(si)

Check warning on line 183 in mapic/mistapiconnector_app.go

View check run for this annotation

Codecov / codecov/patch

mapic/mistapiconnector_app.go#L183

Added line #L183 was not covered by tests
}

func (mc *mac) handleStreamBuffer(ctx context.Context, payload *misttriggers.StreamBufferPayload) error {
Expand Down Expand Up @@ -288,7 +280,7 @@ func (mc *mac) handleLiveTrackList(ctx context.Context, payload *misttriggers.Li
videoTracksNum := payload.CountVideoTracks()
playbackID := mistStreamName2playbackID(payload.StreamName)
glog.Infof("for video %s got %d video tracks", playbackID, videoTracksNum)
mc.refreshMultistream(playbackID)
mc.refreshStream(playbackID)

Check warning on line 283 in mapic/mistapiconnector_app.go

View check run for this annotation

Codecov / codecov/patch

mapic/mistapiconnector_app.go#L283

Added line #L283 was not covered by tests
}()
return nil
}
Expand All @@ -300,18 +292,21 @@ func (mc *mac) streamExists(playbackID string) bool {
return streamExists
}

func (mc *mac) refreshMultistream(playbackID string) {
_, err := mc.refreshStreamInfo(playbackID)
func (mc *mac) refreshStream(playbackID string) (*streamInfo, error) {
si, err := mc.refreshStreamInfo(playbackID)

Check warning on line 296 in mapic/mistapiconnector_app.go

View check run for this annotation

Codecov / codecov/patch

mapic/mistapiconnector_app.go#L295-L296

Added lines #L295 - L296 were not covered by tests
if err != nil {
glog.Errorf("Error refreshing stream info for playbackID=%s", playbackID)
return
return nil, err

Check warning on line 299 in mapic/mistapiconnector_app.go

View check run for this annotation

Codecov / codecov/patch

mapic/mistapiconnector_app.go#L299

Added line #L299 was not covered by tests
}

select {
case mc.multistreamUpdated <- struct{}{}:
// trigger reconcile multistream
case mc.streamUpdated <- struct{}{}:

Check warning on line 303 in mapic/mistapiconnector_app.go

View check run for this annotation

Codecov / codecov/patch

mapic/mistapiconnector_app.go#L303

Added line #L303 was not covered by tests
// trigger reconcile loop
default:
// do not block if reconcile multistream already triggered
}

return si, nil

Check warning on line 309 in mapic/mistapiconnector_app.go

View check run for this annotation

Codecov / codecov/patch

mapic/mistapiconnector_app.go#L309

Added line #L309 was not covered by tests
}

func (mc *mac) handlePushOutStart(ctx context.Context, payload *misttriggers.PushOutStartPayload) (string, error) {
Expand Down Expand Up @@ -525,18 +520,62 @@ func (mc *mac) reconcileLoop(ctx context.Context) {
case <-ctx.Done():
return
case <-ticker.C:
case <-mc.multistreamUpdated:
case <-mc.streamUpdated:

Check warning on line 523 in mapic/mistapiconnector_app.go

View check run for this annotation

Codecov / codecov/patch

mapic/mistapiconnector_app.go#L523

Added line #L523 was not covered by tests
}
mistState, err := mc.mist.GetState()
if err != nil {
glog.Errorf("error executing query on Mist, cannot reconcile err=%v", err)
continue
}
mc.reconcileStreams(mistState)

Check warning on line 530 in mapic/mistapiconnector_app.go

View check run for this annotation

Codecov / codecov/patch

mapic/mistapiconnector_app.go#L530

Added line #L530 was not covered by tests
mc.reconcileMultistream(mistState)
mc.processStats(mistState)
}
}

func (mc *mac) reconcileStreams(mistState clients.MistState) {
for streamName, _ := range mistState.ActiveStreams {
// read map directly to avoid creating streamInfo object even for playback-only (pull) streams
mc.mu.RLock()
si := mc.streamInfo[mistStreamName2playbackID(streamName)]
mc.mu.RUnlock()

if isIngestStream(streamName, si, mistState) {
if si == nil {
var err error
si, err = mc.refreshStreamInfo(streamName)
if err != nil {
glog.Errorf("error refreshing stream info streamName=%s err=%v", streamName, err)
continue

Check warning on line 549 in mapic/mistapiconnector_app.go

View check run for this annotation

Codecov / codecov/patch

mapic/mistapiconnector_app.go#L536-L549

Added lines #L536 - L549 were not covered by tests
}
}

mc.reconcileSingleStream(si)

Check warning on line 553 in mapic/mistapiconnector_app.go

View check run for this annotation

Codecov / codecov/patch

mapic/mistapiconnector_app.go#L553

Added line #L553 was not covered by tests
}
}
}

func (mc *mac) reconcileSingleStream(si *streamInfo) {
shouldNuke := si.stream.Deleted || si.stream.Suspended
if !shouldNuke {
// the only thing we do here is nuke
return
}

Check warning on line 563 in mapic/mistapiconnector_app.go

View check run for this annotation

Codecov / codecov/patch

mapic/mistapiconnector_app.go#L558-L563

Added lines #L558 - L563 were not covered by tests

streamNames := []string{mc.wildcardPlaybackID(si.stream)}
// make sure we nuke any possible stream names on mist to account for any inconsistencies
copy := *si.stream
copy.Record = !copy.Record
streamNames = append(streamNames, mc.wildcardPlaybackID(&copy))

for _, streamName := range streamNames {
err := mc.mist.NukeStream(streamName)
if err != nil {
glog.Errorf("error nuking stream playbackId=%s streamName=%s err=%q", si.stream.PlaybackID, streamName, err)
}

Check warning on line 575 in mapic/mistapiconnector_app.go

View check run for this annotation

Codecov / codecov/patch

mapic/mistapiconnector_app.go#L565-L575

Added lines #L565 - L575 were not covered by tests
}
}

// reconcileMultistream makes sure that Mist contains the multistream pushes exactly as specified in streamInfo cache.
// There may be multiple reasons why Mist is not in sync with streamInfo cache:
// - streamInfo cache has changed (multistream target was turned on/off or multistream target was added/removed)
Expand Down Expand Up @@ -790,5 +829,5 @@ func pushToMultistreamTargetInfo(pushInfo *pushStatus) data.MultistreamTargetInf
// - MistState: active_streams have source "push://" for ingest streams; checking this condition only is not good
// enough, because a freshly started stream may not be yet visible in Mist (though it's already started).
func isIngestStream(stream string, si *streamInfo, mistState clients.MistState) bool {
return !si.isLazy || mistState.IsIngestStream(stream)
return (si != nil && !si.isLazy) || mistState.IsIngestStream(stream)
}

0 comments on commit 684533c

Please sign in to comment.