diff --git a/lib/client.ts b/lib/client.ts index 5805fa3..cff80f0 100644 --- a/lib/client.ts +++ b/lib/client.ts @@ -21,6 +21,7 @@ import debug from "debug"; import type { Writer } from "@ajuvercr/js-runner"; export { intoConfig } from "./config"; +export { retry_fetch } from "./utils"; export type { Member, Page, Relation } from "./page"; export type { Config, MediatorConfig, ShapeConfig } from "./config"; @@ -204,7 +205,9 @@ type EventReceiver = (params: T) => void; export type ClientEvents = { fragment: void; + mutable: void; poll: void; + error: any; }; export class Client { @@ -220,7 +223,6 @@ export class Client { private modulatorFactory; - private pollCycle: (() => void)[] = []; private stateFactory: StateFactory; private listeners: { @@ -276,10 +278,6 @@ export class Client { }); } - addPollCycle(cb: () => void) { - this.pollCycle.push(cb); - } - async init( emit: (member: Member) => void, close: () => void, @@ -345,6 +343,7 @@ export class Client { ); const notifier: Notifier = { + error: (ex: any) => this.emit("error", ex), fragment: () => this.emit("fragment", undefined), member: (m) => { // Check if member is within date constraints (if any) @@ -370,7 +369,9 @@ export class Client { }, pollCycle: () => { this.emit("poll", undefined); - this.pollCycle.forEach((cb) => cb()); + }, + mutable: () => { + this.emit("mutable", undefined); }, close: () => { this.stateFactory.write(); @@ -409,6 +410,7 @@ export class Client { const emitted = longPromise(); const config: UnderlyingDefaultSource = { start: async (controller: Controller) => { + this.on("error", controller.error.bind(controller)); this.modulatorFactory.pause(); await this.init( (member) => { diff --git a/lib/config.ts b/lib/config.ts index 028140e..c6726f1 100644 --- a/lib/config.ts +++ b/lib/config.ts @@ -1,5 +1,6 @@ import { NamedNode, Quad } from "@rdfjs/types"; import { DefaultFetcherConfig, FetcherConfig } from "./pageFetcher"; +import { retry_fetch } from "./utils"; export interface ShapeConfig { quads: Quad[]; @@ -67,5 +68,9 @@ export async function getConfig(): Promise { } export function intoConfig(config: Partial): Config { + if (!config.fetch) { + config.fetch = retry_fetch(fetch, [408, 425, 429, 500, 502, 503, 504]); + } + return Object.assign({}, defaultConfig, defaultTarget, config); } diff --git a/lib/pageFetcher.ts b/lib/pageFetcher.ts index c179ddd..a274f89 100644 --- a/lib/pageFetcher.ts +++ b/lib/pageFetcher.ts @@ -61,8 +61,8 @@ export interface Helper { export type FetchEvent = { relationFound: { from: Node; target: Relation }; pageFetched: FetchedPage; - // seen: {}; scheduleFetch: Node; + error: any; }; export type Cache = { @@ -94,65 +94,69 @@ export class Fetcher { async fetch(node: Node, state: S, notifier: Notifier) { const logger = log.extend("fetch"); - const resp = await this.dereferencer.dereference(node.target, { - localFiles: true, - fetch: this.fetch_f, - }); - - node.target = resp.url; - - const cache = {} as Cache; - if (resp.headers) { - const cacheControlCandidate = resp.headers.get("cache-control"); - if (cacheControlCandidate) { - const controls = cacheControlCandidate - .split(",") - .map((x) => x.split("=", 2).map((x) => x.trim())); - - for (let control of controls) { - if (control[0] == "max-age") { - cache.maxAge = parseInt(control[1]); - } - - if (control[0] == "immutable") { - cache.immutable = true; + try { + const resp = await this.dereferencer.dereference(node.target, { + localFiles: true, + fetch: this.fetch_f, + }); + + node.target = resp.url; + + const cache = {} as Cache; + if (resp.headers) { + const cacheControlCandidate = resp.headers.get("cache-control"); + if (cacheControlCandidate) { + const controls = cacheControlCandidate + .split(",") + .map((x) => x.split("=", 2).map((x) => x.trim())); + + for (let control of controls) { + if (control[0] == "max-age") { + cache.maxAge = parseInt(control[1]); + } + + if (control[0] == "immutable") { + cache.immutable = true; + } } } } - } - if (!cache.immutable) { - notifier.scheduleFetch(node, state); - } + if (!cache.immutable) { + notifier.scheduleFetch(node, state); + } - logger("Cache for %s %o", node.target, cache); - - const data = RdfStore.createDefault(); - let quadCount = 0; - await new Promise((resolve, reject) => { - resp.data - .on("data", (quad) => { - data.addQuad(quad); - quadCount++; - }) - .on("end", resolve) - .on("error", reject); - }); - logger("Got data %s (%d quads)", node.target, quadCount); - - for (let rel of extractRelations( - data, - namedNode(resp.url), - this.loose, - this.after, - this.before, - )) { - if (!node.expected.some((x) => x == rel.node)) { - notifier.relationFound({ from: node, target: rel }, state); + logger("Cache for %s %o", node.target, cache); + + const data = RdfStore.createDefault(); + let quadCount = 0; + await new Promise((resolve, reject) => { + resp.data + .on("data", (quad) => { + data.addQuad(quad); + quadCount++; + }) + .on("end", resolve) + .on("error", reject); + }); + logger("Got data %s (%d quads)", node.target, quadCount); + + for (let rel of extractRelations( + data, + namedNode(resp.url), + this.loose, + this.after, + this.before, + )) { + if (!node.expected.some((x) => x == rel.node)) { + notifier.relationFound({ from: node, target: rel }, state); + } } - } - // TODO check this, is node.target correct? - notifier.pageFetched({ data, url: resp.url }, state); + // TODO check this, is node.target correct? + notifier.pageFetched({ data, url: resp.url }, state); + } catch (ex) { + notifier.error(ex, state); + } } } diff --git a/lib/strategy/index.ts b/lib/strategy/index.ts index f473405..99e6eba 100644 --- a/lib/strategy/index.ts +++ b/lib/strategy/index.ts @@ -30,6 +30,8 @@ export type PageAndRelation = { export type StrategyEvents = { member: Member; fragment: {}; + mutable: {}; pollCycle: {}; close: {}; + error: any; }; diff --git a/lib/strategy/ordered.ts b/lib/strategy/ordered.ts index 18e6692..6e15ec4 100644 --- a/lib/strategy/ordered.ts +++ b/lib/strategy/ordered.ts @@ -81,9 +81,13 @@ export class OrderedStrategy { // start member extraction // - relationFound: a relation has been found, put the extended chain in the queue this.fetchNotifier = { + error: (error: any) => { + this.notifier.error(error, {}); + }, scheduleFetch: ({ target, expected }, { chain }) => { chain.target = target; this.toPoll.push({ chain, expected }); + this.notifier.mutable({}, {}); }, pageFetched: (page, { chain, index }) => { logger("Page fetched %s", page.url); diff --git a/lib/strategy/unordered.ts b/lib/strategy/unordered.ts index b75bcfd..98f4fec 100644 --- a/lib/strategy/unordered.ts +++ b/lib/strategy/unordered.ts @@ -43,8 +43,12 @@ export class UnorderedStrategy { // start member extraction // - relationFound: a relation has been found, inFlight += 1 and put it in the queue this.fetchNotifier = { + error: (error: any) => { + this.notifier.error(error, {}); + }, scheduleFetch: (node: Node) => { this.cacheList.push(node); + this.notifier.mutable({}, {}); }, pageFetched: (page, { index }) => this.handleFetched(page, index), relationFound: ({ from, target }) => { @@ -54,8 +58,8 @@ export class UnorderedStrategy { }, }; - // Callbacks for the member extractor - // - done: all members have been extracted, we are finally done with a page inFlight -= 1 + // Callbacks for the member extractor + // - done: all members have been extracted, we are finally done with a page inFlight -= 1 // - extracted: a member has been found, yeet it this.memberNotifier = { done: () => { diff --git a/lib/utils.ts b/lib/utils.ts index edb87e9..4478941 100644 --- a/lib/utils.ts +++ b/lib/utils.ts @@ -73,28 +73,35 @@ export function streamToArray( * If more than one is found an exception is thrown. */ export function extractMainNodeShape(store: RdfStore): NamedNode { - const nodeShapes = getSubjects(store, RDF.terms.type, SHACL.terms.NodeShape, null); + const nodeShapes = getSubjects( + store, + RDF.terms.type, + SHACL.terms.NodeShape, + null, + ); let mainNodeShape = null; if (nodeShapes && nodeShapes.length > 0) { - for (const ns of nodeShapes) { - const isNotReferenced = getSubjects(store, null, ns, null).length === 0; - - if (isNotReferenced) { - if (!mainNodeShape) { - mainNodeShape = ns; - } else { - throw new Error("There are multiple main node shapes in a given shape graph. Unrelated shapes must be given as separate shape graphs"); - } + for (const ns of nodeShapes) { + const isNotReferenced = getSubjects(store, null, ns, null).length === 0; + + if (isNotReferenced) { + if (!mainNodeShape) { + mainNodeShape = ns; + } else { + throw new Error( + "There are multiple main node shapes in a given shape graph. Unrelated shapes must be given as separate shape graphs", + ); } - } - if (mainNodeShape) { - return mainNodeShape; - } else { - throw new Error("No main SHACL Node Shapes found in given shape graph"); - } + } + } + if (mainNodeShape) { + return mainNodeShape; + } else { + throw new Error("No main SHACL Node Shapes found in given shape graph"); + } } else { - throw new Error("No SHACL Node Shapes found in given shape graph"); + throw new Error("No SHACL Node Shapes found in given shape graph"); } } @@ -281,3 +288,33 @@ class ModulatorInstance implements Modulator { } } } + +export function retry_fetch( + fetch_f: typeof fetch, + httpCodes: number[], + base = 500, + maxRetries = 5, +): typeof fetch { + const retry: typeof fetch = async (input, init) => { + let tryCount = 0; + let retryTime = base; + while (tryCount < maxRetries) { + const resp = await fetch_f(input, init); + if (!resp.ok) { + if (httpCodes.some((x) => x == resp.status)) { + // Wait 500ms, 1 second, 2 seconds, 4 seconds, 8 seconds, fail + tryCount += 1; + await new Promise((res) => setTimeout(res, retryTime)); + retryTime *= 2; + continue; + } + return resp; + } + return resp; + } + + throw "Max retries"; + }; + + return retry; +} diff --git a/package.json b/package.json index 09e30ac..87fbb8f 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "ldes-client", "description": "This package provides common tooling to work with LDESes.", - "version": "0.0.9-alpha.0", + "version": "0.0.9-alpha.2", "main": "dist/lib/client.js", "bin": { "ldes-client": "dist/bin/cli.js" diff --git a/tests/helper.ts b/tests/helper.ts index 57b1877..8704f3e 100644 --- a/tests/helper.ts +++ b/tests/helper.ts @@ -32,24 +32,31 @@ function relationToQuads(rel: Relation): Quad[] { } export async function read(stream: ReadableStream): Promise { - return new Promise(async (res) => { - const out: Member[] = []; - const reader = stream.getReader(); - - let el = await reader.read(); - while (el) { - if (el.done || !el.value) break; - out.push(el.value); - el = await reader.read(); - } + return new Promise(async (res, rej) => { + try { + const out: Member[] = []; + const reader = stream.getReader(); + + let el = await reader.read(); + while (el) { + if (el.done || !el.value) break; + out.push(el.value); + el = await reader.read(); + } - res(out); + res(out); + } catch (ex) { + console.log("expect", ex); + rej(ex); + } }); } export class Fragment { private members: { member: T; id: string }[] = []; private relations: Relation[] = []; + + private failCount = 0; delay?: number; constructor(delay?: number) { @@ -60,6 +67,11 @@ export class Fragment { ldesId: string, memberToQuads: (id: string, member: T) => Quad[], ): Quad[] { + if (this.failCount > 0) { + this.failCount -= 1; + throw "I'm failing, oh no"; + } + const out: Quad[] = []; for (let rel of this.relations) { out.push(...relationToQuads(rel)); @@ -73,6 +85,11 @@ export class Fragment { return out; } + setFailcount(count: number): typeof this { + this.failCount = count; + return this; + } + addMember(id: string, member: T): typeof this { this.members.push({ member, id }); return this; @@ -149,15 +166,20 @@ export class Tree { if (fragment.delay) { await new Promise((res) => setTimeout(res, fragment.delay)); } - quads.push(...fragment.toQuads(BASE + this.root(), this.memberToQuads)); + try { + quads.push(...fragment.toQuads(BASE + this.root(), this.memberToQuads)); - const respText = new Writer().quadsToString(quads); + const respText = new Writer().quadsToString(quads); - const resp = new Response(respText, { - headers: { "content-type": "text/turtle" }, - }); + const resp = new Response(respText, { + headers: { "content-type": "text/turtle" }, + }); - return resp; + return resp; + } catch (ex) { + const resp = new Response("I'm too loaded yo", { status: 429 }); + return resp; + } }); } } diff --git a/tests/unordered.test.ts b/tests/unordered.test.ts index 96d68b1..765b362 100644 --- a/tests/unordered.test.ts +++ b/tests/unordered.test.ts @@ -1,7 +1,7 @@ import { afterEach, beforeEach, describe, expect, test } from "@jest/globals"; import { read, Tree } from "./helper"; -import { replicateLDES } from "../lib/client"; +import { replicateLDES, retry_fetch } from "../lib/client"; import { intoConfig } from "../lib/config"; import { Parser } from "n3"; import { TREE } from "@treecg/types"; @@ -338,7 +338,7 @@ describe("more complex tree", () => { let added = false; - client.addPollCycle(() => { + client.on("poll", () => { console.log("Poll cycle!"); if (!added) { tree.fragment(tree.root()).addMember("b", 7); @@ -389,7 +389,7 @@ describe("more complex tree", () => { let added = false; - client.addPollCycle(() => { + client.on("poll", () => { console.log("Poll cycle!"); if (!added) { tree.fragment(tree.root()).addMember("b", 7); @@ -414,4 +414,71 @@ describe("more complex tree", () => { await reader.cancel(); }); + + test("Exponential backoff works", async () => { + const tree = new Tree( + (x, numb) => + new Parser().parse(`<${x}> ${numb}.`), + "http://example.com/value", + ); + tree.fragment(tree.root()).addMember("a", 5); + const frag = tree.newFragment(); + tree.fragment(tree.root()).relation(frag, "https://w3id.org/tree#relation"); + tree.fragment(frag).setFailcount(2).addMember("b", 7); + + const base = tree.base() + tree.root(); + const mock = tree.mock(); + global.fetch = mock; + + const client = replicateLDES( + intoConfig({ + url: base, + fetcher: { maxFetched: 2, concurrentRequests: 10 }, + }), + undefined, + undefined, + "none", + ); + + const members = await read(client.stream()); + expect(members.length).toBe(2); + }); + + test("Exponential backoff works, handle max retries", async () => { + const tree = new Tree( + (x, numb) => + new Parser().parse(`<${x}> ${numb}.`), + "http://example.com/value", + ); + tree.fragment(tree.root()).addMember("a", 5); + const frag = tree.newFragment(); + tree.fragment(tree.root()).relation(frag, "https://w3id.org/tree#relation"); + tree.fragment(frag).setFailcount(5).addMember("b", 7); + + const base = tree.base() + tree.root(); + const mock = tree.mock(); + global.fetch = mock; + + const client = replicateLDES( + intoConfig({ + url: base, + fetch: retry_fetch(fetch, [408, 425, 429, 500, 502, 503, 504], 100, 2), + }), + undefined, + undefined, + "none", + ); + + let thrown = false; + + try { + const members = await read(client.stream()); + console.log("Here", members); + } catch (ex) { + console.log("Throw", ex); + thrown = true; + } + + expect(thrown).toBeTruthy(); + }); });