diff --git a/claussnitzerlab/.editorconfig b/claussnitzerlab/.editorconfig new file mode 100644 index 00000000..587935a6 --- /dev/null +++ b/claussnitzerlab/.editorconfig @@ -0,0 +1,14 @@ +root = true + +[*] +insert_final_newline = true + +[*.java] +indent_style = space +indent_size = 4 +trim_trailing_whitespace = true + +[*.{scala,sbt}] +indent_style = space +indent_size = 2 +trim_trailing_whitespace = true diff --git a/claussnitzerlab/.scalafmt.conf b/claussnitzerlab/.scalafmt.conf new file mode 100644 index 00000000..ca683bf8 --- /dev/null +++ b/claussnitzerlab/.scalafmt.conf @@ -0,0 +1,4 @@ +version = "2.4.2" +align=more +docstrings=ScalaDoc +maxColumn=120 diff --git a/claussnitzerlab/LICENSE.txt b/claussnitzerlab/LICENSE.txt new file mode 100644 index 00000000..0d0952ea --- /dev/null +++ b/claussnitzerlab/LICENSE.txt @@ -0,0 +1,29 @@ +Copyright 2020 + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + +1. Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright +notice, this list of conditions and the following disclaimer in the +documentation and/or other materials provided with the distribution. + +3. Neither the name of the copyright holder nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS +OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED +AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH +DAMAGE. diff --git a/claussnitzerlab/README.md b/claussnitzerlab/README.md new file mode 100644 index 00000000..2229a1f7 --- /dev/null +++ b/claussnitzerlab/README.md @@ -0,0 +1,14 @@ +# claussnitzerlab + +This method scores variants with a PyTorch CNN (the Claussnitzer lab "NASA" model, a Basset-style network) across 167 tissue and cell-type labels. + +It reads the common variant part files written by the variant effect method under +out/varianteffect/common/ in S3 and writes per-variant, per-tissue scores as JSON under out/regionpytorch/claussnitzerlab/. + +## Stages + +These are the stages of claussnitzerlab. + +### ClaussnitzerlabStage + +Installs PyTorch, the model weights, and the hg19 reference via a bootstrap script, then runs claussnitzerlabScript.sh once per variant part file, in parallel, on the cluster.
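For orientation, here is a minimal sketch (not part of the method's code) of the record shapes involved, as implied by dcc_basset_lib.py and fullNasaScript.py later in this diff; the variant ID, the two labels shown, and the score values are illustrative only.

```python
import json

# one input line from out/varianteffect/common/part-*.json (JSON lines, one object per variant)
input_line = '{"varId": "1:65359821:G:A"}'
var_id = json.loads(input_line)["varId"]  # chrom:position:ref:alt

# the method's output is one JSON array per part file; each element maps a variant to a
# score for every tissue/cell-type label in nasa_labels.txt, where the score is the
# absolute difference between the model's reference and alternate-allele predictions
example_output = [
    {"var_id": var_id, "Adipocyte-Claussnitzer_d0": 0.12, "pancreas-Roadmap": 0.03},
]
print(json.dumps(example_output))
```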
diff --git a/claussnitzerlab/built.sbt b/claussnitzerlab/built.sbt new file mode 100644 index 00000000..3a965631 --- /dev/null +++ b/claussnitzerlab/built.sbt @@ -0,0 +1,70 @@ +val Versions = new { + val Aggregator = "0.3.0-SNAPSHOT" + val Scala = "2.13.2" +} + +// set the version of scala to compile with +scalaVersion := Versions.Scala + +// add scala compile flags +scalacOptions ++= Seq( + "-feature", + "-deprecation", + "-unchecked", + "-Ywarn-value-discard" ) + +// add required libraries +libraryDependencies ++= Seq( + "org.broadinstitute.dig" %% "dig-aggregator-core" % Versions.Aggregator ) + +// set the organization this method belongs to +organization := "org.broadinstitute.dig" + +// entry point when running this method +mainClass := Some("org.broadinstitute.dig.aggregator.methods.claussnitzerlab.Claussnitzerlab") + +// enables buildInfo, which bakes git version info into the jar +enablePlugins(GitVersioning) + +// declare the buildInfo task +val buildInfoTask = taskKey[Seq[File]]("buildInfo") + +// define execution code for task +buildInfoTask := { + val file = (resourceManaged in Compile).value / "version.properties" + + // log where the properties will be written to + streams.value.log.info(s"Writing version info to $file...") + + // collect git versioning information + val branch = git.gitCurrentBranch.value + val lastCommit = git.gitHeadCommit.value + val describedVersion = git.gitDescribedVersion.value + val anyUncommittedChanges = git.gitUncommittedChanges.value + val remoteUrl = (scmInfo in ThisBuild).value.map(_.browseUrl.toString) + val buildDate = java.time.Instant.now + + // map properties + val properties = Map[String, String]( + "branch" -> branch, + "lastCommit" -> lastCommit.getOrElse(""), + "remoteUrl" -> remoteUrl.getOrElse(""), + "uncommittedChanges" -> anyUncommittedChanges.toString, + "buildDate" -> buildDate.toString ) + + // build properties content + val contents = properties.toList.map { + case (key, value) if value.length > 0 => s"$key=$value" + case _ => "" } + + // output the version information from git to version.properties + IO.write(file, contents.mkString("\n")) + Seq(file) +} + +// add the build info task output to resources +(resourceGenerators in Compile) += buildInfoTask.taskValue diff --git a/claussnitzerlab/project/build.properties b/claussnitzerlab/project/build.properties new file mode 100644 index 00000000..797e7ccf --- /dev/null +++ b/claussnitzerlab/project/build.properties @@ -0,0 +1 @@ +sbt.version=1.3.10 diff --git a/claussnitzerlab/project/plugins.sbt b/claussnitzerlab/project/plugins.sbt new file mode 100644 index 00000000..23d5057a --- /dev/null +++ b/claussnitzerlab/project/plugins.sbt @@ -0,0 +1 @@ +addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "1.0.0") diff --git a/claussnitzerlab/src/main/resources/claussnitzerlabBootstrap.sh b/claussnitzerlab/src/main/resources/claussnitzerlabBootstrap.sh new file mode 100644 index 00000000..30f0fc98 --- /dev/null +++ b/claussnitzerlab/src/main/resources/claussnitzerlabBootstrap.sh @@ -0,0 +1,41 @@ +#!/bin/bash -xe + +# Bootstrap scripts can either be run as a... +# +# bootstrapScript +# bootstrapStep +# +# A bootstrap script is run while the machine is being provisioned by +# AWS, runs as a different user, and must complete within 60 minutes +# or the provisioning fails. This can be a good thing, as it prevents +# accidentally creating scripts that never terminate (e.g. waiting for +# user input). +# +# A bootstrap step is a "step" like any other job step.
It can take as +# long as needed. It is run as the hadoop user and is run in the step's +# directory (e.g. /mnt/var/lib/hadoop/steps/s-123456789). +# +# Most of the time, it's best to use a bootstrap script and not a step. + +#sudo yum groups mark convert +# +## check if GCC, make, etc. are installed already +#DEVTOOLS=$(sudo yum grouplist | grep 'Development Tools') +# +#if [ -z "$DEVTOOLS" ]; then +# sudo yum groupinstall -y 'Development Tools' +#fi + +WORK_DIR="/mnt/var/claussnitzerlab" +mkdir -p "${WORK_DIR}" + +# install the python libraries +sudo pip3 install torch==1.5.1 +sudo pip3 install twobitreader +sudo pip3 install numpy +sudo pip3 install sklearn + +# copy the basset python files and the model weights file +aws s3 cp s3://dig-analysis-data/bin/regionpytorch/nasa_labels.txt "${WORK_DIR}" +aws s3 cp s3://dig-analysis-data/bin/regionpytorch/hg19.2bit "${WORK_DIR}" +aws s3 cp s3://dig-analysis-data/bin/regionpytorch/nasa_ampt2d_cnn_900_best_p041.pth "${WORK_DIR}" \ No newline at end of file diff --git a/claussnitzerlab/src/main/resources/claussnitzerlabScript.sh b/claussnitzerlab/src/main/resources/claussnitzerlabScript.sh new file mode 100644 index 00000000..a24d63e4 --- /dev/null +++ b/claussnitzerlab/src/main/resources/claussnitzerlabScript.sh @@ -0,0 +1,48 @@ +#!/bin/bash -xe + +echo "JOB_BUCKET = ${JOB_BUCKET}" +echo "JOB_METHOD = ${JOB_METHOD}" +echo "JOB_STAGE = ${JOB_STAGE}" +echo "JOB_PREFIX = ${JOB_PREFIX}" +echo "JOB_DRYRUN = ${JOB_DRYRUN}" +# +# You can also pass command line arguments to the script from your stage. +# + +echo "Argument passed: $*" + +# set where the source and destination directories are in S3 +S3DIR="s3://dig-analysis-data/out" + +# get the name of the part file from the command line; set the output filename +PART=$(basename -- "$1") +OUTFILE="${PART%.*}claussnitzerlab.json" +WARNINGS="${OUTFILE}_warnings.txt" +WORK_DIR="/mnt/var/claussnitzerlab" + +# copy the part file from S3 to local +aws s3 cp "$S3DIR/varianteffect/common/$PART" "${WORK_DIR}" + +# copy the basset python files and the model weights file +aws s3 cp s3://dig-analysis-data/bin/regionpytorch/fullNasaScript.py "${WORK_DIR}" +aws s3 cp s3://dig-analysis-data/bin/regionpytorch/dcc_basset_lib.py "${WORK_DIR}" + +# cd to work directory +cd "${WORK_DIR}" + +# run the pytorch script +python3 fullNasaScript.py -i "$PART" -b 100 -o "$OUTFILE" + +# copy the script output back to S3 +aws s3 cp "$OUTFILE" "$S3DIR/regionpytorch/claussnitzerlab/$OUTFILE" + +# delete the input and output files; keep the cluster clean +rm "$PART" +rm "$OUTFILE" + +# check for a warnings file; if present, upload it too and then delete it +if [ -e "$WARNINGS" ]; then + aws s3 cp "$WARNINGS" "$S3DIR/regionpytorch/claussnitzerlab/warnings/$WARNINGS" + rm "$WARNINGS" +fi + diff --git a/claussnitzerlab/src/main/resources/dcc_basset_lib.py b/claussnitzerlab/src/main/resources/dcc_basset_lib.py new file mode 100644 index 00000000..53e7b90e --- /dev/null +++ b/claussnitzerlab/src/main/resources/dcc_basset_lib.py @@ -0,0 +1,543 @@ +# imports +import twobitreader +from twobitreader import TwoBitFile +import numpy as np +import torch +from torch import nn +from sklearn.preprocessing import OneHotEncoder +import csv +import re +import json + +print("have pytorch version {}".format(torch.__version__)) +print("have numpy version {}".format(np.__version__)) + +# add in model classes +class LambdaBase(nn.Sequential): + def __init__(self, fn, *args): + super(LambdaBase, self).__init__(*args) + self.lambda_func = fn + + def 
forward_prepare(self, input): + output = [] + for module in self._modules.values(): + output.append(module(input)) + return output if output else input + +class Lambda(LambdaBase): + def forward(self, input): + return self.lambda_func(self.forward_prepare(input)) + + +# method to create string sequence from position +def get_genomic_sequence(position, offset, chromosome, alt_allele=None): + if alt_allele is not None: + sequence = chromosome[position - offset : position - 1] + alt_allele + chromosome[position : position + offset] + else: + sequence = chromosome[position - offset: position + offset] + + # trim to size + if len(sequence) > offset * 2: + sequence = sequence[:offset * 2] + + # fail if too big + # TODO - fix to shift + # TODO - also fix if start is negative, shift + if len(sequence) < offset * 2: + print("the position {} and chrom {} have a sequence less than {}".format(position, chromosome, offset * 2)) + raise ValueError() + + # return + return sequence.upper() + +def get_ref_alt_sequences(position, offset, chromosome, alt_allele=None): + ref_sequence = get_genomic_sequence(position, offset, chromosome) + alt_sequence = get_genomic_sequence(position, offset, chromosome, alt_allele) + + return ref_sequence, alt_sequence + +def get_input_np_array(sequence_list): + sequence_np = None + for seq in sequence_list: + if sequence_np is None: + sequence_np = np.array(list(seq)) + else: + sequence_np = np.vstack((sequence_np, np.array(list(seq)))) + + # return + return sequence_np + +def get_one_hot_sequence_array(sequence_list): + # get the numpy sequence + sequence_np = get_input_np_array(sequence_list) + + # use the numpy utility to replace the letters by numbers + sequence_np[sequence_np == 'A'] = 0 + sequence_np[sequence_np == 'C'] = 1 + sequence_np[sequence_np == 'G'] = 2 + sequence_np[sequence_np == 'T'] = 3 + + # convert to ints + try: + sequence_np = sequence_np.astype(np.int) + except: + raise ValueError("got error for sequence \n{}".format(sequence_np)) + + # one hot the sequence + number_classes = 4 + sequence_np = np.eye(number_classes)[sequence_np] + + # return + return sequence_np + +def get_variant_list(file): + variants = [] + with open(file, 'r') as variant_file: + + # read all the next rows + for line in variant_file: + row = json.loads(line) + # print(row) + variants.append(row['varId']) + + + # print the first 10 variants + # for index in range(1, 10): + # print("got variant: {}".format(variants[index])) + + # return + return variants + +def load_beluga_model(weights_file, should_log=True): + # load the weights + state_dict = torch.load(weights_file) + pretrained_model_reloaded_th = load_beluga_model_from_state_dict(state_dict, should_log) + + # return + return pretrained_model_reloaded_th + +def load_basset_model(weights_file, should_log=True): + # load the weights + state_dict = torch.load(weights_file) + pretrained_model_reloaded_th = load_basset_model_from_state_dict(state_dict, should_log) + + # return + return pretrained_model_reloaded_th + +def load_nasa_model(weights_file, should_log=True): + # load the weights + state_dict = torch.load(weights_file) + pretrained_model_reloaded_th = load_nasa_model_from_state_dict(state_dict, should_log) + + # return + return pretrained_model_reloaded_th + +def load_beluga_model_from_state_dict(state_dict, should_log=True): + # load the DeepSEA Beluga model + pretrained_model = nn.Sequential( + nn.Sequential( + nn.Conv2d(4, 320, (1, 8)), + nn.ReLU(), + nn.Conv2d(320, 320, (1, 8)), + nn.ReLU(), + nn.Dropout(0.2), + 
nn.MaxPool2d((1, 4), (1, 4)), + nn.Conv2d(320, 480, (1, 8)), + nn.ReLU(), + nn.Conv2d(480, 480, (1, 8)), + nn.ReLU(), + nn.Dropout(0.2), + nn.MaxPool2d((1, 4), (1, 4)), + nn.Conv2d(480, 640, (1, 8)), + nn.ReLU(), + nn.Conv2d(640, 640, (1, 8)), + nn.ReLU(), + ), + nn.Sequential( + nn.Dropout(0.5), + Lambda(lambda x: x.view(x.size(0), -1)), + nn.Sequential(Lambda(lambda x: x.view(1, -1) if 1 == len(x.size()) else x), nn.Linear(67840, 2003)), + nn.ReLU(), + nn.Sequential(Lambda(lambda x: x.view(1, -1) if 1 == len(x.size()) else x), nn.Linear(2003, 2002)), + ), + nn.Sigmoid(), + ) + + # print + if should_log: + print("got DeepSEA Beluga model of type {}".format(type(pretrained_model))) + + # load the weights + pretrained_model.load_state_dict(state_dict) + + # return + return pretrained_model + +def load_basset_model_from_state_dict(state_dict, should_log=True): + # load the Basset model + pretrained_model_reloaded_th = nn.Sequential( # Sequential, + nn.Conv2d(4,300,(19, 1)), + nn.BatchNorm2d(300), + nn.ReLU(), + nn.MaxPool2d((3, 1),(3, 1)), + nn.Conv2d(300,200,(11, 1)), + nn.BatchNorm2d(200), + nn.ReLU(), + nn.MaxPool2d((4, 1),(4, 1)), + nn.Conv2d(200,200,(7, 1)), + nn.BatchNorm2d(200), + nn.ReLU(), + nn.MaxPool2d((4, 1),(4, 1)), + Lambda(lambda x: x.view(x.size(0),-1)), # Reshape, + nn.Sequential(Lambda(lambda x: x.view(1,-1) if 1==len(x.size()) else x ),nn.Linear(2000,1000)), # Linear, + nn.BatchNorm1d(1000,1e-05,0.1,True),#BatchNorm1d, + nn.ReLU(), + nn.Dropout(0.3), + nn.Sequential(Lambda(lambda x: x.view(1,-1) if 1==len(x.size()) else x ),nn.Linear(1000,1000)), # Linear, + nn.BatchNorm1d(1000,1e-05,0.1,True),#BatchNorm1d, + nn.ReLU(), + nn.Dropout(0.3), + nn.Sequential(Lambda(lambda x: x.view(1,-1) if 1==len(x.size()) else x ),nn.Linear(1000,164)), # Linear, + nn.Sigmoid(), + ) + + # print + if should_log: + print("got Basset model of type {}".format(type(pretrained_model_reloaded_th))) + + # load the weights + pretrained_model_reloaded_th.load_state_dict(state_dict) + + # return + return pretrained_model_reloaded_th + +def load_nasa_model_from_state_dict(state_dict, should_log=True): + # load the Basset model + pretrained_model_reloaded_th = nn.Sequential( # Sequential, + nn.Conv2d(4,400,(21, 1),(1, 1),(10, 0)), + nn.BatchNorm2d(400), + nn.ReLU(), # NO WEIGHTS + nn.MaxPool2d((3, 1),(3, 1),(0, 0),ceil_mode=True), # NO WEIGHTS + nn.Conv2d(400,300,(11, 1),(1, 1),(5, 0)), + nn.BatchNorm2d(300), + nn.ReLU(), # NO WEIGHTS + nn.MaxPool2d((4, 1),(4, 1),(0, 0),ceil_mode=True), # NO WEIGHTS + nn.Conv2d(300,300,(7, 1),(1, 1),(3, 0)), + nn.BatchNorm2d(300), + nn.ReLU(), # NO WEIGHTS + nn.MaxPool2d((4, 1),(4, 1),(0, 0),ceil_mode=True), # NO WEIGHTS + nn.Conv2d(300,300,(5, 1),(1, 1),(2, 0)), + nn.BatchNorm2d(300), + nn.ReLU(), # NO WEIGHTS + nn.MaxPool2d((4, 1),(4, 1),(0, 0),ceil_mode=True), # NO WEIGHTS + Lambda(lambda x: x.view(x.size(0),-1)), # Reshape, # NO WEIGHTS + nn.Sequential( + Lambda(lambda x: x.view(1,-1) if 1==len(x.size()) else x ), # NO WEIGHTS + nn.Linear(1500,1024)), # Linear, + nn.BatchNorm1d(1024,1e-05,0.1,True), #BatchNorm1d, + nn.ReLU(), # NO WEIGHTS + nn.Dropout(0.3), # NO WEIGHTS + nn.Sequential( + Lambda(lambda x: x.view(1,-1) if 1==len(x.size()) else x ), # NO WEIGHTS + nn.Linear(1024,512)), # Linear, + nn.BatchNorm1d(512,1e-05,0.1,True),#BatchNorm1d, + nn.ReLU(), # NO WEIGHTS + nn.Dropout(0.3), # NO WEIGHTS + nn.Sequential( + Lambda(lambda x: x.view(1,-1) if 1==len(x.size()) else x ), # NO WEIGHTS + nn.Linear(512,167)), # Linear, + nn.Sigmoid(), # NO WEIGHTS + ) + # 
pretrained_model_reloaded_th = nn.Sequential( # Sequential, + # nn.Conv2d(4,400,(21, 1)), + # nn.BatchNorm2d(400), + # nn.ReLU(), + # nn.MaxPool2d((3, 1),(3, 1)), + # nn.Conv2d(400,300,(11, 1)), + # nn.BatchNorm2d(300), + # nn.ReLU(), + # nn.MaxPool2d((4, 1),(4, 1)), + # nn.Conv2d(300,300,(7, 1)), + # nn.BatchNorm2d(300), + # nn.ReLU(), + # nn.MaxPool2d((4, 1),(4, 1)), + # nn.Conv2d(300,300,(5, 1)), + # nn.BatchNorm2d(200), + # nn.ReLU(), + # nn.MaxPool2d((4, 1),(4, 1)), + # Lambda(lambda x: x.view(x.size(0),-1)), # Reshape, + # nn.Sequential(Lambda(lambda x: x.view(1,-1) if 1==len(x.size()) else x ),nn.Linear(1500,1024)), # Linear, + # nn.BatchNorm1d(1024,1e-05,0.1,True),#BatchNorm1d, + # nn.ReLU(), + # nn.Dropout(0.3), + # nn.Sequential(Lambda(lambda x: x.view(1,-1) if 1==len(x.size()) else x ),nn.Linear(1024,512)), # Linear, + # nn.BatchNorm1d(1000,1e-05,0.1,True),#BatchNorm1d, + # nn.ReLU(), + # nn.Dropout(0.3), + # nn.Sequential(Lambda(lambda x: x.view(1,-1) if 1==len(x.size()) else x ),nn.Linear(512,167)), # Linear, + # nn.Sigmoid(), + # ) + + # print + if should_log: + print("got Nasa SA model of type {}".format(type(pretrained_model_reloaded_th))) + + # load the weights + if state_dict is not None: + pretrained_model_reloaded_th.load_state_dict(state_dict) + + # return + return pretrained_model_reloaded_th + +def generate_input_tensor(variant_list, file_twobit): + # load the chromosome data + # get the genome file + hg19 = TwoBitFile(file_twobit) + + # loop for through the variants + # for variant in variant_list: + + # get the chrom + chromosome = hg19['chr11'] + position = 95311422 + + # load the data + ref_sequence, alt_sequence = dcc_basset_lib.get_ref_alt_sequences(position, 300, chromosome, 'C') + + print("got ref sequence one hot of type {} and shape {}".format(type(ref_sequence), len(ref_sequence))) + print("got alt sequence one hot of type {} and shape {}".format(type(alt_sequence), len(alt_sequence))) + + # build list and transform into input + sequence_list = [] + # sequence_list.append(ref_sequence) + sequence_list.append(ref_sequence) + # sequence_list.append(alt_sequence) + sequence_list.append(alt_sequence) + + print(alt_sequence) + + # get the np array of right shape + sequence_one_hot = dcc_basset_lib.get_one_hot_sequence_array(sequence_list) + print("got sequence one hot of type {} and shape {}".format(type(sequence_one_hot), sequence_one_hot.shape)) + # print(sequence_one_hot) + + # create a pytorch tensor + tensor = torch.from_numpy(sequence_one_hot) + + print("got pytorch tensor with type {} and shape {} and data type \n{}".format(type(tensor), tensor.shape, tensor.dtype)) + + # build the input tensor + tensor_initial = torch.unsqueeze(tensor, 3) + tensor_input = tensor_initial.permute(0, 2, 1, 3) + tensor_input = tensor_input.to(torch.float) + +def split_variant(variant): + pieces = variant.split(":") + return pieces + +def get_result_map(variant_list, result_tensor, label_list, debug = False): + '''method to take a variant list, labels list and ML model result tensor + and create a list of maps of the result for each variant''' + + # check that the dimensions match + # should have tensor 2x as big as variant list (one result for ref, one for alt) + if 2 * len(variant_list) != result_tensor.shape[0]: + raise Exception("the result tensor should have 2x as many rows as the variant list (not {} tensor and {} variants)".format(result_tensor.shape[0], len(variant_list))) + # the tensor results columns should match the size of the labels + if len(label_list) != 
result_tensor.shape[1]: + raise Exception("the result tensor should have as many columns as the label list (not {} tensor and {} labels)".format(result_tensor.shape[0]), len(label_list)) + + # build the result list + result_list = [] + + # loop through the variants and add the resulting aggregated value in for each tissue + for index, variant in enumerate(variant_list): + result_map = {'var_id': variant} + + # calculate the aggregated result value + # get the absolute value of the difference + tensor_abs = torch.abs(result_tensor[index * 2] - result_tensor[index * 2 + 1]) + + if debug: + print("for variant {} got aggregated tensor \n{}".format(variant, tensor_abs)) + + # add in the result for each tissue + for index in range(0, len(label_list)): + result_map[label_list[index]] = tensor_abs[index].item() + + # add the result to the list + result_list.append(result_map) + + # return + return result_list + +def get_input_tensor_from_variant_list(variant_list, genome_lookup, region_size, debug = False): + ''' + method to return the ML model input vectorcorresponding to the variant list + ''' + + # keep track of variants that return odd sequences + variants_to_remove_list = [] + + # list used to build the tensor + sequence_list = [] + + # loop through the variants + for variant in variant_list: + # split the variant + variant_pieces = split_variant(variant) + chrom = variant_pieces[0] + position = int(variant_pieces[1]) + alt_allele = variant_pieces[3] + + try: + # get the chrom + chromosome_lookup = genome_lookup['chr' + chrom] + + # load the data + ref_sequence, alt_sequence = get_ref_alt_sequences(position, int(region_size/2), chromosome_lookup, alt_allele) + + # verify the letters in the sequence + for test_sequence in (ref_sequence, alt_allele): + if not re.match("^[ACGT]*$", test_sequence): + # get letter N for unseuqnced regions; throw them out + # if re.match("[N].", test_sequence): + if debug: + print("for variant {} got incorrect letter in sequence \n{}".format(variant, test_sequence)) + raise ValueError() + except: + variants_to_remove_list.append(variant) + continue + # else: + # raise ValueError("for variant {} got incorrect letter in sequence \n{}".format(variant, test_sequence)) + + # debug + if debug: + if len(alt_sequence) > region_size: + print("Got long sequence for variant {}".format(variant)) + print("got ref sequence one hot of type {} and shape {}".format(type(ref_sequence), len(ref_sequence))) + print("got alt sequence one hot of type {} and shape {}".format(type(alt_sequence), len(alt_sequence))) + + # append the two new regions to the sequence list + sequence_list.append(ref_sequence) + sequence_list.append(alt_sequence) + + # debug + if debug: + for index, seq in enumerate(sequence_list): + print("({}) has size {}".format(index, len(seq))) + + # get the np array of right shape once all the variants have been added in + sequence_one_hot = get_one_hot_sequence_array(sequence_list) + if debug: + print("got sequence one hot of type {} and shape {}".format(type(sequence_one_hot), sequence_one_hot.shape)) + # print(sequence_one_hot) + + # create a pytorch tensor + tensor = torch.from_numpy(sequence_one_hot) + + if debug: + print("got pytorch tensor with type {} and shape {} and data type \n{}".format(type(tensor), tensor.shape, tensor.dtype)) + + # build the input tensor + tensor_initial = torch.unsqueeze(tensor, 3) + tensor_input = tensor_initial.permute(0, 2, 1, 3) + tensor_input = tensor_input.to(torch.float) + + if debug: + print("got transposed pytorch tensor with type 
{} and shape {} and data type \n{}".format(type(tensor_input), tensor_input.shape, tensor_input.dtype)) + + # create updated list to return + updated_variant_list = [variant for variant in variant_list if variant not in variants_to_remove_list] + + # return + return updated_variant_list, tensor_input + +if __name__ == '__main__': + # set the data dir + # dir_data = "/Users/mduby/Data/Broad/" + dir_data = "/home/javaprog/Data/Broad/" + + # file locations + file_input = dir_data + "Magma/Common/part-00011-6a21a67f-59b3-4792-b9b2-7f99deea6b5a-c000.csv" + file_model_weights = dir_data + 'Basset/Model/pretrained_model_reloaded_th.pth' + file_twobit = dir_data + 'Basset/TwoBitReader/hg19.2bit' + + + # get the genome file + hg19 = TwoBitFile(dir_data + 'Basset/TwoBitReader/hg19.2bit') + + print("two bit file of type {}".format(type(hg19))) + + # get the chrom + chromosome = hg19['chr11'] + position = 95311422 + # chromosome = hg19['chr8'] + # position = 118184783 + + print("two bit chromosome of type {}".format(type(chromosome))) + + # get the regular sequence + ref_sequence = get_genomic_sequence(position, 3, chromosome) + print("got ref sequence: {}".format(ref_sequence)) + + # get the allele sequence + alt_sequence = get_genomic_sequence(position, 3, chromosome, 'C') + print("got alt sequence: {}".format(alt_sequence)) + print() + + # get ref and alt sequences + ref_sequence, alt_sequence = get_ref_alt_sequences(position, 3, chromosome, 'C') + print("got ref sequence: {}".format(ref_sequence)) + print("got alt sequence: {}".format(alt_sequence)) + print() + + sequence_list = [] + sequence_list.append(ref_sequence) + sequence_list.append(alt_sequence) + print("input sequence list {}".format(sequence_list)) + print() + + sequence_numpy = get_input_np_array(sequence_list) + print("got sequence input of type {} and shape {} of\n{}".format(type(sequence_numpy), sequence_numpy.shape, sequence_numpy)) + print() + + sequence_one_hot = get_one_hot_sequence_array(sequence_list) + print("got sequence one hot of type {} and shape {} of\n{}".format(type(sequence_one_hot), sequence_one_hot.shape, sequence_one_hot)) + print() + + # read the variant file + # variant_file = dir_data + 'Magma/Common/part-00011-6a21a67f-59b3-4792-b9b2-7f99deea6b5a-c000.csv' + variant_file = dir_data + 'dig-analysis-data/out/varianteffect/common/part-00000-24063efa-89ff-412d-9134-8cd90f09380b-c000.json' + variant_list = get_variant_list(variant_file) + for index in range(1, 10): + print("got variant: {}".format(variant_list[index])) + + # # load the pytorch model + # pretrained_model_reloaded_th = load_basset_model(file_model_weights) + # pretrained_model_reloaded_th.eval() + + # # better summary of the model + # print(pretrained_model_reloaded_th) + + # split a variant + variant_id = "1:65359821:G:A" + variant_pieces = split_variant(variant_id) + print("got variant pieces: {}".format(variant_pieces)) + + + # test the reuls aggregating + label_list = ['red', 'green', 'blue', 'yellow'] + variant_list = ['var1', 'var2', 'var3'] + result_tensor = torch.ones(6, 4) + test_list = get_result_map(variant_list, result_tensor, label_list, True) + print("for result aggregation test got: {}".format(test_list)) + + # test the input tensor creation + variant_list = ['1:65359821:G:A', '3:7359821:G:C', '8:3359821:G:T'] + variant_list, sequence_results = get_input_tensor_from_variant_list(variant_list, hg19, 20, True) + print("got input tensor of type {} and shape {} and data \n{}".format(type(sequence_results), sequence_results.shape, 
sequence_results)) + + # test for letter Ns + variant_list.append("20:26319418:A:G") + variant_list, test_results = get_input_tensor_from_variant_list(variant_list, hg19, 600, False) diff --git a/claussnitzerlab/src/main/resources/fullNasaScript.py b/claussnitzerlab/src/main/resources/fullNasaScript.py new file mode 100644 index 00000000..d1d96ec5 --- /dev/null +++ b/claussnitzerlab/src/main/resources/fullNasaScript.py @@ -0,0 +1,144 @@ +# copied from github below for use in my project at work +# https://github.com/kipoi/models/blob/master/Basset/pretrained_model_reloaded_th.py +# see paper at +# http://kipoi.org/models/Basset/ + +# imports +import torch +from torch import nn +import twobitreader +from twobitreader import TwoBitFile +import time +import argparse +import json + +print("got pytorch version of {}".format(torch.__version__)) + +# set the code and data directories +dir_code = "" +dir_data = "" +# dir_code = "/Users/mduby/Code/WorkspacePython/" +# dir_data = "/Users/mduby/Data/Broad/" +# dir_code = "/home/javaprog/Code/PythonWorkspace/" +# dir_data = "/home/javaprog/Data/Broad/" + +# import relative libraries +import sys +sys.path.insert(0, dir_code + 'MachineLearningPython/DccKP/Basset/') +import dcc_basset_lib + +# file input +file_input = dir_data + "Magma/Common/part-00011-6a21a67f-59b3-4792-b9b2-7f99deea6b5a-c000.csv" +file_twobit = dir_data + 'hg19.2bit' +# file_output = dir_data + "Basset/Out/basset_part-00011-6a21a67f-59b3-4792-b9b2-7f99deea6b5a-c000.csv" +# file_model_weights = dir_data + 'Basset/Production/basset_pretrained_model_reloaded.pth' +# labels_file = dir_data + '/Basset/Production/basset_labels.txt' +file_output = dir_data + "Basset/Out/nasa_part-00011-6a21a67f-59b3-4792-b9b2-7f99deea6b5a-c000.csv" +file_model_weights = dir_data + 'nasa_ampt2d_cnn_900_best_p041.pth' +labels_file = dir_data + 'nasa_labels.txt' + +# set the region size +region_size = 900 + +# chunk_size = 1000 # 20s, 153 chunks - so 50 mins per file, 200 x 50 = 10,000 mins on PC +batch_size = 20 + +# read in the passed in file if any +# configure argparser +parser = argparse.ArgumentParser("script to run the Basset PyTorch model on DCC a variants file") +# add the arguments +parser.add_argument('-i', '--input_file', help='the file to process', default=file_input, required=True) +parser.add_argument('-o', '--output_file', help='the file to save the results to', default=file_output, required=True) +parser.add_argument('-b', '--batch', help='the batch size to process', default=batch_size, required=False) +# get the args +args = vars(parser.parse_args()) +if args['input_file'] is not None: + file_input = args['input_file'] +if args['output_file'] is not None: + file_output = args['output_file'] +if args['batch'] is not None: + batch_size = int(args['batch']) +print("using variant input file {} and resule output file {} with batch size {}".format(file_input, file_output, batch_size)) + +# open the label file +with open(labels_file) as f: + labels_list = [line.strip() for line in f.readlines()] + +# load the chromosome data +# get the genome file +hg19 = TwoBitFile(file_twobit) +print("two bit file of type {}".format(type(hg19))) + +# LOAD THE MODEL +# load the weights +# pretrained_model_reloaded_th = dcc_basset_lib.load_basset_model(file_model_weights) +pretrained_model_reloaded_th = dcc_basset_lib.load_nasa_model(file_model_weights) + +# make the model eval +pretrained_model_reloaded_th.eval() + +# better summary +print(pretrained_model_reloaded_th) + +# LOAD THE INPUTS +# load the list of 
variants +variant_list = dcc_basset_lib.get_variant_list(file_input) +print("got variant list of size {}".format(len(variant_list))) + +# split into chunks +# chunk_size = 2000 +chunks = [variant_list[x : x+batch_size] for x in range(0, len(variant_list), batch_size)] +print("got chunk list of size {} and type {}".format(len(chunks), type(chunks))) + +# loop through chunks +main_start_time = time.perf_counter() +final_results = [] +for chunk_index in range(0, len(chunks)): +# for chunk_index in range(6, 9): + variant_list = chunks[chunk_index] + + # get start time + start_time = time.perf_counter() + + # get the sequence input for the first chunk + # variant_list = chunks[0] + variant_list, tensor_input = dcc_basset_lib.get_input_tensor_from_variant_list(variant_list, hg19, region_size, False) + + # get end time + end_time = time.perf_counter() + print("({}) generated input tensor of shape {} in {:0.4}s".format(chunk_index, tensor_input.shape, end_time - start_time)) + + # get start time + start_time = time.perf_counter() + + # run the model predictions + pretrained_model_reloaded_th.eval() + predictions = pretrained_model_reloaded_th(tensor_input) + + # get end time + end_time = time.perf_counter() + print("generated predictions tensor of shape {} in {:0.4}s".format(predictions.shape, end_time - start_time)) + + # get start time + start_time = time.perf_counter() + + # get the result map + result_list = dcc_basset_lib.get_result_map(variant_list, predictions, labels_list) + final_results.extend(result_list) + # print("got result list {}".format(result_list)) + + # get end time + end_time = time.perf_counter() + print("got result list of size {} in time {:0.4f}s".format(len(result_list), end_time - start_time)) + +# end +main_end_time = time.perf_counter() +print("got final results of size {} in time {:0.4f}".format(len(final_results), main_end_time - main_start_time)) + +# write out the output +with open(file_output, 'w') as out_file: + out = json.dumps(final_results) + out_file.write(out) + print("got final results file {}".format(file_output)) + + diff --git a/claussnitzerlab/src/main/resources/nasa_ampt2d_cnn_900_best_p041.pth b/claussnitzerlab/src/main/resources/nasa_ampt2d_cnn_900_best_p041.pth new file mode 100644 index 00000000..621aad20 Binary files /dev/null and b/claussnitzerlab/src/main/resources/nasa_ampt2d_cnn_900_best_p041.pth differ diff --git a/claussnitzerlab/src/main/resources/nasa_labels.txt b/claussnitzerlab/src/main/resources/nasa_labels.txt new file mode 100644 index 00000000..f4bc0d1b --- /dev/null +++ b/claussnitzerlab/src/main/resources/nasa_labels.txt @@ -0,0 +1,167 @@ +A549_treated_with_ethanol_0.02_percent_for_1_hour-Roadmap +A673-ENCODE +Adipocyte-Claussnitzer_d0 +Adipocyte-Claussnitzer_d14 +Adipocyte-Claussnitzer_d2 +Adipocyte-Claussnitzer_d6 +adipose_tissue-ENCODE +adrenal_gland-ENCODE +adrenal_gland_fetal-ENCODE +adrenal_gland_fetal-Roadmap +astrocyte-ENCODE +astrocyte-Roadmap +B_cell-ENCODE +bipolar_neuron_from_iPSC-ENCODE +BJAB_anti-IgM_anti-CD40_4hr-Engreitz +BJAB-Engreitz +body_of_pancreas-ENCODE +breast_epithelium-ENCODE +brite_adipose-Loft2014 +cardiac_muscle_cell-ENCODE +CD14-positive_monocyte-ENCODE +CD14-positive_monocyte-Novakovic2016 +CD14-positive_monocyte-Roadmap +CD14-positive_monocytes-Roadmap +CD14-positive_monocyte_treated_with_BG_1h-Novakovic2016 +CD14-positive_monocyte_treated_with_BG_4h-Novakovic2016 +CD14-positive_monocyte_treated_with_BG_d1-Novakovic2016 +CD14-positive_monocyte_treated_with_BG_d6-Novakovic2016 
+CD14-positive_monocyte_treated_with_LPS_1h-Novakovic2016 +CD14-positive_monocyte_treated_with_LPS_4h-Novakovic2016 +CD14-positive_monocyte_treated_with_LPS_d1-Novakovic2016 +CD14-positive_monocyte_treated_with_LPS_d6-Novakovic2016 +CD14-positive_monocyte_treated_with_RPMI_1h-Novakovic2016 +CD14-positive_monocyte_treated_with_RPMI_4h-Novakovic2016 +CD14-positive_monocyte_treated_with_RPMI_d1-Novakovic2016 +CD14-positive_monocyte_treated_with_RPMI_d6-Novakovic2016 +CD19-positive_B_cell-Roadmap +CD34-positive_mobilized-Roadmap +CD3-positive_T_cell-Roadmap +CD4-positive_helper_T_cell-Corces2016 +CD4-positive_helper_T_cell-ENCODE +CD56-positive_natural_killer_cells-Roadmap +CD8-positive_alpha-beta_T_cell-Corces2016 +CD8-positive_alpha-beta_T_cell-ENCODE +coronary_artery-ENCODE +coronary_artery_smooth_muscle_cell-Miller2016 +dendritic_cell_treated_with_Lipopolysaccharide_0_ng-mL_for_0_hour-Garber2017 +dendritic_cell_treated_with_Lipopolysaccharide_100_ng-mL_for_1_hour-Garber2017 +dendritic_cell_treated_with_Lipopolysaccharide_100_ng-mL_for_2_hour-Garber2017 +dendritic_cell_treated_with_Lipopolysaccharide_100_ng-mL_for_30_minute-Garber2017 +dendritic_cell_treated_with_Lipopolysaccharide_100_ng-mL_for_4_hour-Garber2017 +dendritic_cell_treated_with_Lipopolysaccharide_100_ng-mL_for_6_hour-Garber2017 +eahy926 +endothelial_cell_of_umbilical_vein-Roadmap +endothelial_cell_of_umbilical_vein_VEGF_stim_12_hours-Zhang2013 +endothelial_cell_of_umbilical_vein_VEGF_stim_4_hours-Zhang2013 +epithelial_cell_of_prostate-ENCODE +erythroblast-Corces2016 +fibroblast_of_arm-ENCODE +fibroblast_of_dermis-ENCODE +fibroblast_of_dermis-Roadmap +fibroblast_of_lung_fetal-ENCODE +fibroblast_of_lung-Roadmap +foreskin_fibroblast-Roadmap +gastrocnemius_medialis-ENCODE +GM12878-ENCODE +GM12878-Roadmap +H1_BMP4_Derived_Mesendoderm_Cultured_Cells-Roadmap +H1_BMP4_Derived_Trophoblast_Cultured_Cells-Roadmap +H1_Derived_Mesenchymal_Stem_Cells-Roadmap +H1_Derived_Neuronal_Progenitor_Cultured_Cells-Roadmap +H1-hESC-ENCODE +H1-hESC-Roadmap +H7 +H9-ENCODE +H9-Roadmap +HAP1 +HCT116-AID_ENCODE_DHS_Auxin6hr +HCT116-AID_ENCODE_DHS_untreated +HCT116-ENCODE +heart_ventricle-ENCODE +HeLa-S3-ENCODE +HeLa-S3-Roadmap +hepatocyte-ENCODE +HepG2-Roadmap +HT29 +IMR90-ENCODE +IMR90-Roadmap +induced_pluripotent_stem_cell-ENCODE +iPS_DF_19.11_Cell_Line-Roadmap +iPS_DF_19.11-ENCODE +Jurkat_anti-CD3_PMA_4hr-Engreitz +Jurkat-Engreitz +K562-Roadmap +Karpas-422-ENCODE +keratinocyte-ENCODE +keratinocyte-Roadmap +large_intestine_fetal-ENCODE +large_intestine_fetal-Roadmap +liver-ENCODE +LNCAP +LoVo +mammary_epithelial_cell-ENCODE +mammary_epithelial_cell-Roadmap +MCF10A-Ji2017 +MCF10A_treated_with_TAM24hr-Ji2017 +MCF-7-ENCODE +MDA-MB-231 +megakaryocyte-erythroid_progenitor-Corces2016 +MM.1S-ENCODE +muscle_of_leg_fetal-ENCODE +muscle_of_leg_fetal-Roadmap +muscle_of_trunk_fetal-ENCODE +muscle_of_trunk_fetal-Roadmap +myotube-ENCODE +myotube_originated_from_skeletal_muscle_myoblast-Roadmap +natural_killer_cell-Corces2016 +NCCIT +OCI-LY7-ENCODE +osteoblast-ENCODE +ovary-Roadmap +Panc1-ENCODE +pancreas-Roadmap +pancreatic-acinar-cell-AMP +pancreatic-alpha-cell-AMP +pancreatic-beta-cell-AMP +pancreatic-ductal-cell-AMP +PC-9-ENCODE +placenta-ENCODE +placenta-Roadmap +psoas_muscle-Roadmap +sigmoid_colon-ENCODE +skeletal_muscle_myoblast-Roadmap +SK-N-SH-ENCODE +small_intestine_fetal-ENCODE +small_intestine_fetal-Roadmap +spinal_cord_fetal-ENCODE +spleen-ENCODE +stomach-ENCODE +stomach_fetal-ENCODE +stomach_fetal-Roadmap +stomach-Roadmap +T-cell-ENCODE +teloHAEC 
+THP1-Engreitz +THP1_LPS_4hr-Engreitz +THP-1_macrophage-VanBortle2017 +THP-1_monocyte-VanBortle2017 +THP_pmaLPS_ATAC_0h +THP_pmaLPS_ATAC_120h +THP_pmaLPS_ATAC_12h +THP_pmaLPS_ATAC_1h +THP_pmaLPS_ATAC_24h +THP_pmaLPS_ATAC_2h +THP_pmaLPS_ATAC_48h +THP_pmaLPS_ATAC_6h +THP_pmaLPS_ATAC_72h +THP_pmaLPS_ATAC_96h +thymus_fetal-ENCODE +thymus_fetal-Roadmap +thyroid_gland-ENCODE +transverse_colon-ENCODE +trophoblast_cell-ENCODE +U937-Engreitz +U937_LPS_4hr-Engreitz +uterus-ENCODE +white_adipose-Loft2014 diff --git a/claussnitzerlab/src/main/resources/sampleBootstrap.sh b/claussnitzerlab/src/main/resources/sampleBootstrap.sh new file mode 100644 index 00000000..4497a6f0 --- /dev/null +++ b/claussnitzerlab/src/main/resources/sampleBootstrap.sh @@ -0,0 +1,27 @@ +#!/bin/bash -xe + +# Bootstrap scripts can either be run as a... +# +# bootstrapScript +# bootstrapStep +# +# A bootstrap script is run while the machine is being provisioned by +# AWS, are run as a different user, and must complete within 60 minutes +# or the provisioning fails. This can be a good thing, as it prevents +# accidentally creating scripts that never terminate (e.g. waiting for +# user input). +# +# A bootstrap step is a "step" like any other job step. It can take as +# long as needed. It is run as the hadoop user and is run in the step's +# directory (e.g. /mnt/var/lib/hadoop/steps/s-123456789). +# +# Most of the time, it's best to user a bootstrap script and not step. + +#sudo yum groups mark convert +# +## check if GCC, make, etc. are installed already +#DEVTOOLS=$(sudo yum grouplist | grep 'Development Tools') +# +#if [ -z "$DEVTOOLS" ]; then +# sudo yum groupinstall -y 'Development Tools' +#fi diff --git a/claussnitzerlab/src/main/resources/sampleScript.sh b/claussnitzerlab/src/main/resources/sampleScript.sh new file mode 100644 index 00000000..e9cc4d3a --- /dev/null +++ b/claussnitzerlab/src/main/resources/sampleScript.sh @@ -0,0 +1,24 @@ +#!/bin/bash -xe + +echo "JOB_BUCKET = ${JOB_BUCKET}" +echo "JOB_METHOD = ${JOB_METHOD}" +echo "JOB_STAGE = ${JOB_STAGE}" +echo "JOB_PREFIX = ${JOB_PREFIX}" + +# +# You can also pass command line arguments to the script from your stage. +# + +echo "Argument passed: $*" + +# +# You have access to the AWS CLI to copy/read data from S3. +# + +aws s3 ls "${JOB_BUCKET}/out/metaanalysis/trans-ethnic/$1/" --recursive + +# +# You can also use the hadoop command. +# + +hadoop fs -ls "${JOB_BUCKET}/out/metaanalysis/trans-ethnic/$1/*" diff --git a/claussnitzerlab/src/main/resources/sampleSparkJob.py b/claussnitzerlab/src/main/resources/sampleSparkJob.py new file mode 100644 index 00000000..f0bbad3b --- /dev/null +++ b/claussnitzerlab/src/main/resources/sampleSparkJob.py @@ -0,0 +1,57 @@ +from argparse import ArgumentParser +from os import getenv +from platform import python_version +from pyspark.sql import SparkSession + + +def main(): + """ + Arguments: phenotype + """ + print(f'PYTHON VERSION = {python_version()}') + print(f'USER = {getenv("USER")}') + + # + # Job steps can pass arguments to the scripts. + # + + # build argument + opts = ArgumentParser() + opts.add_argument('phenotype') + + # args.phenotype will be set + args = opts.parse_args() + + # should show args.phenotype + print(args) + + # + # All these environment variables will be set for you. + # + + bucket = getenv('JOB_BUCKET') # e.g. s3://dig-analysis-data + method = getenv('JOB_METHOD') # e.g. Test + stage = getenv('JOB_STAGE') # e.g. TestStage + prefix = getenv('JOB_PREFIX') # e.g. 
out/Test/TestStage + + print(f'JOB_BUCKET = {bucket}') + print(f'JOB_METHOD = {method}') + print(f'JOB_STAGE = {stage}') + print(f'JOB_PREFIX = {prefix}') + + # + # Run the spark job. + # + + # PySpark steps need to create a spark session + spark = SparkSession.builder.appName(method).getOrCreate() + + # TODO: spark job here... + + # done + spark.stop() + + +# entry point +if __name__ == '__main__': + main() diff --git a/claussnitzerlab/src/main/scala/Claussnitzerlab.scala b/claussnitzerlab/src/main/scala/Claussnitzerlab.scala new file mode 100644 index 00000000..2fd1268d --- /dev/null +++ b/claussnitzerlab/src/main/scala/Claussnitzerlab.scala @@ -0,0 +1,28 @@ +package org.broadinstitute.dig.aggregator.methods.claussnitzerlab + +import org.broadinstitute.dig.aggregator.core._ +import org.broadinstitute.dig.aws._ +import org.broadinstitute.dig.aws.emr._ + +/** This is your aggregator method. + * + * All that needs to be done here is to implement the initStages function, + * which adds stages to the method in the order they should be executed. + * + * When you are ready to run it, use SBT from the CLI: + * + * sbt run [args] + * + * See the README of the dig-aggregator-core project for a complete list of + * CLI arguments available. + */ +object Claussnitzerlab extends Method { + + /** Add all stages used in this method here. Stages must be added in the + * order they should be serially executed. + */ + override def initStages(implicit context: Context) = { +// addStage(new ListVariantsStage) // this created the dig-analysis-data/out/varianteffect/variants *.csv files + addStage(new ClaussnitzerlabStage) // main stage for the python script + } +} diff --git a/claussnitzerlab/src/main/scala/ClaussnitzerlabStage.scala b/claussnitzerlab/src/main/scala/ClaussnitzerlabStage.scala new file mode 100644 index 00000000..71c19ef9 --- /dev/null +++ b/claussnitzerlab/src/main/scala/ClaussnitzerlabStage.scala @@ -0,0 +1,117 @@ +package org.broadinstitute.dig.aggregator.methods.claussnitzerlab + +import org.broadinstitute.dig.aggregator.core._ +import org.broadinstitute.dig.aws._ +import org.broadinstitute.dig.aws.emr._ + +/** This is a stage in your method. + * + * Stages take one or more inputs and generate one or more outputs. Each + * stage consists of a... + * + * - list of input sources; + * - rules mapping inputs to outputs; + * - make function that returns a job used to produce a given output + * + * Optionally, a stage can also override... + * + * - its name, which defaults to its class name + * - the cluster definition used to provision EC2 instances + */ +class ClaussnitzerlabStage(implicit context: Context) extends Stage { + + /** Cluster configuration used when running this stage. The super + * class already has a default configuration defined, so it's easier + * to just copy and override specific parts of it. + */ + override val cluster: ClusterDef = super.cluster.copy( + instances = 1, + masterInstanceType = Strategy.computeOptimized(vCPUs = 16), + slaveInstanceType = Strategy.computeOptimized(vCPUs = 16), + bootstrapScripts = Seq( + new BootstrapScript(resourceUri("claussnitzerlabBootstrap.sh")) // pip3 install and downloading binary files + ) + ) + + /** Input sources need to be declared so they can be used in rules. + * + * Input sources are a glob-like S3 prefix to an object in S3. Wildcards + * can be pattern matched in the rules of the stage. 
+ */ + val variants: Input.Source = Input.Source.Success("out/varianteffect/common/") // has to end with / + + /** When run, all the input sources here will be checked to see if they + * are new or updated. + */ + override val sources: Seq[Input.Source] = Seq(variants) + + /** For every input that is new/updated, this partial function is called, + * which pattern matches it against the input sources defined above and + * maps them to the output(s) that should be built. + * + * The variants input source here has no wildcards: any new or updated + * input under out/varianteffect/common/ maps to the single + * "claussnitzerlab" output. + */ + override val rules: PartialFunction[Input, Outputs] = { + case variants() => Outputs.Named("claussnitzerlab") + } + + /** Once all the rules have been applied to the new and updated inputs, + * each of the outputs that needs to be built is sent here. A job is returned, + * which is the series of steps that need to be executed on the cluster + * in order for the final output to be successfully built. + * + * It is assumed that all outputs for a given stage are independent of + * each other and can be executed in parallel across multiple, identical + * clusters. + */ + override def make(output: String): Job = { + + /* All job steps require a URI to a location in S3 where the script can + * be read from by the cluster. + * + * The resourceUri function will upload the resource in the jar to a + * unique location in S3 and return the URI to where it was uploaded. + */ + val claussnitzerlabScript = resourceUri("claussnitzerlabScript.sh") + val claussnitzerlabLibrary = resourceUri("dcc_basset_lib.py") + val claussnitzerlabPyTorch = resourceUri("fullNasaScript.py") + + // the rules always produce the single "claussnitzerlab" output, so the output name is not used here +// val phenotype = output + + + // add a step for each part file; the run script can be a shell or python script + // steps can run in parallel since each part file is processed independently + // _ is the part filename passed as the argument to the script + // get all the variant part files to process, using only the part filename + val objects = context.s3.ls(s"out/varianteffect/common/") + val parts = objects.map(_.key.split('/').last).filter(_.startsWith("part-")) + + // create the new job + // take(n) limits the job to n part files; good for testing + // new Job(parts.take(1).map(Job.Script(claussnitzerlabScript, _)), isParallel = true) // for testing + new Job(parts.map(Job.Script(claussnitzerlabScript, _)), isParallel = true) // for production + + } + + // add success and prepareJob methods (see vep) + // spark writes _SUCCESS by default; create it manually for non-spark jobs + + /** Before the jobs actually run, perform this operation. + */ + override def prepareJob(output: String): Unit = { + context.s3.rm("out/regionpytorch/claussnitzerlab/") // clear out the directory where the results go + } + + /** On success, write the _SUCCESS file in the output directory. + */ + override def success(output: String): Unit = { + context.s3.touch("out/regionpytorch/claussnitzerlab/_SUCCESS") // only Spark jobs create a _SUCCESS file by default; create it manually for sh/py jobs + () + } + + +} diff --git a/claussnitzerlab/version.sbt b/claussnitzerlab/version.sbt new file mode 100644 index 00000000..e7654440 --- /dev/null +++ b/claussnitzerlab/version.sbt @@ -0,0 +1 @@ +version in ThisBuild := "0.1.0"
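Finally, a rough, self-contained sketch of how the pieces in this diff fit together when run off-cluster. It assumes local copies of hg19.2bit, nasa_labels.txt, and nasa_ampt2d_cnn_900_best_p041.pth (the same files the bootstrap script downloads), uses only functions defined in dcc_basset_lib.py above, and the example variant is a placeholder taken from the library's own test block.

```python
import torch
from twobitreader import TwoBitFile

import dcc_basset_lib  # the library added in this diff

# local copies of the files the bootstrap script stages under /mnt/var/claussnitzerlab
hg19 = TwoBitFile("hg19.2bit")
with open("nasa_labels.txt") as f:
    labels = [line.strip() for line in f]

# load the NASA CNN and switch it to eval mode, as fullNasaScript.py does
model = dcc_basset_lib.load_nasa_model("nasa_ampt2d_cnn_900_best_p041.pth")
model.eval()

# build the ref/alt input tensor for one chrom:position:ref:alt variant over a 900bp window
variants = ["1:65359821:G:A"]
variants, tensor_input = dcc_basset_lib.get_input_tensor_from_variant_list(variants, hg19, 900)

# score the sequences and aggregate |ref - alt| per label for each variant
with torch.no_grad():
    predictions = model(tensor_input)
results = dcc_basset_lib.get_result_map(variants, predictions, labels)

print(results[0]["var_id"], "scored across", len(results[0]) - 1, "labels")
```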