Skip to content

Commit

Permalink
Merge pull request #157 from OFFIS-DAI/development
Browse files Browse the repository at this point in the history
2.1.4
  • Loading branch information
rcschrg authored Jan 27, 2025
2 parents d9038a2 + 7cc1c78 commit 3bf1109
Show file tree
Hide file tree
Showing 13 changed files with 182 additions and 48 deletions.
22 changes: 22 additions & 0 deletions docs/source/codecs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,28 @@ We have to make the type known to the codec to use it:
abc 123
abc 123

The codec distinguishes different types for decoding by assigning a type id (32 bit integer) to the type.
The type id can either be automatically generated (like above) or explicitely set when adding the serializer.
Note that if you set a type_id yourself you need to ensure that the decoding container associated the same id
with the desired type.

.. testcode::

codec = JSON()
codec.add_serializer(*MyClass.__serializer__(), type_id=4711)
my_object = MyClass("abc", 123)
encoded = codec.encode(my_object)
decoded = codec.decode(encoded)

print(my_object.x, my_object.y)
print(decoded.x, decoded.y)

.. testoutput::

abc 123
abc 123

All that is left to do now is to pass our codec to the container. This is done during container creation in the ``create_container`` method.

.. testcode::
Expand Down
2 changes: 1 addition & 1 deletion docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
author = "mango team"

# The full version, including alpha/beta/rc tags
version = release = "2.1.3"
version = release = "2.1.4"


# -- General configuration ---------------------------------------------------
Expand Down
21 changes: 10 additions & 11 deletions mango/agent/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,13 +433,16 @@ def _raise_exceptions(self, fut: asyncio.Future):
Inline function used as a callback to raise exceptions
:param fut: The Future object of the task
"""
if fut.exception() is not None:
logger.error(
"Agent %s: Caught the following exception in _check_inbox: ",
self.aid,
fut.exception(),
)
raise fut.exception()
try:
if fut.exception() is not None:
logger.error(
"Agent %s: Caught the following exception in _check_inbox: ",
self.aid,
fut.exception(),
)
raise fut.exception()
except asyncio.CancelledError:
pass

async def _check_inbox(self):
"""Task for waiting on new message in the inbox"""
Expand Down Expand Up @@ -487,10 +490,6 @@ async def shutdown(self):
await self._check_inbox_task
except asyncio.CancelledError:
pass
try:
await self.scheduler.stop()
except asyncio.CancelledError:
pass
try:
await self.scheduler.shutdown()
except asyncio.CancelledError:
Expand Down
58 changes: 46 additions & 12 deletions mango/messages/codecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

import inspect
import json
from ctypes import c_int32
from hashlib import sha1

from mango.messages.message import (
ACLMessage,
Expand Down Expand Up @@ -115,7 +117,32 @@ def decode(self, data):
"""Decode *data* from :class:`bytes` to the original data structure."""
raise NotImplementedError

def add_serializer(self, otype, serialize, deserialize):
def make_type_id(self, otype):
"""Create a type id for *otype* using:
- type name
- function names in the class
- signature of the class
and return a 32 bit integer type id."""
class_funcs = sorted(inspect.getmembers(otype, predicate=inspect.isfunction))

data = otype.__name__
for d in class_funcs:
data += d[0]

try:
attrs = [a for a in inspect.signature(otype).parameters]
for d in attrs:
data += d
except ValueError:
# object type has no inspectable signature
pass

int_hash = int(sha1(data.encode("utf-8")).hexdigest(), 16)
# truncate to 32 bit for protobuf wrapper
type_id = c_int32(int_hash).value
return type_id

def add_serializer(self, otype, serialize, deserialize, type_id=None):
"""Add methods to *serialize* and *deserialize* objects typed *otype*.
This can be used to de-/encode objects that the codec otherwise
Expand All @@ -129,9 +156,16 @@ def add_serializer(self, otype, serialize, deserialize):
"""
if otype in self._serializers:
raise ValueError(f'There is already a serializer for type "{otype}"')
typeid = len(self._serializers)
self._serializers[otype] = (typeid, serialize)
self._deserializers[typeid] = deserialize

if type_id is None:
type_id = self.make_type_id(otype)

if type_id in self._deserializers.keys():
raise ValueError(f'There is already a serializer with type id "{type_id}"')

# type_id = len(self._serializers)
self._serializers[otype] = (type_id, serialize)
self._deserializers[type_id] = deserialize

def serialize_obj(self, obj):
"""Serialize *obj* to something that the codec can encode."""
Expand All @@ -141,14 +175,14 @@ def serialize_obj(self, obj):
otype = object

try:
typeid, serialize = self._serializers[otype]
type_id, serialize = self._serializers[otype]
except KeyError:
raise SerializationError(
f'No serializer found for type "{orig_type}"'
) from None

try:
return {"__type__": (typeid, serialize(obj))}
return {"__type__": (type_id, serialize(obj))}
except Exception as e:
raise SerializationError(
f'Could not serialize object "{obj!r}": {e}'
Expand All @@ -159,8 +193,8 @@ def deserialize_obj(self, obj_repr):
# This method is called for *all* dicts so we have to check if it
# contains a desrializable type.
if "__type__" in obj_repr:
typeid, data = obj_repr["__type__"]
obj_repr = self._deserializers[typeid](data)
type_id, data = obj_repr["__type__"]
obj_repr = self._deserializers[type_id](data)
return obj_repr


Expand Down Expand Up @@ -197,8 +231,8 @@ def encode(self, data):
# This is to have the type_id available to decoding later.
# Otherwise, we can not infer the original proto type from the serialized message.
proto_msg = GenericProtoMsg()
typeid, content = self.serialize_obj(data)
proto_msg.type_id = typeid
type_id, content = self.serialize_obj(data)
proto_msg.type_id = type_id
proto_msg.content = content.SerializeToString()
return proto_msg.SerializeToString()

Expand Down Expand Up @@ -262,9 +296,9 @@ def _acl_to_proto(self, acl_message):

# content is only allowed to be a proto message known to the codec here
if acl_message.content is not None:
typeid, content = self.serialize_obj(acl_message.content)
type_id, content = self.serialize_obj(acl_message.content)
msg.content = content.SerializeToString()
msg.content_type = typeid
msg.content_type = type_id

return msg

Expand Down
36 changes: 26 additions & 10 deletions mango/util/scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@

def _raise_exceptions(fut: asyncio.Future):
"""
Inline function used as a callback to raise exceptions
Inline function used as a callback to raise exceptions.
:param fut: The Future object of the task
"""
if fut.exception() is not None:
try:
raise fut.exception()
except Exception:
logger.exception("got exception in scheduled event")
try:
if fut.exception() is not None:
try:
raise fut.exception()
except BaseException:
logger.exception("got exception in scheduled event")
except asyncio.CancelledError:
pass # if this happens the task has been cancelled by mango


@dataclass
Expand Down Expand Up @@ -891,13 +894,21 @@ def _remove_generic_task(self, target_list, fut=asyncio.Future):
del target_list[i]
break

async def stop_tasks(self, task_list):
for i in range(len(task_list) - 1, -1, -1):
_, task, _, _ = task_list[i]
task.cancel()
try:
await task
except asyncio.CancelledError:
pass

async def stop(self):
"""
Cancel all not finished scheduled tasks
"""
for _, task, _, _ in self._scheduled_tasks + self._scheduled_process_tasks:
task.cancel()
await task
await self.stop_tasks(self._scheduled_tasks)
await self.stop_tasks(self._scheduled_process_tasks)

async def tasks_complete(self, timeout=1, recursive=False):
"""Finish all pending tasks using a timeout.
Expand Down Expand Up @@ -942,8 +953,13 @@ async def shutdown(self):
# resume all process so they can get shutdown
for _, _, scheduled_process_control, _ in self._scheduled_process_tasks:
scheduled_process_control.kill_process()
if len(self._scheduled_tasks) > 0:
logger.debug(
"There are still scheduled tasks running on shutdown %s",
self._scheduled_tasks,
)
await self.stop()
for task, _, _, _ in self._scheduled_tasks:
task.close()
await self.stop()
if self._process_pool_exec is not None:
self._process_pool_exec.shutdown()
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "mango-agents"
version = "2.1.3"
version = "2.1.4"
authors = [
{ name="mango Team", email="[email protected]" },
]
Expand Down
6 changes: 3 additions & 3 deletions tests/unit_tests/clock/test_external_clock.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ async def test_schedule_timestamp_task():
await increase_time_task

for task_no in test_tasks:
assert (
task_no / 10 in results_dict_asyncio.keys()
), f"results_dict_asyncio {results_dict_asyncio}"
assert task_no / 10 in results_dict_asyncio.keys(), (
f"results_dict_asyncio {results_dict_asyncio}"
)
assert task_no in results_dict_external.keys()

for simulation_time, real_time in results_dict_external.items():
Expand Down
21 changes: 21 additions & 0 deletions tests/unit_tests/core/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,24 @@ async def run_this(c):
assert agent2.test_counter == 1

asyncio.run(run_this(c))


async def do_weird_stuff():
fut = asyncio.Future()
await fut


@pytest.mark.asyncio
async def test_agent_with_deadlock_task():
# GIVEN
c = create_tcp_container(addr=("127.0.0.1", 5555))
agent = c.register(MyAgent())

async with activate(c) as c:
t = agent.schedule_instant_task(do_weird_stuff())
t = agent.schedule_instant_task(do_weird_stuff())
t = agent.schedule_instant_task(do_weird_stuff())
t = agent.schedule_instant_task(do_weird_stuff())

# THEN
assert len(agent.scheduler._scheduled_tasks) == 0
6 changes: 3 additions & 3 deletions tests/unit_tests/core/test_external_scheduling_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,9 @@ async def test_step_with_replying_agent():
container_output = await external_scheduling_container.step(
simulation_time=10, incoming_messages=[encoded_msg]
)
assert (
len(container_output.messages) == 3
), f"output messages: {container_output.messages}"
assert len(container_output.messages) == 3, (
f"output messages: {container_output.messages}"
)
assert (
container_output.messages[0].time
< container_output.messages[1].time
Expand Down
19 changes: 18 additions & 1 deletion tests/unit_tests/messages/test_codecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,27 @@ def __eq__(self, other):
# ------------------
# base class tests
# ------------------
def test_add_serializer_add_new():
def test_add_serializer_id_auto():
my_codec = Codec()
my_codec.add_serializer(*SomeOtherClass.__serializer__())
my_codec.add_serializer(*SomeDataClass.__serializer__())
assert True


def test_add_serialize_id_manual():
my_codec = Codec()
my_codec.add_serializer(*SomeOtherClass.__serializer__(), type_id=10)

# can't add same class twice with different id
with pytest.raises(ValueError):
my_codec.add_serializer(*SomeOtherClass.__serializer__(), type_id=20)

# can't add other class with same id
with pytest.raises(ValueError):
my_codec.add_serializer(*SomeDataClass.__serializer__(), type_id=10)

# can add different class with different id
my_codec.add_serializer(*SomeDataClass.__serializer__(), type_id=20)
assert True


Expand Down
6 changes: 3 additions & 3 deletions tests/unit_tests/role_agent_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ async def wait_for_pong_replies(self, timeout=1):
f"Timeout occurred while waiting for the ping response of {addr}, "
"going to check if all messages could be send"
)
assert (
self._expect_no_answer
), "Not all pong replies have arrived on time"
assert self._expect_no_answer, (
"Not all pong replies have arrived on time"
)


class DeactivateAllRoles(Role):
Expand Down
6 changes: 3 additions & 3 deletions tests/unit_tests/test_agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ async def wait_for_sending_messages(self, timeout=1):
except asyncio.TimeoutError:
assert False, "Timeout occurred while waiting for sending a message"

assert (
t.exception() is None and t.result()
), "Sending of at least one message failed"
assert t.exception() is None and t.result(), (
"Sending of at least one message failed"
)

async def wait_for_pong_replies(self, timeout=1):
for addr_tuple, fut in self.open_ping_requests.items():
Expand Down
Loading

0 comments on commit 3bf1109

Please sign in to comment.