Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi-threading implemented at the extension step #8

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 127 additions & 56 deletions bin/ExtendOrFormatContigs.pl
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
###############################################################
#Marten Boetzer 1-03-2010 #
#Multi-thread implemented by Jose F. Sanchez-Herrero #
# 1-08-2017 #
#Marten Boetzer 1-03-2010 #
#SSPACE perl subscript ExtendOrFormatContigs.pl #
#This script, based on the the -x parameter; #
# -Formats the contigs to appropriate format (-x 0) #
Expand All @@ -9,9 +11,12 @@
use strict;
use File::Basename;
use File::Path;
use FindBin qw($Bin);
use lib "$Bin/Parallel/";
use Parallel::ForkManager;
use Cwd qw(abs_path);

my ($MAX, $MAX_TOP, $TRACK_COUNT) = (0, 100, 1);

my $seplines = ("-" x 60)."\n";

my $contig = $ARGV[0];
Expand All @@ -37,7 +42,6 @@
my $filenameOutExt = $base_name . ".singlereads.fasta";
my ($bin);
if($extending == 1){

&ExtendContigs($base_name, $filecontig, $filenameOutExt);
print SUMFILE "\n" if($minContigLength > 0);
&FormatContigs() if($minContigLength > 0);
Expand All @@ -60,63 +64,130 @@ sub ExtendContigs{
&getUnmappedReads($filecontig, $readfile);
#-------------------------------------------------CONTIG EXTENSION USING UNMAPPED PAIRS STORED IN $SET
&printMessage("\n=>".getDate().": Contig extension initiated\n");
my $outfileTig = "intermediate_results/" . $base_name . ".extendedcontigs.fasta";

open (TIG, ">$outfileTig") || die "Can't write to $outfileTig -- fatal\n";
#--------------------------------------------ASSEMBLY START

ASSEMBLY:
open(IN, $filecontig) || die "Can't open $filecontig -- fatal\n";
my ($exttig_count, $counter, $NCount, $orig_mer, $prevhead) = (0, 0, 0, 0, '');
while(<IN>){
s/\r\n/\n/;
chomp;
$seq.= uc($_) if(eof(IN));
if (/\>(\S+)/ || eof(IN)){
my $head=$1;
$orig_mer = length($seq);
if($seq ne ''){
$NCount++ if($seq=~/([NX])/i);
my $start_sequence = uc($seq);
my $reads_needed = 1; #tracks coverage
my $total_bases = $orig_mer * $reads_needed;

($seq, $reads_needed, $total_bases) = doExtension("3", $orig_mer, $seq, $reads_needed, $total_bases, $min_overlap, $base_overlap, $min_base_ratio, $verbose, $counter, $max_trim) if($orig_mer >= $MIN_READ_LENGTH && $orig_mer >= $min_overlap);

my $seqrc = reverseComplement($seq);
($seqrc, $reads_needed, $total_bases) = doExtension("5", $orig_mer, $seqrc, $reads_needed, $total_bases, $min_overlap, $base_overlap, $min_base_ratio, $verbose, $counter, $max_trim) if($orig_mer >= $MIN_READ_LENGTH && $orig_mer >= $min_overlap);

my $leng = length($seqrc);
my $reversetig = reverseComplement($seqrc); ### return to sequence, as inputted
if($leng > $orig_mer){ ### commented out: && $start_sequence ne $seqrc && $start_sequence ne $reversetig
my $cov = $total_bases / $leng;
printf TIG ">extcontig%i|size%i|read%i|cov%.2f|seed:$prevhead\n%s\n", ($counter, $leng, $reads_needed, $cov, $reversetig); #print contigs to file
$exttig_count++;
}else{
my $cov = $reads_needed = 0;
my $singlet_leng = length($start_sequence);
printf TIG ">contig%i|size%i|read%i|cov%.2f|seed:$prevhead\n%s\n", ($counter, $leng, $reads_needed, $cov, $reversetig); #print singlets to file
}
}
CounterPrint(++$counter);
$prevhead = $head;
$seq='';
}else{
$seq .= uc($_);
}
}
CounterPrint(" ");
print SUMFILE "\tNumber of contig sequences =".($counter-1). "\n";
print SUMFILE "\t\tNumber of contigs containing N's (may prevent proper contig extension) = $NCount\n";

print SUMFILE "\tNumber of contigs extended = $exttig_count\n".$seplines;
close IN;
$filecontig = $outfileTig;
if($@){
my $message = $@;
&printMessage("\nSomething went wrong running $0 ".getDate()."\n$message\n");
}
close TIG;

## Split given fasta file in as many CPUs as expected and send multiple threads for each
# Splits fasta file and takes into account to add the whole sequence if it is broken
my $file = $filecontig;
my $file_size = -s $file; #To get only size
my $block = int($file_size/$threads);

my $path = abs_path();

open (FH, "<$file") or die "Could not open source file. $!";
print "\t- Splitting file into blocks of $block characters aprox ...\n";
my $j = 0; my @files;
my @name = split("/", $file);
my $file_name = $name[-1];
while (1) {
my $chunk;
my @tmp = split ("\.fasta", $file_name);
my $block_file = $path."/intermediate_results/".$tmp[0]."_part-".$j."_tmp.fasta";
push (@files, $block_file);
open(OUT, ">$block_file") or die "Could not open destination file";
if (!eof(FH)) { read(FH, $chunk,$block);
if ($j > 0) { $chunk = ">".$chunk; }
print OUT $chunk;
} ## Print the amount of chars
if (!eof(FH)) { $chunk = <FH>; print OUT $chunk; } ## print the whole line if it is broken
if (!eof(FH)) {
$/ = ">"; ## Telling perl where a new line starts
$chunk = <FH>; chop $chunk; print OUT $chunk;
$/ = "\n";
} ## print the sequence if it is broken
$j++; close(OUT); last if eof(FH);
}
close(FH);

my $pm = new Parallel::ForkManager($threads); ## Number of subprocesses not equal to CPUs. Each subprocesses will have multiple CPUs if available
$pm->run_on_finish(
sub { my ($pid, $exit_code, $ident) = @_;
print "\n\n** Child process finished with PID $pid and exit code: $exit_code\n\n";
} );
$pm->run_on_start( sub { my ($pid,$ident)=@_; print "\n\n** Parsing block of FASTA file $ident PID:$pid\n\n"; } );
for (my $i=0; $i < scalar @files; $i++) {
my $pid = $pm->start($files[$i], $i) and next; print "\nExecuting subprocess command\n\n";
## Send subprocess

open(IN, $files[$i]) || die "Can't open $files[$i] -- fatal\n";

my $out_put = $files[$i]."-extendedcontigs.fasta";
open (OUT, ">$out_put");
my ($exttig_count, $counter, $NCount, $orig_mer, $prevhead) = (0, 0, 0, 0, '');
$/ = ">"; ## Telling perl where a new line starts
while (<IN>) {
next if /^#/ || /^\s*$/;
chomp;
my ($head, $seq) = split(/\n/,$_,2);
next unless ($head && $seq);

$NCount++ if($seq=~/([NX])/i);
my $start_sequence = uc($seq);
my $reads_needed = 1; #tracks coverage
my $total_bases = $orig_mer * $reads_needed;

($seq, $reads_needed, $total_bases) = doExtension("3", $orig_mer, $seq, $reads_needed, $total_bases, $min_overlap, $base_overlap, $min_base_ratio, $verbose, $counter, $max_trim) if($orig_mer >= $MIN_READ_LENGTH && $orig_mer >= $min_overlap);

my $seqrc = reverseComplement($seq);
($seqrc, $reads_needed, $total_bases) = doExtension("5", $orig_mer, $seqrc, $reads_needed, $total_bases, $min_overlap, $base_overlap, $min_base_ratio, $verbose, $counter, $max_trim) if($orig_mer >= $MIN_READ_LENGTH && $orig_mer >= $min_overlap);

my $leng = length($seqrc);
my $reversetig = reverseComplement($seqrc); ### return to sequence, as inputted
if($leng > $orig_mer){ ### commented out: && $start_sequence ne $seqrc && $start_sequence ne $reversetig
my $cov = $total_bases / $leng;
printf OUT ">extcontig%i|size%i|read%i|cov%.2f|seed:$prevhead\n%s\n", ($counter, $leng, $reads_needed, $cov, $reversetig); #print contigs to file
$exttig_count++;
}else{
my $cov = $reads_needed = 0;
my $singlet_leng = length($start_sequence);
printf OUT ">contig%i|size%i|read%i|cov%.2f|seed:$prevhead\n%s\n", ($counter, $leng, $reads_needed, $cov, $reversetig); #print singlets to file
}
$counter++;
}
$/ = "\n";
close(IN); close (OUT);

## print STATS
my $stats = $files[$i]."-extendedcontigs.STATS.txt";
open (STATS, ">$stats");
print STATS "NCount=$NCount\ncounter=$counter\nexttig_count=$exttig_count\n";
close (STATS);

## finish subprocess
$pm->finish($i); # pass an exit code to finish
}
$pm->wait_all_children; print "\n** All parsing child processes have finished...\n\n";

## concatenate results
my $outfileTig = $path."/intermediate_results/" . $base_name . ".extendedcontigs.fasta";
my $stats_file = $path."/intermediate_results/stats-tmp.txt";

#open (TIG, ">$outfileTig") || die "Can't write to $outfileTig -- fatal\n";
system("cat $path/intermediate_results/*extendedcontigs.fasta >> $outfileTig");
system("cat $path/intermediate_results/*extendedcontigs.STATS.txt >> $stats_file");

my %stats;
open (STAT, "<$stats_file");
while (<STAT>) {
my $line = $_;
chomp $line;
my @array = split("=", $line);
$stats{$array[0]} += $array[1];
}
close (STAT);

print SUMFILE "\tNumber of contig sequences =".($stats{"counter"}-1). "\n";
print SUMFILE "\t\tNumber of contigs containing N's (may prevent proper contig extension) = ".$stats{"NCount"}."\n";
print SUMFILE "\tNumber of contigs extended = ".$stats{"exttig_count"}."\n".$seplines;

$filecontig = $outfileTig;
if($@){
my $message = $@;
&printMessage("\nSomething went wrong running $0 ".getDate()."\n$message\n");
}
}

###STORE CONTIGS TO APPROPRIATE FORMAT WHEN CONTIGS WILL NOT BE EXTENDED
Expand Down