From 0ac61343fba1c9120a554905ffc3784479cc7a31 Mon Sep 17 00:00:00 2001
From: Simon Ye
Date: Tue, 1 Aug 2017 13:22:24 -0400
Subject: [PATCH] Add --db-index, --db-file, --taxonomy, --db-pipe for piped db files

Also allow kraken to operate on multiple inputs, writing each input's
results to its own --output file.
---
 scripts/kraken         | 146 ++++++++++++++++++++++++++---------------
 scripts/read_merger.pl |   6 +-
 src/classify.cpp       | 162 +++++++++++++++++++++++++++++++----------
 src/kraken_headers.hpp |   2 +
 4 files changed, 213 insertions(+), 103 deletions(-)

diff --git a/scripts/kraken b/scripts/kraken
index af5f67b..868af7f 100755
--- a/scripts/kraken
+++ b/scripts/kraken
@@ -46,6 +46,12 @@ my $min_hits = 1;
 my $fasta_input = 0;
 my $fastq_input = 0;
 my $db_prefix;
+my $idx_file;
+my $kdb_file;
+my $db_size;
+my $idx_size;
+my $db_pipe = 0;
+my $taxonomy;
 my $threads;
 my $preload = 0;
 my $gunzip = 0;
@@ -55,7 +61,7 @@ my $check_names = 0;
 my $only_classified_output = 0;
 my $unclassified_out;
 my $classified_out;
-my $outfile;
+my @outfiles;
 
 GetOptions(
   "help" => \&display_help,
@@ -64,11 +70,17 @@ GetOptions(
   "threads=i" => \$threads,
   "fasta-input" => \$fasta_input,
   "fastq-input" => \$fastq_input,
+  "db-index=s" => \$idx_file,
+  "db-file=s" => \$kdb_file,
+  "db-size=i" => \$db_size,
+  "index-size=i" => \$idx_size,
+  "db-pipe" => \$db_pipe,
+  "taxonomy=s" => \$taxonomy,
   "quick" => \$quick,
   "min-hits=i" => \$min_hits,
   "unclassified-out=s" => \$unclassified_out,
   "classified-out=s" => \$classified_out,
-  "output=s" => \$outfile,
+  "output=s" => \@outfiles,
   "preload" => \$preload,
   "paired" => \$paired,
   "check-names" => \$check_names,
@@ -85,31 +97,41 @@ if (! @ARGV) {
   print STDERR "Need to specify input filenames!\n";
   usage();
 }
-eval { $db_prefix = krakenlib::find_db($db_prefix); };
-if ($@) {
-  die "$PROG: $@";
+
+if (!defined $kdb_file || !defined $idx_file || !defined $taxonomy) {
+  eval { $db_prefix = krakenlib::find_db($db_prefix); };
+  if ($@) {
+    die "$PROG: $@";
+  }
 }
 
-my $taxonomy = "$db_prefix/taxonomy/nodes.dmp";
+if (!defined $taxonomy) {
+  $taxonomy = "$db_prefix/taxonomy/nodes.dmp";
+}
 if ($quick) {
-  undef $taxonomy;  # Skip loading nodes file, not needed in quick mode
+  undef $taxonomy;  # Skip loading nodes file, not needed in quick mode
 }
 
-my $kdb_file = "$db_prefix/database.kdb";
-my $idx_file = "$db_prefix/database.idx";
-if (! -e $kdb_file) {
-  die "$PROG: $kdb_file does not exist!\n";
+if (!defined $kdb_file) {
+  $kdb_file = "$db_prefix/database.kdb";
+  if (! -e $kdb_file) {
+    die "$PROG: $kdb_file does not exist!\n";
+  }
 }
-if (! -e $idx_file) {
-  die "$PROG: $idx_file does not exist!\n";
+
+if (!defined $idx_file) {
+  $idx_file = "$db_prefix/database.idx";
+  if (! -e $idx_file) {
+    die "$PROG: $idx_file does not exist!\n";
+  }
 }
 
 if ($min_hits > 1 && ! $quick) {
   die "$PROG: --min_hits requires --quick to be specified\n";
 }
 
-if ($paired && @ARGV != 2) {
-  die "$PROG: --paired requires exactly two filenames\n";
+if ($paired && @ARGV % 2) {
+  die "$PROG: --paired requires an even number of filenames\n";
 }
 
 my $compressed = $gunzip || $bunzip2;
@@ -135,65 +157,64 @@ if ($auto_detect) {
 my @flags;
 push @flags, "-d", $kdb_file;
 push @flags, "-i", $idx_file;
+push @flags, "-D", $db_size if defined $db_size;
+push @flags, "-I", $idx_size if defined $idx_size;
 push @flags, "-t", $threads if $threads > 1;
 push @flags, "-n", $taxonomy if defined $taxonomy;
 push @flags, "-q" if $quick;
+push @flags, "-p" if $db_pipe;
 push @flags, "-m", $min_hits if $min_hits > 1;
 push @flags, "-f" if $fastq_input && ! $paired;  # merger always outputs FASTA
 push @flags, "-U", $unclassified_out if defined $unclassified_out;
 push @flags, "-C", $classified_out if defined $classified_out;
-push @flags, "-o", $outfile if defined $outfile;
+if (@outfiles) {
+  foreach my $outfile (@outfiles) {
+    push @flags, "-o", $outfile;
+  }
+}
 push @flags, "-c", if $only_classified_output;
 push @flags, "-M" if $preload;
 
 # handle piping for decompression/merging
-my @pipe_argv;
-if ($paired) {
-  my @merge_flags;
-  push @merge_flags, "--fa" if $fasta_input;
-  push @merge_flags, "--fq" if $fastq_input;
-  push @merge_flags, "--gz" if $gunzip;
-  push @merge_flags, "--bz2" if $bunzip2;
-  push @merge_flags, "--check-names" if $check_names;
-  @pipe_argv = ("read_merger.pl", @merge_flags, @ARGV);
-}
-elsif ($compressed) {
-  if ($gunzip) {
-    @pipe_argv = ("gzip", "-dc", @ARGV);
-  }
-  elsif ($bunzip2) {
-    @pipe_argv = ("bzip2", "-dc", @ARGV);
-  }
-  else {
-    die "$PROG: unrecognized compression program! This is a Kraken bug.\n";
-  }
-}
+my @pipe_argvv;
+foreach (my $i = 0; $i < @ARGV; $i++) {
+  my @pipe_argv;
+  my $use_pipe = 0;
+  if ($paired) {
+    $use_pipe = 1;
+
+    my @merge_flags;
+    push @merge_flags, "--fa" if $fasta_input;
+    push @merge_flags, "--fq" if $fastq_input;
+    push @merge_flags, "--gz" if $gunzip;
+    push @merge_flags, "--bz2" if $bunzip2;
+    push @merge_flags, "--check-names" if $check_names;
+    @pipe_argv = ("read_merger.pl", @merge_flags, @ARGV[$i, $i+1]);
+    $i++;
+  }
+  elsif ($compressed) {
+    $use_pipe = 1;
+
+    if ($gunzip) {
+      @pipe_argv = ("gzip", "-dc", $ARGV[$i]);
+    }
+    elsif ($bunzip2) {
+      @pipe_argv = ("bzip2", "-dc", $ARGV[$i]);
+    }
+    else {
+      die "$PROG: unrecognized compression program! This is a Kraken bug.\n";
+    }
+  } else {
+    @pipe_argv = $ARGV[$i];
+  }
+  if ($use_pipe) {
+    @pipe_argv = ( "<(", @pipe_argv, ")" );
+  }
+  push @pipe_argvv, @pipe_argv;
-
-# if args exist, set up the pipe/fork/exec
-if (@pipe_argv) {
-  pipe RD, WR;
-  my $pid = fork();
-  if ($pid < 0) {
-    die "$PROG: fork error: $!\n";
-  }
-  if ($pid) {
-    open STDIN, "<&RD"
-      or die "$PROG: can't dup stdin to read end of pipe: $!\n";
-    close RD;
-    close WR;
-    @ARGV = ("/dev/fd/0");  # make classifier read from pipe
-  }
-  else {
-    open STDOUT, ">&WR"
-      or die "$PROG: can't dup stdout to write end of pipe: $!\n";
-    close RD;
-    close WR;
-    exec @pipe_argv
-      or die "$PROG: can't exec $pipe_argv[0]: $!\n";
-  }
 }
 
-exec $CLASSIFY, @flags, @ARGV;
+exec join(" ", ("exec /bin/bash -c \"exec", $CLASSIFY, @flags, @pipe_argvv, "\""));
 die "$PROG: exec error: $!\n";
 
 sub usage {
@@ -212,6 +233,11 @@ Options:
   --fastq-input           Input is FASTQ format
   --gzip-compressed       Input is gzip compressed
   --bzip2-compressed      Input is bzip2 compressed
+  --db-index FILENAME     Path to the database index file (database.idx)
+  --db-file FILENAME      Path to the database file (database.kdb)
+  --db-size NUM           Size of the database file, in bytes
+  --index-size NUM        Size of the database index file, in bytes
+  --db-pipe               Database files are pipes; read them sequentially
   --quick                 Quick operation (use first hit or hits)
   --min-hits NUM          In quick op., number of hits req'd for classification
                           NOTE: this is ignored if --quick is not specified
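With the rewrite above, the wrapper no longer forks a decompression/merging
child and feeds classify through stdin; every input file (or read pair)
becomes one bash process substitution and a single bash -c command line is
exec'd. As a rough sketch only (paths, thread count and sample names are
hypothetical, and the real string is produced by the join() call), two
gzip-compressed FASTQ inputs with two --output files would turn into
something like:

    exec /bin/bash -c "exec $KRAKEN_DIR/classify -d $DB/database.kdb -i $DB/database.idx \
        -t 4 -n $DB/taxonomy/nodes.dmp -f -o sampleA.kraken -o sampleB.kraken \
        <( gzip -dc sampleA.fq.gz ) <( gzip -dc sampleB.fq.gz )"

classify then sees each <( ... ) as a /dev/fd/N path, one pipe per input,
matched positionally to the -o files.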
diff --git a/scripts/read_merger.pl b/scripts/read_merger.pl
index 166f21f..71a6104 100755
--- a/scripts/read_merger.pl
+++ b/scripts/read_merger.pl
@@ -47,9 +47,9 @@
 for my $file (@ARGV) {
   if (! -e $file) {
     die "$PROG: $file does not exist\n";
-  }
-  if (! -f $file) {
-    die "$PROG: $file is not a regular file\n";
+  }
+  if (! (-f $file || -p $file)) {
+    die "$PROG: $file is not a file or pipe\n";
   }
 }
 
diff --git a/src/classify.cpp b/src/classify.cpp
index 957c8be..be1fef4 100644
--- a/src/classify.cpp
+++ b/src/classify.cpp
@@ -30,29 +30,33 @@ using namespace kraken;
 
 void parse_command_line(int argc, char **argv);
 void usage(int exit_code=EX_USAGE);
-void process_file(char *filename);
+void process_file(char *filename, ostream* kraken_output);
 void classify_sequence(DNASequence &dna, ostringstream &koss,
                        ostringstream &coss, ostringstream &uoss);
 string hitlist_string(vector<uint32_t> &taxa, vector<uint8_t> &ambig);
 set<uint32_t> get_ancestry(uint32_t taxon);
+double elapsed_seconds(struct timeval time1, struct timeval time2);
 void report_stats(struct timeval time1, struct timeval time2);
 
 int Num_threads = 1;
 string DB_filename, Index_filename, Nodes_filename;
+size_t DB_size = 0;
+size_t Index_size = 0;
+bool Pipe_DB = false;
 bool Quick_mode = false;
 bool Fastq_input = false;
 bool Print_classified = false;
 bool Print_unclassified = false;
-bool Print_kraken = true;
 bool Populate_memory = false;
 bool Only_classified_kraken_output = false;
 uint32_t Minimum_hit_count = 1;
 map<uint32_t, uint32_t> Parent_map;
 KrakenDB Database;
-string Classified_output_file, Unclassified_output_file, Kraken_output_file;
+string Classified_output_file, Unclassified_output_file;
+vector<string> Kraken_output_files;
 ostream *Classified_output;
 ostream *Unclassified_output;
-ostream *Kraken_output;
+vector<ostream *> Kraken_outputs;
 size_t Work_unit_size = DEF_WORK_UNIT_SIZE;
 
 uint64_t total_classified = 0;
@@ -68,26 +72,72 @@ int main(int argc, char **argv) {
   if (! Nodes_filename.empty())
     Parent_map = build_parent_map(Nodes_filename);
 
-  if (Populate_memory)
-    cerr << "Loading database... ";
+  struct timeval dtv1, dtv2;
+  gettimeofday(&dtv1, NULL);
 
   QuickFile db_file;
-  db_file.open_file(DB_filename);
-  if (Populate_memory)
-    db_file.load_file();
-  Database = KrakenDB(db_file.ptr());
+  if (!Pipe_DB) {
+    if (Populate_memory)
+      cerr << "Loading database... ";
+
+    db_file.open_file(DB_filename);
+    if (Populate_memory)
+      db_file.load_file();
+    Database = KrakenDB(db_file.ptr());
+  } else {
+    ifstream in(DB_filename.c_str(), ios::in | ios::binary);
+    if (!in) {
+      cerr << "Couldn't open file " << DB_filename << endl;
+      exit(EXIT_FAILURE);
+    }
+    char* buffer;
+    if (DB_size > 0) {
+      buffer = (char*) malloc(DB_size);
+      in.read(buffer, DB_size);
+    } else {
+      string* db_string = new string((istreambuf_iterator<char>(in)), istreambuf_iterator<char>());
+      buffer = (char*) db_string->c_str();  // string intentionally leaked so the buffer stays valid
+    }
+    Database = KrakenDB(buffer);
+  }
+
   KmerScanner::set_k(Database.get_k());
 
+  KrakenDBIndex db_index;
   QuickFile idx_file;
-  idx_file.open_file(Index_filename);
-  if (Populate_memory)
-    idx_file.load_file();
-  KrakenDBIndex db_index(idx_file.ptr());
+  if (!Pipe_DB) {
+    idx_file.open_file(Index_filename);
+    if (Populate_memory)
+      idx_file.load_file();
+    db_index = KrakenDBIndex(idx_file.ptr());
+  } else {
+    ifstream in(Index_filename.c_str(), ios::in | ios::binary);
+    if (!in) {
+      cerr << "Couldn't open file " << Index_filename << endl;
+      exit(EXIT_FAILURE);
+    }
+    char* buffer;
+    if (Index_size > 0) {
+      buffer = (char*) malloc(Index_size);
+      in.read(buffer, Index_size);
+    } else {
+      string* index_string = new string((istreambuf_iterator<char>(in)), istreambuf_iterator<char>());
+      buffer = (char*) index_string->c_str();  // string intentionally leaked so the buffer stays valid
+    }
+    db_index = KrakenDBIndex(buffer);
+  }
+
   Database.set_index(&db_index);
 
   if (Populate_memory)
     cerr << "complete." << endl;
 
+  gettimeofday(&dtv2, NULL);
+  double seconds = elapsed_seconds(dtv1, dtv2);
+  fprintf(stderr,
+          "Database loaded in %dm %.2fs.\n",
+          (int) seconds / 60, fmod(seconds, 60));
+
   if (Print_classified) {
     if (Classified_output_file == "-")
       Classified_output = &cout;
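In the new -p mode the database is no longer mmapped through QuickFile: the
whole stream is read into a heap buffer, either in a single in.read() of
-D/-I bytes when the sizes are given, or through an istreambuf_iterator
otherwise. A hypothetical direct invocation of classify with both files
supplied as pipes (file names and byte counts are placeholders):

    ./classify -p \
        -d <(zcat database.kdb.gz) -D 150000000 \
        -i <(zcat database.idx.gz) -I 536870928 \
        -n taxonomy/nodes.dmp -o reads.kraken reads.fa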
@@ -102,27 +152,47 @@ int main(int argc, char **argv) {
       Unclassified_output = new ofstream(Unclassified_output_file.c_str());
   }
 
-  if (! Kraken_output_file.empty()) {
-    if (Kraken_output_file == "-")
-      Print_kraken = false;
-    else
-      Kraken_output = new ofstream(Kraken_output_file.c_str());
+  for (vector<string>::iterator it = Kraken_output_files.begin(); it != Kraken_output_files.end(); ++it) {
+    if (*it == "-") {
+      Kraken_outputs.push_back(&cout);
+    } else {
+      ostream* kraken_output = new ofstream((*it).c_str());
+      Kraken_outputs.push_back(kraken_output);
+    }
   }
-  else
-    Kraken_output = &cout;
 
-  struct timeval tv1, tv2;
-  gettimeofday(&tv1, NULL);
-  for (int i = optind; i < argc; i++)
-    process_file(argv[i]);
-  gettimeofday(&tv2, NULL);
+  size_t n_inputs = argc - optind;
+  size_t n_outputs = Kraken_outputs.size();
+  if (n_outputs && n_inputs != n_outputs) {
+    cerr << "Number of output files (-o) doesn't match the number of inputs" << endl;
+    exit(EXIT_FAILURE);
+  }
 
-  report_stats(tv1, tv2);
+  int j = 0;
+  for (int i = optind; i < argc; i++) {
+    struct timeval tv1, tv2;
+    gettimeofday(&tv1, NULL);
+
+    total_classified = 0;
+    total_sequences = 0;
+    total_bases = 0;
+    ostream* kraken_output;
+    if (!n_outputs) {
+      kraken_output = &cout;
+    } else {
+      kraken_output = Kraken_outputs[j];
+    }
+    process_file(argv[i], kraken_output);
+    j++;
+    gettimeofday(&tv2, NULL);
+
+    report_stats(tv1, tv2);
+  }
 
   return 0;
 }
 
-void report_stats(struct timeval time1, struct timeval time2) {
+double elapsed_seconds(struct timeval time1, struct timeval time2) {
   time2.tv_usec -= time1.tv_usec;
   time2.tv_sec -= time1.tv_sec;
   if (time2.tv_usec < 0) {
@@ -132,10 +202,15 @@
   double seconds = time2.tv_usec;
   seconds /= 1e6;
   seconds += time2.tv_sec;
+  return seconds;
+}
+
+void report_stats(struct timeval time1, struct timeval time2) {
+  double seconds = elapsed_seconds(time1, time2);
 
   if (isatty(fileno(stderr)))
     cerr << "\r";
-  fprintf(stderr, 
+  fprintf(stderr,
           "%llu sequences (%.2f Mbp) processed in %.3fs (%.1f Kseq/m, %.2f Mbp/m).\n",
           (unsigned long long) total_sequences, total_bases / 1.0e6, seconds,
           total_sequences / 1.0e3 / (seconds / 60),
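main() now loops over the remaining arguments, resets the per-file counters,
and pairs the i-th input with the i-th -o stream (falling back to stdout when
no -o was given), so statistics are reported separately for every input. At
the wrapper level, a hypothetical two-sample run (database and sample names
are placeholders) would look like:

    kraken --db minikraken --threads 8 \
        --output sampleA.kraken --output sampleB.kraken \
        sampleA.fa sampleB.fa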
@@ -147,7 +222,7 @@
           (total_sequences - total_classified) * 100.0 / total_sequences);
 }
 
-void process_file(char *filename) {
+void process_file(char *filename, ostream* kraken_output) {
   string file_str(filename);
   DNASequenceReader *reader;
   DNASequence dna;
@@ -177,7 +252,7 @@
       }
       if (total_nt == 0)
         break;
-      
+
       kraken_output_ss.str("");
       classified_output_ss.str("");
       unclassified_output_ss.str("");
@@ -187,8 +262,7 @@
 
       #pragma omp critical(write_output)
       {
-        if (Print_kraken)
-          (*Kraken_output) << kraken_output_ss.str();
+        (*kraken_output) << kraken_output_ss.str();
         if (Print_classified)
           (*Classified_output) << classified_output_ss.str();
         if (Print_unclassified)
@@ -202,8 +276,7 @@
   }  // end parallel section
 
   delete reader;
-  if (Print_kraken)
-    (*Kraken_output) << std::flush;
+  (*kraken_output) << std::flush;
   if (Print_classified)
     (*Classified_output) << std::flush;
   if (Print_unclassified)
@@ -275,9 +348,6 @@
     }
   }
 
-  if (! Print_kraken)
-    return;
-
   if (call) {
     koss << "C\t";
   }
@@ -354,7 +424,7 @@
   if (argc > 1 && strcmp(argv[1], "-h") == 0)
     usage(0);
 
-  while ((opt = getopt(argc, argv, "d:i:t:u:n:m:o:qfcC:U:M")) != -1) {
+  while ((opt = getopt(argc, argv, "d:i:D:I:t:u:n:m:o:pqfcC:U:M")) != -1) {
     switch (opt) {
       case 'd' :
         DB_filename = optarg;
@@ -362,6 +432,15 @@
       case 'i' :
         Index_filename = optarg;
         break;
+      case 'D' :
+        DB_size = atoll(optarg);
+        break;
+      case 'I' :
+        Index_size = atoll(optarg);
+        break;
+      case 'p' :
+        Pipe_DB = true;
+        break;
      case 't' :
         sig = atoll(optarg);
         if (sig <= 0)
@@ -400,7 +479,7 @@
         Unclassified_output_file = optarg;
         break;
       case 'o' :
-        Kraken_output_file = optarg;
+        Kraken_output_files.push_back(optarg);
         break;
       case 'u' :
         sig = atoll(optarg);
@@ -440,6 +519,9 @@
        << "Options: (*mandatory)" << endl
        << "* -d filename      Kraken DB filename" << endl
        << "* -i filename      Kraken DB index filename" << endl
+       << "  -D #             Kraken DB size in bytes (for use with -p)" << endl
+       << "  -I #             Kraken DB index size in bytes (for use with -p)" << endl
+       << "  -p               Kraken DB and index files are pipes" << endl
        << "  -n filename      NCBI Taxonomy nodes file" << endl
        << "  -o filename      Output file for Kraken output" << endl
        << "  -t #             Number of threads" << endl
diff --git a/src/kraken_headers.hpp b/src/kraken_headers.hpp
index bca9858..fb5b574 100644
--- a/src/kraken_headers.hpp
+++ b/src/kraken_headers.hpp
@@ -33,9 +33,11 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
+#include
 #include
 #include
 #include
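Taken together, the new options also allow running without a local,
mmap-able copy of the database at all. A sketch of a wrapper-level invocation
that streams compressed database files (all paths and sizes below are
placeholders; --db-size/--index-size are optional but let each file be read
in one pre-sized read):

    kraken --db-pipe \
        --db-file <(zcat /archive/kraken/database.kdb.gz) --db-size 150000000 \
        --db-index <(zcat /archive/kraken/database.idx.gz) --index-size 536870928 \
        --taxonomy /archive/kraken/taxonomy/nodes.dmp \
        --output reads.kraken reads.fa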