-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgenerate_variant_quality_tsv.py
145 lines (116 loc) · 5.87 KB
/
generate_variant_quality_tsv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
from datetime import date
from os.path import exists
from sys import path
from pprint import pprint
import argparse
import os
import sys
import read_resource_jsons as read_resources
def main():
# Parse the command line
args = parseCommandLine()
# Define the executable bcftools command
if args.tools_directory:
if not args.tools_directory.endswith('/'): args.tools_directory = str(args.tools_directory) + '/'
bcftools = args.tools_directory + 'bcftools/bcftools'
else:
bcftools = 'bcftools'
# Open output tsv files to write annotations to
vq_output_file = open('variant_quality.tsv', 'w')
print('CHROM\tSTART\tEND\tREF\tALT\t', args.uid, file = vq_output_file)
# Store the samples that must have the minimum GQ value
if args.parents:
parents = args.parents.split(',') if ',' in args.parents else [args.parents]
trio_output_file = open('trio_quality.tsv', 'w')
if not args.trio_uid:
fail('parents have been provided and so --trio_uid (-d) must also be provided')
print('CHROM\tSTART\tEND\tREF\tALT\t', args.trio_uid, file = trio_output_file)
if args.family:
family = args.family.split(',') if ',' in args.family else [args.parents]
family_output_file = open('family_quality.tsv', 'w')
if not args.family_uid:
fail('additional family members have been provided and so the --family_uid (-m) must also be provided')
print('CHROM\tSTART\tEND\tREF\tALT\t', args.family_uid, file = family_output_file)
# Loop over all records in the vcf file
command = bcftools + ' query -f \'%CHROM\\t%POS\\t%END\\t%REF\\t%ALT\\t%INFO/het_low_qual\\t%INFO/het_hi_qual\\t%INFO/hom_low_qual\\t%INFO/hom_hi_qual\\t%INFO/fam_geno' + '\\n\' ' + str(args.input_vcf)
for record in os.popen(command).readlines():
# Split the record on tabs
fields = record.rstrip().split('\t')
fields[0], fields[2] = updateCoords(fields[0], fields[2])
# Since all samples in the vcf are evaluated for quailty, there could be sample ids associated with multiple of
# the variant quality annotations
variant_quality = 'Fail'
trio_quality = False
family_quality = False
if args.proband in fields[5] or args.proband in fields[7]:
variant_quality = 'Fail'
elif args.proband in fields[6] or args.proband in fields[8]:
variant_quality = 'Pass'
# If additional samples were provided, check if they are present in the fam_geno annotation. If the proband quality is
# Fail, there is no need for this check
if args.parents:
trio_quality = True
for sample in parents:
if sample not in fields[9]:
trio_quality = False
break
if args.family:
family_quality = True
for sample in family:
if sample not in fields[9]:
family_quality = False
break
# If the proband quality is fail, but there are other samples that are high quality, mark this as Fail+
if variant_quality == 'Fail':
if fields[6] != '.' or fields[8] != '.':
variant_quality = 'Fail+'
print('\t'.join(fields[0:5]), '\t', variant_quality, sep = '', file = vq_output_file)
if trio_quality:
print('\t'.join(fields[0:5]), '\tPass', sep = '', file = trio_output_file)
if family_quality:
print('\t'.join(fields[0:5]), '\tPass', sep = '', file = family_output_file)
# Close the output tsv file
vq_output_file.close()
if args.parents:
trio_output_file.close()
if args.family:
family_output_file.close()
# Input options
def parseCommandLine():
global version
parser = argparse.ArgumentParser(description='Process the command line')
# Required arguments
parser.add_argument('--input_vcf', '-i', required = True, metavar = 'string', help = 'The input vcf file to annotate')
parser.add_argument('--tools_directory', '-t', required = True, metavar = 'string', help = 'The path to the directory where the tools live')
parser.add_argument('--proband', '-p', required = True, metavar = 'string', help = 'The name of the proband as it appears in the vcf header')
# The uids of the quality annotations. The trio and family quality uids should only be included if --parents and --family are used
parser.add_argument('--uid', '-u', required = True, metavar = 'string', help = 'The uid of the variant quality attribute')
parser.add_argument('--trio_uid', '-d', required = False, metavar = 'string', help = 'The uid of the trio variant quality attribute')
parser.add_argument('--family_uid', '-m', required = False, metavar = 'string', help = 'The uid of the family variant quality attribute')
# Optionally list the sample names of the mother and father. If these are provided, generate the additional family quality annotation to
# determine whether the inheritance can be believed
parser.add_argument('--parents', '-a', required = False, metavar = 'string', help = 'A comma separated list of the sample names of the mother and father')
parser.add_argument('--family', '-f', required = False, metavar = 'string', help = 'A comma separated list of the sample names of additional family members that should be considered in the family quality filter')
return parser.parse_args()
# Update the chromosome and position in the tsv file
def updateCoords(chrom, pos):
# Check that the chromosome does not include "chr" prefix
if chrom.startswith('chr'): chrom = chrom.strip('chr')
# Add one to the end position
pos = str(int(pos) + 1)
# Return the updated values
return chrom, pos
# Add the value to the fields list ensuring that it is a valid value
def updateFields(fields, value):
# Ensuret he value is under the 255 character limit
if len(value) > 254: value = 'HGVS code too long'
# Append the value and return
fields.append(value)
return fields
# If the script fails, provide an error message and exit
def fail(message):
print('ERROR: ', message, sep = '')
exit(1)
# Initialise global variables
if __name__ == "__main__":
main()