Skip to content

Commit

Permalink
Multiple resource leaks fixed in proxy and dealer
Browse files Browse the repository at this point in the history
  • Loading branch information
DZabavchik committed Feb 21, 2025
1 parent c1a7836 commit 5b848cb
Show file tree
Hide file tree
Showing 10 changed files with 300 additions and 125 deletions.
4 changes: 4 additions & 0 deletions crossbar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
if not hasattr(eth_abi, 'encode_single') and hasattr(eth_abi, 'encode'):
eth_abi.encode_single = eth_abi.encode

import eth_typing
if not hasattr(eth_typing, 'ChainID') and hasattr(eth_typing, 'ChainId'):
eth_typing.ChainID = eth_typing.ChainId

# monkey patch web3 for master branch / upcoming v6 (which we need for python 3.11)
# AttributeError: type object 'Web3' has no attribute 'toChecksumAddress'. Did you mean: 'to_checksum_address'?
import web3
Expand Down
60 changes: 45 additions & 15 deletions crossbar/router/dealer.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class InvocationRequest(object):
'id',
'registration',
'caller',
'caller_session_id',
'call',
'callee',
'forward_for',
Expand All @@ -53,6 +54,7 @@ def __init__(self, id, registration, caller, call, callee, forward_for, authoriz
self.id = id
self.registration = registration
self.caller = caller
self.caller_session_id = caller._session_id
self.call = call
self.callee = callee
self.forward_for = forward_for
Expand Down Expand Up @@ -185,6 +187,7 @@ def detach(self, session):
is_rlink_session = (session._authrole == "rlink")
if session in self._caller_to_invocations:

# this needs to update all four places where we track invocations similar to _remove_invoke_request
outstanding = self._caller_to_invocations.get(session, [])
for invoke in outstanding: # type: InvocationRequest
if invoke.canceled:
Expand All @@ -207,11 +210,26 @@ def detach(self, session):
request=invoke.id,
session=session._session_id,
)

if invoke.timeout_call:
invoke.timeout_call.cancel()
invoke.timeout_call = None

invokes = self._callee_to_invocations[callee]
invokes.remove(invoke)
if not invokes:
del self._callee_to_invocations[callee]

del self._invocations[invoke.id]
del self._invocations_by_call[(invoke.caller_session_id, invoke.call.request)]

self._router.send(invoke.callee, message.Interrupt(
request=invoke.id,
mode=message.Cancel.KILLNOWAIT,
))

del self._caller_to_invocations[session]

if session in self._session_to_registrations:

# send out Errors for any in-flight calls we have
Expand All @@ -235,6 +253,21 @@ def detach(self, session):
if invoke.caller._transport:
invoke.caller._transport.send(reply)

if invoke.timeout_call:
invoke.timeout_call.cancel()
invoke.timeout_call = None

invokes = self._caller_to_invocations[invoke.caller]
invokes.remove(invoke)
if not invokes:
del self._caller_to_invocations[invoke.caller]

del self._invocations[invoke.id]
del self._invocations_by_call[(invoke.caller_session_id, invoke.call.request)]

if outstanding:
del self._callee_to_invocations[session]

for registration in self._session_to_registrations[session]:
was_registered, was_last_callee = self._registration_map.drop_observer(session, registration)

Expand Down Expand Up @@ -1120,23 +1153,20 @@ def _remove_invoke_request(self, invocation_request):
invocation_request.timeout_call.cancel()
invocation_request.timeout_call = None

invokes = self._callee_to_invocations[invocation_request.callee]
invokes.remove(invocation_request)
if not invokes:
del self._callee_to_invocations[invocation_request.callee]

invokes = self._caller_to_invocations[invocation_request.caller]
invokes.remove(invocation_request)
if not invokes:
del self._caller_to_invocations[invocation_request.caller]
# all four places should always be updated together
if invocation_request.id in self._invocations:
del self._invocations[invocation_request.id]
invokes = self._callee_to_invocations[invocation_request.callee]
invokes.remove(invocation_request)
if not invokes:
del self._callee_to_invocations[invocation_request.callee]

del self._invocations[invocation_request.id]
invokes = self._caller_to_invocations[invocation_request.caller]
invokes.remove(invocation_request)
if not invokes:
del self._caller_to_invocations[invocation_request.caller]

# the session_id will be None if the caller session has
# already vanished
caller_id = invocation_request.caller._session_id
if caller_id is not None:
del self._invocations_by_call[caller_id, invocation_request.call.request]
del self._invocations_by_call[invocation_request.caller_session_id, invocation_request.call.request]

# noinspection PyUnusedLocal
def processCancel(self, session, cancel):
Expand Down
3 changes: 2 additions & 1 deletion crossbar/router/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -736,12 +736,13 @@ def onClose(self, wasClean):
self.onLeave(CloseDetails())
except Exception:
self.log.failure("Exception raised in onLeave callback")
self.log.warn("{tb}".format(tb=Failure().getTraceback()))

try:
self._router.detach(self)
except Exception as e:
self.log.error("Failed to detach session '{}': {}".format(self._session_id, e))
self.log.debug("{tb}".format(tb=Failure().getTraceback()))
self.log.warn("{tb}".format(tb=Failure().getTraceback()))

self._session_id = None

Expand Down
7 changes: 7 additions & 0 deletions crossbar/router/test/test_dealer.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,17 @@ def test_outstanding_invoke_but_caller_gone(self):
outstanding = mock.Mock()
outstanding.call.request = 1

# there was a bug where timeout calls were not getting cancelled
# mock has non-null timeout_call, so we need to set it to None
outstanding.timeout_call = None
dealer = self.router._dealer
dealer.attach(session)

# All four maps involved in invocation tracking must be updated atomically
dealer._caller_to_invocations[outstanding.caller] = [outstanding]
dealer._callee_to_invocations[session] = [outstanding]
dealer._invocations[outstanding.id] = outstanding
dealer._invocations_by_call[(outstanding.caller_session_id, outstanding.call.request)] = outstanding
# pretend we've disconnected already
outstanding.caller._transport = None

Expand Down
2 changes: 1 addition & 1 deletion crossbar/worker/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def _run_command_exec_worker(options, reactor=None, personality=None):

# we use an Autobahn utility to import the "best" available Twisted reactor
from autobahn.twisted.choosereactor import install_reactor
reactor = install_reactor(options.reactor)
reactor = install_reactor(explicit_reactor=options.reactor or os.environ.get('CROSSBAR_REACTOR', None))

# make sure logging to something else than stdio is setup _first_
from crossbar._logging import make_JSON_observer, cb_logging_aware
Expand Down
Loading

0 comments on commit 5b848cb

Please sign in to comment.