Skip to content

Commit

Permalink
refactor write without data flow
Browse files Browse the repository at this point in the history
  • Loading branch information
LiranCohen committed Nov 30, 2023
1 parent e57db06 commit fc3e998
Show file tree
Hide file tree
Showing 4 changed files with 295 additions and 131 deletions.
1 change: 1 addition & 0 deletions src/core/dwn-error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ export enum DwnErrorCode {
RecordsWriteMissingAuthorizationSigner = 'RecordsWriteMissingAuthorizationSigner',
RecordsWriteMissingSigner = 'RecordsWriteMissingSigner',
RecordsWriteMissingDataInPrevious = 'RecordsWriteMissingDataInPrevious',
RecordsWriteMissingEncodedDataInPrevious = 'RecordsWriteMissingEncodedDataInPrevious',
RecordsWriteMissingDataAssociation = 'RecordsWriteMissingDataAssociation',
RecordsWriteMissingDataStream = 'RecordsWriteMissingDataStream',
RecordsWriteMissingProtocol = 'RecordsWriteMissingProtocol',
Expand Down
214 changes: 116 additions & 98 deletions src/handlers/records-write.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import type { EventLog } from '../types/event-log.js';
import type { GenericMessageReply } from '../core/message-reply.js';
import type { MessageStore } from '../types//message-store.js';
import type { MethodHandler } from '../types/method-handler.js';
import type { RecordsDeleteMessage, RecordsWriteMessage, RecordsWriteMessageWithOptionalEncodedData } from '../types/records-types.js';
import type { RecordsWriteMessage, RecordsWriteMessageWithOptionalEncodedData } from '../types/records-types.js';

import { authenticate } from '../core/auth.js';
import { Cid } from '../utils/cid.js';
Expand Down Expand Up @@ -87,57 +87,43 @@ export class RecordsWriteHandler implements MethodHandler {
};
}

const isLatestBaseState = true;
const indexes = await recordsWrite.constructRecordsWriteIndexes(isLatestBaseState);

// if data is below a certain threshold, we embed the data directly into the message for storage in MessageStore.
let messageWithOptionalEncodedData: RecordsWriteMessageWithOptionalEncodedData = message;
if (dataStream === undefined && newestExistingMessage?.descriptor.method === DwnMethodName.Delete) {
return messageReplyFromError(new DwnError(DwnErrorCode.RecordsWriteMissingDataStream, 'No data stream was provided with the previous message being a delete'), 400);
} else if (newMessageIsInitialWrite && dataStream === undefined) {
// we allow pruned writing of the initial RecordsWrite message in cases where we do not have the original data,
// however this is not the latest base state of the record and should not be queryable/readable until a subsequent write with appropriate data.
indexes.isLatestBaseState = false;
} else {
try {
// `isLatestBaseState` starts out false because a `RecordsWrite` may be accepted without a data stream in some cases.
// `processMessageWithDataStream` and `processMessageWithoutDataStream` apply their validation rules and set it to true
// once the message has associated data, at which point the record should show up in reads/queries.
let isLatestBaseState = false;
let messageWithOptionalEncodedData = message as RecordsWriteMessageWithOptionalEncodedData;

try {
if (dataStream === undefined) {
const newestRecordsWriteMessage = newestExistingMessage as RecordsWriteMessage;
// if no data stream exists and this is not the initial message, check data integrity against the newest message.
RecordsWriteHandler.validateDataIntegrity(
newestRecordsWriteMessage?.descriptor.dataCid,
newestRecordsWriteMessage.descriptor.dataSize,
message.descriptor.dataCid,
message.descriptor.dataSize,
);
}
// if data is below the threshold, we store it within MessageStore
if (message.descriptor.dataSize <= DwnConstant.maxDataSizeAllowedToBeEncoded) {
// processes and sets `encodedData` with appropriate data.
messageWithOptionalEncodedData = await this.processEncodedData(
message,
dataStream,
newestExistingMessage as (RecordsWriteMessage|RecordsDeleteMessage) | undefined
);
} else {
await this.putData(tenant, message, dataStream);
}
} catch (error) {
const e = error as any;
if (e.code === DwnErrorCode.RecordsWriteMissingDataInPrevious ||
e.code === DwnErrorCode.RecordsWriteMissingDataAssociation ||
e.code === DwnErrorCode.RecordsWriteDataCidMismatch ||
e.code === DwnErrorCode.RecordsWriteDataSizeMismatch) {
return messageReplyFromError(error, 400);
}
if (dataStream !== undefined) {
({ isLatestBaseState, messageWithOptionalEncodedData } = await this.processMessageWithDataStream(tenant, message, dataStream));
} else if (newestExistingMessage?.descriptor.method === DwnMethodName.Delete) {
throw new DwnError(
DwnErrorCode.RecordsWriteMissingDataStream,
'No data stream was provided with the previous message being a delete'
);
} else if (!newMessageIsInitialWrite) {
// at this point we know that newestExistingMessage exists and is not a Delete
const newestExistingWrite = newestExistingMessage as RecordsWriteMessageWithOptionalEncodedData;
({ isLatestBaseState, messageWithOptionalEncodedData } = await this.processMessageWithoutDataStream(tenant, message, newestExistingWrite ));
}

// else throw
throw error;
const indexes = await recordsWrite.constructRecordsWriteIndexes(isLatestBaseState);
await this.messageStore.put(tenant, messageWithOptionalEncodedData, indexes);
await this.eventLog.append(tenant, await Message.getCid(message));
} catch (error) {
const e = error as any;
if (e.code === DwnErrorCode.RecordsWriteMissingEncodedDataInPrevious ||
e.code === DwnErrorCode.RecordsWriteMissingDataInPrevious ||
e.code === DwnErrorCode.RecordsWriteMissingDataStream ||
e.code === DwnErrorCode.RecordsWriteMissingDataAssociation ||
e.code === DwnErrorCode.RecordsWriteDataCidMismatch ||
e.code === DwnErrorCode.RecordsWriteDataSizeMismatch) {
return messageReplyFromError(error, 400);
}
}

await this.messageStore.put(tenant, messageWithOptionalEncodedData, indexes);
await this.eventLog.append(tenant, await Message.getCid(message));
// else throw
throw error;
}

const messageReply = {
status: { code: 202, detail: 'Accepted' }
Expand All @@ -151,78 +137,110 @@ export class RecordsWriteHandler implements MethodHandler {
return messageReply;
};

/**
* Embeds the record's data into the `encodedData` property.
* If dataStream is present, it uses the dataStream. Otherwise, uses the `encodedData` from the most recent RecordsWrite.
*
* @returns {RecordsWriteMessageWithOptionalEncodedData} `encodedData` embedded.
*
* @throws {DwnError} with `DwnErrorCode.RecordsWriteMissingDataInPrevious`
* if `dataStream` is absent AND `encodedData` of previous message is missing
* @throws {DwnError} with `DwnErrorCode.RecordsWriteDataCidMismatch`
* if the data stream resulted in a data CID that mismatches with `dataCid` in the given message
* @throws {DwnError} with `DwnErrorCode.RecordsWriteDataSizeMismatch`
* if `dataSize` in `descriptor` given mismatches the actual data size
*/
public async processEncodedData(
/**
 * Processes an incoming `RecordsWrite` that arrived together with a data stream.
 *
 * When the `dataSize` declared in the descriptor is at or below `DwnConstant.maxDataSizeAllowedToBeEncoded`,
 * the stream is read fully into memory, checked against the descriptor's `dataCid`/`dataSize`,
 * and embedded into a copy of the message as `encodedData`.
 * Otherwise the stream is put into the data store under the message CID and the stored result is validated.
 *
 * @param tenant the tenant the data is written for
 * @param message the incoming `RecordsWrite` message
 * @param dataStream the data accompanying the message
 * @returns `isLatestBaseState` (true on every successful path) and the message to persist,
 * which carries `encodedData` only when the data was small enough to be embedded.
 * @throws whatever `validateDataIntegrity` / `validateDataStoreIntegrity` throw on a CID or size mismatch
 * (presumably `DwnError` with `RecordsWriteDataCidMismatch` / `RecordsWriteDataSizeMismatch` - confirm against those helpers)
 */
private async processMessageWithDataStream(
  tenant: string,
  message: RecordsWriteMessage,
  dataStream: _Readable.Readable,
):Promise<{
  isLatestBaseState: boolean
  messageWithOptionalEncodedData: RecordsWriteMessageWithOptionalEncodedData
}> {
  // small payloads live inside the message itself and are stored in MessageStore
  if (message.descriptor.dataSize <= DwnConstant.maxDataSizeAllowedToBeEncoded) {
    const streamedBytes = await DataStream.toBytes(dataStream);
    const computedDataCid = await Cid.computeDagPbCidFromBytes(streamedBytes);

    // the data must match what the descriptor claims before it is embedded
    RecordsWriteHandler.validateDataIntegrity(
      message.descriptor.dataCid,
      message.descriptor.dataSize,
      computedDataCid,
      streamedBytes.length
    );

    const messageWithEncodedData = await this.encodeAndSetData(message, streamedBytes);
    return { isLatestBaseState: true, messageWithOptionalEncodedData: messageWithEncodedData };
  }

  // larger payloads go to the DataStore, keyed by the CID of this message
  const messageCid = await Message.getCid(message);
  const putResult = await this.dataStore.put(tenant, messageCid, message.descriptor.dataCid, dataStream);
  await this.validateDataStoreIntegrity(tenant, message, putResult.dataCid, putResult.dataSize);

  return { isLatestBaseState: true, messageWithOptionalEncodedData: message };
}

private async processMessageWithoutDataStream(
tenant: string,
message: RecordsWriteMessage,
dataStream?: _Readable.Readable,
newestExistingMessage?: RecordsWriteMessage | RecordsDeleteMessage
): Promise<RecordsWriteMessageWithOptionalEncodedData> {
let dataBytes;
if (dataStream === undefined) {
const newestWithData = newestExistingMessage as RecordsWriteMessageWithOptionalEncodedData | undefined;
if (newestWithData?.encodedData === undefined) {
newestExistingWrite: RecordsWriteMessageWithOptionalEncodedData,
):Promise<{
isLatestBaseState: boolean
messageWithOptionalEncodedData: RecordsWriteMessageWithOptionalEncodedData
}> {
let isLatestBaseState = false;
let messageWithOptionalEncodedData: RecordsWriteMessageWithOptionalEncodedData = message;
const { dataCid, dataSize } = message.descriptor;
// if the incoming message is not an initial write, and no dataStream is provided, first check integrity against newest existing write.
RecordsWriteHandler.validateDataIntegrity(dataCid, dataSize, newestExistingWrite.descriptor.dataCid, newestExistingWrite.descriptor.dataSize);
if (dataSize <= DwnConstant.maxDataSizeAllowedToBeEncoded) {
// we encode the data from the original write if it is smaller than the data-store threshold
if (newestExistingWrite.encodedData !== undefined) {
const dataBytes = Encoder.base64UrlToBytes(newestExistingWrite.encodedData);
messageWithOptionalEncodedData = await this.encodeAndSetData(message, dataBytes);
isLatestBaseState = true; // will show up in reads/queries
} else {
throw new DwnError(
DwnErrorCode.RecordsWriteMissingDataInPrevious,
DwnErrorCode.RecordsWriteMissingEncodedDataInPrevious,
`No dataStream was provided and unable to get data from previous message`
);
} else {
dataBytes = Encoder.base64UrlToBytes(newestWithData.encodedData);
}
} else {
dataBytes = await DataStream.toBytes(dataStream);
const previousWriteMessageCid = await Message.getCid(newestExistingWrite);
// attempt to retrieve the data from the previous message, if it does not exist we have no previous data to associate.
// we perform this check in case a user attempts to gain access to data by knowing the dataCid,
// https://github.com/TBD54566975/dwn-sdk-js/issues/359
// so we ensure that the data is already associated with the existing newest message
const dataResults = await this.dataStore.get(tenant, previousWriteMessageCid, message.descriptor.dataCid);
if (dataResults === undefined) {
throw new DwnError(
DwnErrorCode.RecordsWriteMissingDataInPrevious,
`No dataStream was provided and unable to get data from previous message`
);
}
const result = await this.dataStore.associate(tenant, await Message.getCid(message), message.descriptor.dataCid);
if (result === undefined) {
throw new DwnError(
DwnErrorCode.RecordsWriteMissingDataAssociation,
'No dataStream was provided and unable to associate with previous data'
);
}
await this.validateDataStoreIntegrity(tenant, message, result.dataCid, result.dataSize);
isLatestBaseState = true; // will show up in reads/queries
}

const dataCid = await Cid.computeDagPbCidFromBytes(dataBytes);
RecordsWriteHandler.validateDataIntegrity(message.descriptor.dataCid, message.descriptor.dataSize, dataCid, dataBytes.length);
return { isLatestBaseState, messageWithOptionalEncodedData };
}

/**
 * Builds the message to persist when the record's data is embedded in the message itself.
 *
 * @param message the incoming `RecordsWrite` message; it is shallow-copied, not modified
 * @param dataBytes the raw data to embed
 * @returns a shallow copy of `message` whose `encodedData` is `dataBytes` encoded as `Base64URL`
 */
public async encodeAndSetData(message: RecordsWriteMessage, dataBytes: Uint8Array):Promise<RecordsWriteMessageWithOptionalEncodedData> {
  const encodedData = Encoder.bytesToBase64Url(dataBytes);
  return { ...message, encodedData };
}

/**
* Puts the given data in storage unless tenant already has that data for the given recordId
*
* @throws {DwnError} with `DwnErrorCode.RecordsWriteMissingDataAssociation`
* if `dataStream` is absent AND unable to associate data given `dataCid`
* @throws {DwnError} with `DwnErrorCode.RecordsWriteDataCidMismatch`
* if the data stream resulted in a data CID that mismatches with `dataCid` in the given message
* @throws {DwnError} with `DwnErrorCode.RecordsWriteDataSizeMismatch`
* if `dataSize` in `descriptor` given mismatches the actual data size
* Validates the data integrity after either putting the data or associating it with a new message.
* Upon failure deletes the association, and subsequently the data if there are no other associations.
*/
public async putData(
private async validateDataStoreIntegrity(
tenant: string,
message: RecordsWriteMessage,
dataStream?: _Readable.Readable,
dataCid: string,
dataSize: number
): Promise<void> {
let result: { dataCid: string, dataSize: number };
const messageCid = await Message.getCid(message);

if (dataStream === undefined) {
const associateResult = await this.dataStore.associate(tenant, messageCid, message.descriptor.dataCid);
if (associateResult === undefined) {
throw new DwnError(DwnErrorCode.RecordsWriteMissingDataAssociation, `Unable to associate dataCid ${message.descriptor.dataCid} ` +
`to messageCid ${messageCid} because dataStream was not provided and data was not found in dataStore`);
}
result = associateResult;
} else {
result = await this.dataStore.put(tenant, messageCid, message.descriptor.dataCid, dataStream);
}

try {
RecordsWriteHandler.validateDataIntegrity(message.descriptor.dataCid, message.descriptor.dataSize, result.dataCid, result.dataSize);
RecordsWriteHandler.validateDataIntegrity(message.descriptor.dataCid, message.descriptor.dataSize, dataCid, dataSize);
} catch (error) {
// delete data and throw error to caller
await this.dataStore.delete(tenant, messageCid, message.descriptor.dataCid);
Expand Down
15 changes: 7 additions & 8 deletions tests/handlers/records-query.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1335,31 +1335,30 @@ export function testRecordsQueryHandler(): void {
{ author: alice, schema, data: Encoder.stringToBytes('5'), published: true, recipient: carol.did }
);

// directly inserting data to datastore so that we don't have to setup to grant Bob permission to write to Alice's DWN
const recordsWriteHandler = new RecordsWriteHandler(didResolver, messageStore, dataStore, eventLog);

const additionalIndexes1 = await record1Data.recordsWrite.constructRecordsWriteIndexes(true);
record1Data.message = await recordsWriteHandler.processEncodedData(record1Data.message, record1Data.dataStream);
record1Data.message = await recordsWriteHandler.encodeAndSetData(record1Data.message, record1Data.dataBytes!);
await messageStore.put(alice.did, record1Data.message, additionalIndexes1);
await eventLog.append(alice.did, await Message.getCid(record1Data.message));

const additionalIndexes2 = await record2Data.recordsWrite.constructRecordsWriteIndexes(true);
record2Data.message = await recordsWriteHandler.processEncodedData(record2Data.message, record2Data.dataStream);
record2Data.message = await recordsWriteHandler.encodeAndSetData(record2Data.message,record2Data.dataBytes!);
await messageStore.put(alice.did, record2Data.message, additionalIndexes2);
await eventLog.append(alice.did, await Message.getCid(record2Data.message));

const additionalIndexes3 = await record3Data.recordsWrite.constructRecordsWriteIndexes(true);
record3Data.message = await recordsWriteHandler.processEncodedData(record3Data.message, record3Data.dataStream);
record3Data.message = await recordsWriteHandler.encodeAndSetData(record3Data.message, record3Data.dataBytes!);
await messageStore.put(alice.did, record3Data.message, additionalIndexes3);
await eventLog.append(alice.did, await Message.getCid(record3Data.message));

const additionalIndexes4 = await record4Data.recordsWrite.constructRecordsWriteIndexes(true);
record4Data.message = await recordsWriteHandler.processEncodedData(record4Data.message, record4Data.dataStream);
record4Data.message = await recordsWriteHandler.encodeAndSetData(record4Data.message, record4Data.dataBytes!);
await messageStore.put(alice.did, record4Data.message, additionalIndexes4);
await eventLog.append(alice.did, await Message.getCid(record4Data.message));

const additionalIndexes5 = await record5Data.recordsWrite.constructRecordsWriteIndexes(true);
record5Data.message = await recordsWriteHandler.processEncodedData(record5Data.message, record5Data.dataStream);
record5Data.message = await recordsWriteHandler.encodeAndSetData(record5Data.message, record5Data.dataBytes!);
await messageStore.put(alice.did, record5Data.message, additionalIndexes5);
await eventLog.append(alice.did, await Message.getCid(record5Data.message));

Expand Down Expand Up @@ -1464,9 +1463,9 @@ export function testRecordsQueryHandler(): void {
const recordsWriteHandler = new RecordsWriteHandler(didResolver, messageStore, dataStore, eventLog);

const messages: GenericMessage[] = [];
for await (const { recordsWrite, message, dataStream } of messagePromises) {
for await (const { recordsWrite, message, dataBytes } of messagePromises) {
const indexes = await recordsWrite.constructRecordsWriteIndexes(true);
const processedMessage = await recordsWriteHandler.processEncodedData(message, dataStream);
const processedMessage = await recordsWriteHandler.encodeAndSetData(message, dataBytes!);
await messageStore.put(alice.did, processedMessage, indexes);
await eventLog.append(alice.did, await Message.getCid(processedMessage));
messages.push(processedMessage);
Expand Down
Loading

0 comments on commit fc3e998

Please sign in to comment.