Skip to content

Commit

Permalink
Revert "Remove ContentSteering logic to just keep better Cdn prioriza…
Browse files Browse the repository at this point in the history
…tion for now"

This reverts commit 167c5ec.
  • Loading branch information
peaBerberian committed Oct 28, 2022
1 parent 56c206f commit 60a9f65
Show file tree
Hide file tree
Showing 38 changed files with 1,225 additions and 34 deletions.
90 changes: 90 additions & 0 deletions src/Content_Steering.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# Content Steering implementation

__LAST UPDATE: 2022-08-04__

## Overview

Content steering is a mechanism allowing a content provider to deterministically
prioritize a, or multiple, CDN over others - even during content playback - on
the server-side when multiple CDNs are available to load a given content.

For example, a distributor may want to rebalance load between multiple servers
while final users are watching the corresponding stream, though many other use
cases and reasons exist.

As of now, content steering only exist for HLS and DASH OTT streaming
technologies.
In both cases it takes the form of a separate file, in DASH called the "DASH
Content Steering Manifest" (or DCSM), giving the current priority.
This separate file has its own syntax, semantic and refreshing logic.


## Architecture in the RxPlayer


```
/parsers/SteeringManifest
+----------------------------------+
| Content Steering Manifest parser | Parse DCSM[1] into a
+----------------------------------+ transport-agnostic steering
^ Manifest structure
|
| Uses when parsing
|
|
| /transports
+---------------------------+
| Transport |
| |
| new functions: |
| - loadSteeringManifest | Construct DCSM[1]'s URL, performs
| - parseSteeringManifest | requests and parses it.
+---------------------------+
^
|
| Relies on
|
|
| /core/fetchers/steering_manifest
+-------------------------+
| SteeringManifestFetcher | Fetches and parses a Content Steering
+-------------------------+ Manifest in a transport-agnostic way
^ + handle retries and error formatting
|
| Uses an instance of to load, parse and refresh the
| Steering Manifest periodically according to its TTL[2]
|
|
| /core/fetchers/cdn_prioritizer.ts
+----------------+ Signals the priority between multiple
| CdnPrioritizer | potential CDNs for each resource.
+----------------+ (This is done on demand, the `CdnPrioritizer`
^ knows of no resource in advance).
|
| Asks to sort a segment's available base urls by order of
| priority (and to filter out those that should not be
| used).
| Also signals when it should prevent a base url from
| being used temporarily (e.g. due to request issues).
|
|
| /core/fetchers/segment
+----------------+
| SegmentFetcher | Fetches and parses a segment in a
+----------------+ transport-agnostic way
^ + handle retries and error formatting
|
| Ask to load segment(s)
|
| /core/stream/representation
+----------------+
| Representation | Logic behind finding the right segment to
| Stream | load, loading it and pushing it to the buffer.
+----------------+ One RepresentationStream is created per
actively-loaded Period and one per
actively-loaded buffer type.
[1] DCSM: DASH Content Steering Manifest
[2] TTL: Time To Live: a delay after which a Content Steering Manifest should be refreshed
```
226 changes: 217 additions & 9 deletions src/core/fetchers/cdn_prioritizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,39 @@
*/

import config from "../../config";
import { ICdnMetadata } from "../../parsers/manifest";
import { formatError } from "../../errors";
import log from "../../log";
import Manifest from "../../manifest";
import {
ICdnMetadata,
IContentSteeringMetadata,
} from "../../parsers/manifest";
import { ISteeringManifest } from "../../parsers/SteeringManifest";
import { IPlayerError } from "../../public_types";
import { ITransportPipelines } from "../../transports";
import arrayFindIndex from "../../utils/array_find_index";
import arrayIncludes from "../../utils/array_includes";
import EventEmitter from "../../utils/event_emitter";
import { CancellationSignal } from "../../utils/task_canceller";
import createSharedReference, {
ISharedReference,
} from "../../utils/reference";
import SyncOrAsync, {
ISyncOrAsyncValue,
} from "../../utils/sync_or_async";
import TaskCanceller, {
CancellationError,
CancellationSignal,
} from "../../utils/task_canceller";
import SteeringManifestFetcher from "./steering_manifest";

/**
* Class signaling the priority between multiple CDN available for any given
* resource.
*
* It might rely behind the hood on a fetched document giving priorities such as
* a Content Steering Manifest and also on issues that appeared with some given
* CDN in the [close] past.
*
* This class might perform requests and schedule timeouts by itself to keep its
* internal list of CDN priority up-to-date.
* When it is not needed anymore, you should call the `dispose` method to clear
Expand All @@ -33,6 +56,16 @@ import { CancellationSignal } from "../../utils/task_canceller";
* @class CdnPrioritizer
*/
export default class CdnPrioritizer extends EventEmitter<ICdnPrioritizerEvents> {
/**
* Metadata parsed from the last Content Steering Manifest loaded.
*
* `null` either if there's no such Manifest or if it is currently being
* loaded for the first time.
*/
private _lastSteeringManifest : ISteeringManifest | null;

private _defaultCdnId : string | undefined;

/**
* Structure keeping a list of CDN currently downgraded.
* Downgraded CDN immediately have a lower priority than any non-downgraded
Expand All @@ -57,12 +90,90 @@ export default class CdnPrioritizer extends EventEmitter<ICdnPrioritizerEvents>
};

/**
* TaskCanceller allowing to abort the process of loading and refreshing the
* Content Steering Manifest.
* Set to `null` when no such process is pending.
*/
private _steeringManifestUpdateCanceller : TaskCanceller | null;

private _readyState : ISharedReference<"not-ready" | "ready" | "disposed">;

/**
* @param {Object} manifest
* @param {Object} transport
* @param {Object} destroySignal
*/
constructor(destroySignal : CancellationSignal) {
constructor(
manifest : Manifest,
transport : ITransportPipelines,
destroySignal : CancellationSignal
) {
super();
this._lastSteeringManifest = null;
this._downgradedCdnList = { metadata: [], timeouts: [] };
this._steeringManifestUpdateCanceller = null;
this._defaultCdnId = manifest.contentSteering?.defaultId;

const steeringManifestFetcher = transport.steeringManifest === null ?
null :
new SteeringManifestFetcher(transport.steeringManifest,
{ maxRetryOffline: undefined,
maxRetryRegular: undefined });

let currentContentSteering = manifest.contentSteering;

manifest.addEventListener("manifestUpdate", () => {
const prevContentSteering = currentContentSteering;
currentContentSteering = manifest.contentSteering;
if (prevContentSteering === null) {
if (currentContentSteering !== null) {
if (steeringManifestFetcher === null) {
log.warn("CP: Steering manifest declared but no way to fetch it");
} else {
log.info("CP: A Steering Manifest is declared in a new Manifest");
this._autoRefreshSteeringManifest(steeringManifestFetcher,
currentContentSteering);
}
}
} else if (currentContentSteering === null) {
log.info("CP: A Steering Manifest is removed in a new Manifest");
this._steeringManifestUpdateCanceller?.cancel();
this._steeringManifestUpdateCanceller = null;
} else if (prevContentSteering.url !== currentContentSteering.url ||
prevContentSteering.proxyUrl !== currentContentSteering.proxyUrl)
{
log.info("CP: A Steering Manifest's information changed in a new Manifest");
this._steeringManifestUpdateCanceller?.cancel();
this._steeringManifestUpdateCanceller = null;
if (steeringManifestFetcher === null) {
log.warn("CP: Steering manifest changed but no way to fetch it");
} else {
this._autoRefreshSteeringManifest(steeringManifestFetcher,
currentContentSteering);
}
}
}, destroySignal);

if (manifest.contentSteering !== null) {
if (steeringManifestFetcher === null) {
log.warn("CP: Steering Manifest initially present but no way to fetch it.");
this._readyState = createSharedReference("ready");
} else {
const readyState = manifest.contentSteering.queryBeforeStart ? "not-ready" :
"ready";
this._readyState = createSharedReference(readyState);
this._autoRefreshSteeringManifest(steeringManifestFetcher,
manifest.contentSteering);
}
} else {
this._readyState = createSharedReference("ready");
}
destroySignal.register(() => {
this._readyState.setValue("disposed");
this._readyState.finish();
this._steeringManifestUpdateCanceller?.cancel();
this._steeringManifestUpdateCanceller = null;
this._lastSteeringManifest = null;
for (const timeout of this._downgradedCdnList.timeouts) {
clearTimeout(timeout);
}
Expand All @@ -84,20 +195,38 @@ export default class CdnPrioritizer extends EventEmitter<ICdnPrioritizerEvents>
* @param {Array.<string>} everyCdnForResource - Array of ALL available CDN
* able to reach the wanted resource - even those which might not be used in
* the end.
* @returns {Array.<Object>} - Array of CDN that can be tried to reach the
* @returns {Object} - Array of CDN that can be tried to reach the
* resource, sorted by order of CDN preference, according to the
* `CdnPrioritizer`'s own list of priorities.
*
* This value is wrapped in a `ISyncOrAsyncValue` as in relatively rare
* scenarios, the order can only be known once the steering Manifest has been
* fetched.
*/
public getCdnPreferenceForResource(
everyCdnForResource : ICdnMetadata[]
) : ICdnMetadata[] {
) : ISyncOrAsyncValue<ICdnMetadata[]> {
if (everyCdnForResource.length <= 1) {
// The huge majority of contents have only one CDN available.
// Here, prioritizing make no sense.
return everyCdnForResource;
return SyncOrAsync.createSync(everyCdnForResource);
}

return this._innerGetCdnPreferenceForResource(everyCdnForResource);
if (this._readyState.getValue() === "not-ready") {
const val = new Promise<ICdnMetadata[]>((res, rej) => {
this._readyState.onUpdate((readyState) => {
if (readyState === "ready") {
res(this._innerGetCdnPreferenceForResource(everyCdnForResource));
} else if (readyState === "disposed") {
rej(new CancellationError());
}
});
});
return SyncOrAsync.createAsync(val);
}
return SyncOrAsync.createSync(
this._innerGetCdnPreferenceForResource(everyCdnForResource)
);
}

/**
Expand All @@ -116,7 +245,8 @@ export default class CdnPrioritizer extends EventEmitter<ICdnPrioritizerEvents>
}

const { DEFAULT_CDN_DOWNGRADE_TIME } = config.getCurrent();
const downgradeTime = DEFAULT_CDN_DOWNGRADE_TIME;
const downgradeTime = this._lastSteeringManifest?.lifetime ??
DEFAULT_CDN_DOWNGRADE_TIME;
this._downgradedCdnList.metadata.push(metadata);
const timeout = window.setTimeout(() => {
const newIndex = indexOfMetadata(this._downgradedCdnList.metadata, metadata);
Expand Down Expand Up @@ -150,7 +280,33 @@ export default class CdnPrioritizer extends EventEmitter<ICdnPrioritizerEvents>
private _innerGetCdnPreferenceForResource(
everyCdnForResource : ICdnMetadata[]
) : ICdnMetadata[] {
const [allowedInOrder, downgradedInOrder] = everyCdnForResource
let cdnBase;
if (this._lastSteeringManifest !== null) {
const priorities = this._lastSteeringManifest.priorities;
const inSteeringManifest = everyCdnForResource.filter(available =>
available.id !== undefined && arrayIncludes(priorities, available.id));
if (inSteeringManifest.length > 0) {
cdnBase = inSteeringManifest;
}
}

// (If using the SteeringManifest gave nothing, or if it just didn't exist.) */
if (cdnBase === undefined) {
// (If a default CDN was indicated, try to use it) */
if (this._defaultCdnId !== undefined) {
const indexOf = arrayFindIndex(everyCdnForResource, (x) =>
x.id !== undefined && x.id === this._defaultCdnId);
if (indexOf >= 0) {
const elem = everyCdnForResource.splice(indexOf, 1)[0];
everyCdnForResource.unshift(elem);
}
}

if (cdnBase === undefined) {
cdnBase = everyCdnForResource;
}
}
const [allowedInOrder, downgradedInOrder] = cdnBase
.reduce((acc : [ICdnMetadata[], ICdnMetadata[]], elt : ICdnMetadata) => {
if (this._downgradedCdnList.metadata.some(c => c.id === elt.id &&
c.baseUrl === elt.baseUrl))
Expand All @@ -164,6 +320,58 @@ export default class CdnPrioritizer extends EventEmitter<ICdnPrioritizerEvents>
return allowedInOrder.concat(downgradedInOrder);
}

private _autoRefreshSteeringManifest(
steeringManifestFetcher : SteeringManifestFetcher,
contentSteering : IContentSteeringMetadata
) {
if (this._steeringManifestUpdateCanceller === null) {
const steeringManifestUpdateCanceller = new TaskCanceller();
this._steeringManifestUpdateCanceller = steeringManifestUpdateCanceller;
}
const canceller : TaskCanceller = this._steeringManifestUpdateCanceller;
steeringManifestFetcher.fetch(contentSteering.url,
(err : IPlayerError) => this.trigger("warnings", [err]),
canceller.signal)
.then((parse) => {
const parsed = parse((errs) => this.trigger("warnings", errs));
const prevSteeringManifest = this._lastSteeringManifest;
this._lastSteeringManifest = parsed;
if (parsed.lifetime > 0) {
const timeout = window.setTimeout(() => {
canceller.signal.deregister(onTimeoutEnd);
this._autoRefreshSteeringManifest(steeringManifestFetcher, contentSteering);
}, parsed.lifetime * 1000);
const onTimeoutEnd = () => {
clearTimeout(timeout);
};
canceller.signal.register(onTimeoutEnd);
}
if (this._readyState.getValue() === "not-ready") {
this._readyState.setValue("ready");
}
if (canceller.isUsed) {
return;
}
if (prevSteeringManifest === null ||
prevSteeringManifest.priorities.length !== parsed.priorities.length ||
prevSteeringManifest.priorities
.some((val, idx) => val !== parsed.priorities[idx]))
{
this.trigger("priorityChange", null);
}
})
.catch((err) => {
if (err instanceof CancellationError) {
return;
}
const formattedError = formatError(err, {
defaultCode: "NONE",
defaultReason: "Unknown error when fetching and parsing the steering Manifest",
});
this.trigger("warnings", [formattedError]);
});
}

/**
* @param {number} index
*/
Expand Down
Loading

0 comments on commit 60a9f65

Please sign in to comment.