Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added dotnet sample using Azure AD authentication #163

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions quickstart/dotnet-aad/KafkaProducer/KafkaProducer.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net5.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Azure.Identity" Version="1.4.0" />
<PackageReference Include="Confluent.Kafka" Version="1.7.0" />
</ItemGroup>

</Project>
155 changes: 155 additions & 0 deletions quickstart/dotnet-aad/KafkaProducer/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
using Azure.Core;
using Azure.Identity;
using Confluent.Kafka;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace KafkaProducer
{
class Program
{

private static string eventHubNamespaceFQDNwithPort = "<event hub namespace name>.servicebus.windows.net:9093";

private static string eventHubNamespace = "<event hub namespace name>";

// connectionstring - primary or secondary key
private static string policyConnectionString = "Endpoint=sb://<event hub namespace name>.servicebus.windows.net/;SharedAccessKeyName=<policy name>;SharedAccessKey=<policy key>;EntityPath=<event hub name>";

//in kafka world this is the topic in event hub is the event hub name under the namespace
private static string topicName = "<event hub name>";





static async Task Main(string[] args)
{
Console.WriteLine("Hello Event Hub Kafka client!");

//remember to proberly feed the following static attributes:
//eventHubNamespace
//eventHubNamespaceFQDNwithPort


try
{
//----- SAS AUTH -----//
//
//to leverage SAS Key as authentication methods we need the following static attribute valorized in the code:
//policyConnectionString
//
//var producer = GetProducerSAS();
//
//----- -------- -----//


//----- AAD AUTH -----//
//
//to leverage AAD as authentication methods you need to provide to the application a valid principal informations according to what's needed by DefaultAzureCredentialOptions
//
//Get a Producer using an AAD Token
var producer = GetProducerAADToken();
//
//----- -------- -----//

Console.WriteLine("Initiating Execution");

for (int x = 0; x < 100; x++)
{
var msg = new Message<Null, string> { Value = string.Format("This is a sample message - msg # {0} at {1}", x, DateTime.Now.ToString("yyyMMdd_HHmmSSfff")) };

// publishes the message to Event Hubs
var result = await producer.ProduceAsync(topicName, msg);

Console.WriteLine($"Message {result.Value} sent to partition {result.TopicPartition } with result {result.Status}");
}

}
catch (Exception e)
{
Console.WriteLine(string.Format("Exception Ocurred - {0}", e.Message));
}
}


/// <summary>
/// Kafka Client callback to get or refresh a Token
/// </summary>
/// <param name="client"></param>
/// <param name="cfg"></param>
static void OAuthTokenRefreshCallback(IClient client, string cfg)
{
try
{
DefaultAzureCredentialOptions defaultAzureCredentialOptions = new DefaultAzureCredentialOptions();
DefaultAzureCredential defaultAzureCredential = new DefaultAzureCredential(defaultAzureCredentialOptions);

var tokenRequestContext = new TokenRequestContext(new string[] { $"https://{eventHubNamespace}.servicebus.windows.net/.default" });

var accessToken = defaultAzureCredential.GetToken(tokenRequestContext);


var token = accessToken.Token;
var expire = accessToken.ExpiresOn.ToUnixTimeMilliseconds();

//principal could be null for Kafka OAuth 2.0 auth
client.OAuthBearerSetToken(token, expire, null);
}
catch (Exception ex)
{
client.OAuthBearerSetTokenFailure(ex.ToString());
}
}



private static IProducer<Null, string> GetProducerAADToken()
{
var pConfig = new ProducerConfig
{
BootstrapServers = eventHubNamespaceFQDNwithPort,
SaslMechanism = SaslMechanism.OAuthBearer,
SecurityProtocol = SecurityProtocol.SaslSsl,
BrokerVersionFallback = "0.10.0.0",
ApiVersionFallbackMs = 0,
Debug = "security,broker,protocol"
};

//instantiates the producer
var producerBuilder = new ProducerBuilder<Null, string>(pConfig);

//configure the call back to grab a token
IProducer<Null, string> producer = producerBuilder.SetOAuthBearerTokenRefreshHandler(OAuthTokenRefreshCallback).Build();

return producer;
}


private static IProducer<Null, string> GetProducerSAS()
{
//for Plain Sasl mechanism use an hard coded naming convention for user name and use the connection string as password

var pConfig = new ProducerConfig
{
BootstrapServers = eventHubNamespaceFQDNwithPort,
SaslMechanism = SaslMechanism.Plain,
SecurityProtocol = SecurityProtocol.SaslSsl,
SaslUsername = "$ConnectionString",
SaslPassword = policyConnectionString,
BrokerVersionFallback = "0.10.0.0",
ApiVersionFallbackMs = 0,
Debug = "security,broker,protocol"
};

//instantiates the producer
var producerBuilder = new ProducerBuilder<Null, string>(pConfig);

IProducer<Null, string> producer = producerBuilder.Build();

return producer;
}

}
}
25 changes: 25 additions & 0 deletions quickstart/dotnet-aad/KafkaSampleApps.sln
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.31424.327
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaProducer", "KafkaProducer\KafkaProducer.csproj", "{4567B897-BC03-4D9B-BB1D-83469CC97FF6}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{4567B897-BC03-4D9B-BB1D-83469CC97FF6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4567B897-BC03-4D9B-BB1D-83469CC97FF6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4567B897-BC03-4D9B-BB1D-83469CC97FF6}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4567B897-BC03-4D9B-BB1D-83469CC97FF6}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2813E88D-9F06-47AC-B2C2-2A176437B8CA}
EndGlobalSection
EndGlobal
64 changes: 64 additions & 0 deletions quickstart/dotnet-aad/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Event Hub Kafka endpoint Azure AD Authentication using C#

Event Hub is a big-data streaming platform and event ingestion service part of the messaging service ecosystem in Azure

Event Hub for Kafka support both SAS (shared access signatures) and OAuth 2.0 (in our case Azure AD).

I started to look for sample code around using a C# library authenticating with an OAuth 2.0 token obtained from Azure AD but I didn't found anything, so I decided to create my sample code and the best .NET Kafka library I found out there it was the “Confluent.Kafka”.

This sample will connect to Event Hub kafka endpoint using the Confluent.Kafka library grabbing an Azure AD Access Token with the help of the Azure.Identity library.


## Executing the sample

In order to execute the sample code you need to fill some variable with some configuration of your Event Hub (of course for production grade app this stuff should come from a configuration and not hardcoded into the code).

private static string eventHubNamespaceFQDNwithPort = "<event hub namespace name>.servicebus.windows.net:9093";

private static string eventHubNamespace = "<event hub namespace name>";

// connectionstring - primary or secondary key
private static string policyConnectionString = "Endpoint=sb://<event hub namespace name>.servicebus.windows.net/;SharedAccessKeyName=<policy name>;SharedAccessKey=<policy key>;EntityPath=<event hub name>";

//in kafka world this is the topic in event hub is the event hub name under the namespace
private static string topicName = "<event hub name>";


The, the code leverage the Azure.Identity package, in particular the 'DefaultAzureCredentialOptions' class to obtain a token.
Please follo the instruction on the article you find linked at the bottom to configure a Principal and provide the secret to this sample app.


static void OAuthTokenRefreshCallback(IClient client, string cfg)
{
try
{
DefaultAzureCredentialOptions defaultAzureCredentialOptions = new DefaultAzureCredentialOptions();
DefaultAzureCredential defaultAzureCredential = new DefaultAzureCredential(defaultAzureCredentialOptions);

var tokenRequestContext = new TokenRequestContext(new string[] { $"https://{eventHubNamespace}.servicebus.windows.net/.default" });

var accessToken = defaultAzureCredential.GetToken(tokenRequestContext);


var token = accessToken.Token;
var expire = accessToken.ExpiresOn.ToUnixTimeMilliseconds();

//principal could be null for Kafka OAuth 2.0 auth
client.OAuthBearerSetToken(token, expire, null);
}
catch (Exception ex)
{
client.OAuthBearerSetTokenFailure(ex.ToString());
}
}




## Microsoft Techcommunity Blog Post

You can find the full blog post at this url:
https://techcommunity.microsoft.com/t5/fasttrack-for-azure/event-hub-kafka-endpoint-azure-ad-authentication-using-c/ba-p/2586185