Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancement: Added Kafka OIDC + azure.identity independent producer/consumer examples. #227

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions tutorials/oauth/python/config_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#!/usr/bin/env python
#
# Copyright (c) Microsoft Corporation. All rights reserved.
# Copyright 2023 Azure Inc.
# Licensed under the MIT License.
# Licensed under the Apache License, Version 2.0
#

from azure.identity import DefaultAzureCredential
from functools import partial
import os
import requests
import time


# Azure AD app-registration values read from the environment. These are used
# by the 'oidc' and 'opaque' token flows below; the 'azure' flow authenticates
# through DefaultAzureCredential instead. Any of these may be None if unset.
TENANT_ID = os.environ.get('AZURE_TENANT_ID')
CLIENT_ID = os.environ.get('AZURE_CLIENT_ID')
CLIENT_SECRET = os.environ.get('AZURE_CLIENT_SECRET')


def get_oauth_config(namespace):
    """Return the base confluent_kafka config for OAUTHBEARER authentication.

    namespace: FQDN of the target Event Hubs namespace,
        e.g. 'mynamespace.servicebus.windows.net'.
    Returns a dict with the bootstrap server and the required OAuth2
    security properties; callers layer mode-specific settings on top.
    """
    return {
        # Event Hubs' Kafka-compatible endpoint listens on port 9093.
        'bootstrap.servers': f'{namespace}:9093',

        # Required OAuth2 configuration properties
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'OAUTHBEARER',
    }


def get_azure_config(namespace):
    """Return a client config that fetches tokens via azure.identity.

    namespace: FQDN of the target Event Hubs namespace.
    Returns the base OAUTHBEARER config with an 'oauth_cb' bound to a
    DefaultAzureCredential instance.
    """
    def token_callback(credential, namespace_fqdn, config):
        # confluent_kafka requires an oauth callback to return (str, float):
        # (<access token>, <expiration date in seconds from epoch>).
        #
        # credential: an Azure identity library credential object, e.g. an
        #   instance of DefaultAzureCredential, ManagedIdentityCredential, etc.
        # namespace_fqdn: FQDN of the target Event Hubs namespace,
        #   e.g. 'mynamespace.servicebus.windows.net'.
        # config: confluent_kafka passes the value of
        #   'sasl.oauthbearer.config' as this parameter.
        token = credential.get_token(f'https://{namespace_fqdn}/.default')
        return token.token, token.expires_on

    # Azure credential chain.
    # See https://docs.microsoft.com/en-us/azure/developer/python/sdk/authentication-overview
    credential = DefaultAzureCredential()

    # Base configuration.
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    conf = get_oauth_config(namespace)

    # confluent_kafka invokes oauth_cb with a single `config` argument, so the
    # credential and namespace are pre-bound here with functools.partial.
    conf['oauth_cb'] = partial(token_callback, credential, namespace)
    return conf


# Using Kafka oauthbearer OIDC semantics that decodes JWT tokens and uses exp claim
# KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
#
def get_oidc_config(namespace):
    """Return a client config using librdkafka's built-in OIDC token support.

    KIP-768 (SASL/OAUTHBEARER with OIDC): the client itself requests JWTs
    from the Azure AD token endpoint and decodes them, using the token's
    expiry for refresh scheduling.
    """
    conf = get_oauth_config(namespace)
    conf['sasl.oauthbearer.method'] = 'oidc'
    conf['sasl.oauthbearer.client.id'] = CLIENT_ID
    conf['sasl.oauthbearer.client.secret'] = CLIENT_SECRET
    conf['sasl.oauthbearer.token.endpoint.url'] = (
        'https://login.microsoftonline.com/%s/oauth2/v2.0/token' % TENANT_ID)
    conf['sasl.oauthbearer.scope'] = 'https://%s/.default' % namespace
    return conf


# Using expires_in field from the token response to treat OAUTHBEARER as opaque
# and avoid decoding JWT by utilizing RFC9068, RFC6749 concepts.
# https://datatracker.ietf.org/doc/html/rfc9068#name-privacy-considerations
# https://www.rfc-editor.org/rfc/rfc6749#section-4.2.2
#
def get_opaque_config(namespace):
    """Return a client config that treats the OAUTH token as opaque.

    Instead of decoding the JWT, the expiry is computed from the token
    response's `expires_in` field per RFC 6749 / RFC 9068 semantics:
    https://datatracker.ietf.org/doc/html/rfc9068#name-privacy-considerations
    https://www.rfc-editor.org/rfc/rfc6749#section-4.2.2
    """
    def oauth_cb(config):
        # Take the time *before* the request so the computed expiry is
        # conservative (expires_in counts from response generation).
        token_exp_time = int(time.time())

        token_resp = requests.post(
            "https://login.microsoftonline.com/%s/oauth2/v2.0/token" % TENANT_ID,
            auth=(CLIENT_ID, CLIENT_SECRET),
            data={
                'grant_type': 'client_credentials',
                'scope': 'https://%s/.default' % namespace
            },
            # FIX: without a timeout, requests can block the Kafka client
            # indefinitely if the token endpoint hangs.
            timeout=30,
        )
        # FIX: fail fast with a clear HTTP error instead of an opaque
        # KeyError below when the token request is rejected
        # (bad credentials, throttling, ...).
        token_resp.raise_for_status()
        token_resp = token_resp.json()

        # expires_in is the token validity time in seconds from the moment
        # the response was generated.
        token_exp_time += int(token_resp['expires_in'])

        return token_resp['access_token'], token_exp_time

    conf = get_oauth_config(namespace)
    conf['oauth_cb'] = oauth_cb

    return conf



# Returns producer configs for azure, opaque, oidc modes
#
def get_config(namespace, mode):
    """Return a client config for the requested authentication mode.

    namespace: FQDN of the target Event Hubs namespace.
    mode: one of 'azure', 'oidc', 'opaque'.
    Raises ValueError for an unknown mode (previously this fell through and
    raised a confusing UnboundLocalError on `conf`).
    """
    if mode == 'azure':
        return get_azure_config(namespace)
    if mode == 'oidc':
        return get_oidc_config(namespace)
    if mode == 'opaque':
        return get_opaque_config(namespace)
    raise ValueError(
        "unknown mode %r; expected 'azure', 'oidc' or 'opaque'" % mode)
121 changes: 45 additions & 76 deletions tutorials/oauth/python/consumer.py
Original file line number Diff line number Diff line change
@@ -1,94 +1,22 @@
#!/usr/bin/env python
#
# Copyright (c) Microsoft Corporation. All rights reserved.
# Copyright 2016 Confluent Inc.
# Copyright 2023 Confluent Inc.
# Licensed under the MIT License.
# Licensed under the Apache License, Version 2.0
#
# Original Confluent sample modified for use with Azure Event Hubs for Apache Kafka Ecosystems

from azure.identity import DefaultAzureCredential
from confluent_kafka import Consumer, KafkaException
import sys
import getopt
import argparse
import json
import logging
from functools import partial
from pprint import pformat
import config_utils


def stats_cb(stats_json_str):
    """Pretty-print the librdkafka statistics JSON blob to stdout."""
    print(f'\nKAFKA Stats: {pformat(json.loads(stats_json_str))}\n')


def oauth_cb(cred, namespace_fqdn, config):
    """Token-refresh callback for confluent_kafka OAUTHBEARER authentication.

    Returns the (access token string, expiry in seconds since the epoch)
    pair that confluent_kafka requires from an oauth callback.
    """
    # confluent_kafka requires an oauth callback function to return (str, float) with the values of (<access token>, <expiration date in seconds from epoch>)

    # cred: an Azure identity library credential object. Ex: an instance of DefaultAzureCredential, ManagedIdentityCredential, etc
    # namespace_fqdn: the FQDN for the target Event Hubs namespace. Ex: 'mynamespace.servicebus.windows.net'
    # config: confluent_kafka passes the value of 'sasl.oauthbearer.config' as the config param

    access_token = cred.get_token('https://%s/.default' % namespace_fqdn)
    return access_token.token, access_token.expires_on


def print_usage_and_exit(program_name):
    """Write CLI usage for the consumer to stderr and exit with status 1."""
    sys.stderr.write(
        'Usage: %s [options..] <eventhubs-namespace> <group> <topic1> <topic2> ..\n' % program_name)
    options = '''
Options:
  -T <intvl>   Enable client statistics at specified interval (ms)
'''
    sys.stderr.write(options)
    # Usage errors terminate the process with a non-zero exit code.
    sys.exit(1)


if __name__ == '__main__':
optlist, argv = getopt.getopt(sys.argv[1:], 'T:')
if len(argv) < 3:
print_usage_and_exit(sys.argv[0])

namespace = argv[0]
group = argv[1]
topics = argv[2:]

# Azure credential
# See https://docs.microsoft.com/en-us/azure/developer/python/sdk/authentication-overview
cred = DefaultAzureCredential()

# Consumer configuration
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
conf = {
'bootstrap.servers': '%s:9093' % namespace,
'group.id': group,
'session.timeout.ms': 6000,
'auto.offset.reset': 'earliest',

# Required OAuth2 configuration properties
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'OAUTHBEARER',
# the resulting oauth_cb must accept a single `config` parameter, so we use partial to bind the namespace/identity to our function
'oauth_cb': partial(oauth_cb, cred, namespace),
}

# Check to see if -T option exists
for opt in optlist:
if opt[0] != '-T':
continue
try:
intval = int(opt[1])
except ValueError:
sys.stderr.write("Invalid option value for -T: %s\n" % opt[1])
sys.exit(1)

if intval <= 0:
sys.stderr.write("-T option value needs to be larger than zero: %s\n" % opt[1])
sys.exit(1)

conf['stats_cb'] = stats_cb
conf['statistics.interval.ms'] = int(opt[1])

def consume_workload(conf, topics):
# Create logger for consumer (logs will be emitted when poll() is called)
logger = logging.getLogger('consumer')
logger.setLevel(logging.DEBUG)
Expand Down Expand Up @@ -127,3 +55,44 @@ def print_assignment(consumer, partitions):
finally:
# Close down consumer to commit final offsets.
c.close()


def parse_consumer_args():
    """Parse the consumer's command-line arguments.

    Returns (namespace, group, topics, T, mode):
      namespace: Event Hubs namespace FQDN (positional)
      group: consumer group id (positional)
      topics: one or more topic names (positional, nargs='+')
      T: statistics interval in ms, or None when -T was not given
      mode: token callback strategy ('azure', 'oidc', or 'opaque')
    Exits with status 1 (after writing to stderr) when -T is not positive.
    """
    parser = argparse.ArgumentParser(description='Process command line arguments.')
    parser.add_argument('namespace', help='Eventhubs namespace')
    parser.add_argument('group', help='Group')
    parser.add_argument('topics', nargs='+', help='Topic1, Topic2, ...')
    parser.add_argument('-T', type=int,
                        help='Enable client statistics at specified interval (ms)')
    parser.add_argument('--mode', default='azure',
                        choices=['azure', 'oidc', 'opaque'],
                        help='Token request callback implementation logic')

    args = parser.parse_args()

    # FIX: `args.T and args.T <= 0` silently accepted `-T 0` because 0 is
    # falsy; compare against None explicitly so zero is rejected as well.
    if args.T is not None and args.T <= 0:
        sys.stderr.write("-T option value needs to be larger than zero: %s\n" % args.T)
        sys.exit(1)

    return args.namespace, args.group, args.topics, args.T, args.mode


if __name__ == '__main__':
    namespace, group, topics, T, mode = parse_consumer_args()

    # Start from the shared OAUTHBEARER config for the selected mode and
    # layer on the consumer-only settings.
    conf = config_utils.get_config(namespace, mode)
    conf['group.id'] = group
    conf['session.timeout.ms'] = 6000
    conf['auto.offset.reset'] = 'earliest'

    def stats_cb(stats_json_str):
        """Pretty-print the librdkafka statistics JSON blob."""
        stats_json = json.loads(stats_json_str)
        print('\nKAFKA Stats: {}\n'.format(pformat(stats_json)))

    # -T enables periodic client statistics at the given interval (ms).
    if T:
        conf['stats_cb'] = stats_cb
        conf['statistics.interval.ms'] = T

    consume_workload(conf, topics)
71 changes: 30 additions & 41 deletions tutorials/oauth/python/producer.py
Original file line number Diff line number Diff line change
@@ -1,56 +1,23 @@
#!/usr/bin/env python
#
# Copyright (c) Microsoft Corporation. All rights reserved.
# Copyright 2016 Confluent Inc.
# Copyright 2023 Confluent Inc.
# Licensed under the MIT License.
# Licensed under the Apache License, Version 2.0
#
# Original Confluent sample modified for use with Azure Event Hubs for Apache Kafka Ecosystems

from azure.identity import DefaultAzureCredential
from confluent_kafka import Producer
import sys
from functools import partial
import argparse
import config_utils


def oauth_cb(cred, namespace_fqdn, config):
    """Token-refresh callback for confluent_kafka OAUTHBEARER authentication.

    Returns the (access token string, expiry in seconds since the epoch)
    pair that confluent_kafka requires from an oauth callback.
    """
    # confluent_kafka requires an oauth callback function to return (str, float) with the values of (<access token>, <expiration date in seconds from epoch>)

    # cred: an Azure identity library credential object. Ex: an instance of DefaultAzureCredential, ManagedIdentityCredential, etc
    # namespace_fqdn: the FQDN for the target Event Hubs namespace. Ex: 'mynamespace.servicebus.windows.net'
    # config: confluent_kafka passes the value of 'sasl.oauthbearer.config' as the config param

    access_token = cred.get_token('https://%s/.default' % namespace_fqdn)
    return access_token.token, access_token.expires_on


if __name__ == '__main__':
if len(sys.argv) != 3:
sys.stderr.write('Usage: %s <eventhubs-namespace> <topic>\n' % sys.argv[0])
sys.exit(1)

namespace = sys.argv[1]
topic = sys.argv[2]

# Azure credential
# See https://docs.microsoft.com/en-us/azure/developer/python/sdk/authentication-overview
cred = DefaultAzureCredential()

# Producer configuration
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
conf = {
'bootstrap.servers': '%s:9093' % namespace,

# Required OAuth2 configuration properties
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'OAUTHBEARER',
# the resulting oauth_cb must accept a single `config` parameter, so we use partial to bind the namespace/identity to our function
'oauth_cb': partial(oauth_cb, cred, namespace),
}

def produce_workload(conf, topic, num_messages):
# Create Producer instance
p = Producer(**conf)


# Optional per-message delivery callback (triggered by poll() or flush())
# when a message has been successfully delivered or permanently
# failed delivery (after retries).
Expand All @@ -59,10 +26,10 @@ def delivery_callback(err, msg):
sys.stderr.write('%% Message failed delivery: %s\n' % err)
else:
sys.stderr.write('%% Message delivered to %s [%d] @ %d\n' %
(msg.topic(), msg.partition(), msg.offset()))
(msg.topic(), msg.partition(), msg.offset()))

# Write 1-100 to topic
for i in range(0, 100):
# Write 1-num_messages to topic
for i in range(num_messages):
try:
p.produce(topic, str(i), callback=delivery_callback)
except BufferError:
Expand All @@ -78,3 +45,25 @@ def delivery_callback(err, msg):
# Wait until all messages have been delivered
sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
p.flush()


def parse_producer_args():
    """Parse the producer's command-line arguments.

    Returns (namespace, topic, mode, num_messages):
      namespace: Event Hubs namespace FQDN (positional)
      topic: target topic / Event Hub name (positional)
      mode: token callback strategy, default 'azure'
      num_messages: number of messages to produce, default 100
    """
    parser = argparse.ArgumentParser(description='Process command line arguments.')
    parser.add_argument('namespace', help='Eventhubs namespace')
    parser.add_argument('topic', help='Topic or Event Hub')
    parser.add_argument('--mode', default='azure',
                        choices=['azure', 'oidc', 'opaque'],
                        help='Token request callback implementation logic')
    parser.add_argument('--num-messages', type=int, default=100,
                        help='Number of messages to be produced')

    parsed = parser.parse_args()
    return parsed.namespace, parsed.topic, parsed.mode, parsed.num_messages


if __name__ == '__main__':
    namespace, topic, mode, num_messages = parse_producer_args()

    # Build the auth config for the selected mode, then run the produce loop.
    produce_workload(config_utils.get_config(namespace, mode),
                     topic,
                     num_messages=num_messages)