Skip to content

Commit

Permalink
Upgrade extract cbd shape - and use RdfStore instead of N3.Store (#13)
Browse files Browse the repository at this point in the history
* Upgrade extract cbd shape - and use RdfStore instead of N3.Store

* Clean up imports

* Update packages

* formatting + fix code duplication

---------

Co-authored-by: ajuvercr <[email protected]>
  • Loading branch information
pietercolpaert and ajuvercr authored Feb 22, 2024
1 parent 2d3b3da commit c76f3f8
Show file tree
Hide file tree
Showing 8 changed files with 1,166 additions and 1,054 deletions.
57 changes: 32 additions & 25 deletions lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,25 @@ import { Member } from "./page";
import rdfDereference, { RdfDereferencer } from "rdf-dereference";
import { FileStateFactory, NoStateFactory, State, StateFactory } from "./state";
import { CBDShapeExtractor } from "extract-cbd-shape";
import { DataFactory, Quad_Object, Store, Writer as NWriter } from "n3";
import { Term } from "@rdfjs/types";
import { ModulatorFactory, Notifier, streamToArray } from "./utils";
import { RdfStore } from "rdf-stores";
import { DataFactory } from "rdf-data-factory";
import { Writer as NWriter } from "n3";
import { Quad_Object, Term } from "@rdfjs/types";
import { getObjects, ModulatorFactory, Notifier, streamToArray } from "./utils";
import { LDES, SDS, TREE } from "@treecg/types";
import { FetchedPage, Fetcher, longPromise, resetPromise } from "./pageFetcher";
import { Manager } from "./memberManager";
import { OrderedStrategy, StrategyEvents, UnorderedStrategy } from "./strategy";

// import * as JsRunner from "@ajuvercr/js-runner";
import debug from "debug";
import type { Writer } from "@ajuvercr/js-runner";

export { intoConfig } from "./config";
export type { Member, Page, Relation } from "./page";
export type { Config, MediatorConfig, ShapeConfig } from "./config";

// type B = JsRunner;
import debug from "debug";
const df = new DataFactory();
const log = debug("client");
const { namedNode, blankNode, quad } = DataFactory;
const { namedNode, blankNode, quad } = df;

type Controller = ReadableStreamDefaultController<Member>;

Expand Down Expand Up @@ -48,7 +49,7 @@ export type LDESInfo = {

async function getInfo(
ldesId: Term,
store: Store,
store: RdfStore,
dereferencer: RdfDereferencer,
noShape: boolean,
shapeConfig?: ShapeConfig,
Expand All @@ -71,15 +72,9 @@ async function getInfo(
};
}

let shapeIds = noShape
? []
: store.getObjects(ldesId, TREE.terms.shape, null);
let timestampPaths = store.getObjects(ldesId, LDES.terms.timestampPath, null);
let isVersionOfPaths = store.getObjects(
ldesId,
LDES.terms.versionOfPath,
null,
);
let shapeIds = noShape ? [] : getObjects(store, ldesId, TREE.terms.shape);
let timestampPaths = getObjects(store, ldesId, LDES.terms.timestampPath);
let isVersionOfPaths = getObjects(store, ldesId, LDES.terms.versionOfPath);

logger(
"Found %d shapes, %d timestampPaths, %d isVersionOfPaths",
Expand All @@ -99,10 +94,13 @@ async function getInfo(
const resp = await dereferencer.dereference(ldesId.value, {
localFiles: true,
});
store = new Store(await streamToArray(resp.data));
shapeIds = store.getObjects(null, TREE.terms.shape, null);
timestampPaths = store.getObjects(null, LDES.terms.timestampPath, null);
isVersionOfPaths = store.getObjects(null, LDES.terms.versionOfPath, null);
store = RdfStore.createDefault();
await new Promise((resolve, reject) => {
store.import(resp.data).on("end", resolve).on("error", reject);
});
shapeIds = getObjects(store, null, TREE.terms.shape);
timestampPaths = getObjects(store, null, LDES.terms.timestampPath);
isVersionOfPaths = getObjects(store, null, LDES.terms.versionOfPath);
logger(
"Found %d shapes, %d timestampPaths, %d isVersionOfPaths",
shapeIds.length,
Expand All @@ -128,9 +126,16 @@ async function getInfo(
);
}

let shapeConfigStore = RdfStore.createDefault();
if (shapeConfig) {
for (let quad of shapeConfig.quads) {
shapeConfigStore.addQuad(quad);
}
}

return {
extractor: new CBDShapeExtractor(
shapeConfig ? new Store(shapeConfig.quads) : store,
shapeConfig ? shapeConfigStore : store,
dereferencer,
),
shape: shapeConfig ? shapeConfig.shapeId : shapeIds[0],
Expand Down Expand Up @@ -359,8 +364,10 @@ async function fetchPage(
): Promise<FetchedPage> {
const resp = await dereferencer.dereference(location, { localFiles: true });
const url = resp.url;
const page = await streamToArray(resp.data);
const data = new Store(page);
const data = RdfStore.createDefault();
await new Promise((resolve, reject) => {
data.import(resp.data).on("end", resolve).on("error", reject);
});
return <FetchedPage>{ url, data };
}

Expand Down
12 changes: 8 additions & 4 deletions lib/memberManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import { TREE } from "@treecg/types";
import Heap from "heap-js";
import { LDESInfo } from "./client";
import debug from "debug";
import { Store } from "n3";
import { Notifier } from "./utils";
import { RdfStore } from "rdf-stores";

const log = debug("manager");

Expand All @@ -26,7 +26,11 @@ export type MemberEvents = {
extracted: Member;
done: Member[];
};

const getObjects = function (store: RdfStore, subject:Term|null, predicate: Term|null, graph?:Term|null) {
return store.getQuads(subject, predicate, null, graph).map((quad) => {
return quad.object;
});
}
export class Manager {
private members: Heap<Member>;
public queued: number = 0;
Expand Down Expand Up @@ -79,7 +83,7 @@ export class Manager {

private async extractMember(
member: Term,
data: Store,
data: RdfStore,
): Promise<Member | undefined> {
const quads = await this.extractor.extract(data, member, this.shapeId);

Expand Down Expand Up @@ -123,7 +127,7 @@ export class Manager {
notifier: Notifier<MemberEvents, S>,
) {
const logger = log.extend("extract");
const members = page.data.getObjects(this.ldesId, TREE.terms.member, null);
const members = getObjects(page.data, this.ldesId, TREE.terms.member, null);

logger("%d members", members.length);

Expand Down
28 changes: 12 additions & 16 deletions lib/page.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { Quad, Term } from "@rdfjs/types";
import { RDF, TREE } from "@treecg/types";
import { CBDShapeExtractor } from "extract-cbd-shape";
import { Store } from "n3";
import * as N3 from "n3";
import { State } from "./state";
import { RdfStore } from "rdf-stores";
import { getObjects } from "./utils";

export interface Member {
id: Term;
Expand All @@ -26,7 +26,7 @@ export interface Page {
}

export function extractMembers(
store: Store,
store: RdfStore,
stream: Term,
extractor: CBDShapeExtractor,
state: State,
Expand All @@ -35,15 +35,11 @@ export function extractMembers(
timestampPath?: Term,
isVersionOfPath?: Term,
): Promise<void>[] {
const members = store.getObjects(stream, TREE.terms.member, null);
const members = getObjects(store, stream, TREE.terms.member, null);

const extractMember = async (member: Term) => {
state.add(member.value);
const quads = await extractor.extract(
store,
<N3.Term>member,
<N3.Term>shapeId,
);
const quads = await extractor.extract(store, member, shapeId);
// Get timestamp
let timestamp: string | undefined;
if (timestampPath) {
Expand Down Expand Up @@ -74,21 +70,21 @@ export function extractMembers(
}

export function extractRelations(
store: Store,
store: RdfStore,
node: Term,
loose: boolean,
): Relation[] {
const relationIds = loose
? store.getObjects(null, TREE.terms.relation, null)
: store.getObjects(node, TREE.terms.relation, null);
? getObjects(store, null, TREE.terms.relation, null)
: getObjects(store, node, TREE.terms.relation, null);
const source = node.value;

const out: Relation[] = [];
for (let relationId of relationIds) {
const node = store.getObjects(relationId, TREE.terms.node, null)[0];
const ty = store.getObjects(relationId, RDF.terms.type, null);
const path = store.getObjects(relationId, TREE.terms.path, null)[0];
const value = store.getObjects(relationId, TREE.terms.value, null);
const node = getObjects(store, relationId, TREE.terms.node, null)[0];
const ty = getObjects(store, relationId, RDF.terms.type, null);
const path = getObjects(store, relationId, TREE.terms.path, null)[0];
const value = getObjects(store, relationId, TREE.terms.value, null);
out.push({
source,
node: node.value,
Expand Down
25 changes: 17 additions & 8 deletions lib/pageFetcher.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { RdfDereferencer } from "rdf-dereference";
import { Notifier, streamToArray } from "./utils";
import { DataFactory, Store } from "n3";
import { Notifier } from "./utils";
import { extractRelations, Relation } from "./page";
import debug from "debug";
import { SimpleRelation } from "./relation";

import { RdfStore } from "rdf-stores";
import { DataFactory } from "rdf-data-factory";
const log = debug("fetcher");
const { namedNode } = DataFactory;
const { namedNode } = new DataFactory();

/**
* target: url to fetch
Expand All @@ -20,7 +20,7 @@ export type Node = {

export type FetchedPage = {
url: string;
data: Store;
data: RdfStore;
};

// At most concurrentRequests + maxFetched pages will be stored in memory
Expand Down Expand Up @@ -85,7 +85,6 @@ export class Fetcher {
const resp = await this.dereferencer.dereference(node.target, {
localFiles: true,
});
const page = await streamToArray(resp.data);

node.target = resp.url;

Expand Down Expand Up @@ -115,8 +114,18 @@ export class Fetcher {

logger("Cache for %s %o", node.target, cache);

const data = new Store(page);
logger("Got data %s (%d quads)", node.target, page.length);
const data = RdfStore.createDefault();
let quadCount = 0;
await new Promise((resolve, reject) => {
resp.data
.on("data", (quad) => {
data.addQuad(quad);
quadCount++;
})
.on("end", resolve)
.on("error", reject);
});
logger("Got data %s (%d quads)", node.target, quadCount);

for (let rel of extractRelations(data, namedNode(resp.url), this.loose)) {
if (!node.expected.some((x) => x == rel.node)) {
Expand Down
14 changes: 13 additions & 1 deletion lib/utils.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,23 @@
import { Stream } from "@rdfjs/types";
import { Stream, Term } from "@rdfjs/types";
import { BaseQuad } from "n3";
import { StateFactory, StateT } from "./state";
import { RdfStore } from "rdf-stores";

export type Notifier<Events, S> = {
[K in keyof Events]: (event: Events[K], state: S) => void;
};

export function getObjects(
store: RdfStore,
subject: Term | null,
predicate: Term | null,
graph?: Term | null,
) {
return store.getQuads(subject, predicate, null, graph).map((quad) => {
return quad.object;
});
}

export function readableToArray<T>(stream: ReadableStream<T>): Promise<T[]> {
const out: T[] = [];
const reader = stream.getReader();
Expand Down
Loading

0 comments on commit c76f3f8

Please sign in to comment.