Skip to content

Commit

Permalink
conflicts resolved
Browse files Browse the repository at this point in the history
  • Loading branch information
Adam Leos committed Aug 28, 2023
2 parents abcddec + 36546c3 commit fc2f501
Show file tree
Hide file tree
Showing 10 changed files with 1,104 additions and 6,021 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ dist
data
etc
node_modules
tmp
tmp
dwn.db
24 changes: 19 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,22 @@ cloudflared tunnel --url http://localhost:3000
# Configuration
Configuration can be set using environment variables

| Env Var | Description | Default |
| ------------------------- | ------------------------------------------------------------------------- | ------- |
| `DS_PORT` | Port that the server listens on | `3000` |
| `DS_MAX_RECORD_DATA_SIZE` | maximum size for `RecordsWrite` data. use `b`, `kb`, `mb`, `gb` for value | `1gb` |
| `DS_WEBSOCKET_SERVER` | whether to enable listening over `ws:`. values: `on`,`off` | `on` |
| Env Var | Description | Default |
| ------------------------- | ---------------------------------------------------------------------------------------| ---------------------- |
| `DS_PORT` | Port that the server listens on | `3000` |
| `DS_MAX_RECORD_DATA_SIZE` | maximum size for `RecordsWrite` data. use `b`, `kb`, `mb`, `gb` for value | `1gb` |
| `DS_WEBSOCKET_SERVER` | whether to enable listening over `ws:`. values: `on`,`off` | `on` |
| `DWN_STORAGE` | URL to use for storage by default. See [Storage Options](#storage-options) for details | `level://data` |
| `DWN_STORAGE_MESSAGES` | URL to use for storage of messages. | value of `DWN_STORAGE` |
| `DWN_STORAGE_DATA` | URL to use for data storage | value of `DWN_STORAGE` |
| `DWN_STORAGE_EVENTS` | URL to use for event storage | value of `DWN_STORAGE` |

# Storage Options
Several storage formats are supported, and may be configured with the `DWN_STORAGE_*` environment variables:

| Database | Example | Notes |
|------------|-------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| LevelDB | `level://data` | use three slashes for absolute paths, two for relative. Example shown uses directory `data` in the current working directory |
| Sqlite | `sqlite://dwn.db` | use three slashes for absolute paths, two for relative. Example shown creates a file `dwn.db` in the current working directory |
| MySQL | `mysql://user:pass@host/db?debug=true&timezone=-0700` | [all URL options documented here](https://github.com/mysqljs/mysql#connection-options) |
| PostgreSQL | `postgres:///dwn` | any options other than the URL scheme (`postgres://`) may also be specified via [standard environment variables](https://node-postgres.com/features/connecting#environment-variables) |
6,903 changes: 912 additions & 5,991 deletions package-lock.json

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,18 @@
},
"dependencies": {
"@tbd54566975/dwn-sdk-js": "0.2.1",
"@tbd54566975/dwn-sql-store": "^0.0.3",
"better-sqlite3": "^8.5.0",
"bytes": "3.1.2",
"cors": "2.8.5",
"express": "4.18.2",
"loglevel": "^1.8.1",
"loglevel-plugin-prefix": "^0.8.4",
"multiformats": "11.0.2",
"mysql2": "^3.6.0",
"node-fetch": "3.3.1",
"pg": "^8.11.2",
"pg-cursor": "^2.10.2",
"prom-client": "14.2.0",
"readable-stream": "4.3.0",
"response-time": "2.3.2",
Expand Down
11 changes: 8 additions & 3 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@ export const config = {
// port that server listens on
port : parseInt(process.env.DS_PORT || '3000'),
// whether to enable 'ws:'
webSocketServerEnabled : { 'on': true, 'off': false }[process.env.DS_WEBSOCKET_SERVER] ?? true
// TODO: add config option to change data path. will need to change dwn.ts to ensure that the option works
// inside and outside a docker container.
webSocketServerEnabled : { 'on': true, 'off': false }[process.env.DS_WEBSOCKET_SERVER] ?? true,
// where to store persistant data
messageStore : process.env.DWN_STORAGE_MESSAGES || process.env.DWN_STORAGE || 'level://data',
dataStore : process.env.DWN_STORAGE_DATA || process.env.DWN_STORAGE || 'level://data',
eventLog : process.env.DWN_STORAGE_EVENTS || process.env.DWN_STORAGE || 'level://data',

// log level - trace/debug/info/warn/error
logLevel: process.env.DWN_SERVER_LOG_LEVEL || 'INFO',
};
20 changes: 9 additions & 11 deletions src/dwn-server.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,32 @@
import type { Config } from './config.js';
import log from 'loglevel';
import prefix from 'loglevel-plugin-prefix';

import { Dwn } from '@tbd54566975/dwn-sdk-js';
import { DataStoreLevel, EventLogLevel, MessageStoreLevel } from '@tbd54566975/dwn-sdk-js/stores';

import { WsApi } from './ws-api.js';
import { HttpApi } from './http-api.js';
import { config as defaultConfig } from './config.js';
import { HttpServerShutdownHandler } from './lib/http-server-shutdown-handler.js';
import { setProcessHandlers } from './process-handlers.js';
import { getDWNConfig } from './storage.js';

export type DwnServerOptions = {
dwn?: Dwn;
config?: Config;
};

export class DwnServer {
dwn: Dwn;
dwn?: Dwn;
config: Config;
httpServerShutdownHandler: HttpServerShutdownHandler;

constructor(options: DwnServerOptions = {}) {
this.config = options.config ?? defaultConfig;
this.dwn = options.dwn;
log.setLevel(this.config.logLevel as log.LogLevelDesc);
prefix.reg(log);
prefix.apply(log);
}

async start() : Promise<void> {
Expand All @@ -31,19 +36,12 @@ export class DwnServer {

async listen(): Promise<void> {
if (!this.dwn) {
const dataStore = new DataStoreLevel({ blockstoreLocation: 'data/DATASTORE' });
const eventLog = new EventLogLevel({ location: 'data/EVENTLOG' });
const messageStore = new MessageStoreLevel({
blockstoreLocation : 'data/MESSAGESTORE',
indexLocation : 'data/INDEX'
});

this.dwn = await Dwn.create({ eventLog, dataStore, messageStore });
this.dwn = await Dwn.create(getDWNConfig(this.config));
}

const httpApi = new HttpApi(this.dwn);
const httpServer = httpApi.listen(this.config.port, () => {
console.log(`server listening on port ${this.config.port}`);
log.info(`server listening on port ${this.config.port}`);
});

this.httpServerShutdownHandler = new HttpServerShutdownHandler(httpServer);
Expand Down
23 changes: 13 additions & 10 deletions src/http-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ import responseTime from 'response-time';

import cors from 'cors';
import express from 'express';
import { register, Histogram } from 'prom-client';
import { register } from 'prom-client';
import log from 'loglevel';

import { v4 as uuidv4 } from 'uuid';

import { jsonRpcApi } from './json-rpc-api.js';
import { JsonRpcRequest } from './lib/json-rpc.js';
import { createJsonRpcErrorResponse, JsonRpcErrorCodes } from './lib/json-rpc.js';
import { responseHistogram, requestCounter } from './metrics.js';

export class HttpApi {
api: Express;
Expand All @@ -20,13 +23,6 @@ export class HttpApi {
this.api = express();
this.dwn = dwn;

const responseHistogram = new Histogram({
name : 'http_response',
help : 'response histogram',
buckets : [50, 250, 500, 750, 1000],
labelNames : ['route', 'code'],
});

this.api.use(cors({ exposedHeaders: 'dwn-response' }));
this.api.use(responseTime((req: Request, res: Response, time) => {
const url = req.url === '/' ? '/jsonrpc' : req.url;
Expand All @@ -36,6 +32,7 @@ export class HttpApi {

const statusCode = res.statusCode.toString();
responseHistogram.labels(route, statusCode).observe(time);
log.info(req.method, decodeURI(req.url), res.statusCode);
}));

this.api.get('/health', (_req, res) => {
Expand Down Expand Up @@ -68,8 +65,9 @@ export class HttpApi {
return res.status(400).json(reply);
}

let dwnRpcRequest: JsonRpcRequest;
try {
dwnRequest = JSON.parse(dwnRequest);
dwnRpcRequest = JSON.parse(dwnRequest);
} catch (e) {
const reply = createJsonRpcErrorResponse(uuidv4(), JsonRpcErrorCodes.BadRequest, e.message);

Expand All @@ -82,14 +80,19 @@ export class HttpApi {
const requestDataStream = (parseInt(contentLength) > 0 || transferEncoding !== undefined) ? req : undefined;

const requestContext: RequestContext = { dwn: this.dwn, transport: 'http', dataStream: requestDataStream };
const { jsonRpcResponse, dataStream: responseDataStream } = await jsonRpcApi.handle(dwnRequest, requestContext as RequestContext);
const { jsonRpcResponse, dataStream: responseDataStream } = await jsonRpcApi.handle(dwnRpcRequest, requestContext as RequestContext);

// If the handler catches a thrown exception and returns a JSON RPC InternalError, return the equivalent
// HTTP 500 Internal Server Error with the response.
if (jsonRpcResponse.error) {
requestCounter.inc({ method: dwnRpcRequest.method, error: 1 });
return res.status(500).json(jsonRpcResponse);
}

requestCounter.inc({
method : dwnRpcRequest.method,
status : jsonRpcResponse?.result?.reply?.status?.code || 0,
});
if (responseDataStream) {
res.setHeader('content-type', 'application/octet-stream');
res.setHeader('dwn-response', JSON.stringify(jsonRpcResponse));
Expand Down
14 changes: 14 additions & 0 deletions src/metrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { Histogram, Counter } from 'prom-client';

export const requestCounter = new Counter({
name : 'dwn_requests_total',
help : 'all dwn requests processed',
labelNames : ['method', 'status', 'error'],
});

export const responseHistogram = new Histogram({
name : 'http_response',
help : 'response histogram',
buckets : [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10],
labelNames : ['route', 'code'],
});
110 changes: 110 additions & 0 deletions src/storage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import { Config } from './config.js';
import { DataStore, EventLog, MessageStore, DwnConfig } from '@tbd54566975/dwn-sdk-js';
import { SqliteDialect, MysqlDialect, PostgresDialect, MessageStoreSql, DataStoreSql, EventLogSql, Dialect } from '@tbd54566975/dwn-sql-store';
import { DataStoreLevel, EventLogLevel, MessageStoreLevel } from '@tbd54566975/dwn-sdk-js/stores';

import Database from 'better-sqlite3';
import { createPool as MySQLCreatePool } from 'mysql2';
import pg from 'pg';
import Cursor from 'pg-cursor';

enum EStoreType {
DataStore,
MessageStore,
EventLog,
}

enum BackendTypes {
LEVEL = 'level',
SQLITE = 'sqlite',
MYSQL = 'mysql',
POSTGRES = 'postgres',
}

type StoreType = DataStore | EventLog | MessageStore;

export function getDWNConfig(config: Config): DwnConfig {
let dataStore: DataStore = getStore(config.dataStore, EStoreType.DataStore);
let eventLog: EventLog = getStore(config.eventLog, EStoreType.EventLog);
let messageStore: MessageStore = getStore(config.messageStore, EStoreType.MessageStore);

return { eventLog, dataStore, messageStore };
}

function getLevelStore(storeURI: URL, storeType: EStoreType) {
switch (storeType) {
case EStoreType.DataStore:
return new DataStoreLevel({
blockstoreLocation: storeURI.host + storeURI.pathname + '/DATASTORE',
});
case EStoreType.MessageStore:
return new MessageStoreLevel({
blockstoreLocation: storeURI.host + storeURI.pathname + '/DATASTORE',
});
case EStoreType.EventLog:
return new EventLogLevel({
location: storeURI.host + storeURI.pathname + '/EVENTLOG'
});
default:
throw new Error('Unexpected level store type');
}
}

function getDBStore(db: Dialect, storeType: EStoreType) {
switch (storeType) {
case EStoreType.DataStore:
return new DataStoreSql(db);
case EStoreType.MessageStore:
return new MessageStoreSql(db);
case EStoreType.EventLog:
return new EventLogSql(db);
default:
throw new Error('Unexpected db store type');
}
}

function getStore(storeString: string, storeType: EStoreType.DataStore): DataStore;
function getStore(storeString: string, storeType: EStoreType.EventLog): EventLog;
function getStore(storeString: string, storeType: EStoreType.MessageStore): MessageStore;
function getStore(storeString: string, storeType: EStoreType): StoreType {
const storeURI = new URL(storeString);

switch (storeURI.protocol.slice(0, -1)) {
case BackendTypes.LEVEL:
return getLevelStore(storeURI, storeType);

case BackendTypes.SQLITE:
case BackendTypes.MYSQL:
case BackendTypes.POSTGRES:
return getDBStore(getDBFromURI(storeURI), storeType);

default:
throw invalidStorageSchemeMessage(storeURI.protocol);
}
}

function getDBFromURI(u: URL): Dialect {
switch(u.protocol.slice(0, -1)) {
case BackendTypes.SQLITE:
return new SqliteDialect({
database: async () => new Database(u.host + u.pathname),
});
case BackendTypes.MYSQL:
return new MysqlDialect({
pool: async () => MySQLCreatePool(u.toString()),
});
case BackendTypes.POSTGRES:
return new PostgresDialect({
pool : async () => new pg.Pool({u}),
cursor : Cursor,
});
}
}

function invalidStorageSchemeMessage(protocol: string): string {
let schemes = [];
for (const [_, value] of Object.entries(BackendTypes)) {
schemes.push(value);
}
return 'Unknown storage protocol ' + protocol.slice(0, 1) + '! Please use one of: ' + schemes.join(', ') + '. For details, see README';
}
10 changes: 10 additions & 0 deletions src/ws-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { base64url } from 'multiformats/bases/base64';

import { jsonRpcApi } from './json-rpc-api.js';
import { createJsonRpcErrorResponse, JsonRpcErrorCodes, JsonRpcResponse } from './lib/json-rpc.js';
import { requestCounter } from './metrics.js';

const SOCKET_ISALIVE_SYMBOL = Symbol('isAlive');

Expand Down Expand Up @@ -80,6 +81,15 @@ export class WsApi {
const requestContext: RequestContext = { dwn, transport: 'ws', dataStream: requestDataStream };
const { jsonRpcResponse } = await jsonRpcApi.handle(dwnRequest, requestContext);

if (jsonRpcResponse.error) {
requestCounter.inc({ method: dwnRequest.method, error: 1 });
} else {
requestCounter.inc({
method : dwnRequest.method,
status : jsonRpcResponse?.result?.reply?.status?.code || 0,
});
}

const responseBuffer = WsApi.jsonRpcResponseToBuffer(jsonRpcResponse);
return socket.send(responseBuffer);
});
Expand Down

0 comments on commit fc2f501

Please sign in to comment.