Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add basic auth support #23

Merged
merged 3 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions bin/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ let verbose: boolean = false;
let save: string | undefined;
let onlyDefaultGraph: boolean = false;
let loose: boolean = false;
let basicAuth: string | undefined;

program
.arguments("<url>")
Expand Down Expand Up @@ -61,6 +62,7 @@ program
)
.option("-q --quiet", "be quiet")
.option("-v --verbose", "be verbose")
.option("--basic-auth <username>:<password>", "HTTP basic auth information")
.action((url: string, program) => {
urlIsView = program.urlIsView;
noShape = !program.shape;
Expand All @@ -74,6 +76,7 @@ program
verbose = program.verbose;
loose = program.loose;
onlyDefaultGraph = program.onlyDefaultGraph;
basicAuth = program.basicAuth;
if (program.after) {
if (!isNaN(new Date(program.after).getTime())) {
after = new Date(program.after);
Expand All @@ -94,11 +97,6 @@ program

program.parse(process.argv);

const f = global.fetch;
global.fetch = (req, options) => {
console.log("Global fetch", req);
return f(req, options);
};
async function main() {
const client = replicateLDES(
intoConfig({
Expand All @@ -115,7 +113,7 @@ async function main() {
onlyDefaultGraph,
after,
before,
// fetch: <typeof fetch>fetch_f,
basicAuth,
}),
undefined,
undefined,
Expand Down
5 changes: 2 additions & 3 deletions lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,9 @@ async function getInfo(
config.shapes && config.shapes.length > 0 ? shapeConfigStore : store,
dereferencer,
{
// Updated upstream
cbdDefaultGraph: config.onlyDefaultGraph,
fetch: config.fetch,
}, //Stashed changes
},
),
shapeMap: config.noShape ? undefined : shapeMap,
timestampPath: timestampPaths[0],
Expand Down Expand Up @@ -430,7 +429,7 @@ export class Client {
},
cancel: async () => {
this.stateFactory.write();
console.log("Cancled");
console.log("Canceled");
this.strategy.cancle();
},
};
Expand Down
16 changes: 14 additions & 2 deletions lib/config.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { NamedNode, Quad } from "@rdfjs/types";
import { DefaultFetcherConfig, FetcherConfig } from "./pageFetcher";
import { retry_fetch } from "./utils";
import {
handle_basic_auth,
limit_fetch_per_domain,
retry_fetch,
} from "./utils";

export interface ShapeConfig {
quads: Quad[];
Expand Down Expand Up @@ -34,6 +38,7 @@ export interface Config {
shapeFiles?: string[];
onlyDefaultGraph?: boolean;
fetch?: typeof fetch;
basicAuth?: string;
// Add flag to indicate in order (default true)
// Make sure that slower pages to first emit the first members
//
Expand Down Expand Up @@ -69,7 +74,14 @@ export async function getConfig(): Promise<Config & WithTarget> {

export function intoConfig(config: Partial<Config>): Config {
if (!config.fetch) {
config.fetch = retry_fetch(fetch, [408, 425, 429, 500, 502, 503, 504]);
const fetch_f = config.basicAuth
? handle_basic_auth(fetch, config.basicAuth, new URL(config.url!))
: fetch;

config.fetch = limit_fetch_per_domain(
retry_fetch(fetch_f, [408, 425, 429, 500, 502, 503, 504, 404]),
1,
);
}

return Object.assign({}, defaultConfig, defaultTarget, config);
Expand Down
98 changes: 96 additions & 2 deletions lib/utils.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { NamedNode, Quad_Subject, Stream, Term } from "@rdfjs/types";
import { NamedNode, Stream, Term } from "@rdfjs/types";
import { BaseQuad } from "n3";
import { StateFactory, StateT } from "./state";
import { RdfStore } from "rdf-stores";
import { RDF, SHACL } from "@treecg/types";
import debug from "debug";

export type Notifier<Events, S> = {
[K in keyof Events]: (event: Events[K], state: S) => void;
Expand Down Expand Up @@ -227,7 +228,6 @@ class ModulatorInstance<T> implements Modulator<T> {
this.notifier = notifier;
this.factory = factory;
for (let item of readd) {
console.log("Readding");
this.push(item.item);
}
}
Expand Down Expand Up @@ -289,19 +289,113 @@ class ModulatorInstance<T> implements Modulator<T> {
}
}

/**
 * Normalises the first argument of `fetch` into a `URL` instance.
 *
 * @param input - A string, a `URL`, or a `Request`, as accepted by `fetch`.
 * @returns The `URL` of the request target. A `URL` input is returned as-is
 *   (same instance); strings and `Request`s are parsed into a new `URL`.
 * @throws TypeError when `input` is none of the supported kinds. `new URL`
 *   also throws for strings that are not absolute URLs.
 */
function urlToUrl(input: Parameters<typeof fetch>[0]): URL {
  if (typeof input === "string") {
    return new URL(input);
  }
  if (input instanceof URL) {
    return input;
  }
  if (input instanceof Request) {
    return new URL(input.url);
  }
  // Throw a real Error (not a bare string) so callers get a stack trace and
  // `instanceof Error` checks in catch blocks keep working.
  throw new TypeError("Not a real url");
}

// Root `debug` logger for the fetch wrappers; each wrapper derives its own
// sub-logger from it via `log.extend(...)`.
const log = debug("fetch");

/**
 * Wraps a fetch function so that at most `concurrent` requests per origin are
 * in flight at the same time. Additional requests to the same origin wait in
 * arrival order until an earlier request settles.
 *
 * @param fetch_f - The fetch implementation to delegate to.
 * @param concurrent - Maximum number of simultaneous requests per origin.
 * @returns A fetch-compatible function that applies the limit.
 */
export function limit_fetch_per_domain(
  fetch_f: typeof fetch,
  concurrent: number,
): typeof fetch {
  const logger = log.extend("limit");
  // Per-origin queue of "go ahead" callbacks. The first `concurrent` entries
  // belong to requests that are allowed to run; the rest are still waiting.
  const domain_dict: { [domain: string]: Array<(value: void) => void> } = {};

  const out: typeof fetch = async (input, init) => {
    const domain = urlToUrl(input).origin;

    if (!(domain in domain_dict)) {
      domain_dict[domain] = [];
    }
    const requests = domain_dict[domain];

    // Queue up; proceed immediately only if a slot is free.
    await new Promise<void>((res) => {
      logger("%s capacity %d/%d", domain, requests.length, concurrent);
      requests.push(res);
      if (requests.length <= concurrent) {
        res();
      }
    });

    try {
      return await fetch_f(input, init);
    } finally {
      // Release the slot even when fetch_f rejects; otherwise a single
      // network error would block every later request to this origin.
      requests.shift();
      // Wake whoever now sits inside the first `concurrent` entries. Calling
      // an already-resolved callback again is a harmless no-op.
      for (const release of requests.slice(0, concurrent)) {
        release();
      }
    }
  };

  return out;
}

/**
 * Wraps a fetch function so that requests to `domain`'s host are retried with
 * an HTTP Basic `Authorization` header after the server answers 401. Once a
 * 401 has been seen for that host, every later request to it carries the
 * header up front. Requests to other hosts never receive the credentials.
 *
 * @param fetch_f - The fetch implementation to delegate to.
 * @param basicAuth - Credentials in `username:password` form; they are
 *   base64-encoded into the header value.
 * @param domain - URL whose `host` decides which requests may be authenticated.
 * @returns A fetch-compatible function that adds basic auth when needed.
 */
export function handle_basic_auth(
  fetch_f: typeof fetch,
  basicAuth: string,
  domain: URL,
): typeof fetch {
  const logger = log.extend("auth");
  let authRequired = false;

  const basicAuthValue = `Basic ${Buffer.from(basicAuth).toString("base64")}`;

  // Builds a fresh RequestInit carrying the Authorization header. The caller's
  // `init` is never mutated. When `input` is a Request and `init` supplies no
  // headers, the Request's own headers are used as the base, because a
  // `headers` entry in init would otherwise replace them entirely.
  const withAuth = (
    input: Parameters<typeof fetch>[0],
    init?: RequestInit,
  ): RequestInit => {
    const inherited =
      init?.headers ?? (input instanceof Request ? input.headers : undefined);
    const headers = new Headers(inherited);
    headers.set("Authorization", basicAuthValue);
    return { ...init, headers };
  };

  const auth_f: typeof fetch = async (input, init) => {
    const sameHost = urlToUrl(input).host === domain.host;
    if (authRequired && sameHost) {
      return await fetch_f(input, withAuth(input, init));
    }

    const resp = await fetch_f(input, init);
    if (resp.status === 401 && sameHost) {
      logger("Unauthorized, adding basic auth");
      authRequired = true;
      // NOTE(review): if `input` is a Request with a body, that body was
      // consumed by the first attempt; confirm callers only pass body-less
      // requests or URLs here.
      return await fetch_f(input, withAuth(input, init));
    }

    return resp;
  };

  return auth_f;
}

export function retry_fetch(
fetch_f: typeof fetch,
httpCodes: number[],
base = 500,
maxRetries = 5,
): typeof fetch {
const logger = log.extend("retry");
const retry: typeof fetch = async (input, init) => {
let tryCount = 0;
let retryTime = base;
while (tryCount < maxRetries) {
const resp = await fetch_f(input, init);
if (!resp.ok) {
if (httpCodes.some((x) => x == resp.status)) {
logger("Retry %s %d/%d", input, tryCount, maxRetries);
// Wait 500ms, 1 second, 2 seconds, 4 seconds, 8 seconds, fail
tryCount += 1;
await new Promise((res) => setTimeout(res, retryTime));
Expand Down
Loading
Loading