1
- """Implementation of the @dike.batch decorator"""
1
+ """Implementation of the @dike.batch decorator."""
2
+
2
3
import asyncio
3
4
import functools
4
5
from contextlib import contextmanager
11
12
12
13
13
14
# Deactivate mccabe's complexity warnings which doesn't like closures
14
- # flake8: noqa: C901
15
- def batch (
15
+ def batch ( # noqa: PLR0915, C901
16
16
* ,
17
17
target_batch_size : int ,
18
18
max_waiting_time : float ,
19
19
max_processing_time : float = 10.0 ,
20
20
argument_type : str = "list" ,
21
21
) -> Callable [[Callable [..., Coroutine [Any , Any , Any ]]], Callable [..., Coroutine [Any , Any , Any ]]]:
22
22
"""@batch is a decorator to cumulate function calls and process them in batches.
23
- Not thread-safe.
24
23
25
- The function to wrap must have arguments of type list or numpy.array which can be aggregated.
24
+ Not thread-safe.
25
+
26
+ The function to wrap must have arguments of type list or `numpy.array` which can be aggregated.
26
27
It must return just a single value of the same type. The type has to be specified with the
27
28
`argument_type` parameter of the decorator.
28
29
@@ -102,7 +103,7 @@ def batch(
102
103
if argument_type == "numpy" and np is None :
103
104
raise ValueError ('Unable to use "numpy" as argument_type because numpy is not available' )
104
105
105
- def decorator (func ):
106
+ def decorator (func ): # noqa: C901, PLR0915
106
107
next_free_batch : int = 0
107
108
call_args_queue : List [Tuple [List , Dict ]] = []
108
109
n_rows_in_queue : int = 0
@@ -112,16 +113,21 @@ def decorator(func):
112
113
113
114
@functools .wraps (func )
114
115
async def batching_call (* args , ** kwargs ):
115
- """This is the actual wrapper function which controls the process"""
116
- nonlocal results , num_results_ready , result_ready_events , call_args_queue , n_rows_in_queue
116
+ """This is the actual wrapper function which controls the process."""
117
+ nonlocal \
118
+ results , \
119
+ num_results_ready , \
120
+ result_ready_events , \
121
+ call_args_queue , \
122
+ n_rows_in_queue
117
123
118
124
with enqueue (args , kwargs ) as (my_batch_no , start_index , stop_index ):
119
125
await wait_for_calculation (my_batch_no )
120
126
return get_results (start_index , stop_index , my_batch_no )
121
127
122
128
@contextmanager
123
129
def enqueue (args , kwargs ) -> (int , int , int ):
124
- """Add call arguments to queue and get the batch number and result indices"""
130
+ """Add call arguments to queue and get the batch number and result indices. """
125
131
batch_no = next_free_batch
126
132
if batch_no not in result_ready_events :
127
133
result_ready_events [batch_no ] = asyncio .Event ()
@@ -132,7 +138,7 @@ def enqueue(args, kwargs) -> (int, int, int):
132
138
remove_result (batch_no )
133
139
134
140
def add_args_to_queue (args , kwargs ):
135
- """Add a new argument vector to the queue and return result indices"""
141
+ """Add a new argument vector to the queue and return result indices. """
136
142
nonlocal call_args_queue , n_rows_in_queue
137
143
138
144
if call_args_queue and (
@@ -155,7 +161,7 @@ def add_args_to_queue(args, kwargs):
155
161
return offset , n_rows_in_queue
156
162
157
163
async def wait_for_calculation (batch_no_to_calculate ):
158
- """Pause until the result becomes available or trigger the calculation on timeout"""
164
+ """Pause until the result becomes available or trigger the calculation on timeout. """
159
165
if n_rows_in_queue >= target_batch_size :
160
166
await calculate (batch_no_to_calculate )
161
167
else :
@@ -173,20 +179,20 @@ async def wait_for_calculation(batch_no_to_calculate):
173
179
)
174
180
175
181
async def calculate (batch_no_to_calculate ):
176
- """Call the decorated coroutine with batched arguments"""
182
+ """Call the decorated coroutine with batched arguments. """
177
183
nonlocal results , call_args_queue , num_results_ready
178
184
if next_free_batch == batch_no_to_calculate :
179
185
n_results = len (call_args_queue )
180
186
args , kwargs = pop_args_from_queue ()
181
187
try :
182
188
results [batch_no_to_calculate ] = await func (* args , ** kwargs )
183
- except Exception as e : # pylint: disable=broad-except
189
+ except Exception as e : # pylint: disable=broad-except # noqa: BLE001
184
190
results [batch_no_to_calculate ] = e
185
191
num_results_ready [batch_no_to_calculate ] = n_results
186
192
result_ready_events [batch_no_to_calculate ].set ()
187
193
188
194
def pop_args_from_queue ():
189
- """Get all collected arguments from the queue as batch"""
195
+ """Get all collected arguments from the queue as batch. """
190
196
nonlocal next_free_batch , call_args_queue , n_rows_in_queue
191
197
192
198
n_args = len (call_args_queue [0 ][0 ])
@@ -215,7 +221,7 @@ def pop_args_from_queue():
215
221
return args , kwargs
216
222
217
223
def get_results (start_index : int , stop_index : int , batch_no ):
218
- """Pop the results for a certain index range from the output buffer"""
224
+ """Pop the results for a certain index range from the output buffer. """
219
225
nonlocal results
220
226
221
227
if isinstance (results [batch_no ], Exception ):
@@ -225,7 +231,7 @@ def get_results(start_index: int, stop_index: int, batch_no):
225
231
return results_to_return
226
232
227
233
def remove_result (batch_no ):
228
- """Reduce reference count to output buffer and eventually delete it"""
234
+ """Reduce reference count to output buffer and eventually delete it. """
229
235
nonlocal num_results_ready , result_ready_events , results
230
236
231
237
if num_results_ready [batch_no ] == 1 :
0 commit comments