-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
As Agent Process does not schedule tasks correctly with externalClock #54
Comments
In GitLab by @maurerle on May 28, 2023, 12:42 I could get it to work using the DistributedClockAgent which distributes the time to its mirroragent running in the mirror container: import asyncio
from datetime import datetime
from dateutil import rrule
from mango import Agent, create_container
from mango.util.clock import ExternalClock
from mango.util.distributed_clock import DistributedClockAgent, DistributedClockManager
class Caller(Agent):
def __init__(self, container, receiver_addr, receiver_id, recurrency):
super().__init__(container)
self.receiver_addr = receiver_addr
self.receiver_id = receiver_id
self.schedule_recurrent_task(
coroutine_func=self.send_hello_world, recurrency=recurrency
)
async def send_hello_world(self):
time = datetime.fromtimestamp(self._scheduler.clock.time)
await self.send_acl_message(
receiver_addr=self.receiver_addr,
receiver_id=self.receiver_id,
content=f"Current time is {time}",
)
def handle_message(self, content, meta):
pass
class Receiver(Agent):
def __init__(self, container):
super().__init__(container)
self.wait_for_reply = asyncio.Future()
def handle_message(self, content, meta):
print(f"Received a message with the following content: {content}.")
async def main(start):
clock = ExternalClock(start_time=start.timestamp())
addr = ("127.0.0.1", 5555)
# market acts every 15 minutes
recurrency = rrule.rrule(rrule.MINUTELY, interval=15, dtstart=start)
c = await create_container(addr=addr, clock=clock)
same_process = False
clock_manager = DistributedClockManager(
c, receiver_clock_addresses=[addr]
)
if same_process:
receiver = Receiver(c)
caller = Caller(c, addr, receiver.aid, recurrency)
else:
def creator(container):
receiver = Receiver(container)
caller = Caller(container, addr, receiver.aid, recurrency)
clock_agent = DistributedClockAgent(container)
await c.as_agent_process(
agent_creator=creator
)
if isinstance(clock, ExternalClock):
for i in range(100):
await asyncio.sleep(0.01)
clock.set_time(clock.time + 60)
await clock_manager.distribute_time()
await c.shutdown()
if __name__ == "__main__":
from dateutil.parser import parse
start = parse("202301010000")
asyncio.run(main(start)) So it might be needed to add this feature when using as_agent_process..? |
In GitLab by @maurerle on Sep 11, 2023, 13:47 Pinging this - what is the best way to resolve this? |
I think it makes sense to use the DistributedClockManager. However it is not intuitive that it does not work with the regular external clock. So I think we should tackle this in the future, maybe we can just use shared memory in the external clock to resolve this. |
In GitLab by @maurerle on May 28, 2023, 11:51
The agent process feature seems to be very useful and has a lot of potential! Very neat!
Yet I found the following bug scheduling tasks to an external Clock:
I would expect that this new feature should behave correctly in the following example, yet it only schedules the first occurence.
This happens with
schedule_recurrent_task
,schedule_periodic_task
and similar - because the time in the mirror container is not synchronized with the original container..Besides that: Why is
WAIT_STEP = 0.01
?Full example:
The text was updated successfully, but these errors were encountered: