Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix an uncaught exception during failure response decompression #366

Merged
merged 1 commit into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import type { ClickHouseClient } from '@clickhouse/client-common'
import { createTestClient } from '@test/utils'
import http from 'http'
import type Stream from 'stream'
import type { NodeClickHouseClientConfigOptions } from '../../src/config'

describe('[Node.js] Compression', () => {
const port = 18123

let client: ClickHouseClient<Stream.Readable>
let server: http.Server

describe('Malformed compression response', () => {
const logAndQuit = (err: Error | unknown, prefix: string) => {
console.error(prefix, err)
process.exit(1)
}
const uncaughtExceptionListener = (err: Error) =>
logAndQuit(err, 'uncaughtException:')
const unhandledRejectionListener = (err: unknown) =>
logAndQuit(err, 'unhandledRejection:')

beforeEach(async () => {
process.on('uncaughtException', uncaughtExceptionListener)
process.on('unhandledRejection', unhandledRejectionListener)
client = createTestClient({
url: `http://localhost:${port}`,
compression: {
response: true,
},
} as NodeClickHouseClientConfigOptions)
})
afterEach(async () => {
process.off('uncaughtException', uncaughtExceptionListener)
process.off('unhandledRejection', unhandledRejectionListener)
await client.close()
server.close()
})

it('should not propagate the exception to the global context if a failed response is malformed', async () => {
server = http.createServer(async (_req, res) => {
return makeResponse(res, 500)
})
server.listen(port)

// The request fails completely (and the error message cannot be decompressed)
await expectAsync(
client.query({
query: 'SELECT 1',
format: 'JSONEachRow',
}),
).toBeRejectedWith(
jasmine.objectContaining({
code: 'Z_DATA_ERROR',
}),
)
})

it('should not propagate the exception to the global context if a successful response is malformed', async () => {
server = http.createServer(async (_req, res) => {
return makeResponse(res, 200)
})
server.listen(port)

const rs = await client.query({
query: 'SELECT 1',
format: 'JSONEachRow',
})

// Fails during the response streaming
await expectAsync(rs.text()).toBeRejectedWithError()
})
})

function makeResponse(res: http.ServerResponse, status: 200 | 500) {
res.appendHeader('Content-Encoding', 'gzip')
res.statusCode = status
res.write('A malformed response without compression')
return res.end()
}
})
19 changes: 11 additions & 8 deletions packages/client-node/src/connection/compression.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import type { LogWriter } from '@clickhouse/client-common'
import type Http from 'http'
import Stream from 'stream'
import Zlib from 'zlib'

export function decompressResponse(response: Http.IncomingMessage):
| {
response: Stream.Readable
}
| { error: Error } {
type DecompressResponseResult = { response: Stream.Readable } | { error: Error }

export function decompressResponse(
response: Http.IncomingMessage,
logWriter: LogWriter,
): DecompressResponseResult {
const encoding = response.headers['content-encoding']

if (encoding === 'gzip') {
Expand All @@ -16,9 +18,10 @@ export function decompressResponse(response: Http.IncomingMessage):
Zlib.createGunzip(),
function pipelineCb(err) {
if (err) {
// FIXME: use logger instead
// eslint-disable-next-line no-console
console.error(err)
logWriter.error({
message: 'An error occurred while decompressing the response',
err,
})
}
},
),
Expand Down
10 changes: 8 additions & 2 deletions packages/client-node/src/connection/node_base_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ export abstract class NodeBaseConnection
// even if the stream decompression is disabled, we have to decompress it in case of an error
const isFailedResponse = !isSuccessfulResponse(_response.statusCode)
if (tryDecompressResponseStream || isFailedResponse) {
const decompressionResult = decompressResponse(_response)
const decompressionResult = decompressResponse(_response, this.logger)
if (isDecompressionError(decompressionResult)) {
return reject(decompressionResult.error)
}
Expand All @@ -474,7 +474,13 @@ export abstract class NodeBaseConnection
responseStream = _response
}
if (isFailedResponse) {
reject(parseError(await getAsText(responseStream)))
try {
const errorMessage = await getAsText(responseStream)
reject(parseError(errorMessage))
} catch (err) {
// If the ClickHouse response is malformed
reject(err)
}
} else {
return resolve({
stream: responseStream,
Expand Down
Loading