Skip to content

Commit

Permalink
An optional values parameter in the exec method (#290)
Browse files Browse the repository at this point in the history
  • Loading branch information
slvrtrn authored Jul 11, 2024
1 parent 30ce583 commit e78b6f0
Show file tree
Hide file tree
Showing 12 changed files with 159 additions and 16 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# 1.4.0 (Node.js)

## New features

- (Node.js only) The `exec` method now accepts an optional `values` parameter, which allows you to pass the request body as a `Stream.Readable`. This can be useful in case of custom insert streaming with arbitrary ClickHouse data formats (which might not be explicitly supported and allowed by the client in the `insert` method yet). NB: in this case, you are expected to serialize the data in the stream in the required input format yourself.

# 1.3.0 (Common, Node.js, Web)

## New features
Expand Down
22 changes: 19 additions & 3 deletions packages/client-common/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,22 @@ export type QueryResult<Stream, Format extends DataFormat> =
? BaseResultSet<Stream, unknown>
: BaseResultSet<Stream, Format>

export interface ExecParams extends BaseQueryParams {
/** Statement to execute. */
export type ExecParams = BaseQueryParams & {
/** Statement to execute (including the FORMAT clause). By default, the query will be sent in the request body;
* If {@link ExecParamsWithValues.values} are defined, the query is sent as a request parameter,
* and the values are sent in the request body instead. */
query: string
}
export type ExecParamsWithValues<Stream> = ExecParams & {
/** If you have a custom INSERT statement to run with `exec`,
* the data from this stream will be inserted.
*
* NB: the data in the stream is expected to be serialized accordingly to the FORMAT clause
* used in {@link ExecParams.query} in this case.
*
* @see https://clickhouse.com/docs/en/interfaces/formats */
values: Stream
}

export type CommandParams = ExecParams
export type CommandResult = { query_id: string } & WithClickHouseSummary &
Expand Down Expand Up @@ -214,10 +226,14 @@ export class ClickHouseClient<Stream = unknown> {
* but format clause is not applicable. The caller of this method is expected to consume the stream,
* otherwise, the request will eventually be timed out.
*/
async exec(params: ExecParams): Promise<ExecResult<Stream>> {
async exec(
params: ExecParams | ExecParamsWithValues<Stream>,
): Promise<ExecResult<Stream>> {
const query = removeTrailingSemi(params.query.trim())
const values = 'values' in params ? params.values : undefined
return await this.connection.exec({
query,
values,
...this.withClientQueryParams(params),
})
}
Expand Down
6 changes: 5 additions & 1 deletion packages/client-common/src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ export interface ConnInsertParams<Stream> extends ConnBaseQueryParams {
values: string | Stream
}

export interface ConnExecParams<Stream> extends ConnBaseQueryParams {
values?: Stream
}

export interface ConnBaseResult extends WithResponseHeaders {
query_id: string
}
Expand Down Expand Up @@ -66,6 +70,6 @@ export interface Connection<Stream> {
close(): Promise<void>
query(params: ConnBaseQueryParams): Promise<ConnQueryResult<Stream>>
insert(params: ConnInsertParams<Stream>): Promise<ConnInsertResult>
exec(params: ConnBaseQueryParams): Promise<ConnExecResult<Stream>>
exec(params: ConnExecParams<Stream>): Promise<ConnExecResult<Stream>>
command(params: ConnBaseQueryParams): Promise<ConnCommandResult>
}
1 change: 1 addition & 0 deletions packages/client-common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ export type {
Connection,
ConnectionParams,
ConnInsertResult,
ConnExecParams,
ConnExecResult,
ConnQueryResult,
ConnBaseQueryParams,
Expand Down
2 changes: 1 addition & 1 deletion packages/client-common/src/version.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export default '1.3.0'
export default '1.4.0'
105 changes: 102 additions & 3 deletions packages/client-node/__tests__/integration/node_exec.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import type { ClickHouseClient } from '@clickhouse/client-common'
import { createTestClient } from '@test/utils'
import type Stream from 'stream'
import { guid } from '@test/utils'
import { createSimpleTable } from '@test/fixtures/simple_table'
import Stream from 'stream'
import { getAsText } from '../../src/utils'
import { ResultSet } from '../../src'
import { drainStream, ResultSet } from '../../src'

describe('[Node.js] exec result streaming', () => {
describe('[Node.js] exec', () => {
let client: ClickHouseClient<Stream.Readable>
beforeEach(() => {
client = createTestClient()
Expand Down Expand Up @@ -66,4 +68,101 @@ describe('[Node.js] exec result streaming', () => {
expect(await rs.json()).toEqual([{ number: '0' }])
})
})

describe('custom insert streaming with exec', () => {
let tableName: string
beforeEach(async () => {
tableName = `test_node_exec_insert_stream_${guid()}`
await createSimpleTable(client, tableName)
})

it('should send an insert stream', async () => {
const stream = Stream.Readable.from(['42,foobar,"[1,2]"'], {
objectMode: false,
})
const execResult = await client.exec({
query: `INSERT INTO ${tableName} FORMAT CSV`,
values: stream,
})
// the result stream contains nothing useful for an insert and should be immediately drained to release the socket
await drainStream(execResult.stream)
await checkInsertedValues([
{
id: '42',
name: 'foobar',
sku: [1, 2],
},
])
})

it('should not fail with an empty stream', async () => {
const stream = new Stream.Readable({
read() {
// required
},
objectMode: false,
})
const execPromise = client.exec({
query: `INSERT INTO ${tableName} FORMAT CSV`,
values: stream,
})
// close the empty stream after the request is sent
stream.push(null)
// the result stream contains nothing useful for an insert and should be immediately drained to release the socket
const execResult = await execPromise
await drainStream(execResult.stream)
await checkInsertedValues([])
})

it('should not fail with an already closed stream', async () => {
const stream = new Stream.Readable({
read() {
// required
},
objectMode: false,
})
stream.push('42,foobar,"[1,2]"\n')
// close the stream with some values
stream.push(null)
const execResult = await client.exec({
query: `INSERT INTO ${tableName} FORMAT CSV`,
values: stream,
})
// the result stream contains nothing useful for an insert and should be immediately drained to release the socket
await drainStream(execResult.stream)
await checkInsertedValues([
{
id: '42',
name: 'foobar',
sku: [1, 2],
},
])
})

it('should not fail with an empty and already closed stream', async () => {
const stream = new Stream.Readable({
read() {
// required
},
objectMode: false,
})
// close the empty stream immediately
stream.push(null)
const execResult = await client.exec({
query: `INSERT INTO ${tableName} FORMAT CSV`,
values: stream,
})
// the result stream contains nothing useful for an insert and should be immediately drained to release the socket
await drainStream(execResult.stream)
await checkInsertedValues([])
})

async function checkInsertedValues<T = unknown>(expected: Array<T>) {
const rs = await client.query({
query: `SELECT * FROM ${tableName}`,
format: 'JSONEachRow',
})
expect(await rs.json()).toEqual(expected)
}
})
})
13 changes: 9 additions & 4 deletions packages/client-node/src/connection/node_base_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type {
ConnCommandResult,
Connection,
ConnectionParams,
ConnExecParams,
ConnExecResult,
ConnInsertParams,
ConnInsertResult,
Expand Down Expand Up @@ -225,7 +226,7 @@ export abstract class NodeBaseConnection
}

async exec(
params: ConnBaseQueryParams,
params: ConnExecParams<Stream.Readable>,
): Promise<ConnExecResult<Stream.Readable>> {
return this.runExec({
...params,
Expand Down Expand Up @@ -368,20 +369,23 @@ export abstract class NodeBaseConnection
params: RunExecParams,
): Promise<ConnExecResult<Stream.Readable>> {
const query_id = this.getQueryId(params.query_id)
const searchParams = toSearchParams({
const sendQueryInParams = params.values !== undefined
const toSearchParamsOptions = {
query: sendQueryInParams ? params.query : undefined,
database: this.params.database,
clickhouse_settings: params.clickhouse_settings,
query_params: params.query_params,
session_id: params.session_id,
query_id,
})
}
const searchParams = toSearchParams(toSearchParamsOptions)
const { controller, controllerCleanup } = this.getAbortController(params)
try {
const { stream, summary, response_headers } = await this.request(
{
method: 'POST',
url: transformUrl({ url: this.params.url, searchParams }),
body: params.query,
body: sendQueryInParams ? params.values : params.query,
abort_signal: controller.signal,
parse_summary: true,
headers: this.buildRequestHeaders(params),
Expand Down Expand Up @@ -617,4 +621,5 @@ interface SocketInfo {

type RunExecParams = ConnBaseQueryParams & {
op: 'Exec' | 'Command'
values?: ConnExecParams<Stream.Readable>['values']
}
4 changes: 3 additions & 1 deletion packages/client-node/src/connection/stream.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import type Stream from 'stream'

/** See https://github.com/ClickHouse/clickhouse-js/pull/203 */
/** Drains the response stream, as calling `destroy` on a {@link Stream.Readable} response stream
* will result in closing the underlying socket, and negate the KeepAlive feature benefits.
* See https://github.com/ClickHouse/clickhouse-js/pull/203 */
export async function drainStream(stream: Stream.Readable): Promise<void> {
return new Promise((resolve, reject) => {
function dropData() {
Expand Down
1 change: 1 addition & 0 deletions packages/client-node/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export type {
export { createClient } from './client'
export { type NodeClickHouseClientConfigOptions as ClickHouseClientConfigOptions } from './config'
export { ResultSet, type StreamReadable } from './result_set'
export { drainStream } from './connection/stream'

/** Re-export @clickhouse/client-common types */
export {
Expand Down
2 changes: 1 addition & 1 deletion packages/client-node/src/version.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export default '1.3.0'
export default '1.4.0'
11 changes: 10 additions & 1 deletion packages/client-web/src/client.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import type {
DataFormat,
ExecParams,
ExecResult,
InputJSON,
InputJSONObjectEachRow,
InsertParams,
Expand All @@ -20,7 +22,10 @@ export type QueryResult<Format extends DataFormat> =
? ResultSet<unknown>
: ResultSet<Format>

export type WebClickHouseClient = Omit<WebClickHouseClientImpl, 'insert'> & {
export type WebClickHouseClient = Omit<
WebClickHouseClientImpl,
'insert' | 'exec'
> & {
/** See {@link ClickHouseClient.insert}.
*
* ReadableStream is removed from possible insert values
Expand All @@ -30,6 +35,10 @@ export type WebClickHouseClient = Omit<WebClickHouseClientImpl, 'insert'> & {
values: ReadonlyArray<T> | InputJSON<T> | InputJSONObjectEachRow<T>
},
): Promise<InsertResult>
/** See {@link ClickHouseClient.exec}.
*
* Custom values are currently not supported in the web versions. */
exec(params: ExecParams): Promise<ExecResult<ReadableStream>>
}

class WebClickHouseClientImpl extends ClickHouseClient<ReadableStream> {
Expand Down
2 changes: 1 addition & 1 deletion packages/client-web/src/version.ts
Original file line number Diff line number Diff line change
@@ -1 +1 @@
export default '1.3.0'
export default '1.4.0'

0 comments on commit e78b6f0

Please sign in to comment.