converter.py
import os
import subprocess
import glob
import getpass

from pyspark.sql import SparkSession

from registry import registry
from dataset_allowed_definitions import get_allowed_sub_eras


def run_convert(spark, particle, resonance, era, dataTier, subEra,
                customDir='', baseDir='', use_pog_space=False, use_local=False):
    '''
    Convert a directory of ROOT files into a parquet dataset. Input/output
    paths are built from (customDir, particle, resonance, era, dataTier,
    subEra) under the user's HDFS area, the shared muon POG area
    (use_pog_space=True), or an explicit baseDir if one is given.
    '''
    if baseDir == '':
        if use_pog_space is False:
            inDir = os.path.join('hdfs://analytix/user', getpass.getuser())
            outDir = os.path.join('hdfs://analytix/user', getpass.getuser())
        else:
            inDir = 'hdfs://analytix/cms/muon_pog'
            outDir = 'hdfs://analytix/cms/muon_pog'
        inDir = os.path.join(inDir, 'root', customDir, particle, resonance, era, dataTier, subEra)
        outDir = os.path.join(outDir, 'parquet', customDir, particle, resonance, era, dataTier, subEra)
    else:
        if use_local is False and 'hdfs' not in baseDir:
            print('>>>>>>>>> Warning! A custom baseDir was given, but the `--useLocalSpark` flag is not enabled and baseDir is not an `hdfs` path.')
            print('>>>>>>>>> Distributed spark clusters can only read files in hdfs. Make sure baseDir is an `hdfs` path, e.g.:')
            print('>>>>>>>>> `hdfs://analytix/user/[user]/[your-custom-dir]`')
            print('>>>>>>>>> Otherwise, if this is a local test, use the `--useLocalSpark` flag (see command help).')
        inDir = baseDir
        outDir = baseDir
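
    # For reference, with the default HDFS locations the paths built above take the
    # form below (an illustrative sketch; <user> and the bracketed fields are
    # placeholders, not fixed values):
    #   inDir : hdfs://analytix/user/<user>/root/<customDir>/<particle>/<resonance>/<era>/<dataTier>/<subEra>
    #   outDir: hdfs://analytix/user/<user>/parquet/<customDir>/<particle>/<resonance>/<era>/<dataTier>/<subEra>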

    # The following glob command works only on the edge nodes, which have a fuse-style hdfs mountpoint
    # if 'hdfs' in inDir:
    #     inDir = inDir.replace('hdfs://analytix', '/hdfs/analytix.cern.ch')
    #     fnames = glob.glob(f'{inDir}/*.root')
    # else:
    #     fnames = glob.glob(f'{inDir}/*.root')
    # fnames = [f.replace('/hdfs/analytix.cern.ch', 'hdfs://analytix') for f in fnames]

    # Make sure the paths are in hdfs format (not fuse-style format)
    inDir = inDir.replace('/hdfs/analytix.cern.ch', 'hdfs://analytix')
    outDir = outDir.replace('/hdfs/analytix.cern.ch', 'hdfs://analytix')

    # The following command works on both lxplus and the edge nodes
    cmd = f"hdfs dfs -find {inDir} -name '*.root'"
    fnames = subprocess.check_output(cmd, shell=True).strip().split(b'\n')
    # drop empty entries (e.g. when the find command returns no matches)
    fnames = [fname.decode('ascii') for fname in fnames if fname]

    outName = os.path.join(outDir, 'tnp.parquet')
    if use_local is True and 'hdfs' not in outName:
        outName = 'file://' + outName
    else:
        outName = outName.replace('/hdfs/analytix.cern.ch', 'hdfs://analytix')  # just in case

    print(f'>>>>>>>>> Path to input root files: {inDir}')
    print(f'>>>>>>>>> Path to output parquet files: {outName}')

    # treename = 'tpTree/fitter_tree'  # old tnp tool tree name
    # treename = 'muon/tree'           # old miniAOD tree name
    treename = 'muon/StandAloneEvents'

    print(f'>>>>>>>>> Number of files to process: {len(fnames)}')
    if len(fnames) == 0:
        print('>>>>>>>>> Error! No ROOT files found to convert with the desired options.')
        print('>>>>>>>>> Exiting...')
        return
    print(f'>>>>>>>>> First file: {fnames[0]}')

    # process batchsize files at a time
    batchsize = 100
    new = True
    while fnames:
        current = fnames[:batchsize]
        fnames = fnames[batchsize:]

        rootfiles = spark.read.format("root")\
            .option('tree', treename)\
            .load(current)
        rootfiles = rootfiles.select(
            "pair_mass", "tag_pt", "tag_isTight", "tag_charge", "probe_charge",
            "tag_pfIso04_neutral", "tag_pfIso04_photon", "tag_pfIso04_sumPU",
            "tag_pfIso04_charged",
            "tag_hltL3crIsoL1sSingleMu22L1f0L2f10QL3f24QL3trkIsoFiltered",
            "tag_hltL3fL1sSingleMu22L1f0L2f10QL3Filtered24Q",
            "probe_pt", "probe_isTrkMatch", "probe_isSA", "probeSA_isTrkMatch",
            "probe_eta", "nVertices", "ls")
        # merge rootfiles. chosen to make files of 8-32 MB (input)
        # become at most 1 GB (parquet recommendation)
        # https://parquet.apache.org/documentation/latest/
        # appending .coalesce(int(len(current) / 32)) to the select above would do this,
        # but it is too slow for now, maybe try again later

        if new:
            rootfiles.write.parquet(outName)
            new = False
        else:
            rootfiles.write.mode('append')\
                .parquet(outName)
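
    # A quick way to inspect the output (a sketch, shown commented out so it does not
    # run as part of the conversion; it assumes the same SparkSession and the outName
    # written above):
    #
    #     df = spark.read.parquet(outName)
    #     df.printSchema()
    #     print(f'rows written so far: {df.count()}')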


def run_all(particle, resonance, era, dataTier, subEra=None,
            customDir='', baseDir='', use_pog_space=False, use_local=False):
    if subEra is not None:
        subEras = [subEra]
    else:
        subEras = get_allowed_sub_eras(resonance, era)
        # subEras by default includes the whole era too, so remove it for convert
        subEras.remove(era)

    local_jars = ','.join([
        './laurelin-1.0.0.jar',
        './log4j-api-2.13.0.jar',
        './log4j-core-2.13.0.jar',
    ])

    spark = SparkSession\
        .builder\
        .appName("TnP")\
        .config("spark.jars", local_jars)\
        .config("spark.driver.extraClassPath", local_jars)\
        .config("spark.executor.extraClassPath", local_jars)\
        .config("spark.dynamicAllocation.maxExecutors", "100")\
        .config("spark.driver.memory", "10g")\
        .config("spark.executor.memory", "10g")\
        .config("spark.sql.shuffle.partitions", "500")\
        .config("spark.executor.cores", "1")

    if use_local is True:
        spark = spark.master("local")

    spark = spark.getOrCreate()

    print('\n\n------------------ DEBUG ----------------')
    sc = spark.sparkContext
    print(sc.getConf().toDebugString())
    print('---------------- END DEBUG ----------------\n\n')

    for subEra in subEras:
        print('\n>>>>>>>>> Converting:', particle, resonance, era, subEra)
        run_convert(spark, particle, resonance, era, dataTier, subEra,
                    customDir, baseDir, use_pog_space, use_local)

    spark.stop()
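

if __name__ == '__main__':
    # Minimal illustrative invocation (a sketch; in the normal workflow this module is
    # driven by the project's command-line tooling rather than run directly, and the
    # argument values below are placeholders that must match datasets actually
    # available in your HDFS area).
    run_all('muon', 'Z', 'Run2022', 'NANOAOD',
            subEra=None, customDir='', baseDir='', use_pog_space=False, use_local=False)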