Skip to content

Commit 148e3eb

Browse files
committed
[Intake] Use new config envars, fix config for project
1 parent e7f2084 commit 148e3eb

File tree

5 files changed

+73
-35
lines changed

5 files changed

+73
-35
lines changed

config.json

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
{
22
"aws": {
3-
"s3": {
4-
"inputBucket": "dig-analysis-data",
5-
"outputBucket": "dig-analysis-data",
6-
"bioindexBucket": "dig-bio-index"
3+
"input": {
4+
"bucket": "dig-analysis-data"
5+
},
6+
"output": {
7+
"bucket": "dig-analysis-data"
8+
},
9+
"bioindex": {
10+
"bucket": "dig-bio-index"
711
},
812
"emr": {
913
"subnetIds": [
@@ -15,8 +19,11 @@
1519
"jobFlowRoleId": "dig-aggregator-role",
1620
"securityGroupIds": []
1721
},
18-
"rds": {
22+
"runs": {
1923
"instance": "dig-analysis-state"
24+
},
25+
"portal": {
26+
"instance": "dig-bio-portal"
2027
}
2128
}
2229
}

intake/src/main/resources/processor_bootstrap.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ LDSC_ROOT=/mnt/var/intake
66
sudo mkdir -p "$LDSC_ROOT"
77
cd "$LDSC_ROOT"
88

9-
sudo aws s3 cp s3://dig-analysis-data/bin/qc/Homo_sapiens.GRCh37.75.dna.primary_assembly.fa ./
10-
sudo aws s3 cp s3://dig-analysis-data/bin/qc/var_to_af.zip ./
9+
sudo aws s3 cp s3://dig-analysis-bin/bin/intake/Homo_sapiens.GRCh37.75.dna.primary_assembly.fa ./
10+
sudo aws s3 cp s3://dig-analysis-bin/bin/intake/var_to_af.zip ./
1111
sudo unzip var_to_af.zip -d ./g1000
1212
sudo rm var_to_af.zip
1313

intake/src/main/resources/variantProcessor.py

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,25 @@
1111
import sqlalchemy
1212
import subprocess
1313

14-
s3_path = f's3://dig-analysis-data/variants_raw'
15-
s3_output = f's3://dig-analysis-data/variants_processed'
14+
input_path = os.environ['INPUT_PATH']
15+
output_path = os.environ['OUTPUT_PATH']
16+
17+
s3_path = f'{input_path}/variants_raw'
18+
s3_output = f'{output_path}/variants_processed'
1619
data_path = f'/mnt/var/intake'
1720

1821

19-
class BioIndexDB:
22+
class PortalDB:
2023
def __init__(self):
21-
self.secret_id = 'dig-bio-portal'
24+
self.secret_id = os.environ['PORTAL_SECRET']
25+
self.db_name = os.environ['PORTAL_DB']
2226
self.config = None
2327
self.engine = None
2428

2529
def get_config(self):
2630
if self.config is None:
2731
client = Session().client('secretsmanager')
28-
self.config = json.loads(client.get_secret_value(SecretId='dig-bio-portal')['SecretString'])
32+
self.config = json.loads(client.get_secret_value(SecretId=self.secret_id)['SecretString'])
2933
return self.config
3034

3135
def get_engine(self):
@@ -37,7 +41,7 @@ def get_engine(self):
3741
password=self.config['password'],
3842
host=self.config['host'],
3943
port=self.config['port'],
40-
db=self.config['dbname']
44+
db=self.db_name
4145
))
4246
return self.engine
4347

@@ -46,16 +50,23 @@ def get_is_dichotomous(self, phenotype_name):
4650
query = sqlalchemy.text(f'SELECT name, dichotomous FROM Phenotypes WHERE name = \"{phenotype_name}\"')
4751
rows = connection.execute(query).all()
4852
if len(rows) != 1:
49-
raise Exception(f"Impossible number of rows returned ({len(rows)}) for phenotype {phenotype_name}."
53+
raise Exception(f"Invalid number of rows returned ({len(rows)}) for phenotype {phenotype_name}. "
54+
f"Check the database and try again.")
55+
if rows[0][1] is None:
56+
raise Exception(f"Invalid dichotomous information ({rows[0][1]}) for phenotype {phenotype_name}. "
5057
f"Check the database and try again.")
5158
return rows[0][1] == 1
5259

53-
def get_dataset_data(self, dataset_name):
60+
def get_dataset_data(self, dataset):
5461
with self.get_engine().connect() as connection:
55-
query = sqlalchemy.text(f'SELECT name, ancestry FROM Datasets WHERE name = \"{dataset_name}\"')
56-
rows = connection.execute(query).all()
62+
rows = connection.execute(
63+
sqlalchemy.text(f'SELECT name, ancestry FROM Datasets WHERE name = \"{dataset}\"')
64+
).all()
5765
if len(rows) != 1:
58-
raise Exception(f"Impossible number of rows returned ({len(rows)}) for dataset {dataset_name}. "
66+
raise Exception(f"Invalid number of rows returned ({len(rows)}) for dataset {dataset}. "
67+
f"Check the database and try again.")
68+
if rows[0][0] is None or rows[0][1] is None:
69+
raise Exception(f"Invalid name / ancestry information ({rows[0][0]} / {rows[0][1]}) for dataset {dataset}. "
5970
f"Check the database and try again.")
6071
return {'name': rows[0][0], 'ancestry': rows[0][1]}
6172

@@ -600,7 +611,7 @@ def main():
600611
subprocess.check_call(['aws', 's3', 'cp', f'{s3_path}/{args.filepath}', filename])
601612
subprocess.check_call(['aws', 's3', 'cp', f'{s3_path}/{path_to_file}/metadata', 'metadata'])
602613

603-
db = BioIndexDB()
614+
db = PortalDB()
604615
utils = IntakeUtilities.from_metadata_file(data_path, db)
605616
data_intake = DataIntake(filename, utils)
606617

intake/src/main/resources/variantQC.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
#!/usr/bin/python3
22

33
import argparse
4+
import os
45
import platform
56
import subprocess
67

78
from pyspark.sql import SparkSession
89
from pyspark.sql.functions import lit
910
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, FloatType, DoubleType
1011

11-
s3dir = 's3://dig-analysis-data'
12+
s3_in = os.environ['INPUT_PATH']
13+
s3_out = os.environ['OUTPUT_PATH']
1214

1315
variants_schema = StructType([
1416
StructField('varId', StringType(), nullable=False),
@@ -109,9 +111,9 @@ def split(self, spark_df):
109111
tech, dataset, phenotype = args.method_dataset_phenotype.split('/')
110112

111113
# get the source and output directories (method_dataset is formatted as method/dataset here)
112-
srcdir = f'{s3dir}/variants_processed/{args.method_dataset_phenotype}'
113-
outdir = f'{s3dir}/variants_qc/{args.method_dataset_phenotype}/pass'
114-
qcdir = f'{s3dir}/variants_qc/{args.method_dataset_phenotype}/fail'
114+
srcdir = f'{s3_in}/variants_processed/{args.method_dataset_phenotype}'
115+
outdir = f'{s3_out}/variants_qc/{args.method_dataset_phenotype}/pass'
116+
qcdir = f'{s3_out}/variants_qc/{args.method_dataset_phenotype}/fail'
115117

116118
# create a spark session and dataframe from part files
117119
spark = SparkSession.builder.appName('qc').getOrCreate()

intake/src/main/resources/variantScaling.py

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
#!/usr/bin/python3
22
import argparse
33
from boto3.session import Session
4-
import json
54
import math
5+
import json
66
import os
77
from pyspark.sql import SparkSession
88
from pyspark.sql.functions import mean
@@ -14,20 +14,22 @@
1414
MAF_SCALING_THRESHOLD = 2
1515
FALLBACK_SCALING_THRESHOLD = 5
1616
TRAINING_DATA_MINIMUM_COUNT = 1000
17-
s3dir = 's3://dig-analysis-data'
17+
18+
s3_in = os.environ['INPUT_PATH']
19+
s3_out = os.environ['OUTPUT_PATH']
1820

1921

20-
class BioIndexDB:
22+
class PortalDB:
2123
def __init__(self):
22-
self.secret_id = 'dig-bio-portal'
23-
self.region = 'us-east-1'
24+
self.secret_id = os.environ['PORTAL_SECRET']
25+
self.db_name = os.environ['PORTAL_DB']
2426
self.config = None
2527
self.engine = None
2628

2729
def get_config(self):
2830
if self.config is None:
29-
client = Session().client('secretsmanager', region_name=self.region)
30-
self.config = json.loads(client.get_secret_value(SecretId='dig-bio-portal')['SecretString'])
31+
client = Session().client('secretsmanager')
32+
self.config = json.loads(client.get_secret_value(SecretId=self.secret_id)['SecretString'])
3133
return self.config
3234

3335
def get_engine(self):
@@ -39,7 +41,7 @@ def get_engine(self):
3941
password=self.config['password'],
4042
host=self.config['host'],
4143
port=self.config['port'],
42-
db=self.config['dbname']
44+
db=self.db_name
4345
))
4446
return self.engine
4547

@@ -48,10 +50,26 @@ def get_is_dichotomous(self, phenotype_name):
4850
query = sqlalchemy.text(f'SELECT name, dichotomous FROM Phenotypes WHERE name = \"{phenotype_name}\"')
4951
rows = connection.execute(query).all()
5052
if len(rows) != 1:
51-
raise Exception(f"Impossible number of rows returned ({len(rows)}) for phenotype {phenotype_name}."
53+
raise Exception(f"Invalid number of rows returned ({len(rows)}) for phenotype {phenotype_name}. "
54+
f"Check the database and try again.")
55+
if rows[0][1] is None:
56+
raise Exception(f"Invalid dichotomous information ({rows[0][1]}) for phenotype {phenotype_name}."
5257
f"Check the database and try again.")
5358
return rows[0][1] == 1
5459

60+
def get_dataset_data(self, dataset):
61+
with self.get_engine().connect() as connection:
62+
rows = connection.execute(
63+
sqlalchemy.text(f'SELECT name, ancestry FROM Datasets WHERE name = \"{dataset}\"')
64+
).all()
65+
if len(rows) != 1:
66+
raise Exception(f"Invalid number of rows returned ({len(rows)}) for dataset {dataset}. "
67+
f"Check the database and try again.")
68+
if rows[0][0] is None or rows[0][1] is None:
69+
raise Exception(f"Invalid name / ancestry information ({rows[0][0]} / {rows[0][1]}) for dataset {dataset}. "
70+
f"Check the database and try again.")
71+
return {'name': rows[0][0], 'ancestry': rows[0][1]}
72+
5573

5674
class ScalingLogger:
5775
def __init__(self, log_file):
@@ -155,12 +173,12 @@ def main():
155173
opts.add_argument('method_dataset_phenotype')
156174
args = opts.parse_args()
157175

158-
db = BioIndexDB()
176+
db = PortalDB()
159177
tech, dataset, phenotype = args.method_dataset_phenotype.split('/')
160178
is_dichotomous = db.get_is_dichotomous(phenotype)
161179

162-
srcdir = f'{s3dir}/variants_qc/{args.method_dataset_phenotype}/pass'
163-
outdir = f'{s3dir}/variants/{args.method_dataset_phenotype}'
180+
srcdir = f'{s3_in}/variants_qc/{args.method_dataset_phenotype}/pass'
181+
outdir = f'{s3_out}/variants/{args.method_dataset_phenotype}'
164182

165183
logger = ScalingLogger('scaling.log')
166184
logger.log(f'Reading from {srcdir}')

0 commit comments

Comments
 (0)