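"""Convert the IDP data (https://github.com/papyri/idp.data) into a single CSV sheet.

The pipeline runs in three steps, each of which can also be run on its own via --step:

1. convert: parse the EpiDoc XML files of every source and write one JSON-lines
   file per source (out/source-<source>.json) with the TM number(s) and file
   path of each document.
2. group: group the extracted files by TM number into out/tms.json.
3. merge: convert the grouped entries into CSV rows (see merge.convert) and
   write the resulting sheet into the out directory.

Example invocation (the path is a placeholder for a local clone of idp.data):

    python main.py --path /path/to/idp.data
    python main.py --path /path/to/idp.data --step group
"""
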
import argparse
import concurrent.futures
import csv
import glob
import json
import logging
import os.path
from typing import Optional

import epidoc

from merge import csv_fieldnames, convert

idp_data_repo = ""
output_dir = "out"


def convert_source(source: str):
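    """Extract the TM number(s) and relative file path from every EpiDoc XML
    file of ``source`` and write them as JSON lines to out/source-<source>.json.
    """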
    try:
        with open(os.path.join(output_dir, f"source-{source.lower()}.json"), "w") as result_file:
            for doc in glob.glob(os.path.join(idp_data_repo, source, "**", "*.xml"), recursive=True):
                with open(doc) as f:
                    ed = epidoc.load(f)
                result_file.write(
                    json.dumps(
                        {
                            "tms": ed.idno.get("tm"),
                            "file": doc.replace(f"{idp_data_repo}{os.sep}", ""),
                        }
                    )
                    + "\n"
                )
    except Exception as e:
        return e  # returned and not raised due to concurrent execution


def group_by_tm():
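    """Group the extracted files by TM number and write the result to
    out/tms.json (one JSON object per line). Files without a TM number are
    counted and reported.
    """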
    result = {}
    count_total = 0
    count_missing = 0
    for doc in glob.glob(os.path.join(output_dir, "source-*.json")):
        with open(doc) as source_file:
            for line in source_file.readlines():
                count_total += 1
                ed: dict[str, Optional[str]] = json.loads(line)
                file: str = ed["file"]
                tms: Optional[str] = ed.get("tms")
                if tms is None:
                    count_missing += 1
                    continue
                for tm in tms.split(" "):
                    if tm in result:
                        result[tm].append(file)
                    else:
                        result[tm] = [file]
    with open(os.path.join(output_dir, "tms.json"), "w") as tms_f:
        for tm, files in result.items():
            tms_f.write(json.dumps({"tm": tm, "files": files}) + "\n")
    logging.warning(f"{count_missing:,} of {count_total:,} files had no TM number.")


def merge_process_fn(args: tuple[str, int, int]):
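    """Worker: read the grouped entries between offsets ``start`` and ``stop``
    of the file written by group_by_tm() and convert them to CSV rows via
    merge.convert().
    """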
    filename, start, stop = args
    entries = []
    with open(filename) as fh:
        fh.seek(start)
        # The size hint makes readlines() stop once roughly ``stop - start``
        # characters have been read; merge() aligns chunk boundaries to line ends.
        for line in fh.readlines(stop - start):
            doc = json.loads(line)
            tm = doc.get("tm")
            files = doc.get("files")
            entries.append(convert(tm, files, idp_data_repo=idp_data_repo))
    return entries


def merge():
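    """Split out/tms.json into roughly 1 MiB, line-aligned chunks, convert the
    chunks in parallel and write all rows to a single CSV sheet.
    """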
    grouped_result_file = os.path.join(output_dir, "tms.json")
    file_size = os.path.getsize(grouped_result_file)
    split_size = 1024 * 1024
    fn_args = []
    with open(grouped_result_file) as tms_file:
        cursor = 0
        while cursor < file_size:  # also covers files smaller than split_size and the trailing remainder
            # determine the end of the chunk
            end = min(cursor + split_size, file_size)
            # seek to the end of the chunk and extend it to a whole line
            tms_file.seek(end)
            tms_file.readline()
            end = tms_file.tell()  # current location
            fn_args.append((grouped_result_file, cursor, end))
            cursor = end  # start of the next chunk
    entries = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
        for fn_result in executor.map(merge_process_fn, fn_args):
            if isinstance(fn_result, Exception):
                raise fn_result
            entries.extend(fn_result)
    with open(os.path.join(output_dir, "idp-data-sheet.csv"), "w") as res_f:
        writer = csv.DictWriter(res_f, fieldnames=csv_fieldnames, quoting=csv.QUOTE_ALL)
        writer.writeheader()
        for line in entries:
            writer.writerow(line)


def main(data_path: str, sources: list[str], step: Optional[str]):
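    """Run the requested pipeline step ("convert", "group" or "merge"), or all
    three steps in order if ``step`` is None.
    """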
    # The concurrent worker functions below read the repository path from a module-level global.
    global idp_data_repo
    idp_data_repo = data_path
    os.makedirs(output_dir, exist_ok=True)  # all steps write into this directory
    # Step 1: Convert each source into one JSON-lines file with the required information.
    if step is None or step == "convert":
        with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
            for result in executor.map(convert_source, sources):
                if isinstance(result, Exception):
                    raise result
    # Step 2: Group by TM.
    if step is None or step == "group":
        group_by_tm()
    # Step 3: Create the CSV sheet.
    if step is None or step == "merge":
        merge()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Convert IDP data to a single CSV file")
    parser.add_argument("--path", help="Path to the cloned repository https://github.com/papyri/idp.data", required=True)
    parser.add_argument("--step", help="Execute only a single step", choices=["convert", "group", "merge"])
    args = parser.parse_args()
    path: str = args.path
    if not os.path.isdir(path):
        raise NotADirectoryError(f"{path} is not a directory")
    main(path, sources=["APD", "APIS", "DCLP", "DDB_EpiDoc_XML", "HGV_meta_EpiDoc", "HGV_trans_EpiDoc"], step=args.step)