Skip to content

Commit

Permalink
add fetch option to config
Browse files Browse the repository at this point in the history
  • Loading branch information
ajuvercr committed Mar 18, 2024
1 parent eb8bc08 commit 410c687
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 38 deletions.
18 changes: 15 additions & 3 deletions bin/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,14 @@ program
.default("none"),
)
.option("-f, --follow", "follow the LDES, the client stays in sync")
.option("--after <after>", "follow only relations including members after a certain point in time")
.option("--before <before>", "follow only relations including members before a certain point in time")
.option(
"--after <after>",
"follow only relations including members after a certain point in time",
)
.option(
"--before <before>",
"follow only relations including members before a certain point in time",
)
.option("--poll-interval <number>", "specify poll interval")
.option("--shape-files [shapeFiles...]", "specify a shapefile")
.option(
Expand Down Expand Up @@ -88,6 +94,11 @@ program

program.parse(process.argv);

// Keep a handle on the runtime's own fetch before it gets replaced below.
const f = global.fetch;

// Swap in a thin wrapper: print the request that is about to be made, then
// hand both arguments to the saved implementation untouched.
// NOTE(review): this logs on every request, unconditionally — confirm whether
// it is meant to stay or was only a debugging aid.
global.fetch = (input, init) => {
  console.log("Global fetch", input);
  return f(input, init);
};
async function main() {
const client = replicateLDES(
intoConfig({
Expand All @@ -103,7 +114,8 @@ async function main() {
shapeFiles,
onlyDefaultGraph,
after,
before
before,
// fetch: <typeof fetch>fetch_f,
}),
undefined,
undefined,
Expand Down
98 changes: 70 additions & 28 deletions lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@ import { CBDShapeExtractor } from "extract-cbd-shape";
import { RdfStore } from "rdf-stores";
import { DataFactory, Writer as NWriter } from "n3";
import { Quad_Object, Term } from "@rdfjs/types";
import { extractMainNodeShape, getObjects, ModulatorFactory, Notifier, streamToArray } from "./utils";
import {
extractMainNodeShape,
getObjects,
ModulatorFactory,
Notifier,
streamToArray,
} from "./utils";
import { LDES, SDS, SHACL, TREE } from "@treecg/types";
import { FetchedPage, Fetcher, longPromise, resetPromise } from "./pageFetcher";
import { Manager } from "./memberManager";
Expand Down Expand Up @@ -65,10 +71,11 @@ async function getInfo(

const resp = await rdfDereference.dereference(shapeFile, {
localFiles: true,
fetch: config.fetch,
});
const quads = await streamToArray(resp.data);
// Add retrieved quads to local stores
quads.forEach(q => {
quads.forEach((q) => {
tempShapeStore.addQuad(q);
shapeConfigStore.addQuad(q);
});
Expand All @@ -77,7 +84,7 @@ async function getInfo(
// We have to find the actual IRI/Blank Node of the main shape within the file
config.shapes.push({
quads,
shapeId: extractMainNodeShape(tempShapeStore)
shapeId: extractMainNodeShape(tempShapeStore),
});
} else {
config.shapes.push({
Expand Down Expand Up @@ -111,6 +118,7 @@ async function getInfo(
logger("Maybe find more info at %s", ldesId.value);
const resp = await dereferencer.dereference(ldesId.value, {
localFiles: true,
fetch: config.fetch,
});
store = RdfStore.createDefault();
await new Promise((resolve, reject) => {
Expand All @@ -125,7 +133,7 @@ async function getInfo(
timestampPaths.length,
isVersionOfPaths.length,
);
} catch (ex: any) { }
} catch (ex: any) {}
}

if (timestampPaths.length > 1) {
Expand All @@ -145,11 +153,18 @@ async function getInfo(

if (config.shapes) {
for (const shape of config.shapes) {
const memberType = getObjects(shapeConfigStore, shape.shapeId, SHACL.terms.targetClass)[0];
const memberType = getObjects(
shapeConfigStore,
shape.shapeId,
SHACL.terms.targetClass,
)[0];
if (memberType) {
shapeMap.set(memberType.value, shape.shapeId);
} else {
console.error("Ignoring SHACL shape without a declared sh:targetClass: ", shape.shapeId);
console.error(
"Ignoring SHACL shape without a declared sh:targetClass: ",
shape.shapeId,
);
}
}
} else {
Expand All @@ -158,7 +173,10 @@ async function getInfo(
if (memberType) {
shapeMap.set(memberType.value, shapeId);
} else {
console.error("Ignoring SHACL shape without a declared sh:targetClass: ", shapeId);
console.error(
"Ignoring SHACL shape without a declared sh:targetClass: ",
shapeId,
);
}
}
}
Expand All @@ -168,8 +186,10 @@ async function getInfo(
config.shapes && config.shapes.length > 0 ? shapeConfigStore : store,
dereferencer,
{
// Options object: default-graph restriction plus the user-supplied fetch
cbdDefaultGraph: config.onlyDefaultGraph,
},
fetch: config.fetch,
},
),
shapeMap: config.noShape ? undefined : shapeMap,
timestampPath: timestampPaths[0],
Expand Down Expand Up @@ -267,7 +287,11 @@ export class Client {
): Promise<void> {
const logger = log.extend("init");
// Fetch the url
const root = await fetchPage(this.config.url, this.dereferencer);
const root = await fetchPage(
this.config.url,
this.dereferencer,
this.config.fetch,
);
// Try to get a shape
// TODO Choose a view
const viewQuads = root.data.getQuads(null, TREE.terms.view, null, null);
Expand Down Expand Up @@ -312,19 +336,33 @@ export class Client {
throw "Can only emit members in order, if LDES is configured with timestampPath";
}

this.fetcher = new Fetcher(this.dereferencer, this.config.loose, this.config.after, this.config.before);
this.fetcher = new Fetcher(
this.dereferencer,
this.config.loose,
this.config.fetch,
this.config.after,
this.config.before,
);

const notifier: Notifier<StrategyEvents, {}> = {
fragment: () => this.emit("fragment", undefined),
member: (m) => {
// Check if member is within date constraints (if any)
if (this.config.before) {
if (m.timestamp && m.timestamp instanceof Date && m.timestamp > this.config.before) {
if (
m.timestamp &&
m.timestamp instanceof Date &&
m.timestamp > this.config.before
) {
return;
}
}
if (this.config.after) {
if (m.timestamp && m.timestamp instanceof Date && m.timestamp < this.config.after) {
if (
m.timestamp &&
m.timestamp instanceof Date &&
m.timestamp < this.config.after
) {
return;
}
}
Expand All @@ -343,22 +381,22 @@ export class Client {
this.strategy =
this.ordered !== "none"
? new OrderedStrategy(
this.memberManager,
this.fetcher,
notifier,
factory,
this.ordered,
this.config.polling,
this.config.pollInterval,
)
this.memberManager,
this.fetcher,
notifier,
factory,
this.ordered,
this.config.polling,
this.config.pollInterval,
)
: new UnorderedStrategy(
this.memberManager,
this.fetcher,
notifier,
factory,
this.config.polling,
this.config.pollInterval,
);
this.memberManager,
this.fetcher,
notifier,
factory,
this.config.polling,
this.config.pollInterval,
);

logger("Found %d views, choosing %s", viewQuads.length, ldesId.value);
this.strategy.start(ldesId.value);
Expand Down Expand Up @@ -403,8 +441,12 @@ export class Client {
async function fetchPage(
location: string,
dereferencer: RdfDereferencer,
fetch_f?: typeof fetch,
): Promise<FetchedPage> {
const resp = await dereferencer.dereference(location, { localFiles: true });
const resp = await dereferencer.dereference(location, {
localFiles: true,
fetch: fetch_f,
});
const url = resp.url;
const data = RdfStore.createDefault();
await new Promise((resolve, reject) => {
Expand Down
2 changes: 1 addition & 1 deletion lib/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export interface Config {
shapes?: ShapeConfig[];
shapeFiles?: string[];
onlyDefaultGraph?: boolean;

fetch?: typeof fetch;
// Add flag to indicate in order (default true)
// Make sure that slower pages to first emit the first members
//
Expand Down
19 changes: 17 additions & 2 deletions lib/pageFetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,20 @@ export type Cache = {
export class Fetcher {
private dereferencer: RdfDereferencer;
private loose: boolean;
private fetch_f?: typeof fetch;
private after?: Date;
private before?: Date;

constructor(dereferencer: RdfDereferencer, loose: boolean, after?: Date, before?: Date) {
constructor(
dereferencer: RdfDereferencer,
loose: boolean,
fetch_f?: typeof fetch,
after?: Date,
before?: Date,
) {
this.dereferencer = dereferencer;
this.loose = loose;
this.fetch_f = fetch_f;
if (after) this.after = after;
if (before) this.before = before;
}
Expand All @@ -88,6 +96,7 @@ export class Fetcher {

const resp = await this.dereferencer.dereference(node.target, {
localFiles: true,
fetch: this.fetch_f,
});

node.target = resp.url;
Expand Down Expand Up @@ -131,7 +140,13 @@ export class Fetcher {
});
logger("Got data %s (%d quads)", node.target, quadCount);

for (let rel of extractRelations(data, namedNode(resp.url), this.loose, this.after, this.before)) {
for (let rel of extractRelations(
data,
namedNode(resp.url),
this.loose,
this.after,
this.before,
)) {
if (!node.expected.some((x) => x == rel.node)) {
notifier.relationFound({ from: node, target: rel }, state);
}
Expand Down
10 changes: 8 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@
"@types/debug": "^4.1.10",
"commander": "^11.1.0",
"debug": "^4.3.4",
"extract-cbd-shape": "^0.1.0",
"extract-cbd-shape": "^0.1.1",
"heap-js": "^2.3.0",
"n3": "^1.17.2",
"rdf-data-factory": "^1.1.2",
"rdf-dereference": "^2.2.0",
"rdf-stores": "^1.0.0"
"rdf-stores": "^1.0.0",
"throttled-queue": "^2.1.4"
},
"devDependencies": {
"@jest/globals": "^29.7.0",
Expand Down

0 comments on commit 410c687

Please sign in to comment.