Skip to content

Commit

Permalink
* Update code for WIC support
Browse files Browse the repository at this point in the history
  • Loading branch information
ag-ramachandran committed Sep 4, 2024
1 parent d96fa94 commit 5c413c5
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 11 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<artifactId>kafka-sink-azure-kusto</artifactId>
<packaging>jar</packaging>
<description>A Kafka Connect plugin for Azure Data Explorer (Kusto) Database</description>
<version>4.1.1</version>
<version>4.1.2</version>
<properties>
<avro.random.generator.version>0.4.1</avro.random.generator.version>
<awaitility.version>4.2.2</awaitility.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,23 +99,16 @@ private static boolean isStreamingEnabled(@NotNull KustoSinkConfig config) throw
kcsb = ConnectionStringBuilder.createWithAadTokenProviderAuthentication(
clusterUrl,
() -> {
WorkloadIdentityCredential wic = new WorkloadIdentityCredentialBuilder()
.clientId(config.getAuthAppId())
.tenantId(config.getAuthAuthority())
.tokenFilePath(config.getTokenFilePath()).build();
WorkloadIdentityCredential wic = new WorkloadIdentityCredentialBuilder().build();
TokenRequestContext requestContext = new TokenRequestContext();
String clusterScope = String.format("%s/.default", clusterUrl);
requestContext.setScopes(Collections.singletonList(clusterScope));
String logContext = String.format("Using scope {%s} for Workload identity federation. Using app-id {%s}" +
", token-file path {%s} and tenant {%s}",
clusterScope, config.getAuthAppId(), config.getTokenFilePath(), config.getAuthAuthority());
log.info(logContext);
AccessToken accessToken = wic.getTokenSync(requestContext);
if (accessToken != null) {
log.debug("Returned access token that expires at {}", accessToken.getExpiresAt());
return accessToken.getToken();
} else {
log.error("Obtained empty token during token refresh. Context {}", logContext);
log.error("Obtained empty token during token refresh. Context {}", clusterScope);
throw new ConnectException("Failed to retrieve WIF token");
}
});
Expand Down
2 changes: 1 addition & 1 deletion src/test/resources/it-table-setup.kql
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
.create-merge table TBL (vnum:int, vdec:decimal, vdate:datetime, vb:boolean, vreal:real, vstr:string, vlong:long,type:string)
.alter table TBL policy streamingingestion enable
.alter table TBL policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:05", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
.alter table TBL policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:10", "MaximumNumberOfItems": 10, "MaximumRawDataSizeMB": 100}'
.create-or-alter table TBL ingestion avro mapping "avro_mapping" '[{"Column":"vnum","Properties":{"Field":"vnum"}},{"Column":"vreal","Properties":{"Field":"vreal"}},{"Column":"vdec","Properties":{"Field":"vdec"}},{"Column":"vdate","Properties":{"Field":"vdate"}},{"Column":"vstr","Properties":{"Field":"vstr"}},{"Column":"vb","Properties":{"Field":"vb"}},{"Column":"vlong","datatype":"long","Properties":{"Field":"vlong"}},{"Column":"type","Properties":{"ConstValue":"avro"}}]'
.create-or-alter table TBL ingestion json mapping "json_mapping" '[{"column":"vnum","datatype":"int","Properties":{"Path":"$.vnum"}},{"column":"vdate","datatype":"datetime","Properties":{"Path":"$.vdate"}},{"Column":"vdec","datatype":"decimal","Properties":{"Path":"$.vdec"}},{"column":"vb","datatype":"boolean","Properties":{"Path":"$.vb"}},{"column":"vreal","datatype":"real","Properties":{"Path":"$.vreal"}},{"column":"vstr","datatype":"string","Properties":{"Path":"$.vstr"}},{"column":"vlong","datatype":"long","Properties":{"Path":"$.vlong"}},{"column":"type","datatype":"string","Properties":{"ConstValue":"json"}}]'
.create-or-alter table TBL ingestion csv mapping "csv_mapping" '[{"column":"vnum","datatype":"int","Properties":{"Ordinal":"4"}},{"column":"vdate","datatype":"datetime","Properties":{"Ordinal":"1"}},{"Column":"vdec","datatype":"decimal","Properties":{"Ordinal":"2"}},{"column":"vb","datatype":"boolean","Properties":{"Ordinal":"0"}},{"column":"vreal","datatype":"real","Properties":{"Ordinal":"5"}},{"column":"vstr","datatype":"string","Properties":{"Ordinal":"6"}},{"column":"vlong","datatype":"long","Properties":{"Ordinal":"3"}},{"column":"type","datatype":"string","Properties":{"ConstValue":"csv"}}]'

0 comments on commit 5c413c5

Please sign in to comment.