Skip to content

Commit

Permalink
Feat/exponential backoff (#21)
Browse files Browse the repository at this point in the history
* add fetch option to config

* add exponential backoff + error callbacks + mutable fragment callbacks

* add retry paragraph to readme

* bump version
  • Loading branch information
ajuvercr authored Mar 19, 2024
1 parent 4f8a8d0 commit 2822557
Show file tree
Hide file tree
Showing 13 changed files with 376 additions and 132 deletions.
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,27 @@ When a page is ready to be interpretted, the `helper` is asked to interpret the
A special value called `marker` is derived from the value of the incoming chain if the chain was important.
For example, when emitting members in order, the member manager can always extract the members that are found, but can only emit them when a marker is issued and only the members that are smaller than that marker.

**Fault tolerance**

The fetcher tries to be tault tolerant. HTTP codes that indicate that the server is overloaded or something else is going wrong are caught and retried.
This is the default behaviour when the provided config does not provide a fetch function.

Caught HTTP codes:

- 408: Request timeout
- 425: Too Early
- 429: Too Many requests
- 500: Internal Server Error
- 502: Bad Gateway
- 503: Service Unavailable
- 504: Gateway Timeout

```typescript
// Provide your own codes with a custom retry function
config.fetch = retry_fetch(fetch, [408, 425, 429, 500, 502, 503, 504], 500, 5);
```


### Member Manager

The member manager _just_ extract members and emits them when they are ready.
Expand Down
18 changes: 15 additions & 3 deletions bin/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,14 @@ program
.default("none"),
)
.option("-f, --follow", "follow the LDES, the client stays in sync")
.option("--after <after>", "follow only relations including members after a certain point in time")
.option("--before <before>", "follow only relations including members before a certain point in time")
.option(
"--after <after>",
"follow only relations including members after a certain point in time",
)
.option(
"--before <before>",
"follow only relations including members before a certain point in time",
)
.option("--poll-interval <number>", "specify poll interval")
.option("--shape-files [shapeFiles...]", "specify a shapefile")
.option(
Expand Down Expand Up @@ -88,6 +94,11 @@ program

program.parse(process.argv);

const f = global.fetch;
global.fetch = (req, options) => {
console.log("Global fetch", req);
return f(req, options);
};
async function main() {
const client = replicateLDES(
intoConfig({
Expand All @@ -103,7 +114,8 @@ async function main() {
shapeFiles,
onlyDefaultGraph,
after,
before
before,
// fetch: <typeof fetch>fetch_f,
}),
undefined,
undefined,
Expand Down
112 changes: 78 additions & 34 deletions lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@ import { CBDShapeExtractor } from "extract-cbd-shape";
import { RdfStore } from "rdf-stores";
import { DataFactory, Writer as NWriter } from "n3";
import { Quad_Object, Term } from "@rdfjs/types";
import { extractMainNodeShape, getObjects, ModulatorFactory, Notifier, streamToArray } from "./utils";
import {
extractMainNodeShape,
getObjects,
ModulatorFactory,
Notifier,
streamToArray,
} from "./utils";
import { LDES, SDS, SHACL, TREE } from "@treecg/types";
import { FetchedPage, Fetcher, longPromise, resetPromise } from "./pageFetcher";
import { Manager } from "./memberManager";
Expand All @@ -15,6 +21,7 @@ import debug from "debug";
import type { Writer } from "@ajuvercr/js-runner";

export { intoConfig } from "./config";
export { retry_fetch } from "./utils";
export type { Member, Page, Relation } from "./page";
export type { Config, MediatorConfig, ShapeConfig } from "./config";

Expand Down Expand Up @@ -65,10 +72,11 @@ async function getInfo(

const resp = await rdfDereference.dereference(shapeFile, {
localFiles: true,
fetch: config.fetch,
});
const quads = await streamToArray(resp.data);
// Add retrieved quads to local stores
quads.forEach(q => {
quads.forEach((q) => {
tempShapeStore.addQuad(q);
shapeConfigStore.addQuad(q);
});
Expand All @@ -77,7 +85,7 @@ async function getInfo(
// We have to find the actual IRI/Blank Node of the main shape within the file
config.shapes.push({
quads,
shapeId: extractMainNodeShape(tempShapeStore)
shapeId: extractMainNodeShape(tempShapeStore),
});
} else {
config.shapes.push({
Expand Down Expand Up @@ -111,6 +119,7 @@ async function getInfo(
logger("Maybe find more info at %s", ldesId.value);
const resp = await dereferencer.dereference(ldesId.value, {
localFiles: true,
fetch: config.fetch,
});
store = RdfStore.createDefault();
await new Promise((resolve, reject) => {
Expand All @@ -125,7 +134,7 @@ async function getInfo(
timestampPaths.length,
isVersionOfPaths.length,
);
} catch (ex: any) { }
} catch (ex: any) {}
}

if (timestampPaths.length > 1) {
Expand All @@ -145,11 +154,18 @@ async function getInfo(

if (config.shapes) {
for (const shape of config.shapes) {
const memberType = getObjects(shapeConfigStore, shape.shapeId, SHACL.terms.targetClass)[0];
const memberType = getObjects(
shapeConfigStore,
shape.shapeId,
SHACL.terms.targetClass,
)[0];
if (memberType) {
shapeMap.set(memberType.value, shape.shapeId);
} else {
console.error("Ignoring SHACL shape without a declared sh:targetClass: ", shape.shapeId);
console.error(
"Ignoring SHACL shape without a declared sh:targetClass: ",
shape.shapeId,
);
}
}
} else {
Expand All @@ -158,7 +174,10 @@ async function getInfo(
if (memberType) {
shapeMap.set(memberType.value, shapeId);
} else {
console.error("Ignoring SHACL shape without a declared sh:targetClass: ", shapeId);
console.error(
"Ignoring SHACL shape without a declared sh:targetClass: ",
shapeId,
);
}
}
}
Expand All @@ -168,8 +187,10 @@ async function getInfo(
config.shapes && config.shapes.length > 0 ? shapeConfigStore : store,
dereferencer,
{
// Updated upstream
cbdDefaultGraph: config.onlyDefaultGraph,
},
fetch: config.fetch,
}, //Stashed changes
),
shapeMap: config.noShape ? undefined : shapeMap,
timestampPath: timestampPaths[0],
Expand All @@ -184,7 +205,9 @@ type EventReceiver<T> = (params: T) => void;

export type ClientEvents = {
fragment: void;
mutable: void;
poll: void;
error: any;
};

export class Client {
Expand All @@ -200,7 +223,6 @@ export class Client {

private modulatorFactory;

private pollCycle: (() => void)[] = [];
private stateFactory: StateFactory;

private listeners: {
Expand Down Expand Up @@ -256,18 +278,18 @@ export class Client {
});
}

addPollCycle(cb: () => void) {
this.pollCycle.push(cb);
}

async init(
emit: (member: Member) => void,
close: () => void,
factory: ModulatorFactory,
): Promise<void> {
const logger = log.extend("init");
// Fetch the url
const root = await fetchPage(this.config.url, this.dereferencer);
const root = await fetchPage(
this.config.url,
this.dereferencer,
this.config.fetch,
);
// Try to get a shape
// TODO Choose a view
const viewQuads = root.data.getQuads(null, TREE.terms.view, null, null);
Expand Down Expand Up @@ -312,27 +334,44 @@ export class Client {
throw "Can only emit members in order, if LDES is configured with timestampPath";
}

this.fetcher = new Fetcher(this.dereferencer, this.config.loose, this.config.after, this.config.before);
this.fetcher = new Fetcher(
this.dereferencer,
this.config.loose,
this.config.fetch,
this.config.after,
this.config.before,
);

const notifier: Notifier<StrategyEvents, {}> = {
error: (ex: any) => this.emit("error", ex),
fragment: () => this.emit("fragment", undefined),
member: (m) => {
// Check if member is within date constraints (if any)
if (this.config.before) {
if (m.timestamp && m.timestamp instanceof Date && m.timestamp > this.config.before) {
if (
m.timestamp &&
m.timestamp instanceof Date &&
m.timestamp > this.config.before
) {
return;
}
}
if (this.config.after) {
if (m.timestamp && m.timestamp instanceof Date && m.timestamp < this.config.after) {
if (
m.timestamp &&
m.timestamp instanceof Date &&
m.timestamp < this.config.after
) {
return;
}
}
emit(m);
},
pollCycle: () => {
this.emit("poll", undefined);
this.pollCycle.forEach((cb) => cb());
},
mutable: () => {
this.emit("mutable", undefined);
},
close: () => {
this.stateFactory.write();
Expand All @@ -343,22 +382,22 @@ export class Client {
this.strategy =
this.ordered !== "none"
? new OrderedStrategy(
this.memberManager,
this.fetcher,
notifier,
factory,
this.ordered,
this.config.polling,
this.config.pollInterval,
)
this.memberManager,
this.fetcher,
notifier,
factory,
this.ordered,
this.config.polling,
this.config.pollInterval,
)
: new UnorderedStrategy(
this.memberManager,
this.fetcher,
notifier,
factory,
this.config.polling,
this.config.pollInterval,
);
this.memberManager,
this.fetcher,
notifier,
factory,
this.config.polling,
this.config.pollInterval,
);

logger("Found %d views, choosing %s", viewQuads.length, ldesId.value);
this.strategy.start(ldesId.value);
Expand All @@ -371,6 +410,7 @@ export class Client {
const emitted = longPromise();
const config: UnderlyingDefaultSource = {
start: async (controller: Controller) => {
this.on("error", controller.error.bind(controller));
this.modulatorFactory.pause();
await this.init(
(member) => {
Expand Down Expand Up @@ -403,8 +443,12 @@ export class Client {
async function fetchPage(
location: string,
dereferencer: RdfDereferencer,
fetch_f?: typeof fetch,
): Promise<FetchedPage> {
const resp = await dereferencer.dereference(location, { localFiles: true });
const resp = await dereferencer.dereference(location, {
localFiles: true,
fetch: fetch_f,
});
const url = resp.url;
const data = RdfStore.createDefault();
await new Promise((resolve, reject) => {
Expand Down
7 changes: 6 additions & 1 deletion lib/config.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { NamedNode, Quad } from "@rdfjs/types";
import { DefaultFetcherConfig, FetcherConfig } from "./pageFetcher";
import { retry_fetch } from "./utils";

export interface ShapeConfig {
quads: Quad[];
Expand Down Expand Up @@ -32,7 +33,7 @@ export interface Config {
shapes?: ShapeConfig[];
shapeFiles?: string[];
onlyDefaultGraph?: boolean;

fetch?: typeof fetch;
// Add flag to indicate in order (default true)
// Make sure that slower pages to first emit the first members
//
Expand Down Expand Up @@ -67,5 +68,9 @@ export async function getConfig(): Promise<Config & WithTarget> {
}

export function intoConfig(config: Partial<Config>): Config {
if (!config.fetch) {
config.fetch = retry_fetch(fetch, [408, 425, 429, 500, 502, 503, 504]);
}

return Object.assign({}, defaultConfig, defaultTarget, config);
}
Loading

0 comments on commit 2822557

Please sign in to comment.