diff --git a/quickstart/dotnet-aad/KafkaProducer/KafkaProducer.csproj b/quickstart/dotnet-aad/KafkaProducer/KafkaProducer.csproj new file mode 100644 index 0000000..5e23615 --- /dev/null +++ b/quickstart/dotnet-aad/KafkaProducer/KafkaProducer.csproj @@ -0,0 +1,13 @@ + + + + Exe + net5.0 + + + + + + + + diff --git a/quickstart/dotnet-aad/KafkaProducer/Program.cs b/quickstart/dotnet-aad/KafkaProducer/Program.cs new file mode 100644 index 0000000..43f3d38 --- /dev/null +++ b/quickstart/dotnet-aad/KafkaProducer/Program.cs @@ -0,0 +1,155 @@ +using Azure.Core; +using Azure.Identity; +using Confluent.Kafka; +using System; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace KafkaProducer +{ + class Program + { + + private static string eventHubNamespaceFQDNwithPort = ".servicebus.windows.net:9093"; + + private static string eventHubNamespace = ""; + + // connectionstring - primary or secondary key + private static string policyConnectionString = "Endpoint=sb://.servicebus.windows.net/;SharedAccessKeyName=;SharedAccessKey=;EntityPath="; + + //in kafka world this is the topic in event hub is the event hub name under the namespace + private static string topicName = ""; + + + + + + static async Task Main(string[] args) + { + Console.WriteLine("Hello Event Hub Kafka client!"); + + //remember to proberly feed the following static attributes: + //eventHubNamespace + //eventHubNamespaceFQDNwithPort + + + try + { + //----- SAS AUTH -----// + // + //to leverage SAS Key as authentication methods we need the following static attribute valorized in the code: + //policyConnectionString + // + //var producer = GetProducerSAS(); + // + //----- -------- -----// + + + //----- AAD AUTH -----// + // + //to leverage AAD as authentication methods you need to provide to the application a valid principal informations according to what's needed by DefaultAzureCredentialOptions + // + //Get a Producer using an AAD Token + var producer = GetProducerAADToken(); + // + //----- -------- -----// + + Console.WriteLine("Initiating Execution"); + + for (int x = 0; x < 100; x++) + { + var msg = new Message { Value = string.Format("This is a sample message - msg # {0} at {1}", x, DateTime.Now.ToString("yyyMMdd_HHmmSSfff")) }; + + // publishes the message to Event Hubs + var result = await producer.ProduceAsync(topicName, msg); + + Console.WriteLine($"Message {result.Value} sent to partition {result.TopicPartition } with result {result.Status}"); + } + + } + catch (Exception e) + { + Console.WriteLine(string.Format("Exception Ocurred - {0}", e.Message)); + } + } + + + /// + /// Kafka Client callback to get or refresh a Token + /// + /// + /// + static void OAuthTokenRefreshCallback(IClient client, string cfg) + { + try + { + DefaultAzureCredentialOptions defaultAzureCredentialOptions = new DefaultAzureCredentialOptions(); + DefaultAzureCredential defaultAzureCredential = new DefaultAzureCredential(defaultAzureCredentialOptions); + + var tokenRequestContext = new TokenRequestContext(new string[] { $"https://{eventHubNamespace}.servicebus.windows.net/.default" }); + + var accessToken = defaultAzureCredential.GetToken(tokenRequestContext); + + + var token = accessToken.Token; + var expire = accessToken.ExpiresOn.ToUnixTimeMilliseconds(); + + //principal could be null for Kafka OAuth 2.0 auth + client.OAuthBearerSetToken(token, expire, null); + } + catch (Exception ex) + { + client.OAuthBearerSetTokenFailure(ex.ToString()); + } + } + + + + private static IProducer GetProducerAADToken() + { + var pConfig = new ProducerConfig + { + BootstrapServers = eventHubNamespaceFQDNwithPort, + SaslMechanism = SaslMechanism.OAuthBearer, + SecurityProtocol = SecurityProtocol.SaslSsl, + BrokerVersionFallback = "0.10.0.0", + ApiVersionFallbackMs = 0, + Debug = "security,broker,protocol" + }; + + //instantiates the producer + var producerBuilder = new ProducerBuilder(pConfig); + + //configure the call back to grab a token + IProducer producer = producerBuilder.SetOAuthBearerTokenRefreshHandler(OAuthTokenRefreshCallback).Build(); + + return producer; + } + + + private static IProducer GetProducerSAS() + { + //for Plain Sasl mechanism use an hard coded naming convention for user name and use the connection string as password + + var pConfig = new ProducerConfig + { + BootstrapServers = eventHubNamespaceFQDNwithPort, + SaslMechanism = SaslMechanism.Plain, + SecurityProtocol = SecurityProtocol.SaslSsl, + SaslUsername = "$ConnectionString", + SaslPassword = policyConnectionString, + BrokerVersionFallback = "0.10.0.0", + ApiVersionFallbackMs = 0, + Debug = "security,broker,protocol" + }; + + //instantiates the producer + var producerBuilder = new ProducerBuilder(pConfig); + + IProducer producer = producerBuilder.Build(); + + return producer; + } + + } +} diff --git a/quickstart/dotnet-aad/KafkaSampleApps.sln b/quickstart/dotnet-aad/KafkaSampleApps.sln new file mode 100644 index 0000000..ec24fbe --- /dev/null +++ b/quickstart/dotnet-aad/KafkaSampleApps.sln @@ -0,0 +1,25 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 16 +VisualStudioVersion = 16.0.31424.327 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaProducer", "KafkaProducer\KafkaProducer.csproj", "{4567B897-BC03-4D9B-BB1D-83469CC97FF6}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {4567B897-BC03-4D9B-BB1D-83469CC97FF6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4567B897-BC03-4D9B-BB1D-83469CC97FF6}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4567B897-BC03-4D9B-BB1D-83469CC97FF6}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4567B897-BC03-4D9B-BB1D-83469CC97FF6}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {2813E88D-9F06-47AC-B2C2-2A176437B8CA} + EndGlobalSection +EndGlobal diff --git a/quickstart/dotnet-aad/README.md b/quickstart/dotnet-aad/README.md new file mode 100644 index 0000000..aeabaf1 --- /dev/null +++ b/quickstart/dotnet-aad/README.md @@ -0,0 +1,64 @@ +# Event Hub Kafka endpoint Azure AD Authentication using C# + +Event Hub is a big-data streaming platform and event ingestion service part of the messaging service ecosystem in Azure + +Event Hub for Kafka support both SAS (shared access signatures) and OAuth 2.0 (in our case Azure AD). + +I started to look for sample code around using a C# library authenticating with an OAuth 2.0 token obtained from Azure AD but I didn't found anything, so I decided to create my sample code and the best .NET Kafka library I found out there it was the “Confluent.Kafka”. + +This sample will connect to Event Hub kafka endpoint using the Confluent.Kafka library grabbing an Azure AD Access Token with the help of the Azure.Identity library. + + +## Executing the sample + +In order to execute the sample code you need to fill some variable with some configuration of your Event Hub (of course for production grade app this stuff should come from a configuration and not hardcoded into the code). + + private static string eventHubNamespaceFQDNwithPort = ".servicebus.windows.net:9093"; + + private static string eventHubNamespace = ""; + + // connectionstring - primary or secondary key + private static string policyConnectionString = "Endpoint=sb://.servicebus.windows.net/;SharedAccessKeyName=;SharedAccessKey=;EntityPath="; + + //in kafka world this is the topic in event hub is the event hub name under the namespace + private static string topicName = ""; + + +The, the code leverage the Azure.Identity package, in particular the 'DefaultAzureCredentialOptions' class to obtain a token. +Please follo the instruction on the article you find linked at the bottom to configure a Principal and provide the secret to this sample app. + + + static void OAuthTokenRefreshCallback(IClient client, string cfg) + { + try + { + DefaultAzureCredentialOptions defaultAzureCredentialOptions = new DefaultAzureCredentialOptions(); + DefaultAzureCredential defaultAzureCredential = new DefaultAzureCredential(defaultAzureCredentialOptions); + + var tokenRequestContext = new TokenRequestContext(new string[] { $"https://{eventHubNamespace}.servicebus.windows.net/.default" }); + + var accessToken = defaultAzureCredential.GetToken(tokenRequestContext); + + + var token = accessToken.Token; + var expire = accessToken.ExpiresOn.ToUnixTimeMilliseconds(); + + //principal could be null for Kafka OAuth 2.0 auth + client.OAuthBearerSetToken(token, expire, null); + } + catch (Exception ex) + { + client.OAuthBearerSetTokenFailure(ex.ToString()); + } + } + + + + +## Microsoft Techcommunity Blog Post + +You can find the full blog post at this url: +https://techcommunity.microsoft.com/t5/fasttrack-for-azure/event-hub-kafka-endpoint-azure-ad-authentication-using-c/ba-p/2586185 + + +