Start move to dask histEFT #422

Open · wants to merge 16 commits into base: master
6 changes: 3 additions & 3 deletions .github/workflows/main.yml
@@ -87,7 +87,7 @@ jobs:
run: |
mkdir dir_for_topcoffea
cd dir_for_topcoffea
git clone https://github.com/TopEFT/topcoffea.git
git clone -b dask_hists https://github.com/TopEFT/topcoffea.git
cd topcoffea
conda run -n coffea-env pip install -e .
cd ../..
@@ -116,9 +116,9 @@ jobs:
run: |
conda run -n coffea-env pytest --cov=./ --cov-report=xml -rP --cov-append tests/test_make_1d_quad_plots.py

- name: Run topeft processors over test files with futures executor
- name: Run topeft processors over test files with dask executor
run: |
conda run -n coffea-env pytest --cov=./ --cov-report=xml -rP --cov-append tests/test_futures.py
conda run -n coffea-env pytest --cov=./ --cov-report=xml -rP --cov-append tests/test_dask.py

- name: Get topeft yields
run: |
196 changes: 105 additions & 91 deletions analysis/topeft_run2/analysis_processor.py

Large diffs are not rendered by default.

181 changes: 92 additions & 89 deletions analysis/topeft_run2/run_analysis.py
@@ -6,18 +6,24 @@
import cloudpickle
import gzip
import os
from functools import partial

from coffea import processor
from coffea.nanoevents import NanoAODSchema
import dask
import ndcctools.taskvine as vine

import topcoffea.modules.utils as utils
import topcoffea.modules.remote_environment as remote_environment

#import warnings
#warnings.filterwarnings("error", module="coffea.*")
from coffea.nanoevents import NanoEventsFactory, NanoAODSchema
from coffea.dataset_tools import preprocess, filter_files

from topeft.modules.dataDrivenEstimation import DataDrivenProducer
from topeft.modules.get_renormfact_envelope import get_renormfact_envelope
import analysis_processor

LST_OF_KNOWN_EXECUTORS = ["futures","work_queue"]
LST_OF_KNOWN_EXECUTORS = ["dask","taskvine"]

WGT_VAR_LST = [
"nSumOfWeights_ISRUp",
@@ -36,8 +42,8 @@

parser = argparse.ArgumentParser(description='You can customize your run')
parser.add_argument('jsonFiles' , nargs='?', default='', help = 'Json file(s) containing files and metadata')
parser.add_argument('--executor','-x' , default='work_queue', help = 'Which executor to use')
parser.add_argument('--prefix', '-r' , nargs='?', default='', help = 'Prefix or redirector to look for the files')
parser.add_argument('--executor','-x' , default='taskvine', help = 'Which executor to use')
parser.add_argument('--prefix', '-r' , nargs='?', default='.', help = 'Prefix or redirector to look for the files')
parser.add_argument('--test','-t' , action='store_true' , help = 'To perform a test, run over a few events in a couple of chunks')
parser.add_argument('--pretend' , action='store_true', help = 'Read the json files but do not execute the analysis')
parser.add_argument('--nworkers','-n' , default=8 , help = 'Number of workers')
@@ -89,7 +95,7 @@
if not do_np:
raise Exception("Error: Cannot specify do_renormfact_envelope if we have not already done the integration across the appl axis that occurs in the data driven estimator step.")
if dotest:
if executor == "futures":
if executor == "dask":
nchunks = 2
chunksize = 10000
nworkers = 1
@@ -102,7 +108,7 @@
ecut_threshold = args.ecut
if ecut_threshold is not None: ecut_threshold = float(args.ecut)

if executor == "work_queue":
if executor == "taskvine":
# construct wq port range
port = list(map(int, args.port.split('-')))
if len(port) < 1:
@@ -181,9 +187,11 @@ def LoadJsonToSampleName(jsonFile, prefix):

flist = {}
nevts_total = 0
for sname in samplesdict.keys():
for sname, info in samplesdict.items():
redirector = samplesdict[sname]['redirector']
flist[sname] = [(redirector+f) for f in samplesdict[sname]['files']]
flist[sname] = {'files': {}}
for f in info['files']:
flist[sname]['files'][f"{redirector}/{f}"] = treename
samplesdict[sname]['year'] = samplesdict[sname]['year']
samplesdict[sname]['xsec'] = float(samplesdict[sname]['xsec'])
samplesdict[sname]['nEvents'] = int(samplesdict[sname]['nEvents'])
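
As an aside, this loop assembles the nested fileset layout that coffea.dataset_tools.preprocess consumes further down. A minimal sketch of the resulting shape, with a made-up sample name, redirector, and file path (the tree name is taken to be the NanoAOD default, Events):

# Illustrative only: the sample name, redirector, and file path are placeholders.
flist = {
    "UL17_ttH_example": {
        "files": {
            "root://some.redirector.org//store/user/example/file_0.root": "Events",
        },
    },
}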
@@ -240,40 +248,26 @@ def LoadJsonToSampleName(jsonFile, prefix):
else:
print('No Wilson coefficients specified')

processor_instance = analysis_processor.AnalysisProcessor(samplesdict,wc_lst,hist_lst,ecut_threshold,do_errors,do_systs,split_lep_flavor,skip_sr,skip_cr)

if executor == "work_queue":
executor_args = {
'master_name': '{}-workqueue-coffea'.format(os.environ['USER']),

# find a port to run work queue in this range:
'port': port,

'debug_log': 'debug.log',
'transactions_log': 'tr.log',
'stats_log': 'stats.log',
'tasks_accum_log': 'tasks.log',
scheduler_opts = {}
if executor == "taskvine":
manager_args = {
"name": "{}-taskvine-coffea".format(os.environ["USER"]),
"port": port,
}

'environment_file': remote_environment.get_environment(
extra_pip_local = {"topeft": ["topeft", "setup.py"]},
vine_mgr = vine.DaskVine(**manager_args)
executor_args = {
"environment": remote_environment.get_environment(
extra_pip_local={"topeft": ["topeft", "setup.py"]},
),
'extra_input_files': ["analysis_processor.py"],

'retries': 5,

# use mid-range compression for chunk results. 9 is the default for work
# queue in coffea. Valid values are 0 (minimum compression, less memory
# usage) to 16 (maximum compression, more memory usage).
'compression': 9,

# automatically find an adequate resource allocation for tasks.
# tasks are first tried using the maximum resources seen from previously run
# tasks. on resource exhaustion, they are retried with the maximum resource
# values, if specified below. if a maximum is not specified, the task waits
# forever until a larger worker connects.
'resource_monitor': True,
'resources_mode': 'auto',

"extra_files": {
vine_mgr.declare_file(
"analysis_processor.py", cache=True
): "analysis_processor.py"
},
"retries": 5,
"resources_mode": None,
# these resource values may be omitted when using
# resources_mode: 'auto', but they do make the initial portion
# of a workflow run a little bit faster.
@@ -285,64 +279,73 @@ def LoadJsonToSampleName(jsonFile, prefix):
# mode will use the values specified here, so workers need to be at least
# this large. If left unspecified, tasks will use whole workers in the
# exploratory mode.
# "resources": {
# 'cores': 1,
# 'disk': 8000, #MB
# 'memory': 10000, #MB

# control the size of accumulation tasks. Results are
# accumulated in groups of size chunks_per_accum, keeping at
# most chunks_per_accum at the same time in memory per task.
'chunks_per_accum': 25,
'chunks_accum_in_mem': 2,

# terminate workers on which tasks have been running longer than average.
# This is useful for temporary conditions on worker nodes where a task would
# finish faster if run on another worker.
# The time limit is computed by multiplying the average runtime of tasks
# by the value of 'fast_terminate_workers'. Since some tasks can be
# legitimately slow, no task can trigger the termination of workers twice.
#
# warning: small values (e.g. close to 1) may cause the workflow to misbehave,
# as most tasks will be terminated.
#
# Less than 1 disables it.
'fast_terminate_workers': 0,

# print messages when tasks are submitted, finished, etc.,
# together with their resource allocation and usage. If a task
# fails, its standard output is also printed, so we can turn
# off print_stdout for all tasks.
'verbose': True,
'print_stdout': False,
# 'disk': 8000, # MB
# 'memory': 10000, # MB
# }
}

vine_sch = partial(vine_mgr.get, **executor_args)
scheduler_opts = {"scheduler": vine_sch}

# Run the processor and get the output
tstart = time.time()

if executor == "futures":
exec_instance = processor.futures_executor(workers=nworkers)
runner = processor.Runner(exec_instance, schema=NanoAODSchema, chunksize=chunksize, maxchunks=nchunks)
elif executor == "work_queue":
executor = processor.WorkQueueExecutor(**executor_args)
runner = processor.Runner(executor, schema=NanoAODSchema, chunksize=chunksize, maxchunks=nchunks, skipbadfiles=False, xrootdtimeout=180)

output = runner(flist, treename, processor_instance)
dataset_runable, dataset_updated = preprocess(
flist,
align_clusters=False,
step_size=chunksize,
files_per_batch=10,
skip_bad_files=False,
save_form=True,
uproot_options={"xrootdtimeout": 180},
**scheduler_opts
)
dataset_runable = filter_files(dataset_runable)

# trim steps if maximum number of chunks specified:
if nchunks:
for info in dataset_runable.values():
for fdict in info["files"].values():
fdict["steps"] = fdict["steps"][0:nchunks]

events = {}
for name, info in dataset_runable.items():
events[name] = NanoEventsFactory.from_root(
dataset_runable[name]["files"],
schemaclass=NanoAODSchema,
metadata={"dataset": name},
).events()

processor_instance = analysis_processor.AnalysisProcessor(
samplesdict,
wc_lst,
hist_lst,
ecut_threshold,
do_errors,
do_systs,
split_lep_flavor,
skip_sr,
skip_cr,
)

to_compute = {}
for name, events_of_name in events.items():
to_compute[name] = processor_instance.process(events_of_name)

(output, ) = dask.compute(to_compute, **scheduler_opts)
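
To spell out the control flow of this section: from_root with dask-backed NanoEvents reads nothing up front, and processor_instance.process only assembles a task graph, so no file is opened and no histogram is filled until dask.compute hands everything to the chosen scheduler. A minimal sketch of that deferred pattern, with a placeholder file, dataset name, and jet selection:

import awkward as ak
import dask
from coffea.nanoevents import NanoEventsFactory, NanoAODSchema

# Placeholder file -> tree mapping; no I/O happens at this point.
events = NanoEventsFactory.from_root(
    {"root://some.redirector.org//store/user/example/file_0.root": "Events"},
    schemaclass=NanoAODSchema,
    metadata={"dataset": "UL17_ttH_example"},
).events()

# Hypothetical selection: these calls only extend the dask-awkward task graph.
njets = ak.num(events.Jet[events.Jet.pt > 30], axis=1)

# Only here is anything read and computed (the default scheduler is used if
# none is passed, as opposed to the TaskVine scheduler above).
(result,) = dask.compute(njets)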

dt = time.time() - tstart

if executor == "work_queue":
print('Processed {} events in {} seconds ({:.2f} evts/sec).'.format(nevts_total,dt,nevts_total/dt))

#nbins = sum(sum(arr.size for arr in h.eval({}).values()) for h in output.values() if isinstance(h, hist.Hist))
#nfilled = sum(sum(np.sum(arr > 0) for arr in h.eval({}).values()) for h in output.values() if isinstance(h, hist.Hist))
#print("Filled %.0f bins, nonzero bins: %1.1f %%" % (nbins, 100*nfilled/nbins,))

if executor == "futures":
print("Processing time: %1.2f s with %i workers (%.2f s cpu overall)" % (dt, nworkers, dt*nworkers, ))
print(
"Processed {} events in {} seconds ({:.2f} evts/sec).".format(
nevts_total, dt, nevts_total / dt
)
)

# Save the output
if not os.path.isdir(outpath): os.system("mkdir -p %s"%outpath)
out_pkl_file = os.path.join(outpath,outname+".pkl.gz")
if not os.path.isdir(outpath):
os.system("mkdir -p %s" % outpath)
out_pkl_file = os.path.join(outpath, outname + ".pkl.gz")
print(f"\nSaving output in {out_pkl_file}...")
with gzip.open(out_pkl_file, "wb") as fout:
cloudpickle.dump(output, fout)
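
As a usage note, the saved output can be read back with the same two libraries. A minimal sketch, where the path is only an example and depends on the output name and path passed to the script (the -o and -p options seen in the tests):

import gzip
import cloudpickle

# Example path; substitute the actual output directory and name used for the run.
with gzip.open("histos/output_check_yields.pkl.gz", "rb") as fin:
    output = cloudpickle.load(fin)

print(list(output.keys()))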
7 changes: 5 additions & 2 deletions environment.yml
@@ -2,12 +2,15 @@ name: coffea-env
channels:
- conda-forge
dependencies:
- conda-forge::python=3.9
- conda-forge::python=3.10
- numpy=1.23.5
- coffea=0.7.21
- coffea
- ndcctools
- conda-pack
- dill
- xrootd
- git
- pyyaml
- dask
- dask-awkward
- dask-histogram
8 changes: 6 additions & 2 deletions tests/test_futures.py → tests/test_dask.py
@@ -9,12 +9,16 @@ def test_topcoffea():
"python",
"analysis/topeft_run2/run_analysis.py",
"-x",
"futures",
"dask",
"input_samples/sample_jsons/test_samples/UL17_private_ttH_for_CI.json",
"-o",
"output_check_yields",
"-p",
"analysis/topeft_run2/histos/"
"analysis/topeft_run2/histos/",
"--nchunks",
"1",
"--chunksize",
"1000"
]

# Run TopCoffea
2 changes: 1 addition & 1 deletion tests/test_workqueue.py
@@ -33,6 +33,6 @@ def test_topcoffea_wq():

# Run TopCoffea
with factory:
subprocess.run(args, cwd="analysis/topeft_run2", timeout=400)
subprocess.run(args, cwd="analysis/topeft_run2", timeout=600)

assert (exists('analysis/topeft_run2/histos/output_check_yields_wq.pkl.gz'))