diff --git a/README.md b/README.md index b09f0922..71873a16 100644 --- a/README.md +++ b/README.md @@ -211,7 +211,7 @@ Implement a stream processing architecture using: Implement a stream processing architecture using: - Event Hubs Kafka (Ingest / Immutable Log) -- Flink on HDInsight or Azure Kubernetes Service (Stream Process) +- Flink on HDInsight (Stream Process) - Event Hubs Kafka (Serve) ### [Event Hubs Kafka + Azure Functions + Cosmos DB](https://github.com/Azure-Samples/streaming-at-scale/tree/main/eventhubskafka-functions-cosmosdb) diff --git a/_doc/_images/flink-job-manager.png b/_doc/_images/flink-job-manager.png index e1920b1f..9a6b854a 100644 Binary files a/_doc/_images/flink-job-manager.png and b/_doc/_images/flink-job-manager.png differ diff --git a/assert/has-local-unzip.sh b/assert/has-local-unzip.sh new file mode 100644 index 00000000..02123557 --- /dev/null +++ b/assert/has-local-unzip.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +# Strict mode, fail on any error +set -euo pipefail + +HAS_UNZIP=$(command -v unzip || true) +if [ -z "$HAS_UNZIP" ]; then + echo "unzip not found" + echo "please install it using your package manager, for example, on Ubuntu:" + echo " sudo apt install unzip" + exit 1 +fi diff --git a/components/apache-flink/flink-kafka-consumer/pom.xml b/components/apache-flink/flink-kafka-consumer/pom.xml index abb09ebb..0ca33db8 100644 --- a/components/apache-flink/flink-kafka-consumer/pom.xml +++ b/components/apache-flink/flink-kafka-consumer/pom.xml @@ -9,9 +9,10 @@ UTF-8 - 1.9.1 + 1.16.0 1.8 - 2.11 + 2.12 + 0.12 4.13.1 ${java.version} ${java.version} @@ -35,16 +36,23 @@ org.apache.flink - flink-streaming-java_${scala.binary.version} + flink-streaming-java ${flink.version} provided + + org.apache.flink + flink-clients + ${flink.version} + provided + + org.apache.flink - flink-connector-kafka-0.11_${scala.binary.version} + flink-connector-kafka ${flink.version} @@ -52,15 +60,8 @@ org.slf4j - slf4j-log4j12 - 1.7.7 - runtime - - - log4j - log4j - 1.2.17 - runtime + slf4j-api + 1.7.32 @@ -70,16 +71,25 @@ test + + + org.apache.flink + flink-test-utils-junit + ${flink.version} + test + + + org.apache.flink - flink-runtime_${scala.binary.version} + flink-runtime ${flink.version} test tests org.apache.flink - flink-streaming-java_${scala.binary.version} + flink-streaming-java ${flink.version} test tests diff --git a/components/apache-flink/flink-kafka-consumer/src/main/java/com/microsoft/samples/flink/ComplexEventProcessingJob.java b/components/apache-flink/flink-kafka-consumer/src/main/java/com/microsoft/samples/flink/ComplexEventProcessingJob.java index 5a2004f3..3f0c5066 100755 --- a/components/apache-flink/flink-kafka-consumer/src/main/java/com/microsoft/samples/flink/ComplexEventProcessingJob.java +++ b/components/apache-flink/flink-kafka-consumer/src/main/java/com/microsoft/samples/flink/ComplexEventProcessingJob.java @@ -11,25 +11,26 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; 
+import static com.microsoft.samples.flink.StreamingJobCommon.getParams; + public class ComplexEventProcessingJob { private static final int MAX_EVENT_DELAY = 60; // max delay for out of order events private static final Logger LOG = LoggerFactory.getLogger(ComplexEventProcessingJob.class); public static void main(String[] args) throws Exception { - ParameterTool params = ParameterTool.fromArgs(args); + ParameterTool params = getParams(args); StreamExecutionEnvironment env = StreamingJobCommon.createStreamExecutionEnvironment(params); JsonMapperSchema schema = new JsonMapperSchema(SampleRecord.class); FlinkKafkaConsumerBase> consumer = StreamingJobCommon.createKafkaConsumer(params, schema); JsonMapperSchema schema2 = new JsonMapperSchema(SampleTag.class); - FlinkKafkaProducer011 producer = StreamingJobCommon.createKafkaProducer(params, schema2); + FlinkKafkaProducer producer = StreamingJobCommon.createKafkaProducer(params, schema2); // setup streaming execution environment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); @@ -50,7 +51,6 @@ public static void main(String[] args) throws Exception { } - static void buildStream(DataStream> source, SinkFunction producer, KeyedProcessFunction logic) { DataStream stream = source .rebalance() diff --git a/components/apache-flink/flink-kafka-consumer/src/main/java/com/microsoft/samples/flink/ComplexEventProcessingLogic.java b/components/apache-flink/flink-kafka-consumer/src/main/java/com/microsoft/samples/flink/ComplexEventProcessingLogic.java index 1fa8f03a..8c3307a1 100755 --- a/components/apache-flink/flink-kafka-consumer/src/main/java/com/microsoft/samples/flink/ComplexEventProcessingLogic.java +++ b/components/apache-flink/flink-kafka-consumer/src/main/java/com/microsoft/samples/flink/ComplexEventProcessingLogic.java @@ -32,8 +32,6 @@ public void processElement(SampleRecord receivedRecord, Context context, Collect state = new SampleState(); } - LOG.debug(String.format("DBG-3 %s, %d, %d", receivedRecord.deviceId, state.recordsSize(), state.tagsSize())); - // add latest record to the state state.addRecord(receivedRecord); diff --git a/components/apache-flink/flink-kafka-consumer/src/main/java/com/microsoft/samples/flink/SimpleRelayStreamingJob.java b/components/apache-flink/flink-kafka-consumer/src/main/java/com/microsoft/samples/flink/SimpleRelayStreamingJob.java index 22876320..a8ac2082 100755 --- a/components/apache-flink/flink-kafka-consumer/src/main/java/com/microsoft/samples/flink/SimpleRelayStreamingJob.java +++ b/components/apache-flink/flink-kafka-consumer/src/main/java/com/microsoft/samples/flink/SimpleRelayStreamingJob.java @@ -15,11 +15,13 @@ import java.time.Instant; +import static com.microsoft.samples.flink.StreamingJobCommon.getParams; + public class SimpleRelayStreamingJob { private static final int MAX_EVENT_DELAY = 60; // max delay for out of order events public static void main(String[] args) throws Exception { - ParameterTool params = ParameterTool.fromArgs(args); + ParameterTool params = getParams(args); StreamExecutionEnvironment env = StreamingJobCommon.createStreamExecutionEnvironment(params); JsonMapperSchema schema = new JsonMapperSchema<>(SampleRecord.class); diff --git a/components/apache-flink/flink-kafka-consumer/src/main/java/com/microsoft/samples/flink/StreamingJobCommon.java b/components/apache-flink/flink-kafka-consumer/src/main/java/com/microsoft/samples/flink/StreamingJobCommon.java index ac699d0c..19f29c6b 100644 --- 
a/components/apache-flink/flink-kafka-consumer/src/main/java/com/microsoft/samples/flink/StreamingJobCommon.java +++ b/components/apache-flink/flink-kafka-consumer/src/main/java/com/microsoft/samples/flink/StreamingJobCommon.java @@ -3,13 +3,15 @@ import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.InputStream; import java.util.Properties; @@ -44,18 +46,17 @@ static FlinkKafkaConsumerBase createKafkaConsumer(ParameterTool params, K LOG.info("Consuming from Kafka topic: {}", topicIn); // Create Kafka consumer deserializing from JSON. - // Flink recommends using Kafka 0.11 consumer as Kafka 1.0 consumer is not stable. - return new FlinkKafkaConsumer011<>(topicIn, schema, properties); + return new FlinkKafkaConsumer<>(topicIn, schema, properties); } - static FlinkKafkaProducer011 createKafkaProducer(ParameterTool params, SerializationSchema schema) { + static FlinkKafkaProducer createKafkaProducer(ParameterTool params, SerializationSchema schema) { Properties propertiesOut = new Properties(); setProperties(params, "kafka.out.", propertiesOut); String topicOut = (String) propertiesOut.remove("topic"); if (topicOut == null) throw new IllegalArgumentException("Missing configuration value kafka.topic.out"); LOG.info("Writing into Kafka topic: {}", topicOut); - FlinkKafkaProducer011 kafkaOut = new FlinkKafkaProducer011<>( + FlinkKafkaProducer kafkaOut = new FlinkKafkaProducer<>( topicOut, schema, propertiesOut @@ -70,6 +71,12 @@ private static void setProperties(ParameterTool params, String prefix, Propertie }); } - + static ParameterTool getParams(String[] args) throws IOException { + InputStream resourceAsStream = StreamingJobCommon.class.getClassLoader().getResourceAsStream("params.properties"); + if (resourceAsStream != null) { + return ParameterTool.fromPropertiesFile(resourceAsStream); + } + return ParameterTool.fromArgs(args); + } } diff --git a/components/apache-flink/flink-kafka-consumer/src/main/java/com/microsoft/samples/flink/data/SampleRecord.java b/components/apache-flink/flink-kafka-consumer/src/main/java/com/microsoft/samples/flink/data/SampleRecord.java index abc64850..d2c6dc46 100755 --- a/components/apache-flink/flink-kafka-consumer/src/main/java/com/microsoft/samples/flink/data/SampleRecord.java +++ b/components/apache-flink/flink-kafka-consumer/src/main/java/com/microsoft/samples/flink/data/SampleRecord.java @@ -43,5 +43,6 @@ public static class ComplexData implements Serializable { public double moreData20; public double moreData21; public double moreData22; + public double moreData23; } } diff --git a/components/apache-flink/flink-kafka-consumer/src/main/java/com/microsoft/samples/flink/utils/JsonMapperSchema.java b/components/apache-flink/flink-kafka-consumer/src/main/java/com/microsoft/samples/flink/utils/JsonMapperSchema.java index d8ebeeea..d0f2d3e5 100755 --- 
a/components/apache-flink/flink-kafka-consumer/src/main/java/com/microsoft/samples/flink/utils/JsonMapperSchema.java +++ b/components/apache-flink/flink-kafka-consumer/src/main/java/com/microsoft/samples/flink/utils/JsonMapperSchema.java @@ -3,7 +3,6 @@ import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; @@ -11,22 +10,23 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule; import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.header.Headers; import java.io.IOException; import java.io.Serializable; import java.time.Instant; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; +import java.util.Optional; public class JsonMapperSchema implements KafkaDeserializationSchema>, SerializationSchema { - private static final DateTimeFormatter dateTimeFormatter = - DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSX").withZone(ZoneOffset.UTC); private final Class type; private final ObjectMapper mapper; + private static final DateTimeFormatter dateTimeFormatter = + DateTimeFormatter.ISO_INSTANT.withZone(ZoneOffset.UTC); + public JsonMapperSchema(Class type) { this.type = type; @@ -57,7 +57,7 @@ public ConsumerRecord deserialize(ConsumerRecord r) t byte[] message = r.value(); T v = mapper.readValue(message, type); return new - ConsumerRecord(r.topic(), r.partition(), r. offset(), r. timestamp(), r. timestampType(), null, r. serializedKeySize(), r. serializedValueSize(), r. key(), v, r. 
headers()); + ConsumerRecord<>(r.topic(), r.partition(), r.offset(), r.timestamp(), r.timestampType(), r.serializedKeySize(), r.serializedValueSize(), r.key(), v, r.headers(), Optional.empty()); } public TypeInformation> getProducedType() { @@ -76,7 +76,7 @@ private static class InstantDeserializer extends JsonDeserializer imple @Override public Instant deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException { - return dateTimeFormatter.parse(jsonParser.getText(), Instant::from); + return Instant.parse(jsonParser.getText()); } } } diff --git a/components/apache-flink/flink-kafka-consumer/src/test/java/com/microsoft/samples/flink/ComplexEventJobProcessingLogicTest.java b/components/apache-flink/flink-kafka-consumer/src/test/java/com/microsoft/samples/flink/ComplexEventJobProcessingLogicTest.java index f5ed479c..230a799b 100644 --- a/components/apache-flink/flink-kafka-consumer/src/test/java/com/microsoft/samples/flink/ComplexEventJobProcessingLogicTest.java +++ b/components/apache-flink/flink-kafka-consumer/src/test/java/com/microsoft/samples/flink/ComplexEventJobProcessingLogicTest.java @@ -29,7 +29,7 @@ public void setupTestHarness() throws Exception { //instantiate user-defined function ComplexEventProcessingLogic logic = new ComplexEventProcessingLogic(); - // wrap user defined function into a the corresponding operator + // wrap user defined function into the corresponding operator testHarness = new KeyedOneInputStreamOperatorTestHarness<>( new KeyedProcessOperator<>(logic), (KeySelector) value -> value.deviceId, diff --git a/components/apache-flink/hdinsight-aks/oneClickFlink.json b/components/apache-flink/hdinsight-aks/oneClickFlink.json new file mode 100644 index 00000000..fe0ec582 --- /dev/null +++ b/components/apache-flink/hdinsight-aks/oneClickFlink.json @@ -0,0 +1,314 @@ +{ + "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#", + "contentVersion": "1.0.0.0", + "parameters": { + "clusterPoolName": { + "type": "String", + "metadata": { + "description": "The name of cluster pool." + } + }, + "clusterPoolVersion": { + "defaultValue": "1.0", + "type": "String", + "metadata": { + "description": "HDInsight on AKS cluster pool version to be created." + } + }, + "clusterPoolNodeVmSize": { + "defaultValue": "Standard_F4s_v2", + "allowedValues": [ + "Standard_F4s_v2", + "Standard_D4a_v4", + "Standard_D4as_v4", + "Standard_E4s_v3" + ], + "type": "String", + "metadata": { + "description": "VM SKU size for the cluster pool." + } + }, + "resourcePrefix": { + "type": "String", + "metadata": { + "description": "Provide a prefix for creating necessary resources required for cluster creation, your necessary resources will be named as [prefix + predefined string]. For example, if you provide resource prefix as demo then, MSI is created with name as demoMSI and Storage is created with name as demostore." + } + }, + "clusterName": { + "type": "String", + "metadata": { + "description": "The name of the cluster." + } + }, + "hdinsightOnAksClusterVersion": { + "defaultValue": "1.0.6", + "type": "String", + "metadata": { + "description": "The HDInsight on AKS cluster version to be created." + } + }, + "clusterOssVersion": { + "defaultValue": "1.16.0", + "type": "String", + "metadata": { + "description": "The OSS version of the cluster to be created." 
+ } + }, + "subnetId": { + "type": "String", + "defaultValue": "", + "metadata": { + "description": "Subnet resource id" + } + }, + "clusterPoolLogAnalyticsWorkspaceId": { + "type": "string", + "metadata": { + "description": "Resource Id of the Log Analytics workspace when the logs to be stored" + } + }, + "taskManagerCPU": { + "type": "string", + "defaultValue": "2", + "metadata": { + "description": "Task manager CPU count" + } + }, + "taskManagerMemoryInMB": { + "type": "string", + "defaultValue": "2000", + "metadata": { + "description": "Task manager memory in MB" + } + }, + "jobManagerCPU": { + "type": "string", + "defaultValue": "1", + "metadata": { + "description": "Job manager CPU count" + } + }, + "jobManagerMemoryInMB": { + "type": "string", + "defaultValue": "2000", + "metadata": { + "description": "Job manager memory in MB" + } + }, + "historyServerCPU": { + "type": "string", + "defaultValue": "1", + "metadata": { + "description": "History server CPU count" + } + }, + "historyServerMemoryInMB": { + "type": "string", + "defaultValue": "2000", + "metadata": { + "description": "History server memory in MB" + } + }, + "headNodeVMSize": { + "type": "string", + "metadata": { + "description": "VM SKU selected for the head node" + } + }, + "headNodeCount": { + "type": "string", + "defaultValue": "2", + "metadata": { + "description": "Head node count" + } + }, + "workerNodeVMSize": { + "type": "string", + "metadata": { + "description": "VM SKU selected for the worker node" + } + }, + "workerNodeCount": { + "type": "string", + "defaultValue": "3", + "metadata": { + "description": "Worker node count" + } + }, + "userObjectId": { + "type": "String", + "metadata": { + "description": "The user alias object ID from Azure Active Directory. For example, myuserid@microsoft.com, search for this alias in AAD in the Azure portal and copy the Object ID." 
+ } + } + }, + "variables": { + "clusterName": "[concat(parameters('clusterPoolName'), '/', parameters('clusterName'))]", + "msiName": "[concat(parameters('resourcePrefix'), 'MSI')]", + "roleAssignedGuid": "[guid(variables('msiName'), resourceGroup().id, deployment().name)]", + "storageName": "[concat(parameters('resourcePrefix'), 'store')]" + }, + "resources": [ + { + "type": "microsoft.hdinsight/clusterpools", + "apiVersion": "2023-06-01-preview", + "name": "[parameters('clusterPoolName')]", + "location": "[resourceGroup().location]", + "properties": { + "networkProfile": { + "subnetId": "[parameters('subnetId')]" + }, + "logAnalyticsProfile": { + "enabled": true, + "workspaceId": "[parameters('clusterPoolLogAnalyticsWorkspaceId')]" + }, + "clusterPoolProfile": { + "clusterPoolVersion": "[parameters('clusterPoolVersion')]" + }, + "computeProfile": { + "vmSize": "[parameters('clusterPoolNodeVmSize')]", + "count": 3 + } + } + }, + { + "type": "Microsoft.ManagedIdentity/userAssignedIdentities", + "apiVersion": "2018-11-30", + "name": "[variables('msiName')]", + "location": "[resourceGroup().location]", + "properties": {} + }, + { + "type": "Microsoft.Storage/storageAccounts", + "apiVersion": "2019-04-01", + "name": "[variables('storageName')]", + "location": "[resourceGroup().location]", + "sku": { + "name": "Standard_RAGRS", + "tier": "Standard" + }, + "kind": "StorageV2", + "properties": { + "minimumTlsVersion": "TLS1_2", + "allowBlobPublicAccess": true, + "allowSharedKeyAccess": true, + "isHnsEnabled": true, + "networkAcls": { + "bypass": "AzureServices", + "virtualNetworkRules": [], + "ipRules": [], + "defaultAction": "Allow" + } + } + }, + { + "type": "Microsoft.Storage/storageAccounts/blobServices/containers", + "apiVersion": "2019-06-01", + "name": "[concat(variables('storageName'), '/default/', 'container1')]", + "dependsOn": [ + "[resourceId('Microsoft.Storage/storageAccounts', variables('storageName'))]" + ], + "properties": { + "publicAccess": "None" + } + }, + { + "type": "Microsoft.Authorization/roleAssignments", + "apiVersion": "2020-10-01-preview", + "name": "[variables('roleAssignedGuid')]", + "dependsOn": [ + "[resourceId('microsoft.hdinsight/clusterpools',parameters('clusterPoolName'))]", + "[resourceId('Microsoft.ManagedIdentity/userAssignedIdentities', variables('msiName'))]", + "[variables('msiName')]" + ], + "properties": { + "roleDefinitionId": "[concat('/subscriptions/', subscription().subscriptionId, '/providers/Microsoft.Authorization/roleDefinitions/', 'f1a07417-d97a-45cb-824c-7a7467783830')]", + "principalId": "[reference(resourceId('microsoft.hdinsight/clusterpools', parameters('clusterPoolName')), '2023-06-01-preview').aksClusterProfile.aksClusterAgentPoolIdentityProfile.msiObjectId]" + }, + "scope": "[concat('Microsoft.ManagedIdentity/userAssignedIdentities', '/', variables('msiName'))]" + }, + { + "type": "Microsoft.Authorization/roleAssignments", + "apiVersion": "2018-09-01-preview", + "name": "[guid(variables('storageName'), resourceGroup().id)]", + "dependsOn": [ + "[resourceId('Microsoft.Storage/storageAccounts', variables('storageName'))]", + "[resourceId('Microsoft.ManagedIdentity/userAssignedIdentities', variables('msiName'))]", + "[resourceId('microsoft.hdinsight/clusterpools', parameters('clusterPoolName'))]", + "[variables('storageName')]" + ], + "properties": { + "roleDefinitionId": "[concat('/subscriptions/', subscription().subscriptionId, '/providers/Microsoft.Authorization/roleDefinitions/', 'b7e6dc6d-f1e8-4753-8033-0f276bb0955b')]", + 
"principalId": "[reference(resourceId('Microsoft.ManagedIdentity/userAssignedIdentities', variables('msiName')), '2018-11-30').principalId]" + }, + "scope": "[concat('Microsoft.Storage/storageAccounts', '/', variables('storageName'))]" + }, + { + "type": "microsoft.hdinsight/clusterpools/clusters", + "apiVersion": "2023-06-01-preview", + "name": "[variables('clusterName')]", + "location": "[resourceGroup().location]", + "dependsOn": [ + "[resourceId('microsoft.hdinsight/clusterpools', parameters('clusterPoolName'))]", + "[resourceId('Microsoft.Storage/storageAccounts', variables('storageName'))]", + "[resourceId('Microsoft.ManagedIdentity/userAssignedIdentities', variables('msiName'))]" + ], + "properties": { + "clusterType": "Flink", + "computeProfile": { + "nodes": [ + { + "type": "Head", + "vmSize": "[parameters('headNodeVMSize')]", + "count": "[int(parameters('headNodeCount'))]" + }, + { + "type": "Worker", + "vmSize": "[parameters('workerNodeVMSize')]", + "count": "[int(parameters('workerNodeCount'))]" + } + ] + }, + "clusterProfile": { + "clusterVersion": "[parameters('hdinsightOnAksClusterVersion')]", + "ossVersion": "[parameters('clusterOssVersion')]", + "identityProfile": { + "msiResourceId": "[string(resourceId('Microsoft.ManagedIdentity/userAssignedIdentities', variables('msiName')))]", + "msiClientId": "[string(reference(resourceId('Microsoft.ManagedIdentity/userAssignedIdentities', variables('msiName'))).clientId)]", + "msiObjectId": "[string(reference(resourceId('Microsoft.ManagedIdentity/userAssignedIdentities', variables('msiName'))).principalId)]" + }, + "authorizationProfile": { + "userIds": [ + "[parameters('userObjectId')]" + ] + }, + "logAnalyticsProfile": { + "enabled": true, + "applicationLogs": { + "stdErrorEnabled": true, + "stdOutEnabled": true + }, + "metricsEnabled": true + }, + "flinkProfile": { + "jobManager": { + "cpu": "[int(parameters('jobManagerCPU'))]", + "memory": "[int(parameters('jobManagerMemoryInMB'))]" + }, + "taskManager": { + "cpu": "[int(parameters('taskManagerCPU'))]", + "memory": "[int(parameters('taskManagerMemoryInMB'))]" + }, + "historyServer": { + "cpu": "[int(parameters('historyServerCPU'))]", + "memory": "[int(parameters('historyServerMemoryInMB'))]" + }, + "storage": { + "storageUri": "[concat('abfs://container1', '@', variables('storageName'), '.dfs.core.windows.net')]" + } + } + } + } + } + ] +} diff --git a/components/apache-flink/hdinsight-aks/run-flink-job.sh b/components/apache-flink/hdinsight-aks/run-flink-job.sh new file mode 100755 index 00000000..1e8a6206 --- /dev/null +++ b/components/apache-flink/hdinsight-aks/run-flink-job.sh @@ -0,0 +1,64 @@ +#!/bin/bash + +# Strict mode, fail on any error +set -euo pipefail + +echo 'Preparing Flink Job JAR' + +base_jar=flink-kafka-consumer-$FLINK_JOBTYPE.jar +mkdir -p target +jar_name=$FLINK_JOBTYPE.jar +jar_path=target/$jar_name + +cp "../components/apache-flink/flink-kafka-consumer/target/assembly/$base_jar" $jar_path + +main_class=$(unzip -p target/simple-relay.jar META-INF/MANIFEST.MF | grep ^Main-Class: |awk '{print $2}' RS='\r\n') + +cat << EOF > params.properties +kafka.in.topic=$KAFKA_TOPIC +kafka.in.bootstrap.servers=$KAFKA_IN_LISTEN_BROKERS +kafka.in.request.timeout.ms=60000 +kafka.in.sasl.mechanism=$KAFKA_IN_LISTEN_SASL_MECHANISM +kafka.in.security.protocol=$KAFKA_IN_LISTEN_SECURITY_PROTOCOL +kafka.in.sasl.jaas.config=$KAFKA_IN_LISTEN_JAAS_CONFIG +kafka.in.group.id=$PREFIX +kafka.out.topic=$KAFKA_OUT_TOPIC +kafka.out.bootstrap.servers=$KAFKA_OUT_SEND_BROKERS 
+kafka.out.request.timeout.ms=60000 +kafka.out.sasl.mechanism=$KAFKA_OUT_SEND_SASL_MECHANISM +kafka.out.security.protocol=$KAFKA_OUT_SEND_SECURITY_PROTOCOL +kafka.out.sasl.jaas.config=$KAFKA_OUT_SEND_JAAS_CONFIG +EOF + +zip -g $jar_path params.properties +rm params.properties + +echo 'uploading Flink job jar' + +jobname=$(uuidgen | tr A-Z a-z) +jarname="$jobname.jar" + +# if false; then +az storage blob upload --account-name "$HDINSIGHT_AKS_RESOURCE_PREFIX"store -c container1 \ + -n $jarname -f $jar_path \ + --overwrite \ + -o tsv >> log.txt +#fi + +echo 'running Flink job' + +cluster_resource=$(az resource show -g $RESOURCE_GROUP -n $HDINSIGHT_AKS_NAME --resource-type microsoft.hdinsight/clusterPools --api-version 2021-09-15-preview -o tsv --query id) +az rest --method POST --url "https://management.azure.com$cluster_resource/clusters/$HDINSIGHT_CLUSTER_NAME/runJob?api-version=2023-06-01-preview" \ +--body '{ + "properties": { + "jobType": "FlinkJob", + "jobName": "'$jobname'", + "action": "NEW", + "jobJarDirectory": "abfs://container1@'$HDINSIGHT_AKS_RESOURCE_PREFIX'store.dfs.core.windows.net/", + "jarName": "'$jarname'", + "entryClass": "'$main_class'", + "flinkConfiguration": { + "parallelism": "'$FLINK_PARALLELISM'" + } + } +}' diff --git a/components/apache-flink/hdinsight-aks/run-flink.sh b/components/apache-flink/hdinsight-aks/run-flink.sh new file mode 100644 index 00000000..4d5ff691 --- /dev/null +++ b/components/apache-flink/hdinsight-aks/run-flink.sh @@ -0,0 +1,4 @@ +source ../components/azure-monitor/create-log-analytics.sh +source ../components/azure-hdinsight/create-hdinsight-aks-flink.sh + +source ../components/apache-flink/hdinsight-aks/run-flink-job.sh diff --git a/components/azure-databricks/create-databricks.sh b/components/azure-databricks/create-databricks.sh index f3439bf0..47a3a47f 100755 --- a/components/azure-databricks/create-databricks.sh +++ b/components/azure-databricks/create-databricks.sh @@ -31,8 +31,10 @@ databricks_metainfo=$(az resource show -g $RESOURCE_GROUP --resource-type Micros # Databricks CLI automatically picks up configuration from $DATABRICKS_HOST and $DATABRICKS_TOKEN. export DATABRICKS_HOST=$(jq -r '"https://" + .location + ".azuredatabricks.net"' <<<"$databricks_metainfo") -echo 'creating Key Vault to store Databricks PAT token' -az keyvault create -g $RESOURCE_GROUP -n $ADB_TOKEN_KEYVAULT -o tsv >>log.txt +if ! az keyvault show -g $RESOURCE_GROUP -n $ADB_TOKEN_KEYVAULT -o none 2>/dev/null ; then + echo 'creating Key Vault to store Databricks PAT token' + az keyvault create -g $RESOURCE_GROUP -n $ADB_TOKEN_KEYVAULT -o tsv >>log.txt +fi echo 'checking PAT token secret presence in Key Vault' databricks_token_secret_name="DATABRICKS-TOKEN" diff --git a/components/azure-event-hubs/create-event-hub.sh b/components/azure-event-hubs/create-event-hub.sh index 4d233605..d074dce0 100755 --- a/components/azure-event-hubs/create-event-hub.sh +++ b/components/azure-event-hubs/create-event-hub.sh @@ -29,7 +29,7 @@ echo ". name: $eventHubName" echo ". 
partitions: $EVENTHUB_PARTITIONS" az eventhubs eventhub create -n $eventHubName -g $RESOURCE_GROUP \ - --message-retention 1 --partition-count $EVENTHUB_PARTITIONS --namespace-name $eventHubsNamespace \ + --partition-count $EVENTHUB_PARTITIONS --namespace-name $eventHubsNamespace \ --enable-capture "$EVENTHUB_CAPTURE" --capture-interval 300 --capture-size-limit 314572800 \ --archive-name-format 'capture/{Namespace}/{EventHub}/{Year}_{Month}_{Day}_{Hour}_{Minute}_{Second}_{PartitionId}' \ --blob-container streamingatscale \ diff --git a/components/azure-hdinsight/create-hdinsight-aks-flink.sh b/components/azure-hdinsight/create-hdinsight-aks-flink.sh new file mode 100755 index 00000000..4d3458f0 --- /dev/null +++ b/components/azure-hdinsight/create-hdinsight-aks-flink.sh @@ -0,0 +1,36 @@ +#!/bin/bash + +set -euo pipefail + +# Get ID of current user +userId=$(az ad signed-in-user show --query id -o tsv) + + +if ! az resource show -g $RESOURCE_GROUP -n $HDINSIGHT_AKS_NAME --resource-type microsoft.hdinsight/clusterPools --api-version 2021-09-15-preview -o none 2>/dev/null ; then + echo "getting Subnet ID" + subnet_id=$(az network vnet subnet show -g $RESOURCE_GROUP -n streaming-subnet --vnet-name $VNET_NAME --query id -o tsv) + + echo "getting Log Analytics workspace ID" + analytics_ws_resourceId=$(az resource show -g $RESOURCE_GROUP -n $LOG_ANALYTICS_WORKSPACE --resource-type Microsoft.OperationalInsights/workspaces --query id -o tsv) + + echo 'creating HDInsight cluster' + echo ". name: $HDINSIGHT_AKS_NAME" + + az deployment group create \ + --no-prompt \ + --resource-group $RESOURCE_GROUP \ + --template-file "../components/apache-flink/hdinsight-aks/OneClickFlink.json" \ + --parameters \ + clusterPoolName=$HDINSIGHT_AKS_NAME \ + clusterName=$HDINSIGHT_CLUSTER_NAME \ + resourcePrefix=$HDINSIGHT_AKS_RESOURCE_PREFIX \ + headNodeVMSize=$HDINSIGHT_AKS_WORKER_SIZE \ + workerNodeVMSize=$HDINSIGHT_AKS_WORKER_SIZE \ + workerNodeCount=$FLINK_PARALLELISM \ + userObjectId=$userId \ + clusterPoolLogAnalyticsWorkspaceId=$analytics_ws_resourceId \ + subnetId=$subnet_id \ + jobManagerCPU=2 \ + jobManagerMemoryInMB=8000 \ + -o tsv >> log.txt +fi diff --git a/eventhubs-streamanalytics-cosmosdb/create-solution.sh b/eventhubs-streamanalytics-cosmosdb/create-solution.sh old mode 100644 new mode 100755 diff --git a/eventhubs-streamanalytics-eventhubs/create-solution.sh b/eventhubs-streamanalytics-eventhubs/create-solution.sh old mode 100644 new mode 100755 diff --git a/eventhubskafka-flink-eventhubskafka/README.md b/eventhubskafka-flink-eventhubskafka/README.md index 2d706add..4cf35a34 100644 --- a/eventhubskafka-flink-eventhubskafka/README.md +++ b/eventhubskafka-flink-eventhubskafka/README.md @@ -3,7 +3,7 @@ topic: sample languages: - azurecli - json - - sql + - java products: - azure - azure-container-instances @@ -18,8 +18,6 @@ statusNotificationTargets: This sample uses Apache Flink to process streaming data from Event Hubs Kafka and uses another Event Hubs Kafka as a sink to store JSON data. This is done to analyze pure streaming performance of Flink; no aggregation is done and data is passed as fast as possible from the input to the output. Data is augmented by adding additional fields. -The sample provides a choice among options for hosting Flink: Azure Kubernetes Service, or Azure HDInsight Hadoop (using YARN). - To support very high throughput, two different Event Hubs namespaces are deployed by the template. 
Event Hubs capacity is limited to up to 20 units of 1 MB/s each (although this limit can be increased through a support ticket). If incoming throughput is under 10 MB/s, you could deploy two Event Hub instances under a single namespace instead. The provided scripts will create an end-to-end solution complete with load test client. @@ -40,8 +38,8 @@ The following tools/languages are also needed: - Install: `sudo apt install jq` - [Maven](https://maven.apache.org/install.html) - Install: `sudo apt install maven` -- [kubectl](https://kubernetes.io/docs/tasks/tools/install-kubectl/) -- [helm](https://helm.sh/docs/using_helm/#installing-helm) +- [Unzip](https://askubuntu.com/questions/660846/how-to-zip-and-unzip-a-directory-and-its-files-in-linux) + - Install: `sudo apt install unzip` ## Setup Solution @@ -78,9 +76,9 @@ To make sure that name collisions will be unlikely, you should use a random stri The script will create the following resources: -- **Azure Container Instances** to host Spark Load Test Clients: by default one client will be created, generating a load of 1000 events/second +- **Azure Container Instances** to host Load Test Clients: by default one client will be created, generating a load of 1000 events/second - **Event Hubs** Namespace, Hub and Consumer Group: to ingest data incoming from test clients and to store data generated by Apache Flink -- **HDInsight** or **Azure Kubernetes Service**: to host the Apache Flink job that processes event data +- **HDInsight**: to host the Apache Flink job that processes event data - **Azure Monitor**: to monitor HDInsight, Azure Kubernetes Service and Flink ## Streamed Data @@ -121,14 +119,9 @@ If you want to change some setting of the solution, like number of load test cli export EVENTHUB_CAPACITY=2 export EVENTHUB_PARTITIONS=1 - export FLINK_PARALLELISM=1 + export FLINK_PARALLELISM=3 export SIMULATOR_INSTANCES=1 - # settings for AKS (-p aks) - export AKS_NODES=3 - export AKS_VM_SIZE=Standard_D2s_v3 - # settings for HDInsight YARN (-p hdinsight) - export HDINSIGHT_HADOOP_WORKERS=3 - export HDINSIGHT_HADOOP_WORKER_SIZE=Standard_D3_V2 + export HDINSIGHT_AKS_WORKER_SIZE=Standard_D8ds_v5 The above settings have been chosen to sustain a 1,000 msg/s stream. The script also contains settings for 5,000 msg/s and 10,000 msg/s. @@ -138,28 +131,34 @@ The deployment script will report performance, by default every minute for 30 mi ``` ***** [M] Starting METRICS reporting -Event Hub capacity: 2 throughput units (this determines MAX VALUE below). Reporting aggregate metrics per minute, offset by 2 minutes, for 30 minutes. 
- Event Hub # IncomingMessages IncomingBytes OutgoingMessages OutgoingBytes ThrottledRequests - ----------- ---------------- ------------- ---------------- ------------- ----------------- - MAX VALUE 120000 120000000 491520 240000000 - - ----------- ---------------- ------------- ---------------- ------------- ----------------- - 2019-11-10T08:17:44 1 0 0 0 0 0 - 2019-11-10T08:17:44 2 0 0 0 0 0 - 2019-11-10T08:18:00 1 0 0 0 0 0 - 2019-11-10T08:18:00 2 0 0 0 0 0 - 2019-11-10T08:19:00 1 0 0 0 0 0 - 2019-11-10T08:19:00 2 0 0 0 0 0 - 2019-11-10T08:22:37 1 0 0 0 0 0 - 2019-11-10T08:22:37 2 0 0 0 0 0 - 2019-11-10T08:23:00 1 43163 40022882 43163 40365390 0 - 2019-11-10T08:23:00 2 37007 37332787 0 0 0 - 2019-11-10T08:24:00 1 59966 55621703 59966 56097577 0 - 2019-11-10T08:24:00 2 59943 60488690 0 0 0 - 2019-11-10T08:25:00 1 60258 55947759 60258 56425866 0 - 2019-11-10T08:25:00 2 60117 60728670 0 0 0 - 2019-11-10T08:26:00 1 60027 55738691 60027 56214951 0 - 2019-11-10T08:26:00 2 60003 60612850 0 0 0 + Event Hub # IncomingMessages IncomingBytes OutgoingMessages OutgoingBytes ThrottledRequests + ----------- ---------------- ------------- ---------------- ------------- ------------------ + 2023-08-15T08:01:57+0200 Event Hub 1 0 0 0 0 0 + 2023-08-15T08:01:59+0200 Event Hub 2 0 0 0 0 0 + 2023-08-15T08:02:02+0200 Event Hub 1 0 0 0 0 0 + 2023-08-15T08:02:03+0200 Event Hub 2 0 0 0 0 0 + 2023-08-15T08:03:02+0200 Event Hub 1 0 0 0 0 0 + 2023-08-15T08:03:03+0200 Event Hub 2 0 0 0 0 0 + 2023-08-15T08:04:02+0200 Event Hub 1 0 0 0 0 0 + 2023-08-15T08:04:04+0200 Event Hub 2 0 0 0 0 0 + 2023-08-15T08:05:09+0200 Event Hub 1 0 0 0 0 0 + 2023-08-15T08:05:11+0200 Event Hub 2 0 0 0 0 0 + 2023-08-15T08:06:03+0200 Event Hub 1 23821 20565185 0 0 0 + 2023-08-15T08:06:04+0200 Event Hub 2 0 0 0 0 0 + 2023-08-15T08:07:03+0200 Event Hub 1 60088 51900051 0 0 0 + 2023-08-15T08:07:05+0200 Event Hub 2 0 0 0 0 0 + 2023-08-15T08:08:02+0200 Event Hub 1 60013 51874051 0 0 0 + 2023-08-15T08:08:04+0200 Event Hub 2 0 0 0 0 0 + 2023-08-15T08:09:02+0200 Event Hub 1 60083 51952748 0 0 0 + 2023-08-15T08:09:04+0200 Event Hub 2 0 0 0 0 0 + 2023-08-15T08:10:04+0200 Event Hub 1 60113 51976704 0 0 0 + 2023-08-15T08:10:05+0200 Event Hub 2 0 0 0 0 0 + 2023-08-15T08:11:02+0200 Event Hub 1 60122 51991471 0 0 0 + 2023-08-15T08:11:04+0200 Event Hub 2 0 0 0 0 0 + 2023-08-15T08:12:02+0200 Event Hub 1 59964 51849376 0 0 0 + 2023-08-15T08:12:04+0200 Event Hub 2 0 0 0 0 0 + ``` In column "Event Hub #", 1 refers to the Event Hub used as input to @@ -173,27 +172,10 @@ around 60k events/min. The deployed Apache Flink solution doesn't do any analytics or projection, but only populates two fields in the JSON message: the time at which the event was received in Event Hubs, and the current timestamp. -The solution includes a custom monitoring library to log Flink events and metrics to Azure Monitor. The custom monitoring library is currently only included when the Flink job is deployed in AKS. To view the monitoring data, navigate to the Log Analytics resource in the Azure Portal. - -The Flink Job Manager UI shows information about the current running job. The IP address of the Job Manager UI is reported by the deployment script. Note that the solution deploys the Job Manager on a public IP address without any security. In a production deployment, you should disable public IP endpoints. +The Flink Job Manager UI shows information about the current running job. The Job Manager UI is accessible from the Azure Portal. 
![Flink Job Manager Web UI](../_doc/_images/flink-job-manager.png) -### Flink deployment on AKS - -Deployment on Azure Kubernetes Service is done in single-job, highly available mode. The deployment includes: -* A Zookeeper cluster for maintaining quorum -* A pod for the (per-job) Flink Job Manager and -* A pod for each Flink Task Manager deployed as part of the job - -In HA mode, the Flink the JobManager exposes a dynamically allocated port. Together with the JobManager, we run a custom sidecar container containing a small shell script. The script calls the JobManager REST API (running on fixed port 8081) to discover the JobManager RPC port, then calls the Kubernetes API to update the port exposed in the Kubernetes Service. RBAC is used to grant the sidecar container permissions to only this specific operation in the API. - -### Flink deployment on HDInsight - -Deployment on HDInsight is done in job server, highly available mode. The deployment runs a YARN job for the Flink Job Manager, then submits a JAR job to the Job Manager. The Job Manager creates a YARN application per job. - -Note that deployed jobs do not survive an HDInsight cluster reboot. - ## Query Data Data is available in the created Event Hub output. You can use the Process Data screen in the Azure portal to inspect the event data. diff --git a/eventhubskafka-flink-eventhubskafka/create-solution.sh b/eventhubskafka-flink-eventhubskafka/create-solution.sh index 3486e48b..39ce2abe 100755 --- a/eventhubskafka-flink-eventhubskafka/create-solution.sh +++ b/eventhubskafka-flink-eventhubskafka/create-solution.sh @@ -7,11 +7,10 @@ export PREFIX='' export LOCATION="eastus" export TESTTYPE="1" export STEPS="CIPTM" -export FLINK_PLATFORM='aks' export FLINK_JOBTYPE='simple-relay' usage() { - echo "Usage: $0 -d [-s ] [-t ] [-l ] [-p ]" + echo "Usage: $0 -d [-s ] [-t ] [-l ]" echo "-s: specify which steps should be executed. Default=$STEPS" echo " Possible values:" echo " C=COMMON" @@ -21,7 +20,6 @@ usage() { echo " M=METRICS reporting" echo " V=VERIFY deployment" echo "-t: test 1,5,10 thousands msgs/sec. Default=$TESTTYPE" - echo "-p: platform: aks or hdinsight. Default=$FLINK_PLATFORM" echo "-a: type of job: 'simple-relay' or 'complex-processing'. Default=$FLINK_JOBTYPE" echo "-l: where to create the resources. 
Default=$LOCATION" exit 1; @@ -42,9 +40,6 @@ while getopts ":d:s:t:l:p:a:" arg; do l) LOCATION=${OPTARG} ;; - p) - FLINK_PLATFORM=${OPTARG} - ;; a) FLINK_JOBTYPE=${OPTARG} ;; @@ -65,12 +60,7 @@ if [ "$TESTTYPE" == "10" ]; then export EVENTHUB_PARTITIONS=8 export FLINK_PARALLELISM=8 export SIMULATOR_INSTANCES=5 - # settings for AKS (-p aks) - export AKS_NODES=4 - export AKS_VM_SIZE=Standard_D4s_v3 - # settings for HDInsight YARN (-p hdinsight) - export HDINSIGHT_HADOOP_WORKERS=3 - export HDINSIGHT_HADOOP_WORKER_SIZE=Standard_D3_V2 + export HDINSIGHT_AKS_WORKER_SIZE=Standard_D8ds_v5 fi # 5000 messages/sec @@ -79,26 +69,16 @@ if [ "$TESTTYPE" == "5" ]; then export EVENTHUB_PARTITIONS=4 export FLINK_PARALLELISM=4 export SIMULATOR_INSTANCES=3 - # settings for AKS (-p aks) - export AKS_NODES=5 - export AKS_VM_SIZE=Standard_D2s_v3 - # settings for HDInsight YARN (-p hdinsight) - export HDINSIGHT_HADOOP_WORKERS=3 - export HDINSIGHT_HADOOP_WORKER_SIZE=Standard_D3_V2 + export HDINSIGHT_AKS_WORKER_SIZE=Standard_D8ds_v5 fi # 1000 messages/sec if [ "$TESTTYPE" == "1" ]; then export EVENTHUB_CAPACITY=2 export EVENTHUB_PARTITIONS=1 - export FLINK_PARALLELISM=1 + export FLINK_PARALLELISM=3 export SIMULATOR_INSTANCES=1 - # settings for AKS (-p aks) - export AKS_NODES=3 - export AKS_VM_SIZE=Standard_D2s_v3 - # settings for HDInsight YARN (-p hdinsight) - export HDINSIGHT_HADOOP_WORKERS=3 - export HDINSIGHT_HADOOP_WORKER_SIZE=Standard_D3_V2 + export HDINSIGHT_AKS_WORKER_SIZE=Standard_D8ds_v5 fi # last checks and variables setup @@ -116,9 +96,8 @@ echo "Checking pre-requisites..." source ../assert/has-local-az.sh source ../assert/has-local-jq.sh -source ../assert/has-local-helm.sh -source ../assert/has-local-kubectl.sh source ../assert/has-local-mvn.sh +source ../assert/has-local-unzip.sh echo echo "Streaming at Scale with Flink" @@ -132,16 +111,9 @@ echo "Configuration: " echo ". Resource Group => $RESOURCE_GROUP" echo ". Region => $LOCATION" echo ". EventHubs => TU: $EVENTHUB_CAPACITY, Partitions: $EVENTHUB_PARTITIONS" -if [ "$FLINK_PLATFORM" == "hdinsight" ]; then - echo ". HDInsight YARN => VM: $HDINSIGHT_HADOOP_WORKER_SIZE, Workers: $HDINSIGHT_HADOOP_WORKERS" -else - echo ". AKS => VM: $AKS_VM_SIZE, Workers: $AKS_NODES" -fi -echo ". Flink => AKS nodes: $AKS_NODES x $AKS_VM_SIZE, Parallelism: $FLINK_PARALLELISM" +echo ". HDInsight on AKS => VM: $HDINSIGHT_AKS_WORKER_SIZE" +echo ". Flink => Parallelism: $FLINK_PARALLELISM" echo ". Simulators => $SIMULATOR_INSTANCES" -if [[ -n ${AD_SP_APP_ID:-} && -n ${AD_SP_SECRET:-} ]]; then - echo ". Service Principal => $AD_SP_APP_ID" -fi echo echo "Deployment started..." @@ -179,14 +151,9 @@ echo echo "***** [P] Setting up PROCESSING" - export APPINSIGHTS_NAME=$PREFIX"appmon" - # Creating multiple HDInsight clusters in the same Virtual Network requires each cluster to have unique first six characters. - export HDINSIGHT_YARN_NAME="yarn"$PREFIX"hdi" - export HDINSIGHT_PASSWORD="Strong_Passw0rd!" 
- export AKS_CLUSTER=$PREFIX"aks" - export SERVICE_PRINCIPAL_KV_NAME=$AKS_CLUSTER - export SERVICE_PRINCIPAL_KEYVAULT=$PREFIX"spkv" - export ACR_NAME=$PREFIX"acr" + export HDINSIGHT_AKS_NAME=$PREFIX"hdi" + export HDINSIGHT_CLUSTER_NAME=$PREFIX"flinkcluster" + export HDINSIGHT_AKS_RESOURCE_PREFIX=$PREFIX source ../components/azure-monitor/generate-workspace-name.sh @@ -195,7 +162,7 @@ echo "***** [P] Setting up PROCESSING" source ../components/apache-flink/build-flink-jobs.sh source ../components/azure-event-hubs/get-eventhubs-kafka-brokers-in-listen.sh source ../components/azure-event-hubs/get-eventhubs-kafka-brokers-out-send.sh - source ../components/apache-flink/$FLINK_PLATFORM/run-flink.sh + source ../components/apache-flink/hdinsight-aks/run-flink.sh fi echo