Skip to content

Commit

Permalink
Merge pull request #543 from remind101/blocking
Browse files Browse the repository at this point in the history
Improved signal handling
  • Loading branch information
ejholmes authored Feb 28, 2018
2 parents 937f222 + b17620e commit 9e82548
Show file tree
Hide file tree
Showing 11 changed files with 55 additions and 90 deletions.
12 changes: 9 additions & 3 deletions stacker/actions/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import sys
import logging
import threading

Expand All @@ -7,6 +8,7 @@

import botocore.exceptions
from stacker.session_cache import get_session
from stacker.exceptions import PlanFailed

from stacker.util import (
ensure_s3_bucket,
Expand Down Expand Up @@ -188,9 +190,13 @@ def s3_stack_push(self, blueprint, force=False):
return template_url

def execute(self, *args, **kwargs):
self.pre_run(*args, **kwargs)
self.run(*args, **kwargs)
self.post_run(*args, **kwargs)
try:
self.pre_run(*args, **kwargs)
self.run(*args, **kwargs)
self.post_run(*args, **kwargs)
except PlanFailed as e:
logger.error(e.message)
sys.exit(1)

def pre_run(self, *args, **kwargs):
pass
Expand Down
4 changes: 1 addition & 3 deletions stacker/actions/build.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import logging
import sys

from .base import BaseAction, plan, build_walker
from .base import STACK_POLL_TIME
Expand Down Expand Up @@ -354,8 +353,7 @@ def run(self, concurrency=0, outline=False,
plan.outline(logging.DEBUG)
logger.debug("Launching stacks: %s", ", ".join(plan.keys()))
walker = build_walker(concurrency)
if not plan.execute(walker):
sys.exit(1)
plan.execute(walker)
else:
if outline:
plan.outline()
Expand Down
4 changes: 1 addition & 3 deletions stacker/actions/destroy.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import logging
import sys

from .base import BaseAction, plan, build_walker
from .base import STACK_POLL_TIME
Expand Down Expand Up @@ -91,8 +90,7 @@ def run(self, force, concurrency=0, tail=False, *args, **kwargs):
# steps to COMPLETE in order to log them
plan.outline(logging.DEBUG)
walker = build_walker(concurrency)
if not plan.execute(walker):
sys.exit(1)
plan.execute(walker)
else:
plan.outline(message="To execute this plan, run with \"--force\" "
"flag.")
Expand Down
4 changes: 1 addition & 3 deletions stacker/actions/diff.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import difflib
import json
import logging
import sys
from operator import attrgetter

from .base import plan, build_walker
Expand Down Expand Up @@ -270,8 +269,7 @@ def run(self, concurrency=0, *args, **kwargs):
plan.outline(logging.DEBUG)
logger.info("Diffing stacks: %s", ", ".join(plan.keys()))
walker = build_walker(concurrency)
if not plan.execute(walker):
sys.exit(1)
plan.execute(walker)

"""Don't ever do anything for pre_run or post_run"""
def pre_run(self, *args, **kwargs):
Expand Down
10 changes: 10 additions & 0 deletions stacker/commands/stacker/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@

from ...environment import parse_environment

logger = logging.getLogger(__name__)

SIGNAL_NAMES = {
signal.SIGINT: "SIGINT",
signal.SIGTERM: "SIGTERM",
}


def cancel():
"""Returns a threading.Event() that will get set when SIGTERM, or
Expand All @@ -14,6 +21,9 @@ def cancel():
cancel = threading.Event()

def cancel_execution(signum, frame):
signame = SIGNAL_NAMES.get(signum, signum)
logger.info("Signal %s received, quitting "
"(this can take some time)...", signame)
cancel.set()

signal.signal(signal.SIGINT, cancel_execution)
Expand Down
52 changes: 9 additions & 43 deletions stacker/dag/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import collections
import logging
import threading
from threading import Thread
from copy import copy, deepcopy
from collections import deque

Expand Down Expand Up @@ -153,21 +153,13 @@ def walk(self, walk_func):
Args:
walk_func (:class:`types.FunctionType`): The function to be called
on each node of the graph.
Returns:
bool: True if the function succeeded on every node, otherwise
False.
"""
nodes = self.topological_sort()
# Reverse so we start with nodes that have no dependencies.
nodes.reverse()

failed = False
for n in nodes:
if not walk_func(n):
failed = True

return not failed
walk_func(n)

def rename_edges(self, old_node_name, new_node_name):
""" Change references to a node in existing edges.
Expand Down Expand Up @@ -383,25 +375,6 @@ def release(self):
pass


class Thread(threading.Thread):
"""Used when executing walk_func's in parallel, to provide access to the
return value.
"""
def __init__(self, *args, **kwargs):
super(Thread, self).__init__(*args, **kwargs)
self._return = None

def run(self):
if self._Thread__target is not None:
self._return = self._Thread__target(
*self._Thread__args,
**self._Thread__kwargs)

def join(self, *args, **kwargs):
super(Thread, self).join(*args, **kwargs)
return self._return


class ThreadedWalker(object):
"""A DAG walker that walks the graph as quickly as the graph topology
allows, using threads.
Expand Down Expand Up @@ -437,14 +410,14 @@ def walk(self, dag, walk_func):
# Blocks until all of the given nodes have completed execution (whether
# successfully, or errored). Returns True if all nodes returned True.
def wait_for(nodes):
return all(threads[node].join() for node in nodes)
for node in nodes:
thread = threads[node]
while thread.is_alive():
threads[node].join(0.5)

# For each node in the graph, we're going to allocate a thread to
# execute. The thread will block executing walk_func, until all of the
# nodes dependencies have executed successfully.
#
# If any node fails for some reason (e.g. raising an exception), any
# downstream nodes will be cancelled.
# nodes dependencies have executed.
for node in nodes:
def fn(n, deps):
if deps:
Expand All @@ -460,17 +433,10 @@ def fn(n, deps):

self.semaphore.acquire()
try:
ret = walk_func(n)
return walk_func(n)
finally:
self.semaphore.release()

if ret:
logger.debug("%s completed", n)
else:
logger.debug("%s failed", n)

return ret

deps = dag.all_downstreams(node)
threads[node] = Thread(target=fn, args=(node, deps), name=node)

Expand All @@ -479,4 +445,4 @@ def fn(n, deps):
threads[node].start()

# Wait for all threads to complete executing.
return wait_for(nodes)
wait_for(nodes)
2 changes: 1 addition & 1 deletion stacker/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ def __init__(self, failed_stacks, *args, **kwargs):
self.failed_stacks = failed_stacks

stack_names = ', '.join(stack.name for stack in failed_stacks)
message = "The following stacks failed: %s\n" % (stack_names,)
message = "The following stacks failed: %s" % (stack_names,)

super(PlanFailed, self).__init__(message, *args, **kwargs)

Expand Down
13 changes: 12 additions & 1 deletion stacker/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from .exceptions import (
CancelExecution,
GraphError,
PlanFailed,
)
from .ui import ui
from .dag import DAG, DAGValidationError, walk
Expand Down Expand Up @@ -345,7 +346,17 @@ def walk_func(step):
return self.graph.walk(walk, walk_func)

def execute(self, *args, **kwargs):
return self.walk(*args, **kwargs)
"""Walks each step in the underlying graph, and raises an exception if
any of the steps fail.
Raises:
PlanFailed: Raised if any of the steps fail.
"""
self.walk(*args, **kwargs)

failed_steps = [step for step in self.steps if step.status == FAILED]
if failed_steps:
raise PlanFailed(failed_steps)

def walk(self, walker):
"""Walks each step in the underlying graph, in topological order.
Expand Down
30 changes: 2 additions & 28 deletions stacker/tests/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,32 +83,7 @@ def walk_func(n):
nodes.append(n)
return True

ok = dag.walk(walk_func)
assert ok == True # noqa: E712
assert nodes == ['d', 'c', 'b', 'a'] or nodes == ['d', 'b', 'c', 'a']


@with_setup(blank_setup)
def test_walk_failed():
dag = DAG()

# b and c should be executed at the same time.
dag.from_dict({'a': ['b', 'c'],
'b': ['d'],
'c': ['d'],
'd': []})

nodes = []

def walk_func(n):
nodes.append(n)
return False

ok = dag.walk(walk_func)

# Only 2 should have been hit. The rest are canceled because they depend on
# the success of d.
assert ok == False # noqa: E712
dag.walk(walk_func)
assert nodes == ['d', 'c', 'b', 'a'] or nodes == ['d', 'b', 'c', 'a']


Expand Down Expand Up @@ -209,6 +184,5 @@ def walk_func(n):
lock.release()
return True

ok = walker.walk(dag, walk_func)
assert ok == True # noqa: E712
walker.walk(dag, walk_func)
assert nodes == ['d', 'c', 'b', 'a'] or nodes == ['d', 'b', 'c', 'a']
13 changes: 8 additions & 5 deletions stacker/tests/test_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from stacker.exceptions import (
CancelExecution,
GraphError,
PlanFailed,
)
from stacker.status import (
COMPLETE,
Expand Down Expand Up @@ -116,7 +117,7 @@ def fn(stack, status=None):
description="Test",
steps=[Step(vpc, fn), Step(db, fn), Step(app, fn)],
targets=['db.1'])
self.assertTrue(plan.execute(walk))
plan.execute(walk)

self.assertEquals(calls, [
'namespace-vpc.1', 'namespace-db.1'])
Expand All @@ -141,7 +142,8 @@ def fn(stack, status=None):
bastion_step = Step(bastion, fn)
plan = build_plan(description="Test", steps=[vpc_step, bastion_step])

plan.execute(walk)
with self.assertRaises(PlanFailed):
plan.execute(walk)

self.assertEquals(calls, ['namespace-vpc.1'])
self.assertEquals(vpc_step.status, FAILED)
Expand All @@ -166,7 +168,7 @@ def fn(stack, status=None):
bastion_step = Step(bastion, fn)

plan = build_plan(description="Test", steps=[vpc_step, bastion_step])
self.assertTrue(plan.execute(walk))
plan.execute(walk)

self.assertEquals(calls, ['namespace-vpc.1', 'namespace-bastion.1'])

Expand Down Expand Up @@ -195,7 +197,8 @@ def fn(stack, status=None):

plan = build_plan(description="Test", steps=[
vpc_step, bastion_step, db_step])
self.assertFalse(plan.execute(walk))
with self.assertRaises(PlanFailed):
plan.execute(walk)

calls.sort()

Expand All @@ -222,7 +225,7 @@ def fn(stack, status=None):

plan = build_plan(description="Test", steps=[
vpc_step, bastion_step])
self.assertTrue(plan.execute(walk))
plan.execute(walk)

self.assertEquals(calls, ['namespace-vpc.1', 'namespace-bastion.1'])

Expand Down
1 change: 1 addition & 0 deletions tests/suite.bats
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,7 @@ EOF
assert_has_line "${STACKER_NAMESPACE}-dependent-rollback-parent: submitted (rolling back new stack)"
assert_has_line "${STACKER_NAMESPACE}-dependent-rollback-parent: failed (rolled back new stack)"
assert_has_line "${STACKER_NAMESPACE}-dependent-rollback-child: failed (dependency has failed)"
assert_has_line "The following stacks failed: dependent-rollback-parent, dependent-rollback-child"
}

@test "stacker build - raw template" {
Expand Down

0 comments on commit 9e82548

Please sign in to comment.