Skip to content

Commit

Permalink
Merge pull request #1137 from interval/begin-shutdown
Browse files Browse the repository at this point in the history
Add async `safelyClose` method to close after transactions complete
  • Loading branch information
jacobmischka authored Mar 15, 2023
2 parents ba60021 + f3f61ab commit ddbe0b9
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 9 deletions.
62 changes: 60 additions & 2 deletions src/classes/IntervalClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,14 @@ export default class IntervalClient {
#httpEndpoint: string
#logger: Logger
#completeHttpRequestDelayMs: number = 3000
#completeShutdownDelayMs: number = 3000
#retryIntervalMs: number = 3000
#pingIntervalMs: number = 30_000
#closeUnresponsiveConnectionTimeoutMs: number = 3 * 60 * 1000 // 3 minutes
#reinitializeBatchTimeoutMs: number = 200
#pingIntervalHandle: NodeJS.Timeout | undefined
#intentionallyClosed = false
#resolveShutdown: (() => void) | undefined
#config: InternalConfig

#actionDefinitions: ActionDefinition[] = []
Expand Down Expand Up @@ -372,7 +374,8 @@ export default class IntervalClient {
return await result
}

close() {
immediatelyClose() {
this.#resolveShutdown = undefined
this.#intentionallyClosed = true

if (this.#serverRpc) {
Expand All @@ -393,6 +396,36 @@ export default class IntervalClient {
this.#isConnected = false
}

async safelyClose(): Promise<void> {
const response = await this.#send(
'BEGIN_HOST_SHUTDOWN',
{},
{
attemptPeerSend: false,
}
)

if (response.type === 'error') {
throw new IntervalError(
response.message ?? 'Unknown error sending shutdown request.'
)
}

if (this.#ioResponseHandlers.size === 0) {
this.immediatelyClose()
return
}

return new Promise<void>(resolve => {
this.#resolveShutdown = resolve
}).then(() => {
// doing this here and in #close just to be extra sure
// it's not missed in any future code paths
this.#resolveShutdown = undefined
this.immediatelyClose()
})
}

async declareHost(httpHostId: string) {
await this.#walkRoutes()

Expand Down Expand Up @@ -655,6 +688,12 @@ export default class IntervalClient {
this.#peerIdMap.delete(transactionId)
this.#peerIdToTransactionIdsMap.get(peerId)?.delete(transactionId)
}

if (this.#resolveShutdown && this.#ioResponseHandlers.size === 0) {
setTimeout(() => {
this.#resolveShutdown?.()
}, this.#completeShutdownDelayMs)
}
}

/**
Expand Down Expand Up @@ -756,7 +795,9 @@ export default class IntervalClient {
new Date().getTime() - this.#closeUnresponsiveConnectionTimeoutMs
) {
this.#logger.warn(
'No pong received in last three minutes, closing connection to Interval and retrying...'
`No pong received in last ${
this.#closeUnresponsiveConnectionTimeoutMs
}ms, closing connection to Interval and retrying...`
)
if (this.#pingIntervalHandle) {
clearInterval(this.#pingIntervalHandle)
Expand Down Expand Up @@ -917,6 +958,13 @@ export default class IntervalClient {
}
},
START_TRANSACTION: async inputs => {
if (this.#resolveShutdown) {
this.#logger.debug(
'In process of closing, refusing to start transaction'
)
return
}

if (!intervalClient.organization) {
intervalClient.#log.error('No organization defined')
return
Expand Down Expand Up @@ -1212,6 +1260,10 @@ export default class IntervalClient {
this.#closeTransaction(transactionId)
},
OPEN_PAGE: async inputs => {
if (this.#resolveShutdown) {
return { type: 'ERROR' as const, message: 'Host shutting down.' }
}

if (!this.organization) {
this.#log.error('No organization defined')

Expand Down Expand Up @@ -1627,6 +1679,12 @@ export default class IntervalClient {
}
}, this.#completeHttpRequestDelayMs)
}

if (this.#resolveShutdown && this.#ioResponseHandlers.size === 0) {
setTimeout(() => {
this.#resolveShutdown?.()
}, this.#completeShutdownDelayMs)
}
},
}
}
Expand Down
33 changes: 33 additions & 0 deletions src/examples/shutdown/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import Interval from '../../index'
import { sleep } from '../utils/helpers'

const interval = new Interval({
apiKey: 'live_N47qd1BrOMApNPmVd0BiDZQRLkocfdJKzvt8W6JT5ICemrAN',
logLevel: 'debug',
endpoint: 'ws://localhost:3000/websocket',
routes: {
wait_a_while: async (io, ctx) => {
await ctx.loading.start('Waiting...')
await sleep(5000)
return 'Done!'
},
},
})

interval.listen()

process.on('SIGINT', () => {
interval
.safelyClose()
.then(() => {
console.log('Shut down!')
process.exit(0)
})
.catch(err => {
console.error(
'Failed shutting down gracefully, forcibly closing connection'
)
interval.immediatelyClose()
process.exit(0)
})
})
4 changes: 2 additions & 2 deletions src/experimental.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class ExperimentalInterval extends Interval {
const client = new IntervalClient(this, this.config)
const response = await client.respondToRequest(requestId)

client.close()
client.immediatelyClose()

return response
}
Expand All @@ -141,7 +141,7 @@ class ExperimentalInterval extends Interval {
const client = new IntervalClient(this, this.config)
const response = await client.declareHost(httpHostId)

client.close()
client.immediatelyClose()

return response
}
Expand Down
10 changes: 7 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,12 @@ export default class Interval {
return this.#client.listen()
}

close() {
return this.#client?.close()
immediatelyClose() {
return this.#client?.immediatelyClose()
}

async safelyClose(): Promise<void> {
return this.#client?.safelyClose()
}

/* @internal */ get client() {
Expand Down Expand Up @@ -278,7 +282,7 @@ export default class Interval {
return
}

let body: z.infer<typeof NOTIFY['inputs']>
let body: z.infer<(typeof NOTIFY)['inputs']>
try {
body = NOTIFY.inputs.parse({
...config,
Expand Down
18 changes: 16 additions & 2 deletions src/internalRpcSchema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,20 @@ export const wsServerSchema = {
])
.nullable(),
},
BEGIN_HOST_SHUTDOWN: {
// intentional empty object to allow for future additions
// and to support current `skipClientCall` behavior
inputs: z.object({}),
returns: z.discriminatedUnion('type', [
z.object({
type: z.literal('success'),
}),
z.object({
type: z.literal('error'),
message: z.string().optional(),
}),
]),
},
}

export type WSServerSchema = typeof wsServerSchema
Expand Down Expand Up @@ -585,5 +599,5 @@ export const hostSchema = {
export type HostSchema = typeof hostSchema

export type PeerConnectionInitializer = (
inputs: z.infer<typeof INITIALIZE_PEER_CONNECTION['inputs']>
) => Promise<z.infer<typeof INITIALIZE_PEER_CONNECTION['returns']>>
inputs: z.infer<(typeof INITIALIZE_PEER_CONNECTION)['inputs']>
) => Promise<z.infer<(typeof INITIALIZE_PEER_CONNECTION)['returns']>>

0 comments on commit ddbe0b9

Please sign in to comment.