
#748: don't share httpx.AsyncHTTPTransport between event loops #1695


Open · wants to merge 2 commits into main from fix-event-loop-gemini

Conversation

@ewjoachim commented May 12, 2025

Fixes #748

In cached_async_http_client we cache the http client (...duh) and its transport. The transport keeps a pointer to the current event loop through:

transport._pool._connections[0]._connection._network_stream._stream.transport_stream._transport._loop

and will try to reuse this event loop on subsequent calls. This causes RuntimeError('Event loop is closed') if that loop has been closed in the meantime and we're now running on a different one.
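
To make the failure mode concrete, here is a minimal sketch of how it can surface (illustrative only, not the linked reproducer; the cached_client helper and the URL are made up): a module-level cached client created under one asyncio.run() is reused under a second one, and a kept-alive connection still points at the first, now-closed loop.

import asyncio
from functools import cache

import httpx


@cache
def cached_client() -> httpx.AsyncClient:
    # Cached at module level, so the transport and its pooled connections
    # outlive the event loop they were created on.
    return httpx.AsyncClient()


async def fetch() -> int:
    response = await cached_client().get('https://example.com')
    return response.status_code


asyncio.run(fetch())  # first loop: opens a connection bound to this loop
asyncio.run(fetch())  # second loop: a kept-alive connection may be reused and
                      # raise RuntimeError('Event loop is closed')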

My fix makes it so that the real transport instance is only created once we're in a loop context, and a different instance is returned for each distinct loop.

I've been using https://github.com/oscar-broman/pydantic-ai-gemini-issue-mre as a reproducer, and it fails on main and passes on my branch.

The reason we don't see the same test failure in this repo's tests is most likely this: because there are fixtures with scope='session', anyio only creates a single event loop that all tests share, so the issue can't surface (also, we mock HTTP calls with pytest-recording and I'm not 100% sure the mocking happens at a level that would make the failure visible).

All that is to say: I'm not sure how best to implement a reproducer without intruding too much on the codebase. Opinions from maintainers welcome :)

Warning

I'm afraid this makes the following fixture very brittle. It will keep working as long as the anyio single-event-loop situation persists, but if we ever manage to have an event loop for each test, we won't be able to properly close the clients. That said: isn't that an issue with real code in real life too? Who's responsible for closing those cached clients? (Maybe they close on __del__, but that's not ideal, is it?)

@pytest.fixture(autouse=True)
async def close_cached_httpx_client() -> AsyncIterator[None]:
    yield
    for provider in [
        'openai',
        'anthropic',
        'azure',
        'google-gla',
        'google-vertex',
        'groq',
        'mistral',
        'cohere',
        'deepseek',
        None,
    ]:
        await cached_async_http_client(provider=provider).aclose()

@ewjoachim force-pushed the fix-event-loop-gemini branch from c27343f to e5ccea4 on May 12, 2025 13:55
@ewjoachim (Author)

[Screenshot 2025-05-12 at 16:24:41]

We have another issue. Everywhere we copy the http client from the provider onto the model (see screenshot above), we then won't be using the cached attributes to know whether that client belongs to the appropriate loop or not.

@ewjoachim force-pushed the fix-event-loop-gemini branch from e5ccea4 to 45b685d on May 12, 2025 15:47
@ewjoachim (Author)

Ok, I've amended my commit with a new proposal, though I haven't tested it as much as the previous one. The idea now is to use a proxy object for the Transport, and only decide on the real object once we're sure we're in a valid async context.

What do you think? Is that acceptable and should I keep working on it, or do you see a flaw in that plan?
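
For context, the rough shape of the proxy idea (a sketch with hypothetical names, not the actual diff): the object handed to httpx.AsyncClient(transport=...) stays inert until a request arrives, and only then picks or creates the real transport for the loop we're currently running on.

import asyncio

import httpx


class _LoopAwareTransportProxy(httpx.AsyncBaseTransport):
    # Sketch: defer creating the real transport until we're inside a running loop.

    def __init__(self) -> None:
        self._transports: dict[asyncio.AbstractEventLoop, httpx.AsyncHTTPTransport] = {}

    def _get_transport(self) -> httpx.AsyncHTTPTransport:
        # Only called from async code, so get_running_loop() is safe here.
        loop = asyncio.get_running_loop()
        if loop not in self._transports:
            self._transports[loop] = httpx.AsyncHTTPTransport()
        return self._transports[loop]

    async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
        # Delegate to the transport that belongs to the current event loop.
        return await self._get_transport().handle_async_request(request)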

@ewjoachim (Author)

Hm, I thought of a much cleaner solution: keeping transports in a weakref.WeakKeyDictionary[EventLoop, Transport]. This way, we keep the transports while the loop is alive and drop them when it's gone.

BUT the whole point is that transports keep a strong ref to the loop, so if we keep a strong ref to the transport, the loop's refcount is never going to go to zero 😭

I guess we could remove any item from the dict whose loop is closed, though 🤔
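
Roughly, the lookup could then look like this (a sketch, not the final diff): weak keys so a collected loop drops its entry automatically, plus an explicit prune of closed loops, since the transport's strong reference back to its loop means the weak keys alone would never fire.

import asyncio
import weakref

import httpx

# Weak keys: an entry disappears once its loop is garbage-collected.
_transports: weakref.WeakKeyDictionary[asyncio.AbstractEventLoop, httpx.AsyncHTTPTransport] = (
    weakref.WeakKeyDictionary()
)


def _transport_for_current_loop() -> httpx.AsyncHTTPTransport:
    # Drop entries whose loop is closed: the transport holds a strong reference
    # back to that loop, so without this the closed loop would never be
    # collected and its weak key would never go away on its own.
    for loop in list(_transports):
        if loop.is_closed():
            del _transports[loop]
    return _transports.setdefault(asyncio.get_running_loop(), httpx.AsyncHTTPTransport())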

@ewjoachim force-pushed the fix-event-loop-gemini branch from 45b685d to bbc34be on May 12, 2025 21:13
@ewjoachim (Author) commented May 12, 2025

New attempt (likely going to get complaints about missing coverage).

I've kept the weakref just in case, but I've added code that removes entries whose (weak) key is a loop that has been closed.

I think this should solve the issue. Happy to get some feedback before spending time adding missing tests. Also, if you want to take it from here, feel free :)

@bojan2501 commented:
I tested the changes with a simple Streamlit app and Vertex AI.
The old issue with closed loops is gone.
Will try to do some more tests and see how it works.
Cheers for the fix.

@Kludex self-assigned this May 13, 2025
@ewjoachim (Author) left a comment


Just some comments to hopefully make the review easier.

if loop.is_closed():
    del self.transports[loop]

return self.transports.setdefault(asyncio.get_running_loop(), httpx.AsyncHTTPTransport())
@ewjoachim (Author)

Because I'm using setdefault here, an httpx.AsyncHTTPTransport() is always constructed even when it's not going to be used. Is that something we want to optimize?
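
If we did want to avoid that, the tail of get_transport could look roughly like this instead (a sketch): only construct the transport when the current loop has no entry yet.

loop = asyncio.get_running_loop()
try:
    return self.transports[loop]
except KeyError:
    # Only pay for a new transport when this loop hasn't been seen before.
    transport = self.transports[loop] = httpx.AsyncHTTPTransport()
    return transport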

Comment on lines +538 to +548
async def __aenter__(self):
    await self.get_transport().__aenter__()
    return self

async def __aexit__(
    self,
    exc_type: type[BaseException] | None = None,
    exc_value: BaseException | None = None,
    traceback: TracebackType | None = None,
) -> None:
    await self.get_transport().__aexit__(exc_type, exc_value, traceback)
@ewjoachim (Author) commented May 13, 2025

Those seem to be the uncovered lines, meaning we (or rather httpx) never use the aenter/aexit API of the transport. We could just raise NotImplementedError rather than doing something non-obvious and not needed, and wait for someone to actually need it?

I'm saying "not obvious" because it's very slightly unclear whether we would want __aenter__ to return self or the underlying transport

Comment on lines 518 to +520
@cache
def _cached_async_http_transport() -> httpx.AsyncHTTPTransport:
    return httpx.AsyncHTTPTransport()

def _get_transport_for_loop() -> _PerLoopTransport:
    return _PerLoopTransport()
@ewjoachim (Author) commented May 13, 2025

This is a convoluted way of just defining the instance as a module variable, but it works. Also, it was kind of already done that way before, so I kept it.
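
In other words, the @cache'd zero-argument function is just a lazily-initialized module-level singleton; the two spellings below are roughly equivalent (a sketch):

from functools import cache


@cache
def _get_transport_for_loop() -> _PerLoopTransport:
    # First call creates the instance; every later call returns the same one.
    return _PerLoopTransport()


# ...versus simply creating it eagerly at import time:
_transport_for_loop = _PerLoopTransport()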

Comment on lines +526 to +528
self.transports: MutableMapping[asyncio.AbstractEventLoop, httpx.AsyncHTTPTransport] = (
    weakref.WeakKeyDictionary()
)
@ewjoachim (Author)

It might be debatable whether we want this to be a WeakKeyDictionary or a normal dict.

It would probably work with a normal dict, but it feels like if a loop is referenced by nobody but this mapping, we really don't need to keep it here, so I'd say it makes sense to keep the WeakKeyDictionary.

Note that if we found a way to remove the strong reference to the loop, e.g. by closing the pool, then we would probably be happy with the WeakKeyDictionary alone, without the for loop in get_transport. But I'm not sure when we should close the pool for this to stay useful (the goal is, after all, to reuse the TCP connections).

Like they say, 2 hardest problems in compsci: naming things and cache invalidation and off-by-one errors.

Successfully merging this pull request may close these issues.

Gemini causes 'Event loop is closed' when running inside an async context