From 16e8b1104626f3d0642a4e366f4c4c1f07d0fa57 Mon Sep 17 00:00:00 2001 From: Tristan Baumbusch <tristan.baumbusch@essquare.de> Date: Tue, 31 Jan 2023 15:18:42 +0100 Subject: [PATCH] update java example for managedidentity using azure-identity library and latest dependency versions --- .../oauth/java/managedidentity/README.md | 16 +-- .../java/managedidentity/consumer/.classpath | 31 ------ .../java/managedidentity/consumer/.project | 23 ---- .../java/managedidentity/consumer/pom.xml | 47 ++++---- .../CustomAuthenticateCallbackHandler.java | 74 ------------- .../src/main/java/TestConsumerThread.java | 72 ------------ .../AzureAuthenticateCallbackHandler.java | 103 ++++++++++++++++++ .../examples/OAuthBearerTokenImpl.java} | 27 +++-- .../microsoft/examples}/TestConsumer.java | 6 +- .../examples/TestConsumerThread.java | 62 +++++++++++ .../src/main/resources/consumer.config | 2 +- .../java/managedidentity/producer/.classpath | 31 ------ .../java/managedidentity/producer/.project | 23 ---- .../java/managedidentity/producer/pom.xml | 47 ++++---- .../CustomAuthenticateCallbackHandler.java | 74 ------------- .../AzureAuthenticateCallbackHandler.java | 103 ++++++++++++++++++ .../examples/OAuthBearerTokenImpl.java} | 26 +++-- .../microsoft/examples}/TestDataReporter.java | 31 +++--- .../microsoft/examples}/TestProducer.java | 28 ++--- .../src/main/resources/producer.config | 2 +- 20 files changed, 391 insertions(+), 437 deletions(-) delete mode 100644 tutorials/oauth/java/managedidentity/consumer/.classpath delete mode 100644 tutorials/oauth/java/managedidentity/consumer/.project delete mode 100644 tutorials/oauth/java/managedidentity/consumer/src/main/java/CustomAuthenticateCallbackHandler.java delete mode 100644 tutorials/oauth/java/managedidentity/consumer/src/main/java/TestConsumerThread.java create mode 100644 tutorials/oauth/java/managedidentity/consumer/src/main/java/de/microsoft/examples/AzureAuthenticateCallbackHandler.java rename tutorials/oauth/java/managedidentity/{producer/src/main/java/OAuthBearerTokenImp.java => consumer/src/main/java/de/microsoft/examples/OAuthBearerTokenImpl.java} (60%) rename tutorials/oauth/java/managedidentity/consumer/src/main/java/{ => de/microsoft/examples}/TestConsumer.java (79%) create mode 100644 tutorials/oauth/java/managedidentity/consumer/src/main/java/de/microsoft/examples/TestConsumerThread.java delete mode 100644 tutorials/oauth/java/managedidentity/producer/.classpath delete mode 100644 tutorials/oauth/java/managedidentity/producer/.project delete mode 100644 tutorials/oauth/java/managedidentity/producer/src/main/java/CustomAuthenticateCallbackHandler.java create mode 100644 tutorials/oauth/java/managedidentity/producer/src/main/java/de/microsoft/examples/AzureAuthenticateCallbackHandler.java rename tutorials/oauth/java/managedidentity/{consumer/src/main/java/OAuthBearerTokenImp.java => producer/src/main/java/de/microsoft/examples/OAuthBearerTokenImpl.java} (60%) rename tutorials/oauth/java/managedidentity/producer/src/main/java/{ => de/microsoft/examples}/TestDataReporter.java (54%) rename tutorials/oauth/java/managedidentity/producer/src/main/java/{ => de/microsoft/examples}/TestProducer.java (55%) diff --git a/tutorials/oauth/java/managedidentity/README.md b/tutorials/oauth/java/managedidentity/README.md index 6b48fcf..f87d72b 100644 --- a/tutorials/oauth/java/managedidentity/README.md +++ b/tutorials/oauth/java/managedidentity/README.md @@ -8,7 +8,7 @@ If you don't have an Azure subscription, create a [free account](https://azure.m In addition: -* [Java Development Kit (JDK) 1.7+](http://www.oracle.com/technetwork/java/javase/downloads/index.html) +* [Java Development Kit (JDK) 17+](http://www.oracle.com/technetwork/java/javase/downloads/index.html) * On Ubuntu, run `apt-get install default-jdk` to install the JDK. * Be sure to set the JAVA_HOME environment variable to point to the folder where the JDK is installed. * [Download](http://maven.apache.org/download.cgi) and [install](http://maven.apache.org/install.html) a Maven binary archive @@ -72,7 +72,7 @@ Kafka clients need to be configured in a way that they can authenticate with Azu `sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;` * Set login callback handler. This is the authentication handler which is responsible to complete oauth flow and return an access token. - `sasl.login.callback.handler.class=CustomAuthenticateCallbackHandler;` + `sasl.login.callback.handler.class=de.microsoft.examples.AzureAuthenticateCallbackHandler;` ## Producer @@ -89,18 +89,18 @@ bootstrap.servers=mynamespace.servicebus.windows.net:9093 # REPLACE security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required; -sasl.login.callback.handler.class=CustomAuthenticateCallbackHandler; +sasl.login.callback.handler.class=de.microsoft.examples.AzureAuthenticateCallbackHandler; ``` ### Run producer from command line -This sample is configured to send messages to topic `test`, if you would like to change the topic, change the TOPIC constant in `producer/src/main/java/TestProducer.java`. +This sample is configured to send messages to topic `test`, if you would like to change the topic, change the TOPIC constant in `producer/src/main/java/de/microsoft/examples/TestProducer.java`. To run the producer from the command line, generate the JAR and then run from within Maven (alternatively, generate the JAR using Maven, then run in Java by adding the necessary Kafka JAR(s) to the classpath): ```bash mvn clean package -mvn exec:java -Dexec.mainClass="TestProducer" +mvn exec:java -Dexec.mainClass="de.microsoft.examples.TestProducer" ``` The producer will now begin sending events to the Kafka-enabled Event Hub at topic `test` (or whatever topic you chose) and printing the events to stdout. @@ -122,18 +122,18 @@ request.timeout.ms=60000 security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required; -sasl.login.callback.handler.class=CustomAuthenticateCallbackHandler; +sasl.login.callback.handler.class=de.microsoft.examples.AzureAuthenticateCallbackHandler; ``` ### Run consumer from command line -This sample is configured to receive messages from topic `test`, if you would like to change the topic, change the TOPIC constant in `consumer/src/main/java/TestConsumer.java`. +This sample is configured to receive messages from topic `test`, if you would like to change the topic, change the TOPIC constant in `consumer/src/main/java/de/microsoft/examples/TestConsumer.java`. To run the producer from the command line, generate the JAR and then run from within Maven (alternatively, generate the JAR using Maven, then run in Java by adding the necessary Kafka JAR(s) to the classpath): ```bash mvn clean package -mvn exec:java -Dexec.mainClass="TestConsumer" +mvn exec:java -Dexec.mainClass="de.microsoft.examples.TestConsumer" ``` If the Kafka-enabled Event Hub has incoming events (for instance, if your example producer is also running), then the consumer should now begin receiving events from topic `test` (or whatever topic you chose). diff --git a/tutorials/oauth/java/managedidentity/consumer/.classpath b/tutorials/oauth/java/managedidentity/consumer/.classpath deleted file mode 100644 index 698778f..0000000 --- a/tutorials/oauth/java/managedidentity/consumer/.classpath +++ /dev/null @@ -1,31 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<classpath> - <classpathentry kind="src" output="target/classes" path="src/main/java"> - <attributes> - <attribute name="optional" value="true"/> - <attribute name="maven.pomderived" value="true"/> - </attributes> - </classpathentry> - <classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources"> - <attributes> - <attribute name="maven.pomderived" value="true"/> - </attributes> - </classpathentry> - <classpathentry kind="src" output="target/test-classes" path="src/test/java"> - <attributes> - <attribute name="optional" value="true"/> - <attribute name="maven.pomderived" value="true"/> - </attributes> - </classpathentry> - <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.7"> - <attributes> - <attribute name="maven.pomderived" value="true"/> - </attributes> - </classpathentry> - <classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER"> - <attributes> - <attribute name="maven.pomderived" value="true"/> - </attributes> - </classpathentry> - <classpathentry kind="output" path="target/classes"/> -</classpath> diff --git a/tutorials/oauth/java/managedidentity/consumer/.project b/tutorials/oauth/java/managedidentity/consumer/.project deleted file mode 100644 index 00d7e80..0000000 --- a/tutorials/oauth/java/managedidentity/consumer/.project +++ /dev/null @@ -1,23 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<projectDescription> - <name>event-hubs-kafka-java-consumer</name> - <comment></comment> - <projects> - </projects> - <buildSpec> - <buildCommand> - <name>org.eclipse.jdt.core.javabuilder</name> - <arguments> - </arguments> - </buildCommand> - <buildCommand> - <name>org.eclipse.m2e.core.maven2Builder</name> - <arguments> - </arguments> - </buildCommand> - </buildSpec> - <natures> - <nature>org.eclipse.jdt.core.javanature</nature> - <nature>org.eclipse.m2e.core.maven2Nature</nature> - </natures> -</projectDescription> diff --git a/tutorials/oauth/java/managedidentity/consumer/pom.xml b/tutorials/oauth/java/managedidentity/consumer/pom.xml index b575cd4..909ea9e 100644 --- a/tutorials/oauth/java/managedidentity/consumer/pom.xml +++ b/tutorials/oauth/java/managedidentity/consumer/pom.xml @@ -1,6 +1,7 @@ <?xml version="1.0" encoding="UTF-8"?> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> @@ -12,18 +13,31 @@ <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + <java.version>17</java.version> + <maven.compiler.source>${java.version}</maven.compiler.source> + <maven.compiler.target>${java.version}</maven.compiler.target> </properties> <dependencies> - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.12</artifactId> - <version>2.3.1</version> - </dependency> <dependency> - <groupId>com.microsoft.azure</groupId> - <artifactId>azure-client-authentication</artifactId> - <version>1.6.15</version> - <scope>compile</scope> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.13</artifactId> + <version>3.3.2</version> + <exclusions> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <version>3.6.4</version> + </dependency> + <dependency> + <groupId>com.azure</groupId> + <artifactId>azure-identity</artifactId> + <version>1.7.3</version> </dependency> </dependencies> <build> @@ -32,19 +46,12 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> - <version>3.6.1</version> - <configuration> - <source>1.7</source> - <target>1.7</target> - </configuration> + <version>3.10.1</version> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-resources-plugin</artifactId> - <version>3.0.2</version> - <configuration> - <encoding>UTF-8</encoding> - </configuration> + <version>3.3.0</version> </plugin> </plugins> </build> diff --git a/tutorials/oauth/java/managedidentity/consumer/src/main/java/CustomAuthenticateCallbackHandler.java b/tutorials/oauth/java/managedidentity/consumer/src/main/java/CustomAuthenticateCallbackHandler.java deleted file mode 100644 index 668d002..0000000 --- a/tutorials/oauth/java/managedidentity/consumer/src/main/java/CustomAuthenticateCallbackHandler.java +++ /dev/null @@ -1,74 +0,0 @@ -//Copyright (c) Microsoft Corporation. All rights reserved. -//Licensed under the MIT License. -import java.io.IOException; -import java.net.URI; -import java.text.ParseException; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeoutException; - -import javax.security.auth.callback.Callback; -import javax.security.auth.callback.UnsupportedCallbackException; -import javax.security.auth.login.AppConfigurationEntry; - -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; -import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; -import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback; - -import com.microsoft.azure.credentials.MSICredentials; -import com.nimbusds.jwt.JWT; -import com.nimbusds.jwt.JWTClaimsSet; -import com.nimbusds.jwt.JWTParser; - -public class CustomAuthenticateCallbackHandler implements AuthenticateCallbackHandler { - - final static ScheduledExecutorService EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1); - final static MSICredentials CREDENTIALS = new MSICredentials(); - // Use AppServiceMSICredentials instead for App Service deployment. - // final static AppServiceMSICredentials CREDENTIALS = new AppServiceMSICredentials(AzureEnvironment.AZURE); - - private String sbUri; - - @Override - public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) { - String bootstrapServer = Arrays.asList(configs.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)).get(0).toString(); - bootstrapServer = bootstrapServer.replaceAll("\\[|\\]", ""); - URI uri = URI.create("https://" + bootstrapServer); - this.sbUri = uri.getScheme() + "://" + uri.getHost(); - } - - public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { - for (Callback callback: callbacks) { - if (callback instanceof OAuthBearerTokenCallback) { - try { - OAuthBearerToken token = getOAuthBearerToken(); - OAuthBearerTokenCallback oauthCallback = (OAuthBearerTokenCallback) callback; - oauthCallback.token(token); - } catch (InterruptedException | ExecutionException | TimeoutException | ParseException e) { - e.printStackTrace(); - } - } else { - throw new UnsupportedCallbackException(callback); - } - } - } - - OAuthBearerToken getOAuthBearerToken() throws InterruptedException, ExecutionException, TimeoutException, IOException, ParseException - { - String accesToken = CREDENTIALS.getToken(sbUri); - JWT jwt = JWTParser.parse(accesToken); - JWTClaimsSet claims = jwt.getJWTClaimsSet(); - - return new OAuthBearerTokenImp(accesToken, claims.getExpirationTime()); - } - - public void close() throws KafkaException { - // NOOP - } -} diff --git a/tutorials/oauth/java/managedidentity/consumer/src/main/java/TestConsumerThread.java b/tutorials/oauth/java/managedidentity/consumer/src/main/java/TestConsumerThread.java deleted file mode 100644 index 5ee68f0..0000000 --- a/tutorials/oauth/java/managedidentity/consumer/src/main/java/TestConsumerThread.java +++ /dev/null @@ -1,72 +0,0 @@ -//Copyright (c) Microsoft Corporation. All rights reserved. -//Licensed under the MIT License. -import org.apache.kafka.clients.consumer.*; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import java.io.FileReader; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Collections; -import java.util.Properties; - -public class TestConsumerThread implements Runnable { - - private final String TOPIC; - - //Each consumer needs a unique client ID per thread - private static int id = 0; - - public TestConsumerThread(final String TOPIC){ - this.TOPIC = TOPIC; - } - - public void run (){ - final Consumer<Long, String> consumer = createConsumer(); - System.out.println("Polling"); - - try { - while (true) { - final ConsumerRecords<Long, String> consumerRecords = consumer.poll(1000); - for(ConsumerRecord<Long, String> cr : consumerRecords) { - System.out.printf("Consumer Record:(%d, %s, %d, %d)\n", cr.key(), cr.value(), cr.partition(), cr.offset()); - } - consumer.commitAsync(); - } - } catch (CommitFailedException e) { - System.out.println("CommitFailedException: " + e); - } finally { - consumer.close(); - } - } - - private Consumer<Long, String> createConsumer() { - try { - final Properties properties = new Properties(); - synchronized (TestConsumerThread.class) { - properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "KafkaExampleConsumer#" + id); - id++; - } - properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName()); - properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - - //Get remaining properties from config file - properties.load(new FileReader("src/main/resources/consumer.config")); - - // Create the consumer using properties. - final Consumer<Long, String> consumer = new KafkaConsumer<>(properties); - - // Subscribe to the topic. - consumer.subscribe(Collections.singletonList(TOPIC)); - return consumer; - - } catch (FileNotFoundException e){ - System.out.println("FileNoteFoundException: " + e); - System.exit(1); - return null; //unreachable - } catch (IOException e){ - System.out.println("IOException: " + e); - System.exit(1); - return null; //unreachable - } - } -} diff --git a/tutorials/oauth/java/managedidentity/consumer/src/main/java/de/microsoft/examples/AzureAuthenticateCallbackHandler.java b/tutorials/oauth/java/managedidentity/consumer/src/main/java/de/microsoft/examples/AzureAuthenticateCallbackHandler.java new file mode 100644 index 0000000..8597c0f --- /dev/null +++ b/tutorials/oauth/java/managedidentity/consumer/src/main/java/de/microsoft/examples/AzureAuthenticateCallbackHandler.java @@ -0,0 +1,103 @@ +//Copyright (c) Microsoft Corporation. All rights reserved. +//Licensed under the MIT License. +package de.microsoft.examples; + +import com.azure.core.credential.AccessToken; +import com.azure.core.credential.TokenCredential; +import com.azure.core.credential.TokenRequestContext; +import com.azure.identity.DefaultAzureCredentialBuilder; +import com.nimbusds.jwt.JWT; +import com.nimbusds.jwt.JWTParser; +import java.text.ParseException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.auth.login.AppConfigurationEntry; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback; + +public class AzureAuthenticateCallbackHandler implements AuthenticateCallbackHandler { + + private String requestedScope; + + @Override + public void handle(Callback[] callbacks) throws UnsupportedCallbackException { + for (Callback callback: callbacks) { + if (!(callback instanceof OAuthBearerTokenCallback)) { + throw new UnsupportedCallbackException(callback); + } + + try { + OAuthBearerToken token = getOAuthBearerToken(); + OAuthBearerTokenCallback oauthCallback = (OAuthBearerTokenCallback) callback; + oauthCallback.token(token); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (ExecutionException | TimeoutException | ParseException e) { + throw new RuntimeException("Failed to get token from azure.", e); + } + } + } + + private OAuthBearerToken getOAuthBearerToken() throws InterruptedException, ExecutionException, + TimeoutException, ParseException { + final TokenCredential defaultCredential = new DefaultAzureCredentialBuilder() + .build(); + + final TokenRequestContext tokenRequestContext = new TokenRequestContext() + .addScopes(requestedScope); + + final AccessToken accessToken = defaultCredential + .getTokenSync(tokenRequestContext); + return mapToOAuthBearerToken(accessToken); + } + + private static OAuthBearerTokenImpl mapToOAuthBearerToken(AccessToken value) { + try{ + final JWT jwt = JWTParser.parse(value.getToken()); + return new OAuthBearerTokenImpl(value.getToken(), jwt.getJWTClaimsSet().getExpirationTime()); + } catch(ParseException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() throws KafkaException { + // NOOP + } + + @Override + public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) { + String bootstrapServer = extractFirstBootstrapServer(configs); + + final String hostname = bootstrapServer.split(":")[0]; + this.requestedScope = "https://" + hostname + "/.default"; + } + + private static String extractFirstBootstrapServer(Map<String, ?> configs) { + if (!configs.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) { + throw new IllegalStateException("Missing bootstrap.servers in kafka configuration."); + } + + if (!(configs.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) instanceof List<?> bootstrapServersList)) { + throw new IllegalStateException( + "bootstrap.servers in kafka configuration is not a String value"); + } + + if (bootstrapServersList.size() > 1) { + throw new IllegalStateException("More than 1 bootstrap.servers not supported in this example."); + } + + if (!(bootstrapServersList.get(0) instanceof String bootstrapServer)) { + throw new IllegalStateException("bootstrap.servers has to be a String."); + } + return bootstrapServer; + } + +} diff --git a/tutorials/oauth/java/managedidentity/producer/src/main/java/OAuthBearerTokenImp.java b/tutorials/oauth/java/managedidentity/consumer/src/main/java/de/microsoft/examples/OAuthBearerTokenImpl.java similarity index 60% rename from tutorials/oauth/java/managedidentity/producer/src/main/java/OAuthBearerTokenImp.java rename to tutorials/oauth/java/managedidentity/consumer/src/main/java/de/microsoft/examples/OAuthBearerTokenImpl.java index 9748b21..8142a7d 100644 --- a/tutorials/oauth/java/managedidentity/producer/src/main/java/OAuthBearerTokenImp.java +++ b/tutorials/oauth/java/managedidentity/consumer/src/main/java/de/microsoft/examples/OAuthBearerTokenImpl.java @@ -1,33 +1,36 @@ //Copyright (c) Microsoft Corporation. All rights reserved. //Licensed under the MIT License. +package de.microsoft.examples; + import java.util.Date; +import java.util.HashSet; import java.util.Set; - import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; -public class OAuthBearerTokenImp implements OAuthBearerToken -{ - String token; - long lifetimeMs; - - public OAuthBearerTokenImp(final String token, Date expiresOn) { +public class OAuthBearerTokenImpl implements OAuthBearerToken { + + private final String token; + private final long lifetimeMs; + private final Set<String> scopes = new HashSet<>(); + + public OAuthBearerTokenImpl(final String token, final Date expiresOn) { this.token = token; this.lifetimeMs = expiresOn.getTime(); } - + @Override public String value() { - return this.token; + return token; } @Override public Set<String> scope() { - return null; + return scopes; } @Override public long lifetimeMs() { - return this.lifetimeMs; + return lifetimeMs; } @Override @@ -39,4 +42,4 @@ public String principalName() { public Long startTimeMs() { return null; } -} \ No newline at end of file +} diff --git a/tutorials/oauth/java/managedidentity/consumer/src/main/java/TestConsumer.java b/tutorials/oauth/java/managedidentity/consumer/src/main/java/de/microsoft/examples/TestConsumer.java similarity index 79% rename from tutorials/oauth/java/managedidentity/consumer/src/main/java/TestConsumer.java rename to tutorials/oauth/java/managedidentity/consumer/src/main/java/de/microsoft/examples/TestConsumer.java index 5d1d5ca..0c8c0f7 100644 --- a/tutorials/oauth/java/managedidentity/consumer/src/main/java/TestConsumer.java +++ b/tutorials/oauth/java/managedidentity/consumer/src/main/java/de/microsoft/examples/TestConsumer.java @@ -1,4 +1,4 @@ -//Copyright (c) Microsoft Corporation. All rights reserved. +package de.microsoft.examples;//Copyright (c) Microsoft Corporation. All rights reserved. //Licensed under the MIT License. import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -6,10 +6,10 @@ public class TestConsumer { //Change constant to send messages to the desired topic private final static String TOPIC = "test"; - + private final static int NUM_THREADS = 1; - public static void main(String... args) throws Exception { + public static void main(String... args) { final ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS); diff --git a/tutorials/oauth/java/managedidentity/consumer/src/main/java/de/microsoft/examples/TestConsumerThread.java b/tutorials/oauth/java/managedidentity/consumer/src/main/java/de/microsoft/examples/TestConsumerThread.java new file mode 100644 index 0000000..9a7b9a3 --- /dev/null +++ b/tutorials/oauth/java/managedidentity/consumer/src/main/java/de/microsoft/examples/TestConsumerThread.java @@ -0,0 +1,62 @@ +//Copyright (c) Microsoft Corporation. All rights reserved. +//Licensed under the MIT License. +package de.microsoft.examples; + +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import java.io.IOException; +import java.util.Collections; +import java.util.Properties; + +public class TestConsumerThread implements Runnable { + + private final String topic; + + //Each consumer needs a unique client ID per thread + private static int id = 0; + + public TestConsumerThread(final String topic){ + this.topic = topic; + } + + public void run (){ + try (Consumer<Long, String> consumer = createConsumer()) { + System.out.println("Polling"); + + while (true) { + final ConsumerRecords<Long, String> consumerRecords = consumer.poll(1000); + for (ConsumerRecord<Long, String> cr : consumerRecords) { + System.out.printf("Consumer Record:(%d, %s, %d, %d)\n", cr.key(), cr.value(), + cr.partition(), cr.offset()); + } + consumer.commitAsync(); + } + } catch (CommitFailedException e) { + System.out.println("CommitFailedException: " + e); + } catch (IOException e) { + System.out.println("IOException: " + e); + System.exit(1); + } + } + + private Consumer<Long, String> createConsumer() throws IOException { + final Properties properties = new Properties(); + synchronized (TestConsumerThread.class) { + properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "KafkaExampleConsumer#" + id); + id++; + } + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName()); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + + //Get remaining properties from config file + properties.load(TestConsumerThread.class.getResourceAsStream("/consumer.config")); + + // Create the consumer using properties. + final Consumer<Long, String> consumer = new KafkaConsumer<>(properties); + + // Subscribe to the topic. + consumer.subscribe(Collections.singletonList(topic)); + return consumer; + } +} diff --git a/tutorials/oauth/java/managedidentity/consumer/src/main/resources/consumer.config b/tutorials/oauth/java/managedidentity/consumer/src/main/resources/consumer.config index 5cc0ab3..a7d0d54 100644 --- a/tutorials/oauth/java/managedidentity/consumer/src/main/resources/consumer.config +++ b/tutorials/oauth/java/managedidentity/consumer/src/main/resources/consumer.config @@ -4,4 +4,4 @@ request.timeout.ms=60000 security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required; -sasl.login.callback.handler.class=CustomAuthenticateCallbackHandler +sasl.login.callback.handler.class=de.microsoft.examples.AzureAuthenticateCallbackHandler diff --git a/tutorials/oauth/java/managedidentity/producer/.classpath b/tutorials/oauth/java/managedidentity/producer/.classpath deleted file mode 100644 index 698778f..0000000 --- a/tutorials/oauth/java/managedidentity/producer/.classpath +++ /dev/null @@ -1,31 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<classpath> - <classpathentry kind="src" output="target/classes" path="src/main/java"> - <attributes> - <attribute name="optional" value="true"/> - <attribute name="maven.pomderived" value="true"/> - </attributes> - </classpathentry> - <classpathentry excluding="**" kind="src" output="target/classes" path="src/main/resources"> - <attributes> - <attribute name="maven.pomderived" value="true"/> - </attributes> - </classpathentry> - <classpathentry kind="src" output="target/test-classes" path="src/test/java"> - <attributes> - <attribute name="optional" value="true"/> - <attribute name="maven.pomderived" value="true"/> - </attributes> - </classpathentry> - <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.7"> - <attributes> - <attribute name="maven.pomderived" value="true"/> - </attributes> - </classpathentry> - <classpathentry kind="con" path="org.eclipse.m2e.MAVEN2_CLASSPATH_CONTAINER"> - <attributes> - <attribute name="maven.pomderived" value="true"/> - </attributes> - </classpathentry> - <classpathentry kind="output" path="target/classes"/> -</classpath> diff --git a/tutorials/oauth/java/managedidentity/producer/.project b/tutorials/oauth/java/managedidentity/producer/.project deleted file mode 100644 index dae3335..0000000 --- a/tutorials/oauth/java/managedidentity/producer/.project +++ /dev/null @@ -1,23 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<projectDescription> - <name>event-hubs-kafka-java-producer</name> - <comment></comment> - <projects> - </projects> - <buildSpec> - <buildCommand> - <name>org.eclipse.jdt.core.javabuilder</name> - <arguments> - </arguments> - </buildCommand> - <buildCommand> - <name>org.eclipse.m2e.core.maven2Builder</name> - <arguments> - </arguments> - </buildCommand> - </buildSpec> - <natures> - <nature>org.eclipse.jdt.core.javanature</nature> - <nature>org.eclipse.m2e.core.maven2Nature</nature> - </natures> -</projectDescription> diff --git a/tutorials/oauth/java/managedidentity/producer/pom.xml b/tutorials/oauth/java/managedidentity/producer/pom.xml index 96d1617..943cc8f 100644 --- a/tutorials/oauth/java/managedidentity/producer/pom.xml +++ b/tutorials/oauth/java/managedidentity/producer/pom.xml @@ -1,6 +1,7 @@ <?xml version="1.0" encoding="UTF-8"?> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> @@ -12,18 +13,31 @@ <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + <java.version>17</java.version> + <maven.compiler.source>${java.version}</maven.compiler.source> + <maven.compiler.target>${java.version}</maven.compiler.target> </properties> <dependencies> - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.12</artifactId> - <version>2.3.1</version> - </dependency> <dependency> - <groupId>com.microsoft.azure</groupId> - <artifactId>azure-client-authentication</artifactId> - <version>1.6.15</version> - <scope>compile</scope> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.13</artifactId> + <version>3.3.2</version> + <exclusions> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <version>3.6.4</version> + </dependency> + <dependency> + <groupId>com.azure</groupId> + <artifactId>azure-identity</artifactId> + <version>1.7.3</version> </dependency> </dependencies> <build> @@ -32,19 +46,12 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> - <version>3.6.1</version> - <configuration> - <source>1.7</source> - <target>1.7</target> - </configuration> + <version>3.10.1</version> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-resources-plugin</artifactId> - <version>3.0.2</version> - <configuration> - <encoding>UTF-8</encoding> - </configuration> + <version>3.3.0</version> </plugin> </plugins> </build> diff --git a/tutorials/oauth/java/managedidentity/producer/src/main/java/CustomAuthenticateCallbackHandler.java b/tutorials/oauth/java/managedidentity/producer/src/main/java/CustomAuthenticateCallbackHandler.java deleted file mode 100644 index 668d002..0000000 --- a/tutorials/oauth/java/managedidentity/producer/src/main/java/CustomAuthenticateCallbackHandler.java +++ /dev/null @@ -1,74 +0,0 @@ -//Copyright (c) Microsoft Corporation. All rights reserved. -//Licensed under the MIT License. -import java.io.IOException; -import java.net.URI; -import java.text.ParseException; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeoutException; - -import javax.security.auth.callback.Callback; -import javax.security.auth.callback.UnsupportedCallbackException; -import javax.security.auth.login.AppConfigurationEntry; - -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; -import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; -import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback; - -import com.microsoft.azure.credentials.MSICredentials; -import com.nimbusds.jwt.JWT; -import com.nimbusds.jwt.JWTClaimsSet; -import com.nimbusds.jwt.JWTParser; - -public class CustomAuthenticateCallbackHandler implements AuthenticateCallbackHandler { - - final static ScheduledExecutorService EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1); - final static MSICredentials CREDENTIALS = new MSICredentials(); - // Use AppServiceMSICredentials instead for App Service deployment. - // final static AppServiceMSICredentials CREDENTIALS = new AppServiceMSICredentials(AzureEnvironment.AZURE); - - private String sbUri; - - @Override - public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) { - String bootstrapServer = Arrays.asList(configs.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)).get(0).toString(); - bootstrapServer = bootstrapServer.replaceAll("\\[|\\]", ""); - URI uri = URI.create("https://" + bootstrapServer); - this.sbUri = uri.getScheme() + "://" + uri.getHost(); - } - - public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { - for (Callback callback: callbacks) { - if (callback instanceof OAuthBearerTokenCallback) { - try { - OAuthBearerToken token = getOAuthBearerToken(); - OAuthBearerTokenCallback oauthCallback = (OAuthBearerTokenCallback) callback; - oauthCallback.token(token); - } catch (InterruptedException | ExecutionException | TimeoutException | ParseException e) { - e.printStackTrace(); - } - } else { - throw new UnsupportedCallbackException(callback); - } - } - } - - OAuthBearerToken getOAuthBearerToken() throws InterruptedException, ExecutionException, TimeoutException, IOException, ParseException - { - String accesToken = CREDENTIALS.getToken(sbUri); - JWT jwt = JWTParser.parse(accesToken); - JWTClaimsSet claims = jwt.getJWTClaimsSet(); - - return new OAuthBearerTokenImp(accesToken, claims.getExpirationTime()); - } - - public void close() throws KafkaException { - // NOOP - } -} diff --git a/tutorials/oauth/java/managedidentity/producer/src/main/java/de/microsoft/examples/AzureAuthenticateCallbackHandler.java b/tutorials/oauth/java/managedidentity/producer/src/main/java/de/microsoft/examples/AzureAuthenticateCallbackHandler.java new file mode 100644 index 0000000..8597c0f --- /dev/null +++ b/tutorials/oauth/java/managedidentity/producer/src/main/java/de/microsoft/examples/AzureAuthenticateCallbackHandler.java @@ -0,0 +1,103 @@ +//Copyright (c) Microsoft Corporation. All rights reserved. +//Licensed under the MIT License. +package de.microsoft.examples; + +import com.azure.core.credential.AccessToken; +import com.azure.core.credential.TokenCredential; +import com.azure.core.credential.TokenRequestContext; +import com.azure.identity.DefaultAzureCredentialBuilder; +import com.nimbusds.jwt.JWT; +import com.nimbusds.jwt.JWTParser; +import java.text.ParseException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.auth.login.AppConfigurationEntry; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback; + +public class AzureAuthenticateCallbackHandler implements AuthenticateCallbackHandler { + + private String requestedScope; + + @Override + public void handle(Callback[] callbacks) throws UnsupportedCallbackException { + for (Callback callback: callbacks) { + if (!(callback instanceof OAuthBearerTokenCallback)) { + throw new UnsupportedCallbackException(callback); + } + + try { + OAuthBearerToken token = getOAuthBearerToken(); + OAuthBearerTokenCallback oauthCallback = (OAuthBearerTokenCallback) callback; + oauthCallback.token(token); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (ExecutionException | TimeoutException | ParseException e) { + throw new RuntimeException("Failed to get token from azure.", e); + } + } + } + + private OAuthBearerToken getOAuthBearerToken() throws InterruptedException, ExecutionException, + TimeoutException, ParseException { + final TokenCredential defaultCredential = new DefaultAzureCredentialBuilder() + .build(); + + final TokenRequestContext tokenRequestContext = new TokenRequestContext() + .addScopes(requestedScope); + + final AccessToken accessToken = defaultCredential + .getTokenSync(tokenRequestContext); + return mapToOAuthBearerToken(accessToken); + } + + private static OAuthBearerTokenImpl mapToOAuthBearerToken(AccessToken value) { + try{ + final JWT jwt = JWTParser.parse(value.getToken()); + return new OAuthBearerTokenImpl(value.getToken(), jwt.getJWTClaimsSet().getExpirationTime()); + } catch(ParseException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() throws KafkaException { + // NOOP + } + + @Override + public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) { + String bootstrapServer = extractFirstBootstrapServer(configs); + + final String hostname = bootstrapServer.split(":")[0]; + this.requestedScope = "https://" + hostname + "/.default"; + } + + private static String extractFirstBootstrapServer(Map<String, ?> configs) { + if (!configs.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) { + throw new IllegalStateException("Missing bootstrap.servers in kafka configuration."); + } + + if (!(configs.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) instanceof List<?> bootstrapServersList)) { + throw new IllegalStateException( + "bootstrap.servers in kafka configuration is not a String value"); + } + + if (bootstrapServersList.size() > 1) { + throw new IllegalStateException("More than 1 bootstrap.servers not supported in this example."); + } + + if (!(bootstrapServersList.get(0) instanceof String bootstrapServer)) { + throw new IllegalStateException("bootstrap.servers has to be a String."); + } + return bootstrapServer; + } + +} diff --git a/tutorials/oauth/java/managedidentity/consumer/src/main/java/OAuthBearerTokenImp.java b/tutorials/oauth/java/managedidentity/producer/src/main/java/de/microsoft/examples/OAuthBearerTokenImpl.java similarity index 60% rename from tutorials/oauth/java/managedidentity/consumer/src/main/java/OAuthBearerTokenImp.java rename to tutorials/oauth/java/managedidentity/producer/src/main/java/de/microsoft/examples/OAuthBearerTokenImpl.java index 9748b21..29317b2 100644 --- a/tutorials/oauth/java/managedidentity/consumer/src/main/java/OAuthBearerTokenImp.java +++ b/tutorials/oauth/java/managedidentity/producer/src/main/java/de/microsoft/examples/OAuthBearerTokenImpl.java @@ -1,33 +1,37 @@ //Copyright (c) Microsoft Corporation. All rights reserved. //Licensed under the MIT License. +package de.microsoft.examples; + import java.util.Date; +import java.util.HashSet; import java.util.Set; import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; -public class OAuthBearerTokenImp implements OAuthBearerToken -{ - String token; - long lifetimeMs; - - public OAuthBearerTokenImp(final String token, Date expiresOn) { +public class OAuthBearerTokenImpl implements OAuthBearerToken { + + private final String token; + private final long lifetimeMs; + private final Set<String> scopes = new HashSet<>(); + + public OAuthBearerTokenImpl(final String token, final Date expiresOn) { this.token = token; this.lifetimeMs = expiresOn.getTime(); } - + @Override public String value() { - return this.token; + return token; } @Override public Set<String> scope() { - return null; + return scopes; } @Override public long lifetimeMs() { - return this.lifetimeMs; + return lifetimeMs; } @Override @@ -39,4 +43,4 @@ public String principalName() { public Long startTimeMs() { return null; } -} \ No newline at end of file +} diff --git a/tutorials/oauth/java/managedidentity/producer/src/main/java/TestDataReporter.java b/tutorials/oauth/java/managedidentity/producer/src/main/java/de/microsoft/examples/TestDataReporter.java similarity index 54% rename from tutorials/oauth/java/managedidentity/producer/src/main/java/TestDataReporter.java rename to tutorials/oauth/java/managedidentity/producer/src/main/java/de/microsoft/examples/TestDataReporter.java index 7e69439..3924751 100644 --- a/tutorials/oauth/java/managedidentity/producer/src/main/java/TestDataReporter.java +++ b/tutorials/oauth/java/managedidentity/producer/src/main/java/de/microsoft/examples/TestDataReporter.java @@ -1,39 +1,36 @@ //Copyright (c) Microsoft Corporation. All rights reserved. //Licensed under the MIT License. -import org.apache.kafka.clients.producer.Callback; +package de.microsoft.examples; + import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import java.sql.Timestamp; public class TestDataReporter implements Runnable { private static final int NUM_MESSAGES = 100; - private final String TOPIC; - private Producer<Long, String> producer; + private final String topic; + private final Producer<Long, String> producer; - public TestDataReporter(final Producer<Long, String> producer, String TOPIC) { + public TestDataReporter(final Producer<Long, String> producer, String topic) { this.producer = producer; - this.TOPIC = TOPIC; + this.topic = topic; } @Override public void run() { - for(int i = 0; i < NUM_MESSAGES; i++) { + for(int i = 0; i < NUM_MESSAGES; i++) { long time = System.currentTimeMillis(); System.out.println("Test Data #" + i + " from thread #" + Thread.currentThread().getId()); - - final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "Test Data #" + i); - producer.send(record, new Callback() { - public void onCompletion(RecordMetadata metadata, Exception exception) { - if (exception != null) { - System.out.println(exception); - System.exit(1); - } + + final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(topic, time, "Test Data #" + i); + producer.send(record, (metadata, exception) -> { + if (exception != null) { + System.out.println(exception); + System.exit(1); } }); } System.out.println("Finished sending " + NUM_MESSAGES + " messages from thread #" + Thread.currentThread().getId() + "!"); } -} \ No newline at end of file +} diff --git a/tutorials/oauth/java/managedidentity/producer/src/main/java/TestProducer.java b/tutorials/oauth/java/managedidentity/producer/src/main/java/de/microsoft/examples/TestProducer.java similarity index 55% rename from tutorials/oauth/java/managedidentity/producer/src/main/java/TestProducer.java rename to tutorials/oauth/java/managedidentity/producer/src/main/java/de/microsoft/examples/TestProducer.java index ca2cec2..7494546 100644 --- a/tutorials/oauth/java/managedidentity/producer/src/main/java/TestProducer.java +++ b/tutorials/oauth/java/managedidentity/producer/src/main/java/de/microsoft/examples/TestProducer.java @@ -1,12 +1,14 @@ //Copyright (c) Microsoft Corporation. All rights reserved. //Licensed under the MIT License. +package de.microsoft.examples; + +import java.io.IOException; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; -import java.io.FileReader; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -14,11 +16,11 @@ public class TestProducer { //Change constant to send messages to the desired topic, for this example we use 'test' private final static String TOPIC = "test"; - + private final static int NUM_THREADS = 1; - public static void main(String... args) throws Exception { + public static void main(String... args) throws IOException { //Create Kafka Producer final Producer<Long, String> producer = createProducer(); @@ -29,19 +31,13 @@ public static void main(String... args) throws Exception { executorService.execute(new TestDataReporter(producer, TOPIC)); } - private static Producer<Long, String> createProducer() { - try{ - Properties properties = new Properties(); - properties.load(new FileReader("src/main/resources/producer.config")); - properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer"); - properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); - properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - return new KafkaProducer<>(properties); - } catch (Exception e){ - System.out.println("Failed to create producer with exception: " + e); - System.exit(0); - return null; //unreachable - } + private static Producer<Long, String> createProducer() throws IOException { + Properties properties = new Properties(); + properties.load(TestProducer.class.getResourceAsStream("/producer.config")); + properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer"); + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName()); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + return new KafkaProducer<>(properties); } } diff --git a/tutorials/oauth/java/managedidentity/producer/src/main/resources/producer.config b/tutorials/oauth/java/managedidentity/producer/src/main/resources/producer.config index 3ea6785..bb8cfff 100644 --- a/tutorials/oauth/java/managedidentity/producer/src/main/resources/producer.config +++ b/tutorials/oauth/java/managedidentity/producer/src/main/resources/producer.config @@ -2,4 +2,4 @@ bootstrap.servers=mynamespace.servicebus.windows.net:9093 security.protocol=SASL_SSL sasl.mechanism=OAUTHBEARER sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required; -sasl.login.callback.handler.class=CustomAuthenticateCallbackHandler +sasl.login.callback.handler.class=de.microsoft.examples.AzureAuthenticateCallbackHandler