Use json file for job args 5288 (#5370)
* add input_args.json to proj_dir/local

* use --jobId arg when calling CMSRunAnalysis.*

* do not write job-specific JSON files anymore

* remove unsupported stageout option

* remove code which supported TW versions used in 2024
belforte authored Feb 17, 2025
1 parent 8b8da2a commit c9bcc3d
Showing 1 changed file with 28 additions and 122 deletions.
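
In short: the TaskWorker's DagmanCreator now ships a single input_args.json into the project directory (one dictionary of CMSRunAnalysis.py arguments per job, ordered by job id), and the wrapper addresses a job by its id instead of reading a per-job JobArgs-<id>.json. A minimal sketch of the new client-side flow, with illustrative variable names:

import json

# input_args.json is created by DagmanCreator in the TaskWorker and holds
# one dictionary of CMSRunAnalysis.py arguments per job, ordered by job id
with open('input_args.json') as fd:
    inputArgs = json.load(fd)

jobId = 1  # illustrative: the id passed via the --jobid option
jobArgs = inputArgs[jobId - 1]  # list position assumed to match job id
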
150 changes: 28 additions & 122 deletions src/python/CRABClient/Commands/preparelocal.py
@@ -1,3 +1,5 @@
# tell pylint to accept some old style which is needed for python2
# pylint: disable=unspecified-encoding, raise-missing-from
"""
The command prepares a directory and the related scripts to execute the jobs locally.
It can also execute a specific job if the jobid option is passed
@@ -37,7 +39,7 @@ def __call__(self):
self.logger.info("Getting input files into tmp dir %s" % tmpDir)
self.getInputFiles()

with(open('input_args.json')) as fd:
with(open('input_args.json')) as fd: # this file is created by DagmanCreator in the TW
inputArgs = json.load(fd)

if self.options.jobid:
@@ -74,33 +76,15 @@ def getInputFiles(self):

if status not in ['UPLOADED', 'SUBMITTED']:
raise ClientException('Can only execute jobs from tasks in status SUBMITTED or UPLOADED. Current status is %s' % status)
# new way first
# the following try-except can be removed, keeping only the code in the try, once
# there are no more tasks submitted with TW version v3.241018 or earlier
try: # this will fail with old tasks
inputsFilename = os.path.join(os.getcwd(), 'InputFiles.tar.gz')
sandboxFilename = os.path.join(os.getcwd(), 'sandbox.tar.gz')
downloadFromS3(crabserver=self.crabserver, filepath=inputsFilename,
objecttype='runtimefiles', taskname=taskname, logger=self.logger)
downloadFromS3(crabserver=self.crabserver, filepath=sandboxFilename,
objecttype='sandbox', logger=self.logger,
tarballname=sandboxName, username=username)
with tarfile.open(inputsFilename) as tf:
tf.extractall()
except:
# old way for tasks submitted "some time ago". They should have bootstrapped by then,
# so webdir should be defined.
self.logger.info('Task was submitted with old TaskWorker, fall back to WEB_DIR for tarballs')
from ServerUtilities import getProxiedWebDir
from CRABClient.UserUtilities import curlGetFileFromURL
webdir = getProxiedWebDir(crabserver=self.crabserver, task=taskname,
logFunction=self.logger.debug)
httpCode = curlGetFileFromURL(webdir + '/InputFiles.tar.gz', inputsFilename, self.proxyfilename,
logger=self.logger)
if httpCode != 200:
raise ClientException("Failed to download 'InputFiles.tar.gz' from %s" % webdir)
with tarfile.open(inputsFilename) as tf:
tf.extractall()
inputsFilename = os.path.join(os.getcwd(), 'InputFiles.tar.gz')
sandboxFilename = os.path.join(os.getcwd(), 'sandbox.tar.gz')
downloadFromS3(crabserver=self.crabserver, filepath=inputsFilename,
objecttype='runtimefiles', taskname=taskname, logger=self.logger)
downloadFromS3(crabserver=self.crabserver, filepath=sandboxFilename,
objecttype='sandbox', logger=self.logger,
tarballname=sandboxName, username=username)
with tarfile.open(inputsFilename) as tf:
tf.extractall()

def executeTestRun(self, destDir, jobnr):
"""
@@ -115,108 +99,35 @@ def prepareDir(self, inputArgs, targetDir):
""" Prepare a directory with just the necessary files:
"""

self.logger.debug("Creating InputArgs.txt file")
# NEW: change to have one json file for each job with the arguments for CMSRunAnalysis.py
# then run_job.sh passes it as the only argument to CMSRunAnalysis.sh
# keys in that json must have the same names as the destination attributes of the options
# defined in CMSRunAnalysis.py's parser. We take care of the renaming here
# KEEP the old code too, for backward compatibility with tasks created with a previous TaskWorker version;
# it may also be useful for local submission https://twiki.cern.ch/twiki/bin/view/CMSPublic/CRABPrepareLocal
inputArgsStr = ("-a %(CRAB_Archive)s --sourceURL=%(CRAB_ISB)s"
" --jobNumber=%(CRAB_Id)s --cmsswVersion=%(CRAB_JobSW)s --scramArch=%(CRAB_JobArch)s"
" --inputFile=%(inputFiles)s --runAndLumis=%(runAndLumiMask)s"
"--lheInputFiles=%(lheInputFiles)s"
" --firstEvent=%(firstEvent)s --firstLumi=%(firstLumi)s"
" --lastEvent=%(lastEvent)s --firstRun=%(firstRun)s "
"--seeding=%(seeding)s --scriptExe=%(scriptExe)s "
"--eventsPerLumi=%(eventsPerLumi)s --maxRuntime=%(maxRuntime)s"
" --scriptArgs=%(scriptArgs)s -o %(CRAB_AdditionalOutputFiles)s\n")
# remap key in input_args.json to the argument names required by CMSRunAnalysis.py
# use as : value_of_argument_name = inputArgs[argMap[argument_name]]
argMap = {
'archiveJob': 'CRAB_Archive', 'outFiles': 'CRAB_AdditionalOutputFiles',
'sourceURL': 'CRAB_ISB', 'cmsswVersion': 'CRAB_JobSW',
'scramArch': 'CRAB_JobArch', 'runAndLumis': 'runAndLumiMask',
'inputFile' : 'inputFiles', 'lheInputFiles': 'lheInputFiles',
# the ones below are not changed
'firstEvent': 'firstEvent', 'firstLumi': 'firstLumi', 'lastEvent': 'lastEvent',
'firstRun': 'firstRun', 'seeding': 'seeding', 'scriptExe': 'scriptExe',
'scriptArgs': 'scriptArgs', 'eventsPerLumi': 'eventsPerLumi', 'maxRuntime': 'maxRuntime'
}

# create one JSON argument file for each jobId
jId = 0
for jobArgs in inputArgs:
jId +=1
inputArgsForScript = {}
for key, value in argMap.items():
inputArgsForScript[key] = jobArgs[value]
inputArgsForScript['jobNumber'] = jId
with open("%s/JobArgs-%s.json" % (targetDir, jId), 'w') as fh:
json.dump(inputArgsForScript, fh)

for f in ["gWMS-CMSRunAnalysis.sh", "CMSRunAnalysis.sh", "cmscp.py", "CMSRunAnalysis.tar.gz",
"sandbox.tar.gz", "run_and_lumis.tar.gz", "input_files.tar.gz", "Job.submit",
"submit_env.sh", "splitting-summary.json"
"submit_env.sh", "splitting-summary.json", "input_args.json"
]:
try: # for backward compatibility with TW v3.241017 where splitting-summary.json is missing
shutil.copy2(f, targetDir)
except FileNotFoundError:
pass
try: # for backward compatibility with TW v3.241017 where splitting-summary.json is missing
shutil.copy2(f, targetDir)
except FileNotFoundError:
pass

cmd = "cd %s; tar xf CMSRunAnalysis.tar.gz" % targetDir
execute_command(command=cmd, logger=self.logger)

# this InputArgs.txt is for backward compatibility with old TW
# but may also be useful for local submission https://twiki.cern.ch/twiki/bin/view/CMSPublic/CRABPrepareLocal
with open(os.path.join(targetDir, "InputArgs.txt"), "w") as fd:
for ia in inputArgs:
fd.write(inputArgsStr % ia)

self.logger.debug("Creating run_job.sh file")
#Few observations about the wrapper:
#All the exports are done because normally this env is set by condor (see the Environment classad)
#Exception is CRAB3_RUNTIME_DEBUG, which is set to prevent the dashboard code from blowing up since some classads are not there
#We check that the X509_USER_PROXY variable is set, otherwise stageout fails
#The "tar xzmf CMSRunAnalysis.tar.gz" is needed because in CRAB3_RUNTIME_DEBUG mode the file is not unpacked (why?)
#Job.submit is also modified to set some things that are condor macro expanded during submission (needed by cmscp)
# Few observations about the wrapper:
# All the exports are done because normally this env is set by condor (see the Environment classad)
# Exception is CRAB3_RUNTIME_DEBUG, which is set to prevent the dashboard code from blowing up since some classads are not there
# We check that the X509_USER_PROXY variable is set, otherwise stageout fails
# The "tar xzmf CMSRunAnalysis.tar.gz" is needed because in CRAB3_RUNTIME_DEBUG mode the file is not unpacked (why?)
# Job.submit is also modified to set some things that are condor macro expanded during submission (needed by cmscp)
bashWrapper = """#!/bin/bash
. ./submit_env.sh && save_env && setup_local_env
#
export _CONDOR_JOB_AD=Job.${1}.submit
# leading '+' signs must be removed to use the JDL as a classAd file
sed -e 's/^+//' Job.submit > Job.${1}.submit
./CMSRunAnalysis.sh --jobId ${1}
"""
if self.options.enableStageout:
self.logger.debug("Creating jobsubmit fixup files")
os.makedirs(os.path.join(targetDir, "jobsubmit_fixups"))
i = 1
for ia in inputArgs:
with open(os.path.join(targetDir, "jobsubmit_fixups", "job%s" % i), "w") as fd:
fd.write("""CRAB_localOutputFiles = "%(CRAB_localOutputFiles)s"
CRAB_Destination = "%(CRAB_Destination)s"
""" % ia)
i += 1

bashWrapper += """if [ ! -f "${X509_USER_PROXY}" ]; then
echo "X509_USER_PROXY variable does not point to a valid file"
exit
fi
echo "CRAB_Id = \\\"${1}\\\"" >> Job.${1}.submit
echo 'CRAB_StageoutPolicy = "remote"' >> Job.${1}.submit
echo 'CRAB_AsyncDest = "%s"' >> Job.${1}.submit
echo `grep CRAB_OutTempLFNDir Job.submit | tr -d "+"` >> Job.${1}.submit
echo `grep CRAB_OutLFNDir Job.submit | tr -d "+"` >> Job.${1}.submit
cat jobsubmit_fixups/job${1} >> Job.${1}.submit
""" % self.destination
bashWrapper += './gWMS-CMSRunAnalysis.sh `sed "${1}q;d" InputArgs.txt`'
else:
bashWrapper += '\n./CMSRunAnalysis.sh --json JobArgs-${1}.json'
#bashWrapper += "echo 'CRAB_TransferOutputs = 0' >> Job.${1}.submit\n"
#bashWrapper += "echo 'CRAB_SaveLogsFlag = 0' >> Job.${1}.submit\n"

bashWrapper += "\n" # add new-line at edn of file for easy-to-read

with open(os.path.join(targetDir, "run_job.sh"), "w") as fd:
fd.write(bashWrapper)
Expand All @@ -233,15 +144,10 @@ def setOptions(self):
type="int",
help="Optional id of the job you want to execute locally")

self.parser.add_option("--enableStageout",
dest="enableStageout",
default=False,
action="store_true",
help="After the job runs copy the output file on the storage destination")

self.parser.add_option("--destdir",
dest="destdir",
default=None)
default=None,
help="Optional name of the directory to use, defaults to <projdir>/local")

def validateOptions(self):
SubCommand.validateOptions(self)
