Skip to content

Commit 1c290e4

Browse files
committed
Support streaming in sqlQuery/txSqlQuery
1 parent 125cdcf commit 1c290e4

File tree

11 files changed

+408
-406
lines changed

11 files changed

+408
-406
lines changed

immudb-node-grpcjs/schema.proto

+3-2
Original file line numberDiff line numberDiff line change
@@ -694,6 +694,7 @@ message SQLQueryRequest {
694694
string sql = 1;
695695
repeated NamedParam params = 2;
696696
bool reuseSnapshot = 3;
697+
bool acceptStream = 4;
697698
}
698699

699700
message NamedParam {
@@ -842,7 +843,7 @@ service ImmuService {
842843
rpc Rollback (google.protobuf.Empty) returns (google.protobuf.Empty){};
843844

844845
rpc TxSQLExec(SQLExecRequest) returns (google.protobuf.Empty) {};
845-
rpc TxSQLQuery(SQLQueryRequest) returns (SQLQueryResult) {};
846+
rpc TxSQLQuery(SQLQueryRequest) returns (stream SQLQueryResult) {};
846847

847848
rpc Login (LoginRequest) returns (LoginResponse){
848849
option deprecated = true;
@@ -1153,7 +1154,7 @@ service ImmuService {
11531154
};
11541155
};
11551156

1156-
rpc SQLQuery(SQLQueryRequest) returns (SQLQueryResult) {
1157+
rpc SQLQuery(SQLQueryRequest) returns (stream SQLQueryResult) {
11571158
option (google.api.http) = {
11581159
post: "/db/sqlquery"
11591160
body: "*"

immudb-node-grpcjs/src/immudb/schema/ImmuService.ts

+10-18
Original file line numberDiff line numberDiff line change
@@ -402,14 +402,10 @@ export interface ImmuServiceClient extends grpc.Client {
402402
sqlExec(argument: _immudb_schema_SQLExecRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_SQLExecResult__Output>): grpc.ClientUnaryCall;
403403
sqlExec(argument: _immudb_schema_SQLExecRequest, callback: grpc.requestCallback<_immudb_schema_SQLExecResult__Output>): grpc.ClientUnaryCall;
404404

405-
SQLQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
406-
SQLQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
407-
SQLQuery(argument: _immudb_schema_SQLQueryRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
408-
SQLQuery(argument: _immudb_schema_SQLQueryRequest, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
409-
sqlQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
410-
sqlQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
411-
sqlQuery(argument: _immudb_schema_SQLQueryRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
412-
sqlQuery(argument: _immudb_schema_SQLQueryRequest, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
405+
SQLQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientReadableStream<_immudb_schema_SQLQueryResult__Output>;
406+
SQLQuery(argument: _immudb_schema_SQLQueryRequest, options?: grpc.CallOptions): grpc.ClientReadableStream<_immudb_schema_SQLQueryResult__Output>;
407+
sqlQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientReadableStream<_immudb_schema_SQLQueryResult__Output>;
408+
sqlQuery(argument: _immudb_schema_SQLQueryRequest, options?: grpc.CallOptions): grpc.ClientReadableStream<_immudb_schema_SQLQueryResult__Output>;
413409

414410
Scan(argument: _immudb_schema_ScanRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_Entries__Output>): grpc.ClientUnaryCall;
415411
Scan(argument: _immudb_schema_ScanRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_immudb_schema_Entries__Output>): grpc.ClientUnaryCall;
@@ -465,14 +461,10 @@ export interface ImmuServiceClient extends grpc.Client {
465461
txSqlExec(argument: _immudb_schema_SQLExecRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_google_protobuf_Empty__Output>): grpc.ClientUnaryCall;
466462
txSqlExec(argument: _immudb_schema_SQLExecRequest, callback: grpc.requestCallback<_google_protobuf_Empty__Output>): grpc.ClientUnaryCall;
467463

468-
TxSQLQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
469-
TxSQLQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
470-
TxSQLQuery(argument: _immudb_schema_SQLQueryRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
471-
TxSQLQuery(argument: _immudb_schema_SQLQueryRequest, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
472-
txSqlQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
473-
txSqlQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
474-
txSqlQuery(argument: _immudb_schema_SQLQueryRequest, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
475-
txSqlQuery(argument: _immudb_schema_SQLQueryRequest, callback: grpc.requestCallback<_immudb_schema_SQLQueryResult__Output>): grpc.ClientUnaryCall;
464+
TxSQLQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientReadableStream<_immudb_schema_SQLQueryResult__Output>;
465+
TxSQLQuery(argument: _immudb_schema_SQLQueryRequest, options?: grpc.CallOptions): grpc.ClientReadableStream<_immudb_schema_SQLQueryResult__Output>;
466+
txSqlQuery(argument: _immudb_schema_SQLQueryRequest, metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientReadableStream<_immudb_schema_SQLQueryResult__Output>;
467+
txSqlQuery(argument: _immudb_schema_SQLQueryRequest, options?: grpc.CallOptions): grpc.ClientReadableStream<_immudb_schema_SQLQueryResult__Output>;
476468

477469
TxScan(argument: _immudb_schema_TxScanRequest, metadata: grpc.Metadata, options: grpc.CallOptions, callback: grpc.requestCallback<_immudb_schema_TxList__Output>): grpc.ClientUnaryCall;
478470
TxScan(argument: _immudb_schema_TxScanRequest, metadata: grpc.Metadata, callback: grpc.requestCallback<_immudb_schema_TxList__Output>): grpc.ClientUnaryCall;
@@ -750,7 +742,7 @@ export interface ImmuServiceHandlers extends grpc.UntypedServiceImplementation {
750742

751743
SQLExec: grpc.handleUnaryCall<_immudb_schema_SQLExecRequest__Output, _immudb_schema_SQLExecResult>;
752744

753-
SQLQuery: grpc.handleUnaryCall<_immudb_schema_SQLQueryRequest__Output, _immudb_schema_SQLQueryResult>;
745+
SQLQuery: grpc.handleServerStreamingCall<_immudb_schema_SQLQueryRequest__Output, _immudb_schema_SQLQueryResult>;
754746

755747
Scan: grpc.handleUnaryCall<_immudb_schema_ScanRequest__Output, _immudb_schema_Entries>;
756748

@@ -764,7 +756,7 @@ export interface ImmuServiceHandlers extends grpc.UntypedServiceImplementation {
764756

765757
TxSQLExec: grpc.handleUnaryCall<_immudb_schema_SQLExecRequest__Output, _google_protobuf_Empty>;
766758

767-
TxSQLQuery: grpc.handleUnaryCall<_immudb_schema_SQLQueryRequest__Output, _immudb_schema_SQLQueryResult>;
759+
TxSQLQuery: grpc.handleServerStreamingCall<_immudb_schema_SQLQueryRequest__Output, _immudb_schema_SQLQueryResult>;
768760

769761
TxScan: grpc.handleUnaryCall<_immudb_schema_TxScanRequest__Output, _immudb_schema_TxList>;
770762

immudb-node-grpcjs/src/immudb/schema/SQLQueryRequest.ts

+2
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@ export interface SQLQueryRequest {
66
'sql'?: (string);
77
'params'?: (_immudb_schema_NamedParam)[];
88
'reuseSnapshot'?: (boolean);
9+
'acceptStream'?: (boolean);
910
}
1011

1112
export interface SQLQueryRequest__Output {
1213
'sql': (string);
1314
'params': (_immudb_schema_NamedParam__Output)[];
1415
'reuseSnapshot': (boolean);
16+
'acceptStream': (boolean);
1517
}

immudb-node-showcase/src/overview-showcase.ts

+60-51
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,28 @@
11
import Long from 'long'
22
import {
3-
Client,
4-
verifyVerification,
5-
types,
6-
stream,
3+
Client,
4+
verifyVerification,
5+
types,
6+
stream,
77
} from '@codenotary/immudb-node'
88

99

1010

1111

1212

1313
overviewSchowcase()
14-
.catch(console.error)
14+
.catch(console.error)
1515

1616

1717
async function overviewSchowcase() {
1818

19-
19+
2020
const client = new Client({
21-
host: '127.0.0.1',
22-
port: 3322,
23-
user: 'immudb',
24-
password: 'immudb',
25-
database: 'defaultdb',
21+
host: '127.0.0.1',
22+
port: 3322,
23+
user: 'immudb',
24+
password: 'immudb',
25+
database: 'defaultdb',
2626
})
2727

2828

@@ -35,8 +35,8 @@ async function overviewSchowcase() {
3535

3636
const valEntries2 = await client.setValEntries({
3737
kvms: [
38-
{key: Buffer.of(0), val: Buffer.of(0)},
39-
{key: Buffer.of(1), val: Buffer.of(1)},
38+
{ key: Buffer.of(0), val: Buffer.of(0) },
39+
{ key: Buffer.of(1), val: Buffer.of(1) },
4040
]
4141
})
4242
console.log('valEntries2:')
@@ -45,47 +45,47 @@ async function overviewSchowcase() {
4545

4646
const valEntry3 = await client.setValEntries({
4747
kvms: [
48-
{key: Buffer.of(2), val: Buffer.of(2)},
48+
{ key: Buffer.of(2), val: Buffer.of(2) },
4949
]
5050
})
5151
console.log('valEntry3:')
5252
console.log(valEntry3)
5353

5454

5555
const refEntry4 = await client.setRefEntry({
56-
key: Buffer.of(3),
56+
key: Buffer.of(3),
5757
referToKey: valEntries2.valEntries[0].key,
58-
keyTxId: valEntries2.valEntries[0].id,
59-
boundRef: true,
58+
keyTxId: valEntries2.valEntries[0].id,
59+
boundRef: true,
6060
})
6161
console.log('refEntry4:')
6262
console.log(refEntry4)
6363

6464

6565
const zSetEntry5 = await client.setZSetEntry({
66-
zSet: Buffer.of(4),
67-
referredKey: valEntry3.valEntries[0].key,
68-
referredKeyScore: 3,
66+
zSet: Buffer.of(4),
67+
referredKey: valEntry3.valEntries[0].key,
68+
referredKeyScore: 3,
6969
})
7070
console.log('zSetEntry5:')
7171
console.log(zSetEntry5)
7272

7373
const entries6 = await client.setValRefZSetEntries({
7474
ops: [
7575
{
76-
type: 'val',
77-
key: Buffer.of(2),
78-
val: Buffer.of(6),
76+
type: 'val',
77+
key: Buffer.of(2),
78+
val: Buffer.of(6),
7979
},
8080
{
81-
type: 'ref',
82-
key: Buffer.of(3),
81+
type: 'ref',
82+
key: Buffer.of(3),
8383
referToKey: valEntry3.valEntries[0].key
8484
},
8585
{
86-
type: 'zSet',
86+
type: 'zSet',
8787
referredKey: valEntries2.valEntries[1].key,
88-
zSet: zSetEntry5.zSetTxEntry.zSet,
88+
zSet: zSetEntry5.zSetTxEntry.zSet,
8989
referredKeyScore: 9,
9090
}
9191
]
@@ -126,7 +126,8 @@ async function overviewSchowcase() {
126126
console.log(stream.toKVEntries(Buffer.concat(buffs)))
127127

128128

129-
const sqlExecCreateTable7 = await client.sqlExec({sql: `
129+
const sqlExecCreateTable7 = await client.sqlExec({
130+
sql: `
130131
create table if not exists testtable (
131132
id1 integer not null,
132133
id2 varchar[3] null,
@@ -140,7 +141,8 @@ async function overviewSchowcase() {
140141
console.log(sqlExecCreateTable7)
141142

142143

143-
const sqlExecUpsert8 = await client.sqlExec({sql: `
144+
const sqlExecUpsert8 = await client.sqlExec({
145+
sql: `
144146
upsert into testtable
145147
(id1, id2, created, data, isactive)
146148
values
@@ -153,15 +155,18 @@ async function overviewSchowcase() {
153155

154156

155157
const sqlTxAt8 = await client.executeSqlTx('ReadWrite', async (txApi) => {
156-
const sqlQueryInTxAt8 = await txApi.query({sql: `
158+
const sqlQueryInTxAt8 = await txApi.query({
159+
sql: `
157160
select * from testtable;
158161
`})
159162
console.log('sqlQueryInTxAt8')
160-
console.log(sqlQueryInTxAt8)
161-
163+
for await (const row of sqlQueryInTxAt8) {
164+
console.log(row)
165+
}
162166

163167
// sqlExecUpsert9
164-
const sqlExecUpsertInTx9 = txApi.exec({sql:`
168+
const sqlExecUpsertInTx9 = txApi.exec({
169+
sql: `
165170
upsert into testtable
166171
(id1, id2, created, data, isactive)
167172
values
@@ -170,11 +175,14 @@ async function overviewSchowcase() {
170175
`})
171176

172177

173-
const sqlQueryInTxAt9 = await txApi.query({sql: `
178+
const sqlQueryInTxAt9 = await txApi.query({
179+
sql: `
174180
select * from testtable;
175181
`})
176182
console.log('sqlQueryInTxAt9')
177-
console.log(sqlQueryInTxAt9)
183+
for await (const row of sqlQueryInTxAt9) {
184+
console.log(row)
185+
}
178186

179187

180188
throw 'I would like to cancel'
@@ -184,18 +192,19 @@ async function overviewSchowcase() {
184192
console.log(sqlTxAt8)
185193

186194

187-
const sqlQueryAt8 = await client.sqlQuery({sql: `
195+
const sqlQueryAt8 = await client.sqlQuery({
196+
sql: `
188197
select * from testtable;
189198
`})
190199
console.log('sqlQueryInTxAt8')
191200
console.log(sqlQueryAt8)
192-
const k = sqlQueryAt8[0]
193-
const d = k[0]
194-
195-
201+
202+
const row = await sqlQueryAt8.next()
203+
console.log(row)
204+
196205

197206
const dbScanAt8 = await client.scanDbEntries({
198-
scanStartAtTxId: Long.fromValue(1, true),
207+
scanStartAtTxId: Long.fromValue(1, true),
199208
})
200209
console.log('dbScanAt8')
201210
console.log(dbScanAt8)
@@ -208,12 +217,12 @@ async function overviewSchowcase() {
208217

209218

210219
const setAndProof9 = await client.setValEntriesGetVerification({
211-
kvms: [{key: Buffer.from('yo'), val: Buffer.from('man')}],
220+
kvms: [{ key: Buffer.from('yo'), val: Buffer.from('man') }],
212221
refTxId: stateAt8.txId,
213222
refHash: stateAt8.txHash,
214223
})
215224
console.log('setAndProof9')
216-
console.dir(setAndProof9, {depth: 10})
225+
console.dir(setAndProof9, { depth: 10 })
217226

218227
console.log('verifyVerification(setAndProof9) result:')
219228
console.log(verifyVerification(setAndProof9.verification))
@@ -232,7 +241,7 @@ async function overviewSchowcase() {
232241
refTxId: stateAt9.txId,
233242
})
234243
console.log('getTx2AndVerification')
235-
console.log(getTx2AndVerification, {depth: 10})
244+
console.log(getTx2AndVerification, { depth: 10 })
236245
console.log('verifyVerification(getTx2AndVerification) result:')
237246
console.log(verifyVerification(getTx2AndVerification.verification))
238247

@@ -241,40 +250,40 @@ async function overviewSchowcase() {
241250

242251
// entries6
243252
const getTx6AndVerification = await client.getTxAndVerification({
244-
txId: entries6.tx.id,
253+
txId: entries6.tx.id,
245254
refHash: stateAt9.txHash,
246255
refTxId: stateAt9.txId,
247256
})
248257
console.log('getTx6AndVerification')
249-
console.log(getTx6AndVerification, {depth: 10})
258+
console.log(getTx6AndVerification, { depth: 10 })
250259
console.log('verifyVerification(getTx6AndVerification) result:')
251260
console.log(verifyVerification(getTx6AndVerification.verification))
252261

253262

254263

255264
// sqlExecCreateTable7.subTxes[0].tx?.id
256265
const getTx7AndVerification = await client.getTxAndVerification({
257-
txId: Long.fromInt(7, true),
266+
txId: Long.fromInt(7, true),
258267
refHash: stateAt9.txHash,
259268
refTxId: stateAt9.txId,
260269
})
261270
console.log('getTx7AndVerification')
262-
console.log(getTx7AndVerification, {depth: 10})
271+
console.log(getTx7AndVerification, { depth: 10 })
263272
console.log('verifyVerification(getTx7AndVerification) result:')
264273
console.log(verifyVerification(getTx7AndVerification.verification))
265274

266275

267276
// sqlExecUpsert8
268277
const getTx8AndVerification = await client.getTxAndVerification({
269-
txId: Long.fromInt(8, true),
278+
txId: Long.fromInt(8, true),
270279
refHash: stateAt9.txHash,
271280
refTxId: stateAt9.txId,
272281
})
273282
console.log('getTx8AndVerification')
274-
console.log(getTx8AndVerification, {depth: 10})
283+
console.log(getTx8AndVerification, { depth: 10 })
275284
console.log('verifyVerification(getTx8AndVerification) result:')
276285
console.log(verifyVerification(getTx8AndVerification.verification))
277-
286+
278287

279288

280289

0 commit comments

Comments
 (0)