Skip to content

Commit

Permalink
Make commands a lot more customizable.
Browse files Browse the repository at this point in the history
Remove `pset` and `argument` parameters, and instead parse the `command`
parameter for workflows. This allows to shuffle things around a little and
will be more flexible in the future. The following merge command would now
be possible:

    my_merge.py -o @outputfiles --inputs @inputfiles

While at it, pulling in datasets from DBS and splitting them on files broke
recently, fix it.

Fixes #207.
  • Loading branch information
matz-e committed Dec 12, 2017
1 parent a103623 commit 2ea4ea4
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 29 deletions.
57 changes: 57 additions & 0 deletions examples/script.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import datetime

from lobster import cmssw
from lobster.core import AdvancedOptions, Category, Config, StorageConfiguration, Workflow

version = datetime.datetime.now().strftime('%Y%m%d_%H%M')

storage = StorageConfiguration(
output=[
"hdfs://eddie.crc.nd.edu:19000/store/user/$USER/lobster_test_" + version,
"file:///hadoop/store/user/$USER/lobster_test_" + version,
# ND is not in the XrootD redirector, thus hardcode server.
# Note the double-slash after the hostname!
"root://deepthought.crc.nd.edu//store/user/$USER/lobster_test_" + version,
"chirp://eddie.crc.nd.edu:9094/store/user/$USER/lobster_test_" + version,
"gsiftp://T3_US_NotreDame/store/user/$USER/lobster_test_" + version,
"srm://T3_US_NotreDame/store/user/$USER/lobster_test_" + version
]
)

processing = Category(
name='processing',
cores=1,
runtime=900,
memory=1000
)

workflows = []

ttH = Workflow(
label='ttH',
dataset=cmssw.Dataset(
dataset='/ttHToNonbb_M125_13TeV_powheg_pythia8/RunIIFall15MiniAODv2-PU25nsData2015v1_76X_mcRun2_asymptotic_v12-v1/MINIAODSIM',
lumis_per_task=20,
file_based=True
),
category=processing,
command='root -b -q -l script_macro.C @outputfiles @inputfiles',
extra_inputs=['script_macro.C'],
publish_label='test',
merge_command='hadd @outputfiles @inputfiles',
merge_size='3.5G',
outputs=['output.root']
)

workflows.append(ttH)

config = Config(
workdir='/tmpscratch/users/$USER/lobster_test_' + version,
plotdir='~/www/lobster/test_' + version,
storage=storage,
workflows=workflows,
advanced=AdvancedOptions(
bad_exit_codes=[127, 160],
log_level=1
)
)
13 changes: 13 additions & 0 deletions examples/script_macro.C
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
void
script_macro() {
auto count = gApplication->Argc();
gApplication->GetOptions(&count, gApplication->Argv());
auto outname = gApplication->Argv(1);
TFile out(outname, "RECREATE");
TH1F hist("vertex_ndof", "", 500, -0.5, 499.5);
TChain c("Events");
for (int i = 2; i < count; ++i)
c.Add(gApplication->Argv(i));
c.Draw("recoVertexs_offlineSlimmedPrimaryVertices__PAT.obj.ndof_>>vertex_ndof");
hist.Write();
}
4 changes: 2 additions & 2 deletions examples/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
label='ttH',
dataset=cmssw.Dataset(
dataset='/ttHToNonbb_M125_13TeV_powheg_pythia8/RunIIFall15MiniAODv2-PU25nsData2015v1_76X_mcRun2_asymptotic_v12-v1/MINIAODSIM',
events_per_task=5000
events_per_task=50000
),
category=processing,
pset='slim.py',
command='cmsRun simple_pset.py',
publish_label='test',
merge_size='3.5G',
outputs=['output.root']
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion lobster/cmssw/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def query_database(self):
if not self.file_based:
self.__cache.cache(self.dataset, self.lumi_mask, baseinfo, result)

result.stop_on_file_boundary = (result.total_units != total_lumis)
result.stop_on_file_boundary = (result.total_units != total_lumis) and not self.file_based
if result.stop_on_file_boundary:
logger.debug("split lumis detected in {} - "
"{} unique (run, lumi) but "
Expand Down
37 changes: 34 additions & 3 deletions lobster/core/data/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,29 @@ def format(self, record):
"""


def expand_command(cmd, args, infiles, outfiles):
"""
Expand variables in a command list.
Do so by replacing `@args` with `args`, `@inputfiles` with `infiles`,
and `@outputfiles` with `outfiles`. Returns an expanded command list.
>>> expand_command(["foo", "@inputfiles", "--some-flag"], ["-a"], ["bar", "baz"])
["foo", "bar", "baz", "--some-flag"]
"""
def replace(xs, s, ys):
try:
idx = xs.index(s)
return xs[:idx] + ys + xs[idx + 1:]
except ValueError:
return xs

newcmd = replace(cmd, "@args", args)
newcmd = replace(newcmd, "@inputfiles", infiles)
newcmd = replace(newcmd, "@outputfiles", outfiles)
return newcmd


def run_subprocess(*args, **kwargs):
logger.info("executing '{}'".format(" ".join(*args)))

Expand Down Expand Up @@ -400,8 +423,12 @@ def copy_inputs(data, config, env):
# When the config specifies no "input," this implies to use
# AAA to access data in, e.g., DBS
if len(config['input']) == 0:
config['mask']['files'].append(file)
config['file map'][file] = file
if config['executable'] == 'cmsRun':
filename = file
else:
filename = "root://cmsxrootd.fnal.gov/" + file
config['mask']['files'].append(filename)
config['file map'][filename] = file
logger.info("AAA access to input file {} detected".format(file))
data['transfers']['root']['stage-in success'] += 1
continue
Expand Down Expand Up @@ -855,6 +882,7 @@ def get_bare_size(filename):
def run_command(data, config, env):
cmd = config['executable']
args = config['arguments']
unique = config.get('arguments_unique', [])
if 'cmsRun' in cmd:
pset = config['pset']
pset_mod = pset.replace(".py", "_mod.py")
Expand All @@ -870,10 +898,13 @@ def run_command(data, config, env):
cmd = shlex.split(cmd)
if os.path.isfile(cmd[0]):
cmd[0] = os.path.join(os.getcwd(), cmd[0])
cmd.extend([str(arg) for arg in args])

cmd.extend([str(arg) for arg in args])
if config.get('append inputs to args', False):
cmd.extend(unique)
cmd.extend([str(f) for f in config['mask']['files']])
else:
cmd = expand_command(cmd, unique, config['mask']['files'], [lf for lf, rf in config['output files']])

p = run_subprocess(cmd, env=env)
logger.info("executable returned with exit code {0}.".format(p.returncode))
Expand Down
47 changes: 25 additions & 22 deletions lobster/core/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,13 @@ class Workflow(Configurable):
:class:`ValueError` will be raised otherwise.
command : str
Which executable to run (for non-CMSSW workflows)
The executable string may contain `@args`, `@outputfiles`, and
`@inputfiles`, which will be replaced by unique arguments and
output as well as input files, respectively.
extra_inputs : list
Additional inputs outside the sandbox needed to process the
workflow.
arguments : list
Arguments to pass to the executable.
unique_arguments : list
A list of arguments. Each element of the dataset is processed
once for each argument in this list. The unique argument is
Expand All @@ -173,36 +175,34 @@ class Workflow(Configurable):
local : bool
If set to `True`, Lobster will assume this workflow's input is
present on the output storage element.
pset : str
The CMSSW configuration to use, if any.
globaltag : str
Which GlobalTag this workflow uses. Needed for publication of
CMSSW workflows, and can be automatically determined for these.
merge_command : str
Accepts `cmsRun`, `hadd`, or a custom command. Tells Lobster what
command to use for merging. If outputs are autodetermined
(`outputs=None`), cmssw will be used for EDM output and hadd will
be used otherwise. Custom commands should accept the output file
as the first argument followed by one or more input files to merge.
"""
be used otherwise.
See the specification for the `command` parameter about passing
input and output file values.
"""
_mutable = {}

def __init__(self,
label,
dataset,
command,
category=Category('default', mode='fixed'),
publish_label=None,
cleanup_input=False,
merge_size=-1,
sandbox=None,
command='cmsRun',
extra_inputs=None,
arguments=None,
unique_arguments=None,
extra_inputs=None,
outputs=None,
output_format="{base}_{id}.{ext}",
local=False,
pset=None,
globaltag=None,
merge_command='cmsRun'):
self.label = label
Expand All @@ -216,9 +216,12 @@ def __init__(self,
self.merge_size = self.__check_merge(merge_size)
self.cleanup_input = cleanup_input

self.command = command
self.arguments = shlex.split(command)
self.command = self.arguments.pop(0)
self.pset = None
if self.command == 'cmsRun':
self.pset = self.arguments.pop(0)
self.extra_inputs = extra_inputs if extra_inputs else []
self.arguments = arguments if arguments else []
if unique_arguments:
if any(x is None for x in unique_arguments):
raise ValueError("Unique arguments should not be None")
Expand All @@ -233,10 +236,10 @@ def __init__(self,
if hasattr(dataset, 'parent'):
self.parent = dataset.parent

self.pset = pset
self.globaltag = globaltag
self.local = local or hasattr(dataset, 'files')
self.merge_command = merge_command
self.merge_args = shlex.split(merge_command)
self.merge_command = self.merge_args.pop(0)

from lobster.cmssw.sandbox import Sandbox
self.sandbox = sandbox or Sandbox()
Expand Down Expand Up @@ -338,6 +341,7 @@ def determine_outputs(self, basedirs):
if 'TFileService' in process.services:
self.outputs.append(process.services['TFileService'].fileName.value().replace('file:', ''))
self.merge_command = 'hadd'
self.merge_args = ['@outputfiles', '@inputfiles']

logger.info("workflow {0}: adding output file(s) '{1}'".format(self.label, ', '.join(self.outputs)))

Expand Down Expand Up @@ -442,18 +446,17 @@ def adjust(self, params, env, taskdir, inputs, outputs, merge, reports=None, uni
inputs.append((os.path.join(os.path.dirname(__file__), 'data', 'task.py'), 'task.py', True))
inputs.extend((r, "_".join(os.path.normpath(r).split(os.sep)[-3:]), False) for r in reports)

if self.merge_command == 'cmsRun':
cmd = self.merge_command
if cmd == 'cmsRun':
args = ['outputFile=' + self.outputs[0]]
pset = os.path.join(os.path.dirname(__file__), 'data', 'merge_cfg.py')
else:
cmd = self.merge_command
args = [self.outputs[0]]
if self.merge_command == 'hadd':
args = self.merge_args
if cmd == 'hadd':
args = ['-n', '0', '-f'] + args
else:
inputs.extend((i, os.path.basename(i), True) for i in self.extra_inputs)
pset = None
params['append inputs to args'] = True

params['prologue'] = None
params['epilogue'] = ['python', 'merge_reports.py', 'report.json'] \
Expand All @@ -462,7 +465,7 @@ def adjust(self, params, env, taskdir, inputs, outputs, merge, reports=None, uni
inputs.extend((i, os.path.basename(i), True) for i in self.extra_inputs)

if unique:
args.extend(shlex.split(unique))
params['arguments_unique'] = shlex.split(unique)
if pset:
pset = os.path.join(self.workdir, pset)
if self.category.runtime:
Expand All @@ -476,7 +479,7 @@ def adjust(self, params, env, taskdir, inputs, outputs, merge, reports=None, uni
outputs.append((os.path.join(taskdir, 'report.xml.gz'), 'report.xml.gz'))

params['pset'] = os.path.basename(pset)
else:
elif '@args' not in args and '@inputfiles' not in args and '@outputfiles' not in args:
params['append inputs to args'] = True

params['executable'] = cmd
Expand Down
2 changes: 1 addition & 1 deletion lobster/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from email.mime.text import MIMEText
from pkg_resources import get_distribution

VERSION = "1.7"
VERSION = "1.8"

logger = logging.getLogger('lobster.util')

Expand Down

0 comments on commit 2ea4ea4

Please sign in to comment.