diff --git a/README.md b/README.md
index 5e669dc..49622f6 100644
--- a/README.md
+++ b/README.md
@@ -26,7 +26,7 @@ The softwares used in this pipeline are:
 * [IGVTools](http://www.broadinstitute.org/igv/igvtools)
 * [PhantomPeak](http://code.google.com/p/phantompeakqualtools/) __In fact, the script **run_spp_nodups.R** is from PhantomPeak, but PhantomPeak still need to be installed in R.__
 * [ngs.plot](https://code.google.com/p/ngsplot/)
-* If cluster supporting needed, [drmaa_for_python](https://pypi.python.org/pypi/drmaa) is needed. Now only LSF is supported, but it is easy to modify it to fit your demands.
+* If cluster support is needed, [drmaa_for_python](https://pypi.python.org/pypi/drmaa) is required. LSF and SGE are currently supported, but it is easy to modify the scripts to fit your needs.
 
 Install above softwares and make sure they are in $PATH.
 
@@ -40,7 +40,7 @@ Put the scripts in ./bin to a place in $PATH or add ./bin to $PATH.
 python pipeline.py config.yaml
 ```
 
-Or on an LSF cluster:
+Or on an LSF or SGE cluster:
 
 ```bash
 nohup python pipeline.py config.yaml &
diff --git a/project/script-sge/config.yaml b/project/script-sge/config.yaml
new file mode 100644
index 0000000..4e9286f
--- /dev/null
+++ b/project/script-sge/config.yaml
@@ -0,0 +1,42 @@
+project_name: "test_ChIP-seq"
+project_dir: "~/projects/test_ChIP-seq"
+
+## Aligner: bowtie or bowtie2
+aligner: "bowtie"
+pair_end: "yes"
+
+## If bowtie is used, assign the bowtie1 index path here:
+## replace "Genome" with the genome name, such as "hg19" or "mm9",
+## and uncomment the next line.
+bowtie_index: "~/data/bowtie_index/Genome"
+
+## If bowtie2 is used, "Genome" should be replaced by the
+## bowtie2 index name, such as "hg19" or "mm9".
+## $BOWTIE2_INDEXES should be added to the environment variables.
+# bowtie_index: "Genome"
+
+bam_sort_buff: "2G"
+IGV_genome: "hg19"
+# ngsplot_genome: "reference_genome"
+# ngsplot_fraglen: 150
+
+## The folder under the project folder that contains the fastq folder.
+data_dir: "data"
+
+## Match pattern for the input files. It can be:
+## *.fastq, *.fq, *.gz
+## For paired-end data, use "*R1*.fastq.gz" or "*R1*.fastq".
+input_files: "*R1*.fastq.gz"
+
+## Cluster settings
+cores: 4  # Number of cores to use for multi-threaded programs.
+queue: "queue_name"
+h_vmem: "32G"
+
+## wall_time for each step, hh:mm
+wall_time:
+  alignFastqByBowtie: "23:59"
+  runFastqc: "4:00"
+  rmdupBam: "20:00"
+  genTDF: "20:00"
+  runPhantomPeak: "20:00"
diff --git a/project/script-sge/pipeline.py b/project/script-sge/pipeline.py
new file mode 100644
index 0000000..dd67f94
--- /dev/null
+++ b/project/script-sge/pipeline.py
@@ -0,0 +1,227 @@
+#! /usr/bin/env python
+
+import os
+import sys
+import yaml
+from ruffus import *
+import glob
+import subprocess
+import string
+import drmaa
+from ruffus.drmaa_wrapper import run_job, error_drmaa_job
+
+my_drmaa_session = drmaa.Session()
+my_drmaa_session.initialize()
+
+def expandOsPath(path):
+    """
+    To expand the path with shell variables.
+    Arguments:
+    - `path`: path string
+    """
+    return os.path.expanduser(os.path.expandvars(path))
+
+def genFilesWithPattern(pathList, Pattern):
+    """
+    To generate a file list on the fly based on a wildcard pattern.
+    Arguments:
+    - `pathList`: the path of the files
+    - `Pattern`: pattern like config["input_files"]
+    """
+    pathList.append(Pattern)
+    Files = expandOsPath(os.path.join(*pathList))
+    return Files
+
+def cluster_options(config, task_name, cores, logfile):
+    """
+    Generate a string of cluster options to feed an SGE job.
+    Arguments:
+    - `config`: configuration as an associative array from the YAML file.
+    - `task_name`: the specific task name, such as runPhantomPeak.
+    - `cores`: number of cores to use for this task.
+    - `logfile`: log file name.
+    """
+    ## Here are the parameters for SGE.
+    str_options = "-cwd -V -pe shm %d -q %s -j y -o %s" % \
+        (cores, config["queue"], logfile)
+    if "h_vmem" in config:
+        str_options = str_options + " -l h_vmem=%s" % (config["h_vmem"])
+    return str_options
+
+
+config_name = sys.argv[1]
+config_f = open(config_name, "r")
+config = yaml.load(config_f)
+config_f.close()
+inputfiles = expandOsPath(os.path.join(config["project_dir"], config["data_dir"], "fastq", config["input_files"]))
+FqFiles = [x for x in glob.glob(inputfiles)]
+fq_name, fq_ext = os.path.splitext(config["input_files"])
+fq_ext_suffix = ".alignment.log"
+Bam_path = expandOsPath(os.path.join(config["project_dir"], config["data_dir"])) + "/"
+FastQC_path = expandOsPath(os.path.join(config["project_dir"], config["data_dir"], "FastQC"))
+rmdup_path = expandOsPath(os.path.join(config["project_dir"], config["data_dir"], "rmdup"))
+
+script_path = os.path.dirname(os.path.realpath(__file__))
+
+@transform(FqFiles, formatter(fq_ext), os.path.join(Bam_path, "{basename[0]}.bam"), config)
+def alignFastqByBowtie(FqFileName, OutputBamFileName, config):
+    """
+    To align '.fastq' files to the genome.
+    Arguments:
+    - `FqFileName`: file to be processed
+    """
+    if "aligner" in config:
+        if config["aligner"] == "bowtie":
+            cmds = ['fastq2bam_by_bowtie.sh']
+            cmds.append(FqFileName)
+            cmds.append(expandOsPath(config['bowtie_index']))
+        elif config["aligner"] == "bowtie2":
+            cmds = ['fastq2bam_by_bowtie2.sh']
+            cmds.append(FqFileName)
+            cmds.append(config['bowtie_index'])
+        else:
+            raise KeyError
+    else:
+        cmds = ['fastq2bam_by_bowtie.sh']
+        cmds.append(FqFileName)
+        cmds.append(expandOsPath(config['bowtie_index']))
+
+    target = expandOsPath(os.path.join(config["project_dir"], config["data_dir"]))
+    cmds.append(target)
+    cmds.append(config["pair_end"])
+    cores = int(config['cores'])
+    if cores == 0:
+        cores = 1
+    cmds.append(str(cores))
+    logfile = FqFileName + ".alignment.log"
+
+    run_job(" ".join(cmds),
+        job_name = "alignFastqByBowtie_" + os.path.basename(FqFileName),
+        job_other_options = cluster_options(config, "alignFastqByBowtie", cores, logfile),
+        job_script_directory = os.path.dirname(os.path.realpath(__file__)),
+        job_environment={ 'BASH_ENV' : '~/.bashrc' },
+        retain_job_scripts = True, drmaa_session=my_drmaa_session)
+
+    return 0
+
+@follows(alignFastqByBowtie, mkdir(FastQC_path))
+@transform(alignFastqByBowtie, suffix(".bam"), ".bam.fastqc.log", config)
+def runFastqc(BamFileName, fastqcLog, config):
+    """
+    To run FastQC
+    Arguments:
+    - `BamFileName`: bam file
+    - `config`: config
+    """
+    cmds = ['fastqc']
+    cmds.append("-o")
+    cmds.append(expandOsPath(os.path.join(config["project_dir"], config["data_dir"], "FastQC")))
+    cores = int(config['cores'])
+    if cores == 0:
+        cores = 1
+    cmds.append("-t")
+    cmds.append(str(cores))
+    cmds.append(BamFileName)
+    logfile = BamFileName + ".fastqc.log"
+
+    run_job(" ".join(cmds),
+        job_name = "fastqc_" + os.path.basename(BamFileName),
+        job_other_options = cluster_options(config, "runFastqc", cores, logfile),
+        job_script_directory = os.path.dirname(os.path.realpath(__file__)),
+        job_environment={ 'BASH_ENV' : '~/.bashrc' },
+        retain_job_scripts = True, drmaa_session=my_drmaa_session)
+
+    return 0
+
+@follows(runFastqc, mkdir(rmdup_path))
+@transform(alignFastqByBowtie, formatter(".bam"), os.path.join(rmdup_path, "{basename[0]}_rmdup.bam"), config)
+def rmdupBam(BamFileName, rmdupFile, config):
+    """
+    To remove duplicate reads.
+    Arguments:
+    - `BamFileName`: bam file
+    - `config`: config
+    """
+    if config["pair_end"] == "no":
+        cmds = ['rmdup.bam.sh']
+    else:
+        cmds = ['rmdup_PE.bam.sh']
+    cmds.append(BamFileName)
+    cmds.append(rmdup_path)
+    #if "bam_sort_buff" in config:
+    #    cmds.append(config["bam_sort_buff"])
+    logfile = BamFileName + ".rmdup.log"
+
+    cores = 1
+
+    run_job(" ".join(cmds),
+        job_name = "rmdup_" + os.path.basename(BamFileName),
+        job_other_options = cluster_options(config, "rmdupBam", cores, logfile),
+        job_script_directory = os.path.dirname(os.path.realpath(__file__)),
+        job_environment={ 'BASH_ENV' : '~/.bashrc' },
+        retain_job_scripts = True, drmaa_session=my_drmaa_session)
+
+    return 0
+
+@follows(rmdupBam, mkdir(expandOsPath(os.path.join(rmdup_path, "tdf"))))
+@transform(rmdupBam, suffix(".bam"), ".bam.tdf.log", config)
+def genTDF(BamFileName, tdfLog, config):
+    """
+    To generate TDF files for IGV
+    Arguments:
+    - `BamFileName`: bam file
+    - `config`: config
+    """
+    cmds = ['igvtools']
+    cmds.append("count")
+    cmds.append(BamFileName)
+    TDFPath = expandOsPath(os.path.join(rmdup_path, "tdf"))
+    baseName = os.path.basename(BamFileName)
+    cmds.append(os.path.join(TDFPath, baseName.replace(".bam", ".tdf")))
+    cmds.append(config["IGV_genome"])
+    logfile = BamFileName + ".tdf.log"
+
+    cores = 1
+
+    run_job(" ".join(cmds),
+        job_name = "genTDF_" + os.path.basename(BamFileName),
+        job_other_options = cluster_options(config, "genTDF", cores, logfile),
+        job_script_directory = os.path.dirname(os.path.realpath(__file__)),
+        job_environment={ 'BASH_ENV' : '~/.bashrc' },
+        retain_job_scripts = True, drmaa_session=my_drmaa_session)
+
+    return 0
+
+@follows(genTDF)
+@transform(rmdupBam, suffix(".bam"), ".bam.phantomPeak.log", config)
+def runPhantomPeak(BamFileName, Log, config):
+    """
+    To check data with phantomPeak
+    Arguments:
+    - `BamFileName`: bam file
+    - `config`: config
+    """
+    cmds = ['runPhantomPeak.sh']
+    cmds.append(BamFileName)
+    cores = int(config['cores'])
+    if cores == 0:
+        cores = 1
+    cmds.append(str(cores))
+    logfile = BamFileName + ".phantomPeak.log"
+
+    run_job(" ".join(cmds),
+        job_name = "runPhantomPeak_" + os.path.basename(BamFileName),
+        job_other_options = cluster_options(config, "runPhantomPeak", cores, logfile),
+        job_script_directory = os.path.dirname(os.path.realpath(__file__)),
+        job_environment={ 'BASH_ENV' : '~/.bashrc' },
+        retain_job_scripts = True, drmaa_session=my_drmaa_session)
+
+    return 0
+
+if __name__ == '__main__':
+    ## Run the pipeline up to the runPhantomPeak step.
+    ## The multithread number may need to be adjusted.
+    pipeline_run([runPhantomPeak], multithread=200)
+
+    my_drmaa_session.exit()
diff --git a/project/script-sge/results_parser.py b/project/script-sge/results_parser.py
new file mode 100644
index 0000000..0111507
--- /dev/null
+++ b/project/script-sge/results_parser.py
@@ -0,0 +1,221 @@
+#! /usr/bin/env python
+
+import sys
+import os
+import glob
+import re
+import yaml
+from collections import namedtuple
+
+def expandOsPath(path):
+    """
+    To expand the path with shell variables.
+    Arguments:
+    - `path`: path string
+    """
+    return os.path.expanduser(os.path.expandvars(path))
+
+def genFilesWithPattern(pathList, Pattern):
+    """
+    To generate a file list on the fly.
+    Arguments:
+    - `pathList`: the path of the files
+    - `Pattern`: pattern like config["input_files"]
+    """
+    pathList.append(Pattern)
+    Files = glob.glob(expandOsPath(os.path.join(*pathList)))
+    return Files
+
+def parse_bowtie1_log(s):
+    total_pattern = re.compile(r"""\#\s+reads\s+processed:\s(?P<total_reads>.+)\s*""",  # total_reads
+        re.VERBOSE)
+    unique_mapped_pattern = re.compile("""\#\s+reads\s+with\s+at\s+least\s+one\s+reported\s+alignment:\s+(?P<unique_mapped_reads>\S+)\s+\(\S+\)""",  # unique_mapped_reads
+        re.VERBOSE)
+    multiple_mapped_pattern = re.compile("""\#\s+reads\s+with\s+alignments\s+suppressed\s+due\s+to\s+-m:\s+(?P<multiple_mapped_reads>\d+)\s+\(\S+\)""",  # multiple_mapped_reads
+        re.VERBOSE)
+    for line in s:
+        match = total_pattern.match(line)
+        if match:
+            total_reads = match.group("total_reads")
+        match = unique_mapped_pattern.match(line)
+        if match:
+            unique_mapped_reads = match.group("unique_mapped_reads")
+        match = multiple_mapped_pattern.match(line)
+        if match:
+            multiple_mapped_reads = match.group("multiple_mapped_reads")
+    res = namedtuple('res', ['total_reads', 'unique_mapped_reads', 'suppressed_multiple_mapped_reads'])
+    r = res(total_reads=total_reads,
+        unique_mapped_reads=unique_mapped_reads,
+        suppressed_multiple_mapped_reads=multiple_mapped_reads)
+    return r
+
+def parse_bowtie2_log(s):
+    total_pattern = re.compile(r"""(?P<total_reads>\d+)\s+reads;\s+of\s+these:""",  # total_reads
+        re.VERBOSE)
+    unique_mapped_pattern = re.compile("""\s*(?P<unique_mapped_reads>\d+)\s+\(\S+\).+exactly\s+1\s+time""",  # unique_mapped_reads
+        re.VERBOSE)
+    multiple_mapped_pattern = re.compile("""\s+(?P<multiple_mapped_reads>\d+)\s+\(\S+\).+aligned\s+>1\s+times""",  # multiple_mapped_reads
+        re.VERBOSE)
+    for line in s:
+        match = total_pattern.match(line)
+        if match:
+            total_reads = match.group("total_reads")
+        match = unique_mapped_pattern.match(line)
+        if match:
+            unique_mapped_reads = match.group("unique_mapped_reads")
+        match = multiple_mapped_pattern.match(line)
+        if match:
+            multiple_mapped_reads = match.group("multiple_mapped_reads")
+    res = namedtuple('res', ['total_reads', 'unique_mapped_reads', 'multiple_mapped_reads'])
+    r = res(total_reads=total_reads,
+        unique_mapped_reads=unique_mapped_reads,
+        multiple_mapped_reads=multiple_mapped_reads)
+    return r
+
+def parse_rmdup_log(s):
+    pattern = re.compile(r'\[bam_rmdupse_core\]\s+(?P<dup_reads>\d+)\s/\s\d+', re.VERBOSE)
+    for line in s:
+        match = pattern.match(line)
+        if match:
+            dup_reads = match.group("dup_reads")
+    res = namedtuple('res', ['dup_reads'])
+    r = res(dup_reads=dup_reads)
+    return r
+
+def parse_phantomPeak_log(s):
+    NSC_pattern = re.compile(r'.*\(NSC\)\s*(?P<NSC>\d*\.\d*).+', re.VERBOSE)
+    RSC_pattern = re.compile(r'.*\(RSC\)\s*(?P<RSC>\d*\.\d*).+', re.VERBOSE)
+    for line in s:
+        match = NSC_pattern.match(line)
+        if match:
+            NSC = match.group("NSC")
+        match = RSC_pattern.match(line)
+        if match:
+            RSC = match.group("RSC")
+    res = namedtuple('res', ['NSC', 'RSC'])
+    r = res(NSC=NSC, RSC=RSC)
+    return r
+
+def getSummaryFiles(input_type, config, search_paths):
+    """
+    Get all summary files under the folders.
+    input_type: the file type suffix, such as ".alignment.log".
+    config: config loaded from yaml.
+    """
+    input_type = "*" + input_type
+    files = genFilesWithPattern([config["project_dir"], config["data_dir"]], input_type)
+    for search_path in search_paths:
+        files.extend(genFilesWithPattern([config["project_dir"], config["data_dir"], search_path],
+            input_type))
+    return files
+
+def getFileId(file_basename):
+    """
+    Remove the suffix of the summary file name to get the file id.
+    """
+    suffixes = ['.fastq.alignment.log', '.fq.alignment.log', '.gz.alignment.log', '.bam.rmdup.log', '_rmdup.bam.phantomPeak.log']
+    for suffix in suffixes:
+        file_basename = file_basename.replace(suffix, '')
+    return file_basename
+
+## Search subdirectories under the data folder.
+search_paths = ["fastq", "rmdup"]
+
+## Used for the final results.
+summary_dict = {}
+
+## Load the same config yaml file as the pipeline.
+config_name = sys.argv[1]
+config_f = open(config_name, "r")
+config = yaml.load(config_f)
+config_f.close()
+
+if config["aligner"] == "bowtie":
+    ## To be used in debugging.
+    # input_files = {".alignment.log":("total_reads", "unique_mapped_reads")}
+
+    ## Summary files used for summarizing.
+    input_files = {
+        ".alignment.log": ("total_reads", "unique_mapped_reads", "suppressed_multiple_mapped_reads"),
+        ".rmdup.log": ("dup_reads",),
+        ".phantomPeak.log": ("NSC", "RSC")
+        }
+
+    ## Decide the parser here by a dict.
+    parser_dict = {
+        ".alignment.log": parse_bowtie1_log,
+        ".rmdup.log": parse_rmdup_log,
+        ".phantomPeak.log": parse_phantomPeak_log
+        }
+
+    ## Used to assign the output fields in the output file.
+    output_header = [
+        "sample",
+        "total_reads",
+        "unique_mapped_reads",
+        "suppressed_multiple_mapped_reads",
+        "dup_reads",
+        "NSC",
+        "RSC"]
+
+elif config["aligner"] == "bowtie2":
+    ## To be used in debugging.
+    # input_files = {".alignment.log":("total_reads", "unique_mapped_reads", "multiple_mapped_reads")}
+
+    ## Summary files used for summarizing.
+    input_files = {
+        ".alignment.log": ("total_reads", "unique_mapped_reads", "multiple_mapped_reads"),
+        ".rmdup.log": ("dup_reads",),
+        ".phantomPeak.log": ("NSC", "RSC")
+        }
+
+    ## Decide the parser here by a dict.
+    parser_dict = {
+        ".alignment.log": parse_bowtie2_log,
+        ".rmdup.log": parse_rmdup_log,
+        ".phantomPeak.log": parse_phantomPeak_log
+        }
+
+    ## Used to assign the output fields in the output file.
+    output_header = [
+        "sample",
+        "total_reads",
+        "unique_mapped_reads",
+        "multiple_mapped_reads",
+        "dup_reads",
+        "NSC",
+        "RSC"]
+
+## Scan the files to summarize the pipeline.
+for input_type, summary_types in input_files.items():
+    summary_files = getSummaryFiles(input_type, config, search_paths)
+    if len(summary_files) != 0:
+        for summary_file in summary_files:
+            file_id = getFileId(os.path.basename(summary_file))
+            if file_id not in summary_dict:
+                summary_dict[file_id] = {'sample': file_id}
+            input_file = open(summary_file)
+            lines = input_file.readlines()
+            input_file.close()
+            ## Here the value of the dict is the parser function!
+            res = parser_dict[input_type](lines)
+            ## Unpack the results into the dict.
+            for i in range(len(res._fields)):
+                if res._fields[i] not in output_header:
+                    output_header.append(res._fields[i])
+                summary_dict[file_id][res._fields[i]] = res[i]
+
+## Output to a file; the column order is decided by output_header.
+output_file = open("summary_stats.txt", "w")
+header_line = "\t".join(output_header) + "\n"
+output_file.write(header_line)
+for sample in summary_dict.keys():
+    output_list = []
+    for stat in output_header:
+        if stat in summary_dict[sample]:
+            output_list.append(summary_dict[sample][stat])
+        else:
+            output_list.append("NA")
+    line = "\t".join(output_list) + "\n"
+    output_file.write(line)
+output_file.close()
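For readers checking the SGE submission flags, here is a minimal sketch of what `cluster_options()` from `pipeline.py` above returns when fed the sample `config.yaml` from this diff; the `yaml.safe_load` call and the log file name `sample_rmdup.bam.phantomPeak.log` are assumptions made only for this example.

```python
# A minimal sketch (not part of the diff): assumes PyYAML is installed and the
# sample config.yaml above sits in the working directory.
import yaml

def cluster_options(config, task_name, cores, logfile):
    # Same option template as cluster_options() in pipeline.py: run in the
    # current working directory, export the environment, request the "shm"
    # parallel environment with `cores` slots, target the configured queue,
    # merge stdout/stderr into the log file, and add an optional h_vmem limit.
    str_options = "-cwd -V -pe shm %d -q %s -j y -o %s" % \
        (cores, config["queue"], logfile)
    if "h_vmem" in config:
        str_options = str_options + " -l h_vmem=%s" % (config["h_vmem"])
    return str_options

config = yaml.safe_load(open("config.yaml"))
# "sample_rmdup.bam.phantomPeak.log" is a hypothetical log file name.
print(cluster_options(config, "runPhantomPeak", int(config["cores"]),
                      "sample_rmdup.bam.phantomPeak.log"))
# Expected output with the sample config:
# -cwd -V -pe shm 4 -q queue_name -j y -o sample_rmdup.bam.phantomPeak.log -l h_vmem=32G
```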