-
Notifications
You must be signed in to change notification settings - Fork 39
/
example.js
128 lines (109 loc) · 4.27 KB
/
example.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
const IngestClient = require("azure-kusto-ingest").IngestClient;
const IngestStatusQueues = require("azure-kusto-ingest").IngestStatusQueues;
const IngestionProps = require("azure-kusto-ingest").IngestionProperties;
const KustoConnectionStringBuilder = require("azure-kusto-data").KustoConnectionStringBuilder;
const { DataFormat, JsonColumnMapping, IngestionMappingKind, CompressionType, ReportLevel, ReportMethod } = require("azure-kusto-ingest");
const { BlobDescriptor, StreamDescriptor } = require("azure-kusto-ingest").IngestionDescriptors;
const StreamingIngestClient = require("azure-kusto-ingest").StreamingIngestClient;
const fs = require("fs");
// Placeholder settings: every value is null here and is meant to be filled in before running.
const clusterName = null;
const appId = null;
const appKey = null;
const authorityId = null;

// Builds an AAD application-key connection string for `endpoint` from the settings above.
const connectionStringFor = (endpoint) => KustoConnectionStringBuilder.withAadApplicationKeyAuthentication(endpoint, appId, appKey, authorityId);

// Queued ingestion: JSON input with an inline column mapping; both failures and
// successes are reported to queues, which waitForStatus() polls through `statusQueues`.
const props = new IngestionProps({
    database: "Database",
    table: "Table",
    format: DataFormat.JSON,
    ingestionMapping: [
        JsonColumnMapping.withPath("TargetColumn1", "$.sourceProp1"),
        JsonColumnMapping.withPath("TargetColumn2", "$.sourceProp2"),
        JsonColumnMapping.withPath("TargetColumn3", "$.sourceProp3"),
    ],
    ingestionMappingType: IngestionMappingKind.JSON,
    reportLevel: ReportLevel.FailuresAndSuccesses,
    reportMethod: ReportMethod.Queue,
});

// The queued client talks to the `ingest-` endpoint of the cluster.
// Call `close()` on the client once you are done with it to free up resources.
const ingestClient = new IngestClient(connectionStringFor(`https://ingest-${clusterName}.kusto.windows.net`), props);
const statusQueues = new IngestStatusQueues(ingestClient);

startIngestion();

// Streaming ingestion: relies on a mapping that is already defined and referenced by name.
const props2 = new IngestionProps({
    database: "Database",
    table: "Table",
    format: DataFormat.JSON,
    ingestionMappingReference: "Pre-defined mapping name",
});

// The streaming client is initialised with the engine endpoint (no `ingest-` prefix).
const streamingIngestClient = new StreamingIngestClient(connectionStringFor(`https://${clusterName}.kusto.windows.net`), props2);

startStreamingIngestion();
/**
 * Runs the queued-ingestion demo: ingests a local file, then a blob, and after
 * each submission waits for a status message via waitForStatus().
 *
 * Each attempt has its own try/catch, so a failure in one is logged and does
 * not prevent the other from running. The module-level `ingestClient` is
 * closed once both attempts have finished, whether or not they succeeded,
 * so the resources it holds are released.
 *
 * @returns {Promise<void>} Resolves when both attempts are done and the client is closed.
 */
async function startIngestion() {
    // Size hint handed to BlobDescriptor; expressed in bytes (50 MB).
    const blobSizeBytes = 50 * 1024 * 1024;

    console.log("Ingest from file");
    try {
        try {
            await ingestClient.ingestFromFile("file.json");
            // Queued ingestion only enqueues the request, hence the question mark:
            // the actual outcome arrives later on the status queues.
            console.log("Ingestion done?");
            await waitForStatus();
        } catch (err) {
            console.log(err);
        }

        try {
            await ingestClient.ingestFromBlob(new BlobDescriptor("https://<account>.blob.core.windows.net/<container>/file.json.gz", blobSizeBytes));
            console.log("Ingestion done?");
            await waitForStatus();
        } catch (err) {
            console.log(err);
        }
    } finally {
        // Nothing else uses the queued client after this point.
        ingestClient.close();
    }
}
/**
 * Polls the status queues once per second until at least one of them holds a
 * message, then pops and logs up to `numberOFIngestions` messages from the
 * failure queue followed by the success queue.
 *
 * @param {number} [numberOFIngestions=1] Count passed to each queue's `pop()`.
 * @returns {Promise<void>}
 */
async function waitForStatus(numberOFIngestions = 1) {
    for (;;) {
        // The failure queue is checked first; the success queue is only
        // queried when the failure queue reported empty.
        const noFailuresYet = await statusQueues.failure.isEmpty();
        if (!noFailuresYet) {
            break;
        }
        const noSuccessesYet = await statusQueues.success.isEmpty();
        if (!noSuccessesYet) {
            break;
        }
        console.log("Waiting for status...");
        await sleep(1000);
    }

    const failedMessages = await statusQueues.failure.pop(numberOFIngestions);
    for (const message of failedMessages) {
        console.log(`Failed: ${JSON.stringify(message)}`);
    }

    const succeededMessages = await statusQueues.success.pop(numberOFIngestions);
    for (const message of succeededMessages) {
        console.log(`Succeeded: ${JSON.stringify(message)}`);
    }
}
/**
 * Returns a promise that settles after a `setTimeout` of `ms` milliseconds.
 *
 * @param {number} ms Delay handed to `setTimeout`.
 * @returns {Promise<void>} Resolves with no value once the timer fires.
 */
function sleep(ms) {
    const scheduleWakeUp = (wake) => setTimeout(wake, ms);
    return new Promise(scheduleWakeUp);
}
/**
 * Runs the streaming-ingestion demo with `streamingIngestClient` and `props2`:
 * a file path, a plain read stream, and a gzip stream wrapped in a
 * StreamDescriptor are ingested one after another.
 *
 * Each attempt logs "Ingestion done" on success or the caught error on
 * failure; a failed attempt does not stop the following ones.
 *
 * @returns {Promise<void>}
 */
async function startStreamingIngestion() {
    // Shared wrapper: run one ingestion call and log its outcome.
    const attempt = async (ingest) => {
        try {
            await ingest();
            console.log("Ingestion done");
        } catch (err) {
            console.log(err);
        }
    };

    // From a file: either a file path or a FileDescriptor can be given.
    await attempt(() => streamingIngestClient.ingestFromFile("file.json", props2));

    // From a stream: either a ReadStream or a StreamDescriptor can be given.
    const plainStream = fs.createReadStream("file.json");
    await attempt(() => streamingIngestClient.ingestFromStream(plainStream, props2));

    // Gzip data: set the StreamDescriptor's compression type to CompressionType.GZIP.
    const gzipStream = fs.createReadStream("file.json.gz");
    const gzipDescriptor = new StreamDescriptor(gzipStream, "id", CompressionType.GZIP);
    await attempt(() => streamingIngestClient.ingestFromStream(gzipDescriptor, props2));
}