From 43751b0012e0a3e26523eed6ca333df7d81f7f4d Mon Sep 17 00:00:00 2001 From: Serge Klochkov <3175289+slvrtrn@users.noreply.github.com> Date: Thu, 22 Aug 2024 16:19:34 +0300 Subject: [PATCH] Allow to disable stream decompression for `exec` (#300) --- CHANGELOG.md | 6 ++ packages/client-common/src/client.ts | 41 +++++++---- packages/client-common/src/connection.ts | 1 + .../client-common/src/utils/connection.ts | 12 ++-- packages/client-common/src/version.ts | 2 +- .../__tests__/integration/node_exec.test.ts | 46 ++++++++++++- .../src/connection/node_base_connection.ts | 69 +++++++++++++------ .../node_custom_agent_connection.ts | 4 +- .../src/connection/node_http_connection.ts | 4 +- .../src/connection/node_https_connection.ts | 4 +- packages/client-node/src/version.ts | 2 +- .../src/connection/web_connection.ts | 5 +- packages/client-web/src/version.ts | 2 +- 13 files changed, 146 insertions(+), 52 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7ad66363..c3646dc9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +# 1.5.0 (Node.js) + +## New features + +- It is now possible to disable the automatic decompression of the response stream with the `exec` method. See `ExecParams.decompress_response_stream` for more details. ([#298](https://github.com/ClickHouse/clickhouse-js/issues/298)). + # 1.4.1 (Node.js, Web) ## Improvements diff --git a/packages/client-common/src/client.ts b/packages/client-common/src/client.ts index 60d3ca4d..d7039b00 100644 --- a/packages/client-common/src/client.ts +++ b/packages/client-common/src/client.ts @@ -66,10 +66,17 @@ export type ExecParams = BaseQueryParams & { * If {@link ExecParamsWithValues.values} are defined, the query is sent as a request parameter, * and the values are sent in the request body instead. */ query: string + /** If set to `false`, the client _will not_ decompress the response stream, even if the response compression + * was requested by the client via the {@link BaseClickHouseClientConfigOptions.compression.response } setting. + * This could be useful if the response stream is passed to another application as-is, + * and the decompression is handled there. + * @note 1) Node.js only. This setting will have no effect on the Web version. + * @note 2) In case of an error, the stream will be decompressed anyway, regardless of this setting. + * @default true */ + decompress_response_stream?: boolean } export type ExecParamsWithValues = ExecParams & { - /** If you have a custom INSERT statement to run with `exec`, - * the data from this stream will be inserted. + /** If you have a custom INSERT statement to run with `exec`, the data from this stream will be inserted. * * NB: the data in the stream is expected to be serialized accordingly to the FORMAT clause * used in {@link ExecParams.query} in this case. @@ -170,11 +177,12 @@ export class ClickHouseClient { } /** - * Used for most statements that can have a response, such as SELECT. - * FORMAT clause should be specified separately via {@link QueryParams.format} (default is JSON) - * Consider using {@link ClickHouseClient.insert} for data insertion, - * or {@link ClickHouseClient.command} for DDLs. + * Used for most statements that can have a response, such as `SELECT`. + * FORMAT clause should be specified separately via {@link QueryParams.format} (default is `JSON`). + * Consider using {@link ClickHouseClient.insert} for data insertion, or {@link ClickHouseClient.command} for DDLs. * Returns an implementation of {@link BaseResultSet}. + * + * See {@link DataFormat} for the formats supported by the client. */ async query( params: QueryParamsWithFormat, @@ -211,7 +219,9 @@ export class ClickHouseClient { * when the format clause is not applicable, or when you are not interested in the response at all. * Response stream is destroyed immediately as we do not expect useful information there. * Examples of such statements are DDLs or custom inserts. - * If you are interested in the response data, consider using {@link ClickHouseClient.exec} + * + * @note if you have a custom query that does not work with {@link ClickHouseClient.query}, + * and you are interested in the response data, consider using {@link ClickHouseClient.exec}. */ async command(params: CommandParams): Promise { const query = removeTrailingSemi(params.query.trim()) @@ -222,18 +232,23 @@ export class ClickHouseClient { } /** - * Similar to {@link ClickHouseClient.command}, but for the cases where the output is expected, - * but format clause is not applicable. The caller of this method is expected to consume the stream, - * otherwise, the request will eventually be timed out. + * Similar to {@link ClickHouseClient.command}, but for the cases where the output _is expected_, + * but format clause is not applicable. The caller of this method _must_ consume the stream, + * as the underlying socket will not be released until then, and the request will eventually be timed out. + * + * @note it is not intended to use this method to execute the DDLs, such as `CREATE TABLE` or similar; + * use {@link ClickHouseClient.command} instead. */ async exec( params: ExecParams | ExecParamsWithValues, ): Promise> { const query = removeTrailingSemi(params.query.trim()) const values = 'values' in params ? params.values : undefined + const decompress_response_stream = params.decompress_response_stream ?? true return await this.connection.exec({ query, values, + decompress_response_stream, ...this.withClientQueryParams(params), }) } @@ -242,8 +257,10 @@ export class ClickHouseClient { * The primary method for data insertion. It is recommended to avoid arrays in case of large inserts * to reduce application memory consumption and consider streaming for most of such use cases. * As the insert operation does not provide any output, the response stream is immediately destroyed. - * In case of a custom insert operation, such as, for example, INSERT FROM SELECT, - * consider using {@link ClickHouseClient.command}, passing the entire raw query there (including FORMAT clause). + * + * @note in case of a custom insert operation (e.g., `INSERT FROM SELECT`), + * consider using {@link ClickHouseClient.command}, passing the entire raw query there + * (including the `FORMAT` clause). */ async insert(params: InsertParams): Promise { if (Array.isArray(params.values) && params.values.length === 0) { diff --git a/packages/client-common/src/connection.ts b/packages/client-common/src/connection.ts index 6d824d5a..821fe1b1 100644 --- a/packages/client-common/src/connection.ts +++ b/packages/client-common/src/connection.ts @@ -41,6 +41,7 @@ export interface ConnInsertParams extends ConnBaseQueryParams { export interface ConnExecParams extends ConnBaseQueryParams { values?: Stream + decompress_response_stream?: boolean } export interface ConnBaseResult extends WithResponseHeaders { diff --git a/packages/client-common/src/utils/connection.ts b/packages/client-common/src/utils/connection.ts index efef8503..62e65eac 100644 --- a/packages/client-common/src/utils/connection.ts +++ b/packages/client-common/src/utils/connection.ts @@ -5,17 +5,17 @@ export type HttpHeaders = Record export function withCompressionHeaders({ headers, - compress_request, - decompress_response, + enable_request_compression, + enable_response_compression, }: { headers: HttpHeaders - compress_request: boolean | undefined - decompress_response: boolean | undefined + enable_request_compression: boolean | undefined + enable_response_compression: boolean | undefined }): Record { return { ...headers, - ...(decompress_response ? { 'Accept-Encoding': 'gzip' } : {}), - ...(compress_request ? { 'Content-Encoding': 'gzip' } : {}), + ...(enable_response_compression ? { 'Accept-Encoding': 'gzip' } : {}), + ...(enable_request_compression ? { 'Content-Encoding': 'gzip' } : {}), } } diff --git a/packages/client-common/src/version.ts b/packages/client-common/src/version.ts index e4f53bef..b7c78f16 100644 --- a/packages/client-common/src/version.ts +++ b/packages/client-common/src/version.ts @@ -1 +1 @@ -export default '1.4.1' +export default '1.5.0' diff --git a/packages/client-node/__tests__/integration/node_exec.test.ts b/packages/client-node/__tests__/integration/node_exec.test.ts index a343a9f9..52d64026 100644 --- a/packages/client-node/__tests__/integration/node_exec.test.ts +++ b/packages/client-node/__tests__/integration/node_exec.test.ts @@ -1,10 +1,10 @@ import type { ClickHouseClient } from '@clickhouse/client-common' -import { createTestClient } from '@test/utils' -import { guid } from '@test/utils' import { createSimpleTable } from '@test/fixtures/simple_table' +import { createTestClient, guid } from '@test/utils' import Stream from 'stream' -import { getAsText } from '../../src/utils' +import Zlib from 'zlib' import { drainStream, ResultSet } from '../../src' +import { getAsText } from '../../src/utils' describe('[Node.js] exec', () => { let client: ClickHouseClient @@ -165,4 +165,44 @@ describe('[Node.js] exec', () => { expect(await rs.json()).toEqual(expected) } }) + + describe('disabled stream decompression', () => { + beforeEach(() => { + client = createTestClient({ + compression: { + response: true, + }, + }) + }) + + it('should get a compressed response stream without decompressing it', async () => { + const result = await client.exec({ + query: 'SELECT 42 AS result FORMAT JSONEachRow', + decompress_response_stream: false, + }) + const text = await getAsText(decompress(result.stream)) + expect(text).toEqual('{"result":42}\n') + }) + + it('should force decompress in case of an error', async () => { + await expectAsync( + client.exec({ + query: 'invalid', + decompress_response_stream: false, + }), + ).toBeRejectedWith( + jasmine.objectContaining({ + message: jasmine.stringContaining('Syntax error'), + }), + ) + }) + + function decompress(stream: Stream.Readable) { + return Stream.pipeline(stream, Zlib.createGunzip(), (err) => { + if (err) { + console.error(err) + } + }) + } + }) }) diff --git a/packages/client-node/src/connection/node_base_connection.ts b/packages/client-node/src/connection/node_base_connection.ts index d54b53ee..0608cdbb 100644 --- a/packages/client-node/src/connection/node_base_connection.ts +++ b/packages/client-node/src/connection/node_base_connection.ts @@ -15,10 +15,10 @@ import type { LogWriter, ResponseHeaders, } from '@clickhouse/client-common' -import { sleep } from '@clickhouse/client-common' import { isSuccessfulResponse, parseError, + sleep, toSearchParams, transformUrl, withHttpSettings, @@ -63,8 +63,10 @@ export interface RequestParams { body?: string | Stream.Readable // provided by the user and wrapped around internally abort_signal: AbortSignal - decompress_response?: boolean - compress_request?: boolean + enable_response_compression?: boolean + enable_request_compression?: boolean + // if there are compression headers, attempt to decompress it + try_decompress_response_stream?: boolean parse_summary?: boolean } @@ -73,7 +75,6 @@ export abstract class NodeBaseConnection { protected readonly defaultAuthHeader: string protected readonly defaultHeaders: Http.OutgoingHttpHeaders - protected readonly additionalHTTPHeaders: Record private readonly logger: LogWriter private readonly knownSockets = new WeakMap() @@ -83,12 +84,11 @@ export abstract class NodeBaseConnection protected readonly params: NodeConnectionParams, protected readonly agent: Http.Agent, ) { - this.additionalHTTPHeaders = params.http_headers ?? {} this.defaultAuthHeader = `Basic ${Buffer.from( `${params.username}:${params.password}`, ).toString('base64')}` this.defaultHeaders = { - ...this.additionalHTTPHeaders, + ...(params.http_headers ?? {}), // KeepAlive agent for some reason does not set this on its own Connection: this.params.keep_alive.enabled ? 'keep-alive' : 'close', 'User-Agent': getUserAgent(this.params.application_id), @@ -137,13 +137,15 @@ export abstract class NodeBaseConnection ) const searchParams = toSearchParams({ database: this.params.database, - clickhouse_settings, query_params: params.query_params, session_id: params.session_id, + clickhouse_settings, query_id, }) - const decompressResponse = clickhouse_settings.enable_http_compression === 1 const { controller, controllerCleanup } = this.getAbortController(params) + // allows to enforce the compression via the settings even if the client instance has it disabled + const enableResponseCompression = + clickhouse_settings.enable_http_compression === 1 try { const { stream, response_headers } = await this.request( { @@ -151,7 +153,7 @@ export abstract class NodeBaseConnection url: transformUrl({ url: this.params.url, searchParams }), body: params.query, abort_signal: controller.signal, - decompress_response: decompressResponse, + enable_response_compression: enableResponseCompression, headers: this.buildRequestHeaders(params), }, 'Query', @@ -170,7 +172,7 @@ export abstract class NodeBaseConnection search_params: searchParams, err: err as Error, extra_args: { - decompress_response: decompressResponse, + decompress_response: enableResponseCompression, clickhouse_settings, }, }) @@ -200,7 +202,7 @@ export abstract class NodeBaseConnection url: transformUrl({ url: this.params.url, searchParams }), body: params.values, abort_signal: controller.signal, - compress_request: this.params.compression.compress_request, + enable_request_compression: this.params.compression.compress_request, parse_summary: true, headers: this.buildRequestHeaders(params), }, @@ -371,16 +373,28 @@ export abstract class NodeBaseConnection ): Promise> { const query_id = this.getQueryId(params.query_id) const sendQueryInParams = params.values !== undefined + const clickhouse_settings = withHttpSettings( + params.clickhouse_settings, + this.params.compression.decompress_response, + ) const toSearchParamsOptions = { query: sendQueryInParams ? params.query : undefined, database: this.params.database, - clickhouse_settings: params.clickhouse_settings, query_params: params.query_params, session_id: params.session_id, + clickhouse_settings, query_id, } const searchParams = toSearchParams(toSearchParamsOptions) const { controller, controllerCleanup } = this.getAbortController(params) + const tryDecompressResponseStream = + params.op === 'Exec' + ? // allows to disable stream decompression for the `Exec` operation only + params.decompress_response_stream ?? + this.params.compression.decompress_response + : // there is nothing useful in the response stream for the `Command` operation, + // and it is immediately destroyed; never decompress it + false try { const { stream, summary, response_headers } = await this.request( { @@ -389,6 +403,10 @@ export abstract class NodeBaseConnection body: sendQueryInParams ? params.values : params.query, abort_signal: controller.signal, parse_summary: true, + enable_request_compression: this.params.compression.compress_request, + enable_response_compression: + this.params.compression.decompress_response, + try_decompress_response_stream: tryDecompressResponseStream, headers: this.buildRequestHeaders(params), }, params.op, @@ -438,20 +456,30 @@ export abstract class NodeBaseConnection ): Promise => { this.logResponse(op, request, params, _response, start) - const decompressionResult = decompressResponse(_response) - if (isDecompressionError(decompressionResult)) { - return reject(decompressionResult.error) + let responseStream: Stream.Readable + const tryDecompressResponseStream = + params.try_decompress_response_stream ?? true + // even if the stream decompression is disabled, we have to decompress it in case of an error + const isFailedResponse = !isSuccessfulResponse(_response.statusCode) + if (tryDecompressResponseStream || isFailedResponse) { + const decompressionResult = decompressResponse(_response) + if (isDecompressionError(decompressionResult)) { + return reject(decompressionResult.error) + } + responseStream = decompressionResult.response + } else { + responseStream = _response } - if (isSuccessfulResponse(_response.statusCode)) { + if (isFailedResponse) { + reject(parseError(await getAsText(responseStream))) + } else { return resolve({ - stream: decompressionResult.response, + stream: responseStream, summary: params.parse_summary ? this.parseSummary(op, _response) : undefined, response_headers: { ..._response.headers }, }) - } else { - reject(parseError(await getAsText(decompressionResult.response))) } } @@ -492,7 +520,7 @@ export abstract class NodeBaseConnection } } - if (params.compress_request) { + if (params.enable_request_compression) { Stream.pipeline(bodyStream, Zlib.createGzip(), request, callback) } else { Stream.pipeline(bodyStream, request, callback) @@ -626,4 +654,5 @@ interface SocketInfo { type RunExecParams = ConnBaseQueryParams & { op: 'Exec' | 'Command' values?: ConnExecParams['values'] + decompress_response_stream?: boolean } diff --git a/packages/client-node/src/connection/node_custom_agent_connection.ts b/packages/client-node/src/connection/node_custom_agent_connection.ts index 0684c32e..15d85f61 100644 --- a/packages/client-node/src/connection/node_custom_agent_connection.ts +++ b/packages/client-node/src/connection/node_custom_agent_connection.ts @@ -19,8 +19,8 @@ export class NodeCustomAgentConnection extends NodeBaseConnection { protected createClientRequest(params: RequestParams): Http.ClientRequest { const headers = withCompressionHeaders({ headers: params.headers, - compress_request: params.compress_request, - decompress_response: params.decompress_response, + enable_request_compression: params.enable_request_compression, + enable_response_compression: params.enable_response_compression, }) return Http.request(params.url, { method: params.method, diff --git a/packages/client-node/src/connection/node_http_connection.ts b/packages/client-node/src/connection/node_http_connection.ts index 58c7b5ed..9f1fa85c 100644 --- a/packages/client-node/src/connection/node_http_connection.ts +++ b/packages/client-node/src/connection/node_http_connection.ts @@ -18,8 +18,8 @@ export class NodeHttpConnection extends NodeBaseConnection { protected createClientRequest(params: RequestParams): Http.ClientRequest { const headers = withCompressionHeaders({ headers: params.headers, - compress_request: params.compress_request, - decompress_response: params.decompress_response, + enable_request_compression: params.enable_request_compression, + enable_response_compression: params.enable_response_compression, }) return Http.request(params.url, { method: params.method, diff --git a/packages/client-node/src/connection/node_https_connection.ts b/packages/client-node/src/connection/node_https_connection.ts index 865ad546..0c1b6ee7 100644 --- a/packages/client-node/src/connection/node_https_connection.ts +++ b/packages/client-node/src/connection/node_https_connection.ts @@ -48,8 +48,8 @@ export class NodeHttpsConnection extends NodeBaseConnection { protected createClientRequest(params: RequestParams): Http.ClientRequest { const headers = withCompressionHeaders({ headers: params.headers, - compress_request: params.compress_request, - decompress_response: params.decompress_response, + enable_request_compression: params.enable_request_compression, + enable_response_compression: params.enable_response_compression, }) return Https.request(params.url, { method: params.method, diff --git a/packages/client-node/src/version.ts b/packages/client-node/src/version.ts index e4f53bef..b7c78f16 100644 --- a/packages/client-node/src/version.ts +++ b/packages/client-node/src/version.ts @@ -1 +1 @@ -export default '1.4.1' +export default '1.5.0' diff --git a/packages/client-web/src/connection/web_connection.ts b/packages/client-web/src/connection/web_connection.ts index 4d470029..72c69ea5 100644 --- a/packages/client-web/src/connection/web_connection.ts +++ b/packages/client-web/src/connection/web_connection.ts @@ -179,8 +179,9 @@ export class WebConnection implements Connection { Authorization: `Basic ${btoa(`${params.auth.username}:${params.auth.password}`)}`, } : this.defaultHeaders, - compress_request: false, - decompress_response: this.params.compression.decompress_response, + enable_request_compression: false, + enable_response_compression: + this.params.compression.decompress_response, }) const response = await fetch(url, { body: values, diff --git a/packages/client-web/src/version.ts b/packages/client-web/src/version.ts index e4f53bef..b7c78f16 100644 --- a/packages/client-web/src/version.ts +++ b/packages/client-web/src/version.ts @@ -1 +1 @@ -export default '1.4.1' +export default '1.5.0'