Add linear backoff and maximum retry logic for failed RPC calls
While this wasn't causing issues in the wild yet, very large payloads
combined with our existing naive, static retry logic can effectively
saturate a connection. This introduces linear backoff with a maximum
number of retries, so one failed RPC call can't permanently bring a
host to its knees.
jacobmischka committed Apr 18, 2023
1 parent 41c3366 commit 9094a87
Showing 6 changed files with 64 additions and 30 deletions.
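
In short: the resend loops and the server RPC path below stop retrying forever at a fixed 3s interval. Each failed attempt now sleeps retryIntervalMs * attemptNumber before the next try, and the loop gives up after maxResendAttempts. A minimal sketch of that shape in TypeScript (sendWithBackoff, sendOnce, and the constants here are illustrative, not the SDK source):

const retryIntervalMs = 3_000
const maxResendAttempts = 10

function sleep(ms: number): Promise<void> {
  return new Promise<void>(resolve => setTimeout(resolve, ms))
}

async function sendWithBackoff(sendOnce: () => Promise<void>): Promise<void> {
  for (let attemptNumber = 1; attemptNumber <= maxResendAttempts; attemptNumber++) {
    try {
      // Attempt the send; on success we're done.
      return await sendOnce()
    } catch {
      // Linear backoff: wait 3s, 6s, 9s, ... instead of a fixed 3s every time.
      await sleep(retryIntervalMs * attemptNumber)
    }
  }
  throw new Error('Maximum failed resend attempts reached, aborting.')
}
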
7 changes: 5 additions & 2 deletions src/classes/DuplexRPCClient.ts
@@ -278,7 +278,10 @@ export class DuplexRPCClient<

   public async send<MethodName extends keyof CallerSchema>(
     methodName: MethodName,
-    inputs: z.input<CallerSchema[MethodName]['inputs']>
+    inputs: z.input<CallerSchema[MethodName]['inputs']>,
+    options: {
+      timeoutFactor?: number
+    } = {}
   ) {
     const id = generateId()

@@ -335,7 +338,7 @@ export class DuplexRPCClient<
         }
       })
     } else {
-      this.communicator.send(msg).catch(err => {
+      this.communicator.send(msg, options).catch(err => {
         reject(err)
       })
     }
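
The new options bag is threaded straight through to the underlying ISocket, so a caller can stretch the ACK window for a given attempt. A hypothetical call site (rpc, 'SEND_IO_CALL', and ioCallInputs are stand-ins for a constructed DuplexRPCClient, a method in its caller schema, and that method's inputs):

async function resendWithLongerTimeout(
  rpc: { send: (method: string, inputs: unknown, opts?: { timeoutFactor?: number }) => Promise<unknown> },
  ioCallInputs: unknown
) {
  // With timeoutFactor: 3, the underlying ISocket waits 3 × sendTimeout for an ACK
  // before rejecting with a TimeoutError.
  return rpc.send('SEND_IO_CALL', ioCallInputs, { timeoutFactor: 3 })
}
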
8 changes: 4 additions & 4 deletions src/classes/ISocket.ts
@@ -224,15 +224,15 @@ export default class ISocket {
    * Send a `MESSAGE` containing data to the connected counterpart,
    * throwing an error if `ACK` is not received within `sendTimeout`.
    */
-  async send(data: string) {
+  async send(data: string, options: { timeoutFactor?: number } = {}) {
     if (this.isClosed) throw new NotConnectedError()

     return new Promise<void>((resolve, reject) => {
       const id = v4()

       const failTimeout = setTimeout(() => {
         reject(new TimeoutError())
-      }, this.sendTimeout)
+      }, this.sendTimeout * (options.timeoutFactor ?? 1))

       this.timeouts.add(failTimeout)

@@ -280,8 +280,8 @@ export default class ISocket {

     this.id = config?.id || v4()
     this.connectTimeout = config?.connectTimeout ?? 15_000
-    this.sendTimeout = config?.sendTimeout ?? 3000
-    this.pingTimeout = config?.pingTimeout ?? 3000
+    this.sendTimeout = config?.sendTimeout ?? 5000
+    this.pingTimeout = config?.pingTimeout ?? 5000
     this.isAuthenticated = false

     this.onClose.attach(() => {
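
With the options bag above, the effective ACK window becomes sendTimeout * (timeoutFactor ?? 1), and the default sendTimeout and pingTimeout move from 3s to 5s. A small worked example with the new default (numbers are illustrative only):

const sendTimeout = 5_000 // new default above
for (const timeoutFactor of [1, 2, 3]) {
  // 5000ms, 10000ms, 15000ms: later attempts get proportionally longer ACK windows.
  console.log(`factor ${timeoutFactor}: ${sendTimeout * timeoutFactor}ms`)
}
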
69 changes: 47 additions & 22 deletions src/classes/IntervalClient.ts
@@ -113,6 +113,7 @@ export default class IntervalClient {
   #completeHttpRequestDelayMs: number = 3000
   #completeShutdownDelayMs: number = 3000
   #retryIntervalMs: number = 3000
+  #maxResendAttempts: number = 10
   #pingIntervalMs: number = 30_000
   #closeUnresponsiveConnectionTimeoutMs: number = 3 * 60 * 1000 // 3 minutes
   #reinitializeBatchTimeoutMs: number = 200
@@ -175,6 +176,10 @@ export default class IntervalClient {
       this.#completeHttpRequestDelayMs = config.completeHttpRequestDelayMs
     }

+    if (config.maxResendAttempts && config.maxResendAttempts > 0) {
+      this.#maxResendAttempts = config.maxResendAttempts
+    }
+
     this.#httpEndpoint = getHttpEndpoint(this.#endpoint)

     if (config.setHostHandlers) {
@@ -499,7 +504,8 @@ export default class IntervalClient {
         )
       : new Map(this.#pendingIOCalls)

-    while (toResend.size > 0) {
+    let attemptNumber = 1
+    while (toResend.size > 0 && attemptNumber <= this.#maxResendAttempts) {
       await Promise.allSettled(
         Array.from(toResend.entries()).map(([transactionId, ioCall]) =>
           this.#send('SEND_IO_CALL', {
@@ -534,15 +540,16 @@ export default class IntervalClient {
               this.#logger.debug('Failed resending pending IO call:', err)
             }

+            const retrySleepMs = this.#retryIntervalMs * attemptNumber
             this.#logger.debug(
-              `Trying again in ${Math.round(
-                this.#retryIntervalMs / 1000
-              )}s...`
+              `Trying again in ${Math.round(retrySleepMs / 1000)}s...`
             )
-            await sleep(this.#retryIntervalMs)
+            await sleep(retrySleepMs)
           })
         )
       )
+
+      attemptNumber++
     }
   }

@@ -560,7 +567,8 @@ export default class IntervalClient {
         )
       : new Map(this.#pendingPageLayouts)

-    while (toResend.size > 0) {
+    let attemptNumber = 1
+    while (toResend.size > 0 && attemptNumber <= this.#maxResendAttempts) {
       await Promise.allSettled(
         Array.from(toResend.entries()).map(([pageKey, page]) =>
           this.#send('SEND_PAGE', {
@@ -595,15 +603,16 @@ export default class IntervalClient {
               this.#logger.debug('Failed resending pending page layout:', err)
             }

+            const retrySleepMs = this.#retryIntervalMs * attemptNumber
             this.#logger.debug(
-              `Trying again in ${Math.round(
-                this.#retryIntervalMs / 1000
-              )}s...`
+              `Trying again in ${Math.round(retrySleepMs / 1000)}s...`
             )
-            await sleep(this.#retryIntervalMs)
+            await sleep(retrySleepMs)
           })
         )
       )
+
+      attemptNumber++
     }
   }

@@ -621,7 +630,8 @@ export default class IntervalClient {
         )
       : new Map(this.#transactionLoadingStates)

-    while (toResend.size > 0) {
+    let attemptNumber = 0
+    while (toResend.size > 0 && attemptNumber <= this.#maxResendAttempts) {
       await Promise.allSettled(
         Array.from(toResend.entries()).map(([transactionId, loadingState]) =>
           this.#send('SEND_LOADING_CALL', {
@@ -657,15 +667,16 @@ export default class IntervalClient {
               this.#logger.debug('Failed resending pending IO call:', err)
             }

+            const retrySleepMs = this.#retryIntervalMs * attemptNumber
             this.#logger.debug(
-              `Trying again in ${Math.round(
-                this.#retryIntervalMs / 1000
-              )}s...`
+              `Trying again in ${Math.round(retrySleepMs / 1000)}s...`
             )
-            await sleep(this.#retryIntervalMs)
+            await sleep(retrySleepMs)
           })
         )
       )
+
+      attemptNumber++
     }
   }

@@ -2003,27 +2014,41 @@ export default class IntervalClient {
       this.#logger.debug('Error from peer RPC', err)
     }

-    while (true) {
+    for (
+      let attemptNumber = 1;
+      attemptNumber <= this.#maxResendAttempts;
+      attemptNumber++
+    ) {
       try {
         this.#logger.debug('Sending via server', methodName, inputs)
-        return await this.#serverRpc.send(methodName, {
-          ...inputs,
-          skipClientCall,
-        })
+        return await this.#serverRpc.send(
+          methodName,
+          {
+            ...inputs,
+            skipClientCall,
+          },
+          {
+            timeoutFactor: attemptNumber,
+          }
+        )
       } catch (err) {
+        const sleepTimeBeforeRetrying = this.#retryIntervalMs * attemptNumber
+
         if (err instanceof TimeoutError) {
           this.#log.debug(
             `RPC call timed out, retrying in ${Math.round(
-              this.#retryIntervalMs / 1000
+              sleepTimeBeforeRetrying / 1000
             )}s...`
           )
           this.#log.debug(err)
-          sleep(this.#retryIntervalMs)
+          sleep(sleepTimeBeforeRetrying)
         } else {
           throw err
         }
       }
     }
+
+    throw new IntervalError('Maximum failed resend attempts reached, aborting.')
   }

   /**
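
Taken together, each retry of #send both waits longer before resending (retryIntervalMs * attemptNumber) and hands the socket a proportionally longer ACK window (timeoutFactor = attemptNumber, so up to 10 × 5s = 50s on the final attempt with the defaults in this commit). A back-of-the-envelope sketch of the worst-case backoff in the resend loops above, ignoring time spent inside the sends themselves:

const retryIntervalMs = 3_000
const maxResendAttempts = 10

let totalSleepMs = 0
for (let attemptNumber = 1; attemptNumber <= maxResendAttempts; attemptNumber++) {
  totalSleepMs += retryIntervalMs * attemptNumber
}
// 3s + 6s + ... + 30s = 165_000ms, i.e. roughly 2 minutes 45 seconds of backoff
// before a resend loop gives up, versus retrying every 3s forever previously.
console.log(totalSleepMs)
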
4 changes: 2 additions & 2 deletions src/examples/basic/table.ts
@@ -144,7 +144,7 @@ export const multiple_tables: IntervalActionHandler = async io => {
 export const big_payload_table = new Page({
   name: 'Big table',
   handler: async () => {
-    const bigData = generateRows(100000)
+    const bigData = generateRows(10_000)

     return new Layout({
       children: [
@@ -311,7 +311,7 @@ export const table_custom: IntervalActionHandler = async io => {

   const rows: { [key: string]: any }[] = []
   for (let i = 0; i < rowsCount; i++) {
-    const row: typeof rows[0] = {}
+    const row: (typeof rows)[0] = {}
     for (const field of fields) {
       switch (field.value) {
         case 'id':
5 changes: 5 additions & 0 deletions src/examples/utils/helpers.ts
@@ -100,6 +100,11 @@ export function generateRows(count: number, offset = 0) {
 ~~~`,
     ]),
     number: faker.datatype.number(100),
+    ...Object.fromEntries(
+      Array(50)
+        .fill(null)
+        .map((_, i) => [`text_${i}`, faker.lorem.paragraph()])
+    ),
     boolean: faker.datatype.boolean(),
     date: faker.datatype.datetime(),
     image: faker.image.imageUrl(
1 change: 1 addition & 0 deletions src/index.ts
@@ -64,6 +64,7 @@ export interface InternalConfig {
   connectTimeoutMs?: number
   sendTimeoutMs?: number
   pingTimeoutMs?: number
+  maxResendAttempts?: number
   completeHttpRequestDelayMs?: number

   closeUnresponsiveConnectionTimeoutMs?: number
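
Hosts can cap the retries via the new InternalConfig field. A hypothetical configuration object, assuming it is forwarded to IntervalClient the same way the existing timeout options are (values are illustrative, not recommendations):

import type { InternalConfig } from './index'

const hostConfig: Partial<InternalConfig> = {
  sendTimeoutMs: 5_000, // matches the new ISocket default
  pingTimeoutMs: 5_000,
  maxResendAttempts: 5, // give up after 5 attempts instead of the default 10
}
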
