@@ -1217,6 +1217,10 @@ class ParallelEnv(BatchedEnvBase, metaclass=_PEnvMeta):
1217
1217
__doc__ += BatchedEnvBase .__doc__
1218
1218
__doc__ += """
1219
1219
1220
+ .. note:: ParallelEnv will time out after one of the workers has been idle for a given amount of time.
1221
+ This can be controlled via the BATCHED_PIPE_TIMEOUT environment variable, which in turn modifies
1222
+ the torchrl._utils.BATCHED_PIPE_TIMEOUT integer. The default timeout value is 10000 seconds.
1223
+
1220
1224
.. warning::
1221
1225
TorchRL's ParallelEnv is quite stringent when it comes to env specs, since
1222
1226
these are used to build shared memory buffers for inter-process communication.
@@ -1353,7 +1357,10 @@ class ParallelEnv(BatchedEnvBase, metaclass=_PEnvMeta):
1353
1357
"""
1354
1358
1355
1359
def _start_workers (self ) -> None :
1360
+ import torchrl
1361
+
1356
1362
self ._timeout = 10.0
1363
+ self .BATCHED_PIPE_TIMEOUT = torchrl ._utils .BATCHED_PIPE_TIMEOUT
1357
1364
1358
1365
from torchrl .envs .env_creator import EnvCreator
1359
1366
@@ -1606,7 +1613,7 @@ def step_and_maybe_reset(
1606
1613
1607
1614
for i in workers_range :
1608
1615
event = self ._events [i ]
1609
- event .wait (self ._timeout )
1616
+ event .wait (self .BATCHED_PIPE_TIMEOUT )
1610
1617
event .clear ()
1611
1618
1612
1619
if self ._non_tensor_keys :
@@ -1796,7 +1803,7 @@ def _step(self, tensordict: TensorDictBase) -> TensorDictBase:
1796
1803
1797
1804
for i in workers_range :
1798
1805
event = self ._events [i ]
1799
- event .wait (self ._timeout )
1806
+ event .wait (self .BATCHED_PIPE_TIMEOUT )
1800
1807
event .clear ()
1801
1808
1802
1809
if self ._non_tensor_keys :
@@ -1965,7 +1972,7 @@ def tentative_update(val, other):
1965
1972
1966
1973
for i , _ in outs :
1967
1974
event = self ._events [i ]
1968
- event .wait (self ._timeout )
1975
+ event .wait (self .BATCHED_PIPE_TIMEOUT )
1969
1976
event .clear ()
1970
1977
1971
1978
workers_nontensor = []
@@ -2023,7 +2030,7 @@ def _shutdown_workers(self) -> None:
2023
2030
for channel in self .parent_channels :
2024
2031
channel .close ()
2025
2032
for proc in self ._workers :
2026
- proc .join (timeout = 1.0 )
2033
+ proc .join (timeout = self . _timeout )
2027
2034
finally :
2028
2035
for proc in self ._workers :
2029
2036
if proc .is_alive ():
0 commit comments