Skip to content

Commit

Permalink
Merge pull request #760 from interval/add-remove-actions
Browse files Browse the repository at this point in the history
Allow dynamically adding/removing actions and groups after listen
  • Loading branch information
jacobmischka authored Aug 18, 2022
2 parents 1a4df9a + 3640eba commit f8dadf7
Show file tree
Hide file tree
Showing 7 changed files with 353 additions and 169 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@interval/sdk",
"version": "0.24.0",
"version": "0.25.0-dev",
"homepage": "https://interval.com",
"repository": {
"type": "git",
Expand Down
42 changes: 38 additions & 4 deletions src/classes/ActionGroup.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,55 @@
import { IntervalActionDefinitions } from '../types'
import { Evt } from 'evt'
import { IntervalActionDefinition, IntervalActionDefinitions } from '../types'

export interface ActionGroupConfig {
name: string
actions: IntervalActionDefinitions
actions?: IntervalActionDefinitions
groups?: Record<string, ActionGroup>
}

export default class ActionGroup {
name: string
actions: IntervalActionDefinitions
groups: Record<string, ActionGroup> = {}

onChange: Evt<void>
#groupChangeCtx = Evt.newCtx()

constructor(config: ActionGroupConfig) {
this.name = config.name
this.actions = config.actions
this.actions = config.actions ?? {}
this.groups = config.groups ?? {}
this.onChange = new Evt()

for (const group of Object.values(this.groups)) {
group.onChange.attach(this.#groupChangeCtx, this.onChange.post)
}
}

use(groupSlug: string, group: ActionGroup) {
addGroup(groupSlug: string, group: ActionGroup) {
group.onChange.attach(this.#groupChangeCtx, this.onChange.post)
this.groups[groupSlug] = group
this.onChange.post()
}

removeGroup(groupSlug: string) {
const group = this.groups[groupSlug]
if (!group) return

group.onChange.detach(this.#groupChangeCtx)
delete this.groups[groupSlug]
}

add(slug: string, action: IntervalActionDefinition) {
this.actions[slug] = action

this.onChange.post()
}

remove(slug: string) {
if (this.actions[slug]) {
delete this.actions[slug]
this.onChange.post()
}
}
}
128 changes: 128 additions & 0 deletions src/classes/Actions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import { z } from 'zod'
import fetch from 'node-fetch'
import * as superjson from 'superjson'
import Logger from './Logger'
import Interval, { IntervalError, QueuedAction } from '..'
import { ENQUEUE_ACTION, DEQUEUE_ACTION } from '../internalRpcSchema'

/**
* This is effectively a namespace inside of Interval with a little bit of its own state.
*/
export default class Actions {
protected interval: Interval
#logger: Logger
#apiKey?: string
#endpoint: string

constructor(
interval: Interval,
endpoint: string,
logger: Logger,
apiKey?: string
) {
this.interval = interval
this.#apiKey = apiKey
this.#logger = logger
this.#endpoint = endpoint + '/api/actions'
}

#getAddress(path: string): string {
if (path.startsWith('/')) {
path = path.substring(1)
}

return `${this.#endpoint}/${path}`
}

async enqueue(
slug: string,
{ assignee, params }: Pick<QueuedAction, 'assignee' | 'params'> = {}
): Promise<QueuedAction> {
let body: z.infer<typeof ENQUEUE_ACTION['inputs']>
try {
const { json, meta } = params
? superjson.serialize(params)
: { json: undefined, meta: undefined }
body = ENQUEUE_ACTION.inputs.parse({
assignee,
slug,
params: json,
paramsMeta: meta,
})
} catch (err) {
this.#logger.debug(err)
throw new IntervalError('Invalid input.')
}

const response = await fetch(this.#getAddress('enqueue'), {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${this.#apiKey}`,
},
body: JSON.stringify(body),
})
.then(r => r.json())
.then(r => ENQUEUE_ACTION.returns.parseAsync(r))
.catch(err => {
this.#logger.debug(err)
throw new IntervalError('Received invalid API response.')
})

if (response.type === 'error') {
throw new IntervalError(
`There was a problem enqueuing the action: ${response.message}`
)
}

return {
id: response.id,
assignee,
params,
}
}

async dequeue(id: string): Promise<QueuedAction> {
let body: z.infer<typeof DEQUEUE_ACTION['inputs']>
try {
body = DEQUEUE_ACTION.inputs.parse({
id,
})
} catch (err) {
this.#logger.debug(err)
throw new IntervalError('Invalid input.')
}

const response = await fetch(this.#getAddress('dequeue'), {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${this.#apiKey}`,
},
body: JSON.stringify(body),
})
.then(r => r.json())
.then(r => DEQUEUE_ACTION.returns.parseAsync(r))
.catch(err => {
this.#logger.debug(err)
throw new IntervalError('Received invalid API response.')
})

if (response.type === 'error') {
throw new IntervalError(
`There was a problem enqueuing the action: ${response.message}`
)
}

let { type, params, paramsMeta, ...rest } = response

if (paramsMeta && params) {
params = superjson.deserialize({ json: params, meta: paramsMeta })
}

return {
...rest,
params,
}
}
}
80 changes: 55 additions & 25 deletions src/classes/IntervalClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,13 @@ export default class IntervalClient {
#retryIntervalMs: number = 3000
#pingIntervalMs: number = 30_000
#closeUnresponsiveConnectionTimeoutMs: number = 3 * 60 * 1000 // 3 minutes
#reinitializeBatchTimeoutMs: number = 200
#pingIntervalHandle: NodeJS.Timeout | undefined
#intentionallyClosed = false

#actionDefinitions: ActionDefinition[]
#groupDefinitions: GroupDefinition[]
#actionHandlers: Record<string, IntervalActionHandler>
#actionDefinitions: ActionDefinition[] = []
#groupDefinitions: GroupDefinition[] = []
#actionHandlers: Record<string, IntervalActionHandler> = {}

organization:
| {
Expand All @@ -93,6 +94,35 @@ export default class IntervalClient {
this.#endpoint = config.endpoint
}

this.#walkActions(config)

if (config.retryIntervalMs && config.retryIntervalMs > 0) {
this.#retryIntervalMs = config.retryIntervalMs
}

if (config.pingIntervalMs && config.pingIntervalMs > 0) {
this.#pingIntervalMs = config.pingIntervalMs
}

if (
config.closeUnresponsiveConnectionTimeoutMs &&
config.closeUnresponsiveConnectionTimeoutMs > 0
) {
this.#closeUnresponsiveConnectionTimeoutMs =
config.closeUnresponsiveConnectionTimeoutMs
}

if (
config.reinitializeBatchTimeoutMs &&
config.reinitializeBatchTimeoutMs > 0
) {
this.#reinitializeBatchTimeoutMs = config.reinitializeBatchTimeoutMs
}

this.#httpEndpoint = getHttpEndpoint(this.#endpoint)
}

#walkActions(config: InternalConfig) {
const groupDefinitions: GroupDefinition[] = []
const actionDefinitions: (ActionDefinition & { handler: undefined })[] = []
const actionHandlers: Record<string, IntervalActionHandler> = {}
Expand Down Expand Up @@ -140,24 +170,6 @@ export default class IntervalClient {
this.#groupDefinitions = groupDefinitions
this.#actionDefinitions = actionDefinitions
this.#actionHandlers = actionHandlers

if (config.retryIntervalMs && config.retryIntervalMs > 0) {
this.#retryIntervalMs = config.retryIntervalMs
}

if (config.pingIntervalMs && config.pingIntervalMs > 0) {
this.#pingIntervalMs = config.pingIntervalMs
}

if (
config.closeUnresponsiveConnectionTimeoutMs &&
config.closeUnresponsiveConnectionTimeoutMs > 0
) {
this.#closeUnresponsiveConnectionTimeoutMs =
config.closeUnresponsiveConnectionTimeoutMs
}

this.#httpEndpoint = getHttpEndpoint(this.#endpoint)
}

get #log() {
Expand All @@ -176,11 +188,24 @@ export default class IntervalClient {
| DuplexRPCClient<typeof wsServerSchema, typeof hostSchema>
| undefined = undefined
#isConnected = false
#isInitialized = false

get isConnected() {
return this.#isConnected
}

#reinitializeTimeout: NodeJS.Timeout | null = null

handleActionsChange(config: InternalConfig) {
if (this.#isInitialized && !this.#reinitializeTimeout) {
this.#reinitializeTimeout = setTimeout(async () => {
this.#walkActions(config)
await this.#initializeHost()
this.#reinitializeTimeout = null
}, this.#reinitializeBatchTimeoutMs)
}
}

async listen() {
if (this.#actionDefinitions.length === 0) {
this.#log.prod(
Expand Down Expand Up @@ -828,11 +853,16 @@ export default class IntervalClient {
this.organization = response.organization
this.environment = response.environment

this.#log.prod(
`🔗 Connected! Access your actions at: ${response.dashboardUrl}`
)
this.#log.debug('Host ID:', this.#ws.id)
if (!this.#isInitialized) {
this.#log.prod(
`🔗 Connected! Access your actions at: ${response.dashboardUrl}`
)
this.#log.debug('Host ID:', this.#ws.id)
this.#isInitialized = true
}
}

return response
}

async #send<MethodName extends keyof typeof wsServerSchema>(
Expand Down
Loading

0 comments on commit f8dadf7

Please sign in to comment.