Added Python OAuth example #109 #186

Open · wants to merge 1 commit into base: master
86 changes: 86 additions & 0 deletions tutorials/oauth/python/README.md
# Send Messages in Python using Azure Event Hubs for Apache Kafka Ecosystem with OAuthBearer

This quickstart will show how to create and connect to an Event Hubs Kafka endpoint using an example producer written in Python. See the [kafka-python docs](https://kafka-python.readthedocs.io/en/master/index.html) for more information on how to use Kafka clients in Python.

## Prerequisites

If you don't have an Azure subscription, create a [free account](https://azure.microsoft.com/free/?ref=microsoft.com&utm_source=microsoft.com&utm_medium=docs&utm_campaign=visualstudio) before you begin.

Additionally, you need to install:

* [Python](https://www.python.org/downloads/)
* [Git](https://www.git-scm.com/downloads)
* On Ubuntu, you can run `sudo apt-get install git` to install Git.
* [Azure CLI](https://docs.microsoft.com/en-us/cli/azure/install-azure-cli)
* [Install kafka-python library](https://github.com/dpkp/kafka-python)
* Run `pip install kafka-python`.
* [Install azure-identity library](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/identity/azure-identity)
* Run `pip install azure-identity`.

## Create an Event Hubs namespace

An Event Hubs namespace is required to send or receive from any Event Hubs service. See [Create Kafka-enabled Event Hubs](https://docs.microsoft.com/azure/event-hubs/event-hubs-create-kafka-enabled) for instructions on getting an Event Hubs Kafka endpoint.

Additionally, topics in Kafka map to Event Hub instances, so create an Event Hub instance named `example_topic` that the samples can send messages to.

### FQDN

For these samples, you will need the fully qualified domain name (FQDN) of your Event Hubs namespace. In the Azure Portal, go to your Event Hubs namespace overview page and copy the host name, which should look like `mynamespace.servicebus.windows.net`.

If your Event Hubs namespace is deployed on a non-public cloud, your domain name may differ (e.g. `*.servicebus.chinacloudapi.cn`, `*.servicebus.usgovcloudapi.net`, or `*.servicebus.cloudapi.de`).
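If you script your setup, the two namespace-dependent values used later in `oauth_config.json` can be derived from the FQDN. A minimal sketch (the helper name is hypothetical; the sample itself reads a static JSON file):

```python
def config_from_fqdn(fqdn: str) -> dict:
    """Derive the two namespace-dependent config values from the FQDN.

    Hypothetical helper for illustration; assumes the public-cloud
    Kafka port 9093 used throughout this tutorial.
    """
    return {
        "bootstrap_servers": f"{fqdn}:9093",
        "token_scope": f"https://{fqdn}",
    }
```

This mirrors the static `oauth_config.json` shown below, where both values embed the same namespace host name.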

## Provision the correct permissions to yourself

In order to run these samples, you will need to assign yourself the "Azure Event Hubs Data Sender" role, scoped to the Event Hubs namespace you created in the previous section.

Learn more about [AAD Role Based Access Control](https://docs.microsoft.com/en-us/azure/role-based-access-control/overview)

Learn more about [Azure Event Hubs Role Based Access Control](https://docs.microsoft.com/en-us/azure/event-hubs/authorize-access-azure-active-directory)

## ALTERNATIVE: Create an Azure Active Directory Application

As an alternative to using your own personal credentials to authenticate with the event hub, you can create a service principal for the application that will send events. Create an AAD application with a client secret and assign it the Azure Event Hubs Data Sender role on the Event Hubs namespace you created in the previous section.


## Clone the example project

Now that you have provisioned the required permissions, clone the Azure Event Hubs for Kafka repository and navigate to the `tutorials/oauth/python` subfolder:

```bash
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/oauth/python
```

## Configuration

In the `./producer/config/oauth_config.json` file, replace the `NAMESPACE` placeholders with the name of your actual Event Hubs namespace.

## Log in to the Azure CLI

The Python producer in this example uses the credentials with which you are signed in to the Azure CLI. To sign in, run `az login` in your terminal and, in the browser pop-up, log in to the Azure account to which you granted the Azure Event Hubs Data Sender role.

Alternatively, if you created a service principal with the Azure Event Hubs Data Sender role, run `az login --service-principal -u <app-id> -p <password> --tenant <tenant>` to use the service principal's credentials on your local machine, where `<app-id>` is the application (client) ID of your registered application and `<password>` is a client secret you created for it.

If you want to run the producer script on a machine that is already configured with an Azure identity (for example, a managed identity) for communicating with other Azure resources, perform the following steps:
1. Open the `./producer/src/token_provider.py` file.
2. On line 14, change `AzureCliCredential()` to `DefaultAzureCredential()`.


## Producer

The producer sample demonstrates how to send messages to the Event Hubs service using the Kafka protocol.

You can run the sample via:

```bash
$ cd producer/src
$ python producer.py
```

The producer will now begin sending events to the Kafka-enabled Event Hub on topic `example_topic` and printing the events to your console. If you would like to change the topic, change the topic variable in `producer.py`.

The producer can be stopped by using CTRL+C in the terminal.
6 changes: 6 additions & 0 deletions tutorials/oauth/python/producer/config/oauth_config.json
```json
{
    "bootstrap_servers": "NAMESPACE.servicebus.windows.net:9093",
    "security_protocol": "SASL_SSL",
    "sasl_mechanism": "OAUTHBEARER",
    "token_scope": "https://NAMESPACE.servicebus.windows.net"
}
```
24 changes: 24 additions & 0 deletions tutorials/oauth/python/producer/src/data_generator.py
```python
"""
THIS MODULE IS COPIED FROM THE EXAMPLE IN: https://betterdatascience.com/apache-kafka-in-python-how-to-stream-data-with-producers-and-consumers/
CREDITS TO: Dario Radečić
"""
import random
import string

user_ids = list(range(1, 101))
recipient_ids = list(range(1, 101))


def generate_message() -> dict:
    random_user_id = random.choice(user_ids)
    # Copy the recipients list
    recipient_ids_copy = recipient_ids.copy()
    # A user can't send a message to themselves
    recipient_ids_copy.remove(random_user_id)
    random_recipient_id = random.choice(recipient_ids_copy)
    # Generate a random 32-character message
    message = ''.join(random.choice(string.ascii_letters) for i in range(32))
    return {
        'user_id': random_user_id,
        'recipient_id': random_recipient_id,
        'message': message
    }
```
45 changes: 45 additions & 0 deletions tutorials/oauth/python/producer/src/producer.py
```python
import time
import json
import random
from datetime import datetime

from data_generator import generate_message
from token_provider import AzureTokenProvider
from kafka import KafkaProducer


# Messages will be serialized as JSON
def serializer(message):
    return json.dumps(message).encode('utf-8')


# Read the connection configuration
with open("../config/oauth_config.json") as file:
    config = json.load(file)

# Kafka producer
producer = KafkaProducer(
    bootstrap_servers=[config["bootstrap_servers"]],
    security_protocol=config["security_protocol"],
    sasl_mechanism=config["sasl_mechanism"],
    sasl_oauth_token_provider=AzureTokenProvider(token_scope=config["token_scope"]),
    value_serializer=serializer
)

# Event Hub/Kafka topic name. In Kafka terminology, an Event Hub corresponds to a topic.
topic = 'example_topic'

# CODE IS INSPIRED BY: https://betterdatascience.com/apache-kafka-in-python-how-to-stream-data-with-producers-and-consumers/
# CREDITS TO: Dario Radečić
if __name__ == '__main__':
    # Infinite loop - runs until you kill the program
    while True:
        # Generate a message
        dummy_message = generate_message()

        # Send it to the topic
        print(f'Producing message @ {datetime.now()} | Message = {str(dummy_message)}')
        producer.send(topic, dummy_message)
        print('Message sent to Event Hub.')

        # Sleep for a random number of seconds
        time_to_sleep = random.randint(1, 11)
        time.sleep(time_to_sleep)
```
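To see what `value_serializer` does to each record, the serializer from `producer.py` can be exercised on its own; kafka-python applies it to every value passed to `send()`, so the bytes on the wire are UTF-8 JSON that any JSON-aware consumer can decode. A self-contained sketch:

```python
import json

# Same serializer as in producer.py
def serializer(message):
    return json.dumps(message).encode('utf-8')

sample = {'user_id': 7, 'recipient_id': 12, 'message': 'abc'}
payload = serializer(sample)

# The serialized payload is bytes, and decoding it recovers the original dict
assert isinstance(payload, bytes)
assert json.loads(payload.decode('utf-8')) == sample
```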
16 changes: 16 additions & 0 deletions tutorials/oauth/python/producer/src/token_provider.py
```python
from azure.identity import DefaultAzureCredential, AzureCliCredential

class AzureTokenProvider():
    def __init__(self, token_scope):
        # Token provider needs a scope for which the token must be requested.
        # In this case, an Azure Event Hubs namespace.
        self.token_scope = token_scope

    def token(self):
        """
        Returns a (str) access token to be sent to the Kafka
        client.
        """
        credential = AzureCliCredential()
        token = credential.get_token(self.token_scope).token
        return token
```
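kafka-python calls the provider's `token()` method whenever it needs an OAUTHBEARER token, so any object exposing that method satisfies the contract. The interaction can be illustrated without an Azure login by swapping in a fake credential; both classes below are hypothetical stand-ins for illustration only:

```python
from types import SimpleNamespace

class FakeCredential:
    """Stand-in for AzureCliCredential (hypothetical, for illustration)."""
    def get_token(self, scope):
        # Returns an object with a .token attribute, mirroring the
        # shape of azure-identity's AccessToken result.
        return SimpleNamespace(token=f"fake-jwt-for-{scope}")

class StubTokenProvider:
    """Mirrors AzureTokenProvider above; kafka-python only calls .token()."""
    def __init__(self, token_scope, credential):
        self.token_scope = token_scope
        self.credential = credential

    def token(self):
        return self.credential.get_token(self.token_scope).token

provider = StubTokenProvider("https://myns.servicebus.windows.net", FakeCredential())
print(provider.token())  # the string the Kafka client would send as the bearer token
```

In the real sample, `AzureCliCredential` performs this `get_token` call against AAD, so a fresh token is fetched on each (re)authentication.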