-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathadmix_calculator.py
151 lines (122 loc) · 5.23 KB
/
admix_calculator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# -*- coding: utf-8 -*-
import numpy as np
import scipy.optimize as optimize
from admix_model import *
# 祖源计算器类
class AdmixCalculator:
def __init__(self):
self.admixModel = AdmixModel()
# 计算用户每个alleles等位基因相对于祖源模型的ref和alt基因型的突变总次数
def __get_geno_stat(
self, user_genome: dict, rsid: list, ref_allele: list, alt_allele: list
) -> tuple:
ref_allele_count = []
alt_allele_count = []
# allele点位匹配计数
match_count = 0
for i in range(len(rsid)):
ref_allele_count.append(0)
alt_allele_count.append(0)
if rsid[i] in user_genome:
match_count += 1
user_geno = user_genome.get(rsid[i])
if len(user_geno) == 2:
allele1 = user_geno[0]
allele2 = user_geno[1]
elif len(user_geno) == 1:
allele1 = user_geno[0]
allele2 = user_geno[0]
else:
allele1 = "-"
allele2 = "-"
if allele1 == ref_allele[i]:
ref_allele_count[i] += 1
if allele2 == ref_allele[i]:
ref_allele_count[i] += 1
if allele1 == alt_allele[i]:
alt_allele_count[i] += 1
if allele2 == alt_allele[i]:
alt_allele_count[i] += 1
# 匹配的allele点位在祖源模型中的占比
model_match_ratio = match_count / len(rsid) if len(rsid) > 0 else 0
# 匹配的allele点位在用户数据中的占比
user_match_ratio = match_count / len(user_genome) if len(user_genome) > 0 else 0
return (
np.array(ref_allele_count),
np.array(alt_allele_count),
model_match_ratio,
user_match_ratio,
)
# 损失函数
def __loss_func(
self,
ref_allele_count: list,
alt_allele_count: list,
frequency: list,
admix_ratio: list,
) -> float:
# 计算用户所有等位基因产生ref和alt突变频率的加权平均值
ref_frq_mean = np.dot(frequency, admix_ratio)
alt_frq_mean = np.dot(1 - frequency, admix_ratio)
# 计算用户所有等位基因发生ref和alt突变的联合概率,取对数,连乘变连和
ref_pr = np.dot(
ref_allele_count, np.log(np.where(ref_frq_mean > 0, ref_frq_mean, 1))
)
alt_pr = np.dot(
alt_allele_count, np.log(np.where(alt_frq_mean > 0, alt_frq_mean, 1))
)
# 返回负值作为损失值
return -(ref_pr + alt_pr)
# 根据用户基因数据,祖源模型,使用极大似然估计计算用户的祖源成分
def calc_admix(
self, user_genome: dict, admix_model_name: str, opt_tol: float = 1e-4
) -> dict:
# 获取指定的祖源模型信息
admix_info = self.admixModel.get_model_info(admix_model_name)
admix_count = len(admix_info["admix"])
if admix_count < 1:
raise Exception("祖源模型“{}”没有人群成分".format(admix_model_name))
# 获取指定祖源模型的rsid, alleles数据源
rsid, ref_allele, alt_allele, frequency = self.admixModel.get_model_data(
admix_model_name
)
# 祖源模型的SNP数量
snp_count = len(rsid)
if admix_count != len(frequency[0]):
raise Exception(
"祖源模型“{}”的信息文件和数据文件的人群成分数量不一致".format(
admix_model_name
)
)
# 计算用户基因相对于祖源模型的突变情况
ref_allele_count, alt_allele_count, user_match_ratio, model_match_ratio = (
self.__get_geno_stat(user_genome, rsid, ref_allele, alt_allele)
)
# 损失函数的参数初始值
initial_guess = np.ones(admix_count) / admix_count
# 损失函数的参数约束条件
bounds = tuple((0, 1) for i in range(admix_count))
constraints = {"type": "eq", "fun": lambda ratio: np.sum(ratio) - 1}
# 计算用户祖源成分的最优值
res = optimize.minimize(
fun=lambda ratio: self.__loss_func(
ref_allele_count, alt_allele_count, frequency, ratio
),
x0=initial_guess,
bounds=bounds,
constraints=constraints,
tol=opt_tol,
)
if res.success:
admix_ratio = res.x
else:
admix_ratio = np.zeros(admix_count)
# 用户祖源成分值写入祖源模型
for i, admix in enumerate(admix_info["admix"]):
admix["ratio"] = admix_ratio[i]
admix_info["snp_count"] = snp_count
admix_info["user_match_ratio"] = user_match_ratio
admix_info["model_match_ratio"] = model_match_ratio
# 对祖源成分比例降序排序
admix_info["admix"].sort(key=lambda admix: admix["ratio"], reverse=True)
return admix_info