From 25374f039a5a14f7372299c9df99f5ce7c3e7b4b Mon Sep 17 00:00:00 2001 From: Serge Klochkov <3175289+slvrtrn@users.noreply.github.com> Date: Tue, 10 Dec 2024 15:02:57 +0100 Subject: [PATCH] Fix an uncaught exception during failure response decompression (#366) --- .../integration/node_compression.test.ts | 81 +++++++++++++++++++ .../client-node/src/connection/compression.ts | 19 +++-- .../src/connection/node_base_connection.ts | 10 ++- 3 files changed, 100 insertions(+), 10 deletions(-) create mode 100644 packages/client-node/__tests__/integration/node_compression.test.ts diff --git a/packages/client-node/__tests__/integration/node_compression.test.ts b/packages/client-node/__tests__/integration/node_compression.test.ts new file mode 100644 index 00000000..209eef4b --- /dev/null +++ b/packages/client-node/__tests__/integration/node_compression.test.ts @@ -0,0 +1,81 @@ +import type { ClickHouseClient } from '@clickhouse/client-common' +import { createTestClient } from '@test/utils' +import http from 'http' +import type Stream from 'stream' +import type { NodeClickHouseClientConfigOptions } from '../../src/config' + +describe('[Node.js] Compression', () => { + const port = 18123 + + let client: ClickHouseClient + let server: http.Server + + describe('Malformed compression response', () => { + const logAndQuit = (err: Error | unknown, prefix: string) => { + console.error(prefix, err) + process.exit(1) + } + const uncaughtExceptionListener = (err: Error) => + logAndQuit(err, 'uncaughtException:') + const unhandledRejectionListener = (err: unknown) => + logAndQuit(err, 'unhandledRejection:') + + beforeEach(async () => { + process.on('uncaughtException', uncaughtExceptionListener) + process.on('unhandledRejection', unhandledRejectionListener) + client = createTestClient({ + url: `http://localhost:${port}`, + compression: { + response: true, + }, + } as NodeClickHouseClientConfigOptions) + }) + afterEach(async () => { + process.off('uncaughtException', uncaughtExceptionListener) + process.off('unhandledRejection', unhandledRejectionListener) + await client.close() + server.close() + }) + + it('should not propagate the exception to the global context if a failed response is malformed', async () => { + server = http.createServer(async (_req, res) => { + return makeResponse(res, 500) + }) + server.listen(port) + + // The request fails completely (and the error message cannot be decompressed) + await expectAsync( + client.query({ + query: 'SELECT 1', + format: 'JSONEachRow', + }), + ).toBeRejectedWith( + jasmine.objectContaining({ + code: 'Z_DATA_ERROR', + }), + ) + }) + + it('should not propagate the exception to the global context if a successful response is malformed', async () => { + server = http.createServer(async (_req, res) => { + return makeResponse(res, 200) + }) + server.listen(port) + + const rs = await client.query({ + query: 'SELECT 1', + format: 'JSONEachRow', + }) + + // Fails during the response streaming + await expectAsync(rs.text()).toBeRejectedWithError() + }) + }) + + function makeResponse(res: http.ServerResponse, status: 200 | 500) { + res.appendHeader('Content-Encoding', 'gzip') + res.statusCode = status + res.write('A malformed response without compression') + return res.end() + } +}) diff --git a/packages/client-node/src/connection/compression.ts b/packages/client-node/src/connection/compression.ts index 4bd90933..983932b5 100644 --- a/packages/client-node/src/connection/compression.ts +++ b/packages/client-node/src/connection/compression.ts @@ -1,12 +1,14 @@ +import type { LogWriter } from '@clickhouse/client-common' import type Http from 'http' import Stream from 'stream' import Zlib from 'zlib' -export function decompressResponse(response: Http.IncomingMessage): - | { - response: Stream.Readable - } - | { error: Error } { +type DecompressResponseResult = { response: Stream.Readable } | { error: Error } + +export function decompressResponse( + response: Http.IncomingMessage, + logWriter: LogWriter, +): DecompressResponseResult { const encoding = response.headers['content-encoding'] if (encoding === 'gzip') { @@ -16,9 +18,10 @@ export function decompressResponse(response: Http.IncomingMessage): Zlib.createGunzip(), function pipelineCb(err) { if (err) { - // FIXME: use logger instead - // eslint-disable-next-line no-console - console.error(err) + logWriter.error({ + message: 'An error occurred while decompressing the response', + err, + }) } }, ), diff --git a/packages/client-node/src/connection/node_base_connection.ts b/packages/client-node/src/connection/node_base_connection.ts index d17dfc84..77d45864 100644 --- a/packages/client-node/src/connection/node_base_connection.ts +++ b/packages/client-node/src/connection/node_base_connection.ts @@ -465,7 +465,7 @@ export abstract class NodeBaseConnection // even if the stream decompression is disabled, we have to decompress it in case of an error const isFailedResponse = !isSuccessfulResponse(_response.statusCode) if (tryDecompressResponseStream || isFailedResponse) { - const decompressionResult = decompressResponse(_response) + const decompressionResult = decompressResponse(_response, this.logger) if (isDecompressionError(decompressionResult)) { return reject(decompressionResult.error) } @@ -474,7 +474,13 @@ export abstract class NodeBaseConnection responseStream = _response } if (isFailedResponse) { - reject(parseError(await getAsText(responseStream))) + try { + const errorMessage = await getAsText(responseStream) + reject(parseError(errorMessage)) + } catch (err) { + // If the ClickHouse response is malformed + reject(err) + } } else { return resolve({ stream: responseStream,