Skip to content

Commit

Permalink
Fix a bunch of issues with ssh tunneling in 3.0.0.dev0
Browse files Browse the repository at this point in the history
- Commands that needed ssh tunnels were no longer using them.
- Attempting to create the same tunnel twice is now a no-op: the second attempt reuses the existing tunnel instead of failing.
- Output from virtualenv creation is now displayed properly
  • Loading branch information
dan-blanchard committed Mar 17, 2016
1 parent 441be29 commit 71e1424
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 71 deletions.
4 changes: 1 addition & 3 deletions streamparse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@

import logging

from . import (bolt, cli, component, contextmanagers, decorators, dsl, spout,
storm)
from . import bolt, cli, component, decorators, dsl, spout, storm
from .dsl import Grouping, Stream, Topology
from .storm import (BatchingBolt, Bolt, JavaBolt, JavaSpout, ShellBolt,
ShellSpout, Spout, StormHandler, TicklessBatchingBolt,
Expand All @@ -26,7 +25,6 @@
'bolt',
'cli',
'component',
'contextmanagers',
'decorators',
'dsl',
'Grouping',
Expand Down
4 changes: 2 additions & 2 deletions streamparse/cli/kill.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ def kill_topology(topology_name=None, env_name=None, wait=None):
topology_name = get_topology_definition(topology_name)[0]
env_name, env_config = get_env_config(env_name)
# Use ssh tunnel with Nimbus if use_ssh_for_nimbus is unspecified or True
with ssh_tunnel(env_config):
nimbus_client = get_nimbus_client(env_config)
with ssh_tunnel(env_config) as (host, port):
nimbus_client = get_nimbus_client(env_config, host=host, port=port)
return _kill_topology(topology_name, nimbus_client, wait=wait)


Expand Down
4 changes: 2 additions & 2 deletions streamparse/cli/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ def list_topologies(env_name):
"""Prints out all running Storm topologies"""
env_name, env_config = get_env_config(env_name)
# Use ssh tunnel with Nimbus if use_ssh_for_nimbus is unspecified or True
with ssh_tunnel(env_config):
nimbus_client = get_nimbus_client(env_config)
with ssh_tunnel(env_config) as (host, port):
nimbus_client = get_nimbus_client(env_config, host=host, port=port)
topologies = _list_topologies(nimbus_client)
if not topologies:
print('No topologies found.')
Expand Down
40 changes: 25 additions & 15 deletions streamparse/cli/slot_usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from six import iteritems

from .common import add_environment
from ..util import get_ui_json, storm_lib_version
from ..util import get_ui_json, get_ui_jsons, storm_lib_version


def subparser_hook(subparsers):
Expand All @@ -25,29 +25,39 @@ def subparser_hook(subparsers):

def display_slot_usage(env_name):
print('Querying Storm UI REST service for slot usage stats (this can take a while)...')
topology_summary = '/api/v1/topology/summary'
topology_detail = '/api/v1/topology/{topology}'
component = '/api/v1/topology/{topology}/component/{component}'
topo_summary_json = get_ui_json(env_name, topology_summary)
topology_summary_path = '/api/v1/topology/summary'
topology_detail_path = '/api/v1/topology/{topology}'
component_path = '/api/v1/topology/{topology}/component/{component}'
topo_summary_json = get_ui_json(env_name, topology_summary_path)
topology_ids = [x['id'] for x in topo_summary_json['topologies']]
# Keep track of the number of workers used by each topology on each machine
topology_worker_ports = defaultdict(lambda: defaultdict(set))
topology_executor_counts = defaultdict(Counter)
topology_names = set()
topology_components = dict()
topology_detail_jsons = get_ui_jsons(env_name,
(topology_detail_path.format(topology=topology)
for topology in topology_ids))

for topology in topology_ids:
topology_detail_json = get_ui_json(env_name,
topology_detail.format(topology=topology))
topology_detail_json = topology_detail_jsons[topology_detail_path.format(topology=topology)]
spouts = [x['spoutId'] for x in topology_detail_json['spouts']]
bolts = [x['boltId'] for x in topology_detail_json['bolts']]
for comp in spouts + bolts:
comp_detail = get_ui_json(env_name,
component.format(topology=topology,
component=comp))
for worker in comp_detail['executorStats']:
topology_worker_ports[worker['host']][topology_detail_json['name']].add(worker['port'])
topology_executor_counts[worker['host']][topology_detail_json['name']] += 1
topology_names.add(topology_detail_json['name'])
topology_components[topology] = spouts + bolts

comp_details = get_ui_jsons(env_name,
(component_path.format(topology=topology,
component=comp)
for topology, comp_list in iteritems(topology_components)
for comp in comp_list))

for request_url, comp_detail in iteritems(comp_details):
topology = request_url.split('/')[4]
topology_detail_json = topology_detail_jsons[topology_detail_path.format(topology=topology)]
for worker in comp_detail['executorStats']:
topology_worker_ports[worker['host']][topology_detail_json['name']].add(worker['port'])
topology_executor_counts[worker['host']][topology_detail_json['name']] += 1
topology_names.add(topology_detail_json['name'])

print("# Slot (and Executor) Counts by Topology")
topology_names = sorted(topology_names)
Expand Down
14 changes: 11 additions & 3 deletions streamparse/cli/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import sys
import time

import fabric
import simplejson as json
from fabric.api import env
from six import itervalues, string_types
Expand All @@ -17,7 +18,7 @@
from ..dsl.topology import Topology, TopologyType
from ..thrift import storm_thrift
from ..util import (activate_env, get_config, get_env_config, get_nimbus_client,
get_topology_definition, is_ssh_for_nimbus, ssh_tunnel)
get_topology_definition, ssh_tunnel)
from .common import (add_ackers, add_debug, add_environment, add_name,
add_options, add_par, add_wait, add_workers,
resolve_ackers_workers)
Expand Down Expand Up @@ -117,6 +118,13 @@ def _submit_topology(topology_name, topology_class, uploaded_jar, env_config,
options = []
for option in options:
key, val = option.split("=", 1)
try:
val = int(val)
except ValueError:
try:
val = float(val)
except ValueError:
pass
storm_options[key] = val

print("Submitting {} topology to nimbus...".format(topology_name), end='')
Expand Down Expand Up @@ -218,8 +226,8 @@ def submit_topology(name=None, env_name="prod", workers=2, ackers=2,
print('Deploying "{}" topology...'.format(name))
sys.stdout.flush()
# Use ssh tunnel with Nimbus if use_ssh_for_nimbus is unspecified or True
with ssh_tunnel(env_config):
nimbus_client = get_nimbus_client(env_config)
with ssh_tunnel(env_config) as (host, port):
nimbus_client = get_nimbus_client(env_config, host=host, port=port)
_kill_existing_topology(name, force, wait, nimbus_client)
uploaded_jar = _upload_jar(nimbus_client, topology_jar)
_submit_topology(name, topology_class, uploaded_jar, env_config, workers,
Expand Down
38 changes: 20 additions & 18 deletions streamparse/cli/update_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
import os
from io import open

from fabric.api import env, execute, parallel, prefix, put, puts, run
import fabric
from fabric.api import env, execute, parallel, prefix, put, puts, run, show
from fabric.contrib.files import exists

from .common import add_environment, add_name
Expand All @@ -21,23 +22,24 @@ def _create_or_update_virtualenv(virtualenv_root,
virtualenv_name,
requirements_file,
virtualenv_flags=None):
virtualenv_path = os.path.join(virtualenv_root, virtualenv_name)
if not exists(virtualenv_path):
if virtualenv_flags is None:
virtualenv_flags = ''
puts("virtualenv not found in {}, creating one.".format(virtualenv_root))
run("virtualenv {} {}".format(virtualenv_path, virtualenv_flags))

puts("Uploading requirements.txt to temporary file.")
tmpfile = run("mktemp /tmp/streamparse_requirements-XXXXXXXXX.txt")
put(requirements_file, tmpfile)

puts("Updating virtualenv: {}".format(virtualenv_name))
cmd = "source {}".format(os.path.join(virtualenv_path, 'bin/activate'))
with prefix(cmd):
run("pip install -r {} --exists-action w".format(tmpfile))

run("rm {}".format(tmpfile))
with show('output'):
virtualenv_path = os.path.join(virtualenv_root, virtualenv_name)
if not exists(virtualenv_path):
if virtualenv_flags is None:
virtualenv_flags = ''
puts("virtualenv not found in {}, creating one.".format(virtualenv_root))
run("virtualenv {} {}".format(virtualenv_path, virtualenv_flags))

puts("Uploading requirements.txt to temporary file.")
tmpfile = run("mktemp /tmp/streamparse_requirements-XXXXXXXXX.txt")
put(requirements_file, tmpfile)

puts("Updating virtualenv: {}".format(virtualenv_name))
cmd = "source {}".format(os.path.join(virtualenv_path, 'bin/activate'))
with prefix(cmd):
run("pip install -r {} --exists-action w".format(tmpfile))

run("rm {}".format(tmpfile))


def create_or_update_virtualenvs(env_name, topology_name, requirements_file,
Expand Down
38 changes: 24 additions & 14 deletions streamparse/cli/worker_uptime.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@

from pkg_resources import parse_version
from prettytable import PrettyTable
from six import iteritems, itervalues

from .common import add_environment
from ..util import get_ui_json, storm_lib_version
from ..util import get_ui_json, get_ui_jsons, storm_lib_version


def subparser_hook(subparsers):
Expand All @@ -21,25 +22,34 @@ def subparser_hook(subparsers):


def display_worker_uptime(env_name):
topology_summary = '/api/v1/topology/summary'
topology_detail = '/api/v1/topology/{topology}'
component = '/api/v1/topology/{topology}/component/{component}'
topo_summary_json = get_ui_json(env_name, topology_summary)
topology_summary_path = '/api/v1/topology/summary'
topology_detail_path = '/api/v1/topology/{topology}'
component_path = '/api/v1/topology/{topology}/component/{component}'
topo_summary_json = get_ui_jsons(env_name, topology_summary_path)
topology_ids = [x['id'] for x in topo_summary_json['topologies']]
topology_components = dict()
worker_stats = []

topology_detail_jsons = get_ui_jsons(env_name,
(topology_detail_path.format(topology=topology)
for topology in topology_ids))

for topology in topology_ids:
topology_detail_json = get_ui_json(env_name,
topology_detail.format(topology=topology))
topology_detail_json = topology_detail_jsons[topology_detail_path.format(topology=topology)]
spouts = [x['spoutId'] for x in topology_detail_json['spouts']]
bolts = [x['boltId'] for x in topology_detail_json['bolts']]
for comp in spouts + bolts:
comp_detail = get_ui_json(env_name,
component.format(topology=topology,
component=comp))
worker_stats += [(worker['host'], worker['id'], worker['uptime'],
worker['workerLogLink']) for worker in
comp_detail['executorStats']]
topology_components[topology] = spouts + bolts

comp_details = get_ui_jsons(env_name,
(component_path.format(topology=topology,
component=comp)
for topology, comp_list in iteritems(topology_components)
for comp in comp_list))

for comp_detail in itervalues(comp_details):
worker_stats += [(worker['host'], worker['id'], worker['uptime'],
worker['workerLogLink']) for worker in
comp_detail['executorStats']]
worker_stats = sorted(set(worker_stats))

print("# Worker Stats")
Expand Down
40 changes: 26 additions & 14 deletions streamparse/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,28 @@ def _port_in_use(port, server_type="tcp"):
return False


_active_tunnel = None


@contextmanager
def ssh_tunnel(env_config, local_port=6627, remote_port=None):
def ssh_tunnel(env_config, local_port=6627, remote_port=None, quiet=False):
"""Setup an optional ssh_tunnel to Nimbus.
If use_ssh_for_nimbus is False, no tunnel will be created.
"""
host, nimbus_port = get_nimbus_host_port(env_config)
if remote_port is None:
remote_port = nimbus_port
if env_config.get('use_ssh_for_nimbus', True):
host, nimbus_port = get_nimbus_host_port(env_config)
if remote_port is None:
remote_port = nimbus_port
global _active_tunnel
if _port_in_use(local_port):
raise IOError("Local port: {} already in use, unable to open ssh "
"tunnel to {}:{}.".format(local_port, host, remote_port))
if local_port == _active_tunnel:
yield 'localhost', local_port
else:
raise IOError("Local port: {} already in use, unable to open "
"ssh tunnel to {}:{}.".format(local_port,
host,
remote_port))

user = env_config.get("user")
if user:
Expand All @@ -80,14 +89,16 @@ def ssh_tunnel(env_config, local_port=6627, remote_port=None):
.format(" ".join(ssh_cmd)))
time.sleep(0.2)
try:
print("ssh tunnel to Nimbus {}:{} established.".format(host,
remote_port))
yield
if not quiet:
print("ssh tunnel to Nimbus {}:{} established."
.format(host, remote_port))
_active_tunnel = local_port
yield 'localhost', local_port
finally:
ssh_proc.kill()
# Do nothing if we're not supposed to use ssh
else:
yield
yield host, remote_port


def activate_env(env_name=None):
Expand Down Expand Up @@ -278,11 +289,12 @@ def get_ui_jsons(env_name, api_paths):
for local_port in local_ports:
try:
data = {}
with ssh_tunnel(env_config.get("user"), host, local_port,
remote_ui_port):
with ssh_tunnel(env_config, local_port=local_port,
remote_port=remote_ui_port) as (host, local_port):
for api_path in api_paths:
r = requests.get('http://127.0.0.1:%s%s' % (local_port,
api_path))
r = requests.get('http://{}:{}{}'.format(host,
local_port,
api_path))
data[api_path] = r.json()
return data
except Exception as e:
Expand Down

0 comments on commit 71e1424

Please sign in to comment.