From be32cef3d7f81e543c73ea52de2a69b55bcf7e4f Mon Sep 17 00:00:00 2001 From: Axel van t Westeinde Date: Fri, 14 Jan 2022 09:09:25 +0100 Subject: [PATCH] added Python OAuth example --- tutorials/oauth/python/README.md | 86 +++++++++++++++++++ .../python/producer/config/oauth_config.json | 6 ++ .../python/producer/src/data_generator.py | 24 ++++++ .../oauth/python/producer/src/producer.py | 45 ++++++++++ .../python/producer/src/token_provider.py | 16 ++++ 5 files changed, 177 insertions(+) create mode 100644 tutorials/oauth/python/README.md create mode 100644 tutorials/oauth/python/producer/config/oauth_config.json create mode 100644 tutorials/oauth/python/producer/src/data_generator.py create mode 100644 tutorials/oauth/python/producer/src/producer.py create mode 100644 tutorials/oauth/python/producer/src/token_provider.py diff --git a/tutorials/oauth/python/README.md b/tutorials/oauth/python/README.md new file mode 100644 index 0000000..7def9c2 --- /dev/null +++ b/tutorials/oauth/python/README.md @@ -0,0 +1,86 @@ +# Send Messages in Python using Azure Event Hubs for Apache Kafka Ecosystem with OAuthBearer + +This quickstart will show how to create and connect to an Event Hubs Kafka endpoint using an example producer written in Python. See the [kafka-python docs](https://kafka-python.readthedocs.io/en/master/index.html) for more information on how to use Kafka clients in Python. + +## Prerequisites + +If you don't have an Azure subscription, create a [free account](https://azure.microsoft.com/free/?ref=microsoft.com&utm_source=microsoft.com&utm_medium=docs&utm_campaign=visualstudio) before you begin. + +Additionally, you need to install: + +* [Python](https://www.python.org/downloads/) +* [Git](https://www.git-scm.com/downloads) + * On Ubuntu, you can run `sudo apt-get install git` to install Git. 
+* [Azure CLI](https://docs.microsoft.com/en-us/cli/azure/install-azure-cli) +* [Install kafka-python library](https://github.com/dpkp/kafka-python) + * Run `pip install kafka-python`. +* [Install azure-identity library](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/identity/azure-identity) + * Run `pip install azure-identity`. + +## Create an Event Hubs namespace + +An Event Hubs namespace is required to send or receive from any Event Hubs service. See [Create Kafka-enabled Event Hubs](https://docs.microsoft.com/azure/event-hubs/event-hubs-create-kafka-enabled) for instructions on getting an Event Hubs Kafka endpoint. + +Additionally, topics in Kafka map to Event Hub instances, so create an Event Hub instance called "example_topic" that our samples can send and receive messages from. + +### FQDN + +For these samples, you will need the Fully Qualified Domain Name of your Event Hubs namespace, which can be found in the Azure Portal. To do so, in the Azure Portal, go to your Event Hubs namespace overview page and copy the host name, which should look like **`mynamespace.servicebus.windows.net`**. + +If your Event Hubs namespace is deployed on a non-Public cloud, your domain name may differ (e.g. \*.servicebus.chinacloudapi.cn, \*.servicebus.usgovcloudapi.net, or \*.servicebus.cloudapi.de). + +## Provision the correct permissions to yourself + +In order to run these samples, you will need to assign yourself the role "Event Hubs Data Sender", scoped to the Event Hubs namespace you created in the previous section. 
+ +Learn more about [AAD Role Based Access Control](https://docs.microsoft.com/en-us/azure/role-based-access-control/overview) + +Learn more about [Azure Event Hubs Role Based Access Control](https://docs.microsoft.com/en-us/azure/event-hubs/authorize-access-azure-active-directory) + +## ALTERNATIVE: Create an Azure Active Directory Application + +As an alternative to using your own personal credentials to authenticate with the Event Hub, you can create a service principal for the application from which you will send events to the Event Hub. You will need to create an AAD application with a client secret and assign it the Event Hubs Data Sender role on the Event Hubs namespace you created in the previous section. + +Learn more about [AAD Role Based Access Control](https://docs.microsoft.com/en-us/azure/role-based-access-control/overview) + +Learn more about [Azure Event Hubs Role Based Access Control](https://docs.microsoft.com/en-us/azure/event-hubs/authorize-access-azure-active-directory) + + +## Clone the example project + +Now that you have registered your Kafka-enabled Event Hubs namespace in the AAD, clone the Azure Event Hubs for Kafka repository and navigate to the `tutorials/oauth/python` subfolder: + +```bash +git clone https://github.com/Azure/azure-event-hubs-for-kafka.git +cd azure-event-hubs-for-kafka/tutorials/oauth/python +``` + +## Configuration + +In the ./producer/config/oauth_config.json file, make sure you replace the NAMESPACE placeholders with the name of your actual Event Hubs namespace. + +## Login to Azure CLI + +The Python producer in this example makes use of the credentials with which you are authenticated in the Azure CLI. To log in to the Azure CLI, run `az login` in your terminal. In the browser pop-up, log in to the Azure account for which you have provisioned the 'Event Hubs Data Sender' role. 
+ +Alternatively, if you created a service principal with the 'Event Hubs Data Sender' role, run `az login --service-principal -u <app_id> -p <password> --tenant <tenant>` to use the service principal's credentials on your local machine, with `app_id` being the application id of your registered application, `password` being a client secret you have created for your registered application, and `tenant` being the ID of the Azure AD tenant in which the application is registered. + +If you want to run the producer script on a machine which already has a service principal account which it is configured to use for communication with other Azure resources, perform the following steps: +1. Go to the ./producer/src/token_provider.py file +2. In line 14, change `AzureCliCredential()` to `DefaultAzureCredential()`. + + +## Producer + +The producer sample demonstrates how to send messages to the Event Hubs service using the Kafka protocol. + +You can run the sample via: + +```bash +$ cd producer/src +$ python producer.py +``` + +The producer will now begin sending events to the Kafka-enabled Event Hub on topic `example_topic` and printing the events to your console. If you would like to change the topic, change the topic variable in `producer.py`. + +The producer can be stopped by using CTRL+C in the terminal. 
diff --git a/tutorials/oauth/python/producer/config/oauth_config.json b/tutorials/oauth/python/producer/config/oauth_config.json new file mode 100644 index 0000000..3303bcb --- /dev/null +++ b/tutorials/oauth/python/producer/config/oauth_config.json @@ -0,0 +1,6 @@ +{ + "bootstrap_servers": "NAMESPACE.servicebus.windows.net:9093", + "security_protocol": "SASL_SSL", + "sasl_mechanism": "OAUTHBEARER", + "token_scope": "https://NAMESPACE.servicebus.windows.net" +} \ No newline at end of file diff --git a/tutorials/oauth/python/producer/src/data_generator.py b/tutorials/oauth/python/producer/src/data_generator.py new file mode 100644 index 0000000..916513d --- /dev/null +++ b/tutorials/oauth/python/producer/src/data_generator.py @@ -0,0 +1,24 @@ +""" +THIS MODULE IS COPIED FROM THE EXAMPLE IN: https://betterdatascience.com/apache-kafka-in-python-how-to-stream-data-with-producers-and-consumers/ +CREDITS TO: Dario Radečić +""" +import random +import string + +user_ids = list(range(1, 101)) +recipient_ids = list(range(1, 101)) + +def generate_message() -> dict: + random_user_id = random.choice(user_ids) + # Copy the recipients array + recipient_ids_copy = recipient_ids.copy() + # User can't send message to himself + recipient_ids_copy.remove(random_user_id) + random_recipient_id = random.choice(recipient_ids_copy) + # Generate a random message + message = ''.join(random.choice(string.ascii_letters) for i in range(32)) + return { + 'user_id': random_user_id, + 'recipient_id': random_recipient_id, + 'message': message + } \ No newline at end of file diff --git a/tutorials/oauth/python/producer/src/producer.py b/tutorials/oauth/python/producer/src/producer.py new file mode 100644 index 0000000..b79e2d1 --- /dev/null +++ b/tutorials/oauth/python/producer/src/producer.py @@ -0,0 +1,45 @@ +import time +import json +import random +from datetime import datetime + +from data_generator import generate_message +from token_provider import AzureTokenProvider +from kafka import 
KafkaProducer + +# Messages will be serialized as JSON +def serializer(message): + return json.dumps(message).encode('utf-8') + +# Read config for connection +with open("../config/oauth_config.json") as file: + config = json.load(file) + +# Kafka Producer +producer = KafkaProducer( + bootstrap_servers=[config["bootstrap_servers"]], + security_protocol=config["security_protocol"], + sasl_mechanism=config["sasl_mechanism"], + sasl_oauth_token_provider=AzureTokenProvider(token_scope=config["token_scope"]), + value_serializer=serializer + ) + +# Event Hub/Kafka topic name. In Kafka terminology, an Event Hub corresponds to a topic. +topic = 'example_topic' + +# CODE IS INSPIRED BY: https://betterdatascience.com/apache-kafka-in-python-how-to-stream-data-with-producers-and-consumers/ +# CREDITS TO: Dario Radečić +if __name__ == '__main__': + # Infinite loop - runs until you kill the program + while True: + # Generate a message + dummy_message = generate_message() + + # Send it to our 'messages' topic + print(f'Producing message @ {datetime.now()} | Message = {str(dummy_message)}') + producer.send(topic, dummy_message) + print('Message sent to Event Hub.') + + # Sleep for a random number of seconds + time_to_sleep = random.randint(1, 11) + time.sleep(time_to_sleep) \ No newline at end of file diff --git a/tutorials/oauth/python/producer/src/token_provider.py b/tutorials/oauth/python/producer/src/token_provider.py new file mode 100644 index 0000000..89dfe23 --- /dev/null +++ b/tutorials/oauth/python/producer/src/token_provider.py @@ -0,0 +1,16 @@ +from azure.identity import DefaultAzureCredential, AzureCliCredential + +class AzureTokenProvider(): + def __init__(self, token_scope): + # Token provider needs a scope for which the token must be requested. + # In this case, an Azure Event Hub + self.token_scope = token_scope + + def token(self): + """ + Returns a (str) ID/Access Token to be sent to the Kafka + client. 
+ """ + credential = AzureCliCredential() + token = credential.get_token(self.token_scope).token + return token \ No newline at end of file