Skip to content

Commit

Permalink
Merge pull request #193 from Parsely/hotfix/tickless_bolt_race_condition
Browse files Browse the repository at this point in the history
Fix race condition in TicklessBatchingBolt
  • Loading branch information
dan-blanchard committed Oct 20, 2015
2 parents d743188 + 0719ad2 commit 81fef11
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 19 deletions.
40 changes: 22 additions & 18 deletions streamparse/storm/bolt.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,20 +517,7 @@ def __init__(self, *args, **kwargs):

def process_tick(self, tick_tup):
""" Just ack tick tuples and ignore them. """
if self.auto_ack:
self.ack(tick_tup)

def process(self, tup):
"""Group non-tick Tuples into batches by ``group_key``.
.. warning::
This method should **not** be overriden. If you want to tweak
how Tuples are grouped into batches, override ``group_key``.
"""
# Append latest Tuple to batches
group_key = self.group_key(tup)
with self._batch_lock:
self._batches[group_key].append(tup)
self.ack(tick_tup)

def _batch_entry_run(self):
"""The inside of ``_batch_entry``'s infinite loop.
Expand Down Expand Up @@ -559,10 +546,27 @@ def _handle_worker_exception(self, signum, frame):
reraise(*self.exc_info)


def _handle_run_exception(self, exc):
"""Process an exception encountered while running the ``run()`` loop.
def _run(self):
"""The inside of ``run``'s infinite loop.
Called right before program exits.
Separate from BatchingBolt's implementation because
we need to be able to acquire the batch lock after
reading the tuple.
We can't acquire the lock before reading the tuple because if
that hange (i.e. the topology is shutting down) the lock being
acquired will freeze the rest of the bolt, which is precisely
what this batcher seeks to avoid.
"""
tup = self.read_tuple()
with self._batch_lock:
super(TicklessBatchingBolt, self)._handle_run_exception(exc)
self._current_tups = [tup]
if self.is_heartbeat(tup):
self.send_message({'command': 'sync'})
elif self.is_tick(tup):
self.process_tick(tup)
else:
self.process(tup)
# reset so that we don't accidentally fail the wrong Tuples
# if a successive call to read_tuple fails
self._current_tups = []
2 changes: 1 addition & 1 deletion streamparse/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@
:organization: Parsely
'''

__version__ = '2.1.2'
__version__ = '2.1.3'
VERSION = tuple(int(x) for x in __version__.split('.'))

0 comments on commit 81fef11

Please sign in to comment.