Skip to content

Commit

Permalink
Use xrootd servers as specified in siteconf.
Browse files Browse the repository at this point in the history
Also implement some unit tests for the task running script. Fixes #619.
  • Loading branch information
matz-e committed Dec 13, 2017
1 parent ba1ef01 commit 39436d4
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 82 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
*.pyc
*.swp
*.swo
*~
Session.vim
Lobster.egg-info
dist/
Expand Down
Empty file added lobster/core/data/__init__.py
Empty file.
177 changes: 95 additions & 82 deletions lobster/core/data/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import tempfile
import time
import traceback
import xml.dom.minidom

sys.path.append('python')

Expand Down Expand Up @@ -58,9 +59,6 @@ def __call__(self, params):
dashboard.apMonSend(params)


monitor = Dash()


class Mangler(logging.Formatter):

def __init__(self):
Expand All @@ -84,16 +82,6 @@ def format(self, record):
return fmt.format(chevron=chevron, message=record.msg, date=time.strftime("%c"), context=self.context)


mangler = Mangler()

console = logging.StreamHandler()
console.setFormatter(mangler)

logger = logging.getLogger('prawn')
logger.addHandler(console)
logger.propagate = False
logger.setLevel(logging.DEBUG)

fragment = """
import FWCore.ParameterSet.Config as cms
process.Timing = cms.Service("Timing",
Expand Down Expand Up @@ -161,14 +149,10 @@ def format(self, record):


def expand_command(cmd, args, infiles, outfiles):
"""
Expand variables in a command list.
"""Expand variables in a command list.
Do so by replacing `@args` with `args`, `@inputfiles` with `infiles`,
and `@outputfiles` with `outfiles`. Returns an expanded command list.
>>> expand_command(["foo", "@inputfiles", "--some-flag"], ["-a"], ["bar", "baz"])
["foo", "bar", "baz", "--some-flag"]
"""
def replace(xs, s, ys):
try:
Expand All @@ -183,6 +167,20 @@ def replace(xs, s, ys):
return newcmd


def find_xrootd_server(filename):
    """Return the XRootD server prefix configured in the siteconf `filename`.

    Probes the site's storage mapping with a dummy LFN: the first
    ``lfn-to-pfn`` rule with protocol ``xrootd`` whose ``path-match``
    matches the probe wins.  The rule's ``result`` has ``$1`` substituted
    and the probe path stripped back out, leaving only the leading
    ``root://host/`` part.  Returns ``None`` when no rule matches.
    """
    probe = '/store/user/foo/bar.root'
    document = xml.dom.minidom.parse(filename)
    for rule in document.getElementsByTagName('lfn-to-pfn'):
        attrs = rule.attributes
        if attrs['protocol'].value != 'xrootd':
            continue
        match = re.match(attrs['path-match'].value, probe)
        if match:
            pfn = attrs['result'].value.replace('$1', match.group(1))
            return pfn.replace(probe, '')


def run_subprocess(*args, **kwargs):
logger.info("executing '{}'".format(" ".join(*args)))

Expand Down Expand Up @@ -408,6 +406,8 @@ def copy_inputs(data, config, env):
fast_track = False
successes = defaultdict(int)

default_xrootd_server = find_xrootd_server('/cvmfs/cms.cern.ch/SITECONF/local/PhEDEx/storage.xml')

for file in files:
# If the file has been transferred by WQ, there's no need to
# monkey around with the input list
Expand All @@ -426,7 +426,7 @@ def copy_inputs(data, config, env):
if config['executable'] == 'cmsRun':
filename = file
else:
filename = "root://cmsxrootd.fnal.gov/" + file
filename = default_xrootd_server + file
config['mask']['files'].append(filename)
config['file map'][filename] = file
logger.info("AAA access to input file {} detected".format(file))
Expand Down Expand Up @@ -1059,66 +1059,79 @@ def write_zipfiles(data):
zipf.close()


# Skeleton of the task report; filled in incrementally by the processing
# steps below and flushed to disk by the atexit handlers.
data = {
    'files': {
        'info': {},
        'output_info': {},
        'skipped': [],
    },
    'cache': {
        'start_size': 0,
        'end_size': 0,
        'type': 2,
    },
    'task_exit_code': 0,
    'exe_exit_code': 0,
    'stageout_exit_code': 0,
    'cpu_time': 0,
    'events_written': 0,
    'output_size': 0,
    'output_bare_size': 0,
    'output_storage_element': '',
    'task_timing': {
        'stage_in_end': 0,
        'prologue_end': 0,
        'wrapper_start': 0,
        'wrapper_ready': 0,
        'processing_end': 0,
        'epilogue_end': 0,
        'stage_out_end': 0,
    },
    'events_per_run': 0,
    'transfers': defaultdict(Counter)
}

# The wrapper passes the task configuration as a JSON file in argv[1].
configfile = sys.argv[1]
with open(configfile) as f:
    config = json.load(f)

monitor.configure(config)

# atexit runs handlers last-registered-first: zip files and the report
# are written before the final dashboard update is sent.
atexit.register(send_final_dashboard_update, data, config)
atexit.register(write_report, data)
atexit.register(write_zipfiles, data)

logger.info('data is {0}'.format(str(data)))
env = os.environ
env['X509_USER_PROXY'] = 'proxy'

extract_wrapper_times(data)
copy_inputs(data, config, env)

logger.info("updated parameters are")
with mangler.output("json"):
    for l in json.dumps(config, sort_keys=True, indent=2).splitlines():
        logger.debug(l)

send_initial_dashboard_update(data, config)

# Main processing sequence: prologue, payload command, epilogue.
run_prologue(data, config, env)
run_command(data, config, env)
run_epilogue(data, config, env)

copy_outputs(data, config, env)
check_outputs(data, config)
check_parrot_cache(data)
if __name__ == '__main__':
    # Monitoring, log formatting, and all side effects live under this
    # guard so the module can be imported (e.g. by unit tests) without
    # running a task.
    monitor = Dash()
    mangler = Mangler()

    console = logging.StreamHandler()
    console.setFormatter(mangler)

    logger = logging.getLogger('prawn')
    logger.addHandler(console)
    logger.propagate = False
    logger.setLevel(logging.DEBUG)

    # Skeleton of the task report; filled in incrementally by the
    # processing steps below and flushed to disk by the atexit handlers.
    data = {
        'files': {
            'info': {},
            'output_info': {},
            'skipped': [],
        },
        'cache': {
            'start_size': 0,
            'end_size': 0,
            'type': 2,
        },
        'task_exit_code': 0,
        'exe_exit_code': 0,
        'stageout_exit_code': 0,
        'cpu_time': 0,
        'events_written': 0,
        'output_size': 0,
        'output_bare_size': 0,
        'output_storage_element': '',
        'task_timing': {
            'stage_in_end': 0,
            'prologue_end': 0,
            'wrapper_start': 0,
            'wrapper_ready': 0,
            'processing_end': 0,
            'epilogue_end': 0,
            'stage_out_end': 0,
        },
        'events_per_run': 0,
        'transfers': defaultdict(Counter)
    }

    # The wrapper passes the task configuration as a JSON file in argv[1].
    configfile = sys.argv[1]
    with open(configfile) as f:
        config = json.load(f)

    monitor.configure(config)

    # atexit runs handlers last-registered-first: zip files and the report
    # are written before the final dashboard update is sent.
    atexit.register(send_final_dashboard_update, data, config)
    atexit.register(write_report, data)
    atexit.register(write_zipfiles, data)

    logger.info('data is {0}'.format(str(data)))
    env = os.environ
    env['X509_USER_PROXY'] = 'proxy'

    extract_wrapper_times(data)
    copy_inputs(data, config, env)

    logger.info("updated parameters are")
    with mangler.output("json"):
        for l in json.dumps(config, sort_keys=True, indent=2).splitlines():
            logger.debug(l)

    send_initial_dashboard_update(data, config)

    # Main processing sequence: prologue, payload command, epilogue.
    run_prologue(data, config, env)
    run_command(data, config, env)
    run_epilogue(data, config, env)

    copy_outputs(data, config, env)
    check_outputs(data, config)
    check_parrot_cache(data)
44 changes: 44 additions & 0 deletions test/data/siteconf/PhEDEx/storage.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<!-- storage.xml for T3_US_NotreDame
     Storage element: deepthought.crc.nd.edu
     Storage software: bestman 2.2.1.2
     Phedex host: deepthought.crc.nd.edu
     All CMS data are stored using the /store convention
     Therefore we only need to map:
     /+store/(.*)
-->
<storage-mapping>

<!-- Translation rules for LFN to PFN (Logical File Name to Physical File Name
     Bestman does not support the srm v1 protocol so we fudge it with a file result
-->

<lfn-to-pfn protocol="file" destination-match=".*" path-match="/+store/(.*)" result="file:/hadoop/store/$1"/>
<lfn-to-pfn protocol="direct" destination-match=".*" path-match="/+store/(.*)" result="/hadoop/store/$1"/>
<!-- Old bestman configuration. Replaced with gsiftp
<lfn-to-pfn protocol="srmv2" destination-match=".*" path-match="/+store/(.*)" result="srm://deepthought.crc.nd.edu:8443/srm/v2/server?SFN=/hadoop/store/$1"/>
-->
<lfn-to-pfn protocol="srmv2" destination-match=".*" path-match="/+store/(.*)" result="gsiftp://deepthought.crc.nd.edu/store/$1"/>
<lfn-to-pfn protocol="gsiftp" destination-match=".*" path-match="/+store/(.*)" result="gsiftp://deepthought.crc.nd.edu/store/$1"/>
<!-- Xrootd fallback rules -->
<!-- NOTE: the first protocol="xrootd" rule below is the one picked up by
     find_xrootd_server in lobster/core/data/task.py -->
<lfn-to-pfn protocol="xrootd" destination-match=".*" path-match="/+store/(.*)" result="root://ndcms.crc.nd.edu//store/$1"/>
<lfn-to-pfn protocol="xrootd-fallback1" destination-match=".*" path-match="/+store/(.*)" result="root://cmsxrootd.fnal.gov//store/$1"/>
<!-- Hadoop protocol rules-->
<lfn-to-pfn protocol="hadoop" destination-match=".*" path-match="//+store/(.*)" result="/hadoop/store/$1"/>
<lfn-to-pfn protocol="hadoop" destination-match=".*" path-match="//+store/user/(.*)" result="/hadoop/store/user/$1"/>

<!-- Translation rules for PFN to LFN (Physical File Name to Logical File Name -->

<pfn-to-lfn protocol="file" destination-match=".*" path-match="file:/hadoop/store/(.*)" result="/store/$1"/>
<pfn-to-lfn protocol="direct" destination-match=".*" path-match="/hadoop/store/(.*)" result="/store/$1"/>
<pfn-to-lfn protocol="srm" destination-match=".*" path-match="file:/hadoop/store/(.*)" result="/store/$1"/>
<!-- Old bestman configuration. Replaced with gsiftp
<pfn-to-lfn protocol="srmv2" destination-match=".*" path-match=".*\?SFN=/hadoop/store/(.*)" result="/store/$1"/>
-->
<pfn-to-lfn protocol="srmv2" destination-match=".*" path-match="gsiftp://deepthought.crc.nd.edu/store/(.*)" result="/store/$1"/>
<pfn-to-lfn protocol="gsiftp" destination-match=".*" path-match="gsiftp://deepthought.crc.nd.edu/store/(.*)" result="/store/$1"/>
<pfn-to-lfn protocol="hadoop" destination-match=".*" path-match="/hadoop/store/(.*)" result="/store/$1"/>

</storage-mapping>
21 changes: 21 additions & 0 deletions test/test_core_data_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import os

from lobster.core.data import task


class TestCommands(object):

    def test_expansion(self):
        """`@inputfiles` in a command list is replaced by the input files."""
        command = ["foo", "@inputfiles", "--some-flag"]
        expanded = task.expand_command(command, ["-a"], ["bar", "baz"], [])
        assert expanded == ["foo", "bar", "baz", "--some-flag"]


class TestDiscovery(object):

    def test_xrootd_server(self):
        """The bundled siteconf fixture resolves to the ND xrootd door."""
        here = os.path.dirname(__file__)
        storage = os.path.join(here, 'data', 'siteconf', 'PhEDEx', 'storage.xml')
        assert task.find_xrootd_server(storage) == 'root://ndcms.crc.nd.edu/'

0 comments on commit 39436d4

Please sign in to comment.