Skip to content

Commit

Permalink
Vine: Memory Leak Supplement (#3598)
Browse files Browse the repository at this point in the history
* vine: mem leak

* add cleanup in __del__
  • Loading branch information
JinZhou5042 authored Jan 9, 2024
1 parent 6e94c60 commit 2df7c68
Showing 1 changed file with 29 additions and 7 deletions.
36 changes: 29 additions & 7 deletions taskvine/src/bindings/python3/ndcctools/taskvine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -974,21 +974,22 @@ def __init__(self, library_name, fn, *args, **kwargs):
self._event["fn_args"] = args
self.set_time_max(900) # maximum run time for function calls is 900s by default.
self.needs_library(library_name)
self.output_buffer = None
self._input_buffer = None
self._output_buffer = None

##
# Finalizes the task definition once the manager that will execute is run.
# This function is run by the manager before registering the task for
# execution.
#
#
# @param self Reference to the current python task object
# @param manager Manager to which the task was submitted
def submit_finalize(self, manager):
super().submit_finalize(manager)
f = manager.declare_buffer(cloudpickle.dumps(self._event))
self.add_input(f, "infile")
self.output_buffer = manager.declare_buffer(cache=False, peer_transfer=False)
self.add_output(self.output_buffer, "outfile")
self._input_buffer = manager.declare_buffer(buffer=cloudpickle.dumps(self._event), cache=False, peer_transfer=True)
self.add_input(self._input_buffer, "infile")
self._output_buffer = manager.declare_buffer(buffer=None, cache=False, peer_transfer=False)
self.add_output(self._output_buffer, "outfile")

##
# Specify function arguments. Accepts arrays and dictionaries. This
Expand All @@ -1012,14 +1013,35 @@ def set_exec_method(self, remote_task_exec_method):
remote_task_exec_method = "fork"
self._event["remote_task_exec_method"] = remote_task_exec_method

##
# Retrieve output, handles cleanup, and returns result or failure reason.
@property
def output(self):
output = cloudpickle.loads(self.output_buffer.contents())
output = cloudpickle.loads(self._output_buffer.contents())
self._manager.remove_file(self._input_buffer)
self._input_buffer = None
self._manager.remove_file(self._output_buffer)
self._output_buffer = None

if output['Success']:
return output['Result']
else:
return output['Reason']

##
# Remove input and output buffers under some circumstances `output` is not called
def __del__(self):
try:
if self._input_buffer:
self._manager.remove_file(self._input_buffer)
self._input_buffer = None
if self._output_buffer:
self._manager.remove_file(self._output_buffer)
self._output_buffer = None
super().__del__()
except TypeError:
pass


##
# \class LibraryTask
Expand Down

0 comments on commit 2df7c68

Please sign in to comment.