Skip to content

Added HttpxAsyncClient wrapper for httpx.AsyncClient and support for send_each_for_multicast_async() #878

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
May 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
4c3a6d9
Refactored retry config to `_retry.py` and added support for backoff …
jonathanedey Apr 11, 2025
1c4c844
Added unit tests for `_retry.py`
jonathanedey Apr 14, 2025
b3aba37
Updated unit tests for HTTPX request errors
jonathanedey Apr 14, 2025
2f7c3fa
Add HttpxAsyncClient to wrap httpx.AsyncClient
jonathanedey Apr 22, 2025
a873a17
Merge branch 'je-httpx-retry' into je-http2-client
jonathanedey Apr 25, 2025
1be3393
Added forced refresh to google auth credential flow and fixed lint
jonathanedey Apr 28, 2025
e5ad5c5
Added unit tests for `GoogleAuthCredentialFlow` and `HttpxAsyncClient`
jonathanedey Apr 28, 2025
2df6ed5
Removed duplicate export
jonathanedey Apr 28, 2025
4f2a912
Added support for `send_each_for_multicast_async()` and updated doc s…
jonathanedey Apr 29, 2025
c4b0673
Remove duplicate auth class
jonathanedey Apr 29, 2025
3aba1d7
Cover auth request error case when `requests` request fails in HTTPX …
jonathanedey Apr 29, 2025
4e49722
Update test for `send_each_for_multicast_async()`
jonathanedey Apr 29, 2025
0b976cb
Address review comments
jonathanedey May 14, 2025
a294e22
Merge branch 'je-httpx-retry' into je-http2-client
jonathanedey May 14, 2025
ece43fc
Merge branch 'fcm-http2' into je-http2-client
jonathanedey May 21, 2025
f4a3931
fix lint and some types
jonathanedey May 22, 2025
a5aa585
Address review comments and removed unused code
jonathanedey May 26, 2025
1eab897
Merge branch 'fcm-http2' into je-http2-client
jonathanedey May 27, 2025
2b8683c
Update metric header test logic for `TestHttpxAsyncClient`
jonathanedey May 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 208 additions & 6 deletions firebase_admin/_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,23 @@

"""Internal HTTP client module.

This module provides utilities for making HTTP calls using the requests library.
"""

from google.auth import transport
import requests
This module provides utilities for making HTTP calls using the requests library.
"""

from __future__ import annotations
import logging
from typing import Any, Dict, Generator, Optional, Tuple, Union
import httpx
import requests.adapters
from requests.packages.urllib3.util import retry # pylint: disable=import-error
from google.auth import credentials
from google.auth import transport
from google.auth.transport import requests as google_auth_requests

from firebase_admin import _utils
from firebase_admin._retry import HttpxRetry, HttpxRetryTransport

logger = logging.getLogger(__name__)

if hasattr(retry.Retry.DEFAULT, 'allowed_methods'):
_ANY_METHOD = {'allowed_methods': None}
Expand All @@ -34,6 +43,9 @@
connect=1, read=1, status=4, status_forcelist=[500, 503],
raise_on_status=False, backoff_factor=0.5, **_ANY_METHOD)

DEFAULT_HTTPX_RETRY_CONFIG = HttpxRetry(
max_retries=4, status_forcelist=[500, 503], backoff_factor=0.5)


DEFAULT_TIMEOUT_SECONDS = 120

Expand Down Expand Up @@ -144,7 +156,6 @@ def close(self):
self._session.close()
self._session = None


class JsonHttpClient(HttpClient):
"""An HTTP client that parses response messages as JSON."""

Expand All @@ -153,3 +164,194 @@ def __init__(self, **kwargs):

def parse_body(self, resp):
return resp.json()

class GoogleAuthCredentialFlow(httpx.Auth):
"""Google Auth Credential Auth Flow"""
def __init__(self, credential: credentials.Credentials):
self._credential = credential
self._max_refresh_attempts = 2
self._refresh_status_codes = (401,)

def apply_auth_headers(
self,
request: httpx.Request,
auth_request: google_auth_requests.Request
) -> None:
"""A helper function to refreshes credentials if needed and mutates the request headers to
contain access token and any other google auth headers."""

logger.debug(
'Attempting to apply auth headers. Credential validity before: %s',
self._credential.valid
)
self._credential.before_request(
auth_request, request.method, str(request.url), request.headers
)
logger.debug('Auth headers applied. Credential validity after: %s', self._credential.valid)

def auth_flow(self, request: httpx.Request) -> Generator[httpx.Request, httpx.Response, None]:
_original_headers = request.headers.copy()
_credential_refresh_attempt = 0

# Create a Google auth request object to be used for refreshing credentials
auth_request = google_auth_requests.Request()

while True:
# Copy original headers for each attempt
request.headers = _original_headers.copy()

# Apply auth headers (which might include an implicit refresh if token is expired)
self.apply_auth_headers(request, auth_request)

logger.debug(
'Dispatching request, attempt %d of %d',
_credential_refresh_attempt, self._max_refresh_attempts
)
response: httpx.Response = yield request

if response.status_code in self._refresh_status_codes:
if _credential_refresh_attempt < self._max_refresh_attempts:
logger.debug(
'Received status %d. Attempting explicit credential refresh. \
Attempt %d of %d.',
response.status_code,
_credential_refresh_attempt + 1,
self._max_refresh_attempts
)
# Explicitly force a credentials refresh
self._credential.refresh(auth_request)
_credential_refresh_attempt += 1
else:
logger.debug(
'Received status %d, but max auth refresh attempts (%d) reached. \
Returning last response.',
response.status_code, self._max_refresh_attempts
)
break
else:
# Status code is not one that requires a refresh, so break and return response
logger.debug(
'Status code %d does not require refresh. Returning response.',
response.status_code
)
break
# The last yielded response is automatically returned by httpx's auth flow.

class HttpxAsyncClient():
"""Async HTTP client used to make HTTP/2 calls using HTTPX.

HttpxAsyncClient maintains an async HTTPX client, and handles request authentication and retries
if necessary.
"""
def __init__(
self,
credential: Optional[credentials.Credentials] = None,
base_url: str = '',
headers: Optional[Union[httpx.Headers, Dict[str, str]]] = None,
retry_config: HttpxRetry = DEFAULT_HTTPX_RETRY_CONFIG,
timeout: int = DEFAULT_TIMEOUT_SECONDS,
http2: bool = True
) -> None:
"""Creates a new HttpxAsyncClient instance from the provided arguments.

If a credential is provided, initializes a new async HTTPX client authorized with it.
Otherwise, initializes a new unauthorized async HTTPX client.

Args:
credential: A Google credential that can be used to authenticate requests (optional).
base_url: A URL prefix to be added to all outgoing requests (optional).
headers: A map of headers to be added to all outgoing requests (optional).
retry_config: A HttpxRetry configuration. Default settings would retry up to 4 times for
HTTP 500 and 503 errors (optional).
timeout: HTTP timeout in seconds. Defaults to 120 seconds when not specified (optional).
http2: A boolean indicating if HTTP/2 support should be enabled. Defaults to `True` when
not specified (optional).
"""
self._base_url = base_url
self._timeout = timeout
self._headers = {**headers, **METRICS_HEADERS} if headers else {**METRICS_HEADERS}
self._retry_config = retry_config

# Only set up retries on urls starting with 'http://' and 'https://'
self._mounts = {
'http://': HttpxRetryTransport(retry=self._retry_config, http2=http2),
'https://': HttpxRetryTransport(retry=self._retry_config, http2=http2)
}

if credential:
self._async_client = httpx.AsyncClient(
http2=http2,
timeout=self._timeout,
headers=self._headers,
auth=GoogleAuthCredentialFlow(credential), # Add auth flow for credentials.
mounts=self._mounts
)
else:
self._async_client = httpx.AsyncClient(
http2=http2,
timeout=self._timeout,
headers=self._headers,
mounts=self._mounts
)

@property
def base_url(self):
return self._base_url

@property
def timeout(self):
return self._timeout

@property
def async_client(self):
return self._async_client

async def request(self, method: str, url: str, **kwargs: Any) -> httpx.Response:
"""Makes an HTTP call using the HTTPX library.

This is the sole entry point to the HTTPX library. All other helper methods in this
class call this method to send HTTP requests out. Refer to
https://www.python-httpx.org/api/ for more information on supported options
and features.

Args:
method: HTTP method name as a string (e.g. get, post).
url: URL of the remote endpoint.
**kwargs: An additional set of keyword arguments to be passed into the HTTPX API
(e.g. json, params, timeout).

Returns:
Response: An HTTPX response object.

Raises:
HTTPError: Any HTTPX exceptions encountered while making the HTTP call.
RequestException: Any requests exceptions encountered while making the HTTP call.
"""
if 'timeout' not in kwargs:
kwargs['timeout'] = self.timeout
resp = await self._async_client.request(method, self.base_url + url, **kwargs)
return resp.raise_for_status()

async def headers(self, method: str, url: str, **kwargs: Any) -> httpx.Headers:
resp = await self.request(method, url, **kwargs)
return resp.headers

async def body_and_response(
self, method: str, url: str, **kwargs: Any) -> Tuple[Any, httpx.Response]:
resp = await self.request(method, url, **kwargs)
return self.parse_body(resp), resp

async def body(self, method: str, url: str, **kwargs: Any) -> Any:
resp = await self.request(method, url, **kwargs)
return self.parse_body(resp)

async def headers_and_body(
self, method: str, url: str, **kwargs: Any) -> Tuple[httpx.Headers, Any]:
resp = await self.request(method, url, **kwargs)
return resp.headers, self.parse_body(resp)

def parse_body(self, resp: httpx.Response) -> Any:
return resp.json()

async def aclose(self) -> None:
await self._async_client.aclose()
7 changes: 3 additions & 4 deletions firebase_admin/_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
import random
import re
import time
from types import CoroutineType
from typing import Any, Callable, List, Optional, Tuple
from typing import Any, Callable, List, Optional, Tuple, Coroutine
import logging
import asyncio
import httpx
Expand Down Expand Up @@ -163,7 +162,7 @@ class HttpxRetryTransport(httpx.AsyncBaseTransport):

DEFAULT_RETRY = HttpxRetry(max_retries=4, status_forcelist=[500, 503], backoff_factor=0.5)

def __init__(self, retry: HttpxRetry = DEFAULT_RETRY, **kwargs) -> None:
def __init__(self, retry: HttpxRetry = DEFAULT_RETRY, **kwargs: Any) -> None:
self._retry = retry

transport_kwargs = kwargs.copy()
Expand All @@ -180,7 +179,7 @@ async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
async def _dispatch_with_retry(
self,
request: httpx.Request,
dispatch_method: Callable[[httpx.Request], CoroutineType[Any, Any, httpx.Response]]
dispatch_method: Callable[[httpx.Request], Coroutine[Any, Any, httpx.Response]]
) -> httpx.Response:
"""Sends a request with retry logic using a provided dispatch method."""
# This request config is used across all requests that use this transport and therefore
Expand Down
Loading