Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
acomagu committed Apr 30, 2024
0 parents commit 1d5c7b5
Showing 7 changed files with 1,934 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
node_modules
73 changes: 73 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# DynamoDB DataLoader

This library provides a [DataLoader](https://github.com/graphql/dataloader) layer for efficient fetching from DynamoDB by caching and batching.

## Features

- Batch Loading: Combines multiple queries into fewer network requests to DynamoDB(only for `get` operation).
- Unified Caching: Caches are shared across get, query, and scan operations.
- But this shared caching is effective only in limited scenarios, such as when entries previously fetched using query or scan are accessed again using get. Also, the feature does not function when only parts of records are retrieved.

## Initializing the DataLoader

Define the schema for your tables, specifying each table's name and the attribute names that form the keys used in caching.

```typescript
import { DynamodbDataLoader, TableSchema } from '@acomagu/dynamodb-dataloader';

const tableSchemas: TableSchema[] = [
{ tableName: "Users", keyAttributeNames: ["userId"] },
{ tableName: "Posts", keyAttributeNames: ["userId", "postId"] }, // PK and SK
]; // Used to enable cache sharing across query, scan, and get operations.

const options = {
dynamodbClient: new DynamoDBClient({ /* AWS SDK configuration options */ }),
getOptions: { /* BatchGet options */ },
};

const dynamodbDataLoader = new DynamodbDataLoader(tableSchemas, options); // All arguments are optional.
```

## Fetching Data

### Get Operation

Fetch data for a specific user ID from the "Users" table using the getter DataLoader:

```typescript
const getUserRequest = {
TableName: "Users",
Key: { userId: "12345" }
};
const item = await dynamodbDataLoader.getter.load(getUserRequest);
```

### Query Operation

Example of querying posts for a specific user:

```typescript
const queryPostsRequest = {
TableName: "Posts",
KeyConditionExpression: "userId = :userId",
ExpressionAttributeValues: {
":userId": "12345",
},
};
const items = await dynamodbDataLoader.querier.load(queryPostsRequest);
```

### Scan Operation

Scanning for items with a specific filter:

```typescript
const scanRequest = {
TableName: "Posts",
FilterExpression: "contains(content, :content)",
ExpressionAttributeValues: {
":content": "DynamoDB",
},
};
const items = await dynamodbDataLoader.scanner.load(scanRequest);
```
63 changes: 63 additions & 0 deletions dynamodb-dataloader.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import * as dynamodb from '@aws-sdk/client-dynamodb';
import * as dynamodbLib from '@aws-sdk/lib-dynamodb';
import * as assert from 'node:assert/strict';
import test from 'node:test';
import { DynamodbDataLoader } from './dynamodb-dataloader.js';

const dynamodbClient = new dynamodb.DynamoDBClient({});

const tableName = `TestTable_${Math.random().toString(32).substring(2)}`;

test.before(async () => {
await dynamodbClient.send(new dynamodb.CreateTableCommand({
TableName: tableName,
AttributeDefinitions: [
{ AttributeName: 'pk', AttributeType: 'S' },
{ AttributeName: 'sk', AttributeType: 'B' },
],
KeySchema: [
{ KeyType: 'HASH', AttributeName: 'pk' },
{ KeyType: 'RANGE', AttributeName: 'sk' },
],
BillingMode: 'PAY_PER_REQUEST',
}));
await dynamodb.waitUntilTableExists({
client: dynamodbClient,
maxWaitTime: 60,
}, {
TableName: tableName,
});
await dynamodbClient.send(new dynamodbLib.BatchWriteCommand({
RequestItems: {
[tableName]: [
{
PutRequest: {
Item: {
pk: 'pk1',
sk: Buffer.from('sk1'),
attr: 'attr1',
},
},
}
],
},
}));
});

test.after(async () => {
await dynamodbClient.send(new dynamodb.DeleteTableCommand({
TableName: tableName,
}));
});

await test('dynamodbDataLoader', async () => {
const result: any = await new DynamodbDataLoader().getter.load({
TableName: tableName,
Key: {
pk: 'pk1',
sk: Buffer.from('sk1'),
},
});

assert.equal(result.attr, 'attr1');
});
215 changes: 215 additions & 0 deletions dynamodb-dataloader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
import * as dynamodb from '@aws-sdk/client-dynamodb';
import * as dynamodbLib from '@aws-sdk/lib-dynamodb';
import * as dynamodbUtil from '@aws-sdk/util-dynamodb';
import DataLoader from 'dataloader';

export interface GetRequest {
TableName: string;
Key: Record<string, dynamodbUtil.NativeScalarAttributeValue>;
}

export type ScanRequest = Pick<dynamodbLib.ScanCommandInput,
| 'ConsistentRead'
| 'ExpressionAttributeNames'
| 'ExpressionAttributeValues'
| 'FilterExpression'
| 'IndexName'
| 'Limit'
| 'ProjectionExpression'
| 'ReturnConsumedCapacity'
| 'Segment'
| 'Select'
| 'TableName'
| 'TotalSegments'
>;

export type QueryRequest = Pick<dynamodbLib.QueryCommandInput,
| 'ConsistentRead'
| 'ExpressionAttributeNames'
| 'ExpressionAttributeValues'
| 'FilterExpression'
| 'IndexName'
| 'KeyConditionExpression'
| 'Limit'
| 'ProjectionExpression'
| 'ReturnConsumedCapacity'
| 'ScanIndexForward'
| 'Select'
| 'TableName'
>;

export interface TableSchema {
readonly tableName: string;
readonly keyAttributeNames: readonly [string] | readonly [string, string];
}

export class DynamodbDataLoader {
dynamodbClient: dynamodb.DynamoDBClient;
dynamodbDocumentClient: dynamodbLib.DynamoDBDocumentClient;

scanner = new DataLoader<ScanRequest, Record<string, unknown>[], string>(scanRequests =>
Promise.all(scanRequests.map(async scanRequest => {
const iter = dynamodbLib.paginateScan({
client: this.dynamodbDocumentClient,
}, scanRequest);

const items = [];
for await (const page of iter) {
if (page.Items) items.push(...page.Items);
}

if (this.tableSchemas && (!scanRequest.Select || scanRequest.Select === dynamodb.Select.ALL_ATTRIBUTES)) {
const tableSchema = this.tableSchemas.find(s => s.tableName === scanRequest.TableName);
if (!tableSchema) {
console.warn(`DynamoDB Dataloader: Could not find table schema of table ${scanRequest.TableName}`);
return items;
}

for (const item of items) {
const key = tableSchema.keyAttributeNames.reduce((key, attrName) => ({
...key,
[attrName]: item[attrName],
}), {});

if (Object.values(key).includes(undefined) || Object.values(key).includes(null)) continue;

this.getter.prime({
TableName: scanRequest.TableName ?? '',
Key: key,
}, item);
}
}

return items;
})),
{
cacheKeyFn(key) {
return JSON.stringify({
ExpressionAttributeNames: key.ExpressionAttributeNames,
ExpressionAttributeValues: key.ExpressionAttributeValues,
FilterExpression: key.FilterExpression,
Limit: key.Limit,
ProjectionExpression: key.ProjectionExpression,
Select: key.Select,
TableName: key.TableName,
});
},
},
);
querier = new DataLoader<QueryRequest, Record<string, unknown>[], string>(queryRequests =>
Promise.all(queryRequests.map(async queryRequest => {
const iter = dynamodbLib.paginateQuery({
client: this.dynamodbDocumentClient,
}, queryRequest);

const items = [];
for await (const page of iter) {
if (page.Items) items.push(...page.Items);
}

if (this.tableSchemas && (!queryRequest.Select || queryRequest.Select === dynamodb.Select.ALL_ATTRIBUTES)) {
const tableSchema = this.tableSchemas.find(s => s.tableName === queryRequest.TableName);
if (!tableSchema) {
console.warn(`DynamoDB Dataloader: Could not find table schema of table ${queryRequest.TableName}`);
return items;
}

for (const item of items) {
const key = tableSchema.keyAttributeNames.reduce((key, attrName) => ({
...key,
[attrName]: item[attrName],
}), {});

if (Object.values(key).includes(undefined) || Object.values(key).includes(null)) continue;

this.getter.prime({
TableName: queryRequest.TableName ?? '',
Key: key,
}, item);
}
}

return items;
})),
{
cacheKeyFn(key) {
return JSON.stringify({
ExpressionAttributeNames: key.ExpressionAttributeNames,
ExpressionAttributeValues: key.ExpressionAttributeValues,
FilterExpression: key.FilterExpression,
KeyConditionExpression: key.KeyConditionExpression,
Limit: key.Limit,
ProjectionExpression: key.ProjectionExpression,
Select: key.Select,
TableName: key.TableName,
});
},
},
);
getter = new DataLoader<GetRequest, unknown, string>(
async (getRequests) => {
const byTableName = Object.groupBy(getRequests, req => req.TableName);
let requestItems: dynamodb.BatchGetItemCommandInput['RequestItems'] = Object.fromEntries(Object.entries(byTableName).flatMap(([tableName, reqs]) => {
if (!reqs) return [];
return [[tableName, {
Keys: reqs.map(req => dynamodbUtil.marshall(req.Key)),

ConsistentRead: this.options?.getOptions?.ConsistentRead,
ExpressionAttributeNames: this.options?.getOptions?.ExpressionAttributeNames,
ProjectionExpression: this.options?.getOptions?.ProjectionExpression,
}]];
}));

let responses: Record<string, Record<string, dynamodb.AttributeValue>[]> = {};
while (requestItems && Object.values(requestItems).flat().length) {
const result: dynamodb.BatchGetItemCommandOutput = await this.dynamodbClient.send(new dynamodb.BatchGetItemCommand({
RequestItems: requestItems,
ReturnConsumedCapacity: this.options?.getOptions?.ReturnConsumedCapacity,
}));

responses = {
...responses,
...result.Responses,
};

requestItems = result.UnprocessedKeys;
}

const items = getRequests.map(getRequest =>
responses[getRequest.TableName]?.find(item =>
Object.entries(getRequest.Key).every(([attr, expected]) => {
const a = item[attr];
const b = dynamodbUtil.convertToAttr(expected);

if (a === b) return true;

if (a?.S) return a.S === b.S;
if (a?.N) return a.N === b.N;
if (a?.B) {
if (!b.B) return false;
return Buffer.from(a.B).equals(Buffer.from(b.B));
}

throw new Error(`Unexpected key: ${JSON.stringify(a)}`);
}),
),
);

return items.map(item => item ? dynamodbUtil.unmarshall(item) : item);
},
{
maxBatchSize: 100,
cacheKeyFn: ({ TableName, Key }) => {
return TableName + '|' + Object.keys(Key).sort().map(k => k + ':' + Key[k]).join('|');
},
},
);

constructor(readonly tableSchemas?: readonly TableSchema[], readonly options?: {
readonly dynamodbClient?: dynamodb.DynamoDBClient;
readonly getOptions?: Pick<dynamodb.KeysAndAttributes, 'ConsistentRead' | 'ProjectionExpression' | 'ExpressionAttributeNames'> & Pick<dynamodb.BatchGetItemCommandInput, 'ReturnConsumedCapacity'>;
}) {
this.dynamodbClient = options?.dynamodbClient ?? new dynamodb.DynamoDBClient({});
this.dynamodbDocumentClient = dynamodbLib.DynamoDBDocumentClient.from(this.dynamodbClient);
}
}
1,535 changes: 1,535 additions & 0 deletions package-lock.json

Large diffs are not rendered by default.

23 changes: 23 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"name": "@acomagu/dynamodb-dataloader",
"version": "1.0.0",
"type": "module",
"private": false,
"publishConfig": {
"access": "public"
},
"main": "./dynamodb-dataloader.js",
"types": "./dynamodb-dataloader.d.ts",
"scripts": {
"test": "node --loader ts-node/esm --test ./dynamodb-dataloader.test.ts"
},
"devDependencies": {
"@aws-sdk/client-dynamodb": "^3.564.0",
"@aws-sdk/lib-dynamodb": "^3.564.0",
"@aws-sdk/util-dynamodb": "^3.564.0",
"@types/node": "^20.12.7",
"dataloader": "^2.2.2",
"ts-node": "^10.9.2",
"typescript": "^5.4.5"
}
}
24 changes: 24 additions & 0 deletions tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"compilerOptions": {
"declaration": true,
"downlevelIteration": true,
"emitDecoratorMetadata": true,
"esModuleInterop": true,
"experimentalDecorators": true,
"incremental": true,
"inlineSourceMap": true,
"inlineSources": true,
"lib": ["esnext"],
"module": "nodenext",
"moduleResolution": "nodenext",
"noImplicitReturns": true,
"noUncheckedIndexedAccess": true,
"noUnusedLocals": true,
"strict": true,
"target": "es2022"
},
"ts-node": {
"logError": true,
"esm": true
}
}

0 comments on commit 1d5c7b5

Please sign in to comment.