From a587909af7e10b082e9ed5c4f171d40e62fb6208 Mon Sep 17 00:00:00 2001 From: Jacob Mischka Date: Wed, 10 Aug 2022 09:59:56 -0500 Subject: [PATCH 1/4] Allow dynamically adding/removing actions and groups after listen `ExperimentalInterval` and `ActionGroup`'s `use` methods, along with new `unuse` method and `add` and `remove` methods in `ActionGroup` and `ExperimentalActions`, now trigger the `IntervalClient` to reregister its actions if `listen()` has already been called and the host has already been initialized. --- src/classes/ActionGroup.ts | 36 ++++++++- src/classes/Actions.ts | 128 +++++++++++++++++++++++++++++ src/classes/IntervalClient.ts | 80 ++++++++++++------ src/examples/nested/index.ts | 47 +++++++++-- src/experimental.ts | 63 ++++++++++++++- src/index.ts | 148 ++++++---------------------------- 6 files changed, 343 insertions(+), 159 deletions(-) create mode 100644 src/classes/Actions.ts diff --git a/src/classes/ActionGroup.ts b/src/classes/ActionGroup.ts index a6d8a3f..f6c2d90 100644 --- a/src/classes/ActionGroup.ts +++ b/src/classes/ActionGroup.ts @@ -1,8 +1,10 @@ -import { IntervalActionDefinitions } from '../types' +import { Evt } from 'evt' +import { IntervalActionDefinition, IntervalActionDefinitions } from '../types' export interface ActionGroupConfig { name: string actions: IntervalActionDefinitions + groups?: Record } export default class ActionGroup { @@ -10,12 +12,44 @@ export default class ActionGroup { actions: IntervalActionDefinitions groups: Record = {} + onChange: Evt + #groupChangeCtx = Evt.newCtx() + constructor(config: ActionGroupConfig) { this.name = config.name this.actions = config.actions + this.groups = config.groups ?? {} + this.onChange = new Evt() + + for (const group of Object.values(this.groups)) { + group.onChange.attach(this.#groupChangeCtx, this.onChange.post) + } } use(groupSlug: string, group: ActionGroup) { + group.onChange.attach(this.#groupChangeCtx, this.onChange.post) this.groups[groupSlug] = group + this.onChange.post() + } + + unuse(groupSlug: string) { + const group = this.groups[groupSlug] + if (!group) return + + group.onChange.detach(this.#groupChangeCtx) + delete this.groups[groupSlug] + } + + add(slug: string, action: IntervalActionDefinition) { + this.actions[slug] = action + + this.onChange.post() + } + + remove(slug: string) { + if (this.actions[slug]) { + delete this.actions[slug] + this.onChange.post() + } } } diff --git a/src/classes/Actions.ts b/src/classes/Actions.ts new file mode 100644 index 0000000..3533afa --- /dev/null +++ b/src/classes/Actions.ts @@ -0,0 +1,128 @@ +import { z } from 'zod' +import fetch from 'node-fetch' +import * as superjson from 'superjson' +import Logger from './Logger' +import Interval, { IntervalError, QueuedAction } from '..' +import { ENQUEUE_ACTION, DEQUEUE_ACTION } from '../internalRpcSchema' + +/** + * This is effectively a namespace inside of Interval with a little bit of its own state. + */ +export default class Actions { + protected interval: Interval + #logger: Logger + #apiKey?: string + #endpoint: string + + constructor( + interval: Interval, + endpoint: string, + logger: Logger, + apiKey?: string + ) { + this.interval = interval + this.#apiKey = apiKey + this.#logger = logger + this.#endpoint = endpoint + '/api/actions' + } + + #getAddress(path: string): string { + if (path.startsWith('/')) { + path = path.substring(1) + } + + return `${this.#endpoint}/${path}` + } + + async enqueue( + slug: string, + { assignee, params }: Pick = {} + ): Promise { + let body: z.infer + try { + const { json, meta } = params + ? superjson.serialize(params) + : { json: undefined, meta: undefined } + body = ENQUEUE_ACTION.inputs.parse({ + assignee, + slug, + params: json, + paramsMeta: meta, + }) + } catch (err) { + this.#logger.debug(err) + throw new IntervalError('Invalid input.') + } + + const response = await fetch(this.#getAddress('enqueue'), { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${this.#apiKey}`, + }, + body: JSON.stringify(body), + }) + .then(r => r.json()) + .then(r => ENQUEUE_ACTION.returns.parseAsync(r)) + .catch(err => { + this.#logger.debug(err) + throw new IntervalError('Received invalid API response.') + }) + + if (response.type === 'error') { + throw new IntervalError( + `There was a problem enqueuing the action: ${response.message}` + ) + } + + return { + id: response.id, + assignee, + params, + } + } + + async dequeue(id: string): Promise { + let body: z.infer + try { + body = DEQUEUE_ACTION.inputs.parse({ + id, + }) + } catch (err) { + this.#logger.debug(err) + throw new IntervalError('Invalid input.') + } + + const response = await fetch(this.#getAddress('dequeue'), { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: `Bearer ${this.#apiKey}`, + }, + body: JSON.stringify(body), + }) + .then(r => r.json()) + .then(r => DEQUEUE_ACTION.returns.parseAsync(r)) + .catch(err => { + this.#logger.debug(err) + throw new IntervalError('Received invalid API response.') + }) + + if (response.type === 'error') { + throw new IntervalError( + `There was a problem enqueuing the action: ${response.message}` + ) + } + + let { type, params, paramsMeta, ...rest } = response + + if (paramsMeta && params) { + params = superjson.deserialize({ json: params, meta: paramsMeta }) + } + + return { + ...rest, + params, + } + } +} diff --git a/src/classes/IntervalClient.ts b/src/classes/IntervalClient.ts index 5dd33cb..92eab06 100644 --- a/src/classes/IntervalClient.ts +++ b/src/classes/IntervalClient.ts @@ -69,12 +69,13 @@ export default class IntervalClient { #retryIntervalMs: number = 3000 #pingIntervalMs: number = 30_000 #closeUnresponsiveConnectionTimeoutMs: number = 3 * 60 * 1000 // 3 minutes + #reinitializeBatchTimeoutMs: number = 200 #pingIntervalHandle: NodeJS.Timeout | undefined #intentionallyClosed = false - #actionDefinitions: ActionDefinition[] - #groupDefinitions: GroupDefinition[] - #actionHandlers: Record + #actionDefinitions: ActionDefinition[] = [] + #groupDefinitions: GroupDefinition[] = [] + #actionHandlers: Record = {} organization: | { @@ -93,6 +94,35 @@ export default class IntervalClient { this.#endpoint = config.endpoint } + this.#walkActions(config) + + if (config.retryIntervalMs && config.retryIntervalMs > 0) { + this.#retryIntervalMs = config.retryIntervalMs + } + + if (config.pingIntervalMs && config.pingIntervalMs > 0) { + this.#pingIntervalMs = config.pingIntervalMs + } + + if ( + config.closeUnresponsiveConnectionTimeoutMs && + config.closeUnresponsiveConnectionTimeoutMs > 0 + ) { + this.#closeUnresponsiveConnectionTimeoutMs = + config.closeUnresponsiveConnectionTimeoutMs + } + + if ( + config.reinitializeBatchTimeoutMs && + config.reinitializeBatchTimeoutMs > 0 + ) { + this.#reinitializeBatchTimeoutMs = config.reinitializeBatchTimeoutMs + } + + this.#httpEndpoint = getHttpEndpoint(this.#endpoint) + } + + #walkActions(config: InternalConfig) { const groupDefinitions: GroupDefinition[] = [] const actionDefinitions: (ActionDefinition & { handler: undefined })[] = [] const actionHandlers: Record = {} @@ -140,24 +170,6 @@ export default class IntervalClient { this.#groupDefinitions = groupDefinitions this.#actionDefinitions = actionDefinitions this.#actionHandlers = actionHandlers - - if (config.retryIntervalMs && config.retryIntervalMs > 0) { - this.#retryIntervalMs = config.retryIntervalMs - } - - if (config.pingIntervalMs && config.pingIntervalMs > 0) { - this.#pingIntervalMs = config.pingIntervalMs - } - - if ( - config.closeUnresponsiveConnectionTimeoutMs && - config.closeUnresponsiveConnectionTimeoutMs > 0 - ) { - this.#closeUnresponsiveConnectionTimeoutMs = - config.closeUnresponsiveConnectionTimeoutMs - } - - this.#httpEndpoint = getHttpEndpoint(this.#endpoint) } get #log() { @@ -176,11 +188,24 @@ export default class IntervalClient { | DuplexRPCClient | undefined = undefined #isConnected = false + #isInitialized = false get isConnected() { return this.#isConnected } + #reinitializeTimeout: NodeJS.Timeout | null = null + + handleActionsChange(config: InternalConfig) { + if (this.#isInitialized && !this.#reinitializeTimeout) { + this.#reinitializeTimeout = setTimeout(async () => { + this.#walkActions(config) + await this.#initializeHost() + this.#reinitializeTimeout = null + }, this.#reinitializeBatchTimeoutMs) + } + } + async listen() { if (this.#actionDefinitions.length === 0) { this.#log.prod( @@ -828,11 +853,16 @@ export default class IntervalClient { this.organization = response.organization this.environment = response.environment - this.#log.prod( - `🔗 Connected! Access your actions at: ${response.dashboardUrl}` - ) - this.#log.debug('Host ID:', this.#ws.id) + if (!this.#isInitialized) { + this.#log.prod( + `🔗 Connected! Access your actions at: ${response.dashboardUrl}` + ) + this.#log.debug('Host ID:', this.#ws.id) + this.#isInitialized = true + } } + + return response } async #send( diff --git a/src/examples/nested/index.ts b/src/examples/nested/index.ts index 9e75deb..0f62bf0 100644 --- a/src/examples/nested/index.ts +++ b/src/examples/nested/index.ts @@ -1,18 +1,34 @@ import { IntervalActionHandler } from '../..' import ExperimentalInterval, { ActionGroup, io } from '../../experimental' -const interval = new ExperimentalInterval({ - apiKey: 'alex_dev_kcLjzxNFxmGLf0aKtLVhuckt6sziQJtxFOdtM19tBrMUp5mj', - logLevel: 'debug', - endpoint: 'ws://localhost:3000/websocket', -}) - const action: IntervalActionHandler = async () => { const message = await io.input.text('Hello?') return message } +const devOnly = new ActionGroup({ + name: 'Dev-only', + actions: { + action, + }, +}) + +const interval = new ExperimentalInterval({ + apiKey: 'alex_dev_kcLjzxNFxmGLf0aKtLVhuckt6sziQJtxFOdtM19tBrMUp5mj', + logLevel: 'debug', + endpoint: 'ws://localhost:3000/websocket', + groups: { + devOnly, + toRemove: new ActionGroup({ + name: 'To remove', + actions: { + action, + }, + }), + }, +}) + const nested = new ActionGroup({ name: 'Nested', actions: { @@ -45,7 +61,24 @@ nested.use( }) ) -interval.listen() +interval.listen().then(() => { + interval.use( + 'new', + new ActionGroup({ + name: 'New Group', + actions: { + action, + }, + }) + ) + + interval.unuse('toRemove') + + devOnly.add('self_destructing', async () => { + devOnly.remove('self_destructing') + return 'Bye!' + }) +}) const anon = new ExperimentalInterval({ logLevel: 'debug', diff --git a/src/experimental.ts b/src/experimental.ts index c2ae114..564583c 100644 --- a/src/experimental.ts +++ b/src/experimental.ts @@ -1,7 +1,8 @@ import { z } from 'zod' +import { Evt } from 'evt' import fetch from 'node-fetch' import type { IncomingMessage, ServerResponse } from 'http' -import Interval, { io, ctx, IntervalError } from '.' +import Interval, { io, ctx, InternalConfig, IntervalError } from '.' import IntervalClient from './classes/IntervalClient' import ActionGroup from './classes/ActionGroup' import * as pkg from '../package.json' @@ -12,8 +13,29 @@ import { LambdaRequestPayload, LambdaResponse, } from './utils/http' +import Actions from './classes/Actions' +import { IntervalActionDefinition } from './types' class ExperimentalInterval extends Interval { + #groupChangeCtx = Evt.newCtx() + + constructor(config: InternalConfig) { + super(config) + this.actions = new ExperimentalActions( + this, + this.httpEndpoint, + this.log, + this.apiKey + ) + + if (this.config.groups) { + for (const group of Object.values(this.config.groups)) { + group.onChange.attach(this.#groupChangeCtx, () => { + this.client?.handleActionsChange(this.config) + }) + } + } + } /* * Add an ActionGroup and its child actions to this deployment. */ @@ -22,7 +44,23 @@ class ExperimentalInterval extends Interval { this.config.groups = {} } + group.onChange.attach(this.#groupChangeCtx, () => { + this.client?.handleActionsChange(this.config) + }) + this.config.groups[prefix] = group + + this.client?.handleActionsChange(this.config) + } + + unuse(prefix: string) { + if (!this.config.groups) return + + const group = this.config.groups[prefix] + if (!group) return + + group.onChange.detach(this.#groupChangeCtx) + delete this.config.groups[prefix] } /* @@ -211,6 +249,29 @@ class ExperimentalInterval extends Interval { } } +export class ExperimentalActions extends Actions { + add(slug: string, action: IntervalActionDefinition) { + if (!this.interval.config.actions) { + this.interval.config.actions = {} + } + + this.interval.config.actions[slug] = action + this.interval.client?.handleActionsChange(this.interval.config) + } + + remove(slug: string) { + const { actions } = this.interval.config + + if (!actions) return + const action = actions[slug] + if (!action) return + + delete actions[slug] + + this.interval.client?.handleActionsChange(this.interval.config) + } +} + export { ActionGroup, io, ctx, IntervalError, ExperimentalInterval as Interval } export default ExperimentalInterval diff --git a/src/index.ts b/src/index.ts index ea151ed..420a748 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,14 +1,9 @@ import { z } from 'zod' import fetch from 'node-fetch' -import * as superjson from 'superjson' +import Actions from './classes/Actions' import IOError from './classes/IOError' import Logger from './classes/Logger' -import { - ENQUEUE_ACTION, - DEQUEUE_ACTION, - NOTIFY, - ActionEnvironment, -} from './internalRpcSchema' +import { NOTIFY, ActionEnvironment } from './internalRpcSchema' import { SerializableRecord } from './ioSchema' import type { ActionCtx, @@ -43,6 +38,7 @@ export interface InternalConfig { retryIntervalMs?: number pingIntervalMs?: number closeUnresponsiveConnectionTimeoutMs?: number + reinitializeBatchTimeoutMs?: number } export interface QueuedAction { @@ -97,7 +93,7 @@ export default class Interval { #client: IntervalClient | undefined #apiKey: string | undefined #httpEndpoint: string - actions: Actions + #actions: Actions organization: | { @@ -107,7 +103,7 @@ export default class Interval { | undefined environment: ActionEnvironment | undefined - constructor(config: Omit) { + constructor(config: InternalConfig) { this.config = config this.#apiKey = config.apiKey this.#logger = new Logger(config.logLevel) @@ -115,7 +111,20 @@ export default class Interval { this.#httpEndpoint = getHttpEndpoint( config.endpoint ?? DEFAULT_WEBSOCKET_ENDPOINT ) - this.actions = new Actions(this.#httpEndpoint, this.#logger, this.#apiKey) + this.#actions = new Actions( + this, + this.#httpEndpoint, + this.#logger, + this.#apiKey + ) + } + + get actions(): Actions { + return this.#actions + } + + /* @internal */ set actions(actions: Actions) { + this.#actions = actions } protected get apiKey(): string | undefined { @@ -149,6 +158,10 @@ export default class Interval { return this.#client?.close() } + /* @internal */ get client() { + return this.#client + } + async notify(config: NotifyConfig): Promise { let body: z.infer try { @@ -191,119 +204,4 @@ export default class Interval { } } -/** - * This is effectively a namespace inside of Interval with a little bit of its own state. - */ -class Actions { - #logger: Logger - #apiKey?: string - #endpoint: string - - constructor(endpoint: string, logger: Logger, apiKey?: string) { - this.#apiKey = apiKey - this.#logger = logger - this.#endpoint = endpoint + '/api/actions' - } - - #getAddress(path: string): string { - if (path.startsWith('/')) { - path = path.substring(1) - } - - return `${this.#endpoint}/${path}` - } - - async enqueue( - slug: string, - { assignee, params }: Pick = {} - ): Promise { - let body: z.infer - try { - const { json, meta } = params - ? superjson.serialize(params) - : { json: undefined, meta: undefined } - body = ENQUEUE_ACTION.inputs.parse({ - assignee, - slug, - params: json, - paramsMeta: meta, - }) - } catch (err) { - this.#logger.debug(err) - throw new IntervalError('Invalid input.') - } - - const response = await fetch(this.#getAddress('enqueue'), { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - Authorization: `Bearer ${this.#apiKey}`, - }, - body: JSON.stringify(body), - }) - .then(r => r.json()) - .then(r => ENQUEUE_ACTION.returns.parseAsync(r)) - .catch(err => { - this.#logger.debug(err) - throw new IntervalError('Received invalid API response.') - }) - - if (response.type === 'error') { - throw new IntervalError( - `There was a problem enqueuing the action: ${response.message}` - ) - } - - return { - id: response.id, - assignee, - params, - } - } - - async dequeue(id: string): Promise { - let body: z.infer - try { - body = DEQUEUE_ACTION.inputs.parse({ - id, - }) - } catch (err) { - this.#logger.debug(err) - throw new IntervalError('Invalid input.') - } - - const response = await fetch(this.#getAddress('dequeue'), { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - Authorization: `Bearer ${this.#apiKey}`, - }, - body: JSON.stringify(body), - }) - .then(r => r.json()) - .then(r => DEQUEUE_ACTION.returns.parseAsync(r)) - .catch(err => { - this.#logger.debug(err) - throw new IntervalError('Received invalid API response.') - }) - - if (response.type === 'error') { - throw new IntervalError( - `There was a problem enqueuing the action: ${response.message}` - ) - } - - let { type, params, paramsMeta, ...rest } = response - - if (paramsMeta && params) { - params = superjson.deserialize({ json: params, meta: paramsMeta }) - } - - return { - ...rest, - params, - } - } -} - export { Interval, IOError } From 334685199e1eeb13fdac29e9b5f0a5369f6e7350 Mon Sep 17 00:00:00 2001 From: Jacob Mischka Date: Tue, 16 Aug 2022 08:20:59 -0500 Subject: [PATCH 2/4] Allow ActionGroups without actions, just groups --- src/classes/ActionGroup.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/classes/ActionGroup.ts b/src/classes/ActionGroup.ts index f6c2d90..bc05c40 100644 --- a/src/classes/ActionGroup.ts +++ b/src/classes/ActionGroup.ts @@ -3,7 +3,7 @@ import { IntervalActionDefinition, IntervalActionDefinitions } from '../types' export interface ActionGroupConfig { name: string - actions: IntervalActionDefinitions + actions?: IntervalActionDefinitions groups?: Record } @@ -17,7 +17,7 @@ export default class ActionGroup { constructor(config: ActionGroupConfig) { this.name = config.name - this.actions = config.actions + this.actions = config.actions ?? {} this.groups = config.groups ?? {} this.onChange = new Evt() From eff61498f7e9c86fe6e20f01b82421ab3e17938b Mon Sep 17 00:00:00 2001 From: Jacob Mischka Date: Tue, 16 Aug 2022 09:07:09 -0500 Subject: [PATCH 3/4] Change use -> addGroup and unuse -> removeGroup --- src/classes/ActionGroup.ts | 4 ++-- src/examples/nested/index.ts | 14 +++++++------- src/experimental.ts | 4 ++-- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/classes/ActionGroup.ts b/src/classes/ActionGroup.ts index bc05c40..57b3b35 100644 --- a/src/classes/ActionGroup.ts +++ b/src/classes/ActionGroup.ts @@ -26,13 +26,13 @@ export default class ActionGroup { } } - use(groupSlug: string, group: ActionGroup) { + addGroup(groupSlug: string, group: ActionGroup) { group.onChange.attach(this.#groupChangeCtx, this.onChange.post) this.groups[groupSlug] = group this.onChange.post() } - unuse(groupSlug: string) { + removeGroup(groupSlug: string) { const group = this.groups[groupSlug] if (!group) return diff --git a/src/examples/nested/index.ts b/src/examples/nested/index.ts index 0f62bf0..e365084 100644 --- a/src/examples/nested/index.ts +++ b/src/examples/nested/index.ts @@ -38,7 +38,7 @@ const nested = new ActionGroup({ }, }) -nested.use( +nested.addGroup( 'more', new ActionGroup({ name: 'More nested', @@ -48,9 +48,9 @@ nested.use( }) ) -interval.use('nested', nested) +interval.addGroup('nested', nested) -nested.use( +nested.addGroup( 'other', new ActionGroup({ name: 'Other', @@ -62,7 +62,7 @@ nested.use( ) interval.listen().then(() => { - interval.use( + interval.addGroup( 'new', new ActionGroup({ name: 'New Group', @@ -72,7 +72,7 @@ interval.listen().then(() => { }) ) - interval.unuse('toRemove') + interval.removeGroup('toRemove') devOnly.add('self_destructing', async () => { devOnly.remove('self_destructing') @@ -85,7 +85,7 @@ const anon = new ExperimentalInterval({ endpoint: 'ws://localhost:3000/websocket', }) -anon.use('nested', nested) +anon.addGroup('nested', nested) anon.listen() @@ -95,6 +95,6 @@ const prod = new ExperimentalInterval({ endpoint: 'ws://localhost:3000/websocket', }) -prod.use('test', nested) +prod.addGroup('test', nested) prod.listen() diff --git a/src/experimental.ts b/src/experimental.ts index 564583c..9c9146d 100644 --- a/src/experimental.ts +++ b/src/experimental.ts @@ -39,7 +39,7 @@ class ExperimentalInterval extends Interval { /* * Add an ActionGroup and its child actions to this deployment. */ - use(prefix: string, group: ActionGroup) { + addGroup(prefix: string, group: ActionGroup) { if (!this.config.groups) { this.config.groups = {} } @@ -53,7 +53,7 @@ class ExperimentalInterval extends Interval { this.client?.handleActionsChange(this.config) } - unuse(prefix: string) { + removeGroup(prefix: string) { if (!this.config.groups) return const group = this.config.groups[prefix] From 3640eba1fbfd409d56f6e7dbf00414444ec4a8f7 Mon Sep 17 00:00:00 2001 From: Jacob Mischka Date: Tue, 16 Aug 2022 09:29:36 -0500 Subject: [PATCH 4/4] Bump SDK version to -dev --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 4b302f7..a963148 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@interval/sdk", - "version": "0.24.0", + "version": "0.25.0-dev", "homepage": "https://interval.com", "repository": { "type": "git",