Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bids ingress #2121

Merged
merged 5 commits into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions CPAC/_entrypoints/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,17 @@ def run_main():
args.skip_bids_validator,
only_one_anat=False,
)
from bids2table import bids2table

table = bids2table(bids_dir, workers=10)
bids_table = table[
(table["ent__ext"].str.contains(".nii"))
& (table["ent__datatype"].notnull())
]
# fillna
bids_table['ent__ses'] = bids_table['ent__ses'].fillna('None')
grouped_tab = bids_table.groupby(["ent__sub", "ent__ses"])

else:
sub_list = load_cpac_data_config(
args.data_config_file, args.participant_label, args.aws_input_creds
Expand Down Expand Up @@ -824,7 +835,7 @@ def run_main():
data_hash = hash_data_config(sub_list)
data_config_file = f"cpac_data_config_{data_hash}_{st}.yml"

sublogdirs = [set_subject(sub, c)[2] for sub in sub_list]
sublogdirs = [set_subject(sub, c)[2] for sub in grouped_tab]
# write out the data configuration file
data_config_file = os.path.join(sublogdirs[0], data_config_file)
with open(data_config_file, "w", encoding="utf-8") as _f:
Expand Down Expand Up @@ -919,7 +930,7 @@ def run_main():

WFLOGGER.info("Starting participant level processing")
exitcode = CPAC.pipeline.cpac_runner.run(
data_config_file,
grouped_tab,
pipeline_config_file,
plugin="MultiProc" if plugin_args["n_procs"] > 1 else "Linear",
plugin_args=plugin_args,
Expand Down
95 changes: 46 additions & 49 deletions CPAC/pipeline/cpac_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@


def run_workflow(
sub_dict,
sub_group,
c,
run,
pipeline_timing_info=None,
Expand All @@ -234,8 +234,8 @@ def run_workflow(

Parameters
----------
sub_dict : dictionary
subject dictionary with anatomical and functional image paths
sub_group :
subject group with anatomical and functional image paths
c : Configuration object
CPAC pipeline configuration dictionary object
run : boolean
Expand Down Expand Up @@ -270,7 +270,7 @@ def run_workflow(
# Assure that changes on config will not affect other parts
c = copy.copy(c)

subject_id, p_name, log_dir = set_subject(sub_dict, c)
subject_id, p_name, log_dir = set_subject(sub_group, c)
c["subject_id"] = subject_id

set_up_logger(
Expand Down Expand Up @@ -352,21 +352,21 @@ def run_workflow(
c.pipeline_setup["system_config"]["max_cores_per_participant"]
) * int(c.pipeline_setup["system_config"]["num_participants_at_once"])

try:
creds_path = sub_dict["creds_path"]
if creds_path and "none" not in creds_path.lower():
if os.path.exists(creds_path):
input_creds_path = os.path.abspath(creds_path)
else:
err_msg = (
f'Credentials path: "{creds_path}" for subject "{subject_id}" was'
" not found. Check this path and try again."
)
raise FileNotFoundError(err_msg)
else:
input_creds_path = None
except KeyError:
input_creds_path = None
# try:
# creds_path = sub_group["creds_path"]
# if creds_path and "none" not in creds_path.lower():
# if os.path.exists(creds_path):
# input_creds_path = os.path.abspath(creds_path)
# else:
# err_msg = (
# f'Credentials path: "{creds_path}" for subject "{subject_id}" was'
# " not found. Check this path and try again."
# )
# raise FileNotFoundError(err_msg)
# else:
# input_creds_path = None
# except KeyError:
# input_creds_path = None

information = """
Environment
Expand Down Expand Up @@ -425,7 +425,6 @@ def run_workflow(
subject_info = {}
subject_info["subject_id"] = subject_id
subject_info["start_time"] = pipeline_start_time

check_centrality_degree = c.network_centrality["run"] and (
len(c.network_centrality["degree_centrality"]["weight_options"]) != 0
or len(c.network_centrality["eigenvector_centrality"]["weight_options"]) != 0
Expand Down Expand Up @@ -469,7 +468,7 @@ def run_workflow(
set_up_random_state_logger(log_dir)

try:
workflow = build_workflow(subject_id, sub_dict, c, p_name)
workflow = build_workflow(subject_id, sub_group, c, p_name)
except Exception as exception:
WFLOGGER.exception("Building workflow failed")
raise exception
Expand Down Expand Up @@ -858,9 +857,7 @@ def initialize_nipype_wf(cfg, sub_data_dct, name=""):
if name:
name = f"_{name}"

workflow_name = (
f'cpac{name}_{sub_data_dct["subject_id"]}_{sub_data_dct["unique_id"]}'
)
workflow_name = f"cpac{name}_{sub_data_dct[0][0]}_{sub_data_dct[0][1]}"
wf = pe.Workflow(name=workflow_name)
wf.base_dir = cfg.pipeline_setup["working_directory"]["path"]
wf.config["execution"] = {
Expand Down Expand Up @@ -1214,37 +1211,37 @@ def connect_pipeline(wf, cfg, rpool, pipeline_blocks):
return wf


def build_workflow(subject_id, sub_dict, cfg, pipeline_name=None):
def build_workflow(subject_id, sub_group, cfg, pipeline_name=None):
"""Build a C-PAC workflow for a single subject."""
from CPAC.utils.datasource import gather_extraction_maps

# Workflow setup
wf = initialize_nipype_wf(cfg, sub_dict, name=pipeline_name)
wf = initialize_nipype_wf(cfg, sub_group, name=pipeline_name)

# Extract credentials path if it exists
try:
creds_path = sub_dict["creds_path"]
if creds_path and "none" not in creds_path.lower():
if os.path.exists(creds_path):
input_creds_path = os.path.abspath(creds_path)
else:
err_msg = (
f'Credentials path: "{creds_path}" for subject "{subject_id}" was'
" not found. Check this path and try again."
)
raise FileNotFoundError(err_msg)
else:
input_creds_path = None
except KeyError:
input_creds_path = None

cfg.pipeline_setup["input_creds_path"] = input_creds_path
# try:
# creds_path = sub_group["creds_path"]
# if creds_path and "none" not in creds_path.lower():
# if os.path.exists(creds_path):
# input_creds_path = os.path.abspath(creds_path)
# else:
# err_msg = (
# f'Credentials path: "{creds_path}" for subject "{subject_id}" was'
# " not found. Check this path and try again."
# )
# raise FileNotFoundError(err_msg)
# else:
# input_creds_path = None
# except KeyError:
# input_creds_path = None

# cfg.pipeline_setup["input_creds_path"] = input_creds_path

# """""""""""""""""""""""""""""""""""""""""""""""""""
# PREPROCESSING
# """""""""""""""""""""""""""""""""""""""""""""""""""

wf, rpool = initiate_rpool(wf, cfg, sub_dict)
wf, rpool = initiate_rpool(wf, cfg, sub_group)

pipeline_blocks = build_anat_preproc_stack(rpool, cfg)

Expand Down Expand Up @@ -1289,8 +1286,8 @@ def build_workflow(subject_id, sub_dict, cfg, pipeline_name=None):

# Distortion/Susceptibility Correction
distcor_blocks = []
if "fmap" in sub_dict:
fmap_keys = sub_dict["fmap"]
if "fmap" in sub_group[0]:
fmap_keys = sub_group[1]["ent__suffix"].values
if "phasediff" in fmap_keys or "phase1" in fmap_keys:
if "magnitude" in fmap_keys or "magnitude1" in fmap_keys:
distcor_blocks.append(distcor_phasediff_fsl_fugue)
Expand Down Expand Up @@ -1626,15 +1623,15 @@ def build_workflow(subject_id, sub_dict, cfg, pipeline_name=None):
]:
if errorstring in lookup_error.args[0]:
missing_key = errorstrings[errorstrings.index(errorstring) + 1]
if missing_key and missing_key.endswith("_bold") and "func" not in sub_dict:
if missing_key and missing_key.endswith("_bold") and "func" not in sub_group:
raise FileNotFoundError(
"The provided pipeline configuration requires functional "
"data but no functional data were found for "
+ "/".join(
[
sub_dict[key]
sub_group[key]
for key in ["site", "subject_id", "unique_id"]
if key in sub_dict
if key in sub_group
]
)
+ ". Please check "
Expand Down
94 changes: 49 additions & 45 deletions CPAC/pipeline/cpac_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ def run_T1w_longitudinal(sublist, cfg):


def run(
subject_list_file,
bids_table,
config_file=None,
p_name=None,
plugin=None,
Expand Down Expand Up @@ -299,20 +299,21 @@ def run(
)

# Init variables
sublist = None
if ".yaml" in subject_list_file or ".yml" in subject_list_file:
subject_list_file = os.path.realpath(subject_list_file)
else:
from CPAC.utils.bids_utils import (
bids_gen_cpac_sublist,
collect_bids_files_configs,
)

(file_paths, config) = collect_bids_files_configs(subject_list_file, None)
sublist = bids_gen_cpac_sublist(subject_list_file, file_paths, config, None)
if not sublist:
WFLOGGER.error("Did not find data in %s", subject_list_file)
return 1
sublist = bids_table

# if ".yaml" in subject_list_file or ".yml" in subject_list_file:
# subject_list_file = os.path.realpath(subject_list_file)
# else:
# from CPAC.utils.bids_utils import (
# bids_gen_cpac_sublist,
# collect_bids_files_configs,
# )

# (file_paths, config) = collect_bids_files_configs(subject_list_file, None)
# sublist = bids_gen_cpac_sublist(subject_list_file, file_paths, config, None)
# if not sublist:
# WFLOGGER.error("Did not find data in %s", subject_list_file)
# return 1

# take date+time stamp for run identification purposes
unique_pipeline_id = strftime("%Y%m%d%H%M%S")
Expand Down Expand Up @@ -400,38 +401,38 @@ def run(
p_name = check_pname(p_name, c)

# Load in subject list
try:
if not sublist:
sublist = yaml.safe_load(open(subject_list_file, "r"))
except:
msg = "Subject list is not in proper YAML format. Please check your file"
raise FileNotFoundError(msg)
# try:
# if not sublist:
# sublist = yaml.safe_load(open(subject_list_file, "r"))
# except:
# msg = "Subject list is not in proper YAML format. Please check your file"
# raise FileNotFoundError(msg)

# Populate subject scan map
sub_scan_map = {}
try:
for sub in sublist:
if sub["unique_id"]:
s = sub["subject_id"] + "_" + sub["unique_id"]
else:
s = sub["subject_id"]
scan_ids = ["scan_anat"]

if "func" in sub:
for id in sub["func"]:
scan_ids.append("scan_" + str(id))

if "rest" in sub:
for id in sub["rest"]:
scan_ids.append("scan_" + str(id))

sub_scan_map[s] = scan_ids
except Exception as e:
msg = (
"\n\nERROR: Subject list file not in proper format - check if you loaded"
" the correct file?\nError name: cpac_runner_0001\n\n"
)
raise ValueError(msg) from e
# sub_scan_map = {}
# try:
# for sub in sublist:
# if sub["unique_id"]:
# s = sub["subject_id"] + "_" + sub["unique_id"]
# else:
# s = sub["subject_id"]
# scan_ids = ["scan_anat"]

# if "func" in sub:
# for id in sub["func"]:
# scan_ids.append("scan_" + str(id))

# if "rest" in sub:
# for id in sub["rest"]:
# scan_ids.append("scan_" + str(id))

# sub_scan_map[s] = scan_ids
# except Exception as e:
# msg = (
# "\n\nERROR: Subject list file not in proper format - check if you loaded"
# " the correct file?\nError name: cpac_runner_0001\n\n"
# )
# raise ValueError(msg) from e

pipeline_timing_info = []
pipeline_timing_info.append(unique_pipeline_id)
Expand Down Expand Up @@ -472,6 +473,7 @@ def run(
if not os.path.exists(c.pipeline_setup["working_directory"]["path"]):
try:
os.makedirs(c.pipeline_setup["working_directory"]["path"])

except:
err = (
"\n\n[!] CPAC says: Could not create the working "
Expand Down Expand Up @@ -662,6 +664,8 @@ def replace_index(target1, target2, file_path):
test_config,
)
except Exception as exception: # pylint: disable=broad-except
print(f"Failed to start {set_subject(sub, c)[2]}")
print(exception)
exitcode = 1
failed_to_start(set_subject(sub, c)[2], exception)
return exitcode
Expand Down
Loading