Skip to content

Commit

Permalink
exposing source id (#15)
Browse files Browse the repository at this point in the history
* exposing source id

* eslint fixes
  • Loading branch information
Daniel Dubovski authored Feb 21, 2019
1 parent a9ae8ca commit c93a1e1
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 12 deletions.
11 changes: 8 additions & 3 deletions azure-kusto-ingest/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion azure-kusto-ingest/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
"azure-storage": "^2.10.2",
"moment": "^2.22.2",
"request": "^2.88.0",
"uuid": "^3.3.2"
"uuid": "^3.3.2",
"uuid-validate": "0.0.3"
},
"devDependencies": {
"mocha": "^5.2.0",
Expand Down
21 changes: 18 additions & 3 deletions azure-kusto-ingest/source/descriptors.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@ const fs = require("fs");
const path = require("path");
const zlib = require("zlib");
const Transform = require("stream").Transform;
const uuidValidate = require("uuid-validate");

function assertUuid4(maybeUuid, errorMessage) {
if (!!maybeUuid && !uuidValidate(maybeUuid, 4)) {
throw Error(errorMessage);
}
}

class BytesCounter extends Transform {
constructor() {
Expand All @@ -20,12 +26,15 @@ class BytesCounter extends Transform {
}

class FileDescriptor {
constructor(filePath) {
constructor(filePath, sourceId = null) {
this.filePath = filePath;
this.name = path.basename(this.filePath);
this.extension = path.extname(this.filePath).toLowerCase();
this.size = null;
this.zipped = this.extension === ".gz";

assertUuid4(sourceId, "sourceId is not a valid uuid/v4");
this.sourceId = sourceId;
}

_gzip(callback) {
Expand All @@ -52,12 +61,15 @@ class FileDescriptor {


class StreamDescriptor {
constructor(stream) {
constructor(stream, sourceId = null) {
this._stream = stream;

this.stream = null;
this.name = "stream";
this.size = null;

assertUuid4(sourceId, "sourceId is not a valid uuid/v4");
this.sourceId = sourceId;
}

pipe(dest) {
Expand All @@ -70,9 +82,12 @@ class StreamDescriptor {
}

class BlobDescriptor {
constructor(path, size = null) {
constructor(path, size = null, sourceId = null) {
this.path = path;
this.size = size;

assertUuid4(sourceId, "sourceId is not a valid uuid/v4");
this.sourceId = sourceId;
}
}

Expand Down
6 changes: 5 additions & 1 deletion azure-kusto-ingest/source/ingestClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ module.exports = class KustoIngestClient {
blobService.createBlockBlobFromLocalFile(containerDetails.objectName, blobName, fileToUpload, (err) => {
if (err) return callback(err);
let blobUri = `${containerDetails.toURI({ withSas: false })}/${blobName}?${containerDetails.sas}`;
return this.ingestFromBlob(new BlobDescriptor(blobUri, descriptor.size), props, callback);
return this.ingestFromBlob(new BlobDescriptor(blobUri, descriptor.size, descriptor.sourceId), props, callback);
});
});

Expand All @@ -100,6 +100,10 @@ module.exports = class KustoIngestClient {
const props = this._mergeProps(ingestionProperties);
props.validate();

if (typeof (blob) === "string") {
blob = new BlobDescriptor(blob);
}

return this.resourceManager.getIngestionQueues((err, queues) => {
if (err) return callback(err);

Expand Down
8 changes: 4 additions & 4 deletions azure-kusto-ingest/source/ingestionBlobInfo.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ const uuidv4 = require("uuid/v4");
const moment = require("moment");

module.exports = class IngestionBlobInfo {
constructor(blob, ingestionProperties, authContext) {
this.BlobPath = blob.path;
this.RawDataSize = blob.size;
constructor(blobDescriptor, ingestionProperties, authContext) {
this.BlobPath = blobDescriptor.path;
this.RawDataSize = blobDescriptor.size;
this.DatabaseName = ingestionProperties.database;
this.TableName = ingestionProperties.table;
this.RetainBlobOnSuccess = true;
Expand All @@ -13,7 +13,7 @@ module.exports = class IngestionBlobInfo {
this.ReportLevel = ingestionProperties.reportLevel;
this.ReportMethod = ingestionProperties.reportMethod;
this.SourceMessageCreationTime = moment.utc();
this.Id = uuidv4();
this.Id = blobDescriptor.sourceId || uuidv4();

let additionalProperties = ingestionProperties.additionalProperties || {};
additionalProperties.authorizationContext = authContext;
Expand Down

0 comments on commit c93a1e1

Please sign in to comment.