Extended Python Kafka OAuth examples with opaque and OIDC modes:
- Added an option to run the producer and consumer with --mode=[oidc, azure, opaque]; a usage sketch follows below
- oidc corresponds to KIP-768 and decodes the JWT, using its exp claim for the expiry time
- opaque uses the token response's expires_in field and avoids the azure.identity dependency

Signed-off-by: Hlib Pylypets <[email protected]>
Pilipets committed Jun 5, 2023
1 parent c7c395d commit 1d9a2d8
Showing 3 changed files with 189 additions and 117 deletions.
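
A minimal usage sketch of the new config_utils module below (the namespace and Event Hub names are placeholders; the oidc and opaque modes assume the AZURE_TENANT_ID, AZURE_CLIENT_ID and AZURE_CLIENT_SECRET environment variables are set, and azure mode assumes DefaultAzureCredential can resolve a credential):

import config_utils
from confluent_kafka import Producer

# mode is one of 'azure', 'oidc', 'opaque' -- the same values accepted by the --mode flag
conf = config_utils.get_config('mynamespace.servicebus.windows.net', mode='oidc')

p = Producer(**conf)
p.produce('my-eventhub', b'hello', callback=lambda err, msg: print(err or msg.offset()))
p.flush()
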
117 changes: 117 additions & 0 deletions tutorials/oauth/python/config_utils.py
@@ -0,0 +1,117 @@
#!/usr/bin/env python
#
# Copyright (c) Microsoft Corporation. All rights reserved.
# Copyright 2023 Azure Inc.
# Licensed under the MIT License.
# Licensed under the Apache License, Version 2.0
#

from azure.identity import DefaultAzureCredential
from functools import partial
import os
import requests
import time


TENANT_ID = os.environ.get('AZURE_TENANT_ID')
CLIENT_ID = os.environ.get('AZURE_CLIENT_ID')
CLIENT_SECRET = os.environ.get('AZURE_CLIENT_SECRET')


def get_oauth_config(namespace):
    conf = {
        'bootstrap.servers': '%s:9093' % namespace,

        # Required OAuth2 configuration properties
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'OAUTHBEARER'
    }
    return conf


def get_azure_config(namespace):
    def oauth_cb(cred, namespace_fqdn, config):
        # confluent_kafka requires an oauth callback function to return (str, float) with the values of (<access token>, <expiration date in seconds from epoch>)

        # cred: an Azure identity library credential object. Ex: an instance of DefaultAzureCredential, ManagedIdentityCredential, etc
        # namespace_fqdn: the FQDN for the target Event Hubs namespace. Ex: 'mynamespace.servicebus.windows.net'
        # config: confluent_kafka passes the value of 'sasl.oauthbearer.config' as the config param

        access_token = cred.get_token('https://%s/.default' % namespace_fqdn)
        return access_token.token, access_token.expires_on

    # Azure credential
    # See https://docs.microsoft.com/en-us/azure/developer/python/sdk/authentication-overview
    cred = DefaultAzureCredential()

    # Client configuration (shared by the producer and consumer samples)
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    conf = get_oauth_config(namespace)

    # the resulting oauth_cb must accept a single `config` parameter, so we use partial to bind the credential/namespace to our function
    conf['oauth_cb'] = partial(oauth_cb, cred, namespace)
    return conf
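
# Illustrative sanity check of the oauth_cb contract (hypothetical namespace; assumes
# azure.identity can resolve a credential in the current environment):
#
#   conf = get_azure_config('mynamespace.servicebus.windows.net')
#   token, expires_on = conf['oauth_cb'](None)   # confluent_kafka would pass 'sasl.oauthbearer.config' here
#   print(token[:20], expires_on)                # (access token string, expiry in seconds from epoch)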


# Uses Kafka oauthbearer OIDC semantics, which decode the JWT token and use its exp claim.
# KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
#
def get_oidc_config(namespace):
    conf = get_oauth_config(namespace)
    conf.update({
        'sasl.oauthbearer.method': 'oidc',
        'sasl.oauthbearer.client.id': CLIENT_ID,
        'sasl.oauthbearer.client.secret': CLIENT_SECRET,
        'sasl.oauthbearer.token.endpoint.url': 'https://login.microsoftonline.com/%s/oauth2/v2.0/token' % TENANT_ID,
        'sasl.oauthbearer.scope': 'https://%s/.default' % namespace,
    })

    return conf
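
# With 'sasl.oauthbearer.method': 'oidc', librdkafka itself requests the token from the endpoint
# above and refreshes it as it nears expiry, so no oauth_cb callback is set.
# Minimal smoke test (hypothetical namespace; assumes the AZURE_* environment variables are set):
#
#   from confluent_kafka import Producer
#   p = Producer(**get_oidc_config('mynamespace.servicebus.windows.net'))
#   p.produce('my-eventhub', b'ping')
#   p.flush()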


# Uses the expires_in field from the token response to treat the OAUTHBEARER token as opaque
# and avoid decoding the JWT, following RFC 9068 and RFC 6749 guidance.
# https://datatracker.ietf.org/doc/html/rfc9068#name-privacy-considerations
# https://www.rfc-editor.org/rfc/rfc6749#section-4.2.2
#
def get_opaque_config(namespace):
    def oauth_cb(config):
        # Record the time before the request so the computed expiry is conservative
        token_exp_time = int(time.time())

        token_resp = requests.post(
            "https://login.microsoftonline.com/%s/oauth2/v2.0/token" % TENANT_ID,
            auth=(CLIENT_ID, CLIENT_SECRET),
            data={
                'grant_type': 'client_credentials',
                'scope': 'https://%s/.default' % namespace
            }
        )

        token_resp = token_resp.json()

        # expires_in is the token validity period in seconds,
        # counted from the time the response was generated
        token_exp_time += int(token_resp['expires_in'])

        return token_resp['access_token'], token_exp_time

    conf = get_oauth_config(namespace)
    conf['oauth_cb'] = oauth_cb

    return conf
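
# For reference, a client_credentials token response (RFC 6749 sections 4.4.3 and 5.1) looks
# roughly like the following; only access_token and expires_in are used above (values are made up):
#
#   {
#       "token_type": "Bearer",
#       "expires_in": 3599,
#       "access_token": "eyJ0eXAiOiJKV1QiLCJhbGciOi..."
#   }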



# Returns the client (producer/consumer) configuration for the azure, oidc, or opaque mode
#
def get_config(namespace, mode):
    if mode == 'azure':
        conf = get_azure_config(namespace)
    elif mode == 'oidc':
        conf = get_oidc_config(namespace)
    elif mode == 'opaque':
        conf = get_opaque_config(namespace)
    else:
        raise ValueError('Unsupported mode: %s' % mode)

    return conf
118 changes: 42 additions & 76 deletions tutorials/oauth/python/consumer.py
@@ -1,94 +1,22 @@
#!/usr/bin/env python
#
# Copyright (c) Microsoft Corporation. All rights reserved.
# Copyright 2016 Confluent Inc.
# Copyright 2023 Confluent Inc.
# Licensed under the MIT License.
# Licensed under the Apache License, Version 2.0
#
# Original Confluent sample modified for use with Azure Event Hubs for Apache Kafka Ecosystems

from azure.identity import DefaultAzureCredential
from confluent_kafka import Consumer, KafkaException
import sys
import getopt
import argparse
import json
import logging
from functools import partial
from pprint import pformat
import config_utils


def stats_cb(stats_json_str):
    stats_json = json.loads(stats_json_str)
    print('\nKAFKA Stats: {}\n'.format(pformat(stats_json)))


def oauth_cb(cred, namespace_fqdn, config):
    # confluent_kafka requires an oauth callback function to return (str, float) with the values of (<access token>, <expiration date in seconds from epoch>)

    # cred: an Azure identity library credential object. Ex: an instance of DefaultAzureCredential, ManagedIdentityCredential, etc
    # namespace_fqdn: the FQDN for the target Event Hubs namespace. Ex: 'mynamespace.servicebus.windows.net'
    # config: confluent_kafka passes the value of 'sasl.oauthbearer.config' as the config param

    access_token = cred.get_token('https://%s/.default' % namespace_fqdn)
    return access_token.token, access_token.expires_on


def print_usage_and_exit(program_name):
    sys.stderr.write(
        'Usage: %s [options..] <eventhubs-namespace> <group> <topic1> <topic2> ..\n' % program_name)
    options = '''
 Options:
  -T <intvl>   Enable client statistics at specified interval (ms)
'''
    sys.stderr.write(options)
    sys.exit(1)


if __name__ == '__main__':
    optlist, argv = getopt.getopt(sys.argv[1:], 'T:')
    if len(argv) < 3:
        print_usage_and_exit(sys.argv[0])

    namespace = argv[0]
    group = argv[1]
    topics = argv[2:]

    # Azure credential
    # See https://docs.microsoft.com/en-us/azure/developer/python/sdk/authentication-overview
    cred = DefaultAzureCredential()

    # Consumer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    conf = {
        'bootstrap.servers': '%s:9093' % namespace,
        'group.id': group,
        'session.timeout.ms': 6000,
        'auto.offset.reset': 'earliest',

        # Required OAuth2 configuration properties
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'OAUTHBEARER',
        # the resulting oauth_cb must accept a single `config` parameter, so we use partial to bind the namespace/identity to our function
        'oauth_cb': partial(oauth_cb, cred, namespace),
    }

    # Check to see if -T option exists
    for opt in optlist:
        if opt[0] != '-T':
            continue
        try:
            intval = int(opt[1])
        except ValueError:
            sys.stderr.write("Invalid option value for -T: %s\n" % opt[1])
            sys.exit(1)

        if intval <= 0:
            sys.stderr.write("-T option value needs to be larger than zero: %s\n" % opt[1])
            sys.exit(1)

        conf['stats_cb'] = stats_cb
        conf['statistics.interval.ms'] = int(opt[1])

def consume_workload(conf, topics):
    # Create logger for consumer (logs will be emitted when poll() is called)
    logger = logging.getLogger('consumer')
    logger.setLevel(logging.DEBUG)
@@ -127,3 +55,41 @@ def print_assignment(consumer, partitions):
    finally:
        # Close down consumer to commit final offsets.
        c.close()


def parse_consumer_args():
    parser = argparse.ArgumentParser(description='Process command line arguments.')
    parser.add_argument('namespace', help='Event Hubs namespace')
    parser.add_argument('group', help='Consumer group')
    parser.add_argument('topics', nargs='+', help='Topic1, Topic2, ...')
    parser.add_argument('-T', type=int, help='Enable client statistics at the specified interval (ms)')
    parser.add_argument('--mode', default='azure', choices=['azure', 'oidc', 'opaque'],
                        help='Optional Confluent consumer configuration mode - azure, oidc, opaque')

    args = parser.parse_args()

    if args.T and args.T <= 0:
        sys.stderr.write("-T option value needs to be larger than zero: %s\n" % args.T)
        sys.exit(1)

    return args.namespace, args.group, args.topics, args.T, args.mode


if __name__ == '__main__':
    namespace, group, topics, T, mode = parse_consumer_args()

    conf = config_utils.get_config(namespace, mode)
    conf.update({
        'group.id': group,
        'session.timeout.ms': 6000,
        'auto.offset.reset': 'earliest'
    })

    def stats_cb(stats_json_str):
        stats_json = json.loads(stats_json_str)
        print('\nKAFKA Stats: {}\n'.format(pformat(stats_json)))

    if T:
        conf['stats_cb'] = stats_cb
        conf['statistics.interval.ms'] = T

    consume_workload(conf, topics)
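
# Illustrative invocations of the updated consumer (hypothetical namespace/group/topic values;
# azure mode additionally assumes a credential that DefaultAzureCredential can resolve):
#
#   python consumer.py mynamespace.servicebus.windows.net $CONSUMER_GROUP $TOPIC
#   python consumer.py mynamespace.servicebus.windows.net $CONSUMER_GROUP $TOPIC --mode oidc
#   python consumer.py mynamespace.servicebus.windows.net $CONSUMER_GROUP $TOPIC -T 5000 --mode opaque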
71 changes: 30 additions & 41 deletions tutorials/oauth/python/producer.py
@@ -1,56 +1,23 @@
#!/usr/bin/env python
#
# Copyright (c) Microsoft Corporation. All rights reserved.
# Copyright 2016 Confluent Inc.
# Copyright 2023 Confluent Inc.
# Licensed under the MIT License.
# Licensed under the Apache License, Version 2.0
#
# Original Confluent sample modified for use with Azure Event Hubs for Apache Kafka Ecosystems

from azure.identity import DefaultAzureCredential
from confluent_kafka import Producer
import sys
from functools import partial
import argparse
import config_utils


def oauth_cb(cred, namespace_fqdn, config):
    # confluent_kafka requires an oauth callback function to return (str, float) with the values of (<access token>, <expiration date in seconds from epoch>)

    # cred: an Azure identity library credential object. Ex: an instance of DefaultAzureCredential, ManagedIdentityCredential, etc
    # namespace_fqdn: the FQDN for the target Event Hubs namespace. Ex: 'mynamespace.servicebus.windows.net'
    # config: confluent_kafka passes the value of 'sasl.oauthbearer.config' as the config param

    access_token = cred.get_token('https://%s/.default' % namespace_fqdn)
    return access_token.token, access_token.expires_on


if __name__ == '__main__':
    if len(sys.argv) != 3:
        sys.stderr.write('Usage: %s <eventhubs-namespace> <topic>\n' % sys.argv[0])
        sys.exit(1)

    namespace = sys.argv[1]
    topic = sys.argv[2]

    # Azure credential
    # See https://docs.microsoft.com/en-us/azure/developer/python/sdk/authentication-overview
    cred = DefaultAzureCredential()

    # Producer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    conf = {
        'bootstrap.servers': '%s:9093' % namespace,

        # Required OAuth2 configuration properties
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'OAUTHBEARER',
        # the resulting oauth_cb must accept a single `config` parameter, so we use partial to bind the namespace/identity to our function
        'oauth_cb': partial(oauth_cb, cred, namespace),
    }

def produce_workload(conf, topic, num_messages):
    # Create Producer instance
    p = Producer(**conf)

    # Optional per-message delivery callback (triggered by poll() or flush())
    # when a message has been successfully delivered or permanently
    # failed delivery (after retries).
@@ -59,10 +26,10 @@ def delivery_callback(err, msg):
        sys.stderr.write('%% Message failed delivery: %s\n' % err)
    else:
        sys.stderr.write('%% Message delivered to %s [%d] @ %d\n' %
                         (msg.topic(), msg.partition(), msg.offset()))
                         (msg.topic(), msg.partition(), msg.offset()))

    # Write 1-100 to topic
    for i in range(0, 100):
    # Write 0 to num_messages-1 to topic
    for i in range(num_messages):
        try:
            p.produce(topic, str(i), callback=delivery_callback)
        except BufferError:
@@ -78,3 +45,25 @@ def delivery_callback(err, msg):
    # Wait until all messages have been delivered
    sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
    p.flush()


def parse_producer_args():
    parser = argparse.ArgumentParser(description='Process command line arguments.')
    parser.add_argument('namespace', help='Event Hubs namespace')
    parser.add_argument('topic', help='Topic or Event Hub')
    parser.add_argument('--mode', default='azure',
                        choices=['azure', 'oidc', 'opaque'],
                        help='Optional Confluent producer configuration mode - azure, oidc, opaque')
    parser.add_argument('--num-messages', type=int, default=100,
                        help='Optional number of messages to produce')

    args = parser.parse_args()
    return args.namespace, args.topic, args.mode, args.num_messages


if __name__ == '__main__':
    namespace, topic, mode, num_messages = parse_producer_args()

    conf = config_utils.get_config(namespace, mode)

    produce_workload(conf, topic, num_messages=num_messages)
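
# Illustrative invocations of the updated producer (hypothetical namespace/topic values;
# azure mode additionally assumes a credential that DefaultAzureCredential can resolve):
#
#   python producer.py mynamespace.servicebus.windows.net $TOPIC
#   python producer.py mynamespace.servicebus.windows.net $TOPIC --mode oidc --num-messages 10
#   python producer.py mynamespace.servicebus.windows.net $TOPIC --mode opaque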
