Skip to content

Commit

Permalink
update tests and comments
Browse files Browse the repository at this point in the history
  • Loading branch information
LiranCohen committed Jan 4, 2024
1 parent 5d638c3 commit 7df607b
Show file tree
Hide file tree
Showing 11 changed files with 205 additions and 59 deletions.
7 changes: 3 additions & 4 deletions src/core/protocol-authorization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,19 +153,18 @@ export class ProtocolAuthorization {
);
}

// maybe combine with query?
public static async authorizeSubscription(
tenant: string,
incomingMessage: RecordsSubscribe,
messageStore: MessageStore,
): Promise<void> {
// validate that required properties exist in query filter
// validate that required properties exist in subscription filter
const { protocol, protocolPath, contextId } = incomingMessage.message.descriptor.filter;

// fetch the protocol definition
const protocolDefinition = await ProtocolAuthorization.fetchProtocolDefinition(
tenant,
protocol!, // authorizeQuery` is only called if `protocol` is present
protocol!, // `authorizeSubscription` is only called if `protocol` is present
messageStore,
);

Expand All @@ -190,7 +189,7 @@ export class ProtocolAuthorization {
tenant,
incomingMessage,
inboundMessageRuleSet,
[], // ancestor chain is not relevant to subscribes
[], // ancestor chain is not relevant to subscriptions
messageStore,
);
}
Expand Down
7 changes: 5 additions & 2 deletions src/core/records-grant-authorization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,16 @@ export class RecordsGrantAuthorization {
* Authorizes the scope of a PermissionsGrant for RecordsSubscribe.
* @param messageStore Used to check if the grant has been revoked.
*/
public static async authorizeSubscribe(
public static async authorizeSubscribe(input: {
recordsSubscribeMessage: RecordsSubscribeMessage,
expectedGrantedToInGrant: string,
expectedGrantedForInGrant: string,
permissionsGrantMessage: PermissionsGrantMessage,
messageStore: MessageStore,
): Promise<void> {
}): Promise<void> {
const {
recordsSubscribeMessage, expectedGrantedToInGrant, expectedGrantedForInGrant, permissionsGrantMessage, messageStore
} = input;

await GrantAuthorization.performBaseValidation({
incomingMessage: recordsSubscribeMessage,
Expand Down
1 change: 0 additions & 1 deletion src/dwn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ export class Dwn {
),
[DwnInterfaceName.Events+ DwnMethodName.Subscribe]: new EventsSubscribeHandler(
this.didResolver,
this.messageStore,
this.eventStream,
),
[DwnInterfaceName.Messages + DwnMethodName.Get]: new MessagesGetHandler(
Expand Down
19 changes: 11 additions & 8 deletions src/event-log/event-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,13 @@ type EventStreamConfig = {

export class EventStreamEmitter implements EventStream {
private eventEmitter: EventEmitter;
private didResolver: DidResolver;
private messageStore: MessageStore;
private reauthorizationTTL: number;

private isOpen: boolean = false;
private subscriptions: Map<string, Subscription> = new Map();

constructor(config: EventStreamConfig) {
this.didResolver = config.didResolver;
this.messageStore = config.messageStore;
this.reauthorizationTTL = config.reauthorizationTTL ?? 0; // if set to zero it does not reauthorize

Expand All @@ -58,20 +56,25 @@ export class EventStreamEmitter implements EventStream {
return subscription;
}

const unsubscribe = async ():Promise<void> => { await this.unsubscribe(messageCid); };

if (RecordsSubscribe.isRecordsSubscribeMessage(message)) {
subscription = await RecordsSubscriptionHandler.create({
tenant,
message,
filters,
unsubscribe,
unsubscribe : () => this.unsubscribe(messageCid),
eventEmitter : this.eventEmitter,
messageStore : this.messageStore,
reauthorizationTTL : this.reauthorizationTTL,
});
} else if (EventsSubscribe.isEventsSubscribeMessage(message)) {
subscription = await EventsSubscriptionHandler.create(tenant, message, filters, this.eventEmitter, this.messageStore, unsubscribe);
subscription = await EventsSubscriptionHandler.create({
tenant,
message,
filters,
unsubscribe : () => this.unsubscribe(messageCid),
eventEmitter : this.eventEmitter,
messageStore : this.messageStore
});
} else {
throw new DwnError(DwnErrorCode.EventStreamSubscriptionNotSupported, 'not a supported subscription message');
}
Expand Down Expand Up @@ -102,8 +105,8 @@ export class EventStreamEmitter implements EventStream {

emit(tenant: string, message: GenericMessage, indexes: KeyValues): void {
if (!this.isOpen) {
//todo: dwn error
throw new Error('Event stream is not open. Cannot add to the stream.');
// silently ignore.
return;
}
try {
this.eventEmitter.emit(this.eventChannel, tenant, message, indexes);
Expand Down
9 changes: 4 additions & 5 deletions src/handlers/events-subscribe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import { authenticate, authorizeOwner } from '../core/auth.js';
export class EventsSubscribeHandler implements MethodHandler {
constructor(
private didResolver: DidResolver,
private messageStore: MessageStore,
private eventStream: EventStream
) {}

Expand Down Expand Up @@ -54,16 +53,16 @@ export class EventsSubscribeHandler implements MethodHandler {
}

export class EventsSubscriptionHandler extends SubscriptionBase {
public static async create(
public static async create(input: {
tenant: string,
message: EventsSubscribeMessage,
filters: Filter[],
eventEmitter: EventEmitter,
messageStore: MessageStore,
unsubscribe: () => Promise<void>
): Promise<EventsSubscriptionHandler> {
const id = await Message.getCid(message);
return new EventsSubscriptionHandler({ tenant, message, id, filters, eventEmitter, messageStore, unsubscribe });
}): Promise<EventsSubscriptionHandler> {
const id = await Message.getCid(input.message);
return new EventsSubscriptionHandler({ ...input, id });
}
};

14 changes: 4 additions & 10 deletions src/handlers/records-subscribe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,6 @@ export class RecordsSubscribeHandler implements MethodHandler {
};
}

// 1) owner filters
// 2) public filters
// 3) authorized filters
// a) protocol authorized
// b) grant authorized

/**
* Fetches the records as the owner of the DWN with no additional filtering.
*/
Expand Down Expand Up @@ -272,7 +266,7 @@ export class RecordsSubscriptionHandler extends SubscriptionBase {
await RecordsSubscribeHandler.authorizeRecordsSubscribe(this.tenant, this.recordsSubscribe, this.messageStore);
}

public static async create(options: {
public static async create(input: {
tenant: string,
message: RecordsSubscribeMessage,
filters: Filter[],
Expand All @@ -281,9 +275,9 @@ export class RecordsSubscriptionHandler extends SubscriptionBase {
unsubscribe: () => Promise<void>;
reauthorizationTTL: number
}): Promise<RecordsSubscriptionHandler> {
const id = await Message.getCid(options.message);
const recordsSubscribe = await RecordsSubscribe.parse(options.message);
return new RecordsSubscriptionHandler({ ...options, id, recordsSubscribe });
const id = await Message.getCid(input.message);
const recordsSubscribe = await RecordsSubscribe.parse(input.message);
return new RecordsSubscriptionHandler({ ...input, id, recordsSubscribe });
}

public listener: EmitFunction = async (tenant, message, indexes):Promise<void> => {
Expand Down
10 changes: 7 additions & 3 deletions src/interfaces/records-subscribe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,14 @@ export class RecordsSubscribe extends AbstractMessage<RecordsSubscribeMessage> {
* @param messageStore Used to check if the grant has been revoked.
*/
public async authorizeDelegate(messageStore: MessageStore): Promise<void> {
const grantedTo = this.signer!;
const grantedFor = this.author!;
const delegatedGrant = this.message.authorization!.authorDelegatedGrant!;
await RecordsGrantAuthorization.authorizeSubscribe(this.message, grantedTo, grantedFor, delegatedGrant, messageStore);
await RecordsGrantAuthorization.authorizeSubscribe({
recordsSubscribeMessage : this.message,
expectedGrantedToInGrant : this.signer!,
expectedGrantedForInGrant : this.author!,
permissionsGrantMessage : delegatedGrant,
messageStore
});
}

public static isRecordsSubscribeMessage(message: GenericMessage): message is RecordsSubscribeMessage {
Expand Down
4 changes: 2 additions & 2 deletions src/types/protocols-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ export type ProtocolActionRule = {

/**
* Action that the actor can perform.
* May be 'query' | 'read' | 'write'
* 'query' is only supported for `role` rules.
* May be 'query' | 'read' | 'write' | 'subscribe'
* 'query' and 'subscribe' are only supported for `role` rules.
*/
can: string;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import chai, { expect } from 'chai';

chai.use(chaiAsPromised);

describe('Event Stream Tests', () => {
describe('EventStreamEmitter', () => {
let eventStream: EventStreamEmitter;
let didResolver: DidResolver;
let messageStore: MessageStore;
Expand All @@ -22,8 +22,6 @@ describe('Event Stream Tests', () => {
blockstoreLocation : 'TEST-MESSAGESTORE',
indexLocation : 'TEST-INDEX'
});
// Create a new instance of EventStream before each test
eventStream = new EventStreamEmitter({ didResolver, messageStore });
});

beforeEach(async () => {
Expand All @@ -36,48 +34,45 @@ describe('Event Stream Tests', () => {
await eventStream.close();
});

xit('test add callback', async () => {
});

xit('test bad message', async () => {
});

xit('should throw an error when adding events to a closed stream', async () => {
});

xit('should handle concurrent event sending', async () => {
});

xit('test emitter chaining', async () => {
});

it('should remove listeners when unsubscribe method is used', async () => {
const alice = await DidKeyResolver.generate();

const emitter = new EventEmitter();
const eventEmitter = new EventStreamEmitter({ emitter, messageStore, didResolver });
eventStream = new EventStreamEmitter({ emitter, messageStore, didResolver });

// count the `events_bus` listeners, which represents all listeners
expect(emitter.listenerCount('events_bus')).to.equal(0);

// initiate a subscription, which should add a listener
const { message } = await TestDataGenerator.generateRecordsSubscribe({ author: alice });
const sub = await eventEmitter.subscribe(alice.did, message, []);
const sub = await eventStream.subscribe(alice.did, message, []);
expect(emitter.listenerCount('events_bus')).to.equal(1);

// close the subscription, which should remove the listener
await sub.close();
expect(emitter.listenerCount('events_bus')).to.equal(0);
});

it('should remove listeners when off method is used', async () => {
const alice = await DidKeyResolver.generate();
const emitter = new EventEmitter();
const eventEmitter = new EventStreamEmitter({ emitter, messageStore, didResolver });
eventStream = new EventStreamEmitter({ emitter, messageStore, didResolver });

// initiate a subscription
const { message } = await TestDataGenerator.generateRecordsSubscribe();
const sub = await eventEmitter.subscribe(alice.did, message, []);
const sub = await eventStream.subscribe(alice.did, message, []);
const messageCid = await Message.getCid(message);

// the listener count for the specific subscription should be at zero
expect(emitter.listenerCount(`${alice.did}_${messageCid}`)).to.equal(0);
const handler = (_:GenericMessage):void => {};
const on1 = sub.on(handler);
const on2 = sub.on(handler);

// after registering two handlers, there should be two listeners
expect(emitter.listenerCount(`${alice.did}_${messageCid}`)).to.equal(2);

// un-register the handlers one by one, checking the listener count after each.
on1.off();
expect(emitter.listenerCount(`${alice.did}_${messageCid}`)).to.equal(1);
on2.off();
Expand Down
Loading

0 comments on commit 7df607b

Please sign in to comment.