Skip to content

#748: don't share httpx.AsyncHTTPTransport between event loops #1695

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 40 additions & 4 deletions pydantic_ai_slim/pydantic_ai/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@

from __future__ import annotations as _annotations

import asyncio
import weakref
from abc import ABC, abstractmethod
from collections.abc import AsyncIterator, Iterator
from collections.abc import AsyncIterator, Iterator, MutableMapping
from contextlib import asynccontextmanager, contextmanager
from dataclasses import dataclass, field
from datetime import datetime
from functools import cache
from types import TracebackType

import httpx
from typing_extensions import Literal, TypeAliasType
Expand Down Expand Up @@ -506,15 +509,48 @@ def cached_async_http_client(*, provider: str | None = None, timeout: int = 600,
@cache
def _cached_async_http_client(provider: str | None, timeout: int = 600, connect: int = 5) -> httpx.AsyncClient:
return httpx.AsyncClient(
transport=_cached_async_http_transport(),
transport=_get_transport_for_loop(),
timeout=httpx.Timeout(timeout=timeout, connect=connect),
headers={'User-Agent': get_user_agent()},
)


@cache
def _cached_async_http_transport() -> httpx.AsyncHTTPTransport:
return httpx.AsyncHTTPTransport()
def _get_transport_for_loop() -> _PerLoopTransport:
return _PerLoopTransport()
Comment on lines 518 to +520
Copy link
Author

@ewjoachim ewjoachim May 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a convoluted way of just defining the instance as a module variable, but it works. Also, it was kinda-already done that way before so I kept it



class _PerLoopTransport(httpx.AsyncBaseTransport):
def __init__(self):
self.transports: MutableMapping[asyncio.AbstractEventLoop, httpx.AsyncHTTPTransport] = (
weakref.WeakKeyDictionary()
)
Comment on lines +525 to +527
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be debatable if we want this to be a WeakKeyDictionary or a normal dict.

It would probably work with a normal dict, but it feels that if a loop is referenced by nobody but this mapping, we really don't need to have it here, so I'd say it makes sense to keep it.

Note that if we found a way to remove the strong reference to the loop, e.g. by closing the pool, then we would probably be happy with the WeakKeyDictionary without the for loop in get_transport, but I'm not sure I know when we should close the pool for this to stay useful (in the end, the goal is to reuse the tcp connections in the end)

Like they say, 2 hardest problems in compsci: naming things and cache invalidation and off-by-one errors.


def get_transport(self) -> httpx.AsyncHTTPTransport:
# Clean the dictionary of closed loops
for loop in list(self.transports.keys()):
if loop.is_closed():
del self.transports[loop]

return self.transports.setdefault(asyncio.get_running_loop(), httpx.AsyncHTTPTransport())
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I'm using setdefault here, it means httpx.AsyncHTTPTransport() is always assigned even when it's not going to be used. Is that something we want to optimize ?


async def __aenter__(self):
await self.get_transport().__aenter__()
return self

async def __aexit__(
self,
exc_type: type[BaseException] | None = None,
exc_value: BaseException | None = None,
traceback: TracebackType | None = None,
) -> None:
await self.get_transport().__aexit__(exc_type, exc_value, traceback)
Comment on lines +537 to +547
Copy link
Author

@ewjoachim ewjoachim May 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those seem to be the not-covered lines. Meaning we (or rather httpx) never use the aenter/aexit API of the transport. We could just raise NotImplementError rather than do something not obvious and not needed wait for someone to actually need it ?

I'm saying "not obvious" because it's very slightly unclear whether we would want __aenter__ to return self or the underlying transport


async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
return await self.get_transport().handle_async_request(request)

async def aclose(self) -> None:
await self.get_transport().aclose()


@cache
Expand Down
Loading