Skip to content

Commit ef0e1bd

Browse files
committed
Enable gzip, zstd compression for sync streams.
1 parent c7ded62 commit ef0e1bd

File tree

2 files changed

+63
-7
lines changed

2 files changed

+63
-7
lines changed
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import { Readable } from 'node:stream';
2+
import type Negotiator from 'negotiator';
3+
import * as zlib from 'node:zlib';
4+
5+
/**
6+
* Compress a streamed response.
7+
*
8+
* `@fastify/compress` can do something similar, but does not appear to work as well on streamed responses.
9+
* The manual implementation is simple enough, and gives us more control over the low-level details.
10+
*
11+
* @param negotiator Negotiator from the request, to negotiate response encoding
12+
* @param stream plain-text stream
13+
* @returns
14+
*/
15+
export function maybeCompressResponseStream(
16+
negotiator: Negotiator,
17+
stream: Readable
18+
): { stream: Readable; encodingHeaders: Record<string, string> } {
19+
const encoding = (negotiator as any).encoding(['identity', 'gzip', 'zstd'], { preferred: 'identity' });
20+
if (encoding == 'zstd') {
21+
return {
22+
stream: stream.pipe(
23+
// Available since Node v23.8.0, v22.15.0
24+
// This does the actual compression in a background thread pool.
25+
zlib.createZstdCompress({
26+
// We need to flush the frame after every new input chunk, to avoid delaying data
27+
// in the output stream.
28+
flush: zlib.constants.ZSTD_e_flush,
29+
params: {
30+
// Default compression level is 3. We reduce this slightly to limit CPU overhead
31+
[zlib.constants.ZSTD_c_compressionLevel]: 2
32+
}
33+
})
34+
),
35+
encodingHeaders: { 'Content-Encoding': 'zstd' }
36+
};
37+
} else if (encoding == 'gzip') {
38+
return {
39+
stream: stream.pipe(
40+
zlib.createGzip({
41+
// We need to flush the frame after every new input chunk, to avoid delaying data
42+
// in the output stream.
43+
flush: zlib.constants.Z_SYNC_FLUSH
44+
})
45+
),
46+
encodingHeaders: { 'Content-Encoding': 'gzip' }
47+
};
48+
} else {
49+
return {
50+
stream: stream,
51+
encodingHeaders: {}
52+
};
53+
}
54+
}

packages/service-core/src/routes/endpoints/sync-stream.ts

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
1-
import { ErrorCode, errors, logger, router, schema } from '@powersync/lib-services-framework';
2-
import { RequestParameters } from '@powersync/service-sync-rules';
3-
import { Readable } from 'stream';
1+
import { ErrorCode, errors, router, schema } from '@powersync/lib-services-framework';
42
import Negotiator from 'negotiator';
3+
import { Readable } from 'stream';
54

65
import * as sync from '../../sync/sync-index.js';
76
import * as util from '../../util/util-index.js';
@@ -10,6 +9,7 @@ import { authUser } from '../auth.js';
109
import { routeDefinition } from '../router.js';
1110

1211
import { APIMetric } from '@powersync/service-types';
12+
import { maybeCompressResponseStream } from '../compression.js';
1313

1414
export enum SyncRoutes {
1515
STREAM = '/sync/stream'
@@ -31,10 +31,10 @@ export const syncStreamed = routeDefinition({
3131
const userAgent = headers['x-user-agent'] ?? headers['user-agent'];
3232
const clientId = payload.params.client_id;
3333
const streamStart = Date.now();
34+
const negotiator = new Negotiator(payload.request);
3435
// This falls back to JSON unless there's preference for the bson-stream in the Accept header.
3536
const useBson =
36-
payload.request.headers.accept &&
37-
new Negotiator(payload.request).mediaType(supportedContentTypes) == concatenatedBsonContentType;
37+
payload.request.headers.accept && negotiator.mediaType(supportedContentTypes) == concatenatedBsonContentType;
3838

3939
logger.defaultMeta = {
4040
...logger.defaultMeta,
@@ -80,10 +80,11 @@ export const syncStreamed = routeDefinition({
8080
});
8181

8282
const byteContents = useBson ? sync.bsonLines(syncLines) : sync.ndjson(syncLines);
83-
const stream = Readable.from(sync.transformToBytesTracked(byteContents, tracker), {
83+
const plainStream = Readable.from(sync.transformToBytesTracked(byteContents, tracker), {
8484
objectMode: false,
8585
highWaterMark: 16 * 1024
8686
});
87+
const { stream, encodingHeaders } = maybeCompressResponseStream(negotiator, plainStream);
8788

8889
// Best effort guess on why the stream was closed.
8990
// We use the `??=` operator everywhere, so that we catch the first relevant
@@ -118,7 +119,8 @@ export const syncStreamed = routeDefinition({
118119
return new router.RouterResponse({
119120
status: 200,
120121
headers: {
121-
'Content-Type': useBson ? concatenatedBsonContentType : ndJsonContentType
122+
'Content-Type': useBson ? concatenatedBsonContentType : ndJsonContentType,
123+
...encodingHeaders
122124
},
123125
data: stream,
124126
afterSend: async (details) => {

0 commit comments

Comments
 (0)