From 42abae0fb78e61612824ddc4d20f0b05ff358cd6 Mon Sep 17 00:00:00 2001 From: Keith Bourgoin Date: Mon, 19 Oct 2015 11:51:22 -0400 Subject: [PATCH 1/3] Fix race condition in tickless batcher. --- streamparse/storm/bolt.py | 37 +++++++++++++++++++++---------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/streamparse/storm/bolt.py b/streamparse/storm/bolt.py index f49c68f6..a81c5330 100644 --- a/streamparse/storm/bolt.py +++ b/streamparse/storm/bolt.py @@ -520,18 +520,6 @@ def process_tick(self, tick_tup): if self.auto_ack: self.ack(tick_tup) - def process(self, tup): - """Group non-tick Tuples into batches by ``group_key``. - - .. warning:: - This method should **not** be overriden. If you want to tweak - how Tuples are grouped into batches, override ``group_key``. - """ - # Append latest Tuple to batches - group_key = self.group_key(tup) - with self._batch_lock: - self._batches[group_key].append(tup) - def _batch_entry_run(self): """The inside of ``_batch_entry``'s infinite loop. @@ -559,10 +547,27 @@ def _handle_worker_exception(self, signum, frame): reraise(*self.exc_info) - def _handle_run_exception(self, exc): - """Process an exception encountered while running the ``run()`` loop. + def _run(self): + """The inside of ``run``'s infinite loop. - Called right before program exits. + Separate from BatchingBolt's implementation because + we need to be able to acquire the batch lock after + reading the tuple. + + We can't acquire the lock before reading the tuple because if + that hange (i.e. the topology is shutting down) the lock being + acquired will freeze the rest of the bolt, which is precisely + what this batcher seeks to avoid. """ + tup = self.read_tuple() with self._batch_lock: - super(TicklessBatchingBolt, self)._handle_run_exception(exc) \ No newline at end of file + self._current_tups = [tup] + if self.is_heartbeat(tup): + self.send_message({'command': 'sync'}) + elif self.is_tick(tup): + self.process_tick(tup) + else: + self.process(tup) + # reset so that we don't accidentally fail the wrong Tuples + # if a successive call to read_tuple fails + self._current_tups = [] From e24aa5c532c063e893f1866640d5522ade320632 Mon Sep 17 00:00:00 2001 From: Keith Bourgoin Date: Mon, 19 Oct 2015 11:51:46 -0400 Subject: [PATCH 2/3] Always ack tick tuples in tickless batcher. --- streamparse/storm/bolt.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/streamparse/storm/bolt.py b/streamparse/storm/bolt.py index a81c5330..901e4d78 100644 --- a/streamparse/storm/bolt.py +++ b/streamparse/storm/bolt.py @@ -517,8 +517,7 @@ def __init__(self, *args, **kwargs): def process_tick(self, tick_tup): """ Just ack tick tuples and ignore them. """ - if self.auto_ack: - self.ack(tick_tup) + self.ack(tick_tup) def _batch_entry_run(self): """The inside of ``_batch_entry``'s infinite loop. From 0719ad2e7739b5c598579cbad471ba12e256a839 Mon Sep 17 00:00:00 2001 From: Keith Bourgoin Date: Mon, 19 Oct 2015 11:51:55 -0400 Subject: [PATCH 3/3] Bump version to 2.1.3 --- streamparse/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streamparse/version.py b/streamparse/version.py index 7e6f2f4f..65442b45 100644 --- a/streamparse/version.py +++ b/streamparse/version.py @@ -21,5 +21,5 @@ :organization: Parsely ''' -__version__ = '2.1.2' +__version__ = '2.1.3' VERSION = tuple(int(x) for x in __version__.split('.'))