diff --git a/clients/mist_client.go b/clients/mist_client.go index 809cc3590..0f9cdb9b2 100644 --- a/clients/mist_client.go +++ b/clients/mist_client.go @@ -23,6 +23,7 @@ type MistAPIClient interface { PushAutoRemove(streamParams []interface{}) error PushStop(id int64) error DeleteStream(streamName string) error + NukeStream(streamName string) error AddTrigger(streamName []string, triggerName string, sync bool) error DeleteTrigger(streamName []string, triggerName string) error GetStreamInfo(streamName string) (MistStreamInfo, error) @@ -253,6 +254,14 @@ func (mc *MistClient) DeleteStream(streamName string) error { return nil } +func (mc *MistClient) NukeStream(streamName string) error { + c := commandNukeStream(streamName) + if err := validateNukeStream(mc.sendCommand(c)); err != nil { + return err + } + return nil +} + // AddTrigger adds a trigger `triggerName` for the stream `streamName`. // Note that Mist API supports only overriding the whole trigger configuration, therefore this function needs to: // 1. Acquire a lock diff --git a/go.mod b/go.mod index a6adb2583..feeafdbc7 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/hashicorp/serf v0.10.1 github.com/julienschmidt/httprouter v1.3.0 github.com/lib/pq v1.10.9 - github.com/livepeer/go-api-client v0.4.9 + github.com/livepeer/go-api-client v0.4.10-0.20231016161852-adc198420ea1 github.com/livepeer/go-tools v0.3.2 github.com/livepeer/joy4 v0.1.1 github.com/livepeer/livepeer-data v0.7.4 diff --git a/go.sum b/go.sum index 4931482dc..5bb9e1fed 100644 --- a/go.sum +++ b/go.sum @@ -415,8 +415,8 @@ github.com/libp2p/go-msgio v0.2.0 h1:W6shmB+FeynDrUVl2dgFQvzfBZcXiyqY4VmpQLu9FqU github.com/libp2p/go-nat v0.1.0 h1:MfVsH6DLcpa04Xr+p8hmVRG4juse0s3J8HyNWYHffXg= github.com/libp2p/go-netroute v0.2.0 h1:0FpsbsvuSnAhXFnCY0VLFbJOzaK0VnP0r1QT/o4nWRE= github.com/libp2p/go-openssl v0.1.0 h1:LBkKEcUv6vtZIQLVTegAil8jbNpJErQ9AnT+bWV+Ooo= -github.com/livepeer/go-api-client v0.4.9 h1:3unGUWHtRuye9lb1wc+5C4+OJxsaTnRNvEGbBDycUM0= -github.com/livepeer/go-api-client v0.4.9/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw= +github.com/livepeer/go-api-client v0.4.10-0.20231016161852-adc198420ea1 h1:37vDY3affAfGfWc25IiNij7TVbRRnqVC53w1qQ0wkII= +github.com/livepeer/go-api-client v0.4.10-0.20231016161852-adc198420ea1/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw= github.com/livepeer/go-tools v0.3.2 h1:5pOUrOmkkGbbcWnpCt2yrSD6cD85G4GcpO4B25NpMJM= github.com/livepeer/go-tools v0.3.2/go.mod h1:qs31y68b3PQPmSr8nR8l5WQiIWI623z6pqOccqebjos= github.com/livepeer/joy4 v0.1.1 h1:Tz7gVcmvpG/nfUKHU+XJn6Qke/k32mTWMiH9qB0bhnM= diff --git a/main.go b/main.go index dbf7ae091..1b8c98f9a 100644 --- a/main.go +++ b/main.go @@ -334,10 +334,7 @@ func processClusterEvent(mapic mistapiconnector.IMac, e serf.UserEvent) { } switch eventPayload.Resource { case "stream": - mapic.RefreshMultistreamIfNeeded(eventPayload.PlaybackID) - return - case "nuke": - mapic.NukeStream(eventPayload.PlaybackID) + mapic.RefreshStreamIfNeeded(eventPayload.PlaybackID) return default: glog.Errorf("unsupported serf event: %v", e) diff --git a/mapic/mistapiconnector_app.go b/mapic/mistapiconnector_app.go index 88f68f14f..99898a36b 100644 --- a/mapic/mistapiconnector_app.go +++ b/mapic/mistapiconnector_app.go @@ -42,8 +42,7 @@ type ( IMac interface { Start(ctx context.Context) error MetricsHandler() http.Handler - RefreshMultistreamIfNeeded(playbackID string) - NukeStream(playbackID string) + RefreshStreamIfNeeded(playbackID string) } pushStatus struct { @@ -101,7 +100,7 @@ type ( config *config.Cli broker misttriggers.TriggerBroker mist clients.MistAPIClient - multistreamUpdated chan struct{} + streamUpdated chan struct{} metricsCollector *metricsCollector } ) @@ -154,7 +153,7 @@ func (mc *mac) Start(ctx context.Context) error { mc.metricsCollector = createMetricsCollector(mc.nodeID, mc.ownRegion, mc.mist, lapi, producer, ownExchangeName, mc) } - mc.multistreamUpdated = make(chan struct{}, 1) + mc.streamUpdated = make(chan struct{}, 1) go func() { mc.reconcileLoop(ctx) }() @@ -167,28 +166,21 @@ func (mc *mac) MetricsHandler() http.Handler { return metrics.Exporter } -func (mc *mac) RefreshMultistreamIfNeeded(playbackID string) { - if mc.streamExists(playbackID) { - mc.refreshMultistream(playbackID) - } -} - -func (mc *mac) NukeStream(playbackID string) { - // ignore the base name from the input, in case someone happens to send one in the playbackId - if strings.Contains(playbackID, "+") { - playbackID = strings.Split(playbackID, "+")[1] +func (mc *mac) RefreshStreamIfNeeded(playbackID string) { + if !mc.streamExists(playbackID) { + // Ignore streams that aren't already in memory. This is to avoid a surge of + // requests to the API on any event. For streams not synced to mapic memory, + // it will be reconciled on the next loop anyway (30s) and get fixed soon. + return } - - streamNames := []string{ - mc.wildcardPlaybackID(&api.Stream{PlaybackID: playbackID}), // not recorded - mc.wildcardPlaybackID(&api.Stream{PlaybackID: playbackID, Record: true}), // recorded + si, err := mc.refreshStream(playbackID) + if err != nil { + glog.Errorf("Error refreshing stream playbackID=%s", playbackID) + return } - for _, streamName := range streamNames { - if err := mc.mist.DeleteStream(streamName); err != nil { - glog.Errorf("Error nuking stream playbackId=%s streamName=%s err=%q", streamName, playbackID, err) - } - } + // trigger an immediate stream reconcile to already nuke it if needed + mc.reconcileSingleStream(si) } func (mc *mac) handleStreamBuffer(ctx context.Context, payload *misttriggers.StreamBufferPayload) error { @@ -288,7 +280,7 @@ func (mc *mac) handleLiveTrackList(ctx context.Context, payload *misttriggers.Li videoTracksNum := payload.CountVideoTracks() playbackID := mistStreamName2playbackID(payload.StreamName) glog.Infof("for video %s got %d video tracks", playbackID, videoTracksNum) - mc.refreshMultistream(playbackID) + mc.refreshStream(playbackID) }() return nil } @@ -300,18 +292,21 @@ func (mc *mac) streamExists(playbackID string) bool { return streamExists } -func (mc *mac) refreshMultistream(playbackID string) { - _, err := mc.refreshStreamInfo(playbackID) +func (mc *mac) refreshStream(playbackID string) (*streamInfo, error) { + si, err := mc.refreshStreamInfo(playbackID) if err != nil { glog.Errorf("Error refreshing stream info for playbackID=%s", playbackID) - return + return nil, err } + select { - case mc.multistreamUpdated <- struct{}{}: - // trigger reconcile multistream + case mc.streamUpdated <- struct{}{}: + // trigger reconcile loop default: // do not block if reconcile multistream already triggered } + + return si, nil } func (mc *mac) handlePushOutStart(ctx context.Context, payload *misttriggers.PushOutStartPayload) (string, error) { @@ -525,18 +520,62 @@ func (mc *mac) reconcileLoop(ctx context.Context) { case <-ctx.Done(): return case <-ticker.C: - case <-mc.multistreamUpdated: + case <-mc.streamUpdated: } mistState, err := mc.mist.GetState() if err != nil { glog.Errorf("error executing query on Mist, cannot reconcile err=%v", err) continue } + mc.reconcileStreams(mistState) mc.reconcileMultistream(mistState) mc.processStats(mistState) } } +func (mc *mac) reconcileStreams(mistState clients.MistState) { + for streamName, _ := range mistState.ActiveStreams { + // read map directly to avoid creating streamInfo object even for playback-only (pull) streams + mc.mu.RLock() + si := mc.streamInfo[mistStreamName2playbackID(streamName)] + mc.mu.RUnlock() + + if isIngestStream(streamName, si, mistState) { + if si == nil { + var err error + si, err = mc.refreshStreamInfo(streamName) + if err != nil { + glog.Errorf("error refreshing stream info streamName=%s err=%v", streamName, err) + continue + } + } + + mc.reconcileSingleStream(si) + } + } +} + +func (mc *mac) reconcileSingleStream(si *streamInfo) { + shouldNuke := si.stream.Deleted || si.stream.Suspended + if !shouldNuke { + // the only thing we do here is nuke + return + } + + streamNames := []string{mc.wildcardPlaybackID(si.stream)} + // make sure we nuke any possible stream names on mist to account for any inconsistencies + copy := *si.stream + copy.Record = !copy.Record + streamNames = append(streamNames, mc.wildcardPlaybackID(©)) + + for _, streamName := range streamNames { + err := mc.mist.NukeStream(streamName) + if err != nil { + glog.Errorf("error nuking stream playbackId=%s streamName=%s err=%q", si.stream.PlaybackID, streamName, err) + } + } +} + // reconcileMultistream makes sure that Mist contains the multistream pushes exactly as specified in streamInfo cache. // There may be multiple reasons why Mist is not in sync with streamInfo cache: // - streamInfo cache has changed (multistream target was turned on/off or multistream target was added/removed) @@ -790,5 +829,5 @@ func pushToMultistreamTargetInfo(pushInfo *pushStatus) data.MultistreamTargetInf // - MistState: active_streams have source "push://" for ingest streams; checking this condition only is not good // enough, because a freshly started stream may not be yet visible in Mist (though it's already started). func isIngestStream(stream string, si *streamInfo, mistState clients.MistState) bool { - return !si.isLazy || mistState.IsIngestStream(stream) + return (si != nil && !si.isLazy) || mistState.IsIngestStream(stream) }