Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion basset/built.sbt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
val Versions = new {
val Aggregator = "0.3.1-SNAPSHOT"
val Aggregator = "0.3.3-SNAPSHOT"
val Scala = "2.13.5"
}

Expand Down
2 changes: 1 addition & 1 deletion bioindex/built.sbt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
val Versions = new {
val Aggregator = "0.3.2-SNAPSHOT"
val Aggregator = "0.3.3-SNAPSHOT"
val Scala = "2.13.5"
}

Expand Down
2 changes: 1 addition & 1 deletion bottom-line/built.sbt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
val Versions = new {
val Aggregator = "0.3.2-SNAPSHOT"
val Aggregator = "0.3.3-SNAPSHOT"
val Scala = "2.13.5"
}

Expand Down
2 changes: 1 addition & 1 deletion burden-binning/built.sbt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
val Versions = new {
val Aggregator = "0.3.2-SNAPSHOT"
val Aggregator = "0.3.3-SNAPSHOT"
val Scala = "2.13.5"
}

Expand Down
22 changes: 18 additions & 4 deletions config.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
{
"aws": {
"s3": {
"bucket": "dig-analysis-data"
"input": {
"bucket": "dig-analysis-private",
"subdir": "satoshi_20240202"
},
"output": {
"bucket": "dig-analysis-private",
"subdir": "satoshi_20240202"
},
"bioindex": {
"bucket": "dig-analysis-private",
"subdir": "satoshi_20240202/bioindex"
},
"emr": {
"subnetIds": [
Expand All @@ -13,8 +22,13 @@
"jobFlowRoleId": "dig-aggregator-role",
"securityGroupIds": []
},
"rds": {
"instance": "dig-analysis-state"
"runs": {
"instance": "dig-analysis-state",
"dbOverride": "aggregator_satoshi_20240202"
},
"portal": {
"instance": "dig-bio-portal",
"dbOverride": "portal_satoshi_20240202"
}
}
}
2 changes: 1 addition & 1 deletion gene-associations/built.sbt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
val Versions = new {
val Aggregator = "0.3.2-SNAPSHOT"
val Aggregator = "0.3.3-SNAPSHOT"
val Scala = "2.13.2"
}

Expand Down
2 changes: 1 addition & 1 deletion huge/built.sbt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
val Versions = new {
val Aggregator = "0.3.2-SNAPSHOT"
val Aggregator = "0.3.3-SNAPSHOT"
val Scala = "2.13.10"
}

Expand Down
2 changes: 1 addition & 1 deletion intake/built.sbt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
val Versions = new {
val Aggregator = "0.3.2-SNAPSHOT"
val Aggregator = "0.3.3-SNAPSHOT"
val Scala = "2.13.2"
}

Expand Down
4 changes: 2 additions & 2 deletions intake/src/main/resources/processor_bootstrap.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ LDSC_ROOT=/mnt/var/intake
sudo mkdir -p "$LDSC_ROOT"
cd "$LDSC_ROOT"

sudo aws s3 cp s3://dig-analysis-data/bin/qc/Homo_sapiens.GRCh37.75.dna.primary_assembly.fa ./
sudo aws s3 cp s3://dig-analysis-data/bin/qc/var_to_af.zip ./
sudo aws s3 cp s3://dig-analysis-bin/bin/intake/Homo_sapiens.GRCh37.75.dna.primary_assembly.fa ./
sudo aws s3 cp s3://dig-analysis-bin/bin/intake/var_to_af.zip ./
sudo unzip var_to_af.zip -d ./g1000
sudo rm var_to_af.zip

Expand Down
35 changes: 23 additions & 12 deletions intake/src/main/resources/variantProcessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,25 @@
import sqlalchemy
import subprocess

s3_path = f's3://dig-analysis-data/variants_raw'
s3_output = f's3://dig-analysis-data/variants_processed'
input_path = os.environ['INPUT_PATH']
output_path = os.environ['OUTPUT_PATH']

s3_path = f'{input_path}/variants_raw'
s3_output = f'{output_path}/variants_processed'
data_path = f'/mnt/var/intake'


class BioIndexDB:
class PortalDB:
def __init__(self):
self.secret_id = 'dig-bio-portal'
self.secret_id = os.environ['PORTAL_SECRET']
self.db_name = os.environ['PORTAL_DB']
self.config = None
self.engine = None

def get_config(self):
if self.config is None:
client = Session().client('secretsmanager')
self.config = json.loads(client.get_secret_value(SecretId='dig-bio-portal')['SecretString'])
self.config = json.loads(client.get_secret_value(SecretId=self.secret_id)['SecretString'])
return self.config

def get_engine(self):
Expand All @@ -37,7 +41,7 @@ def get_engine(self):
password=self.config['password'],
host=self.config['host'],
port=self.config['port'],
db=self.config['dbname']
db=self.db_name
))
return self.engine

Expand All @@ -46,16 +50,23 @@ def get_is_dichotomous(self, phenotype_name):
query = sqlalchemy.text(f'SELECT name, dichotomous FROM Phenotypes WHERE name = \"{phenotype_name}\"')
rows = connection.execute(query).all()
if len(rows) != 1:
raise Exception(f"Impossible number of rows returned ({len(rows)}) for phenotype {phenotype_name}."
raise Exception(f"Invalid number of rows returned ({len(rows)}) for phenotype {phenotype_name}. "
f"Check the database and try again.")
if rows[0][1] is None:
raise Exception(f"Invalid dichotomous information ({rows[0][1]}) for phenotype {phenotype_name}. "
f"Check the database and try again.")
return rows[0][1] == 1

def get_dataset_data(self, dataset_name):
def get_dataset_data(self, dataset):
with self.get_engine().connect() as connection:
query = sqlalchemy.text(f'SELECT name, ancestry FROM Datasets WHERE name = \"{dataset_name}\"')
rows = connection.execute(query).all()
rows = connection.execute(
sqlalchemy.text(f'SELECT name, ancestry FROM Datasets WHERE name = \"{dataset}\"')
).all()
if len(rows) != 1:
raise Exception(f"Impossible number of rows returned ({len(rows)}) for dataset {dataset_name}. "
raise Exception(f"Invalid number of rows returned ({len(rows)}) for dataset {dataset}. "
f"Check the database and try again.")
if rows[0][0] is None or rows[0][1] is None:
raise Exception(f"Invalid name / ancestry information ({rows[0][0]} / {rows[0][1]}) for dataset {dataset}. "
f"Check the database and try again.")
return {'name': rows[0][0], 'ancestry': rows[0][1]}

Expand Down Expand Up @@ -600,7 +611,7 @@ def main():
subprocess.check_call(['aws', 's3', 'cp', f'{s3_path}/{args.filepath}', filename])
subprocess.check_call(['aws', 's3', 'cp', f'{s3_path}/{path_to_file}/metadata', 'metadata'])

db = BioIndexDB()
db = PortalDB()
utils = IntakeUtilities.from_metadata_file(data_path, db)
data_intake = DataIntake(filename, utils)

Expand Down
13 changes: 9 additions & 4 deletions intake/src/main/resources/variantQC.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
#!/usr/bin/python3

import argparse
import os
import platform
import subprocess

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, FloatType, DoubleType

s3dir = 's3://dig-analysis-data'
input_path = os.environ['INPUT_PATH']
output_path = os.environ['OUTPUT_PATH']

s3_in = input_path
s3_out = output_path

variants_schema = StructType([
StructField('varId', StringType(), nullable=False),
Expand Down Expand Up @@ -109,9 +114,9 @@ def split(self, spark_df):
tech, dataset, phenotype = args.method_dataset_phenotype.split('/')

# get the source and output directories (method_dataset is formatted as method/dataset here)
srcdir = f'{s3dir}/variants_processed/{args.method_dataset_phenotype}'
outdir = f'{s3dir}/variants_qc/{args.method_dataset_phenotype}/pass'
qcdir = f'{s3dir}/variants_qc/{args.method_dataset_phenotype}/fail'
srcdir = f'{s3_in}/variants_processed/{args.method_dataset_phenotype}'
outdir = f'{s3_out}/variants_qc/{args.method_dataset_phenotype}/pass'
qcdir = f'{s3_out}/variants_qc/{args.method_dataset_phenotype}/fail'

# create a spark session and dataframe from part files
spark = SparkSession.builder.appName('qc').getOrCreate()
Expand Down
45 changes: 33 additions & 12 deletions intake/src/main/resources/variantScaling.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#!/usr/bin/python3
import argparse
from boto3.session import Session
import json
import math
import json
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import mean
Expand All @@ -14,20 +14,25 @@
MAF_SCALING_THRESHOLD = 2
FALLBACK_SCALING_THRESHOLD = 5
TRAINING_DATA_MINIMUM_COUNT = 1000
s3dir = 's3://dig-analysis-data'

input_path = os.environ['INPUT_PATH']
output_path = os.environ['OUTPUT_PATH']

s3_in = input_path
s3_out = output_path


class BioIndexDB:
class PortalDB:
def __init__(self):
self.secret_id = 'dig-bio-portal'
self.region = 'us-east-1'
self.secret_id = os.environ['PORTAL_SECRET']
self.db_name = os.environ['PORTAL_DB']
self.config = None
self.engine = None

def get_config(self):
if self.config is None:
client = Session().client('secretsmanager', region_name=self.region)
self.config = json.loads(client.get_secret_value(SecretId='dig-bio-portal')['SecretString'])
client = Session().client('secretsmanager')
self.config = json.loads(client.get_secret_value(SecretId=self.secret_id)['SecretString'])
return self.config

def get_engine(self):
Expand All @@ -39,7 +44,7 @@ def get_engine(self):
password=self.config['password'],
host=self.config['host'],
port=self.config['port'],
db=self.config['dbname']
db=self.db_name
))
return self.engine

Expand All @@ -48,10 +53,26 @@ def get_is_dichotomous(self, phenotype_name):
query = sqlalchemy.text(f'SELECT name, dichotomous FROM Phenotypes WHERE name = \"{phenotype_name}\"')
rows = connection.execute(query).all()
if len(rows) != 1:
raise Exception(f"Impossible number of rows returned ({len(rows)}) for phenotype {phenotype_name}."
raise Exception(f"Invalid number of rows returned ({len(rows)}) for phenotype {phenotype_name}. "
f"Check the database and try again.")
if rows[0][1] is None:
raise Exception(f"Invalid dichotomous information ({rows[0][1]}) for phenotype {phenotype_name}. "
f"Check the database and try again.")
return rows[0][1] == 1

def get_dataset_data(self, dataset):
with self.get_engine().connect() as connection:
rows = connection.execute(
sqlalchemy.text(f'SELECT name, ancestry FROM Datasets WHERE name = \"{dataset}\"')
).all()
if len(rows) != 1:
raise Exception(f"Invalid number of rows returned ({len(rows)}) for dataset {dataset}. "
f"Check the database and try again.")
if rows[0][0] is None or rows[0][1] is None:
raise Exception(f"Invalid name / ancestry information ({rows[0][0]} / {rows[0][1]}) for dataset {dataset}. "
f"Check the database and try again.")
return {'name': rows[0][0], 'ancestry': rows[0][1]}


class ScalingLogger:
def __init__(self, log_file):
Expand Down Expand Up @@ -155,12 +176,12 @@ def main():
opts.add_argument('method_dataset_phenotype')
args = opts.parse_args()

db = BioIndexDB()
db = PortalDB()
tech, dataset, phenotype = args.method_dataset_phenotype.split('/')
is_dichotomous = db.get_is_dichotomous(phenotype)

srcdir = f'{s3dir}/variants_qc/{args.method_dataset_phenotype}/pass'
outdir = f'{s3dir}/variants/{args.method_dataset_phenotype}'
srcdir = f'{s3_in}/variants_qc/{args.method_dataset_phenotype}/pass'
outdir = f'{s3_out}/variants/{args.method_dataset_phenotype}'

logger = ScalingLogger('scaling.log')
logger.log(f'Reading from {srcdir}')
Expand Down
2 changes: 1 addition & 1 deletion ldsc/built.sbt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
val Versions = new {
val Aggregator = "0.3.2-SNAPSHOT"
val Aggregator = "0.3.3-SNAPSHOT"
val Scala = "2.13.5"
}

Expand Down
2 changes: 1 addition & 1 deletion magma/built.sbt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
val Versions = new {
val Aggregator = "0.3.2-SNAPSHOT"
val Aggregator = "0.3.3-SNAPSHOT"
val Scala = "2.13.5"
}

Expand Down
2 changes: 1 addition & 1 deletion vep/built.sbt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
val Versions = new {
val Aggregator = "0.3.2-SNAPSHOT"
val Aggregator = "0.3.3-SNAPSHOT"
val Scala = "2.13.5"
}

Expand Down