-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathparse_inmode_scan.py
executable file
·125 lines (104 loc) · 4.1 KB
/
parse_inmode_scan.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
'''
Copyright © 2018 Anton Tsukanov. Contacts: [email protected]
License: http://www.gnu.org/licenses/gpl.txt
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
'''
import math
import sys
import os
import argparse
import re
def read_fasta(path):
fasta = list()
with open(path, 'r') as file:
for line in file:
#print(line)
if line.startswith('>'):
line = line[1:].strip().split(':')
record = dict()
record['name'] = int(line[0].split('_')[1])
#record['name'] = line[0]
record['chr'] = line[2]
coordinates_strand = line[3]
start, end = re.findall(r'\d*-\d*', coordinates_strand)[0].split('-')
record['start'] = start
record['end'] = end
strand = re.findall(r'\(.\)', coordinates_strand[:-3])
if not strand == []:
record['strand'] = strand[0].strip('()')
else:
record['strand'] = '+'
else:
record['seq'] = line.strip().upper()
fasta.append(record)
file.close()
return(fasta)
def complement(seq):
return(seq.replace('A', 't').replace('T', 'a').replace('C', 'g').replace('G', 'c').replace('N', 'n').upper()[::-1])
def read_inmode_bed(path):
container = []
with open(path) as file:
for line in file:
n, start, end, strand, score = line.split()
container.append({
'chr': '.',
'start': int(start),
'end': int(end),
'id': int(n),
'score': math.log(float(score), 10),
'strand': strand,
'site': '.'
})
return(container)
def write_bed(data, threshold, path):
with open(path, 'w') as file:
for line in data:
if line['score'] >= threshold:
line = list(line.values())
line = [str(i) for i in line]
file.write('\t'.join(line) + '\n')
else:
continue
return(0)
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument('-if', '--inputFasta', action='store', dest='input_fasta',
required=True, help='path to FASTA file')
parser.add_argument('-bed', '--inputBED', action='store', dest='input_bed',
required=True, help='path to BED file with inmode scan output')
parser.add_argument('-o', '--outputBED', action='store', dest='output',
required=True, help='path to output BED with sites')
parser.add_argument('-t', '--threshold', action='store', dest='threshold',
required=True, type=float, help='Threshold for data filtering')
if len(sys.argv) == 1:
parser.print_help(sys.stderr)
sys.exit(1)
return(parser.parse_args())
def main():
args = parse_args()
input_fasta = args.input_fasta
input_bed = args.input_bed
output = args.output
threshold = args.threshold
fasta = read_fasta(input_fasta)
bed = read_inmode_bed(input_bed)
for index, line in enumerate(bed):
record = fasta[int(line['id'])]
if line['strand'] == '-':
line['site'] = complement(record['seq'][line['start']:line['end']])
else:
line['site'] = record['seq'][line['start']:line['end']]
line['chr'] = record['chr']
line['id'] = 'peaks_' + str(record['name'])
line['start'] = int(line['start']) + int(record['start'])
line['end'] = int(line['end']) + int(record['start'])
write_bed(bed, threshold, output)
if __name__ == '__main__':
main()