From 165431a469aa3fee0bc62b75800dad0792f8bebc Mon Sep 17 00:00:00 2001 From: Eddie McGrady Date: Tue, 29 Apr 2025 18:45:45 +0200 Subject: [PATCH] mspilup check added --- get_files_on_disk.py | 103 +++++++++++++++++++++++++++---------------- 1 file changed, 65 insertions(+), 38 deletions(-) diff --git a/get_files_on_disk.py b/get_files_on_disk.py index e35ff29..0df5914 100755 --- a/get_files_on_disk.py +++ b/get_files_on_disk.py @@ -1,9 +1,14 @@ #!/usr/bin/env python3 -"""Returns a list of files from a dataset including only files that are hosted on disk.""" +""" +Returns a list of files from a dataset including only files that are hosted on disk. +Requires access to MSPileup and the Rucio client. +""" -import os,sys,getpass,warnings,glob,shlex,subprocess,argparse # pylint: disable=multiple-imports +import os,sys,getpass,warnings,glob,shlex,subprocess,argparse,requests # pylint: disable=multiple-imports +from rucio.client.client import Client from collections import defaultdict +from pwd import getpwuid # prevent this script from ever being used in a batch job # to avoid DDOS of Rucio @@ -17,46 +22,68 @@ def getOS(): osv = subprocess.check_output(shlex.split(cmd), encoding="utf-8").rstrip() return osv +def getX509(): + if os.getenv('X509_USER_PROXY'): + return os.getenv('X509_USER_PROXY') + elif os.path.exists(f'/tmp/x509up_u{getpwuid(os.getuid()).pw_uid}'): + return f'/tmp/x509up_u{getpwuid(os.getuid()).pw_uid}' + else: + raise Exception('An active X509 proxy must be used to run this script') + def getHosted(dataset, user, allow=None, block=None): """Gets list of files on disk for a dataset, and list of sites along with how many files each site has""" if allow is not None and block is not None: raise RuntimeError("Cannot specify both allow list and block list, pick one") - osv = getOS() - rucio_path = f'/cvmfs/cms.cern.ch/rucio/x86_64/rhel{osv}/py3/current' - os.environ['RUCIO_HOME'] = rucio_path - os.environ['RUCIO_ACCOUNT'] = user - full_rucio_path = glob.glob(rucio_path+'/lib/python*.*')[0] - sys.path.insert(0,full_rucio_path+'/site-packages/') - - warnings.filterwarnings("ignore", message=".*cryptography.*") - from rucio.client.client import Client # pylint: disable=import-error,import-outside-toplevel - client = Client() - - # loop over blocks to avoid timeout error from too-large response - all_blocks = list(client.list_content(scope='cms',name=dataset)) - # batch some blocks together for fewer requests - # not fully optimized, but n=10 tested to be ~15% faster than n=1 - nblocks = 10 - block_groups = [all_blocks[i:i+nblocks] for i in range(0, len(all_blocks), nblocks)] - - from rucio.client.replicaclient import ReplicaClient # pylint: disable=import-error,import-outside-toplevel - rep_client = ReplicaClient() - - filelist = set() - sitelist = defaultdict(int) - def sitecond(site): - return ("_Tape" not in site) and (allow is None or site in allow) and (block is None or site not in block) - for block_group in block_groups: - reps = list(rep_client.list_replicas([{'scope': 'cms', 'name': block['name']} for block in block_group])) - for rep in reps: - for site,state in rep['states'].items(): - if state=='AVAILABLE' and sitecond(site): - filelist.add(rep['name']) - sitelist[site] += 1 - - sys.path.pop(0) - return filelist, sitelist + #check if a customName is defined for the dataset + + proxy = getX509() + msPileup = requests.get(f'https://cmsweb-prod.cern.ch/ms-pileup/data/pileup?pileupName={dataset}', + cert=proxy, verify=False).json()['result'] + if len(msPileup) > 0: + customName = msPileup[0].get('customName', '') + client = Client() + if customName: + scope = 'group.wmcore' + dataset = customName + else: + scope = 'cms' + rules = list(client.list_replication_rules({'name': dataset, 'account': 'wmcore_pileup', 'scope': scope})) + if (len(rules) > 0) and msPileup['active']: + osv = getOS() + rucio_path = f'/cvmfs/cms.cern.ch/rucio/x86_64/rhel{osv}/py3/current' + os.environ['RUCIO_HOME'] = rucio_path + os.environ['RUCIO_ACCOUNT'] = user + full_rucio_path = glob.glob(rucio_path+'/lib/python*.*')[0] + sys.path.insert(0,full_rucio_path+'/site-packages/') + + all_blocks = list(client.list_content(scope=scope, name=dataset)) + print(f'Found {len(all_blocks)} for dataset {dataset}...') + # loop over blocks to avoid timeout error from too-large response + # not fully optimized, but n=10 tested to be ~15% faster than n=1 + nblocks = 10 + block_groups = [all_blocks[i:i+nblocks] for i in range(0, len(all_blocks), nblocks)] + + filelist = set() + sitelist = defaultdict(int) + def sitecond(site): + return ("_Tape" not in site) and (allow is None or site in allow) and (block is None or site not in block) + for block_group in block_groups: + reps = list(client.list_replicas([{'scope': scope, 'name': block['name']} for block in block_group])) + for rep in reps: + for site,state in rep['states'].items(): + if state=='AVAILABLE' and sitecond(site): + filelist.add(rep['name']) + sitelist[site] += 1 + + sys.path.pop(0) + return filelist, sitelist + else: + print('The given container either has no active rules or is not active in MSPileup') + return None, None + else: + print('The given container was not found on MSPileup') + return None, None def main(dataset, user, outfile=None, verbose=False, allow=None, block=None): """Prints file list and site list""" @@ -85,4 +112,4 @@ def main(dataset, user, outfile=None, verbose=False, allow=None, block=None): parser.add_argument("dataset",type=str,help="dataset to query") args = parser.parse_args() - main(args.dataset, args.user, outfile=args.outfile, verbose=args.verbose, allow=args.allow, block=args.block) + main(args.dataset, args.user, outfile=args.outfile, verbose=args.verbose, allow=args.allow, block=args.block) \ No newline at end of file