forked from SwinTransformer/Video-Swin-Transformer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgenerate_file_list.py
151 lines (124 loc) · 4.72 KB
/
generate_file_list.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import argparse
import fnmatch
import glob
import os
import os.path as osp
import mmcv
annotation_root = '../../data/hvu/annotations'
tag_file = 'hvu_tags.json'
args = None
def parse_directory(path,
rgb_prefix='img_',
flow_x_prefix='flow_x_',
flow_y_prefix='flow_y_',
level=1):
"""Parse directories holding extracted frames from standard benchmarks.
Args:
path (str): Directory path to parse frames.
rgb_prefix (str): Prefix of generated rgb frames name.
default: 'img_'.
flow_x_prefix (str): Prefix of generated flow x name.
default: `flow_x_`.
flow_y_prefix (str): Prefix of generated flow y name.
default: `flow_y_`.
level (int): Directory level for glob searching. Options are 1 and 2.
default: 1.
Returns:
dict: frame info dict with video id as key and tuple(path(str),
rgb_num(int), flow_x_num(int)) as value.
"""
print(f'parse frames under directory {path}')
if level == 1:
# Only search for one-level directory
def locate_directory(x):
return osp.basename(x)
frame_dirs = glob.glob(osp.join(path, '*'))
elif level == 2:
# search for two-level directory
def locate_directory(x):
return osp.join(osp.basename(osp.dirname(x)), osp.basename(x))
frame_dirs = glob.glob(osp.join(path, '*', '*'))
else:
raise ValueError('level can be only 1 or 2')
def count_files(directory, prefix_list):
"""Count file number with a given directory and prefix.
Args:
directory (str): Data directory to be search.
prefix_list (list): List or prefix.
Returns:
list (int): Number list of the file with the prefix.
"""
lst = os.listdir(directory)
cnt_list = [len(fnmatch.filter(lst, x + '*')) for x in prefix_list]
return cnt_list
# check RGB
frame_dict = {}
for i, frame_dir in enumerate(frame_dirs):
total_num = count_files(frame_dir,
(rgb_prefix, flow_x_prefix, flow_y_prefix))
dir_name = locate_directory(frame_dir)
num_x = total_num[1]
num_y = total_num[2]
if num_x != num_y:
raise ValueError(f'x and y direction have different number '
f'of flow images in video directory: {frame_dir}')
if i % 200 == 0:
print(f'{i} videos parsed')
frame_dict[dir_name] = (frame_dir, total_num[0], num_x)
print('frame directory analysis done')
return frame_dict
def parse_args():
parser = argparse.ArgumentParser(description='build file list for HVU')
parser.add_argument('--input_csv', type=str, help='path of input csv file')
parser.add_argument(
'--src_dir', type=str, help='source video / frames directory')
parser.add_argument(
'--output',
type=str,
help='output filename, should \
ends with .json')
parser.add_argument(
'--mode',
type=str,
choices=['frames', 'videos'],
help='generate file list for frames or videos')
args = parser.parse_args()
return args
if __name__ == '__main__':
args = parse_args()
tag_cates = mmcv.load(tag_file)
tag2category = {}
for k in tag_cates:
for tag in tag_cates[k]:
tag2category[tag] = k
data_list = open(args.input_csv).readlines()
data_list = [x.strip().split(',') for x in data_list[1:]]
if args.mode == 'videos':
downloaded = os.listdir(args.src_dir)
downloaded = [x.split('.')[0] for x in downloaded]
downloaded_set = set(downloaded)
else:
parse_result = parse_directory(args.src_dir)
downloaded_set = set(parse_result)
def parse_line(line):
tags, youtube_id, start, end = line
start, end = int(float(start) * 10), int(float(end) * 10)
newname = f'{youtube_id}_{start:06d}_{end:06d}'
tags = tags.split('|')
all_tags = {}
for tag in tags:
category = tag2category[tag]
all_tags.setdefault(category,
[]).append(tag_cates[category].index(tag))
return newname, all_tags
data_list = [parse_line(line) for line in data_list]
data_list = [line for line in data_list if line[0] in downloaded_set]
if args.mode == 'frames':
result = [
dict(
frame_dir=k[0], total_frames=parse_result[k[0]][1], label=k[1])
for k in data_list
]
elif args.mode == 'videos':
result = [dict(filename=k[0] + '.mp4', label=k[1]) for k in data_list]
mmcv.dump(result, args.output)