Skip to content

Commit

Permalink
Fix main branch tests (#297)
Browse files Browse the repository at this point in the history
  • Loading branch information
slvrtrn authored Aug 7, 2024
1 parent 2199122 commit 2988c50
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 48 deletions.
16 changes: 0 additions & 16 deletions packages/client-common/__tests__/integration/abort_request.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,6 @@ describe('abort request', () => {
})

describe('select', () => {
it('cancels a select query before it is sent', async () => {
const controller = new AbortController()
const selectPromise = client.query({
query: 'SELECT sleep(3)',
format: 'CSV',
abort_signal: controller.signal,
})
controller.abort()

await expectAsync(selectPromise).toBeRejectedWith(
jasmine.objectContaining({
message: jasmine.stringMatching('The user aborted a request'),
}),
)
})

it('cancels a select query after it is sent', async () => {
const controller = new AbortController()
const selectPromise = client.query({
Expand Down
8 changes: 6 additions & 2 deletions packages/client-common/src/utils/sleep.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
export async function sleep(ms: number) {
await new Promise((resolve) => setTimeout(resolve, ms))
export async function sleep(ms: number): Promise<void> {
await new Promise((resolve) =>
setTimeout(() => {
resolve(void 0)
}, ms),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { createTestClient, guid } from '@test/utils'
import type Stream from 'stream'
import { makeObjectStream } from '../utils/stream'

describe('[Node.js] abort request streaming', () => {
describe('[Node.js] abort request', () => {
let client: ClickHouseClient<Stream.Readable>

beforeEach(() => {
Expand All @@ -16,6 +16,23 @@ describe('[Node.js] abort request streaming', () => {
await client.close()
})

it('cancels a select query before it is sent', async () => {
const controller = new AbortController()
const selectPromise = client.query({
query: 'SELECT sleep(3)',
format: 'CSV',
abort_signal: controller.signal,
})
controller.abort()

await expectAsync(selectPromise).toBeRejectedWith(
jasmine.objectContaining({
// this happens even before we instantiate the request and its listeners, so that is just a plain AbortError
name: 'AbortError',
}),
)
})

it('cancels a select query while reading response', async () => {
const controller = new AbortController()
const selectPromise = client
Expand Down Expand Up @@ -136,7 +153,8 @@ describe('[Node.js] abort request streaming', () => {

await expectAsync(insertPromise).toBeRejectedWith(
jasmine.objectContaining({
message: jasmine.stringMatching('The user aborted a request'),
// this happens even before we instantiate the request and its listeners, so that is just a plain AbortError
name: 'AbortError',
}),
)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ describe('[Node.js] Client', () => {
const selectPromise = client.query({
query: 'SELECT * FROM system.numbers LIMIT 5',
})
emitResponseBody(clientRequest, 'hi')
await emitResponseBody(clientRequest, 'hi')
await selectPromise
}

Expand Down
2 changes: 1 addition & 1 deletion packages/client-node/__tests__/unit/node_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import * as c from '../../src/connection/create_connection'

describe('[Node.js] createClient', () => {
it('throws on incorrect "url" config value', () => {
expect(() => createClient({ url: 'foo' })).toThrow(
expect(() => createClient({ url: 'foobar' })).toThrow(
jasmine.objectContaining({
message: jasmine.stringContaining('ClickHouse URL is malformed.'),
}),
Expand Down
24 changes: 12 additions & 12 deletions packages/client-node/__tests__/unit/node_connection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ describe('[Node.js] Connection', () => {
query: 'SELECT * FROM system.numbers LIMIT 5',
})
const responseBody1 = 'foobar'
emitResponseBody(request1, responseBody1)
await emitResponseBody(request1, responseBody1)
const queryResult1 = await selectPromise1

const request2 = stubClientRequest()
Expand All @@ -62,7 +62,7 @@ describe('[Node.js] Connection', () => {
query: 'SELECT * FROM system.numbers LIMIT 5',
})
const responseBody2 = 'qaz'
emitResponseBody(request2, responseBody2)
await emitResponseBody(request2, responseBody2)
const queryResult2 = await selectPromise2

await assertConnQueryResult(queryResult1, responseBody1)
Expand Down Expand Up @@ -93,7 +93,7 @@ describe('[Node.js] Connection', () => {
query_id,
})
const responseBody = 'foobar'
emitResponseBody(request, responseBody)
await emitResponseBody(request, responseBody)
const { stream } = await selectPromise
expect(await getAsText(stream)).toBe(responseBody)

Expand All @@ -119,7 +119,7 @@ describe('[Node.js] Connection', () => {
query: 'SELECT * FROM system.numbers LIMIT 5',
})
const responseBody1 = 'foobar'
emitResponseBody(request1, responseBody1)
await emitResponseBody(request1, responseBody1)
const queryResult1 = await execPromise1

const request2 = stubClientRequest()
Expand All @@ -129,7 +129,7 @@ describe('[Node.js] Connection', () => {
query: 'SELECT * FROM system.numbers LIMIT 5',
})
const responseBody2 = 'qaz'
emitResponseBody(request2, responseBody2)
await emitResponseBody(request2, responseBody2)
const queryResult2 = await execPromise2

await assertConnQueryResult(queryResult1, responseBody1)
Expand Down Expand Up @@ -162,7 +162,7 @@ describe('[Node.js] Connection', () => {
query_id,
})
const responseBody = 'foobar'
emitResponseBody(request, responseBody)
await emitResponseBody(request, responseBody)
const { stream } = await execPromise
expect(await getAsText(stream)).toBe(responseBody)

Expand All @@ -187,7 +187,7 @@ describe('[Node.js] Connection', () => {
const cmdPromise = adapter.command({
query: 'SELECT * FROM system.numbers LIMIT 5',
})
emitResponseBody(request1, 'Ok.')
await emitResponseBody(request1, 'Ok.')
const { query_id } = await cmdPromise

const request2 = stubClientRequest()
Expand All @@ -196,7 +196,7 @@ describe('[Node.js] Connection', () => {
const cmdPromise2 = adapter.command({
query: 'SELECT * FROM system.numbers LIMIT 5',
})
emitResponseBody(request2, 'Ok.')
await emitResponseBody(request2, 'Ok.')
const { query_id: query_id2 } = await cmdPromise2

expect(query_id).not.toEqual(query_id2)
Expand All @@ -223,7 +223,7 @@ describe('[Node.js] Connection', () => {
query: 'SELECT * FROM system.numbers LIMIT 5',
query_id,
})
emitResponseBody(request, 'Ok.')
await emitResponseBody(request, 'Ok.')
const { query_id: result_query_id } = await cmdPromise

expect(httpRequestStub).toHaveBeenCalledTimes(1)
Expand All @@ -250,7 +250,7 @@ describe('[Node.js] Connection', () => {
values: 'foobar',
})
const responseBody1 = 'foobar'
emitResponseBody(request1, responseBody1)
await emitResponseBody(request1, responseBody1)
const { query_id: queryId1 } = await insertPromise1

const request2 = stubClientRequest()
Expand All @@ -261,7 +261,7 @@ describe('[Node.js] Connection', () => {
values: 'foobar',
})
const responseBody2 = 'qaz'
emitResponseBody(request2, responseBody2)
await emitResponseBody(request2, responseBody2)
const { query_id: queryId2 } = await insertPromise2

assertQueryId(queryId1)
Expand Down Expand Up @@ -293,7 +293,7 @@ describe('[Node.js] Connection', () => {
query_id,
})
const responseBody = 'foobar'
emitResponseBody(request, responseBody)
await emitResponseBody(request, responseBody)
await insertPromise

const [url] = httpRequestStub.calls.mostRecent().args
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@ import {
} from '../utils/http_stubs'

describe('Node.js Connection compression', () => {
let httpRequestStub: jasmine.Spy<typeof Http.request>
beforeEach(() => {
httpRequestStub = spyOn(Http, 'request')
})

describe('response decompression', () => {
it('hints ClickHouse server to send a gzip compressed response if compress_request: true', async () => {
const request = stubClientRequest()
const httpRequestStub = spyOn(Http, 'request').and.returnValue(request)
httpRequestStub.and.returnValue(request)

const adapter = buildHttpConnection({
compression: {
Expand All @@ -41,7 +46,8 @@ describe('Node.js Connection compression', () => {

it('does not send a compression algorithm hint if compress_request: false', async () => {
const request = stubClientRequest()
const httpRequestStub = spyOn(Http, 'request').and.returnValue(request)
httpRequestStub.and.returnValue(request)

const adapter = buildHttpConnection({
compression: {
decompress_response: false,
Expand All @@ -54,7 +60,7 @@ describe('Node.js Connection compression', () => {
})

const responseBody = 'foobar'
emitResponseBody(request, responseBody)
await emitResponseBody(request, responseBody)

const queryResult = await selectPromise
await assertConnQueryResult(queryResult, responseBody)
Expand All @@ -66,7 +72,8 @@ describe('Node.js Connection compression', () => {

it('uses request-specific settings over config settings', async () => {
const request = stubClientRequest()
const httpRequestStub = spyOn(Http, 'request').and.returnValue(request)
httpRequestStub.and.returnValue(request)

const adapter = buildHttpConnection({
compression: {
decompress_response: false,
Expand Down Expand Up @@ -94,7 +101,8 @@ describe('Node.js Connection compression', () => {

it('decompresses a gzip response', async () => {
const request = stubClientRequest()
spyOn(Http, 'request').and.returnValue(request)
httpRequestStub.and.returnValue(request)

const adapter = buildHttpConnection({
compression: {
decompress_response: true,
Expand All @@ -115,7 +123,7 @@ describe('Node.js Connection compression', () => {

it('throws on an unexpected encoding', async () => {
const request = stubClientRequest()
spyOn(Http, 'request').and.returnValue(request)
httpRequestStub.and.returnValue(request)
const adapter = buildHttpConnection({
compression: {
decompress_response: true,
Expand All @@ -138,7 +146,7 @@ describe('Node.js Connection compression', () => {

it('provides decompression error to a stream consumer', async () => {
const request = stubClientRequest()
spyOn(Http, 'request').and.returnValue(request)
httpRequestStub.and.returnValue(request)
const adapter = buildHttpConnection({
compression: {
decompress_response: true,
Expand All @@ -151,6 +159,7 @@ describe('Node.js Connection compression', () => {
})

// No GZIP encoding for the body here
await sleep(0)
request.emit(
'response',
buildIncomingMessage({
Expand Down Expand Up @@ -196,23 +205,23 @@ describe('Node.js Connection compression', () => {
next()
},
final() {
Zlib.unzip(chunks, (err, result) => {
Zlib.unzip(chunks, (_err, result) => {
finalResult = result
})
},
}) as ClientRequest

const httpRequestStub = spyOn(Http, 'request').and.returnValue(request)
httpRequestStub.and.returnValue(request)

void adapter.insert({
query: 'INSERT INTO insert_compression_table',
values,
})

// trigger stream pipeline
await sleep(0)
request.emit('socket', socketStub)

await sleep(100)

expect(finalResult!.toString('utf8')).toEqual(values)
expect(httpRequestStub).toHaveBeenCalledTimes(1)
const calledWith = httpRequestStub.calls.mostRecent().args[1]
Expand Down
6 changes: 4 additions & 2 deletions packages/client-node/__tests__/utils/http_stubs.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { LogWriter } from '@clickhouse/client-common'
import { TestLogger } from '@test/utils'
import { sleep, TestLogger } from '@test/utils'
import { randomUUID } from '@test/utils/guid'
import type Http from 'http'
import type { ClientRequest } from 'http'
Expand Down Expand Up @@ -63,10 +63,11 @@ export function stubClientRequest(): ClientRequest {
return request
}

export function emitResponseBody(
export async function emitResponseBody(
request: Http.ClientRequest,
body: string | Buffer | undefined,
) {
await sleep(0)
request.emit(
'response',
buildIncomingMessage({
Expand All @@ -80,6 +81,7 @@ export async function emitCompressedBody(
body: string | Buffer,
encoding = 'gzip',
) {
await sleep(0)
const compressedBody = await gzip(body)
request.emit(
'response',
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { ClickHouseClient, Row } from '@clickhouse/client-common'
import { createTestClient } from '@test/utils'

describe('[Web] abort request streaming', () => {
describe('[Web] abort request', () => {
let client: ClickHouseClient<ReadableStream>

beforeEach(() => {
Expand All @@ -12,6 +12,23 @@ describe('[Web] abort request streaming', () => {
await client.close()
})

// a slightly different assertion vs the same Node.js test
it('cancels a select query before it is sent', async () => {
const controller = new AbortController()
const selectPromise = client.query({
query: 'SELECT sleep(3)',
format: 'CSV',
abort_signal: controller.signal,
})
controller.abort()

await expectAsync(selectPromise).toBeRejectedWith(
jasmine.objectContaining({
message: jasmine.stringMatching('The user aborted a request'),
}),
)
})

it('cancels a select query while reading response', async () => {
const controller = new AbortController()
const selectPromise = client
Expand Down

0 comments on commit 2988c50

Please sign in to comment.