Skip to content

Commit

Permalink
Fix issue with load stalling after abrupt position changing.
Browse files Browse the repository at this point in the history
  • Loading branch information
i-zolotarenko committed Jan 7, 2024
1 parent b2f7215 commit 5f77dbc
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 61 deletions.
8 changes: 2 additions & 6 deletions p2p-media-loader-demo/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,8 @@ function App() {
type: "customHls",
customType: {
customHls: (video: HTMLVideoElement) => {
const hls = new window.Hls({
// ...engine.getConfig(),
maxBufferLength: 20,
maxBufferSize: 0.05 * 1000000,
});
// engine.setHls(hls);
const hls = new window.Hls(engine.getConfig());
engine.setHls(hls);
hls.loadSource(video.src);
hls.attachMedia(video);
hlsInstance.current = hls;
Expand Down
78 changes: 40 additions & 38 deletions packages/p2p-media-loader-core/src/hybrid-loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const FAILED_ATTEMPTS_CLEAR_INTERVAL = 60000;

export class HybridLoader {
private readonly requests: RequestsContainer;
private readonly engineRequests = new Map<Segment, EngineRequest>();
private engineRequest?: EngineRequest;
private readonly p2pLoaders: P2PLoadersContainer;
private readonly playback: Playback;
private readonly segmentAvgDuration: number;
Expand Down Expand Up @@ -106,7 +106,7 @@ export class HybridLoader {
);
}
} else {
this.engineRequests.set(segment, engineRequest);
this.engineRequest = engineRequest;
}
this.requestProcessQueueMicrotask();
}
Expand Down Expand Up @@ -142,7 +142,10 @@ export class HybridLoader {
const now = performance.now();
for (const request of this.requests.items()) {
const { type, status, segment, isHandledByProcessQueue } = request;
const engineRequest = this.engineRequests.get(segment);
const engineRequest =
this.engineRequest?.segment === segment
? this.engineRequest
: undefined;

switch (status) {
case "loading":
Expand All @@ -157,11 +160,13 @@ export class HybridLoader {
if (type === "http") {
this.p2pLoaders.currentLoader.broadcastAnnouncement();
}
engineRequest?.resolve(
request.data,
this.getBandwidth(loadedQueuePercent)
);
this.engineRequests.delete(segment);
if (engineRequest) {
engineRequest.resolve(
request.data,
this.getBandwidth(loadedQueuePercent)
);
this.engineRequest = undefined;
}
this.requests.remove(request);
void this.segmentStorage.storeSegment(request.segment, request.data);
this.eventHandlers?.onSegmentLoaded?.(request.data.byteLength, type);
Expand All @@ -179,7 +184,7 @@ export class HybridLoader {
engineRequest
) {
engineRequest.reject();
this.engineRequests.delete(segment);
this.engineRequest = undefined;
}
break;

Expand Down Expand Up @@ -207,29 +212,30 @@ export class HybridLoader {
const { queue, queueSegmentIds, loadedPercent } = this.generateQueue();
this.processRequests(queueSegmentIds, loadedPercent);

console.log(queue.map((i) => i.segment.externalId));

const {
simultaneousHttpDownloads,
simultaneousP2PDownloads,
httpErrorRetries,
} = this.settings;

// for (const engineRequest of this.engineRequests.values()) {
// if (this.requests.executingHttpCount >= simultaneousHttpDownloads) break;
// const request = this.requests.get(engineRequest.segment);
// if (
// !queueSegmentIds.has(engineRequest.segment.localId) &&
// engineRequest.status === "pending" &&
// (!request ||
// request.status === "not-started" ||
// (request.status === "failed" &&
// request.failedAttempts.httpAttemptsCount <
// this.settings.httpErrorRetries))
// ) {
// void this.loadThroughHttp(engineRequest.segment);
// }
// }
if (
this.engineRequest?.shouldBeStartedImmediately &&
this.engineRequest.status === "pending" &&
this.requests.executingHttpCount < simultaneousHttpDownloads
) {
const { engineRequest } = this;
const { segment } = engineRequest;
const request = this.requests.get(segment);
if (
!request ||
request.status === "not-started" ||
(request.status === "failed" &&
request.failedAttempts.httpAttemptsCount <
this.settings.httpErrorRetries)
) {
void this.loadThroughHttp(segment);
}
}

for (const item of queue) {
const { statuses, segment } = item;
Expand Down Expand Up @@ -298,17 +304,13 @@ export class HybridLoader {

// api method for engines
abortSegmentRequest(segmentLocalId: string) {
for (const engineRequest of this.engineRequests.values()) {
if (segmentLocalId === engineRequest.segment.localId) {
engineRequest.abort();
this.engineRequests.delete(engineRequest.segment);
this.logger(
"abort: ",
LoggerUtils.getSegmentString(engineRequest.segment)
);
break;
}
}
if (this.engineRequest?.segment.localId !== segmentLocalId) return;
this.engineRequest.abort();
this.logger(
"abort: ",
LoggerUtils.getSegmentString(this.engineRequest.segment)
);
this.engineRequest = undefined;
}

private async loadThroughHttp(segment: Segment) {
Expand Down Expand Up @@ -461,7 +463,6 @@ export class HybridLoader {

notifyLevelChanged() {
this.levelChangedTimestamp = performance.now();
console.log("LEVEL CHANGED");
}

updatePlayback(position: number, rate: number) {
Expand All @@ -478,6 +479,7 @@ export class HybridLoader {
if (isRateChanged && rate !== 0) this.playback.rate = rate;
if (isPositionSignificantlyChanged) {
this.logger("position significantly changed");
this.engineRequest?.markAsShouldBeStartedImmediately();
}
void this.requestProcessQueueMicrotask(isPositionSignificantlyChanged);
}
Expand Down
9 changes: 9 additions & 0 deletions packages/p2p-media-loader-core/src/requests/engine-request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export type EngineCallbacks = {

export class EngineRequest {
private _status: "pending" | "succeed" | "failed" | "aborted" = "pending";
private _shouldBeStartedImmediately = false;

constructor(
readonly segment: Segment,
Expand All @@ -17,6 +18,10 @@ export class EngineRequest {
return this._status;
}

get shouldBeStartedImmediately() {
return this._shouldBeStartedImmediately;
}

resolve(data: ArrayBuffer, bandwidth: number) {
this.throwErrorIfNotPending();
this._status = "succeed";
Expand All @@ -35,6 +40,10 @@ export class EngineRequest {
this.engineCallbacks.onError(new CoreRequestError("aborted"));
}

markAsShouldBeStartedImmediately() {
return (this._shouldBeStartedImmediately = true);
}

private throwErrorIfNotPending() {
if (this._status !== "pending") {
throw new Error("Engine request has been already settled.");
Expand Down
19 changes: 2 additions & 17 deletions packages/p2p-media-loader-core/src/utils/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,7 @@ export function* generateQueue(
if (!first) return;

const firstStatuses = getSegmentPlaybackStatuses(first, playback, settings);
// console.log("firstStatuses", firstStatuses, lastRequestedSegment.externalId);
if (isNotActualStatuses(firstStatuses)) {
let isFirstYield = false;
const prev = stream.segments.getPrevTo(requestedSegmentId)?.[1];
if (prev) {
const prevStatuses = getSegmentPlaybackStatuses(prev, playback, settings);
// console.log(prevStatuses);
if (isNotActualStatuses(prevStatuses)) {
// console.log(prevStatuses);
firstStatuses.isHighDemand = true;
yield { segment: first, statuses: firstStatuses };
isFirstYield = true;
}
}
// for cases when engine requests segment that is a little bit
// earlier than current playhead position
// it could happen when playhead position is significantly changed by user
Expand All @@ -45,10 +32,8 @@ export function* generateQueue(
);

if (isNotActualStatuses(secondStatuses)) return;
if (!isFirstYield) {
firstStatuses.isHighDemand = true;
yield { segment: first, statuses: firstStatuses };
}
firstStatuses.isHighDemand = true;
yield { segment: first, statuses: firstStatuses };
yield { segment: second, statuses: secondStatuses };
} else {
yield { segment: first, statuses: firstStatuses };
Expand Down
1 change: 1 addition & 0 deletions packages/p2p-media-loader-hlsjs/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ export class Engine {

private handleManifestLoaded = (event: string, data: ManifestLoadedData) => {
const { networkDetails } = data;

console.log(data.levels.map((i) => i.bitrate));
if (networkDetails instanceof XMLHttpRequest) {
this.core.setManifestResponseUrl(networkDetails.responseURL);
Expand Down

0 comments on commit 5f77dbc

Please sign in to comment.