Skip to content

Commit

Permalink
add basic auth support (#23)
Browse files Browse the repository at this point in the history
* add basic auth support

* bump version
  • Loading branch information
ajuvercr authored Mar 26, 2024
1 parent 2822557 commit 2408a51
Show file tree
Hide file tree
Showing 6 changed files with 1,704 additions and 344 deletions.
10 changes: 4 additions & 6 deletions bin/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ let verbose: boolean = false;
let save: string | undefined;
let onlyDefaultGraph: boolean = false;
let loose: boolean = false;
let basicAuth: string | undefined;

program
.arguments("<url>")
Expand Down Expand Up @@ -61,6 +62,7 @@ program
)
.option("-q --quiet", "be quiet")
.option("-v --verbose", "be verbose")
.option("--basic-auth <username>:<password>", "HTTP basic auth information")
.action((url: string, program) => {
urlIsView = program.urlIsView;
noShape = !program.shape;
Expand All @@ -74,6 +76,7 @@ program
verbose = program.verbose;
loose = program.loose;
onlyDefaultGraph = program.onlyDefaultGraph;
basicAuth = program.basicAuth;
if (program.after) {
if (!isNaN(new Date(program.after).getTime())) {
after = new Date(program.after);
Expand All @@ -94,11 +97,6 @@ program

program.parse(process.argv);

const f = global.fetch;
global.fetch = (req, options) => {
console.log("Global fetch", req);
return f(req, options);
};
async function main() {
const client = replicateLDES(
intoConfig({
Expand All @@ -115,7 +113,7 @@ async function main() {
onlyDefaultGraph,
after,
before,
// fetch: <typeof fetch>fetch_f,
basicAuth,
}),
undefined,
undefined,
Expand Down
5 changes: 2 additions & 3 deletions lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,9 @@ async function getInfo(
config.shapes && config.shapes.length > 0 ? shapeConfigStore : store,
dereferencer,
{
// Updated upstream
cbdDefaultGraph: config.onlyDefaultGraph,
fetch: config.fetch,
}, //Stashed changes
},
),
shapeMap: config.noShape ? undefined : shapeMap,
timestampPath: timestampPaths[0],
Expand Down Expand Up @@ -430,7 +429,7 @@ export class Client {
},
cancel: async () => {
this.stateFactory.write();
console.log("Cancled");
console.log("Canceled");
this.strategy.cancle();
},
};
Expand Down
16 changes: 14 additions & 2 deletions lib/config.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { NamedNode, Quad } from "@rdfjs/types";
import { DefaultFetcherConfig, FetcherConfig } from "./pageFetcher";
import { retry_fetch } from "./utils";
import {
handle_basic_auth,
limit_fetch_per_domain,
retry_fetch,
} from "./utils";

export interface ShapeConfig {
quads: Quad[];
Expand Down Expand Up @@ -34,6 +38,7 @@ export interface Config {
shapeFiles?: string[];
onlyDefaultGraph?: boolean;
fetch?: typeof fetch;
basicAuth?: string;
// Add flag to indicate in order (default true)
// Make sure that slower pages to first emit the first members
//
Expand Down Expand Up @@ -69,7 +74,14 @@ export async function getConfig(): Promise<Config & WithTarget> {

export function intoConfig(config: Partial<Config>): Config {
if (!config.fetch) {
config.fetch = retry_fetch(fetch, [408, 425, 429, 500, 502, 503, 504]);
const fetch_f = config.basicAuth
? handle_basic_auth(fetch, config.basicAuth, new URL(config.url!))
: fetch;

config.fetch = limit_fetch_per_domain(
retry_fetch(fetch_f, [408, 425, 429, 500, 502, 503, 504, 404]),
1,
);
}

return Object.assign({}, defaultConfig, defaultTarget, config);
Expand Down
98 changes: 96 additions & 2 deletions lib/utils.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { NamedNode, Quad_Subject, Stream, Term } from "@rdfjs/types";
import { NamedNode, Stream, Term } from "@rdfjs/types";
import { BaseQuad } from "n3";
import { StateFactory, StateT } from "./state";
import { RdfStore } from "rdf-stores";
import { RDF, SHACL } from "@treecg/types";
import debug from "debug";

export type Notifier<Events, S> = {
[K in keyof Events]: (event: Events[K], state: S) => void;
Expand Down Expand Up @@ -227,7 +228,6 @@ class ModulatorInstance<T> implements Modulator<T> {
this.notifier = notifier;
this.factory = factory;
for (let item of readd) {
console.log("Readding");
this.push(item.item);
}
}
Expand Down Expand Up @@ -289,19 +289,113 @@ class ModulatorInstance<T> implements Modulator<T> {
}
}

function urlToUrl(input: Parameters<typeof fetch>[0]): URL {
if (typeof input === "string") {
return new URL(input);
} else if (input instanceof URL) {
return input;
} else if (input instanceof Request) {
return new URL(input.url);
} else {
throw "Not a real url";
}
}

const log = debug("fetch");

export function limit_fetch_per_domain(
fetch_f: typeof fetch,
concurrent: number,
): typeof fetch {
const logger = log.extend("limit");
const domain_dict: { [domain: string]: Array<(value: void) => void> } = {};

const out: typeof fetch = async (input, init) => {
let url: URL = urlToUrl(input);
const domain = url.origin;

if (!(domain in domain_dict)) {
domain_dict[domain] = [];
}

const requests = domain_dict[domain];
await new Promise((res) => {
logger("%s capacity %d/%d", domain, requests.length, concurrent);
if (requests.length < concurrent) {
requests.push(res);
res({});
} else {
requests.push(res);
}
});
const resp = await fetch_f(input, init);

requests.shift();
for (let i = 0; i < concurrent; i++) {
if (requests[i]) {
requests[i]();
}
}

return resp;
};

return out;
}

export function handle_basic_auth(
fetch_f: typeof fetch,
basicAuth: string,
domain: URL,
): typeof fetch {
const logger = log.extend("auth");
let authRequired = false;

const basicAuthValue = `Basic ${Buffer.from(basicAuth).toString("base64")}`;
const setHeader = (init?: RequestInit): RequestInit => {
const reqInit = init || {};
const headers = new Headers(reqInit.headers);
headers.set("Authorization", basicAuthValue);
reqInit.headers = headers;
return reqInit;
};

const auth_f: typeof fetch = async (input, init) => {
let url: URL = urlToUrl(input);
if (authRequired && url.host === domain.host) {
return await fetch_f(input, setHeader(init));
}

const resp = await fetch_f(input, init);
if (resp.status === 401) {
logger("Unauthorized, adding basic auth");
if (url.host === domain.host) {
authRequired = true;
return await fetch_f(input, setHeader(init));
}
}

return resp;
};

return auth_f;
}

export function retry_fetch(
fetch_f: typeof fetch,
httpCodes: number[],
base = 500,
maxRetries = 5,
): typeof fetch {
const logger = log.extend("retry");
const retry: typeof fetch = async (input, init) => {
let tryCount = 0;
let retryTime = base;
while (tryCount < maxRetries) {
const resp = await fetch_f(input, init);
if (!resp.ok) {
if (httpCodes.some((x) => x == resp.status)) {
logger("Retry %s %d/%d", input, tryCount, maxRetries);
// Wait 500ms, 1 second, 2 seconds, 4 seconds, 8 seconds, fail
tryCount += 1;
await new Promise((res) => setTimeout(res, retryTime));
Expand Down
Loading

0 comments on commit 2408a51

Please sign in to comment.