Skip to content

Commit

Permalink
Set end_time in ThreadBasedCyclicSendTask.start() (#1871)
Browse files Browse the repository at this point in the history
  • Loading branch information
zariiii9003 authored Oct 18, 2024
1 parent a39e63e commit abe0db5
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 7 deletions.
9 changes: 6 additions & 3 deletions can/broadcastmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ def __init__(
"""
super().__init__(messages, period)
self.duration = duration
self.end_time: Optional[float] = None


class RestartableCyclicTaskABC(CyclicSendTaskABC, abc.ABC):
Expand Down Expand Up @@ -299,9 +300,6 @@ def __init__(
self.send_lock = lock
self.stopped = True
self.thread: Optional[threading.Thread] = None
self.end_time: Optional[float] = (
time.perf_counter() + duration if duration else None
)
self.on_error = on_error
self.modifier_callback = modifier_callback

Expand Down Expand Up @@ -341,6 +339,10 @@ def start(self) -> None:
self.thread = threading.Thread(target=self._run, name=name)
self.thread.daemon = True

self.end_time: Optional[float] = (
time.perf_counter() + self.duration if self.duration else None
)

if self.event and PYWIN32:
PYWIN32.set_timer(self.event, self.period_ms)

Expand All @@ -356,6 +358,7 @@ def _run(self) -> None:

while not self.stopped:
if self.end_time is not None and time.perf_counter() >= self.end_time:
self.stop()
break

try:
Expand Down
32 changes: 28 additions & 4 deletions test/simplecyclic_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,25 +155,49 @@ def test_stopping_perodic_tasks(self):
def test_restart_perodic_tasks(self):
period = 0.01
safe_timeout = period * 5 if not IS_PYPY else 1.0
duration = 0.3

msg = can.Message(
is_extended_id=False, arbitration_id=0x123, data=[0, 1, 2, 3, 4, 5, 6, 7]
)

def _read_all_messages(_bus: can.interfaces.virtual.VirtualBus) -> None:
sleep(safe_timeout)
while not _bus.queue.empty():
_bus.recv(timeout=period)
sleep(safe_timeout)

with can.ThreadSafeBus(interface="virtual", receive_own_messages=True) as bus:
task = bus.send_periodic(msg, period)
self.assertIsInstance(task, can.broadcastmanager.RestartableCyclicTaskABC)
self.assertIsInstance(task, can.broadcastmanager.ThreadBasedCyclicSendTask)

# Test that the task is sending messages
sleep(safe_timeout)
assert not bus.queue.empty(), "messages should have been transmitted"

# Stop the task and check that messages are no longer being sent
bus.stop_all_periodic_tasks(remove_tasks=False)
_read_all_messages(bus)
assert bus.queue.empty(), "messages should not have been transmitted"

# Restart the task and check that messages are being sent again
task.start()
sleep(safe_timeout)
while not bus.queue.empty():
bus.recv(timeout=period)
sleep(safe_timeout)
assert not bus.queue.empty(), "messages should have been transmitted"

# Stop the task and check that messages are no longer being sent
bus.stop_all_periodic_tasks(remove_tasks=False)
_read_all_messages(bus)
assert bus.queue.empty(), "messages should not have been transmitted"

# Restart the task with limited duration and wait until it stops
task.duration = duration
task.start()
sleep(duration + safe_timeout)
assert task.stopped
assert time.time() > task.end_time
assert not bus.queue.empty(), "messages should have been transmitted"
_read_all_messages(bus)
assert bus.queue.empty(), "messages should not have been transmitted"

# Restart the task and check that messages are being sent again
Expand Down

0 comments on commit abe0db5

Please sign in to comment.