-
Notifications
You must be signed in to change notification settings - Fork 15
/
problem.py
371 lines (300 loc) · 11.9 KB
/
problem.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
import json
import os
import typing
from dataclasses import dataclass
from os.path import join as pjoin
import numpy as np
import pandas as pd
from rampwf.prediction_types.base import BasePrediction
from rampwf.score_types import BaseScoreType
from rampwf.workflows import Estimator
from sklearn.model_selection import GroupShuffleSplit
# Name of the data folder; default location for the loaders below and the
# sub-folder that `_read_data` appends to the path it is given.
DATA_HOME = "data"
# Seed used for the module-level generator `rng` and for the cross-validation
# splitter built in `get_cv`.
RANDOM_STATE = 777
# Intersection-over-union a predicted step must strictly exceed to be matched
# with an annotated step.
THRESHOLD_IoU = 0.75
# Module-level random generator seeded with RANDOM_STATE; `_read_data` uses it
# to sub-sample trials in test mode.
rng = np.random.RandomState(RANDOM_STATE)
# --------------------------------------
# 0) Utils to manipulate data
# --------------------------------------
@dataclass
class WalkSignal:
    """Recording of one foot during one trial, bundled with the trial metadata."""

    trial_code: str
    age: int
    gender: str
    height: float
    weight: int
    bmi: float
    laterality: str
    sensor: str
    pathology_group: str
    is_control: str
    foot: str  # "Left" or "Right"
    signal: typing.Any  # numpy array or pandas dataframe

    @classmethod
    def load_from_file(cls, code, data_home=DATA_HOME):
        """Build the left-foot and right-foot signals of a trial.

        The metadata is read from `<data_home>/<code>.json` and the samples from
        `<data_home>/<code>.csv`. The CSV holds both feet: columns starting with
        "L" go to the left foot and columns starting with "R" to the right foot.
        The leading letter is dropped so that both signals share the same column
        names.

        Arguments:
            code {str} -- code of the trial

        Keyword Arguments:
            data_home {str} -- folder where the files lie (default: {DATA_HOME})

        Returns:
            [tuple(WalkSignal)] -- (left foot signal, right foot signal)
        """
        base_path = pjoin(data_home, code)
        with open(base_path + ".json", "r") as json_file:
            info = json.load(json_file)
        both_feet = pd.read_csv(base_path + ".csv", sep=",")  # left and right feet

        # Column suffixes shared by both feet; the side letter is prepended below.
        channels = ("AV", "AX", "AY", "AZ", "RV", "RX", "RY", "RZ")

        # Trial-level fields, identical for the two feet.
        shared = dict(
            trial_code=code,
            age=info["Age"],
            gender=info["Gender"],
            height=info["Height"],
            weight=info["Weight"],
            bmi=info["BMI"],
            laterality=info["Laterality"],
            sensor=info["Sensor"],
            pathology_group=info["PathologyGroup"],
            is_control=info["IsControl"],
        )

        def build(side):
            # `side` is "Left" or "Right"; its first letter is the column prefix.
            columns = [side[0] + channel for channel in channels]
            one_foot = both_feet[columns].rename(columns=lambda label: label[1:])
            return cls(foot=side, signal=one_foot, **shared)

        return build("Left"), build("Right")
def load_steps(code, data_home=DATA_HOME):
    """Read the annotated steps of both feet from a trial's JSON metadata file.

    Arguments:
        code {str} -- code of the trial, e.g. "2-10"

    Keyword Arguments:
        data_home {str} -- folder where the files lie (default: {DATA_HOME})

    Returns:
        [tuple(list)] -- two lists of steps (left foot, right foot)
    """
    json_path = pjoin(data_home, code) + ".json"
    with open(json_path, "r") as json_file:
        annotations = json.load(json_file)
    left_steps = annotations["LeftFootActivity"]
    right_steps = annotations["RightFootActivity"]
    return left_steps, right_steps
def _read_data(path, train_or_test="train"):
    """Return the list of signals and steps for the train or test data set.

    Every `.csv` file found in `<path>/<DATA_HOME>/<train_or_test>` is treated as
    one trial. Each trial contributes two consecutive entries (left foot, then
    right foot) to both returned sequences.

    When the environment variable RAMP_TEST_MODE is set to anything other than
    an empty string, "0" or "false", at most 5 trials are drawn at random (with
    the module-level generator) to keep the run short.

    Arguments:
        path {str} -- folder where the train and test data are

    Keyword Arguments:
        train_or_test {str} -- train or test (default: {"train"})

    Returns:
        [tuple(List[WalkSignal], np.ndarray)] -- (list of signals, object array
        of lists of steps)
    """
    folder = pjoin(path, DATA_HOME, train_or_test)
    # os.listdir returns entries in arbitrary order; sorting keeps the data order
    # (and therefore the seeded sub-sampling below) reproducible across machines.
    # splitext keeps trial codes intact even if they contain a dot.
    code_list = sorted(
        os.path.splitext(fname)[0]
        for fname in os.listdir(folder)
        if fname.endswith(".csv")
    )

    # Environment values are strings, so "0" would be truthy if used directly.
    flag = str(os.getenv("RAMP_TEST_MODE", "0")).strip().lower()
    test = flag not in ("", "0", "false")  # are we in test mode
    if test and code_list:
        # Never ask for more trials than are available.
        n_trials = min(5, len(code_list))
        code_sublist = rng.choice(code_list, n_trials, replace=False)
    else:
        code_sublist = code_list

    X = list()
    y = list()
    for code in code_sublist:
        left_signal, right_signal = WalkSignal.load_from_file(code, folder)
        left_steps, right_steps = load_steps(code, folder)
        X.extend((left_signal, right_signal))
        y.extend((left_steps, right_steps))
    return X, np.array(y, dtype=list)
# --------------------------------------
# 2) Object implementing the score type
# --------------------------------------
def _check_step_list(step_list):
"""Some sanity checks."""
for step in step_list:
assert len(step) == 2, f"A step consists of a start and an end: {step}."
start, end = step
assert start < end, f"start should be before end: {step}."
def inter_over_union(interval_1, interval_2):
    """Intersection over union for two intervals.

    Arguments:
        interval_1 {Sequence} -- (start, end) of the first interval
        interval_2 {Sequence} -- (start, end) of the second interval

    Returns:
        float -- length of the overlap divided by the length covered by both
        intervals; 0 when the intervals do not overlap, and 0.0 when the union
        has no length (e.g. two empty intervals)
    """
    a, b = interval_1
    c, d = interval_2
    intersection = max(0, min(b, d) - max(a, c))
    if intersection > 0:
        # Overlapping intervals cover one contiguous span.
        union = max(b, d) - min(a, c)
    else:
        # Disjoint intervals: the union is the sum of both lengths.
        union = (b - a) + (d - c)
    if union <= 0:
        # Degenerate (zero-length) intervals: nothing to overlap, and dividing
        # would raise ZeroDivisionError.
        return 0.0
    return intersection / union
def _step_detection_precision(step_list_true, step_list_pred):
    """Precision of the predicted steps for a single signal.

    Precision is the number of correctly predicted steps divided by the number
    of predicted steps. A predicted step is correct when its "intersection over
    union" with an annotated step is strictly above THRESHOLD_IoU.

    An annotated step can only be detected once: predicted steps are processed
    in order and each one is matched with the first annotated step that is still
    unmatched and overlaps it enough. Further predictions that correspond to an
    already matched annotated step are counted as false.

    Both arguments are lists of steps, for instance:
    - step_list_true: [[357, 431], [502, 569], [633, 715], [778, 849], [907, 989]]
    - step_list_pred: [[293, 365], [422, 508], [565, 642], [701, 789]]

    Arguments:
        step_list_true {List} -- list of true steps
        step_list_pred {List} -- list of predicted steps

    Returns:
        float -- precision, between 0.0 and 1.0 (0.0 for an empty prediction)
    """
    _check_step_list(step_list_pred)

    n_predicted = len(step_list_pred)
    if n_predicted == 0:  # empty prediction
        return 0.0

    matched_true = set()  # indexes of annotated steps already paired
    for predicted in step_list_pred:
        hit = next(
            (
                position
                for position, annotated in enumerate(step_list_true)
                if position not in matched_true
                and inter_over_union(predicted, annotated) > THRESHOLD_IoU
            ),
            None,
        )
        if hit is not None:
            matched_true.add(hit)

    # Each correct prediction claims exactly one annotated step.
    return len(matched_true) / n_predicted
def _step_detection_recall(step_list_true, step_list_pred):
    """Recall of the predicted steps for a single signal.

    Recall is the number of detected annotated steps divided by the total number
    of annotated steps. An annotated step is counted as detected if its
    "intersection over union" with a predicted step is strictly above
    THRESHOLD_IoU.

    A predicted step can only be used once. If several annotated steps are
    detected with the same predicted step, all but one are considered
    undetected.

    Both arguments are lists of steps, for instance:
    - step_list_true: [[357, 431], [502, 569], [633, 715], [778, 849], [907, 989]]
    - step_list_pred: [[293, 365], [422, 508], [565, 642], [701, 789]]

    Arguments:
        step_list_true {List} -- list of true steps
        step_list_pred {List} -- list of predicted steps

    Returns:
        float -- recall, between 0.0 and 1.0 (0.0 when there is no annotated
        step)
    """
    _check_step_list(step_list_pred)
    if len(step_list_true) == 0:
        # No annotated step: return 0.0 instead of dividing by zero, mirroring
        # how precision handles an empty prediction.
        return 0.0

    n_detected_true = 0
    predicted_index_set = set()  # set of indexes of predicted steps already used
    for step_true in step_list_true:
        for index, step_pred in enumerate(step_list_pred):
            if (index not in predicted_index_set) and (
                inter_over_union(step_pred, step_true) > THRESHOLD_IoU
            ):
                n_detected_true += 1
                predicted_index_set.add(index)
                break  # this annotated step is detected; move on to the next
    return n_detected_true / len(step_list_true)
class FScoreStepDetection(BaseScoreType):
    """F-score of step detection, averaged over signals (higher is better)."""

    is_lower_the_better = False
    minimum = 0.0
    maximum = 1.0

    def __init__(self, name="F-score (step detection)", precision=3):
        self.name = name
        self.precision = precision

    def __call__(self, y_true, y_pred) -> float:
        """
        Compute the F-score (harmonic mean of precision and recall) of each
        instance (each signal) and return the plain average over instances.

        Both arguments are lists of lists of steps, for instance:
        - y_true: [[[907, 989]], [[357, 431], [502, 569]], [[633, 715], [778, 849]]]
        - y_pred: [[[293, 365]], [[422, 508], [565, 642]], [[701, 789]]]

        Arguments:
            y_true {List} -- true steps
            y_pred {List} -- predicted steps

        Returns:
            float -- f-score, between 0.0 and 1.0
        """
        # to prevent throwing an exception when passing empty lists
        if len(y_true) == 0:
            return 0

        def signal_fscore(annotated, predicted):
            prec = _step_detection_precision(annotated, predicted)
            rec = _step_detection_recall(annotated, predicted)
            # Precision and recall both (almost) zero: the score is 0 and the
            # harmonic mean below would divide by (nearly) zero.
            if prec + rec < 1e-6:
                return 0.0
            return (2 * prec * rec) / (prec + rec)

        per_signal = [
            signal_fscore(annotated, predicted)
            for annotated, predicted in zip(y_true, y_pred)
        ]
        return np.mean(per_signal)
# --------------------------------------
# 3) Prediction types
# --------------------------------------
class _Predictions(BasePrediction):
    """Prediction container in which every prediction is a list of steps."""

    def __init__(self, y_pred=None, y_true=None, n_samples=None):
        """Essentially the same as in a regression task, but the prediction is a list not a float.

        The content comes from `y_pred` if given, otherwise from `y_true`. With
        only `n_samples`, an array of that shape filled with NaN is created.

        Raises:
            ValueError -- if none of the three arguments is provided
        """
        given = y_pred if y_pred is not None else y_true
        if given is not None:
            self.y_pred = np.array(given, dtype=list)
        elif n_samples is None:
            raise ValueError(
                "Missing init argument: y_pred, y_true, or n_samples"
            )
        else:
            placeholder = np.empty(n_samples, dtype=list)
            placeholder.fill(np.nan)
            self.y_pred = placeholder
        self.check_y_pred_dimensions()

    @property
    def valid_indexes(self):
        """Return valid indices (e.g., a cross-validation slice).

        An entry is valid when it is not null; for a 2-D array only the first
        column is inspected.
        """
        n_dims = len(self.y_pred.shape)
        if n_dims == 1:
            return ~pd.isnull(self.y_pred)
        if n_dims == 2:
            return ~pd.isnull(self.y_pred[:, 0])
        raise ValueError("y_pred.shape > 2 is not implemented")

    def check_y_pred_dimensions(self):
        """No dimension check is performed."""
        pass

    @classmethod
    def combine(cls, predictions_list, index_list=None):
        """Dummy function. Here, combining consists in taking the first prediction.

        `index_list` is accepted but not used.
        """
        first = predictions_list[0]
        return cls(y_pred=first.y_pred)
def make_step_detection():
    """Return the prediction class (not an instance) used for step detection."""
    prediction_cls = _Predictions
    return prediction_cls
# --------------------------------------
# 4) Ramp problem definition
# --------------------------------------
# Human-readable title of the problem.
problem_title = "Step Detection with Inertial Measurement Units"
# Prediction class: predictions are lists of steps (see `_Predictions`).
Predictions = make_step_detection()
# rampwf `Estimator` workflow, built with its default arguments.
workflow = Estimator()
# Scores computed for this problem: a single F-score on detected steps.
score_types = [FScoreStepDetection(name="F-score (step detection)")]
def get_train_data(path="."):
    """Return the signals and annotated steps of the "train" data set under `path`."""
    return _read_data(path, train_or_test="train")
def get_test_data(path="."):
    """Return the signals and annotated steps of the "test" data set under `path`."""
    return _read_data(path, train_or_test="test")
def get_cv(X, y):
    """
    Yield the train/validation index splits used for cross-validation.

    Signals are grouped by their `trial_code` attribute, so the left and right
    signals of a single trial always end up on the same side of a split. Five
    random splits are drawn, each holding out 20% of the trials.
    """
    trial_codes = [walk.trial_code for walk in X]
    splitter = GroupShuffleSplit(
        n_splits=5, test_size=0.2, random_state=RANDOM_STATE
    )
    return splitter.split(X, y, groups=trial_codes)