Skip to content

Commit

Permalink
Merge pull request #627 from NDCMS/fix-output-errors
Browse files Browse the repository at this point in the history
Fix output errors
  • Loading branch information
klannon authored Mar 13, 2018
2 parents 0b5deeb + 76c2d4c commit c601ae9
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 49 deletions.
6 changes: 3 additions & 3 deletions docs/install.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ Dependencies
Cooperative Computing Lab`_ and install them by unpacking the tarball and
adding the `bin` directory to your path::

export cctools=lobster-149-cdaaaac6-cvmfs-70dfa0d6
export cctools=lobster-154-16b67876-cvmfs-70dfa0d6
wget -O - http://ccl.cse.nd.edu/software/files/${cctools}-x86_64-redhat6.tar.gz
export PATH=$PWD/${cctools}-x86_64-redhat6/bin:$PATH
export PYTHONPATH=$PWD/${cctools}-x86_64-redhat6/lib/python2.6/site-packages:$PYTHONPATH

.. note::
At Notre Dame, a development version can be accessed via::

export cctools=lobster-149-cdaaaac6-cvmfs-70dfa0d6
export cctools=lobster-154-16b67876-cvmfs-70dfa0d6
export PYTHONPATH=$PYTHONPATH:/afs/crc.nd.edu/group/ccl/software/x86_64/redhat6/cctools/$cctools/lib/python2.6/site-packages
export PATH=/afs/crc.nd.edu/group/ccl/software/x86_64/redhat6/cctools/$cctools/bin:$PATH

Expand Down Expand Up @@ -96,6 +96,6 @@ easy modification of the source::
.. [#ftools] ``tcsh`` users should use the following to access the
`cctools` development version at Notre Dame::
setenv cctools lobster-148-c1a7ecbd-cvmfs-0941e442
setenv cctools lobster-154-16b67876-cvmfs-70dfa0d6
setenv PYTHONPATH ${PYTHONPATH}:/afs/crc.nd.edu/group/ccl/software/x86_64/redhat6/cctools/$cctools/lib/python2.6/site-packages
setenv PATH /afs/crc.nd.edu/group/ccl/software/x86_64/redhat6/cctools/$cctools/bin:${PATH}
2 changes: 1 addition & 1 deletion install_dependencies.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ fi

(
cd $VIRTUAL_ENV/src
wget -O - http://ccl.cse.nd.edu/software/files/cctools-lobster-142-55035a54-cvmfs-40cf5bba-source.tar.gz|tar xzf -
wget -O - http://ccl.cse.nd.edu/software/files/cctools-lobster-154-16b67876-cvmfs-70dfa0d6-source.tar.gz|tar xzf -
cd cctools*
sed -i 's/\(config_perl_path\)=auto/\1=no/' ./configure
./configure --prefix $VIRTUAL_ENV
Expand Down
File renamed without changes.
153 changes: 114 additions & 39 deletions lobster/core/data/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,10 @@ def compare(stat, file):
if int(size) == int(match.groups()[0]):
return True
else:
logger.error("size mismatch after transfer")
logger.debug("remote size: {0}".format(match.groups()[0]))
logger.debug("local size: {0}".format(size))
return False
errorMsg = 'size mismatch after transfer\n'
errorMsg += ' remote size: {0}\n'.format(match.groups()[0])
errorMsg += ' local size: {0}\n'.format(size)
raise RuntimeError(errorMsg)
else:
raise RuntimeError('checking output for {0} failed: {1}'.format(file, stat))

Expand Down Expand Up @@ -353,13 +353,55 @@ def compare(stat, file):
return compare(p.stdout, localname)
except RuntimeError as e:
logger.error(e)
elif output.startswith("hdfs://"):
server, path = re.match("hdfs://([a-zA-Z0-9:.\-]+)/(.*)", output).groups()
timeout = '300' # Just to be safe, have a timeout
args = [
"timeout",
timeout,
"hdfs",
"dfs",
"-fs",
'hdfs://'+server,
"-stat",
'"Size: %b"',
os.path.join('/',path, remotename)
]
p = run_subprocess(args, capture=True)
try:
return compare(p.stdout, localname)
except RuntimeError as e:
logger.error(e)
elif output.startswith('srm://') or output.startswith('gsiftp://'):
if len(os.environ["LOBSTER_GFAL_COPY"]) > 0:
# FIXME gfal is very picky about its environment
prg = [os.environ["LOBSTER_GFAL_COPY"].replace('copy','stat')]
args = prg + [
os.path.join(output, remotename),
]
pruned_env = dict(env)
for k in ['LD_LIBRARY_PATH', 'PATH']:
pruned_env[k] = ':'.join([x for x in os.environ[k].split(':') if 'CMSSW' not in x])
p = run_subprocess(args, env=pruned_env, capture=True)
try:
return compare(p.stdout, localname)
except RuntimeError as e:
logger.error(e)

else:
logger.info('Skipping gfal-based file check because no gfal executable defined in wrapper.')



return True
# If we get here, we tried all of the other methods and never returned a True, so return false
return False


@check_execution(exitcode=211, update={'stageout_exit_code': 211, 'output_size': 0}, timing='stage_out_end')
def check_outputs(data, config):
    """Verify every output file listed in the task configuration.

    Iterates over the ``(local, remote)`` pairs in ``config['output files']``
    and calls ``check_output`` for each one, stopping at the first file that
    cannot be verified.

    :param data: task report dictionary; not read here directly
                 (presumably consumed by the ``check_execution`` decorator
                 -- TODO confirm).
    :param config: task configuration; must provide ``'output files'`` as an
                   iterable of ``(local, remote)`` name pairs.
    :raises IOError: if ``check_output`` returns a false value for a file.
    """
    logger.info('Checking output files...')
    for local, remote in config['output files']:
        logger.info(' Checking {0} => {1}'.format(local, remote))
        if not check_output(config, local, remote):
            # Use an explicit field index: auto-numbered '{}' fields are not
            # supported by str.format on Python 2.6 and would raise ValueError
            # here, masking the IOError that callers are meant to see.
            raise IOError("could not verify output file '{0}'".format(remote))

Expand Down Expand Up @@ -552,6 +594,35 @@ def copy_inputs(data, config, env):
else:
logger.error('Unable to copy input with Chirp')
data['transfers']['chirp']['stage-in failure'] += 1
elif input.startswith("hdfs://"):
logger.info("Trying hdfs client access method")
server, path = re.match("hdfs://([a-zA-Z0-9:.\-]+)/(.*)", input).groups()
server = "hdfs://"+server
remotename = os.path.join('/',path, file)

timeout = '300' # Just to be safe, have a timeout
args = [
"timeout",
timeout,
"hdfs",
"dfs",
"-fs",
server,
"-get",
remotename,
os.path.basename(file)
]
p = run_subprocess(args, env=env)
if p.returncode == 0:
logger.info('Successfully copied input with hdfs client')
filename = 'file:' + os.path.basename(file)
config['mask']['files'].append(filename)
config['file map'][filename] = file
data['transfers']['hdfs']['stage-in success'] += 1
break
else:
logger.error('Unable to copy input with hdfs client')
data['transfers']['hdfs']['stage-in failure'] += 1
else:
logger.warning('skipping unhandled stage-in method: {0}'.format(input))
else:
Expand Down Expand Up @@ -622,7 +693,9 @@ def copy_outputs(data, config, env):
logger.info("attempting stage-out with `shutil.copy2('{0}', '{1}')`".format(localname, rn))
try:
shutil.copy2(localname, rn)
logger.info('Checking output file transfer.')
if check_output(config, localname, remotename):
logger.info('File transfer successful!')
transferred.append(localname)
target_se.append(default_se)
data['transfers']['file']['stageout success'] += 1
Expand All @@ -637,7 +710,7 @@ def copy_outputs(data, config, env):
prg = [os.environ["LOBSTER_LCG_CP"], "-b", "-v", "-D", "srmv2", "--sendreceive-timeout", "600"]
elif len(os.environ["LOBSTER_GFAL_COPY"]) > 0:
# FIXME gfal is very picky about its environment
prg = [os.environ["LOBSTER_GFAL_COPY"]]
prg = [os.environ["LOBSTER_GFAL_COPY"], "-f"]
else:
data['transfers'][protocol]['stageout failure'] += 1
continue
Expand All @@ -658,7 +731,9 @@ def copy_outputs(data, config, env):
pruned_env['LD_LIBRARY_PATH'] = ldpath

p = run_subprocess(args, env=pruned_env)
logger.info('Checking output file transfer.')
if p.returncode == 0 and check_output(config, localname, remotename):
logger.info('File transfer successful!')
transferred.append(localname)
match = server_re.match(args[-1])
if match:
Expand All @@ -681,7 +756,9 @@ def copy_outputs(data, config, env):
server,
os.path.join(path, remotename)]
p = run_subprocess(args, env=env)
logger.info('Checking output file transfer.')
if p.returncode == 0 and check_output(config, localname, remotename):
logger.info('File transfer successful!')
transferred.append(localname)
match = server_re.match(args[-1])
if match:
Expand All @@ -690,6 +767,35 @@ def copy_outputs(data, config, env):
break
else:
data['transfers']['chirp']['stageout failure'] += 1
elif output.startswith("hdfs://"):
server, path = re.match("hdfs://([a-zA-Z0-9:.\-]+)/(.*)", output).groups()
server = "hdfs://"+server

timeout = '300' # Just to be safe, have a timeout
args = [
"timeout",
timeout,
"hdfs",
"dfs",
"-fs",
server,
"-put",
localname,
os.path.join('/',path, remotename),
]

p = run_subprocess(args, env=env)
logger.info('Checking output file transfer.')
if p.returncode == 0 and check_output(config, localname, remotename):
logger.info('File transfer successful!')
transferred.append(localname)
match = server_re.match(args[-1])
if match:
target_se.append(match.group(1))
data['transfers']['hdfs']['stageout success'] += 1
break
else:
data['transfers']['hdfs']['stageout failure'] += 1
else:
logger.warning('skipping unhandled stage-out method: {0}'.format(output))

Expand Down Expand Up @@ -869,7 +975,7 @@ def get_bare_size(filename):
raise IOError("Can't open ROOT file '{0}'".format(filename))

size = 0
for treename in ("Events", "Runs", "Lumis"):
for treename in ("Events", "Runs", "LuminosityBlocks"):
if not rootfile.GetListOfKeys().Contains(treename):
rootfile.Close()
raise IOError("Can't find tree '{1}' in ROOT file '{0}'".format(filename, treename))
Expand Down Expand Up @@ -1069,40 +1175,9 @@ def write_zipfiles(data):
logger.propagate = False
logger.setLevel(logging.DEBUG)

with open('report.json', 'r') as fd:
with open('report.json.in', 'r') as fd:
data = json.load(fd)
data['transfers'] = defaultdict(Counter)
data = {
'files': {
'info': {},
'output_info': {},
'skipped': [],
},
'cache': {
'start_size': 0,
'end_size': 0,
'type': 2,
},
'task_exit_code': 0,
'exe_exit_code': 0,
'stageout_exit_code': 0,
'cpu_time': 0,
'events_written': 0,
'output_size': 0,
'output_bare_size': 0,
'output_storage_element': '',
'task_timing': {
'stage_in_end': 0,
'prologue_end': 0,
'wrapper_start': 0,
'wrapper_ready': 0,
'processing_end': 0,
'epilogue_end': 0,
'stage_out_end': 0,
},
'events_per_run': 0,
'transfers': defaultdict(Counter)
}

configfile = sys.argv[1]
with open(configfile) as f:
Expand Down
4 changes: 0 additions & 4 deletions lobster/core/data/wrapper.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ log() {

date +%s > t_wrapper_start

# WorkQueue does not like it if we transfer report.json in with the same
# name, so rename it here.
mv report.json.in report.json

log "startup" "wrapper started" "echo -e 'hostname: $(hostname)\nkernel: $(uname -a)'"

log "trace" "tracing google" traceroute -w 1 www.google.com
Expand Down
2 changes: 1 addition & 1 deletion lobster/core/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ def __setup_inputs(self):
(self.siteconf, 'siteconf', False),
(os.path.join(os.path.dirname(__file__), 'data', 'wrapper.sh'), 'wrapper.sh', True),
(os.path.join(os.path.dirname(__file__), 'data', 'task.py'), 'task.py', True),
(os.path.join(os.path.dirname(__file__), 'data', 'report.json'), 'report.json.in', True),
(os.path.join(os.path.dirname(__file__), 'data', 'report.json.in'), 'report.json.in', True),
(self.parrot_bin, 'bin', True),
(self.parrot_lib, 'lib', True),
]
Expand Down
1 change: 0 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
'core/data/siteconf/PhEDEx/storage.xml',
'core/data/merge_cfg.py',
'core/data/merge_reports.py',
'core/data/report.json',
'core/data/report.json.in',
'commands/data/index.html',
'commands/data/gh.png',
Expand Down

0 comments on commit c601ae9

Please sign in to comment.