diff --git a/README.md b/README.md index 94930d9..eb58f00 100644 --- a/README.md +++ b/README.md @@ -69,6 +69,27 @@ When a page is ready to be interpretted, the `helper` is asked to interpret the A special value called `marker` is derived from the value of the incoming chain if the chain was important. For example, when emitting members in order, the member manager can always extract the members that are found, but can only emit them when a marker is issued and only the members that are smaller than that marker. +**Fault tolerance** + +The fetcher tries to be tault tolerant. HTTP codes that indicate that the server is overloaded or something else is going wrong are caught and retried. +This is the default behaviour when the provided config does not provide a fetch function. + +Caught HTTP codes: + +- 408: Request timeout +- 425: Too Early +- 429: Too Many requests +- 500: Internal Server Error +- 502: Bad Gateway +- 503: Service Unavailable +- 504: Gateway Timeout + +```typescript +// Provide your own codes with a custom retry function +config.fetch = retry_fetch(fetch, [408, 425, 429, 500, 502, 503, 504], 500, 5); +``` + + ### Member Manager The member manager _just_ extract members and emits them when they are ready. diff --git a/bin/cli.ts b/bin/cli.ts index d460979..0b1e1b4 100644 --- a/bin/cli.ts +++ b/bin/cli.ts @@ -29,8 +29,14 @@ program .default("none"), ) .option("-f, --follow", "follow the LDES, the client stays in sync") - .option("--after ", "follow only relations including members after a certain point in time") - .option("--before ", "follow only relations including members before a certain point in time") + .option( + "--after ", + "follow only relations including members after a certain point in time", + ) + .option( + "--before ", + "follow only relations including members before a certain point in time", + ) .option("--poll-interval ", "specify poll interval") .option("--shape-files [shapeFiles...]", "specify a shapefile") .option( @@ -88,6 +94,11 @@ program program.parse(process.argv); +const f = global.fetch; +global.fetch = (req, options) => { + console.log("Global fetch", req); + return f(req, options); +}; async function main() { const client = replicateLDES( intoConfig({ @@ -103,7 +114,8 @@ async function main() { shapeFiles, onlyDefaultGraph, after, - before + before, + // fetch: fetch_f, }), undefined, undefined, diff --git a/lib/client.ts b/lib/client.ts index 0d677bb..cff80f0 100644 --- a/lib/client.ts +++ b/lib/client.ts @@ -6,7 +6,13 @@ import { CBDShapeExtractor } from "extract-cbd-shape"; import { RdfStore } from "rdf-stores"; import { DataFactory, Writer as NWriter } from "n3"; import { Quad_Object, Term } from "@rdfjs/types"; -import { extractMainNodeShape, getObjects, ModulatorFactory, Notifier, streamToArray } from "./utils"; +import { + extractMainNodeShape, + getObjects, + ModulatorFactory, + Notifier, + streamToArray, +} from "./utils"; import { LDES, SDS, SHACL, TREE } from "@treecg/types"; import { FetchedPage, Fetcher, longPromise, resetPromise } from "./pageFetcher"; import { Manager } from "./memberManager"; @@ -15,6 +21,7 @@ import debug from "debug"; import type { Writer } from "@ajuvercr/js-runner"; export { intoConfig } from "./config"; +export { retry_fetch } from "./utils"; export type { Member, Page, Relation } from "./page"; export type { Config, MediatorConfig, ShapeConfig } from "./config"; @@ -65,10 +72,11 @@ async function getInfo( const resp = await rdfDereference.dereference(shapeFile, { localFiles: true, + fetch: config.fetch, }); const quads = await streamToArray(resp.data); // Add retrieved quads to local stores - quads.forEach(q => { + quads.forEach((q) => { tempShapeStore.addQuad(q); shapeConfigStore.addQuad(q); }); @@ -77,7 +85,7 @@ async function getInfo( // We have to find the actual IRI/Blank Node of the main shape within the file config.shapes.push({ quads, - shapeId: extractMainNodeShape(tempShapeStore) + shapeId: extractMainNodeShape(tempShapeStore), }); } else { config.shapes.push({ @@ -111,6 +119,7 @@ async function getInfo( logger("Maybe find more info at %s", ldesId.value); const resp = await dereferencer.dereference(ldesId.value, { localFiles: true, + fetch: config.fetch, }); store = RdfStore.createDefault(); await new Promise((resolve, reject) => { @@ -125,7 +134,7 @@ async function getInfo( timestampPaths.length, isVersionOfPaths.length, ); - } catch (ex: any) { } + } catch (ex: any) {} } if (timestampPaths.length > 1) { @@ -145,11 +154,18 @@ async function getInfo( if (config.shapes) { for (const shape of config.shapes) { - const memberType = getObjects(shapeConfigStore, shape.shapeId, SHACL.terms.targetClass)[0]; + const memberType = getObjects( + shapeConfigStore, + shape.shapeId, + SHACL.terms.targetClass, + )[0]; if (memberType) { shapeMap.set(memberType.value, shape.shapeId); } else { - console.error("Ignoring SHACL shape without a declared sh:targetClass: ", shape.shapeId); + console.error( + "Ignoring SHACL shape without a declared sh:targetClass: ", + shape.shapeId, + ); } } } else { @@ -158,7 +174,10 @@ async function getInfo( if (memberType) { shapeMap.set(memberType.value, shapeId); } else { - console.error("Ignoring SHACL shape without a declared sh:targetClass: ", shapeId); + console.error( + "Ignoring SHACL shape without a declared sh:targetClass: ", + shapeId, + ); } } } @@ -168,8 +187,10 @@ async function getInfo( config.shapes && config.shapes.length > 0 ? shapeConfigStore : store, dereferencer, { + // Updated upstream cbdDefaultGraph: config.onlyDefaultGraph, - }, + fetch: config.fetch, + }, //Stashed changes ), shapeMap: config.noShape ? undefined : shapeMap, timestampPath: timestampPaths[0], @@ -184,7 +205,9 @@ type EventReceiver = (params: T) => void; export type ClientEvents = { fragment: void; + mutable: void; poll: void; + error: any; }; export class Client { @@ -200,7 +223,6 @@ export class Client { private modulatorFactory; - private pollCycle: (() => void)[] = []; private stateFactory: StateFactory; private listeners: { @@ -256,10 +278,6 @@ export class Client { }); } - addPollCycle(cb: () => void) { - this.pollCycle.push(cb); - } - async init( emit: (member: Member) => void, close: () => void, @@ -267,7 +285,11 @@ export class Client { ): Promise { const logger = log.extend("init"); // Fetch the url - const root = await fetchPage(this.config.url, this.dereferencer); + const root = await fetchPage( + this.config.url, + this.dereferencer, + this.config.fetch, + ); // Try to get a shape // TODO Choose a view const viewQuads = root.data.getQuads(null, TREE.terms.view, null, null); @@ -312,19 +334,34 @@ export class Client { throw "Can only emit members in order, if LDES is configured with timestampPath"; } - this.fetcher = new Fetcher(this.dereferencer, this.config.loose, this.config.after, this.config.before); + this.fetcher = new Fetcher( + this.dereferencer, + this.config.loose, + this.config.fetch, + this.config.after, + this.config.before, + ); const notifier: Notifier = { + error: (ex: any) => this.emit("error", ex), fragment: () => this.emit("fragment", undefined), member: (m) => { // Check if member is within date constraints (if any) if (this.config.before) { - if (m.timestamp && m.timestamp instanceof Date && m.timestamp > this.config.before) { + if ( + m.timestamp && + m.timestamp instanceof Date && + m.timestamp > this.config.before + ) { return; } } if (this.config.after) { - if (m.timestamp && m.timestamp instanceof Date && m.timestamp < this.config.after) { + if ( + m.timestamp && + m.timestamp instanceof Date && + m.timestamp < this.config.after + ) { return; } } @@ -332,7 +369,9 @@ export class Client { }, pollCycle: () => { this.emit("poll", undefined); - this.pollCycle.forEach((cb) => cb()); + }, + mutable: () => { + this.emit("mutable", undefined); }, close: () => { this.stateFactory.write(); @@ -343,22 +382,22 @@ export class Client { this.strategy = this.ordered !== "none" ? new OrderedStrategy( - this.memberManager, - this.fetcher, - notifier, - factory, - this.ordered, - this.config.polling, - this.config.pollInterval, - ) + this.memberManager, + this.fetcher, + notifier, + factory, + this.ordered, + this.config.polling, + this.config.pollInterval, + ) : new UnorderedStrategy( - this.memberManager, - this.fetcher, - notifier, - factory, - this.config.polling, - this.config.pollInterval, - ); + this.memberManager, + this.fetcher, + notifier, + factory, + this.config.polling, + this.config.pollInterval, + ); logger("Found %d views, choosing %s", viewQuads.length, ldesId.value); this.strategy.start(ldesId.value); @@ -371,6 +410,7 @@ export class Client { const emitted = longPromise(); const config: UnderlyingDefaultSource = { start: async (controller: Controller) => { + this.on("error", controller.error.bind(controller)); this.modulatorFactory.pause(); await this.init( (member) => { @@ -403,8 +443,12 @@ export class Client { async function fetchPage( location: string, dereferencer: RdfDereferencer, + fetch_f?: typeof fetch, ): Promise { - const resp = await dereferencer.dereference(location, { localFiles: true }); + const resp = await dereferencer.dereference(location, { + localFiles: true, + fetch: fetch_f, + }); const url = resp.url; const data = RdfStore.createDefault(); await new Promise((resolve, reject) => { diff --git a/lib/config.ts b/lib/config.ts index a375081..c6726f1 100644 --- a/lib/config.ts +++ b/lib/config.ts @@ -1,5 +1,6 @@ import { NamedNode, Quad } from "@rdfjs/types"; import { DefaultFetcherConfig, FetcherConfig } from "./pageFetcher"; +import { retry_fetch } from "./utils"; export interface ShapeConfig { quads: Quad[]; @@ -32,7 +33,7 @@ export interface Config { shapes?: ShapeConfig[]; shapeFiles?: string[]; onlyDefaultGraph?: boolean; - + fetch?: typeof fetch; // Add flag to indicate in order (default true) // Make sure that slower pages to first emit the first members // @@ -67,5 +68,9 @@ export async function getConfig(): Promise { } export function intoConfig(config: Partial): Config { + if (!config.fetch) { + config.fetch = retry_fetch(fetch, [408, 425, 429, 500, 502, 503, 504]); + } + return Object.assign({}, defaultConfig, defaultTarget, config); } diff --git a/lib/pageFetcher.ts b/lib/pageFetcher.ts index 2a45c1f..a274f89 100644 --- a/lib/pageFetcher.ts +++ b/lib/pageFetcher.ts @@ -61,8 +61,8 @@ export interface Helper { export type FetchEvent = { relationFound: { from: Node; target: Relation }; pageFetched: FetchedPage; - // seen: {}; scheduleFetch: Node; + error: any; }; export type Cache = { @@ -73,12 +73,20 @@ export type Cache = { export class Fetcher { private dereferencer: RdfDereferencer; private loose: boolean; + private fetch_f?: typeof fetch; private after?: Date; private before?: Date; - constructor(dereferencer: RdfDereferencer, loose: boolean, after?: Date, before?: Date) { + constructor( + dereferencer: RdfDereferencer, + loose: boolean, + fetch_f?: typeof fetch, + after?: Date, + before?: Date, + ) { this.dereferencer = dereferencer; this.loose = loose; + this.fetch_f = fetch_f; if (after) this.after = after; if (before) this.before = before; } @@ -86,58 +94,69 @@ export class Fetcher { async fetch(node: Node, state: S, notifier: Notifier) { const logger = log.extend("fetch"); - const resp = await this.dereferencer.dereference(node.target, { - localFiles: true, - }); - - node.target = resp.url; - - const cache = {} as Cache; - if (resp.headers) { - const cacheControlCandidate = resp.headers.get("cache-control"); - if (cacheControlCandidate) { - const controls = cacheControlCandidate - .split(",") - .map((x) => x.split("=", 2).map((x) => x.trim())); - - for (let control of controls) { - if (control[0] == "max-age") { - cache.maxAge = parseInt(control[1]); - } - - if (control[0] == "immutable") { - cache.immutable = true; + try { + const resp = await this.dereferencer.dereference(node.target, { + localFiles: true, + fetch: this.fetch_f, + }); + + node.target = resp.url; + + const cache = {} as Cache; + if (resp.headers) { + const cacheControlCandidate = resp.headers.get("cache-control"); + if (cacheControlCandidate) { + const controls = cacheControlCandidate + .split(",") + .map((x) => x.split("=", 2).map((x) => x.trim())); + + for (let control of controls) { + if (control[0] == "max-age") { + cache.maxAge = parseInt(control[1]); + } + + if (control[0] == "immutable") { + cache.immutable = true; + } } } } - } - if (!cache.immutable) { - notifier.scheduleFetch(node, state); - } + if (!cache.immutable) { + notifier.scheduleFetch(node, state); + } - logger("Cache for %s %o", node.target, cache); - - const data = RdfStore.createDefault(); - let quadCount = 0; - await new Promise((resolve, reject) => { - resp.data - .on("data", (quad) => { - data.addQuad(quad); - quadCount++; - }) - .on("end", resolve) - .on("error", reject); - }); - logger("Got data %s (%d quads)", node.target, quadCount); - - for (let rel of extractRelations(data, namedNode(resp.url), this.loose, this.after, this.before)) { - if (!node.expected.some((x) => x == rel.node)) { - notifier.relationFound({ from: node, target: rel }, state); + logger("Cache for %s %o", node.target, cache); + + const data = RdfStore.createDefault(); + let quadCount = 0; + await new Promise((resolve, reject) => { + resp.data + .on("data", (quad) => { + data.addQuad(quad); + quadCount++; + }) + .on("end", resolve) + .on("error", reject); + }); + logger("Got data %s (%d quads)", node.target, quadCount); + + for (let rel of extractRelations( + data, + namedNode(resp.url), + this.loose, + this.after, + this.before, + )) { + if (!node.expected.some((x) => x == rel.node)) { + notifier.relationFound({ from: node, target: rel }, state); + } } - } - // TODO check this, is node.target correct? - notifier.pageFetched({ data, url: resp.url }, state); + // TODO check this, is node.target correct? + notifier.pageFetched({ data, url: resp.url }, state); + } catch (ex) { + notifier.error(ex, state); + } } } diff --git a/lib/strategy/index.ts b/lib/strategy/index.ts index f473405..99e6eba 100644 --- a/lib/strategy/index.ts +++ b/lib/strategy/index.ts @@ -30,6 +30,8 @@ export type PageAndRelation = { export type StrategyEvents = { member: Member; fragment: {}; + mutable: {}; pollCycle: {}; close: {}; + error: any; }; diff --git a/lib/strategy/ordered.ts b/lib/strategy/ordered.ts index 18e6692..6e15ec4 100644 --- a/lib/strategy/ordered.ts +++ b/lib/strategy/ordered.ts @@ -81,9 +81,13 @@ export class OrderedStrategy { // start member extraction // - relationFound: a relation has been found, put the extended chain in the queue this.fetchNotifier = { + error: (error: any) => { + this.notifier.error(error, {}); + }, scheduleFetch: ({ target, expected }, { chain }) => { chain.target = target; this.toPoll.push({ chain, expected }); + this.notifier.mutable({}, {}); }, pageFetched: (page, { chain, index }) => { logger("Page fetched %s", page.url); diff --git a/lib/strategy/unordered.ts b/lib/strategy/unordered.ts index b75bcfd..98f4fec 100644 --- a/lib/strategy/unordered.ts +++ b/lib/strategy/unordered.ts @@ -43,8 +43,12 @@ export class UnorderedStrategy { // start member extraction // - relationFound: a relation has been found, inFlight += 1 and put it in the queue this.fetchNotifier = { + error: (error: any) => { + this.notifier.error(error, {}); + }, scheduleFetch: (node: Node) => { this.cacheList.push(node); + this.notifier.mutable({}, {}); }, pageFetched: (page, { index }) => this.handleFetched(page, index), relationFound: ({ from, target }) => { @@ -54,8 +58,8 @@ export class UnorderedStrategy { }, }; - // Callbacks for the member extractor - // - done: all members have been extracted, we are finally done with a page inFlight -= 1 + // Callbacks for the member extractor + // - done: all members have been extracted, we are finally done with a page inFlight -= 1 // - extracted: a member has been found, yeet it this.memberNotifier = { done: () => { diff --git a/lib/utils.ts b/lib/utils.ts index edb87e9..4478941 100644 --- a/lib/utils.ts +++ b/lib/utils.ts @@ -73,28 +73,35 @@ export function streamToArray( * If more than one is found an exception is thrown. */ export function extractMainNodeShape(store: RdfStore): NamedNode { - const nodeShapes = getSubjects(store, RDF.terms.type, SHACL.terms.NodeShape, null); + const nodeShapes = getSubjects( + store, + RDF.terms.type, + SHACL.terms.NodeShape, + null, + ); let mainNodeShape = null; if (nodeShapes && nodeShapes.length > 0) { - for (const ns of nodeShapes) { - const isNotReferenced = getSubjects(store, null, ns, null).length === 0; - - if (isNotReferenced) { - if (!mainNodeShape) { - mainNodeShape = ns; - } else { - throw new Error("There are multiple main node shapes in a given shape graph. Unrelated shapes must be given as separate shape graphs"); - } + for (const ns of nodeShapes) { + const isNotReferenced = getSubjects(store, null, ns, null).length === 0; + + if (isNotReferenced) { + if (!mainNodeShape) { + mainNodeShape = ns; + } else { + throw new Error( + "There are multiple main node shapes in a given shape graph. Unrelated shapes must be given as separate shape graphs", + ); } - } - if (mainNodeShape) { - return mainNodeShape; - } else { - throw new Error("No main SHACL Node Shapes found in given shape graph"); - } + } + } + if (mainNodeShape) { + return mainNodeShape; + } else { + throw new Error("No main SHACL Node Shapes found in given shape graph"); + } } else { - throw new Error("No SHACL Node Shapes found in given shape graph"); + throw new Error("No SHACL Node Shapes found in given shape graph"); } } @@ -281,3 +288,33 @@ class ModulatorInstance implements Modulator { } } } + +export function retry_fetch( + fetch_f: typeof fetch, + httpCodes: number[], + base = 500, + maxRetries = 5, +): typeof fetch { + const retry: typeof fetch = async (input, init) => { + let tryCount = 0; + let retryTime = base; + while (tryCount < maxRetries) { + const resp = await fetch_f(input, init); + if (!resp.ok) { + if (httpCodes.some((x) => x == resp.status)) { + // Wait 500ms, 1 second, 2 seconds, 4 seconds, 8 seconds, fail + tryCount += 1; + await new Promise((res) => setTimeout(res, retryTime)); + retryTime *= 2; + continue; + } + return resp; + } + return resp; + } + + throw "Max retries"; + }; + + return retry; +} diff --git a/package-lock.json b/package-lock.json index 22f9e75..bcae349 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "ldes-client", - "version": "0.0.8", + "version": "0.0.9-alpha.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "ldes-client", - "version": "0.0.8", + "version": "0.0.9-alpha.0", "license": "MIT", "dependencies": { "@ajuvercr/js-runner": "^0.1.17", @@ -14,12 +14,13 @@ "@types/debug": "^4.1.10", "commander": "^11.1.0", "debug": "^4.3.4", - "extract-cbd-shape": "^0.1.0", + "extract-cbd-shape": "^0.1.1", "heap-js": "^2.3.0", "n3": "^1.17.2", "rdf-data-factory": "^1.1.2", "rdf-dereference": "^2.2.0", - "rdf-stores": "^1.0.0" + "rdf-stores": "^1.0.0", + "throttled-queue": "^2.1.4" }, "bin": { "ldes-client": "dist/bin/cli.js" @@ -4615,6 +4616,11 @@ "node": "*" } }, + "node_modules/throttled-queue": { + "version": "2.1.4", + "resolved": "https://registry.npmjs.org/throttled-queue/-/throttled-queue-2.1.4.tgz", + "integrity": "sha512-YGdk8sdmr4ge3g+doFj/7RLF5kLM+Mi7DEciu9PHxnMJZMeVuZeTj31g4VE7ekUffx/IdbvrtOCiz62afg0mkg==" + }, "node_modules/tmpl": { "version": "1.0.5", "dev": true, diff --git a/package.json b/package.json index b465222..3c97375 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "ldes-client", "description": "This package provides common tooling to work with LDESes.", - "version": "0.0.8", + "version": "0.0.9-alpha.3", "main": "dist/lib/client.js", "bin": { "ldes-client": "dist/bin/cli.js" @@ -27,12 +27,13 @@ "@types/debug": "^4.1.10", "commander": "^11.1.0", "debug": "^4.3.4", - "extract-cbd-shape": "^0.1.0", + "extract-cbd-shape": "^0.1.1", "heap-js": "^2.3.0", "n3": "^1.17.2", "rdf-data-factory": "^1.1.2", "rdf-dereference": "^2.2.0", - "rdf-stores": "^1.0.0" + "rdf-stores": "^1.0.0", + "throttled-queue": "^2.1.4" }, "devDependencies": { "@jest/globals": "^29.7.0", diff --git a/tests/helper.ts b/tests/helper.ts index 57b1877..8704f3e 100644 --- a/tests/helper.ts +++ b/tests/helper.ts @@ -32,24 +32,31 @@ function relationToQuads(rel: Relation): Quad[] { } export async function read(stream: ReadableStream): Promise { - return new Promise(async (res) => { - const out: Member[] = []; - const reader = stream.getReader(); - - let el = await reader.read(); - while (el) { - if (el.done || !el.value) break; - out.push(el.value); - el = await reader.read(); - } + return new Promise(async (res, rej) => { + try { + const out: Member[] = []; + const reader = stream.getReader(); + + let el = await reader.read(); + while (el) { + if (el.done || !el.value) break; + out.push(el.value); + el = await reader.read(); + } - res(out); + res(out); + } catch (ex) { + console.log("expect", ex); + rej(ex); + } }); } export class Fragment { private members: { member: T; id: string }[] = []; private relations: Relation[] = []; + + private failCount = 0; delay?: number; constructor(delay?: number) { @@ -60,6 +67,11 @@ export class Fragment { ldesId: string, memberToQuads: (id: string, member: T) => Quad[], ): Quad[] { + if (this.failCount > 0) { + this.failCount -= 1; + throw "I'm failing, oh no"; + } + const out: Quad[] = []; for (let rel of this.relations) { out.push(...relationToQuads(rel)); @@ -73,6 +85,11 @@ export class Fragment { return out; } + setFailcount(count: number): typeof this { + this.failCount = count; + return this; + } + addMember(id: string, member: T): typeof this { this.members.push({ member, id }); return this; @@ -149,15 +166,20 @@ export class Tree { if (fragment.delay) { await new Promise((res) => setTimeout(res, fragment.delay)); } - quads.push(...fragment.toQuads(BASE + this.root(), this.memberToQuads)); + try { + quads.push(...fragment.toQuads(BASE + this.root(), this.memberToQuads)); - const respText = new Writer().quadsToString(quads); + const respText = new Writer().quadsToString(quads); - const resp = new Response(respText, { - headers: { "content-type": "text/turtle" }, - }); + const resp = new Response(respText, { + headers: { "content-type": "text/turtle" }, + }); - return resp; + return resp; + } catch (ex) { + const resp = new Response("I'm too loaded yo", { status: 429 }); + return resp; + } }); } } diff --git a/tests/unordered.test.ts b/tests/unordered.test.ts index 96d68b1..765b362 100644 --- a/tests/unordered.test.ts +++ b/tests/unordered.test.ts @@ -1,7 +1,7 @@ import { afterEach, beforeEach, describe, expect, test } from "@jest/globals"; import { read, Tree } from "./helper"; -import { replicateLDES } from "../lib/client"; +import { replicateLDES, retry_fetch } from "../lib/client"; import { intoConfig } from "../lib/config"; import { Parser } from "n3"; import { TREE } from "@treecg/types"; @@ -338,7 +338,7 @@ describe("more complex tree", () => { let added = false; - client.addPollCycle(() => { + client.on("poll", () => { console.log("Poll cycle!"); if (!added) { tree.fragment(tree.root()).addMember("b", 7); @@ -389,7 +389,7 @@ describe("more complex tree", () => { let added = false; - client.addPollCycle(() => { + client.on("poll", () => { console.log("Poll cycle!"); if (!added) { tree.fragment(tree.root()).addMember("b", 7); @@ -414,4 +414,71 @@ describe("more complex tree", () => { await reader.cancel(); }); + + test("Exponential backoff works", async () => { + const tree = new Tree( + (x, numb) => + new Parser().parse(`<${x}> ${numb}.`), + "http://example.com/value", + ); + tree.fragment(tree.root()).addMember("a", 5); + const frag = tree.newFragment(); + tree.fragment(tree.root()).relation(frag, "https://w3id.org/tree#relation"); + tree.fragment(frag).setFailcount(2).addMember("b", 7); + + const base = tree.base() + tree.root(); + const mock = tree.mock(); + global.fetch = mock; + + const client = replicateLDES( + intoConfig({ + url: base, + fetcher: { maxFetched: 2, concurrentRequests: 10 }, + }), + undefined, + undefined, + "none", + ); + + const members = await read(client.stream()); + expect(members.length).toBe(2); + }); + + test("Exponential backoff works, handle max retries", async () => { + const tree = new Tree( + (x, numb) => + new Parser().parse(`<${x}> ${numb}.`), + "http://example.com/value", + ); + tree.fragment(tree.root()).addMember("a", 5); + const frag = tree.newFragment(); + tree.fragment(tree.root()).relation(frag, "https://w3id.org/tree#relation"); + tree.fragment(frag).setFailcount(5).addMember("b", 7); + + const base = tree.base() + tree.root(); + const mock = tree.mock(); + global.fetch = mock; + + const client = replicateLDES( + intoConfig({ + url: base, + fetch: retry_fetch(fetch, [408, 425, 429, 500, 502, 503, 504], 100, 2), + }), + undefined, + undefined, + "none", + ); + + let thrown = false; + + try { + const members = await read(client.stream()); + console.log("Here", members); + } catch (ex) { + console.log("Throw", ex); + thrown = true; + } + + expect(thrown).toBeTruthy(); + }); });