diff --git a/src/event-log/event-stream.ts b/src/event-log/event-stream.ts index 606ea5e3b..dc783aa79 100644 --- a/src/event-log/event-stream.ts +++ b/src/event-log/event-stream.ts @@ -1,6 +1,6 @@ import type { MessageStore } from '../types/message-store.js'; import type { EventsSubscribeMessage, EventsSubscription } from '../types/events-types.js'; -import type { EventStream, Subscription } from '../types/subscriptions.js'; +import type { EventStream, SubscriptionHandler } from '../types/subscriptions.js'; import type { Filter, KeyValues } from '../types/query-types.js'; import type { GenericMessage, GenericMessageSubscription } from '../types/message-types.js'; import type { RecordsSubscribeMessage, RecordsSubscription } from '../types/records-types.js'; @@ -25,7 +25,7 @@ export class EventStreamEmitter implements EventStream { private reauthorizationTTL: number; private isOpen: boolean = false; - private subscriptions: Map = new Map(); + private subscriptions: Map = new Map(); constructor(config?: EventStreamConfig) { this.reauthorizationTTL = config?.reauthorizationTTL || 0; // if set to zero it does not reauthorize diff --git a/src/event-log/subscription.ts b/src/event-log/subscription.ts index cfc3d3627..1b298c9ad 100644 --- a/src/event-log/subscription.ts +++ b/src/event-log/subscription.ts @@ -1,13 +1,18 @@ import type { DwnError } from '../core/dwn-error.js'; import type { EventEmitter } from 'events'; import type { MessageStore } from '../types/message-store.js'; -import type { EmitFunction, Subscription } from '../types/subscriptions.js'; +import type { EmitFunction, SubscriptionHandler } from '../types/subscriptions.js'; import type { Filter, KeyValues } from '../types/query-types.js'; import type { GenericMessage, GenericMessageHandler } from '../types/message-types.js'; import { FilterUtility } from '../utils/filter.js'; -export class SubscriptionBase implements Subscription { +/** + * Base class to extend default subscription behavior. + * + * ie. `RecordsSubscriptionHandler` has different rules for authorization and only matches specific message types. + */ +export class SubscriptionHandlerBase implements SubscriptionHandler { protected eventEmitter: EventEmitter; protected messageStore: MessageStore; protected filters: Filter[]; diff --git a/src/handlers/events-subscribe.ts b/src/handlers/events-subscribe.ts index a831fc18e..33dc345cc 100644 --- a/src/handlers/events-subscribe.ts +++ b/src/handlers/events-subscribe.ts @@ -10,7 +10,7 @@ import { Events } from '../utils/events.js'; import { EventsSubscribe } from '../interfaces/events-subscribe.js'; import { Message } from '../core/message.js'; import { messageReplyFromError } from '../core/message-reply.js'; -import { SubscriptionBase } from '../event-log/subscription.js'; +import { SubscriptionHandlerBase } from '../event-log/subscription.js'; import { authenticate, authorizeOwner } from '../core/auth.js'; export class EventsSubscribeHandler implements MethodHandler { @@ -57,7 +57,7 @@ export class EventsSubscribeHandler implements MethodHandler { } } -export class EventsSubscriptionHandler extends SubscriptionBase { +export class EventsSubscriptionHandler extends SubscriptionHandlerBase { public static async create(input: { tenant: string, message: EventsSubscribeMessage, diff --git a/src/handlers/records-subscribe.ts b/src/handlers/records-subscribe.ts index 28ab59845..6c8f3fa3e 100644 --- a/src/handlers/records-subscribe.ts +++ b/src/handlers/records-subscribe.ts @@ -14,7 +14,7 @@ import { Records } from '../utils/records.js'; import { RecordsDelete } from '../interfaces/records-delete.js'; import { RecordsSubscribe } from '../interfaces/records-subscribe.js'; import { RecordsWrite } from '../interfaces/records-write.js'; -import { SubscriptionBase } from '../event-log/subscription.js'; +import { SubscriptionHandlerBase } from '../event-log/subscription.js'; import { Time } from '../utils/time.js'; import { DwnError, DwnErrorCode } from '../core/dwn-error.js'; import { DwnInterfaceName, DwnMethodName } from '../enums/dwn-interface-method.js'; @@ -231,7 +231,7 @@ export class RecordsSubscribeHandler implements MethodHandler { } } -export class RecordsSubscriptionHandler extends SubscriptionBase { +export class RecordsSubscriptionHandler extends SubscriptionHandlerBase { private recordsSubscribe: RecordsSubscribe; private reauthorizationTTL: number; diff --git a/src/types/subscriptions.ts b/src/types/subscriptions.ts index 38669c395..c5ae6ae1c 100644 --- a/src/types/subscriptions.ts +++ b/src/types/subscriptions.ts @@ -8,6 +8,9 @@ import type { RecordsSubscribeMessage, RecordsSubscription } from './records-typ export type EmitFunction = (tenant: string, message: GenericMessage, indexes: KeyValues) => void; +/** + * The EventStream interface implements a pub/sub system based on Message filters. + */ export interface EventStream { subscribe(tenant: string, message: EventsSubscribeMessage, filters: Filter[], messageStore: MessageStore): Promise; subscribe(tenant: string, message: RecordsSubscribeMessage, filters: Filter[], messageStore: MessageStore): Promise; @@ -17,7 +20,12 @@ export interface EventStream { close(): Promise; } -export interface Subscription { +/** + * The SubscriptionHandler interface is implemented by specific types of Subscription Handlers. + * + * ie. `RecordsSubscriptionHandler` has behavior to re-authorize subscriptions. + */ +export interface SubscriptionHandler { id: string; listener: EmitFunction; on: (handler: GenericMessageHandler) => { off: () => void };