-
Notifications
You must be signed in to change notification settings - Fork 3
/
post_processing.py
123 lines (103 loc) · 4.31 KB
/
post_processing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#############################################################################
## post_processing is a script that scans BIDS EEG folder and checks its
## validity, removes duplicated entries in tsv files.
#############################################################################
## Copyright (c) 2018-2019, University of Liège
## Author: Nikita Beliy
## Owner: Liege University https://www.uliege.be
## Credits: [{credit_list}]
## Version: 0.74
## Maintainer: Nikita Beliy
## Email: [email protected]
## Status: development
#############################################################################
## This file is part of eegBidsCreator
## eegBidsCreator is free software: you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation, either version 2 of the License, or
## (at your option) any later version.
## eegBidsCreator is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
## You should have received a copy of the GNU General Public License
## along with eegBidsCreator. If not, see <https://www.gnu.org/licenses/>.
############################################################################
import glob
import json
import logging
import os
import sys

import tools.tools as tools
# ---------------------------------------------------------------------------
# Logging setup: everything goes to the console with a timestamped,
# level-tagged format. The root logger is used so library loggers inherit it.
# ---------------------------------------------------------------------------
logFormatter = logging.Formatter(
    "[%(levelname)-7.7s]:%(asctime)s:%(name)s %(message)s",
    datefmt='%m/%d/%Y %H:%M:%S')
Logger = logging.getLogger()
Logger.setLevel(logging.INFO)  # direct attribute; getattr(logging, "INFO") was needless indirection
consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(logFormatter)
Logger.addHandler(consoleHandler)

Logger.info(">>>>>>>>>>>>>>>>>>>>>>>>")
Logger.info("Starting post-processing")
Logger.info("<<<<<<<<<<<<<<<<<<<<<<<<")
Logger.debug(str(sys.argv))
Logger.debug("Process PID: %s", os.getpid())  # lazy %-args instead of string concat

# The single command-line argument is the path to the BIDS dataset root.
# Guard against a missing argument instead of dying with an IndexError.
if len(sys.argv) < 2:
    raise SystemExit("Usage: post_processing.py <bids-dataset-path>")
path = sys.argv[1]
Logger.info("Path: {}".format(path))
# ---------------------------------------------------------------------------
# Participants: keep only subjects that appear both in participants.tsv and
# as an existing sub-* directory; remove orphaned subject directories,
# deduplicate tsv rows, and rewrite participants.tsv sorted by subject id.
# ---------------------------------------------------------------------------
dirs = [os.path.basename(d) for d in glob.glob(path + "/sub-*")]

Logger.info("Reading field definitions")
# Column names for participants.tsv come from the keys of participants.json.
with open(path + "/participants.json", "r") as j:
    fields = list(json.load(j).keys())

Logger.info("Reading subjects list")
# set() drops duplicated lines; retained lines keep their trailing newline,
# which is why rows are later written back with end=''.
with open(path + "/participants.tsv") as f:
    subj = [line.split("\t") for line in set(f.readlines())]

# Keep only rows whose subject id (first column) has a matching directory,
# then sort by subject id for a stable output order.
subj = [s for s in subj if s[0] in dirs]
subj.sort(key=lambda s: s[0])
if not subj or not dirs:
    raise Exception("Couldn't find subjects in {}".format(path))

# Remove subject directories that are not listed in participants.tsv.
subj_names = [s[0] for s in subj]
for d in dirs:
    if d not in subj_names:
        Logger.warning("Subject {} is not in list of participants".format(d))
        tools.rrm(path + "/sub-" + d)

# TODO(review): consider writing to a temp file and renaming, so a crash
# mid-write cannot truncate participants.tsv (original comment was cut off).
Logger.info("Writing subject list")
with open(path + "/participants.tsv", "w") as f:
    print("\t".join(fields), file=f)
    for s in subj:
        if len(fields) != len(s):
            Logger.warning("Subject {} fields mismatch description".format(s[0]))
        # Rows still carry their '\n' from readlines(), hence end=''.
        print("\t".join(s), end='', file=f)
fields = []
# ---------------------------------------------------------------------------
# Scans: for every *scans.tsv, deduplicate rows, drop rows whose data file is
# gone, delete data files no row refers to, and rewrite the tsv sorted by the
# acquisition-time column.
# ---------------------------------------------------------------------------
Logger.info("Scanning for scan list")
for sc in glob.glob(path + "/**/*scans.tsv", recursive=True):
    loc_path = os.path.dirname(sc)
    Logger.debug("Scan found at {}".format(loc_path))

    # set() drops duplicated lines; kept lines retain their trailing newline
    # (written back later with end='').
    with open(sc, "r") as f:
        files = [fl.split('\t') for fl in set(f.readlines())]
    # Keep only rows whose referenced data file (first column) still exists.
    files = [fl for fl in files if os.path.isfile(loc_path + "/" + fl[0])]

    # Column names come from the sidecar json next to the tsv.
    with open(sc[:-4] + ".json", "r") as j:
        fields = list(json.load(j).keys())

    # Data files live one directory level below the tsv; the pattern has no
    # '**', so the original recursive=True flag had no effect and is dropped.
    full_list = glob.glob(loc_path + "/*/*")

    # A scan row matches a data file when the file path contains the row's
    # basename truncated at its last '_' (i.e. the BIDS entity prefix).
    prefixes = []
    for row in files:
        bn = os.path.basename(row[0])
        prefixes.append(bn[:bn.rfind('_')])
    for fname in full_list:
        if not any(pr in fname for pr in prefixes):
            Logger.warning("file: {}, prefix not found.".format(fname))
            tools.rrm(fname)

    # Rewrite the tsv sorted by the second column (acquisition time).
    files.sort(key=lambda row: row[1])
    with open(sc, "w") as f:
        print("\t".join(fields), file=f)
        for s in files:
            if len(fields) != len(s):
                Logger.warning("Scan {} fields mismatch description".format(s[0]))
            # Rows still carry their '\n' from readlines(), hence end=''.
            print("\t".join(s), end='', file=f)

# Finally remove any directories left empty by the deletions above.
tools.remove_empty_dirs(path)