diff --git a/openidc_client/__init__.py b/openidc_client/__init__.py index d8ff31d..b430ba2 100644 --- a/openidc_client/__init__.py +++ b/openidc_client/__init__.py @@ -32,6 +32,7 @@ import logging from threading import Lock import time + try: from StringIO import StringIO except ImportError: @@ -39,6 +40,7 @@ import secrets import socket import os + try: from urllib import urlencode except ImportError: @@ -47,7 +49,7 @@ import webbrowser from wsgiref import simple_server -import requests +import httpx import sys from openidc_client import release @@ -74,9 +76,19 @@ class OpenIDCClient(object): # refresh the token. # refresh_token: The token we can use to refresh the access token # scopes: A list of scopes that we had requested with the token - def __init__(self, app_identifier, id_provider, id_provider_mapping, - client_id, client_secret=None, use_post=False, useragent=None, - cachedir=None, printfd=sys.stdout, use_pkce=False): + def __init__( + self, + app_identifier, + id_provider, + id_provider_mapping, + client_id, + client_secret=None, + use_post=False, + useragent=None, + cachedir=None, + printfd=sys.stdout, + use_pkce=False, + ): """Client for interacting with web services relying on OpenID Connect. :param app_identifier: Identifier for storage of retrieved tokens @@ -105,9 +117,8 @@ def __init__(self, app_identifier, id_provider, id_provider_mapping, self.idp_mapping = id_provider_mapping self.client_id = client_id self.client_secret = client_secret - self.useragent = useragent or 'python-openid-client/%s' % \ - release.VERSION - self.cachedir = os.path.expanduser(cachedir or '~/.openidc') + self.useragent = useragent or "python-openid-client/%s" % release.VERSION + self.cachedir = os.path.expanduser(cachedir or "~/.openidc") self.last_returned_uuid = None self.problem_reported = False self.token_to_try = None @@ -142,13 +153,13 @@ def get_token(self, scopes, new_token=True): :returns: String bearer token if possible or None """ if not isinstance(scopes, list): - raise ValueError('Scopes must be a list') + raise ValueError("Scopes must be a list") token = self._get_token_with_scopes(scopes) if token: # If we had a valid token, use that self.last_returned_uuid = token[0] self.problem_reported = False - return token[1]['access_token'] + return token[1]["access_token"] elif not new_token: return None @@ -157,7 +168,7 @@ def get_token(self, scopes, new_token=True): if uuid: self.last_returned_uuid = uuid self.problem_reported = False - return self._cache[uuid]['access_token'] + return self._cache[uuid]["access_token"] def report_token_issue(self): """Report an error with the last token that was returned. @@ -174,7 +185,7 @@ def report_token_issue(self): token was lacking specific permissions. """ if not self.last_returned_uuid: - raise Exception('Cannot report issue before requesting token') + raise Exception("Cannot report issue before requesting token") if self.problem_reported: # We were reported an issue before. Let's just remove this token. self._delete_token(self.last_returned_uuid) @@ -185,10 +196,10 @@ def report_token_issue(self): return None else: self.problem_reported = True - return self._cache[self.last_returned_uuid]['access_token'] + return self._cache[self.last_returned_uuid]["access_token"] def send_request(self, *args, **kwargs): - """Make an python-requests POST request. + """Make an httpx POST request. Allarguments and keyword arguments are like the arguments to requests, except for `scopes`, `new_token` and `auto_refresh` keyword arguments. @@ -206,10 +217,10 @@ def send_request(self, *args, **kwargs): """ ckwargs = copy(kwargs) - scopes = ckwargs.pop('scopes') - new_token = ckwargs.pop('new_token', True) - auto_refresh = ckwargs.pop('auto_refresh', True) - method = ckwargs.pop('http_method', 'POST') + scopes = ckwargs.pop("scopes") + new_token = ckwargs.pop("new_token", True) + auto_refresh = ckwargs.pop("auto_refresh", True) + method = ckwargs.pop("http_method", "POST") is_retry = False if self.token_to_try: @@ -222,21 +233,20 @@ def send_request(self, *args, **kwargs): return None if self.use_post: - if 'json' in ckwargs: - raise ValueError('Cannot provide json in a post call') - if method not in ['POST']: - raise ValueError('Cannot use POST tokens in %s method' % - method) - - if 'data' not in ckwargs: - ckwargs['data'] = {} - ckwargs['data']['access_token'] = token + if "json" in ckwargs: + raise ValueError("Cannot provide json in a post call") + if method not in ["POST"]: + raise ValueError("Cannot use POST tokens in %s method" % method) + + if "data" not in ckwargs: + ckwargs["data"] = {} + ckwargs["data"]["access_token"] = token else: - if 'headers' not in ckwargs: - ckwargs['headers'] = {} - ckwargs['headers']['Authorization'] = 'Bearer %s' % token + if "headers" not in ckwargs: + ckwargs["headers"] = {} + ckwargs["headers"]["Authorization"] = "Bearer %s" % token - resp = requests.request(method, *args, **ckwargs) + resp = httpx.request(method, *args, **ckwargs) if resp.status_code == 401 and not is_retry: if not auto_refresh: return resp @@ -259,24 +269,24 @@ def _cachefile(self): This assures that whenever this file is touched, the cache lock is held """ assert self._cache_lock.locked() - return os.path.join(self.cachedir, 'oidc_%s.json' % self.app_id) + return os.path.join(self.cachedir, "oidc_%s.json" % self.app_id) def __refresh_cache(self): """Refreshes the self._cache from the cache on disk. Requires cache_lock to be held by caller.""" assert self._cache_lock.locked() - self.debug('Refreshing cache') + self.debug("Refreshing cache") if not os.path.isdir(self.cachedir): - self.debug('Creating directory') + self.debug("Creating directory") os.makedirs(self.cachedir) if not os.path.exists(self._cachefile): - self.debug('Creating file') - with open(self._cachefile, 'w') as f: + self.debug("Creating file") + with open(self._cachefile, "w") as f: f.write(json.dumps({})) - with open(self._cachefile, 'r') as f: + with open(self._cachefile, "r") as f: self._cache = json.loads(f.read()) - self.debug('Loaded %i tokens', len(self._cache)) + self.debug("Loaded %i tokens", len(self._cache)) def _refresh_cache(self): """Refreshes the self._cache from the cache on disk. @@ -290,8 +300,8 @@ def __write_cache(self): Requires cache_lock to be held by caller.""" assert self._cache_lock.locked() - self.debug('Writing cache with %i tokens', len(self._cache)) - with open(self._cachefile, 'w') as f: + self.debug("Writing cache with %i tokens", len(self._cache)) + with open(self._cachefile, "w") as f: f.write(json.dumps(self._cache)) def _add_token(self, token): @@ -302,7 +312,7 @@ def _add_token(self, token): :param token: Dict of the token to be added to the cache """ uuid = uuidgen().hex - self.debug('Adding token %s to cache', uuid) + self.debug("Adding token %s to cache", uuid) with self._cache_lock: self.__refresh_cache() self._cache[uuid] = token @@ -317,8 +327,7 @@ def _update_token(self, uuid, toupdate): :param token: UUID of the token to be updated :param toupdate: Dict indicating which fields need to be updated """ - self.debug('Updating token %s in cache, fields %s', - uuid, toupdate.keys()) + self.debug("Updating token %s in cache, fields %s", uuid, toupdate.keys()) with self._cache_lock: self.__refresh_cache() if uuid not in self._cache: @@ -334,15 +343,15 @@ def _delete_token(self, uuid): :param uuid: UUID of the token to be removed from cache """ - self.debug('Removing token %s from cache', uuid) + self.debug("Removing token %s from cache", uuid) with self._cache_lock: self.__refresh_cache() if uuid in self._cache: - self.debug('Removing token') + self.debug("Removing token") del self._cache[uuid] self.__write_cache() else: - self.debug('Token was already gone') + self.debug("Token was already gone") def _get_token_with_scopes(self, scopes): """Searches the cache for any tokens that have the requested scopes. @@ -357,28 +366,30 @@ def _get_token_with_scopes(self, scopes): found """ possible_token = None - self.debug('Trying to get token with scopes %s', scopes) + self.debug("Trying to get token with scopes %s", scopes) for uuid in self._cache: - self.debug('Checking %s', uuid) + self.debug("Checking %s", uuid) token = self._cache[uuid] - if token['idp'] != self.idp: - self.debug('Incorrect idp') + if token["idp"] != self.idp: + self.debug("Incorrect idp") continue - if not set(scopes).issubset(set(token['scopes'])): - self.debug('Missing scope: %s not subset of %s', - set(scopes), - set(token['scopes'])) + if not set(scopes).issubset(set(token["scopes"])): + self.debug( + "Missing scope: %s not subset of %s", + set(scopes), + set(token["scopes"]), + ) continue - if token['expires_at'] < time.time(): + if token["expires_at"] < time.time(): # This is a token that's supposed to still be valid, prefer it # over any others we have - self.debug('Not yet expired, returning') + self.debug("Not yet expired, returning") return uuid, token # This is a token that may or may not still be valid - self.debug('Possible') + self.debug("Possible") possible_token = (uuid, token) if possible_token: - self.debug('Returning possible token') + self.debug("Returning possible token") return possible_token def _idp_url(self, method): @@ -391,8 +402,7 @@ def _idp_url(self, method): if method in self.idp_mapping: return self.idp + self.idp_mapping[method] else: - return ValueError('Idp Mapping did not include path for %s' - % method) + return ValueError("Idp Mapping did not include path for %s" % method) def _refresh_token(self, uuid): """Tries to refresh a token and put the refreshed token in self._cache @@ -405,39 +415,41 @@ def _refresh_token(self, uuid): :returns: True if the token was succesfully refreshed, False otherwise """ oldtoken = self._cache[uuid] - if not oldtoken['refresh_token']: + if not oldtoken["refresh_token"]: self.debug("Unable to refresh: no refresh token present") return False - self.debug('Refreshing token %s', uuid) - data = {'client_id': self.client_id, - 'grant_type': 'refresh_token', - 'refresh_token': oldtoken['refresh_token']} + self.debug("Refreshing token %s", uuid) + data = { + "client_id": self.client_id, + "grant_type": "refresh_token", + "refresh_token": oldtoken["refresh_token"], + } if self.client_secret: - data['client_secret'] = self.client_secret + data["client_secret"] = self.client_secret - resp = requests.request( - 'POST', - self._idp_url('Token'), - data=data) + resp = httpx.request("POST", self._idp_url("Token"), data=data) resp.raise_for_status() resp = resp.json() - if 'error' in resp: - self.debug('Unable to refresh, error: %s', resp['error']) + if "error" in resp: + self.debug("Unable to refresh, error: %s", resp["error"]) return False self._update_token( uuid, - {'access_token': resp['access_token'], - 'token_type': resp['token_type'], - 'refresh_token': resp['refresh_token'], - 'expires_at': time.time() + resp['expires_in']}) - self.debug('Refreshed until %s', self._cache[uuid]['expires_at']) + { + "access_token": resp["access_token"], + "token_type": resp["token_type"], + "refresh_token": resp["refresh_token"], + "expires_at": time.time() + resp["expires_in"], + }, + ) + self.debug("Refreshed until %s", self._cache[uuid]["expires_at"]) return True def _get_server(self, app): """This function returns a SimpleServer with an available WEB_PORT.""" for port in WEB_PORTS: try: - server = simple_server.make_server('0.0.0.0', port, app) + server = simple_server.make_server("0.0.0.0", port, app) return server except socket.error: # This port did not work. Switch to next one @@ -458,46 +470,49 @@ def _get_new_token(self, scopes): the valid cache, and then return the UUID. If the user cancelled (or we got another error), we will return None. """ - def _token_app(environ, start_response): - query = environ['QUERY_STRING'] - split = query.split('&') - kv = dict([v.split('=', 1) for v in split]) - if 'error' in kv: - self.debug('Error code returned: %s (%s)', - kv['error'], kv.get('error_description')) + def _token_app(environ, start_response): + query = environ["QUERY_STRING"] + split = query.split("&") + kv = dict([v.split("=", 1) for v in split]) + + if "error" in kv: + self.debug( + "Error code returned: %s (%s)", + kv["error"], + kv.get("error_description"), + ) self._retrieved_code = False else: - self._retrieved_code = kv['code'] + self._retrieved_code = kv["code"] # Just return a message - start_response('200 OK', [('Content-Type', 'text/plain')]) - return [u'You can close this window and return to the CLI'.encode('ascii')] + start_response("200 OK", [("Content-Type", "text/plain")]) + return [u"You can close this window and return to the CLI".encode("ascii")] self._retrieved_code = None server = self._get_server(_token_app) if not server: - raise Exception('We were unable to instantiate a webserver') - return_uri = 'http://localhost:%i/' % server.socket.getsockname()[1] + raise Exception("We were unable to instantiate a webserver") + return_uri = "http://localhost:%i/" % server.socket.getsockname()[1] rquery = {} - rquery['scope'] = ' '.join(scopes) - rquery['response_type'] = 'code' - rquery['client_id'] = self.client_id - rquery['redirect_uri'] = return_uri - rquery['response_mode'] = 'query' + rquery["scope"] = " ".join(scopes) + rquery["response_type"] = "code" + rquery["client_id"] = self.client_id + rquery["redirect_uri"] = return_uri + rquery["response_mode"] = "query" if self._use_pkce: code_verifier = secrets.token_urlsafe(PKCE_CODE_VERIFIER_LENGTH) code_challenge = urlsafe_b64encode( - sha256(code_verifier.encode('utf-8')).digest() + sha256(code_verifier.encode("utf-8")).digest() ) - rquery['code_challenge'] = code_challenge.decode('utf-8').rstrip('=') - rquery['code_challenge_method'] = 'S256' + rquery["code_challenge"] = code_challenge.decode("utf-8").rstrip("=") + rquery["code_challenge_method"] = "S256" query = urlencode(rquery) - authz_url = '%s?%s' % (self._idp_url('Authorization'), query) - print('Please visit %s to grant authorization' % authz_url, - file=self._printfd) + authz_url = "%s?%s" % (self._idp_url("Authorization"), query) + print("Please visit %s to grant authorization" % authz_url, file=self._printfd) webbrowser.open(authz_url) server.handle_request() server.server_close() @@ -506,35 +521,35 @@ def _token_app(environ, start_response): if self._retrieved_code is False: # The user cancelled the request self._retrieved_code = None - self.debug('User cancelled') + self.debug("User cancelled") return None - self.debug('We got an authorization code!') - data = {'client_id': self.client_id, - 'grant_type': 'authorization_code', - 'redirect_uri': return_uri, - 'code': self._retrieved_code} + self.debug("We got an authorization code!") + data = { + "client_id": self.client_id, + "grant_type": "authorization_code", + "redirect_uri": return_uri, + "code": self._retrieved_code, + } if self.client_secret: - data['client_secret'] = self.client_secret + data["client_secret"] = self.client_secret if self._use_pkce: - data['code_verifier'] = code_verifier + data["code_verifier"] = code_verifier - resp = requests.request( - 'POST', - self._idp_url('Token'), - data=data) + resp = httpx.request("POST", self._idp_url("Token"), data=data) resp.raise_for_status() self._retrieved_code = None resp = resp.json() - if 'error' in resp: - self.debug('Error exchanging authorization code: %s', - resp['error']) + if "error" in resp: + self.debug("Error exchanging authorization code: %s", resp["error"]) return None - token = {'access_token': resp['access_token'], - 'refresh_token': resp.get('refresh_token'), - 'expires_at': time.time() + int(resp['expires_in']), - 'idp': self.idp, - 'token_type': resp['token_type'], - 'scopes': scopes} + token = { + "access_token": resp["access_token"], + "refresh_token": resp.get("refresh_token"), + "expires_at": time.time() + int(resp["expires_in"]), + "idp": self.idp, + "token_type": resp["token_type"], + "scopes": scopes, + } # AND WE ARE DONE! \o/ return self._add_token(token) diff --git a/openidc_client/requestsauth.py b/openidc_client/requestsauth.py index e2cf2ab..25e32e6 100644 --- a/openidc_client/requestsauth.py +++ b/openidc_client/requestsauth.py @@ -23,6 +23,7 @@ """Python-Requests AuthBase wrapping OpenIDCClient.""" +import httpx import requests @@ -37,7 +38,7 @@ def handle_response(self, response, **kwargs): new_token = self.client.report_token_issue() if not new_token: return response - response.request.headers['Authorization'] = 'Bearer %s' % new_token + response.request.headers["Authorization"] = "Bearer %s" % new_token # Consume the content so we can reuse the connection response.content @@ -51,10 +52,47 @@ def handle_response(self, response, **kwargs): return response def __call__(self, request): - request.register_hook('response', self.handle_response) - token = self.client.get_token(self.scopes, - new_token=self.new_token) + request.register_hook("response", self.handle_response) + token = self.client.get_token(self.scopes, new_token=self.new_token) if token is None: - raise RuntimeError('No token received') - request.headers['Authorization'] = 'Bearer %s' % token + raise RuntimeError("No token received") + request.headers["Authorization"] = "Bearer %s" % token return request + + +class OpenIDCClientAutherHttpx(httpx.Auth): + def __init__(self, oidcclient, scopes, new_token=True): + self.client = oidcclient + self.scopes = scopes + self.new_token = new_token + + def handle_response(self, response, **kwargs): + if response.status_code in [401, 403]: + new_token = self.client.report_token_issue() + if not new_token: + return response + response.request.headers["Authorization"] = "Bearer %s" % new_token + + # Consume the content so we can reuse the connection + response.content + response.raw.release_conn() + + r = response.connection.send(response.request) + r.history.append(response) + + return r + else: + return response + + def auth_flow(self, request): + token = self.client.get_token(self.scopes, new_token=self.new_token) + if token is None: + raise RuntimeError("No token received") + request.headers["Authorization"] = "Bearer %s" % token + response = yield request + if response.status_code in [401, 403]: + new_token = self.client.report_token_issue() + if not new_token: + return response + response.request.headers["Authorization"] = "Bearer %s" % new_token + yield response.request diff --git a/setup.py b/setup.py index d0643d2..56e2038 100755 --- a/setup.py +++ b/setup.py @@ -1,30 +1,33 @@ #!/usr/bin/python -tt from setuptools import find_packages, setup -exec(compile(open("openidc_client/release.py").read(), - "openidc_client/release.py", 'exec')) +exec( + compile( + open("openidc_client/release.py").read(), "openidc_client/release.py", "exec" + ) +) setup( - name='openidc-client', + name="openidc-client", version=VERSION, - description='OpenID Connect Client with caching and token management', - author='Patrick Uiterwijk', - author_email='puiterwijk@fedoraproject.org', - license='MIT', - keywords='OpenID Connect Client', - url='https://github.com/puiterwijk/python-openidc-client', + description="OpenID Connect Client with caching and token management", + author="Patrick Uiterwijk", + author_email="puiterwijk@fedoraproject.org", + license="MIT", + keywords="OpenID Connect Client", + url="https://github.com/puiterwijk/python-openidc-client", packages=find_packages(exclude=["tests"]), include_package_data=True, install_requires=[ - 'requests', + "httpx", ], classifiers=[ - 'Development Status :: 4 - Beta', - 'Intended Audience :: Developers', - 'License :: OSI Approved :: MIT License', - 'Programming Language :: Python :: 2.7', - 'Topic :: Internet :: WWW/HTTP', - 'Topic :: Software Development :: Libraries :: Python Modules', + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 2.7", + "Topic :: Internet :: WWW/HTTP", + "Topic :: Software Development :: Libraries :: Python Modules", ], - test_suite='tests', + test_suite="tests", ) diff --git a/tests/test_openidcclient.py b/tests/test_openidcclient.py index fed86ad..787f873 100644 --- a/tests/test_openidcclient.py +++ b/tests/test_openidcclient.py @@ -30,6 +30,7 @@ import tempfile import urllib.parse import unittest + try: from mock import MagicMock, patch except ImportError: @@ -37,45 +38,49 @@ import openidc_client as openidcclient -BASE_URL = 'http://app/' -IDP_URL = 'https://idp/' +BASE_URL = "http://app/" +IDP_URL = "https://idp/" def set_token(client, toreturn): """Mock helper for _get_server to set a retrieved token.""" + def setter(app): client._retrieved_code = toreturn return MagicMock() + return setter def mock_request(responses): """Mock helper for responding to HTTP requests.""" + def perform(method, url, **extra): def rfs(toret): """Helper for Raise For Status.""" + def call(): if toret.status_code != 200: - raise Exception('Mocked response %s' % toret.status_code) + raise Exception("Mocked response %s" % toret.status_code) + return call toreturn = MagicMock() if url in responses: if len(responses[url]) == 0: - raise Exception('Unhandled requested to %s (extra %s)' % - (url, extra)) + raise Exception("Unhandled requested to %s (extra %s)" % (url, extra)) retval = responses[url][0] responses[url] = responses[url][1:] toreturn.status_code = 200 - if '_code' in retval: - toreturn.status_code = retval['_code'] - del retval['_code'] + if "_code" in retval: + toreturn.status_code = retval["_code"] + del retval["_code"] toreturn.json = MagicMock(return_value=retval) toreturn.raise_for_status = rfs(toreturn) return toreturn else: - raise Exception('Unhandled mocked URL: %s (extra: %s)' % - (url, extra)) + raise Exception("Unhandled mocked URL: %s (extra: %s)" % (url, extra)) + return perform @@ -84,16 +89,16 @@ class OpenIdBaseClientTest(unittest.TestCase): """Test the OpenId Base Client.""" def setUp(self): - self.cachedir = tempfile.mkdtemp('oidcclient') + self.cachedir = tempfile.mkdtemp("oidcclient") openidcclient.webbrowser = MagicMock() self.client = openidcclient.OpenIDCClient( - 'myapp', + "myapp", id_provider=IDP_URL, - id_provider_mapping={'Token': 'Token', - 'Authorization': 'Authorization'}, - client_id='testclient', - client_secret='notsecret', - cachedir=self.cachedir) + id_provider_mapping={"Token": "Token", "Authorization": "Authorization"}, + client_id="testclient", + client_secret="notsecret", + cachedir=self.cachedir, + ) def tearDown(self): shutil.rmtree(self.cachedir) @@ -101,85 +106,110 @@ def tearDown(self): def test_cachefile(self): """Test that the cachefile name is set by app id.""" with self.client._cache_lock: - self.assertEqual('oidc_myapp.json', - self.client._cachefile.rsplit('/', 1)[1]) + self.assertEqual( + "oidc_myapp.json", self.client._cachefile.rsplit("/", 1)[1] + ) def test_get_new_token_cancel(self): """Test that we handle it correctly if the user cancels.""" - with patch.object(self.client, '_get_server', - side_effect=set_token(self.client, False)) as gsmock: - with patch.object(openidcclient.requests, 'request', - side_effect=mock_request({})) as postmock: - result = self.client._get_new_token( - ['test_get_new_token_cancel']) + with patch.object( + self.client, "_get_server", side_effect=set_token(self.client, False) + ) as gsmock: + with patch.object( + openidcclient.httpx, "request", side_effect=mock_request({}) + ) as postmock: + result = self.client._get_new_token(["test_get_new_token_cancel"]) self.assertEqual(result, None) assert gsmock.call_count == 1 postmock.assert_not_called() def test_get_new_token_error(self): """Test that we handle errors correctly.""" - postresp = {'https://idp/Token': [ - {'error': 'some_error', - 'error_description': 'Some error occured'}]} - - with patch.object(self.client, '_get_server', - side_effect=set_token(self.client, 'authz')) as gsm: - with patch.object(openidcclient.requests, 'request', - side_effect=mock_request(postresp)) as postmock: - result = self.client._get_new_token( - ['test_get_new_token_error']) + postresp = { + "https://idp/Token": [ + {"error": "some_error", "error_description": "Some error occured"} + ] + } + + with patch.object( + self.client, "_get_server", side_effect=set_token(self.client, "authz") + ) as gsm: + with patch.object( + openidcclient.httpx, "request", side_effect=mock_request(postresp) + ) as postmock: + result = self.client._get_new_token(["test_get_new_token_error"]) self.assertEqual(result, None) assert gsm.call_count == 1 postmock.assert_called_once_with( - 'POST', - 'https://idp/Token', - data={'code': 'authz', - 'client_secret': 'notsecret', - 'grant_type': 'authorization_code', - 'client_id': 'testclient', - 'redirect_uri': 'http://localhost:1/'}) + "POST", + "https://idp/Token", + data={ + "code": "authz", + "client_secret": "notsecret", + "grant_type": "authorization_code", + "client_id": "testclient", + "redirect_uri": "http://localhost:1/", + }, + ) def test_get_new_token_working(self): """Test for a completely succesful case.""" - postresp = {'https://idp/Token': [ - {'access_token': 'testtoken', - 'refresh_token': 'refreshtoken', - 'expires_in': 600, - 'token_type': 'Bearer'}]} - - with patch.object(self.client, '_get_server', - side_effect=set_token(self.client, 'authz')) as gsm: - with patch.object(openidcclient.requests, 'request', - side_effect=mock_request(postresp)) as postmock: - result = self.client._get_new_token( - ['test_get_new_token_working']) + postresp = { + "https://idp/Token": [ + { + "access_token": "testtoken", + "refresh_token": "refreshtoken", + "expires_in": 600, + "token_type": "Bearer", + } + ] + } + + with patch.object( + self.client, "_get_server", side_effect=set_token(self.client, "authz") + ) as gsm: + with patch.object( + openidcclient.httpx, "request", side_effect=mock_request(postresp) + ) as postmock: + result = self.client._get_new_token(["test_get_new_token_working"]) self.assertNotEqual(result, None) assert gsm.call_count == 1 postmock.assert_called_once_with( - 'POST', - 'https://idp/Token', - data={'code': 'authz', - 'client_secret': 'notsecret', - 'grant_type': 'authorization_code', - 'client_id': 'testclient', - 'redirect_uri': 'http://localhost:1/'}) + "POST", + "https://idp/Token", + data={ + "code": "authz", + "client_secret": "notsecret", + "grant_type": "authorization_code", + "client_id": "testclient", + "redirect_uri": "http://localhost:1/", + }, + ) def test_get_new_token_pkce_working(self): """Test for a completely succesful case with PKCE.""" - postresp = {'https://idp/Token': [ - {'access_token': 'testtoken', - 'refresh_token': 'refreshtoken', - 'expires_in': 600, - 'token_type': 'Bearer'}]} - - with patch.object(self.client, '_get_server', - side_effect=set_token(self.client, 'authz')) as gsm: - with patch.object(openidcclient.requests, 'request', - side_effect=mock_request(postresp)) as postmock: - with patch.object(openidcclient.webbrowser, 'open') as wb: + postresp = { + "https://idp/Token": [ + { + "access_token": "testtoken", + "refresh_token": "refreshtoken", + "expires_in": 600, + "token_type": "Bearer", + } + ] + } + + with patch.object( + self.client, "_get_server", side_effect=set_token(self.client, "authz") + ) as gsm: + with patch.object( + openidcclient.httpx, "request", side_effect=mock_request(postresp) + ) as postmock: + with patch.object(openidcclient.webbrowser, "open") as wb: self.client._use_pkce = True result = self.client._get_new_token( - ['test_get_new_token_pkce_working']) + ["test_get_new_token_pkce_working"] + ) self.client._use_pkce = False self.assertNotEqual(result, None) assert gsm.call_count == 1 @@ -188,376 +218,485 @@ def test_get_new_token_pkce_working(self): (wbargs, _) = wb.call_args auth_url = urllib.parse.urlparse(wbargs[0]) auth_params = urllib.parse.parse_qs(auth_url.query) - assert auth_params['code_challenge_method'] == ['S256'] - assert 'code_challenge' in auth_params - code_challenge = auth_params['code_challenge'][0] + assert auth_params["code_challenge_method"] == ["S256"] + assert "code_challenge" in auth_params + code_challenge = auth_params["code_challenge"][0] (args, kwargs) = postmock.call_args assert args[0] == "POST" assert args[1] == "https://idp/Token" - assert kwargs['data']['code'] == 'authz' - assert kwargs['data']['client_id'] == 'testclient' - assert kwargs['data']['client_secret'] == 'notsecret' - assert kwargs['data']['grant_type'] == 'authorization_code' - assert kwargs['data']['redirect_uri'] == 'http://localhost:1/' - code_verifier = kwargs['data']['code_verifier'] + assert kwargs["data"]["code"] == "authz" + assert kwargs["data"]["client_id"] == "testclient" + assert kwargs["data"]["client_secret"] == "notsecret" + assert kwargs["data"]["grant_type"] == "authorization_code" + assert kwargs["data"]["redirect_uri"] == "http://localhost:1/" + code_verifier = kwargs["data"]["code_verifier"] assert len(code_verifier) >= 43 assert len(code_verifier) <= 128 - correct_challenge = urlsafe_b64encode(sha256(code_verifier.encode()).digest()) - correct_challenge = correct_challenge.decode().rstrip('=') + correct_challenge = urlsafe_b64encode( + sha256(code_verifier.encode()).digest() + ) + correct_challenge = correct_challenge.decode().rstrip("=") assert correct_challenge == code_challenge def test_get_token_no_new(self): """Test that if we don't have a token we can skip getting a new oen.""" - self.assertEqual(self.client.get_token(['test_get_token_no_new'], - new_token=False), - None) + self.assertEqual( + self.client.get_token(["test_get_token_no_new"], new_token=False), None + ) def test_get_token_from_cache(self): """Test that if we have a cached token, that gets returned.""" - postresp = {'https://idp/Token': [ - {'access_token': 'testtoken', - 'refresh_token': 'refreshtoken', - 'expires_in': 600, - 'token_type': 'Bearer'}]} - - with patch.object(self.client, '_get_server', - side_effect=set_token(self.client, 'authz')) as gsm: - with patch.object(openidcclient.requests, 'request', - side_effect=mock_request(postresp)) as postmock: - result = self.client._get_new_token( - ['test_get_token_from_cache']) + postresp = { + "https://idp/Token": [ + { + "access_token": "testtoken", + "refresh_token": "refreshtoken", + "expires_in": 600, + "token_type": "Bearer", + } + ] + } + + with patch.object( + self.client, "_get_server", side_effect=set_token(self.client, "authz") + ) as gsm: + with patch.object( + openidcclient.httpx, "request", side_effect=mock_request(postresp) + ) as postmock: + result = self.client._get_new_token(["test_get_token_from_cache"]) self.assertNotEqual(result, None) assert gsm.call_count == 1 postmock.assert_called_once_with( - 'POST', - 'https://idp/Token', - data={'code': 'authz', - 'client_secret': 'notsecret', - 'grant_type': 'authorization_code', - 'client_id': 'testclient', - 'redirect_uri': 'http://localhost:1/'}) + "POST", + "https://idp/Token", + data={ + "code": "authz", + "client_secret": "notsecret", + "grant_type": "authorization_code", + "client_id": "testclient", + "redirect_uri": "http://localhost:1/", + }, + ) self.assertNotEqual( - self.client.get_token(['test_get_token_from_cache'], - new_token=False), - None) + self.client.get_token(["test_get_token_from_cache"], new_token=False), None + ) def test_get_token_new(self): """Test that get_token can get a new token.""" - postresp = {'https://idp/Token': [ - {'access_token': 'testtoken', - 'refresh_token': 'refreshtoken', - 'expires_in': 600, - 'token_type': 'Bearer'}]} - - with patch.object(self.client, '_get_server', - side_effect=set_token(self.client, 'authz')) as gsm: - with patch.object(openidcclient.requests, 'request', - side_effect=mock_request(postresp)) as postmock: + postresp = { + "https://idp/Token": [ + { + "access_token": "testtoken", + "refresh_token": "refreshtoken", + "expires_in": 600, + "token_type": "Bearer", + } + ] + } + + with patch.object( + self.client, "_get_server", side_effect=set_token(self.client, "authz") + ) as gsm: + with patch.object( + openidcclient.httpx, "request", side_effect=mock_request(postresp) + ) as postmock: self.assertNotEqual( - self.client.get_token(['test_get_token_new'], - new_token=True), - None) + self.client.get_token(["test_get_token_new"], new_token=True), None + ) assert gsm.call_count == 1 postmock.assert_called_once_with( - 'POST', - 'https://idp/Token', - data={'code': 'authz', - 'client_secret': 'notsecret', - 'grant_type': 'authorization_code', - 'client_id': 'testclient', - 'redirect_uri': 'http://localhost:1/'}) + "POST", + "https://idp/Token", + data={ + "code": "authz", + "client_secret": "notsecret", + "grant_type": "authorization_code", + "client_id": "testclient", + "redirect_uri": "http://localhost:1/", + }, + ) def test_report_token_issue_refreshable(self): """Test that we refresh a token if problems are reported.""" - postresp = {'https://idp/Token': [ - {'access_token': 'testtoken', - 'refresh_token': 'refreshtoken', - 'expires_in': 600, - 'token_type': 'Bearer'}, - {'access_token': 'refreshedtoken', - 'refresh_token': 'refreshtoken2', - 'expires_in': 600, - 'token_type': 'Bearer'}]} - - with patch.object(self.client, '_get_server', - side_effect=set_token(self.client, 'authz')) as gsm: - with patch.object(openidcclient.requests, 'request', - side_effect=mock_request(postresp)) as postmock: + postresp = { + "https://idp/Token": [ + { + "access_token": "testtoken", + "refresh_token": "refreshtoken", + "expires_in": 600, + "token_type": "Bearer", + }, + { + "access_token": "refreshedtoken", + "refresh_token": "refreshtoken2", + "expires_in": 600, + "token_type": "Bearer", + }, + ] + } + + with patch.object( + self.client, "_get_server", side_effect=set_token(self.client, "authz") + ) as gsm: + with patch.object( + openidcclient.httpx, "request", side_effect=mock_request(postresp) + ) as postmock: self.assertNotEqual( self.client.get_token( - ['test_report_token_issue_refreshable'], - new_token=True), - None) + ["test_report_token_issue_refreshable"], new_token=True + ), + None, + ) assert gsm.call_count == 1 postmock.assert_called_with( - 'POST', - 'https://idp/Token', - data={'code': 'authz', - 'client_secret': 'notsecret', - 'grant_type': 'authorization_code', - 'client_id': 'testclient', - 'redirect_uri': 'http://localhost:1/'}) + "POST", + "https://idp/Token", + data={ + "code": "authz", + "client_secret": "notsecret", + "grant_type": "authorization_code", + "client_id": "testclient", + "redirect_uri": "http://localhost:1/", + }, + ) postmock.reset_mock() - self.assertNotEqual(self.client.report_token_issue(), - None) + self.assertNotEqual(self.client.report_token_issue(), None) postmock.assert_called_once_with( - 'POST', - 'https://idp/Token', - data={'client_id': 'testclient', - 'client_secret': 'notsecret', - 'grant_type': 'refresh_token', - 'refresh_token': 'refreshtoken'}) + "POST", + "https://idp/Token", + data={ + "client_id": "testclient", + "client_secret": "notsecret", + "grant_type": "refresh_token", + "refresh_token": "refreshtoken", + }, + ) def test_report_token_issue_revoked(self): """Test that we only try to refresh once.""" - postresp = {'https://idp/Token': [ - {'access_token': 'testtoken', - 'refresh_token': 'refreshtoken', - 'expires_in': 600, - 'token_type': 'Bearer'}, - {'error': 'invalid_token', - 'error_description': 'This token is not valid'}]} - - with patch.object(self.client, '_get_server', - side_effect=set_token(self.client, 'authz')) as gsm: - with patch.object(openidcclient.requests, 'request', - side_effect=mock_request(postresp)) as postmock: + postresp = { + "https://idp/Token": [ + { + "access_token": "testtoken", + "refresh_token": "refreshtoken", + "expires_in": 600, + "token_type": "Bearer", + }, + { + "error": "invalid_token", + "error_description": "This token is not valid", + }, + ] + } + + with patch.object( + self.client, "_get_server", side_effect=set_token(self.client, "authz") + ) as gsm: + with patch.object( + openidcclient.httpx, "request", side_effect=mock_request(postresp) + ) as postmock: self.assertNotEqual( self.client.get_token( - ['test_report_token_issue_revoked'], - new_token=True), - None) + ["test_report_token_issue_revoked"], new_token=True + ), + None, + ) assert gsm.call_count == 1 postmock.assert_called_with( - 'POST', - 'https://idp/Token', - data={'code': 'authz', - 'client_secret': 'notsecret', - 'grant_type': 'authorization_code', - 'client_id': 'testclient', - 'redirect_uri': 'http://localhost:1/'}) + "POST", + "https://idp/Token", + data={ + "code": "authz", + "client_secret": "notsecret", + "grant_type": "authorization_code", + "client_id": "testclient", + "redirect_uri": "http://localhost:1/", + }, + ) postmock.reset_mock() - self.assertEqual(self.client.report_token_issue(), - None) + self.assertEqual(self.client.report_token_issue(), None) postmock.assert_called_once_with( - 'POST', - 'https://idp/Token', - data={'client_id': 'testclient', - 'client_secret': 'notsecret', - 'grant_type': 'refresh_token', - 'refresh_token': 'refreshtoken'}) + "POST", + "https://idp/Token", + data={ + "client_id": "testclient", + "client_secret": "notsecret", + "grant_type": "refresh_token", + "refresh_token": "refreshtoken", + }, + ) def test_report_token_issue_no_refresh(self): """Test that we don't try to refresh if there's no refresh token.""" - postresp = {'https://idp/Token': [ - {'access_token': 'testtoken', - 'expires_in': 600, - 'token_type': 'Bearer'}, - {'error': 'invalid_token', - 'error_description': 'This token is not valid'}]} - - with patch.object(self.client, '_get_server', - side_effect=set_token(self.client, 'authz')) as gsm: - with patch.object(openidcclient.requests, 'request', - side_effect=mock_request(postresp)) as postmock: + postresp = { + "https://idp/Token": [ + { + "access_token": "testtoken", + "expires_in": 600, + "token_type": "Bearer", + }, + { + "error": "invalid_token", + "error_description": "This token is not valid", + }, + ] + } + + with patch.object( + self.client, "_get_server", side_effect=set_token(self.client, "authz") + ) as gsm: + with patch.object( + openidcclient.httpx, "request", side_effect=mock_request(postresp) + ) as postmock: self.assertNotEqual( self.client.get_token( - ['test_report_token_issue_rno_refresh'], - new_token=True), - None) + ["test_report_token_issue_rno_refresh"], new_token=True + ), + None, + ) assert gsm.call_count == 1 postmock.assert_called_with( - 'POST', - 'https://idp/Token', - data={'code': 'authz', - 'client_secret': 'notsecret', - 'grant_type': 'authorization_code', - 'client_id': 'testclient', - 'redirect_uri': 'http://localhost:1/'}) + "POST", + "https://idp/Token", + data={ + "code": "authz", + "client_secret": "notsecret", + "grant_type": "authorization_code", + "client_id": "testclient", + "redirect_uri": "http://localhost:1/", + }, + ) postmock.reset_mock() - self.assertEqual(self.client.report_token_issue(), - None) + self.assertEqual(self.client.report_token_issue(), None) postmock.assert_not_called() def test_send_request_valid_token(self): """Test that we send the token.""" - postresp = {'https://idp/Token': [ - {'access_token': 'testtoken', - 'refresh_token': 'refreshtoken', - 'expires_in': 600, - 'token_type': 'Bearer'}], - 'http://app/test': [ - {} - ]} - - with patch.object(self.client, '_get_server', - side_effect=set_token(self.client, 'authz')) as gsm: - with patch.object(openidcclient.requests, 'request', - side_effect=mock_request(postresp)) as postmock: + postresp = { + "https://idp/Token": [ + { + "access_token": "testtoken", + "refresh_token": "refreshtoken", + "expires_in": 600, + "token_type": "Bearer", + } + ], + "http://app/test": [{}], + } + + with patch.object( + self.client, "_get_server", side_effect=set_token(self.client, "authz") + ) as gsm: + with patch.object( + openidcclient.httpx, "request", side_effect=mock_request(postresp) + ) as postmock: result = self.client.send_request( - 'http://app/test', - scopes=['test_send_request_valid_token']) + "http://app/test", scopes=["test_send_request_valid_token"] + ) assert gsm.call_count == 1 self.assertEqual(result.json(), {}) postmock.assert_called_with( - 'POST', - 'http://app/test', - headers={'Authorization': 'Bearer testtoken'}) + "POST", + "http://app/test", + headers={"Authorization": "Bearer testtoken"}, + ) def test_send_request_valid_token_PATH(self): """Test that we send the token with a PATCH request.""" - postresp = {'https://idp/Token': [ - {'access_token': 'testtoken', - 'refresh_token': 'refreshtoken', - 'expires_in': 600, - 'token_type': 'Bearer'}], - 'http://app/test': [ - {} - ]} - - with patch.object(self.client, '_get_server', - side_effect=set_token(self.client, 'authz')) as gsm: - with patch.object(openidcclient.requests, 'request', - side_effect=mock_request(postresp)) as postmock: + postresp = { + "https://idp/Token": [ + { + "access_token": "testtoken", + "refresh_token": "refreshtoken", + "expires_in": 600, + "token_type": "Bearer", + } + ], + "http://app/test": [{}], + } + + with patch.object( + self.client, "_get_server", side_effect=set_token(self.client, "authz") + ) as gsm: + with patch.object( + openidcclient.httpx, "request", side_effect=mock_request(postresp) + ) as postmock: result = self.client.send_request( - 'http://app/test', - scopes=['test_send_request_valid_token'], - http_method='PATCH') + "http://app/test", + scopes=["test_send_request_valid_token"], + http_method="PATCH", + ) assert gsm.call_count == 1 self.assertEqual(result.json(), {}) postmock.assert_called_with( - 'PATCH', - 'http://app/test', - headers={'Authorization': 'Bearer testtoken'}) + "PATCH", + "http://app/test", + headers={"Authorization": "Bearer testtoken"}, + ) def test_send_request_not_valid_token_500(self): """Test that we don't refresh if we get a server error.""" - postresp = {'https://idp/Token': [ - {'access_token': 'testtoken', - 'refresh_token': 'refreshtoken', - 'expires_in': 600, - 'token_type': 'Bearer'}], - 'http://app/test': [ - {'_code': 500}, - ]} - - with patch.object(self.client, '_get_server', - side_effect=set_token(self.client, 'authz')) as gsm: - with patch.object(openidcclient.requests, 'request', - side_effect=mock_request(postresp)) as postmock: + postresp = { + "https://idp/Token": [ + { + "access_token": "testtoken", + "refresh_token": "refreshtoken", + "expires_in": 600, + "token_type": "Bearer", + } + ], + "http://app/test": [ + {"_code": 500}, + ], + } + + with patch.object( + self.client, "_get_server", side_effect=set_token(self.client, "authz") + ) as gsm: + with patch.object( + openidcclient.httpx, "request", side_effect=mock_request(postresp) + ) as postmock: result = self.client.send_request( - 'http://app/test', - scopes=['test_send_request_not_valid_token_500']) + "http://app/test", scopes=["test_send_request_not_valid_token_500"] + ) assert gsm.call_count == 1 self.assertEqual(result.status_code, 500) self.assertEqual(result.json(), {}) postmock.assert_called_with( - 'POST', - 'http://app/test', - headers={'Authorization': 'Bearer testtoken'}) + "POST", + "http://app/test", + headers={"Authorization": "Bearer testtoken"}, + ) def test_send_request_not_valid_token_403(self): """Test that we don't refresh if the app returns a 403 (forbidden)""" - postresp = {'https://idp/Token': [ - {'access_token': 'testtoken', - 'refresh_token': 'refreshtoken', - 'expires_in': 600, - 'token_type': 'Bearer'}], - 'http://app/test': [ - {'_code': 403}, - ]} - - with patch.object(self.client, '_get_server', - side_effect=set_token(self.client, 'authz')) as gsm: - with patch.object(openidcclient.requests, 'request', - side_effect=mock_request(postresp)) as postmock: + postresp = { + "https://idp/Token": [ + { + "access_token": "testtoken", + "refresh_token": "refreshtoken", + "expires_in": 600, + "token_type": "Bearer", + } + ], + "http://app/test": [ + {"_code": 403}, + ], + } + + with patch.object( + self.client, "_get_server", side_effect=set_token(self.client, "authz") + ) as gsm: + with patch.object( + openidcclient.httpx, "request", side_effect=mock_request(postresp) + ) as postmock: result = self.client.send_request( - 'http://app/test', - scopes=['test_send_request_not_valid_token_403']) + "http://app/test", scopes=["test_send_request_not_valid_token_403"] + ) assert gsm.call_count == 1 self.assertEqual(result.status_code, 403) self.assertEqual(result.json(), {}) postmock.assert_called_with( - 'POST', - 'http://app/test', - headers={'Authorization': 'Bearer testtoken'}) + "POST", + "http://app/test", + headers={"Authorization": "Bearer testtoken"}, + ) def test_send_request_not_valid_token_401_refreshable(self): """Test that we do refresh with a 401.""" - postresp = {'https://idp/Token': [ - {'access_token': 'testtoken', - 'refresh_token': 'refreshtoken', - 'expires_in': 600, - 'token_type': 'Bearer'}, - {'access_token': 'refreshedtoken', - 'refresh_token': 'refreshtoken2', - 'expires_in': 600, - 'token_type': 'Bearer'}], - 'http://app/test': [ - {'_code': 401}, - {}, - {} - ]} - - with patch.object(self.client, '_get_server', - side_effect=set_token(self.client, 'authz')) as gsm: - with patch.object(openidcclient.requests, 'request', - side_effect=mock_request(postresp)) as postmock: + postresp = { + "https://idp/Token": [ + { + "access_token": "testtoken", + "refresh_token": "refreshtoken", + "expires_in": 600, + "token_type": "Bearer", + }, + { + "access_token": "refreshedtoken", + "refresh_token": "refreshtoken2", + "expires_in": 600, + "token_type": "Bearer", + }, + ], + "http://app/test": [{"_code": 401}, {}, {}], + } + + with patch.object( + self.client, "_get_server", side_effect=set_token(self.client, "authz") + ) as gsm: + with patch.object( + openidcclient.httpx, "request", side_effect=mock_request(postresp) + ) as postmock: result = self.client.send_request( - 'http://app/test', - scopes=['test_send_request_not_valid_token_401_' + - 'refreshable'], - json={'foo': 'bar'}) + "http://app/test", + scopes=["test_send_request_not_valid_token_401_" + "refreshable"], + json={"foo": "bar"}, + ) assert gsm.call_count == 1 self.assertEqual(result.json(), {}) postmock.assert_called_with( - 'POST', - 'http://app/test', - json={'foo': 'bar'}, - headers={'Authorization': 'Bearer refreshedtoken'}) + "POST", + "http://app/test", + json={"foo": "bar"}, + headers={"Authorization": "Bearer refreshedtoken"}, + ) postmock.reset_mock() self.client._refresh_cache() result = self.client.send_request( - 'http://app/test', - scopes=['test_send_request_not_valid_token_401_' + - 'refreshable'], - json={'foo': 'bar'}) + "http://app/test", + scopes=["test_send_request_not_valid_token_401_" + "refreshable"], + json={"foo": "bar"}, + ) self.assertEqual(result.json(), {}) postmock.assert_called_with( - 'POST', - 'http://app/test', - json={'foo': 'bar'}, - headers={'Authorization': 'Bearer refreshedtoken'}) + "POST", + "http://app/test", + json={"foo": "bar"}, + headers={"Authorization": "Bearer refreshedtoken"}, + ) def test_send_request_not_valid_token_401_not_refreshable(self): """Test that we only try to refresh once and then throw away.""" - postresp = {'https://idp/Token': [ - {'access_token': 'testtoken', - 'refresh_token': 'refreshtoken', - 'expires_in': 600, - 'token_type': 'Bearer'}, - {'error': 'invalid_token', - 'error_description': 'Could not refresh'}], - 'http://app/test': [ - {'_code': 401}, - ]} - - with patch.object(self.client, '_get_server', - side_effect=set_token(self.client, 'authz')) as gsm: - with patch.object(openidcclient.requests, 'request', - side_effect=mock_request(postresp)): + postresp = { + "https://idp/Token": [ + { + "access_token": "testtoken", + "refresh_token": "refreshtoken", + "expires_in": 600, + "token_type": "Bearer", + }, + {"error": "invalid_token", "error_description": "Could not refresh"}, + ], + "http://app/test": [ + {"_code": 401}, + ], + } + + with patch.object( + self.client, "_get_server", side_effect=set_token(self.client, "authz") + ) as gsm: + with patch.object( + openidcclient.httpx, "request", side_effect=mock_request(postresp) + ): result = self.client.send_request( - 'http://app/test', - scopes=['test_send_request_not_valid_token_401_not_' + - 'refreshable']) + "http://app/test", + scopes=[ + "test_send_request_not_valid_token_401_not_" + "refreshable" + ], + ) assert gsm.call_count == 1 self.assertEqual(result.status_code, 401) self.assertEqual(result.json(), {}) # Make sure that if there was an error, the token is cleared - self.assertEqual(self.client.get_token( - ['test_send_request_not_valid_token_401_not_refreshable'], - new_token=False), None) + self.assertEqual( + self.client.get_token( + ["test_send_request_not_valid_token_401_not_refreshable"], + new_token=False, + ), + None, + )