Commit 96399a0
Merge pull request #244 from Parsely/feature/sparse_run_returns
Add back sparse run, which relies on Storm's new Flux facility
dan-blanchard committed Apr 19, 2016
2 parents 6014e83 + ff48d08 commit 96399a0
Showing 17 changed files with 333 additions and 110 deletions.
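
This commit restores the "sparse run" command for running a topology on a local Storm cluster, now built on Storm's Flux runner. A hypothetical invocation, assuming the project defines a topology named wordcount (the flags come from the new run.py subparser added below; the name and option values are illustrative only):

    sparse run --name wordcount --time 60 --option topology.max.spout.pending=500 --debug

Repeated --option flags are collected by the new _StoreDictAction in common.py, shown further down.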
3 changes: 0 additions & 3 deletions examples/kafka-jvm/config.json
@@ -10,9 +10,6 @@
"streamparse-box"
],
"log": {
"path": "/var/log/streamparse",
"max_bytes": 1000000,
"backup_count": 10,
"level": "info"
},
"virtualenv_root": "/data/virtualenvs"
Expand Down
3 changes: 0 additions & 3 deletions examples/redis/config.json
@@ -10,9 +10,6 @@
"streamparse-box"
],
"log": {
"path": "/var/log/streamparse",
"max_bytes": 1000000,
"backup_count": 10,
"level": "info"
},
"virtualenv_root": "/data/virtualenvs"
Expand Down
3 changes: 2 additions & 1 deletion examples/redis/project.clj
@@ -3,7 +3,8 @@
:target-path "_build"
:min-lein-version "2.0.0"
:jvm-opts ["-client"]
:dependencies [[org.apache.storm/storm-core "0.9.4"]]
:dependencies [[org.apache.storm/storm-core "0.10.0"]
[org.apache.storm/flux-core "0.10.0"]]
:jar-exclusions [#"log4j\.properties" #"backtype" #"trident" #"META-INF" #"meta-inf" #"\.yaml"]
:uberjar-exclusions [#"log4j\.properties" #"backtype" #"trident" #"META-INF" #"meta-inf" #"\.yaml"]
)
6 changes: 1 addition & 5 deletions examples/redis/src/bolts.py
@@ -1,4 +1,3 @@
import os
from collections import Counter

from redis import StrictRedis
@@ -11,7 +10,6 @@ class WordCountBolt(Bolt):

def initialize(self, conf, ctx):
self.counter = Counter()
self.pid = os.getpid()
self.total = 0

def _increment(self, word, inc_by):
@@ -22,14 +20,12 @@ def process(self, tup):
word = tup.values[0]
self._increment(word, 10 if word == "dog" else 1)
if self.total % 1000 == 0:
self.logger.info("counted [{:,}] words [pid={}]".format(self.total,
self.pid))
self.logger.info("counted %i words", self.total)
self.emit([word, self.counter[word]])


class RedisWordCountBolt(WordCountBolt):
def initialize(self, conf, ctx):
self.pid = os.getpid()
self.redis = StrictRedis()
self.total = 0

1 change: 1 addition & 0 deletions requirements.txt
@@ -7,3 +7,4 @@ setuptools
simplejson
pystorm>=2.0.1
thriftpy>=0.3.2
pyyaml
5 changes: 3 additions & 2 deletions setup.py
@@ -38,8 +38,9 @@ def readme():
'prettytable',
'six>=1.5',
'simplejson',
'pystorm>=2.0.1',
'thriftpy>=0.3.2'
'pystorm>=3.0.0',
'thriftpy>=0.3.2',
'pyyaml'
]

if sys.version_info.major < 3:
3 changes: 2 additions & 1 deletion streamparse/bootstrap/project/project.jinja2.clj
@@ -3,7 +3,8 @@
:target-path "_build"
:min-lein-version "2.0.0"
:jvm-opts ["-client"]
:dependencies [[org.apache.storm/storm-core "0.10.0"]]
:dependencies [[org.apache.storm/storm-core "0.10.0"]
[org.apache.storm/flux-core "0.10.0"]]
:jar-exclusions [#"log4j\.properties" #"backtype" #"trident" #"META-INF" #"meta-inf" #"\.yaml"]
:uberjar-exclusions [#"log4j\.properties" #"backtype" #"trident" #"META-INF" #"meta-inf" #"\.yaml"]
)
1 change: 0 additions & 1 deletion streamparse/cli/attach.py

This file was deleted.

44 changes: 42 additions & 2 deletions streamparse/cli/common.py
@@ -1,6 +1,46 @@
"""
Functions for adding common CLI arguments to argparse sub-commands.
"""
import argparse
import copy


class _StoreDictAction(argparse.Action):
"""Action for storing key=val option strings as a single dict."""
def __init__(self, option_strings, dest, nargs=None, const=None,
default=None, type=None, choices=None, required=False,
help=None, metavar=None):
if nargs == 0:
raise ValueError('nargs for store_dict actions must be > 0')
if const is not None and nargs != '?':
raise ValueError('nargs must be "?" to supply const')
super(_StoreDictAction, self).__init__(option_strings=option_strings,
dest=dest,
nargs=nargs,
const=const,
default=default,
type=type,
choices=choices,
required=required,
help=help,
metavar=metavar)

def __call__(self, parser, namespace, values, option_string=None):
if getattr(namespace, self.dest, None) is None:
setattr(namespace, self.dest, {})
# Only doing a copy here because that's what _AppendAction does
items = copy.copy(getattr(namespace, self.dest))
key, val = values.split("=", 1)
try:
val = int(val)
except ValueError:
try:
val = float(val)
except ValueError:
pass
items[key] = val
setattr(namespace, self.dest, items)


def add_ackers(parser):
""" Add --ackers option to parser """
@@ -40,8 +80,8 @@ def add_options(parser):
""" Add --option options to parser """
parser.add_argument('-o', '--option',
dest='options',
action='append',
help='Topology option to use upon submit. For example,'
action=_StoreDictAction,
help='Topology option to pass on to Storm. For example,'
' "-o topology.debug=true" is equivalent to '
'"--debug". May be repeated multiple for multiple'
' options.')
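A minimal sketch of _StoreDictAction in use, assuming streamparse from this commit is importable; the parser setup itself is hypothetical and only illustrates how repeated -o key=val flags accumulate into a single dict with numeric coercion:

    import argparse

    from streamparse.cli.common import _StoreDictAction

    parser = argparse.ArgumentParser()
    parser.add_argument('-o', '--option', dest='options', action=_StoreDictAction)
    args = parser.parse_args(['-o', 'topology.workers=4', '-o', 'topology.debug=true'])
    # Numeric values are coerced; anything else stays a string:
    # args.options == {'topology.workers': 4, 'topology.debug': 'true'}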
1 change: 0 additions & 1 deletion streamparse/cli/logs.py

This file was deleted.

1 change: 0 additions & 1 deletion streamparse/cli/restart.py

This file was deleted.

85 changes: 85 additions & 0 deletions streamparse/cli/run.py
@@ -0,0 +1,85 @@
"""
Run a local Storm topology.
"""

from __future__ import absolute_import, print_function

from tempfile import NamedTemporaryFile

import yaml
from fabric.api import local

from ..util import (get_topology_definition, get_topology_from_file,
local_storm_version, storm_lib_version)
from .common import (add_ackers, add_debug, add_environment, add_name,
add_options, add_par, add_workers, resolve_ackers_workers)
from .jar import jar_for_deploy


def run_local_topology(name=None, time=0, workers=2, ackers=2, options=None,
debug=False):
"""Run a topology locally using Flux and `storm jar`."""
storm_options = {'topology.workers': workers,
'topology.acker.executors': ackers,
'topology.debug': debug}
if debug:
storm_options['pystorm.log.level'] = 'debug'
name, topology_file = get_topology_definition(name)
topology_class = get_topology_from_file(topology_file)

# Check Storm version is the same
local_version = local_storm_version()
project_version = storm_lib_version()
if local_version != project_version:
raise ValueError('Local Storm version, {}, is not the same as the '
'version in your project.clj, {}. The versions must '
'match.'.format(local_version, project_version))

# Prepare a JAR that has Storm dependencies packaged
topology_jar = jar_for_deploy(simple_jar=False)

if options is not None:
storm_options.update(options)

if time <= 0:
time = 9223372036854775807 # Max long value in Java

# Write YAML file
with NamedTemporaryFile(suffix='.yaml', delete=False) as yaml_file:
topology_flux_dict = topology_class.to_flux_dict(name)
topology_flux_dict['config'] = storm_options
yaml.dump(topology_flux_dict, yaml_file)
cmd = ('storm jar {jar} org.apache.storm.flux.Flux --local --no-splash '
'--sleep {time} {yaml}'.format(jar=topology_jar,
time=time,
yaml=yaml_file.name))
local(cmd)


def subparser_hook(subparsers):
""" Hook to add subparser for this command. """
subparser = subparsers.add_parser('run',
description=__doc__,
help=main.__doc__)
subparser.set_defaults(func=main)
add_ackers(subparser)
add_debug(subparser)
add_environment(subparser)
add_name(subparser)
add_options(subparser)
add_par(subparser)
subparser.add_argument('-t', '--time',
default=0,
type=int,
help='Time (in seconds) to keep local cluster '
'running. If time <= 0, run indefinitely. '
'(default: %(default)s)')
add_workers(subparser)


def main(args):
""" Run the local topology with the given arguments """
resolve_ackers_workers(args)
run_local_topology(name=args.name, time=args.time, workers=args.workers,
ackers=args.ackers, options=args.options,
debug=args.debug)
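
To make the Flux hand-off concrete, here is a rough sketch of the kind of YAML run_local_topology dumps to its temporary file. The topology name and values are assumed for illustration; the real dict returned by to_flux_dict() also carries the spout, bolt, and stream definitions:

    import yaml

    flux_dict = {
        'name': 'wordcount',  # assumed topology name
        'config': {           # defaults from run_local_topology
            'topology.workers': 2,
            'topology.acker.executors': 2,
            'topology.debug': False,
        },
    }
    print(yaml.dump(flux_dict, default_flow_style=False))

The generated file is what the storm jar ... org.apache.storm.flux.Flux --local command above consumes.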
82 changes: 26 additions & 56 deletions streamparse/cli/submit.py
@@ -4,21 +4,18 @@

from __future__ import absolute_import, print_function, unicode_literals

import importlib
import os
import sys
import time

import fabric
import simplejson as json
from fabric.api import env
from six import itervalues, string_types

from ..dsl.component import JavaComponentSpec
from ..dsl.topology import Topology, TopologyType
from ..thrift import storm_thrift
from ..util import (activate_env, get_config, get_env_config, get_nimbus_client,
get_topology_definition, ssh_tunnel)
get_topology_definition, get_topology_from_file, ssh_tunnel)
from .common import (add_ackers, add_debug, add_environment, add_name,
add_options, add_par, add_wait, add_workers,
resolve_ackers_workers)
@@ -70,27 +67,9 @@ def _kill_existing_topology(topology_name, force, wait, nimbus_client):
sys.stdout.flush()


def _get_topology_from_file(topology_file):
"""
Given a filename for a topology, import the topology and return the class
"""
topology_dir, mod_name = os.path.split(topology_file)
# Remove .py extension before trying to import
mod_name = mod_name[:-3]
sys.path.append(os.path.join(topology_dir, '..', 'src'))
sys.path.append(topology_dir)
mod = importlib.import_module(mod_name)
for attr in mod.__dict__.values():
if isinstance(attr, TopologyType) and attr != Topology:
topology_class = attr
break
else:
raise ValueError('Could not find topology subclass in topology module.')
return topology_class


def _submit_topology(topology_name, topology_class, uploaded_jar, env_config,
workers, ackers, nimbus_client, options=None, debug=False):
def _submit_topology(topology_name, topology_class, uploaded_jar, config,
env_config, workers, ackers, nimbus_client, options=None,
debug=False):
storm_options = {'topology.workers': workers,
'topology.acker.executors': ackers,
'topology.debug': debug}
@@ -117,25 +96,30 @@ def _submit_topology(topology_name, topology_class, uploaded_jar, env_config,
if isinstance(log_config.get("level"), string_types):
storm_options['pystorm.log.level'] = log_config["level"].lower()

if options is None:
options = []
for option in options:
key, val = option.split("=", 1)
try:
val = int(val)
except ValueError:
try:
val = float(val)
except ValueError:
pass
storm_options[key] = val
if options is not None:
storm_options.update(options)

serializer = env_config.get('serializer', config.get('serializer', None))
if serializer is not None:
# Set serializer arg in bolts
for thrift_bolt in itervalues(topology_class.thrift_bolts):
inner_shell = thrift_bolt.bolt_object.shell
if inner_shell is not None:
inner_shell.script = '-s {} {}'.format(serializer,
inner_shell.script)
# Set serializer arg in spouts
for thrift_spout in itervalues(topology_class.thrift_spouts):
inner_shell = thrift_spout.spout_object.shell
if inner_shell is not None:
inner_shell.script = '-s {} {}'.format(serializer,
inner_shell.script)

print("Submitting {} topology to nimbus...".format(topology_name), end='')
sys.stdout.flush()
nimbus_client.submitTopology(name=topology_name,
uploadedJarLocation=uploaded_jar,
jsonConf=json.dumps(storm_options),
topology=topology_class._topology)
topology=topology_class.thrift_topology)
print('done')


@@ -190,7 +174,7 @@ def submit_topology(name=None, env_name="prod", workers=2, ackers=2,
config = get_config()
name, topology_file = get_topology_definition(name)
env_name, env_config = get_env_config(env_name)
topology_class = _get_topology_from_file(topology_file)
topology_class = get_topology_from_file(topology_file)

# Check if we need to maintain virtualenv during the process
use_venv = env_config.get('use_virtualenv', True)
@@ -222,21 +206,6 @@ def submit_topology(name=None, env_name="prod", workers=2, ackers=2,
if 'streamparse_run' in inner_shell.execution_command:
inner_shell.execution_command = streamparse_run_path

serializer = env_config.get('serializer', config.get('serializer', None))
if serializer is not None:
# Set serializer arg in bolts
for thrift_bolt in itervalues(topology_class.thrift_bolts):
inner_shell = thrift_bolt.bolt_object.shell
if isinstance(inner_shell, ShellComponent):
inner_shell.script = '-s {} {}'.format(serializer,
inner_shell.script)
# Set serializer arg in spouts
for thrift_spout in itervalues(topology_class.thrift_spouts):
inner_shell = thrift_spout.spout_object.shell
if isinstance(inner_shell, ShellComponent):
inner_shell.script = '-s {} {}'.format(serializer,
inner_shell.script)

# Check topology for JVM stuff to see if we need to create uber-jar
if simple_jar:
simple_jar = not any(isinstance(spec, JavaComponentSpec)
@@ -252,8 +221,9 @@ def submit_topology(name=None, env_name="prod", workers=2, ackers=2,
nimbus_client = get_nimbus_client(env_config, host=host, port=port)
_kill_existing_topology(name, force, wait, nimbus_client)
uploaded_jar = _upload_jar(nimbus_client, topology_jar)
_submit_topology(name, topology_class, uploaded_jar, env_config, workers,
ackers, nimbus_client, options=options, debug=debug)
_submit_topology(name, topology_class, uploaded_jar, config, env_config,
workers, ackers, nimbus_client, options=options,
debug=debug)
_post_submit_hooks(name, env_name, env_config)


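The serializer handling that moved into _submit_topology prepends a "-s <serializer>" flag to each shell component's script. A standalone sketch using a stand-in object (the real target is a Thrift shell component), with an assumed msgpack serializer and a hypothetical script value:

    class FakeShell(object):
        """Stand-in for the Thrift shell component streamparse mutates."""
        def __init__(self, script):
            self.script = script

    serializer = 'msgpack'  # assumed value of config['serializer']
    inner_shell = FakeShell('bolts.WordCountBolt')  # hypothetical script
    inner_shell.script = '-s {} {}'.format(serializer, inner_shell.script)
    assert inner_shell.script == '-s msgpack bolts.WordCountBolt'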
(diffs for the 4 remaining changed files not shown)
