/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support the LUMC SHARK cluster, which
 * runs SGE, but other types of HPC supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
*
* Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
*
* Contact us at: sasc@lumc.nl
*
 * A dual licensing mode is applied. The source code within this project that is
 * not part of GATK Queue is freely available for non-commercial use under an AGPL
 * license; for commercial users, or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
*/
package nl.lumc.sasc.biopet.pipelines.gentrap
import java.io.File
import nl.lumc.sasc.biopet.FullVersion
import nl.lumc.sasc.biopet.core._
import nl.lumc.sasc.biopet.extensions.picard.{ MergeSamFiles, SortSam }
import nl.lumc.sasc.biopet.extensions.samtools.SamtoolsView
import nl.lumc.sasc.biopet.extensions.tools.{ MergeTables, WipeReads }
import nl.lumc.sasc.biopet.extensions.{ HtseqCount, Ln }
import nl.lumc.sasc.biopet.pipelines.bamtobigwig.Bam2Wig
import nl.lumc.sasc.biopet.pipelines.gentrap.extensions.{ CustomVarScan, Pdflatex, RawBaseCounter }
import nl.lumc.sasc.biopet.pipelines.gentrap.scripts.{ AggrBaseCount, PdfReportTemplateWriter, PlotHeatmap }
import nl.lumc.sasc.biopet.pipelines.mapping.MultisampleMappingTrait
import nl.lumc.sasc.biopet.utils.Logging
import nl.lumc.sasc.biopet.utils.config._
import org.broadinstitute.gatk.queue.QScript
import org.broadinstitute.gatk.queue.function.QFunction
import picard.analysis.directed.RnaSeqMetricsCollector.StrandSpecificity
import scala.language.reflectiveCalls
import scalaz.Scalaz._
/**
* Gentrap pipeline
* Generic transcriptome analysis pipeline
*
* @author Peter van 't Hof
* @author Wibowo Arindrarto
*/
class Gentrap(val root: Configurable) extends QScript
with MultisampleMappingTrait { qscript =>
import Gentrap.ExpMeasures._
import Gentrap.StrandProtocol._
import Gentrap._
// alternative constructor for initialization with empty configuration
def this() = this(null)
  /** Splice-aware (split-read) aligner to use */
var aligner: String = config("aligner", default = "gsnap")
/** Expression measurement modes */
// see the enumeration below for valid modes
var expMeasures: Set[ExpMeasures.Value] = {
if (config.contains("expression_measures"))
config("expression_measures")
.asStringList
.flatMap { makeExpMeasure }
.toSet
else {
Logging.addError("'expression_measures' is missing in the config")
Set()
}
}
/** Strandedness modes */
var strandProtocol: StrandProtocol.Value = {
if (config.contains("strand_protocol"))
makeStrandProtocol(config("strand_protocol").asString).getOrElse(StrandProtocol.NonSpecific)
else {
Logging.addError("'strand_protocol' is missing in the config")
StrandProtocol.NonSpecific
}
}
/** GTF reference file */
var annotationGtf: Option[File] = config("annotation_gtf")
/** BED reference file */
var annotationBed: Option[File] = config("annotation_bed")
/** refFlat reference file */
var annotationRefFlat: File = config("annotation_refflat")
/** rRNA refFlat annotation */
var ribosomalRefFlat: Option[File] = config("ribosome_refflat")
/** Whether to remove rRNA regions or not */
var removeRibosomalReads: Boolean = config("remove_ribosomal_reads", default = false)
/** Whether to do simple variant calling on RNA or not */
var callVariants: Boolean = config("call_variants", default = false)
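  // Illustrative config snippet (hypothetical paths and values) showing the
  // keys read above, assuming the usual Biopet JSON config layout:
  //   "gentrap": {
  //     "expression_measures": ["fragments_per_gene", "bases_per_gene"],
  //     "strand_protocol": "dutp",
  //     "annotation_gtf": "/path/to/annotation.gtf",
  //     "annotation_bed": "/path/to/annotation.bed",
  //     "annotation_refflat": "/path/to/annotation.refFlat",
  //     "ribosome_refflat": "/path/to/rrna.refFlat",
  //     "remove_ribosomal_reads": true,
  //     "call_variants": false
  //   }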
/** Default pipeline config */
override def defaults = Map(
"merge_strategy" -> "preprocessmergesam",
"gsnap" -> Map(
"novelsplicing" -> 1,
"batch" -> 4,
"format" -> "sam"
),
"bammetrics" -> Map(
"transcript_refflat" -> annotationRefFlat,
"collectrnaseqmetrics" -> ((if (strandProtocol != null) Map(
"strand_specificity" -> (strandProtocol match {
case NonSpecific => StrandSpecificity.NONE.toString
case Dutp => StrandSpecificity.SECOND_READ_TRANSCRIPTION_STRAND.toString
case otherwise => throw new IllegalStateException(otherwise.toString)
})
)
else Map()) ++ (if (ribosomalRefFlat != null) ribosomalRefFlat.map("ribosomal_intervals" -> _.getAbsolutePath).toList else Nil))
),
"cutadapt" -> Map("minimum_length" -> 20),
    // avoid conflicts when merging, since MarkDuplicates program records often cause merges to fail
"picard" -> Map(
"programrecordid" -> "null"
),
    // disable markduplicates since it may not play well with all aligners (this can still be overridden via config)
"mapping" -> Map(
"skip_markduplicates" -> true,
"skip_metrics" -> true
)
)
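  // These defaults merge with user-supplied config values, which take
  // precedence at the corresponding nested keys (e.g. an explicit
  // "skip_markduplicates" entry under "mapping" overrides the default above).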
  /** Creates a merge job for the per-sample output tables of the given expression mode, if any sample produces one */
// TODO: can we combine the enum with the file extension (to reduce duplication and potential errors)
private def makeMergeTableJob(inFunc: (Sample => Option[File]), ext: String, idCols: List[Int], valCol: Int,
numHeaderLines: Int = 0, outBaseName: String = "all_samples",
fallback: String = "-"): Option[MergeTables] = {
    val tables = samples.values.flatMap(inFunc).toList
tables.nonEmpty
.option {
val job = new MergeTables(qscript)
job.inputTables = tables
job.output = new File(outputDir, "expression_estimates" + File.separator + outBaseName + ext)
job.idColumnIndices = idCols.map(_.toString)
job.valueColumnIndex = valCol
job.fileExtension = Option(ext)
job.fallbackString = Option(fallback)
job.numHeaderLines = Option(numHeaderLines)
// TODO: separate the addition into another function?
job
}
}
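  // Illustrative sketch (hypothetical data) of what MergeTables produces:
  // given per-sample two-column tables such as
  //   sampleA.fragments_per_gene:  geneX  10    sampleB.fragments_per_gene:  geneX  7
  //                                geneY   4                                 geneZ  2
  // the merged all_samples table becomes a feature-by-sample matrix:
  //   feature  sampleA  sampleB
  //   geneX    10       7
  //   geneY    4        -
  //   geneZ    -        2
  // where idCols selects the feature column(s), valCol the value column, and
  // missing cells are filled with the fallback string.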
/** Expression measures which are subject to TMM normalization during correlation calculation */
protected lazy val forTmmNormalization: Set[ExpMeasures.Value] =
Set(FragmentsPerGene, FragmentsPerExon, BasesPerGene, BasesPerExon)
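  // TMM is edgeR's trimmed mean of M-values normalization (Robinson & Oshlack,
  // 2010); the PlotHeatmap job applies it (log-scaled) before computing the
  // correlations shown in the heatmap.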
  /** Returns a heatmap-plotting job, defined only when the merge table exists and there are more than two samples */
private def makeHeatmapJob(mergeJob: Option[MergeTables], outName: String,
expMeasure: ExpMeasures.Value, isCuffIsoform: Boolean = false): Option[PlotHeatmap] =
(mergeJob.isDefined && samples.size > 2)
.option {
val job = new PlotHeatmap(qscript)
job.input = mergeJob.get.output
job.output = new File(outputDir, "heatmaps" + File.separator + s"heatmap_$outName.png")
job.tmmNormalize = forTmmNormalization.contains(expMeasure)
job.useLog = job.tmmNormalize
job.countType =
if (expMeasure.toString.startsWith("Cufflinks")) {
if (isCuffIsoform) Option("CufflinksIsoform")
else Option("CufflinksGene")
} else Option(expMeasure.toString)
job
}
/** Merged gene fragment count table */
private lazy val geneFragmentsCountJob =
makeMergeTableJob((s: Sample) => s.geneFragmentsCount, ".fragments_per_gene", List(1), 2, numHeaderLines = 0,
fallback = "0")
/** Heatmap job for gene fragment count */
private lazy val geneFragmentsCountHeatmapJob =
makeHeatmapJob(geneFragmentsCountJob, "fragments_per_gene", FragmentsPerGene)
/** Merged exon fragment count table */
private lazy val exonFragmentsCountJob =
makeMergeTableJob((s: Sample) => s.exonFragmentsCount, ".fragments_per_exon", List(1), 2, numHeaderLines = 0,
fallback = "0")
/** Heatmap job for exon fragment count */
private lazy val exonFragmentsCountHeatmapJob =
makeHeatmapJob(exonFragmentsCountJob, "fragments_per_exon", FragmentsPerExon)
/** Merged gene base count table */
private lazy val geneBasesCountJob =
makeMergeTableJob((s: Sample) => s.geneBasesCount, ".bases_per_gene", List(1), 2, numHeaderLines = 1,
fallback = "0")
/** Heatmap job for gene base count */
private lazy val geneBasesCountHeatmapJob =
makeHeatmapJob(geneBasesCountJob, "bases_per_gene", BasesPerGene)
/** Merged exon base count table */
private lazy val exonBasesCountJob =
makeMergeTableJob((s: Sample) => s.exonBasesCount, ".bases_per_exon", List(1), 2, numHeaderLines = 1,
fallback = "0")
/** Heatmap job for exon base count */
private lazy val exonBasesCountHeatmapJob =
makeHeatmapJob(exonBasesCountJob, "bases_per_exon", BasesPerExon)
/** Merged gene FPKM table for Cufflinks, strict mode */
private lazy val geneFpkmCufflinksStrictJob =
makeMergeTableJob((s: Sample) => s.geneFpkmCufflinksStrict, ".genes_fpkm_cufflinks_strict", List(1, 7), 10,
numHeaderLines = 1, fallback = "0.0")
/** Heatmap job for gene FPKM Cufflinks, strict mode */
private lazy val geneFpkmCufflinksStrictHeatmapJob =
makeHeatmapJob(geneFpkmCufflinksStrictJob, "genes_fpkm_cufflinks_strict", CufflinksStrict)
  /** Merged isoform FPKM table for Cufflinks, strict mode */
private lazy val isoFpkmCufflinksStrictJob =
makeMergeTableJob((s: Sample) => s.isoformFpkmCufflinksStrict, ".isoforms_fpkm_cufflinks_strict", List(1, 7), 10,
numHeaderLines = 1, fallback = "0.0")
/** Heatmap job for isoform FPKM Cufflinks, strict mode */
private lazy val isoFpkmCufflinksStrictHeatmapJob =
makeHeatmapJob(isoFpkmCufflinksStrictJob, "isoforms_fpkm_cufflinks_strict", CufflinksStrict, isCuffIsoform = true)
/** Merged gene FPKM table for Cufflinks, guided mode */
private lazy val geneFpkmCufflinksGuidedJob =
makeMergeTableJob((s: Sample) => s.geneFpkmCufflinksGuided, ".genes_fpkm_cufflinks_guided", List(1, 7), 10,
numHeaderLines = 1, fallback = "0.0")
/** Heatmap job for gene FPKM Cufflinks, guided mode */
private lazy val geneFpkmCufflinksGuidedHeatmapJob =
makeHeatmapJob(geneFpkmCufflinksGuidedJob, "genes_fpkm_cufflinks_guided", CufflinksGuided)
/** Merged isoforms FPKM table for Cufflinks, guided mode */
private lazy val isoFpkmCufflinksGuidedJob =
makeMergeTableJob((s: Sample) => s.isoformFpkmCufflinksGuided, ".isoforms_fpkm_cufflinks_guided", List(1, 7), 10,
numHeaderLines = 1, fallback = "0.0")
/** Heatmap job for isoform FPKM Cufflinks, guided mode */
private lazy val isoFpkmCufflinksGuidedHeatmapJob =
makeHeatmapJob(isoFpkmCufflinksGuidedJob, "isoforms_fpkm_cufflinks_guided", CufflinksGuided, isCuffIsoform = true)
/** Merged gene FPKM table for Cufflinks, blind mode */
private lazy val geneFpkmCufflinksBlindJob =
makeMergeTableJob((s: Sample) => s.geneFpkmCufflinksBlind, ".genes_fpkm_cufflinks_blind", List(1, 7), 10,
numHeaderLines = 1, fallback = "0.0")
/** Heatmap job for gene FPKM Cufflinks, blind mode */
private lazy val geneFpkmCufflinksBlindHeatmapJob =
makeHeatmapJob(geneFpkmCufflinksBlindJob, "genes_fpkm_cufflinks_blind", CufflinksBlind)
/** Merged isoforms FPKM table for Cufflinks, blind mode */
private lazy val isoFpkmCufflinksBlindJob =
makeMergeTableJob((s: Sample) => s.isoformFpkmCufflinksBlind, ".isoforms_fpkm_cufflinks_blind", List(1, 7), 10,
numHeaderLines = 1, fallback = "0.0")
/** Heatmap job for isoform FPKM Cufflinks, blind mode */
private lazy val isoFpkmCufflinksBlindHeatmapJob =
makeHeatmapJob(isoFpkmCufflinksBlindJob, "isoforms_fpkm_cufflinks_blind", CufflinksBlind, isCuffIsoform = true)
/** Container for merge table jobs */
private lazy val mergeTableJobs: Map[String, Option[MergeTables]] = Map(
"gene_fragments_count" -> geneFragmentsCountJob,
"exon_fragments_count" -> exonFragmentsCountJob,
"gene_bases_count" -> geneBasesCountJob,
"exon_bases_count" -> exonBasesCountJob,
"gene_fpkm_cufflinks_strict" -> geneFpkmCufflinksStrictJob,
"isoform_fpkm_cufflinks_strict" -> isoFpkmCufflinksStrictJob,
"gene_fpkm_cufflinks_guided" -> geneFpkmCufflinksGuidedJob,
"isoform_fpkm_cufflinks_guided" -> isoFpkmCufflinksGuidedJob,
"gene_fpkm_cufflinks_blind" -> geneFpkmCufflinksBlindJob,
"isoform_fpkm_cufflinks_blind" -> isoFpkmCufflinksBlindJob
)
/** Container for heatmap jobs */
private lazy val heatmapJobs: Map[String, Option[PlotHeatmap]] = Map(
"gene_fragments_count_heatmap" -> geneFragmentsCountHeatmapJob,
"exon_fragments_count_heatmap" -> exonFragmentsCountHeatmapJob,
"gene_bases_count_heatmap" -> geneBasesCountHeatmapJob,
"exon_bases_count_heatmap" -> exonBasesCountHeatmapJob,
"gene_fpkm_cufflinks_strict_heatmap" -> geneFpkmCufflinksStrictHeatmapJob,
"isoform_fpkm_cufflinks_strict_heatmap" -> isoFpkmCufflinksStrictHeatmapJob,
"gene_fpkm_cufflinks_guided_heatmap" -> geneFpkmCufflinksGuidedHeatmapJob,
"isoform_fpkm_cufflinks_guided_heatmap" -> isoFpkmCufflinksGuidedHeatmapJob,
"gene_fpkm_cufflinks_blind_heatmap" -> geneFpkmCufflinksBlindHeatmapJob,
"isoform_fpkm_cufflinks_blind_heatmap" -> isoFpkmCufflinksBlindHeatmapJob
)
/** Output summary file */
def summaryFile: File = new File(outputDir, "gentrap.summary.json")
/** Files that will be listed in the summary file */
override def summaryFiles: Map[String, File] = super.summaryFiles ++ Map(
"annotation_refflat" -> annotationRefFlat
) ++ Map(
"annotation_gtf" -> annotationGtf,
"annotation_bed" -> annotationBed,
"ribosome_refflat" -> ribosomalRefFlat
).collect { case (key, Some(value)) => key -> value } ++
mergeTableJobs.collect { case (key, Some(value)) => key -> value.output } ++
heatmapJobs.collect { case (key, Some(value)) => key -> value.output }
/** Statistics shown in the summary file */
def summaryStats: Map[String, Any] = Map()
/** Pipeline settings shown in the summary file */
override def summarySettings: Map[String, Any] = super.summarySettings ++ Map(
"aligner" -> aligner,
"expression_measures" -> expMeasures.toList.map(_.toString),
"strand_protocol" -> strandProtocol.toString,
"call_variants" -> callVariants,
"remove_ribosomal_reads" -> removeRibosomalReads,
"version" -> FullVersion
)
/** Job for writing PDF report template */
protected lazy val pdfTemplateJob: PdfReportTemplateWriter = {
val job = new PdfReportTemplateWriter(qscript)
job.summaryFile = summaryFile
job.output = new File(outputDir, "gentrap_report.tex")
job
}
/** Job for writing PDF report */
protected def pdfReportJob: Pdflatex = {
val job = new Pdflatex(qscript)
job.input = pdfTemplateJob.output
job.outputDir = new File(outputDir, "report")
job.name = "gentrap_report"
job
}
/** Steps to run before biopetScript */
override def init(): Unit = {
super.init()
// TODO: validate that exons are flattened or not (depending on another option flag?)
// validate required annotation files
if (expMeasures.contains(FragmentsPerGene) && annotationGtf.isEmpty)
Logging.addError("GTF file must be defined for counting fragments per gene, config key: 'annotation_gtf'")
if (expMeasures.contains(FragmentsPerExon) && annotationGtf.isEmpty)
Logging.addError("GTF file must be defined for counting fragments per exon, config key: 'annotation_gtf'")
// TODO: validate that GTF file contains exon features
if (expMeasures.contains(BasesPerGene) && annotationBed.isEmpty)
Logging.addError("BED file must be defined for counting bases per gene, config key: 'annotation_bed'")
if (expMeasures.contains(BasesPerExon) && annotationBed.isEmpty)
Logging.addError("BED file must be defined for counting bases per exon, config key: 'annotation_bed'")
if ((expMeasures.contains(CufflinksBlind) || expMeasures.contains(CufflinksGuided) || expMeasures.contains(CufflinksStrict)) && annotationGtf.isEmpty)
Logging.addError("GTF file must be defined for Cufflinks-based modes, config key: 'annotation_gtf'")
if (removeRibosomalReads && ribosomalRefFlat.isEmpty)
Logging.addError("rRNA intervals must be supplied if removeRibosomalReads is set, config key: 'ribosome_refflat'")
annotationGtf.foreach(inputFiles :+= new InputFile(_))
annotationBed.foreach(inputFiles :+= new InputFile(_))
ribosomalRefFlat.foreach(inputFiles :+= new InputFile(_))
if (annotationRefFlat.getName.nonEmpty) inputFiles :+= new InputFile(annotationRefFlat)
}
/** Pipeline run for multiple samples */
override def addMultiSampleJobs(): Unit = {
    super.addMultiSampleJobs()
// merge expression tables
mergeTableJobs.values.foreach { case maybeJob => maybeJob.foreach(add(_)) }
// add heatmap jobs
heatmapJobs.values.foreach { case maybeJob => maybeJob.foreach(add(_)) }
    // heatmap jobs are only created when there are more than two samples
    // (see makeHeatmapJob), so no extra handling is needed here
// TODO: use proper notation
//add(pdfTemplateJob)
//add(pdfReportJob)
}
/** Returns a [[Sample]] object */
override def makeSample(sampleId: String): Sample = new Sample(sampleId)
/**
* Gentrap sample
*
* @param sampleId Unique identifier of the sample
*/
class Sample(sampleId: String) extends super.Sample(sampleId) with CufflinksProducer {
/** Shortcut to qscript object */
protected def pipeline: Gentrap = qscript
/** Summary stats of the sample */
override def summaryStats: Map[String, Any] = super.summaryStats ++ Map(
"all_paired" -> allPaired,
"all_single" -> allSingle
)
/** Summary files of the sample */
override def summaryFiles: Map[String, File] = super.summaryFiles ++ Map(
"alignment" -> alnFile
) ++ Map(
"gene_fragments_count" -> geneFragmentsCount,
"exon_fragments_count" -> exonFragmentsCount,
"gene_bases_count" -> geneBasesCount,
"exon_bases_count" -> exonBasesCount,
"gene_fpkm_cufflinks_strict" -> cufflinksStrictJobSet.collect { case js => js.geneFpkmJob.output },
"isoform_fpkm_cufflinks_strict" -> cufflinksStrictJobSet.collect { case js => js.isoformFpkmJob.output },
"gene_fpkm_cufflinks_guided" -> cufflinksGuidedJobSet.collect { case js => js.geneFpkmJob.output },
"isoform_fpkm_cufflinks_guided" -> cufflinksGuidedJobSet.collect { case js => js.isoformFpkmJob.output },
"gene_fpkm_cufflinks_blind" -> cufflinksBlindJobSet.collect { case js => js.geneFpkmJob.output },
"isoform_fpkm_cufflinks_blind" -> cufflinksBlindJobSet.collect { case js => js.isoformFpkmJob.output },
"variant_calls" -> variantCalls
).collect { case (key, Some(value)) => key -> value }
/** Per-sample alignment file, post rRNA cleanup (if chosen) */
lazy val alnFile: File = wipeJob match {
case Some(j) => j.outputBam
case None => preProcessBam.get
}
    /** Read count per gene file */
    def geneFragmentsCount: Option[File] = fragmentsPerGeneJob.map(_.output)
    /** Read count per exon file */
    def exonFragmentsCount: Option[File] = fragmentsPerExonJob.map(_.output)
    /** Base count per gene file */
    def geneBasesCount: Option[File] = basesPerGeneJob.map(_.output)
    /** Base count per exon file */
    def exonBasesCount: Option[File] = basesPerExonJob.map(_.output)
/** JobSet for Cufflinks strict mode */
    protected lazy val cufflinksStrictJobSet: Option[CufflinksJobSet] =
      expMeasures.find(_ == CufflinksStrict).map(new CufflinksJobSet(_))
    /** Gene tracking file from Cufflinks strict mode */
    def geneFpkmCufflinksStrict: Option[File] = cufflinksStrictJobSet.map(_.geneFpkmJob.output)
    /** Isoform tracking file from Cufflinks strict mode */
    def isoformFpkmCufflinksStrict: Option[File] = cufflinksStrictJobSet.map(_.isoformFpkmJob.output)
    /** JobSet for Cufflinks guided mode */
    protected lazy val cufflinksGuidedJobSet: Option[CufflinksJobSet] =
      expMeasures.find(_ == CufflinksGuided).map(new CufflinksJobSet(_))
    /** Gene tracking file from Cufflinks guided mode */
    def geneFpkmCufflinksGuided: Option[File] = cufflinksGuidedJobSet.map(_.geneFpkmJob.output)
    /** Isoform tracking file from Cufflinks guided mode */
    def isoformFpkmCufflinksGuided: Option[File] = cufflinksGuidedJobSet.map(_.isoformFpkmJob.output)
    /** JobSet for Cufflinks blind mode */
    protected lazy val cufflinksBlindJobSet: Option[CufflinksJobSet] =
      expMeasures.find(_ == CufflinksBlind).map(new CufflinksJobSet(_))
    /** Gene tracking file from Cufflinks blind mode */
    def geneFpkmCufflinksBlind: Option[File] = cufflinksBlindJobSet.map(_.geneFpkmJob.output)
    /** Isoform tracking file from Cufflinks blind mode */
    def isoformFpkmCufflinksBlind: Option[File] = cufflinksBlindJobSet.map(_.isoformFpkmJob.output)
    /** Raw variant calling file */
    def variantCalls: Option[File] = varCallJob.map(_.output)
/** ID-sorting job for HTseq-count jobs */
private def idSortingJob: Option[SortSam] = (expMeasures.contains(FragmentsPerExon) || expMeasures.contains(FragmentsPerGene))
.option {
val job = new SortSam(qscript)
job.input = alnFile
job.output = createFile(".idsorted.bam")
job.sortOrder = "queryname"
job.isIntermediate = true
job
}
/** Read counting job per gene */
private def fragmentsPerGeneJob: Option[HtseqCount] = expMeasures
.contains(FragmentsPerGene)
.option {
require(idSortingJob.nonEmpty)
val job = new HtseqCount(qscript)
annotationGtf.foreach(job.inputAnnotation = _)
job.inputAlignment = idSortingJob.get.output
job.output = createFile(".fragments_per_gene")
job.format = Option("bam")
// We are forcing the sort order to be ID-sorted, since HTSeq-count often chokes when using position-sorting due
// to its buffer not being large enough.
job.order = Option("name")
job.stranded = strandProtocol match {
case NonSpecific => Option("no")
case Dutp => Option("reverse")
case _ => throw new IllegalStateException
}
job
}
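    // The per-gene job above is roughly equivalent to this htseq-count call
    // (paths hypothetical; --stranded is "no" for the non-specific protocol):
    //   htseq-count --format bam --order name --stranded reverse \
    //     sample.idsorted.bam annotation.gtf > sample.fragments_per_gene
    // The per-exon job below differs only in adding --idattr exon_id.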
/** Read counting job per exon */
private def fragmentsPerExonJob: Option[HtseqCount] = expMeasures
.contains(FragmentsPerExon)
.option {
require(idSortingJob.nonEmpty)
val job = new HtseqCount(qscript)
        annotationGtf.foreach(job.inputAnnotation = _)
job.inputAlignment = idSortingJob.get.output
job.output = createFile(".fragments_per_exon")
job.format = Option("bam")
job.order = Option("name")
job.stranded = strandProtocol match {
case NonSpecific => Option("no")
case Dutp => Option("reverse")
case _ => throw new IllegalStateException
}
// TODO: ensure that the "exon_id" attributes exist for all exons in the GTF
job.idattr = Option("exon_id")
job
}
/** Container for strand-separation jobs */
private case class StrandSeparationJobSet(pair1Job: SamtoolsView, pair2Job: Option[SamtoolsView],
combineJob: QFunction { def output: File }) {
def addAllJobs(): Unit = {
add(pair1Job); pair2Job.foreach(add(_)); add(combineJob)
}
}
    /** Alignment file of reads from the plus strand, only defined when the run is strand-specific */
    def alnFilePlusStrand: Option[File] = alnPlusStrandJobs.map(_.combineJob.output)
    /** Jobs for generating reads from the plus strand, only defined when the run is strand-specific */
private def alnPlusStrandJobs: Option[StrandSeparationJobSet] = strandProtocol match {
case Dutp =>
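        // Under the dUTP protocol R2 follows the transcript strand and R1 is its
        // reverse complement, so plus-strand reads are: forward-mapping R2 reads
        // (flag 0x80 set, 0x10 unset), reverse-mapping R1 reads (0x50 = 0x40 | 0x10),
        // or, for single-end data, any reverse-mapping read (0x10 set).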
val r2Job = this.allPaired
.option {
val job = new SamtoolsView(qscript)
job.input = alnFile
job.b = true
job.h = true
job.f = List("0x80")
job.F = List("0x10")
job.output = createFile(".r2.bam")
job.isIntermediate = true
job
}
val f1Job = new SamtoolsView(qscript)
f1Job.input = alnFile
f1Job.b = true
f1Job.h = true
f1Job.f = if (this.allSingle) List("0x10") else List("0x50")
f1Job.output = createFile(".f1.bam")
      // when no mate-pair job exists, the combine step merely symlinks this
      // output, so it must then be kept as a non-intermediate file
      f1Job.isIntermediate = r2Job.isDefined
val perStrandFiles = r2Job match {
case Some(r2j) => List(f1Job.output, r2j.output)
case None => List(f1Job.output)
}
val combineJob = makeCombineJob(perStrandFiles, createFile(".plus_strand.bam"))
Option(StrandSeparationJobSet(f1Job, r2Job, combineJob.alnJob))
case NonSpecific => None
case _ => throw new IllegalStateException
}
    /** Alignment file of reads from the minus strand, only defined when the run is strand-specific */
    def alnFileMinusStrand: Option[File] = alnMinusStrandJobs.map(_.combineJob.output)
    /** Jobs for generating reads from the minus strand, only defined when the run is strand-specific */
private def alnMinusStrandJobs: Option[StrandSeparationJobSet] = strandProtocol match {
case Dutp =>
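        // Mirror image of the plus-strand selection: minus-strand reads are
        // forward-mapping R1 reads (flag 0x40 set, 0x10 unset), reverse-mapping
        // R2 reads (0x90 = 0x80 | 0x10), or, for single-end data, any
        // forward-mapping read (0x10 unset).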
val r1Job = this.allPaired
.option {
val job = new SamtoolsView(qscript)
job.input = alnFile
job.b = true
job.h = true
job.f = List("0x40")
job.F = List("0x10")
job.output = createFile(".r1.bam")
job.isIntermediate = true
job
}
val f2Job = new SamtoolsView(qscript)
f2Job.input = alnFile
f2Job.b = true
f2Job.h = true
f2Job.output = createFile(".f2.bam")
      // when no mate-pair job exists, the combine step merely symlinks this
      // output, so it must then be kept as a non-intermediate file
      f2Job.isIntermediate = r1Job.isDefined
if (this.allSingle) f2Job.F = List("0x10")
else f2Job.f = List("0x90")
val perStrandFiles = r1Job match {
case Some(r1j) => List(f2Job.output, r1j.output)
case None => List(f2Job.output)
}
val combineJob = makeCombineJob(perStrandFiles, createFile(".minus_strand.bam"))
Option(StrandSeparationJobSet(f2Job, r1Job, combineJob.alnJob))
case NonSpecific => None
case _ => throw new IllegalStateException
}
/** Raw base counting job */
private def rawBaseCountJob: Option[RawBaseCounter] = strandProtocol match {
case NonSpecific =>
(expMeasures.contains(BasesPerExon) || expMeasures.contains(BasesPerGene))
.option {
val job = new RawBaseCounter(qscript)
job.inputBoth = alnFile
annotationBed.foreach(job.annotationBed = _)
job.output = createFile(".raw_base_count")
job
}
case Dutp =>
(expMeasures.contains(BasesPerExon) || expMeasures.contains(BasesPerGene))
.option {
require(alnFilePlusStrand.isDefined && alnFileMinusStrand.isDefined)
val job = new RawBaseCounter(qscript)
job.inputPlus = alnFilePlusStrand.get
job.inputMinus = alnFileMinusStrand.get
annotationBed.foreach(job.annotationBed = _)
job.output = createFile(".raw_base_count")
job
}
case _ => throw new IllegalStateException
}
/** Base counting job per gene */
private def basesPerGeneJob: Option[AggrBaseCount] = expMeasures
.contains(BasesPerGene)
.option {
require(rawBaseCountJob.nonEmpty)
val job = new AggrBaseCount(qscript)
job.input = rawBaseCountJob.get.output
job.output = createFile(".bases_per_gene")
job.inputLabel = sampleId
job.mode = "gene"
job
}
/** Base counting job per exon */
private def basesPerExonJob: Option[AggrBaseCount] = expMeasures
.contains(BasesPerExon)
.option {
require(rawBaseCountJob.nonEmpty)
val job = new AggrBaseCount(qscript)
job.input = rawBaseCountJob.get.output
job.output = createFile(".bases_per_exon")
job.inputLabel = sampleId
job.mode = "exon"
job
}
/** Variant calling job */
private def varCallJob: Option[CustomVarScan] = callVariants
.option {
val job = new CustomVarScan(qscript)
job.input = alnFile
job.output = createFile(".raw.vcf.gz")
job
}
/** Job for removing ribosomal reads */
private def wipeJob: Option[WipeReads] = removeRibosomalReads
.option {
//require(ribosomalRefFlat.isDefined)
val job = new WipeReads(qscript)
job.inputBam = bamFile.get
ribosomalRefFlat.foreach(job.intervalFile = _)
job.outputBam = createFile(".cleaned.bam")
job.discardedBam = createFile(".rrna.bam")
job
}
    /** Container pairing a combine job (an Ln or a MergeSamFiles, typed by their common QFunction super type) with an optional index symlink job */
case class CombineFileJobSet(alnJob: QFunction { def output: File }, idxJob: Option[Ln]) {
/** Adds all jobs in this jobset */
def addAll(): Unit = { add(alnJob); idxJob.foreach(add(_)) }
}
    /** Creates an Ln or MergeSamFiles job, depending on how many inputs are supplied */
private def makeCombineJob(inFiles: List[File], outFile: File,
mergeSortOrder: String = "coordinate"): CombineFileJobSet = {
      require(inFiles.nonEmpty, "At least one input file is required for the combine job")
if (inFiles.size == 1) {
val jobBam = new Ln(qscript)
jobBam.input = inFiles.head.getAbsoluteFile
jobBam.output = outFile
val jobIdx = new Ln(qscript)
jobIdx.input = swapExt(libraries.values.head.libDir, jobBam.input, ".bam", ".bai")
jobIdx.output = swapExt(sampleDir, jobBam.output, ".bam", ".bai")
CombineFileJobSet(jobBam, Some(jobIdx))
} else {
val job = new MergeSamFiles(qscript)
job.input = inFiles
job.output = outFile
job.sortOrder = mergeSortOrder
CombineFileJobSet(job, None)
}
}
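    // Design note: with a single input the "combine" step is just a symlink
    // (plus a symlink for the accompanying .bai index) instead of a full
    // Picard MergeSamFiles run; with multiple inputs a real merge is performed.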
    /** Whether all libraries in this sample are paired-end */
    def allPaired: Boolean = libraries.values.forall(_.mapping.forall(_.input_R2.isDefined))
    /** Whether all libraries in this sample are single-end */
    def allSingle: Boolean = libraries.values.forall(_.mapping.forall(_.input_R2.isEmpty))
// TODO: add warnings or other messages for config values that are hard-coded by the pipeline
/** Adds all jobs for the sample */
override def addJobs(): Unit = {
super.addJobs()
// TODO: this is our requirement since it's easier to calculate base counts when all libraries are either paired or single
      require(allPaired || allSingle, s"Sample $sampleId contains a mix of paired-end and single-end libraries; all libraries must be one or the other")
// merge or symlink per-library alignments
// add bigwig output, also per-strand when possible
addAll(Bam2Wig(qscript, alnFile).functions)
      alnFilePlusStrand.foreach(f => addAll(Bam2Wig(qscript, f).functions))
      alnFileMinusStrand.foreach(f => addAll(Bam2Wig(qscript, f).functions))
// add strand-specific jobs if defined
alnPlusStrandJobs.foreach(_.addAllJobs())
alnMinusStrandJobs.foreach(_.addAllJobs())
// add htseq-count jobs, if defined
idSortingJob.foreach(add(_))
fragmentsPerGeneJob.foreach(add(_))
fragmentsPerExonJob.foreach(add(_))
// add custom base count jobs, if defined
rawBaseCountJob.foreach(add(_))
basesPerGeneJob.foreach(add(_))
basesPerExonJob.foreach(add(_))
      // symlink results with distinct extensions to make it easier to run
      // MergeTables on them as well, since the Queue argument parser does not
      // play nice with Map[_, _] types
cufflinksStrictJobSet.foreach(_.jobs.foreach(add(_)))
cufflinksGuidedJobSet.foreach(_.jobs.foreach(add(_)))
cufflinksBlindJobSet.foreach(_.jobs.foreach(add(_)))
// add variant calling job if requested
varCallJob.foreach(add(_))
}
}
}
object Gentrap extends PipelineCommand {
/** Enumeration of available expression measures */
object ExpMeasures extends Enumeration {
val FragmentsPerGene, FragmentsPerExon, BasesPerGene, BasesPerExon, CufflinksStrict, CufflinksGuided, CufflinksBlind = Value
//Cuffquant,
//Rsem = Value
}
/** Enumeration of available strandedness */
object StrandProtocol extends Enumeration {
    // for now, only the non-strand-specific and dUTP-stranded protocols are supported
// TODO: other strandedness protocol?
val NonSpecific, Dutp = Value
}
/** Converts string with underscores into camel-case strings */
private def camelize(ustring: String): String = ustring
.split("_")
.map(_.toLowerCase.capitalize)
.mkString("")
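  // e.g. camelize("fragments_per_gene") == "FragmentsPerGene", matching the
  // value names in the ExpMeasures and StrandProtocol enumerations above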
/** Conversion from raw user-supplied expression measure string to enum value */
  private def makeExpMeasure(rawName: String): Option[ExpMeasures.Value] = {
    try {
      Some(ExpMeasures.withName(camelize(rawName)))
    } catch {
      // other exception types propagate on their own, so no rethrow case is needed
      case nse: NoSuchElementException =>
        Logging.addError(s"Invalid expression measure: $rawName")
        None
    }
  }
  /** Conversion from raw user-supplied strand protocol string to enum value */
  private def makeStrandProtocol(rawName: String): Option[StrandProtocol.Value] = {
    try {
      Some(StrandProtocol.withName(camelize(rawName)))
    } catch {
      case nse: NoSuchElementException =>
        Logging.addError(s"Invalid strand protocol: $rawName")
        None
    }
  }
}