Skip to content

Commit

Permalink
Merge pull request #793 from interval/ws-rate-limit
Browse files Browse the repository at this point in the history
Rate limit web socket messages, close connection if limit exceeded
  • Loading branch information
jacobmischka authored Sep 7, 2022
2 parents 5539d43 + a89b8c6 commit aad899f
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 6 deletions.
18 changes: 16 additions & 2 deletions src/classes/ISocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ const MESSAGE_META = z.object({

export class TimeoutError extends Error {}

export class NotConnectedError extends Error {}

interface PendingMessage {
data: string
onAckReceived: () => void
Expand Down Expand Up @@ -46,6 +48,7 @@ export default class ISocket {
private pingTimeout: number
private isAuthenticated: boolean
private timeouts: Set<NodeJS.Timeout>
private isClosed = false
onMessage: Evt<string>
onOpen: Evt<void>
onError: Evt<Error>
Expand Down Expand Up @@ -95,6 +98,8 @@ export default class ISocket {
* throwing an error if `ACK` is not received within `sendTimeout`.
*/
async send(data: string) {
if (this.isClosed) throw new NotConnectedError()

return new Promise<void>((resolve, reject) => {
const id = v4()

Expand All @@ -121,8 +126,10 @@ export default class ISocket {
* Close the underlying WebSocket connection, and this ISocket
* connection.
*/
close() {
return this.ws.close()
close(code?: number, reason?: string) {
this.isClosed = true
this.onMessage.detach()
return this.ws.close(code, reason)
}

constructor(ws: WebSocket | NodeWebSocket, config?: ISocketConfig) {
Expand All @@ -148,13 +155,15 @@ export default class ISocket {
this.isAuthenticated = false

this.onClose.attach(() => {
this.isClosed = true
for (const timeout of this.timeouts) {
clearTimeout(timeout)
}
this.timeouts.clear()
})

this.ws.onopen = () => {
this.isClosed = false
this.onOpen.post()
}

Expand All @@ -172,6 +181,9 @@ export default class ISocket {
if (evt.stopPropagation) {
evt.stopPropagation()
}

if (this.isClosed) return

const data = JSON.parse(evt.data.toString())
const meta = MESSAGE_META.parse(data)

Expand Down Expand Up @@ -218,6 +230,8 @@ export default class ISocket {
* `pong` is not received within `pingTimeout`.
*/
async ping() {
if (this.isClosed) throw new NotConnectedError()

if (!('ping' in this.ws)) {
// Not supported in web client WebSocket
throw new Error(
Expand Down
6 changes: 3 additions & 3 deletions src/classes/Logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ export default class Logger {
}

warn(...args: any[]) {
console.warn(...args)
console.warn('[Interval] ', ...args)
}

error(...args: any[]) {
console.error(...args)
console.error('[Interval] ', ...args)
}

debug(...args: any[]) {
if (this.logLevel === 'debug') {
console.debug(...args)
console.debug('[Interval] ', ...args)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/examples/basic/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ const interval = new Interval({
}
},
log_dos: async () => {
for (let i = 0; i < 100_000; i++) {
for (let i = 0; i < 1000; i++) {
await ctx.log(i)
}
},
Expand Down

0 comments on commit aad899f

Please sign in to comment.