Skip to content

Commit

Permalink
Added more Python Kafka oauth examples - producer + consumer:
Browse files Browse the repository at this point in the history
1. OIDC SASL_SSL
2. SASL_SSL with custom token refresh callback without azure.identity dependency

Signed-off-by: Hlib Pylypets <[email protected]>
  • Loading branch information
Pilipets committed Dec 10, 2022
1 parent c7c395d commit 6078782
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 0 deletions.
107 changes: 107 additions & 0 deletions tutorials/oauth/python/more_examples/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import os
import sys
import getopt
import time
import requests
from confluent_kafka import Consumer, KafkaException


TENANT_ID = os.environ.get('AZURE_TENANT_ID')
CLIENT_ID = os.environ.get('AZURE_CLIENT_ID')
CLIENT_SECRET = os.environ.get('AZURE_CLIENT_SECRET')


def get_oidc_config(namespace, group):
conf = {
'bootstrap.servers': '%s:9093' % namespace,
'group.id': group,
'session.timeout.ms': 6000,
'auto.offset.reset': 'earliest',
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'OAUTHBEARER',
'sasl.oauthbearer.method': 'oidc',
'sasl.oauthbearer.client.id': CLIENT_ID,
'sasl.oauthbearer.client.secret': CLIENT_SECRET,
'sasl.oauthbearer.token.endpoint.url': 'https://login.microsoftonline.com/%s/oauth2/v2.0/token' % TENANT_ID,
'sasl.oauthbearer.scope': 'https://%s/.default' % namespace,
}

return conf


def get_cb_config(namespace, group):
def oauth_cb(config):
resp = requests.post(
"https://login.microsoftonline.com/%s/oauth2/v2.0/token" % TENANT_ID,
auth=(CLIENT_ID, CLIENT_SECRET),
data={
'grant_type': 'client_credentials',
'scope': 'https://%s/.default' % namespace
}
)

token = resp.json()
return token['access_token'], time.time() + float(token['expires_in'])

conf = {
'bootstrap.servers': '%s:9093' % namespace,
'group.id': group,
'session.timeout.ms': 6000,
'auto.offset.reset': 'earliest',
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'OAUTHBEARER',
'oauth_cb': oauth_cb,
}

return conf


if __name__ == '__main__':
# Parse options
#
optlist, argv = getopt.getopt(sys.argv[1:], '', ['oidc'])
if len(argv) < 3:
sys.stderr.write(
'Usage: %s [--oidc] <eventhubs-namespace> <group> <topic1> <topic2> ..\n' % sys.argv[0])
sys.exit(1)

namespace = argv[0]
group = argv[1]
topics = argv[2:]

# Get proper consumer configuration
#
use_oidc = any(opt[0] == '--oidc' for opt in optlist)

if use_oidc:
conf = get_oidc_config(namespace, group)
else:
conf = get_cb_config(namespace, group)

# Run consumer
#
c = Consumer(conf)

def print_assignment(consumer, partitions):
print('Assignment:', partitions)

c.subscribe(topics, on_assign=print_assignment)

try:
while True:
msg = c.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
raise KafkaException(msg.error())
else:
sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
(msg.topic(), msg.partition(), msg.offset(),
str(msg.key())))
print(msg.value())

except KeyboardInterrupt:
sys.stderr.write('%% Aborted by user\n')

finally:
c.close()
95 changes: 95 additions & 0 deletions tutorials/oauth/python/more_examples/producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import sys
import time
import os
import requests
import getopt
from confluent_kafka import Producer


TENANT_ID = os.environ.get('AZURE_TENANT_ID')
CLIENT_ID = os.environ.get('AZURE_CLIENT_ID')
CLIENT_SECRET = os.environ.get('AZURE_CLIENT_SECRET')


def get_oidc_config(namespace):
conf = {
'bootstrap.servers': '%s:9093' % namespace,
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'OAUTHBEARER',
'sasl.oauthbearer.method': 'oidc',
'sasl.oauthbearer.client.id': CLIENT_ID,
'sasl.oauthbearer.client.secret': CLIENT_SECRET,
'sasl.oauthbearer.token.endpoint.url': 'https://login.microsoftonline.com/%s/oauth2/v2.0/token' % TENANT_ID,
'sasl.oauthbearer.scope': 'https://%s/.default' % namespace,
}

return conf


def get_cb_config(namespace):
def oauth_cb(config):
resp = requests.post(
"https://login.microsoftonline.com/%s/oauth2/v2.0/token" % TENANT_ID,
auth=(CLIENT_ID, CLIENT_SECRET),
data={
'grant_type': 'client_credentials',
'scope': 'https://%s/.default' % namespace
}
)

token = resp.json()
return token['access_token'], time.time() + float(token['expires_in'])

conf = {
'bootstrap.servers': '%s:9093' % namespace,
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'OAUTHBEARER',
'oauth_cb': oauth_cb,
}

return conf


if __name__ == '__main__':
# Parse options
#
optlist, argv = getopt.getopt(sys.argv[1:], '', ['oidc'])
if len(argv) != 2:
sys.stderr.write(
'Usage: %s [--oidc] <eventhubs-namespace> <topic>\n' % sys.argv[0])
sys.exit(1)

namespace = argv[0]
topic = argv[1]

# Get proper producer configuration
#
use_oidc = any(opt[0] == '--oidc' for opt in optlist)

if use_oidc:
conf = get_oidc_config(namespace)
else:
conf = get_cb_config(namespace)

# Run producer
#
p = Producer(**conf)

def delivery_callback(err, msg):
if err:
sys.stderr.write('%% Message failed delivery: %s\n' % err)
else:
sys.stderr.write('%% Message delivered to %s [%d] @ %d\n' %
(msg.topic(), msg.partition(), msg.offset()))

for i in range(0, 1):
try:
p.produce(topic, str(i), callback=delivery_callback)
except BufferError:
sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' %
len(p))

p.poll(0)

sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
p.flush()

0 comments on commit 6078782

Please sign in to comment.