Skip to content

Commit

Permalink
actually use concurrent requests
Browse files Browse the repository at this point in the history
small cleanup membermanager

catch member errors

don't error the stream controller on error

Failing to fetch a page is actually fetching an empty page

stop emitting when errored

better start up and adapt rdf-connect processor

Bump version extract-cbd-shape
  • Loading branch information
ajuvercr committed Apr 18, 2024
1 parent 5a4cac3 commit 2d25b43
Show file tree
Hide file tree
Showing 13 changed files with 423 additions and 294 deletions.
56 changes: 40 additions & 16 deletions bin/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import { Ordered, replicateLDES } from "../lib/client";
import { intoConfig } from "../lib/config";
import { Command, Option } from "commander";
import { Writer } from "n3";
import { enhanced_fetch, FetchConfig } from "../lib/utils";

const program = new Command();
let paramURL: string = "";
let paramFollow: boolean = false;
let polling: boolean = false;
let after: Date | undefined;
let before: Date | undefined;
let paramPollInterval: number;
Expand All @@ -20,8 +21,10 @@ let verbose: boolean = false;
let save: string | undefined;
let onlyDefaultGraph: boolean = false;
let loose: boolean = false;
let concurrent: number = 5;
let basicAuth: string | undefined;

// Options handed to enhanced_fetch in main(). The CLI action handler fills in
// the concurrency limit, optional basic auth and retry settings; `retry`
// starts as an empty object so the handler can assign its fields directly.
let fetch_config: FetchConfig = {
retry: {},
};

program
.arguments("<url>")
Expand Down Expand Up @@ -69,21 +72,38 @@ program
"Allowed amount of concurrent HTTP request to the same domain",
"5",
)
.option(
"--retry-count <retry>",
"Retry count per failing request (0 is infinite)",
"3",
)
.option("--http-codes [codes...]", "What HTTP codes to retry")
.action((url: string, program) => {
urlIsView = program.urlIsView;
noShape = !program.shape;
save = program.save;
paramURL = url;
shapeFile = program.shapeFile;
paramFollow = program.follow;
polling = program.follow;
paramPollInterval = program.pollInterval;
ordered = program.ordered;
quiet = program.quiet;
verbose = program.verbose;
loose = program.loose;
onlyDefaultGraph = program.onlyDefaultGraph;
basicAuth = program.basicAuth;
concurrent = parseInt(program.concurrent);

fetch_config.concurrent = parseInt(program.concurrent);
if (program.basicAuth) {
fetch_config.auth = {
auth: program.basicAuth,
host: new URL(url).host,
type: "basic",
};
}
fetch_config.retry!.maxRetries = parseInt(program.retryCount);
// Convert the --http-codes values to numbers.
// `parseInt` must not be passed to `map` directly: `map` supplies the element
// index as second argument, which `parseInt` interprets as the radix, so
// ["408", "429"] would become [408, NaN]. Parse explicitly in base 10 instead.
// The option is declared as optional variadic (`[codes...]`), so the flag
// given without any value yields `true` rather than an array; that case is
// skipped instead of throwing on `true.map`.
if (Array.isArray(program.httpCodes)) {
  fetch_config.retry!.codes = program.httpCodes.map((code: string) =>
    parseInt(code, 10),
  );
}

if (program.after) {
if (!isNaN(new Date(program.after).getTime())) {
Expand All @@ -110,28 +130,30 @@ async function main() {
intoConfig({
loose,
noShape,
polling: paramFollow,
polling: polling,
url: paramURL,
stateFile: save,
follow: paramFollow,
pollInterval: paramPollInterval,
fetcher: { maxFetched: 2, concurrentRequests: 10 },
urlIsView: urlIsView,
shapeFile,
onlyDefaultGraph,
after,
before,
basicAuth,
concurrent,
fetch: enhanced_fetch(fetch_config),
}),
undefined,
undefined,
ordered,
// intoConfig({ url: "http://marineregions.org/feed" }),
);

// In verbose mode, write a line to stderr for every "fragment" event the
// client emits.
if (verbose) {
client.on("fragment", () => console.error("Fragment!"));
client.on("fragment", () => {
console.error("Fragment!");
});
}

// Unless quiet is set, print every error emitted by the client to stderr.
if (!quiet) {
client.on("error", (error) => {
console.error("Error", error);
});
}

const reader = client.stream({ highWaterMark: 10 }).getReader();
Expand Down Expand Up @@ -170,4 +192,6 @@ async function main() {
}
}

main().catch(console.error);
// Exit with status 1 when main() rejects. The rejection reason is deliberately
// not printed here; client errors are reported by the "error" listener
// registered in main() unless quiet is set.
main().catch(() => {
process.exit(1);
});
78 changes: 51 additions & 27 deletions lib/client.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import { Config, intoConfig } from "./config";
import { Member } from "./page";
import rdfDereference, { RdfDereferencer } from "rdf-dereference";
import { FileStateFactory, NoStateFactory, State, StateFactory } from "./state";
import { FileStateFactory, NoStateFactory, StateFactory } from "./state";
import { CBDShapeExtractor } from "extract-cbd-shape";
import { RdfStore } from "rdf-stores";
import { DataFactory } from "rdf-data-factory";
import { Writer as NWriter } from "n3";
import { Quad_Object, Term } from "@rdfjs/types";
import {
enhanced_fetch,
extractMainNodeShape,
FetchConfig,
getObjects,
ModulatorFactory,
Notifier,
Expand All @@ -24,7 +26,7 @@ import type { Writer } from "@ajuvercr/js-runner";
export { intoConfig } from "./config";
export { retry_fetch, extractMainNodeShape } from "./utils";
export type { Member, Page, Relation } from "./page";
export type { Config, MediatorConfig, ShapeConfig } from "./config";
export type { Config, ShapeConfig } from "./config";

const log = debug("client");
const df = new DataFactory();
Expand All @@ -34,16 +36,12 @@ type Controller = ReadableStreamDefaultController<Member>;
export type Ordered = "ascending" | "descending" | "none";

/**
 * Builds a {@link Client} from the given configuration.
 *
 * NOTE(review): both the previous and the revised parameter list / return
 * statement of this function are visible below. In the revised form the
 * partial config is completed with `intoConfig` before the client is
 * constructed, and `dereferencer` is a positional argument instead of a field
 * of the former `states` object.
 *
 * @param config - Client configuration; `url` is the only required field in
 *   the revised signature.
 * @param ordered - Ordering mode forwarded to the client; defaults to `"none"`.
 * @param dereferencer - Optional RDF dereferencer forwarded to the client.
 * @param streamId - Optional term forwarded to the client as its stream.
 * @returns The newly constructed client.
 */
export function replicateLDES(
config: Config,
states: {
membersState?: State;
fragmentState?: State;
dereferencer?: RdfDereferencer;
} = {},
streamId?: Term,
config: Partial<Config> & { url: string },
ordered: Ordered = "none",
dereferencer?: RdfDereferencer,
streamId?: Term,
): Client {
return new Client(config, states, streamId, ordered);
return new Client(intoConfig(config), ordered, dereferencer, streamId);
}

export type LDESInfo = {
Expand Down Expand Up @@ -193,15 +191,9 @@ export class Client {

constructor(
config: Config,
{
dereferencer,
}: {
membersState?: State;
fragmentState?: State;
dereferencer?: RdfDereferencer;
} = {},
stream?: Term,
ordered: Ordered = "none",
dereferencer?: RdfDereferencer,
stream?: Term,
) {
this.config = config;
this.dereferencer = dereferencer ?? rdfDereference;
Expand Down Expand Up @@ -371,8 +363,17 @@ export class Client {
}): ReadableStream<Member> {
const emitted = longPromise();
const config: UnderlyingDefaultSource = {
//
// Called when starting the stream
//
start: async (controller: Controller) => {
this.on("error", controller.error.bind(controller));
this.on("error", (error) => {
this.stateFactory.write();
this.memberManager.close();
this.fetcher.close();
controller.error(error);
});

this.modulatorFactory.pause();
await this.init(
(member) => {
Expand All @@ -383,17 +384,27 @@ export class Client {
this.modulatorFactory,
);
},

//
// Called when the internal buffer is not full
//
pull: async () => {
resetPromise(emitted);
this.modulatorFactory.unpause();
await emitted.waiting;
this.modulatorFactory.pause();
return;
},

//
// Called when canceled
//
cancel: async () => {
log("Canceled");
this.stateFactory.write();
console.log("Canceled");
this.strategy.cancle();
this.strategy.cancel();
this.memberManager.close();
this.fetcher.close();
},
};

Expand Down Expand Up @@ -433,9 +444,24 @@ export async function processor(
loose?: boolean,
urlIsView?: boolean,
verbose?: boolean,
fetch_config?: {
auth?: {
type: "basic";
auth: string;
host: string;
};
concurrent?: number;
retry?: {
codes: number[];
maxRetries: number;
};
},
) {
if (fetch_config?.auth) {
fetch_config.auth.host = new URL(url).host;
}
const client = replicateLDES(
intoConfig({
{
loose,
noShape,
shapeFile: shape,
Expand All @@ -444,13 +470,10 @@ export async function processor(
after,
before,
stateFile: save,
follow,
pollInterval: pollInterval,
fetcher: { maxFetched: 2, concurrentRequests: 10 },
urlIsView,
}),
undefined,
undefined,
fetch: fetch_config ? enhanced_fetch(fetch_config) : fetch,
},
<Ordered>ordered || "none",
);

Expand Down Expand Up @@ -500,3 +523,4 @@ export async function processor(
}
};
}

44 changes: 1 addition & 43 deletions lib/config.ts
Original file line number Diff line number Diff line change
@@ -1,49 +1,24 @@
import { NamedNode, Quad } from "@rdfjs/types";
import { DefaultFetcherConfig, FetcherConfig } from "./pageFetcher";
import {
handle_basic_auth,
limit_fetch_per_domain,
retry_fetch,
} from "./utils";

/**
 * A shape supplied directly through {@link Config.shape}.
 *
 * NOTE(review): presumably `quads` holds the shape definition and `shapeId`
 * names the shape node within those quads — confirm against the consumer.
 */
export interface ShapeConfig {
quads: Quad[];
shapeId: NamedNode;
}

/**
 * Numeric limits carried in {@link Config.mediator}.
 *
 * NOTE(review): the code that applies these limits is not visible here —
 * confirm their exact meaning against the consumer.
 */
export interface MediatorConfig {
maxRequests: number;
maxMembers: number;
}

// Mediator limits used by defaultConfig when the caller supplies none.
const defaultMediatorConfig = {
maxRequests: 10,
maxMembers: 100,
};

/**
 * Full client configuration. `intoConfig` produces one from a partial object
 * by layering it over `defaultConfig`.
 */
export interface Config {
loose: boolean;
polling: boolean;
follow: boolean;
url: string;
urlIsView: boolean;
noShape: boolean;
stateFile?: string;
// Defaults to 200 in defaultConfig. NOTE(review): unit is presumably
// milliseconds — confirm against the polling code.
pollInterval: number;
mediator: MediatorConfig;
fetcher: FetcherConfig;
before?: Date;
after?: Date;
shape?: ShapeConfig;
shapeFile?: string;
onlyDefaultGraph?: boolean;
// Custom fetch implementation; when absent, intoConfig assembles one.
fetch?: typeof fetch;
// Value passed to handle_basic_auth by intoConfig when it assembles a fetch.
basicAuth?: string;
// Limit passed to limit_fetch_per_domain by intoConfig (falls back to 5).
concurrent?: number;
// TODO: add a flag to indicate in-order emission (default true).
// Make sure that slower pages get to emit the first members first.
//
// Maybe we can go faster if we only emit the members with the latest
// timestamp path (maybe per version id).
}

export interface WithTarget {
Expand All @@ -55,35 +30,18 @@ const defaultConfig: Config = {
noShape: false,
loose: false,
polling: false,
follow: false,
url: "",
pollInterval: 200,
fetcher: DefaultFetcherConfig,
mediator: defaultMediatorConfig,
};

// Default WithTarget value: an empty target object, merged into every config
// produced by getConfig and intoConfig.
const defaultTarget: WithTarget = {
target: {},
};

/**
 * Returns a fresh configuration object built from `defaultConfig` and
 * `defaultTarget`. Reading overrides from parameters is not implemented yet
 * (see the TODOs); `extracted` is an empty placeholder.
 *
 * @returns A promise resolving to a newly allocated default configuration.
 */
export async function getConfig(): Promise<Config & WithTarget> {
// TODO: Get config from params
const extracted = {};
// TODO: Better merging of configs
return Object.assign({}, defaultConfig, defaultTarget, extracted);
return Object.assign({}, defaultConfig, defaultTarget);
}

/**
 * Completes a partial configuration with the defaults.
 *
 * When no `fetch` is supplied, one is assembled and stored on the passed-in
 * `config` object (the argument is mutated): the global `fetch`, wrapped with
 * `handle_basic_auth` if `basicAuth` is set, then with `retry_fetch`, then
 * with `limit_fetch_per_domain`.
 *
 * @param config - Partial configuration. `url` must be a valid absolute URL
 *   when `basicAuth` is set, because it is parsed with `new URL`.
 * @returns A new object merging `defaultConfig`, `defaultTarget` and
 *   `config`, with later sources overriding earlier ones.
 */
export function intoConfig(config: Partial<Config>): Config {
if (!config.fetch) {
// Only wrap with basic auth when credentials were provided.
const fetch_f = config.basicAuth
? handle_basic_auth(fetch, config.basicAuth, new URL(config.url!))
: fetch;

// NOTE(review): the number list is presumably the set of HTTP status codes
// that trigger a retry — confirm in retry_fetch.
// `||` (not `??`) means a `concurrent` of 0 also falls back to 5.
config.fetch = limit_fetch_per_domain(
retry_fetch(fetch_f, [408, 425, 429, 500, 502, 503, 504, 404]),
config.concurrent || 5,
);
}

return Object.assign({}, defaultConfig, defaultTarget, config);
}
Loading

0 comments on commit 2d25b43

Please sign in to comment.