Skip to content

Commit

Permalink
add exponential backoff + error callbacks + mutable fragment callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
ajuvercr committed Mar 19, 2024
1 parent 410c687 commit 908252f
Show file tree
Hide file tree
Showing 10 changed files with 247 additions and 100 deletions.
14 changes: 8 additions & 6 deletions lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import debug from "debug";
import type { Writer } from "@ajuvercr/js-runner";

export { intoConfig } from "./config";
export { retry_fetch } from "./utils";
export type { Member, Page, Relation } from "./page";
export type { Config, MediatorConfig, ShapeConfig } from "./config";

Expand Down Expand Up @@ -204,7 +205,9 @@ type EventReceiver<T> = (params: T) => void;

export type ClientEvents = {
fragment: void;
mutable: void;
poll: void;
error: any;
};

export class Client {
Expand All @@ -220,7 +223,6 @@ export class Client {

private modulatorFactory;

private pollCycle: (() => void)[] = [];
private stateFactory: StateFactory;

private listeners: {
Expand Down Expand Up @@ -276,10 +278,6 @@ export class Client {
});
}

addPollCycle(cb: () => void) {
this.pollCycle.push(cb);
}

async init(
emit: (member: Member) => void,
close: () => void,
Expand Down Expand Up @@ -345,6 +343,7 @@ export class Client {
);

const notifier: Notifier<StrategyEvents, {}> = {
error: (ex: any) => this.emit("error", ex),
fragment: () => this.emit("fragment", undefined),
member: (m) => {
// Check if member is within date constraints (if any)
Expand All @@ -370,7 +369,9 @@ export class Client {
},
pollCycle: () => {
this.emit("poll", undefined);
this.pollCycle.forEach((cb) => cb());
},
mutable: () => {
this.emit("mutable", undefined);
},
close: () => {
this.stateFactory.write();
Expand Down Expand Up @@ -409,6 +410,7 @@ export class Client {
const emitted = longPromise();
const config: UnderlyingDefaultSource = {
start: async (controller: Controller) => {
this.on("error", controller.error.bind(controller));
this.modulatorFactory.pause();
await this.init(
(member) => {
Expand Down
5 changes: 5 additions & 0 deletions lib/config.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { NamedNode, Quad } from "@rdfjs/types";
import { DefaultFetcherConfig, FetcherConfig } from "./pageFetcher";
import { retry_fetch } from "./utils";

export interface ShapeConfig {
quads: Quad[];
Expand Down Expand Up @@ -67,5 +68,9 @@ export async function getConfig(): Promise<Config & WithTarget> {
}

export function intoConfig(config: Partial<Config>): Config {
if (!config.fetch) {
config.fetch = retry_fetch(fetch, [408, 425, 429, 500, 502, 503, 504]);
}

return Object.assign({}, defaultConfig, defaultTarget, config);
}
112 changes: 58 additions & 54 deletions lib/pageFetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ export interface Helper {
export type FetchEvent = {
relationFound: { from: Node; target: Relation };
pageFetched: FetchedPage;
// seen: {};
scheduleFetch: Node;
error: any;
};

export type Cache = {
Expand Down Expand Up @@ -94,65 +94,69 @@ export class Fetcher {
async fetch<S>(node: Node, state: S, notifier: Notifier<FetchEvent, S>) {
const logger = log.extend("fetch");

const resp = await this.dereferencer.dereference(node.target, {
localFiles: true,
fetch: this.fetch_f,
});

node.target = resp.url;

const cache = {} as Cache;
if (resp.headers) {
const cacheControlCandidate = resp.headers.get("cache-control");
if (cacheControlCandidate) {
const controls = cacheControlCandidate
.split(",")
.map((x) => x.split("=", 2).map((x) => x.trim()));

for (let control of controls) {
if (control[0] == "max-age") {
cache.maxAge = parseInt(control[1]);
}

if (control[0] == "immutable") {
cache.immutable = true;
try {
const resp = await this.dereferencer.dereference(node.target, {
localFiles: true,
fetch: this.fetch_f,
});

node.target = resp.url;

const cache = {} as Cache;
if (resp.headers) {
const cacheControlCandidate = resp.headers.get("cache-control");
if (cacheControlCandidate) {
const controls = cacheControlCandidate
.split(",")
.map((x) => x.split("=", 2).map((x) => x.trim()));

for (let control of controls) {
if (control[0] == "max-age") {
cache.maxAge = parseInt(control[1]);
}

if (control[0] == "immutable") {
cache.immutable = true;
}
}
}
}
}

if (!cache.immutable) {
notifier.scheduleFetch(node, state);
}
if (!cache.immutable) {
notifier.scheduleFetch(node, state);
}

logger("Cache for %s %o", node.target, cache);

const data = RdfStore.createDefault();
let quadCount = 0;
await new Promise((resolve, reject) => {
resp.data
.on("data", (quad) => {
data.addQuad(quad);
quadCount++;
})
.on("end", resolve)
.on("error", reject);
});
logger("Got data %s (%d quads)", node.target, quadCount);

for (let rel of extractRelations(
data,
namedNode(resp.url),
this.loose,
this.after,
this.before,
)) {
if (!node.expected.some((x) => x == rel.node)) {
notifier.relationFound({ from: node, target: rel }, state);
logger("Cache for %s %o", node.target, cache);

const data = RdfStore.createDefault();
let quadCount = 0;
await new Promise((resolve, reject) => {
resp.data
.on("data", (quad) => {
data.addQuad(quad);
quadCount++;
})
.on("end", resolve)
.on("error", reject);
});
logger("Got data %s (%d quads)", node.target, quadCount);

for (let rel of extractRelations(
data,
namedNode(resp.url),
this.loose,
this.after,
this.before,
)) {
if (!node.expected.some((x) => x == rel.node)) {
notifier.relationFound({ from: node, target: rel }, state);
}
}
}

// TODO check this, is node.target correct?
notifier.pageFetched({ data, url: resp.url }, state);
// TODO check this, is node.target correct?
notifier.pageFetched({ data, url: resp.url }, state);
} catch (ex) {
notifier.error(ex, state);
}
}
}
2 changes: 2 additions & 0 deletions lib/strategy/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ export type PageAndRelation = {
export type StrategyEvents = {
member: Member;
fragment: {};
mutable: {};
pollCycle: {};
close: {};
error: any;
};
4 changes: 4 additions & 0 deletions lib/strategy/ordered.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,13 @@ export class OrderedStrategy {
// start member extraction
// - relationFound: a relation has been found, put the extended chain in the queue
this.fetchNotifier = {
error: (error: any) => {
this.notifier.error(error, {});
},
scheduleFetch: ({ target, expected }, { chain }) => {
chain.target = target;
this.toPoll.push({ chain, expected });
this.notifier.mutable({}, {});
},
pageFetched: (page, { chain, index }) => {
logger("Page fetched %s", page.url);
Expand Down
8 changes: 6 additions & 2 deletions lib/strategy/unordered.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,12 @@ export class UnorderedStrategy {
// start member extraction
// - relationFound: a relation has been found, inFlight += 1 and put it in the queue
this.fetchNotifier = {
error: (error: any) => {
this.notifier.error(error, {});
},
scheduleFetch: (node: Node) => {
this.cacheList.push(node);
this.notifier.mutable({}, {});
},
pageFetched: (page, { index }) => this.handleFetched(page, index),
relationFound: ({ from, target }) => {
Expand All @@ -54,8 +58,8 @@ export class UnorderedStrategy {
},
};

// Callbacks for the member extractor
// - done: all members have been extracted, we are finally done with a page inFlight -= 1
// Callbacks for the member extractor
// - done: all members have been extracted, we are finally done with a page inFlight -= 1
// - extracted: a member has been found, yeet it
this.memberNotifier = {
done: () => {
Expand Down
71 changes: 54 additions & 17 deletions lib/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,28 +73,35 @@ export function streamToArray<T extends BaseQuad>(
* If more than one is found an exception is thrown.
*/
export function extractMainNodeShape(store: RdfStore): NamedNode {
const nodeShapes = getSubjects(store, RDF.terms.type, SHACL.terms.NodeShape, null);
const nodeShapes = getSubjects(
store,
RDF.terms.type,
SHACL.terms.NodeShape,
null,
);
let mainNodeShape = null;

if (nodeShapes && nodeShapes.length > 0) {
for (const ns of nodeShapes) {
const isNotReferenced = getSubjects(store, null, ns, null).length === 0;

if (isNotReferenced) {
if (!mainNodeShape) {
mainNodeShape = ns;
} else {
throw new Error("There are multiple main node shapes in a given shape graph. Unrelated shapes must be given as separate shape graphs");
}
for (const ns of nodeShapes) {
const isNotReferenced = getSubjects(store, null, ns, null).length === 0;

if (isNotReferenced) {
if (!mainNodeShape) {
mainNodeShape = ns;
} else {
throw new Error(
"There are multiple main node shapes in a given shape graph. Unrelated shapes must be given as separate shape graphs",
);
}
}
if (mainNodeShape) {
return <NamedNode>mainNodeShape;
} else {
throw new Error("No main SHACL Node Shapes found in given shape graph");
}
}
}
if (mainNodeShape) {
return <NamedNode>mainNodeShape;
} else {
throw new Error("No main SHACL Node Shapes found in given shape graph");
}
} else {
throw new Error("No SHACL Node Shapes found in given shape graph");
throw new Error("No SHACL Node Shapes found in given shape graph");
}
}

Expand Down Expand Up @@ -281,3 +288,33 @@ class ModulatorInstance<T> implements Modulator<T> {
}
}
}

export function retry_fetch(
fetch_f: typeof fetch,
httpCodes: number[],
base = 500,
maxRetries = 5,
): typeof fetch {
const retry: typeof fetch = async (input, init) => {
let tryCount = 0;
let retryTime = base;
while (tryCount < maxRetries) {
const resp = await fetch_f(input, init);
if (!resp.ok) {
if (httpCodes.some((x) => x == resp.status)) {
// Wait 500ms, 1 second, 2 seconds, 4 seconds, 8 seconds, fail
tryCount += 1;
await new Promise((res) => setTimeout(res, retryTime));
retryTime *= 2;
continue;
}
return resp;
}
return resp;
}

throw "Max retries";
};

return retry;
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "ldes-client",
"description": "This package provides common tooling to work with LDESes.",
"version": "0.0.9-alpha.0",
"version": "0.0.9-alpha.2",
"main": "dist/lib/client.js",
"bin": {
"ldes-client": "dist/bin/cli.js"
Expand Down
Loading

0 comments on commit 908252f

Please sign in to comment.