forked from ErasmusMC-Bioinformatics/shm_csr
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsplit_imgt_file.py
147 lines (132 loc) · 5.43 KB
/
split_imgt_file.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
#!/usr/bin/env python3
"""
Script to split IMGT file into several archives for each of the genes
Rather than creating each new archive individually this script will read
the input files only once and as such enormously shorten processing time.
"""
import argparse
import io
import os
import tarfile
import tempfile
from typing import Iterator, List, Tuple
def merged_txt_to_match_dict(merged: str):
with open(merged, "rt") as f:
header = next(f).strip("\n")
column_names = header.split("\t")
# For the baseline result there is no best_match column
if "best_match" in column_names:
best_match_index = column_names.index("best_match")
else:
best_match_index = None
sequence_id_index = column_names.index("Sequence.ID")
match_dict = {}
for line in f:
values = line.strip().split("\t")
sequence_id = values[sequence_id_index]
if best_match_index is not None:
best_match = values[best_match_index]
if "unmatched" in best_match:
# For some reason the table has values such as: unmatched, IGA2
continue
else:
best_match = ""
match_dict[sequence_id] = best_match
return match_dict
def imgt_to_tables(imgt_file: str) -> Iterator[Tuple[str, io.TextIOWrapper]]:
print(f"opening IMGT file: {imgt_file}")
with tarfile.open(imgt_file, "r") as archive:
while True:
member = archive.next()
if member is None:
return
if member.name in {"README.txt"}:
continue
if member.name.startswith("11_"):
continue
f = archive.extractfile(member)
f_text = io.TextIOWrapper(f)
yield member.name, f_text
f_text.close()
def split_imgt(imgt_file: str, merged_file: str, outdir: str, genes: List[str],
prefix: str):
"""
This function creates a separate tar file for each of the gene matches
based on the merged file. Unmatched genes are left out.
:param imgt_file: The original IMGT file
:param merged_file: The merged data file generated by SHM&CSR pipeline
:param outdir: The output directory.
:param genes: The genes to split out. Use '-' for all identified genes.
:return:
"""
match_dict = merged_txt_to_match_dict(merged_file)
gene_tarfiles = []
os.makedirs(outdir, exist_ok=True)
for gene in genes:
new_filename = f"{prefix}_{gene}.txz" if gene else f"{prefix}.txz"
gene_tarfiles.append(
tarfile.open(os.path.join(outdir, new_filename), mode="w:xz")
)
for name, table in imgt_to_tables(imgt_file):
# Read each table one by one and per line select in which output
# files it should go.
gene_files = []
for gene in genes:
fp, fname = tempfile.mkstemp()
# The file pointer fp will be wrapped in a python file object
# so we can ensure there remain no open files.
f = open(fp, mode="wt")
gene_files.append((gene, f, fname))
header = next(table)
header_number_of_tabs = header.count('\t')
column_names = header.strip("\n").split("\t")
fr1_columns = [index for index, column in enumerate(column_names)
if column.startswith("FR1")]
sequence_id_index = column_names.index("Sequence ID")
for _, gene_file, _ in gene_files:
gene_file.write(header)
for line in table:
# IMGT sometimes delivers half-empty rows.
row_number_of_tabs = line.count("\t")
missing_tabs = header_number_of_tabs - row_number_of_tabs
if missing_tabs:
line = line.strip("\n") + missing_tabs * "\t" + "\n"
values = line.strip("\n").split("\t")
sequence_id = values[sequence_id_index]
match = match_dict.get(sequence_id)
if match is None:
continue
if name.startswith("8_"):
# change the FR1 columns to 0 in the "8_..." file
for index in fr1_columns:
values[index] = "0"
line = "\t".join(values) + "\n"
for gene, gene_file, _ in gene_files:
if gene in match:
gene_file.write(line)
for gene_tarfile, (_, gene_file, fname) in zip(gene_tarfiles, gene_files):
gene_file.flush()
gene_tarfile.add(fname, name)
gene_file.close()
os.remove(fname)
for gene_tarfile in gene_tarfiles:
gene_tarfile.close()
def argument_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser()
parser.add_argument("imgt_file", help="The original IMGT FILE")
parser.add_argument("merged", help="merged.txt file")
parser.add_argument("--outdir", help="output directory")
parser.add_argument(
"genes",
nargs="+",
help="The genes to split out. Use '-' for all identified genes.")
parser.add_argument("--prefix", help="Prefix for the archives and "
"directories")
return parser
def main():
args = argument_parser().parse_args()
genes = ["" if gene == "-" else gene for gene in args.genes]
split_imgt(args.imgt_file, args.merged, args.outdir, genes,
args.prefix)
if __name__ == "__main__":
main()