Gentrap.scala 33.5 KB
Newer Older
bow's avatar
bow committed
1
/**
2
3
4
5
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
bow's avatar
bow committed
6
 *
7
8
9
10
11
12
13
14
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
 * A dual licensing mode is applied. The source code within this project that are
 * not part of GATK Queue is freely available for non-commercial use under an AGPL
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
bow's avatar
bow committed
15
 */
bow's avatar
bow committed
16
17
package nl.lumc.sasc.biopet.pipelines.gentrap

18
import java.io.File
bow's avatar
bow committed
19

bow's avatar
bow committed
20
import nl.lumc.sasc.biopet.FullVersion
bow's avatar
bow committed
21
import nl.lumc.sasc.biopet.core._
Peter van 't Hof's avatar
Peter van 't Hof committed
22
import nl.lumc.sasc.biopet.extensions.picard.{ MergeSamFiles, SortSam }
bow's avatar
bow committed
23
import nl.lumc.sasc.biopet.extensions.samtools.SamtoolsView
Peter van 't Hof's avatar
Peter van 't Hof committed
24
25
import nl.lumc.sasc.biopet.extensions.tools.{ MergeTables, WipeReads }
import nl.lumc.sasc.biopet.extensions.{ HtseqCount, Ln }
bow's avatar
bow committed
26
import nl.lumc.sasc.biopet.pipelines.bamtobigwig.Bam2Wig
Peter van 't Hof's avatar
Peter van 't Hof committed
27
28
import nl.lumc.sasc.biopet.pipelines.gentrap.extensions.{ CustomVarScan, Pdflatex, RawBaseCounter }
import nl.lumc.sasc.biopet.pipelines.gentrap.scripts.{ AggrBaseCount, PdfReportTemplateWriter, PlotHeatmap }
29
import nl.lumc.sasc.biopet.pipelines.mapping.MultisampleMappingTrait
30
31
import nl.lumc.sasc.biopet.utils.Logging
import nl.lumc.sasc.biopet.utils.config._
Peter van 't Hof's avatar
Peter van 't Hof committed
32
33
34
35
36
37
import org.broadinstitute.gatk.queue.QScript
import org.broadinstitute.gatk.queue.function.QFunction
import picard.analysis.directed.RnaSeqMetricsCollector.StrandSpecificity

import scala.language.reflectiveCalls
import scalaz.Scalaz._
bow's avatar
bow committed
38

bow's avatar
bow committed
39
40
41
/**
 * Gentrap pipeline
 * Generic transcriptome analysis pipeline
bow's avatar
bow committed
42
 *
43
 * @author Peter van 't Hof <p.j.van_t_hof@lumc.nl>
bow's avatar
bow committed
44
 * @author Wibowo Arindrarto <w.arindrarto@lumc.nl>
bow's avatar
bow committed
45
 */
46
class Gentrap(val root: Configurable) extends QScript
47
  with MultisampleMappingTrait { qscript =>
bow's avatar
bow committed
48

bow's avatar
bow committed
49
  import Gentrap.ExpMeasures._
50
  import Gentrap.StrandProtocol._
Peter van 't Hof's avatar
Peter van 't Hof committed
51
  import Gentrap._
bow's avatar
bow committed
52

53
  // alternative constructor for initialization with empty configuration
bow's avatar
bow committed
54
55
  def this() = this(null)

bow's avatar
bow committed
56
  /** Split aligner to use */
bow's avatar
bow committed
57
58
  var aligner: String = config("aligner", default = "gsnap")

59
  /** Expression measurement modes */
bow's avatar
bow committed
60
  // see the enumeration below for valid modes
61
62
63
64
  var expMeasures: Set[ExpMeasures.Value] = {
    if (config.contains("expression_measures"))
      config("expression_measures")
        .asStringList
65
        .flatMap { makeExpMeasure }
66
67
68
69
70
71
        .toSet
    else {
      Logging.addError("'expression_measures' is missing in the config")
      Set()
    }
  }
72

73
  /** Strandedness modes */
74
75
  var strandProtocol: StrandProtocol.Value = {
    if (config.contains("strand_protocol"))
76
      makeStrandProtocol(config("strand_protocol").asString).getOrElse(StrandProtocol.NonSpecific)
77
78
79
80
81
    else {
      Logging.addError("'strand_protocol' is missing in the config")
      StrandProtocol.NonSpecific
    }
  }
82

83
  /** GTF reference file */
bow's avatar
bow committed
84
  var annotationGtf: Option[File] = config("annotation_gtf")
85
86

  /** BED reference file */
bow's avatar
bow committed
87
  var annotationBed: Option[File] = config("annotation_bed")
88
89

  /** refFlat reference file */
bow's avatar
bow committed
90
  var annotationRefFlat: File = config("annotation_refflat")
bow's avatar
bow committed
91

bow's avatar
bow committed
92
93
94
  /** rRNA refFlat annotation */
  var ribosomalRefFlat: Option[File] = config("ribosome_refflat")

95
96
97
  /** Whether to remove rRNA regions or not */
  var removeRibosomalReads: Boolean = config("remove_ribosomal_reads", default = false)

98
99
  /** Whether to do simple variant calling on RNA or not */
  var callVariants: Boolean = config("call_variants", default = false)
100

bow's avatar
bow committed
101
  /** Default pipeline config */
102
  override def defaults = Map(
103
    "merge_strategy" -> "preprocessmergesam",
Peter van 't Hof's avatar
Peter van 't Hof committed
104
105
106
107
108
    "gsnap" -> Map(
      "novelsplicing" -> 1,
      "batch" -> 4,
      "format" -> "sam"
    ),
Peter van 't Hof's avatar
Peter van 't Hof committed
109
110
111
112
113
114
115
116
117
118
119
    "bammetrics" -> Map(
      "transcript_refflat" -> annotationRefFlat,
      "collectrnaseqmetrics" -> ((if (strandProtocol != null) Map(
        "strand_specificity" -> (strandProtocol match {
          case NonSpecific => StrandSpecificity.NONE.toString
          case Dutp        => StrandSpecificity.SECOND_READ_TRANSCRIPTION_STRAND.toString
          case otherwise   => throw new IllegalStateException(otherwise.toString)
        })
      )
      else Map()) ++ (if (ribosomalRefFlat != null) ribosomalRefFlat.map("ribosomal_intervals" -> _.getAbsolutePath).toList else Nil))
    ),
Peter van 't Hof's avatar
Peter van 't Hof committed
120
121
122
123
124
125
126
127
128
    "cutadapt" -> Map("minimum_length" -> 20),
    // avoid conflicts when merging since the MarkDuplicate tags often cause merges to fail
    "picard" -> Map(
      "programrecordid" -> "null"
    ),
    // disable markduplicates since it may not play well with all aligners (this can still be overriden via config)
    "mapping" -> Map(
      "skip_markduplicates" -> true,
      "skip_metrics" -> true
129
    )
Peter van 't Hof's avatar
Peter van 't Hof committed
130
  )
131

132
133
134
  /** Adds output merge jobs for the given expression mode */
  // TODO: can we combine the enum with the file extension (to reduce duplication and potential errors)
  private def makeMergeTableJob(inFunc: (Sample => Option[File]), ext: String, idCols: List[Int], valCol: Int,
135
                                numHeaderLines: Int = 0, outBaseName: String = "all_samples",
136
                                fallback: String = "-"): Option[MergeTables] = {
137
138
139
    val tables = samples.values.map { inFunc }.toList.flatten
    tables.nonEmpty
      .option {
bow's avatar
bow committed
140
141
        val job = new MergeTables(qscript)
        job.inputTables = tables
142
        job.output = new File(outputDir, "expression_estimates" + File.separator + outBaseName + ext)
bow's avatar
bow committed
143
144
145
        job.idColumnIndices = idCols.map(_.toString)
        job.valueColumnIndex = valCol
        job.fileExtension = Option(ext)
146
        job.fallbackString = Option(fallback)
147
        job.numHeaderLines = Option(numHeaderLines)
bow's avatar
bow committed
148
149
150
        // TODO: separate the addition into another function?
        job
      }
151
152
  }

bow's avatar
bow committed
153
  /** Expression measures which are subject to TMM normalization during correlation calculation */
bow's avatar
bow committed
154
155
156
  protected lazy val forTmmNormalization: Set[ExpMeasures.Value] =
    Set(FragmentsPerGene, FragmentsPerExon, BasesPerGene, BasesPerExon)

bow's avatar
bow committed
157
  /** Returns a QFunction to generate heatmaps */
bow's avatar
bow committed
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
  private def makeHeatmapJob(mergeJob: Option[MergeTables], outName: String,
                             expMeasure: ExpMeasures.Value, isCuffIsoform: Boolean = false): Option[PlotHeatmap] =
    (mergeJob.isDefined && samples.size > 2)
      .option {
        val job = new PlotHeatmap(qscript)
        job.input = mergeJob.get.output
        job.output = new File(outputDir, "heatmaps" + File.separator + s"heatmap_$outName.png")
        job.tmmNormalize = forTmmNormalization.contains(expMeasure)
        job.useLog = job.tmmNormalize
        job.countType =
          if (expMeasure.toString.startsWith("Cufflinks")) {
            if (isCuffIsoform) Option("CufflinksIsoform")
            else Option("CufflinksGene")
          } else Option(expMeasure.toString)
        job
      }

175
  /** Merged gene fragment count table */
bow's avatar
bow committed
176
  private lazy val geneFragmentsCountJob =
177
178
    makeMergeTableJob((s: Sample) => s.geneFragmentsCount, ".fragments_per_gene", List(1), 2, numHeaderLines = 0,
      fallback = "0")
179

bow's avatar
bow committed
180
  /** Heatmap job for gene fragment count */
bow's avatar
bow committed
181
182
  private lazy val geneFragmentsCountHeatmapJob =
    makeHeatmapJob(geneFragmentsCountJob, "fragments_per_gene", FragmentsPerGene)
bow's avatar
bow committed
183

184
  /** Merged exon fragment count table */
bow's avatar
bow committed
185
  private lazy val exonFragmentsCountJob =
186
187
    makeMergeTableJob((s: Sample) => s.exonFragmentsCount, ".fragments_per_exon", List(1), 2, numHeaderLines = 0,
      fallback = "0")
188

bow's avatar
bow committed
189
  /** Heatmap job for exon fragment count */
bow's avatar
bow committed
190
191
192
  private lazy val exonFragmentsCountHeatmapJob =
    makeHeatmapJob(exonFragmentsCountJob, "fragments_per_exon", FragmentsPerExon)

193
  /** Merged gene base count table */
bow's avatar
bow committed
194
  private lazy val geneBasesCountJob =
195
196
    makeMergeTableJob((s: Sample) => s.geneBasesCount, ".bases_per_gene", List(1), 2, numHeaderLines = 1,
      fallback = "0")
197

bow's avatar
bow committed
198
  /** Heatmap job for gene base count */
bow's avatar
bow committed
199
200
201
  private lazy val geneBasesCountHeatmapJob =
    makeHeatmapJob(geneBasesCountJob, "bases_per_gene", BasesPerGene)

202
  /** Merged exon base count table */
bow's avatar
bow committed
203
  private lazy val exonBasesCountJob =
204
205
    makeMergeTableJob((s: Sample) => s.exonBasesCount, ".bases_per_exon", List(1), 2, numHeaderLines = 1,
      fallback = "0")
206

bow's avatar
bow committed
207
  /** Heatmap job for exon base count */
bow's avatar
bow committed
208
209
210
  private lazy val exonBasesCountHeatmapJob =
    makeHeatmapJob(exonBasesCountJob, "bases_per_exon", BasesPerExon)

211
  /** Merged gene FPKM table for Cufflinks, strict mode */
bow's avatar
bow committed
212
  private lazy val geneFpkmCufflinksStrictJob =
213
214
    makeMergeTableJob((s: Sample) => s.geneFpkmCufflinksStrict, ".genes_fpkm_cufflinks_strict", List(1, 7), 10,
      numHeaderLines = 1, fallback = "0.0")
bow's avatar
bow committed
215

bow's avatar
bow committed
216
  /** Heatmap job for gene FPKM Cufflinks, strict mode */
bow's avatar
bow committed
217
218
  private lazy val geneFpkmCufflinksStrictHeatmapJob =
    makeHeatmapJob(geneFpkmCufflinksStrictJob, "genes_fpkm_cufflinks_strict", CufflinksStrict)
219
220

  /** Merged exon FPKM table for Cufflinks, strict mode */
bow's avatar
bow committed
221
  private lazy val isoFpkmCufflinksStrictJob =
222
223
    makeMergeTableJob((s: Sample) => s.isoformFpkmCufflinksStrict, ".isoforms_fpkm_cufflinks_strict", List(1, 7), 10,
      numHeaderLines = 1, fallback = "0.0")
bow's avatar
bow committed
224

bow's avatar
bow committed
225
  /** Heatmap job for isoform FPKM Cufflinks, strict mode */
bow's avatar
bow committed
226
  private lazy val isoFpkmCufflinksStrictHeatmapJob =
Peter van 't Hof's avatar
Peter van 't Hof committed
227
    makeHeatmapJob(isoFpkmCufflinksStrictJob, "isoforms_fpkm_cufflinks_strict", CufflinksStrict, isCuffIsoform = true)
228
229

  /** Merged gene FPKM table for Cufflinks, guided mode */
bow's avatar
bow committed
230
  private lazy val geneFpkmCufflinksGuidedJob =
231
232
    makeMergeTableJob((s: Sample) => s.geneFpkmCufflinksGuided, ".genes_fpkm_cufflinks_guided", List(1, 7), 10,
      numHeaderLines = 1, fallback = "0.0")
bow's avatar
bow committed
233

bow's avatar
bow committed
234
  /** Heatmap job for gene FPKM Cufflinks, guided mode */
bow's avatar
bow committed
235
236
  private lazy val geneFpkmCufflinksGuidedHeatmapJob =
    makeHeatmapJob(geneFpkmCufflinksGuidedJob, "genes_fpkm_cufflinks_guided", CufflinksGuided)
237
238

  /** Merged isoforms FPKM table for Cufflinks, guided mode */
bow's avatar
bow committed
239
  private lazy val isoFpkmCufflinksGuidedJob =
240
241
    makeMergeTableJob((s: Sample) => s.isoformFpkmCufflinksGuided, ".isoforms_fpkm_cufflinks_guided", List(1, 7), 10,
      numHeaderLines = 1, fallback = "0.0")
bow's avatar
bow committed
242

bow's avatar
bow committed
243
  /** Heatmap job for isoform FPKM Cufflinks, guided mode */
bow's avatar
bow committed
244
  private lazy val isoFpkmCufflinksGuidedHeatmapJob =
Peter van 't Hof's avatar
Peter van 't Hof committed
245
    makeHeatmapJob(isoFpkmCufflinksGuidedJob, "isoforms_fpkm_cufflinks_guided", CufflinksGuided, isCuffIsoform = true)
246
247

  /** Merged gene FPKM table for Cufflinks, blind mode */
bow's avatar
bow committed
248
  private lazy val geneFpkmCufflinksBlindJob =
249
250
    makeMergeTableJob((s: Sample) => s.geneFpkmCufflinksBlind, ".genes_fpkm_cufflinks_blind", List(1, 7), 10,
      numHeaderLines = 1, fallback = "0.0")
bow's avatar
bow committed
251

bow's avatar
bow committed
252
  /** Heatmap job for gene FPKM Cufflinks, blind mode */
bow's avatar
bow committed
253
254
  private lazy val geneFpkmCufflinksBlindHeatmapJob =
    makeHeatmapJob(geneFpkmCufflinksBlindJob, "genes_fpkm_cufflinks_blind", CufflinksBlind)
255
256

  /** Merged isoforms FPKM table for Cufflinks, blind mode */
bow's avatar
bow committed
257
  private lazy val isoFpkmCufflinksBlindJob =
258
259
    makeMergeTableJob((s: Sample) => s.isoformFpkmCufflinksBlind, ".isoforms_fpkm_cufflinks_blind", List(1, 7), 10,
      numHeaderLines = 1, fallback = "0.0")
bow's avatar
bow committed
260

bow's avatar
bow committed
261
  /** Heatmap job for isoform FPKM Cufflinks, blind mode */
bow's avatar
bow committed
262
  private lazy val isoFpkmCufflinksBlindHeatmapJob =
Peter van 't Hof's avatar
Peter van 't Hof committed
263
    makeHeatmapJob(isoFpkmCufflinksBlindJob, "isoforms_fpkm_cufflinks_blind", CufflinksBlind, isCuffIsoform = true)
264

bow's avatar
bow committed
265
  /** Container for merge table jobs */
bow's avatar
bow committed
266
  private lazy val mergeTableJobs: Map[String, Option[MergeTables]] = Map(
bow's avatar
bow committed
267
268
269
270
271
    "gene_fragments_count" -> geneFragmentsCountJob,
    "exon_fragments_count" -> exonFragmentsCountJob,
    "gene_bases_count" -> geneBasesCountJob,
    "exon_bases_count" -> exonBasesCountJob,
    "gene_fpkm_cufflinks_strict" -> geneFpkmCufflinksStrictJob,
bow's avatar
bow committed
272
    "isoform_fpkm_cufflinks_strict" -> isoFpkmCufflinksStrictJob,
bow's avatar
bow committed
273
    "gene_fpkm_cufflinks_guided" -> geneFpkmCufflinksGuidedJob,
bow's avatar
bow committed
274
275
276
    "isoform_fpkm_cufflinks_guided" -> isoFpkmCufflinksGuidedJob,
    "gene_fpkm_cufflinks_blind" -> geneFpkmCufflinksBlindJob,
    "isoform_fpkm_cufflinks_blind" -> isoFpkmCufflinksBlindJob
bow's avatar
bow committed
277
  )
278

bow's avatar
bow committed
279
  /** Container for heatmap jobs */
bow's avatar
bow committed
280
  private lazy val heatmapJobs: Map[String, Option[PlotHeatmap]] = Map(
bow's avatar
bow committed
281
282
283
284
285
286
287
288
289
290
    "gene_fragments_count_heatmap" -> geneFragmentsCountHeatmapJob,
    "exon_fragments_count_heatmap" -> exonFragmentsCountHeatmapJob,
    "gene_bases_count_heatmap" -> geneBasesCountHeatmapJob,
    "exon_bases_count_heatmap" -> exonBasesCountHeatmapJob,
    "gene_fpkm_cufflinks_strict_heatmap" -> geneFpkmCufflinksStrictHeatmapJob,
    "isoform_fpkm_cufflinks_strict_heatmap" -> isoFpkmCufflinksStrictHeatmapJob,
    "gene_fpkm_cufflinks_guided_heatmap" -> geneFpkmCufflinksGuidedHeatmapJob,
    "isoform_fpkm_cufflinks_guided_heatmap" -> isoFpkmCufflinksGuidedHeatmapJob,
    "gene_fpkm_cufflinks_blind_heatmap" -> geneFpkmCufflinksBlindHeatmapJob,
    "isoform_fpkm_cufflinks_blind_heatmap" -> isoFpkmCufflinksBlindHeatmapJob
bow's avatar
bow committed
291
292
  )

bow's avatar
bow committed
293
294
295
296
  /** Output summary file */
  def summaryFile: File = new File(outputDir, "gentrap.summary.json")

  /** Files that will be listed in the summary file */
297
  override def summaryFiles: Map[String, File] = super.summaryFiles ++ Map(
bow's avatar
bow committed
298
299
300
301
302
303
    "annotation_refflat" -> annotationRefFlat
  ) ++ Map(
      "annotation_gtf" -> annotationGtf,
      "annotation_bed" -> annotationBed,
      "ribosome_refflat" -> ribosomalRefFlat
    ).collect { case (key, Some(value)) => key -> value } ++
bow's avatar
bow committed
304
305
      mergeTableJobs.collect { case (key, Some(value)) => key -> value.output } ++
      heatmapJobs.collect { case (key, Some(value)) => key -> value.output }
bow's avatar
bow committed
306
307
308
309
310

  /** Statistics shown in the summary file */
  def summaryStats: Map[String, Any] = Map()

  /** Pipeline settings shown in the summary file */
311
  override def summarySettings: Map[String, Any] = super.summarySettings ++ Map(
bow's avatar
bow committed
312
    "aligner" -> aligner,
bow's avatar
bow committed
313
314
315
316
    "expression_measures" -> expMeasures.toList.map(_.toString),
    "strand_protocol" -> strandProtocol.toString,
    "call_variants" -> callVariants,
    "remove_ribosomal_reads" -> removeRibosomalReads,
bow's avatar
bow committed
317
    "version" -> FullVersion
bow's avatar
bow committed
318
  )
bow's avatar
bow committed
319

320
  /** Job for writing PDF report template */
bow's avatar
bow committed
321
  protected lazy val pdfTemplateJob: PdfReportTemplateWriter = {
322
323
    val job = new PdfReportTemplateWriter(qscript)
    job.summaryFile = summaryFile
bow's avatar
bow committed
324
325
326
327
328
329
330
331
332
333
    job.output = new File(outputDir, "gentrap_report.tex")
    job
  }

  /** Job for writing PDF report */
  protected def pdfReportJob: Pdflatex = {
    val job = new Pdflatex(qscript)
    job.input = pdfTemplateJob.output
    job.outputDir = new File(outputDir, "report")
    job.name = "gentrap_report"
334
335
336
    job
  }

bow's avatar
bow committed
337
  /** Steps to run before biopetScript */
338
339
340
  override def init(): Unit = {
    super.init()

bow's avatar
bow committed
341
    // TODO: validate that exons are flattened or not (depending on another option flag?)
bow's avatar
bow committed
342
    // validate required annotation files
Peter van 't Hof's avatar
Peter van 't Hof committed
343
344
    if (expMeasures.contains(FragmentsPerGene) && annotationGtf.isEmpty)
      Logging.addError("GTF file must be defined for counting fragments per gene, config key: 'annotation_gtf'")
345

Peter van 't Hof's avatar
Peter van 't Hof committed
346
347
348
    if (expMeasures.contains(FragmentsPerExon) && annotationGtf.isEmpty)
      Logging.addError("GTF file must be defined for counting fragments per exon, config key: 'annotation_gtf'")
    // TODO: validate that GTF file contains exon features
bow's avatar
bow committed
349

Peter van 't Hof's avatar
Peter van 't Hof committed
350
351
    if (expMeasures.contains(BasesPerGene) && annotationBed.isEmpty)
      Logging.addError("BED file must be defined for counting bases per gene, config key: 'annotation_bed'")
352

Peter van 't Hof's avatar
Peter van 't Hof committed
353
354
    if (expMeasures.contains(BasesPerExon) && annotationBed.isEmpty)
      Logging.addError("BED file must be defined for counting bases per exon, config key: 'annotation_bed'")
355

Peter van 't Hof's avatar
Peter van 't Hof committed
356
357
    if ((expMeasures.contains(CufflinksBlind) || expMeasures.contains(CufflinksGuided) || expMeasures.contains(CufflinksStrict)) && annotationGtf.isEmpty)
      Logging.addError("GTF file must be defined for Cufflinks-based modes, config key: 'annotation_gtf'")
358

Peter van 't Hof's avatar
Peter van 't Hof committed
359
360
361
362
363
364
365
    if (removeRibosomalReads && ribosomalRefFlat.isEmpty)
      Logging.addError("rRNA intervals must be supplied if removeRibosomalReads is set, config key: 'ribosome_refflat'")

    annotationGtf.foreach(inputFiles :+= new InputFile(_))
    annotationBed.foreach(inputFiles :+= new InputFile(_))
    ribosomalRefFlat.foreach(inputFiles :+= new InputFile(_))
    if (annotationRefFlat.getName.nonEmpty) inputFiles :+= new InputFile(annotationRefFlat)
bow's avatar
bow committed
366
  }
367

bow's avatar
bow committed
368
  /** Pipeline run for multiple samples */
369
370
  override def addMultiSampleJobs(): Unit = {
    super.addMultiSampleJobs
371
    // merge expression tables
bow's avatar
bow committed
372
    mergeTableJobs.values.foreach { case maybeJob => maybeJob.foreach(add(_)) }
bow's avatar
bow committed
373
374
375
376
377
378
    // add heatmap jobs
    heatmapJobs.values.foreach { case maybeJob => maybeJob.foreach(add(_)) }
    // plot heatmap for each expression measure if sample is > 1
    if (samples.size > 1) {
      geneFragmentsCountJob
    }
bow's avatar
bow committed
379
    // TODO: use proper notation
Peter van 't Hof's avatar
Peter van 't Hof committed
380
381
    //add(pdfTemplateJob)
    //add(pdfReportJob)
382
  }
383

bow's avatar
bow committed
384
  /** Returns a [[Sample]] object */
385
  override def makeSample(sampleId: String): Sample = new Sample(sampleId)
bow's avatar
bow committed
386

bow's avatar
bow committed
387
388
389
390
391
  /**
   * Gentrap sample
   *
   * @param sampleId Unique identifier of the sample
   */
392
  class Sample(sampleId: String) extends super.Sample(sampleId) with CufflinksProducer {
393
394
395

    /** Shortcut to qscript object */
    protected def pipeline: Gentrap = qscript
bow's avatar
bow committed
396

397
    /** Summary stats of the sample */
398
    override def summaryStats: Map[String, Any] = super.summaryStats ++ Map(
399
400
401
      "all_paired" -> allPaired,
      "all_single" -> allSingle
    )
402
403

    /** Summary files of the sample */
404
    override def summaryFiles: Map[String, File] = super.summaryFiles ++ Map(
405
      "alignment" -> alnFile
406
    ) ++ Map(
407
408
        "gene_fragments_count" -> geneFragmentsCount,
        "exon_fragments_count" -> exonFragmentsCount,
409
410
        "gene_bases_count" -> geneBasesCount,
        "exon_bases_count" -> exonBasesCount,
411
412
413
414
415
416
        "gene_fpkm_cufflinks_strict" -> cufflinksStrictJobSet.collect { case js => js.geneFpkmJob.output },
        "isoform_fpkm_cufflinks_strict" -> cufflinksStrictJobSet.collect { case js => js.isoformFpkmJob.output },
        "gene_fpkm_cufflinks_guided" -> cufflinksGuidedJobSet.collect { case js => js.geneFpkmJob.output },
        "isoform_fpkm_cufflinks_guided" -> cufflinksGuidedJobSet.collect { case js => js.isoformFpkmJob.output },
        "gene_fpkm_cufflinks_blind" -> cufflinksBlindJobSet.collect { case js => js.geneFpkmJob.output },
        "isoform_fpkm_cufflinks_blind" -> cufflinksBlindJobSet.collect { case js => js.isoformFpkmJob.output },
417
        "variant_calls" -> variantCalls
418
419
      ).collect { case (key, Some(value)) => key -> value }

bow's avatar
bow committed
420
421
    /** Per-sample alignment file, post rRNA cleanup (if chosen) */
    lazy val alnFile: File = wipeJob match {
422
      case Some(j) => j.outputBam
423
      case None    => preProcessBam.get
bow's avatar
bow committed
424
    }
bow's avatar
bow committed
425

426
    /** Read count per gene file */
427
    def geneFragmentsCount: Option[File] = fragmentsPerGeneJob
428
429
430
      .collect { case job => job.output }

    /** Read count per exon file */
431
    def exonFragmentsCount: Option[File] = fragmentsPerExonJob
432
433
      .collect { case job => job.output }

bow's avatar
bow committed
434
435
436
437
438
439
440
441
    /** Base count per gene file */
    def geneBasesCount: Option[File] = basesPerGeneJob
      .collect { case job => job.output }

    /** Base count per exon file */
    def exonBasesCount: Option[File] = basesPerExonJob
      .collect { case job => job.output }

442
443
444
445
446
    /** JobSet for Cufflinks strict mode */
    protected lazy val cufflinksStrictJobSet: Option[CufflinksJobSet] = expMeasures
      .find(_ == CufflinksStrict)
      .collect { case found => new CufflinksJobSet(found) }

447
    /** Gene tracking file from Cufflinks strict mode */
bow's avatar
bow committed
448
    def geneFpkmCufflinksStrict: Option[File] = cufflinksStrictJobSet
449
      .collect { case jobSet => jobSet.geneFpkmJob.output }
450
451

    /** Isoforms tracking file from Cufflinks strict mode */
bow's avatar
bow committed
452
    def isoformFpkmCufflinksStrict: Option[File] = cufflinksStrictJobSet
453
454
455
456
457
458
      .collect { case jobSet => jobSet.isoformFpkmJob.output }

    /** JobSet for Cufflinks strict mode */
    protected lazy val cufflinksGuidedJobSet: Option[CufflinksJobSet] = expMeasures
      .find(_ == CufflinksGuided)
      .collect { case found => new CufflinksJobSet(found) }
459

460
    /** Gene tracking file from Cufflinks guided mode */
461
462
    def geneFpkmCufflinksGuided: Option[File] = cufflinksGuidedJobSet
      .collect { case jobSet => jobSet.geneFpkmJob.output }
463
464

    /** Isoforms tracking file from Cufflinks guided mode */
465
466
467
468
469
470
471
    def isoformFpkmCufflinksGuided: Option[File] = cufflinksGuidedJobSet
      .collect { case jobSet => jobSet.isoformFpkmJob.output }

    /** JobSet for Cufflinks blind mode */
    protected lazy val cufflinksBlindJobSet: Option[CufflinksJobSet] = expMeasures
      .find(_ == CufflinksBlind)
      .collect { case found => new CufflinksJobSet(found) }
472

473
    /** Gene tracking file from Cufflinks guided mode */
474
475
    def geneFpkmCufflinksBlind: Option[File] = cufflinksBlindJobSet
      .collect { case jobSet => jobSet.geneFpkmJob.output }
476
477

    /** Isoforms tracking file from Cufflinks blind mode */
478
479
    def isoformFpkmCufflinksBlind: Option[File] = cufflinksBlindJobSet
      .collect { case jobSet => jobSet.isoformFpkmJob.output }
480

481
482
483
484
    /** Raw variant calling file */
    def variantCalls: Option[File] = varCallJob
      .collect { case job => job.output }

485
    /** ID-sorting job for HTseq-count jobs */
486
    private def idSortingJob: Option[SortSam] = (expMeasures.contains(FragmentsPerExon) || expMeasures.contains(FragmentsPerGene))
bow's avatar
bow committed
487
488
489
490
491
      .option {
        val job = new SortSam(qscript)
        job.input = alnFile
        job.output = createFile(".idsorted.bam")
        job.sortOrder = "queryname"
bow's avatar
bow committed
492
        job.isIntermediate = true
bow's avatar
bow committed
493
494
        job
      }
495
496

    /** Read counting job per gene */
497
498
    private def fragmentsPerGeneJob: Option[HtseqCount] = expMeasures
      .contains(FragmentsPerGene)
bow's avatar
bow committed
499
500
      .option {
        require(idSortingJob.nonEmpty)
501
        val job = new HtseqCount(qscript)
Peter van 't Hof's avatar
Peter van 't Hof committed
502
        annotationGtf.foreach(job.inputAnnotation = _)
bow's avatar
bow committed
503
        job.inputAlignment = idSortingJob.get.output
504
        job.output = createFile(".fragments_per_gene")
505
        job.format = Option("bam")
bow's avatar
bow committed
506
507
        // We are forcing the sort order to be ID-sorted, since HTSeq-count often chokes when using position-sorting due
        // to its buffer not being large enough.
508
        job.order = Option("name")
509
        job.stranded = strandProtocol match {
510
511
512
513
514
515
          case NonSpecific => Option("no")
          case Dutp        => Option("reverse")
          case _           => throw new IllegalStateException
        }
        job
      }
bow's avatar
bow committed
516

517
    /** Read counting job per exon */
518
519
    private def fragmentsPerExonJob: Option[HtseqCount] = expMeasures
      .contains(FragmentsPerExon)
bow's avatar
bow committed
520
521
      .option {
        require(idSortingJob.nonEmpty)
522
523
        val job = new HtseqCount(qscript)
        job.inputAnnotation = annotationGtf.get
bow's avatar
bow committed
524
        job.inputAlignment = idSortingJob.get.output
bow's avatar
bow committed
525
        job.output = createFile(".fragments_per_exon")
526
527
        job.format = Option("bam")
        job.order = Option("name")
528
        job.stranded = strandProtocol match {
529
530
531
532
533
534
535
          case NonSpecific => Option("no")
          case Dutp        => Option("reverse")
          case _           => throw new IllegalStateException
        }
        // TODO: ensure that the "exon_id" attributes exist for all exons in the GTF
        job.idattr = Option("exon_id")
        job
bow's avatar
bow committed
536
      }
bow's avatar
bow committed
537

bow's avatar
bow committed
538
539
    /** Container for strand-separation jobs */
    private case class StrandSeparationJobSet(pair1Job: SamtoolsView, pair2Job: Option[SamtoolsView],
540
                                              combineJob: QFunction { def output: File }) {
bow's avatar
bow committed
541
      def addAllJobs(): Unit = {
542
        add(pair1Job); pair2Job.foreach(add(_)); add(combineJob)
bow's avatar
bow committed
543
544
545
      }
    }

bow's avatar
bow committed
546
    /** Alignment file of reads from the plus strand, only defined when run is strand-specific */
bow's avatar
bow committed
547
    def alnFilePlusStrand: Option[File] = alnPlusStrandJobs
548
      .collect { case jobSet => jobSet.combineJob.output }
bow's avatar
bow committed
549

bow's avatar
bow committed
550
    /** Jobs for generating reads from the plus strand, only defined when run is strand-specific */
551
    private def alnPlusStrandJobs: Option[StrandSeparationJobSet] = strandProtocol match {
bow's avatar
bow committed
552
      case Dutp =>
553
        val r2Job = this.allPaired
554
555
556
557
558
          .option {
            val job = new SamtoolsView(qscript)
            job.input = alnFile
            job.b = true
            job.h = true
559
            job.f = List("0x80")
560
561
562
563
564
            job.F = List("0x10")
            job.output = createFile(".r2.bam")
            job.isIntermediate = true
            job
          }
565
566
567
568
569
570
571
572
573
574
575

        val f1Job = new SamtoolsView(qscript)
        f1Job.input = alnFile
        f1Job.b = true
        f1Job.h = true
        f1Job.f = if (this.allSingle) List("0x10") else List("0x50")
        f1Job.output = createFile(".f1.bam")
        // since we are symlinking if the other pair does not exist,
        // we want to keep this job as non-intermediate as well
        f1Job.isIntermediate = r2Job.isDefined

576
        val perStrandFiles = r2Job match {
bow's avatar
bow committed
577
578
          case Some(r2j) => List(f1Job.output, r2j.output)
          case None      => List(f1Job.output)
bow's avatar
bow committed
579
        }
bow's avatar
bow committed
580
        val combineJob = makeCombineJob(perStrandFiles, createFile(".plus_strand.bam"))
bow's avatar
bow committed
581

582
        Option(StrandSeparationJobSet(f1Job, r2Job, combineJob.alnJob))
bow's avatar
bow committed
583
584
585
586
587

      case NonSpecific => None
      case _           => throw new IllegalStateException
    }

bow's avatar
bow committed
588
    /** Alignment file of reads from the minus strand, only defined when run is strand-specific */
bow's avatar
bow committed
589
    def alnFileMinusStrand: Option[File] = alnMinusStrandJobs
590
      .collect { case jobSet => jobSet.combineJob.output }
bow's avatar
bow committed
591

bow's avatar
bow committed
592
    /** Jobs for generating reads from the minus, only defined when run is strand-specific */
593
    private def alnMinusStrandJobs: Option[StrandSeparationJobSet] = strandProtocol match {
bow's avatar
bow committed
594
      case Dutp =>
595
        val r1Job = this.allPaired
596
597
598
599
600
          .option {
            val job = new SamtoolsView(qscript)
            job.input = alnFile
            job.b = true
            job.h = true
601
            job.f = List("0x40")
602
603
604
605
606
            job.F = List("0x10")
            job.output = createFile(".r1.bam")
            job.isIntermediate = true
            job
          }
607
608
609
610
611
612
613
614
615
616
617
618

        val f2Job = new SamtoolsView(qscript)
        f2Job.input = alnFile
        f2Job.b = true
        f2Job.h = true
        f2Job.output = createFile(".f2.bam")
        // since we are symlinking if the other pair does not exist,
        // we want to keep this job as non-intermediate as well
        f2Job.isIntermediate = r1Job.isDefined
        if (this.allSingle) f2Job.F = List("0x10")
        else f2Job.f = List("0x90")

619
        val perStrandFiles = r1Job match {
bow's avatar
bow committed
620
621
          case Some(r1j) => List(f2Job.output, r1j.output)
          case None      => List(f2Job.output)
bow's avatar
bow committed
622
        }
bow's avatar
bow committed
623
        val combineJob = makeCombineJob(perStrandFiles, createFile(".minus_strand.bam"))
bow's avatar
bow committed
624

625
        Option(StrandSeparationJobSet(f2Job, r1Job, combineJob.alnJob))
bow's avatar
bow committed
626
627
628
629
630

      case NonSpecific => None
      case _           => throw new IllegalStateException
    }
    /** Raw base counting job */
631
    private def rawBaseCountJob: Option[RawBaseCounter] = strandProtocol match {
bow's avatar
bow committed
632
633
634
635
636
      case NonSpecific =>
        (expMeasures.contains(BasesPerExon) || expMeasures.contains(BasesPerGene))
          .option {
            val job = new RawBaseCounter(qscript)
            job.inputBoth = alnFile
Peter van 't Hof's avatar
Peter van 't Hof committed
637
            annotationBed.foreach(job.annotationBed = _)
bow's avatar
bow committed
638
639
640
            job.output = createFile(".raw_base_count")
            job
          }
bow's avatar
bow committed
641
      case Dutp =>
bow's avatar
bow committed
642
643
644
645
646
647
        (expMeasures.contains(BasesPerExon) || expMeasures.contains(BasesPerGene))
          .option {
            require(alnFilePlusStrand.isDefined && alnFileMinusStrand.isDefined)
            val job = new RawBaseCounter(qscript)
            job.inputPlus = alnFilePlusStrand.get
            job.inputMinus = alnFileMinusStrand.get
Peter van 't Hof's avatar
Peter van 't Hof committed
648
            annotationBed.foreach(job.annotationBed = _)
bow's avatar
bow committed
649
650
651
652
653
            job.output = createFile(".raw_base_count")
            job
          }
      case _ => throw new IllegalStateException
    }
bow's avatar
bow committed
654

655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
    /** Base counting job per gene */
    private def basesPerGeneJob: Option[AggrBaseCount] = expMeasures
      .contains(BasesPerGene)
      .option {
        require(rawBaseCountJob.nonEmpty)
        val job = new AggrBaseCount(qscript)
        job.input = rawBaseCountJob.get.output
        job.output = createFile(".bases_per_gene")
        job.inputLabel = sampleId
        job.mode = "gene"
        job
      }

    /** Base counting job per exon */
    private def basesPerExonJob: Option[AggrBaseCount] = expMeasures
      .contains(BasesPerExon)
      .option {
        require(rawBaseCountJob.nonEmpty)
        val job = new AggrBaseCount(qscript)
        job.input = rawBaseCountJob.get.output
        job.output = createFile(".bases_per_exon")
        job.inputLabel = sampleId
        job.mode = "exon"
        job
      }

681
682
683
684
685
686
687
688
689
    /** Variant calling job */
    private def varCallJob: Option[CustomVarScan] = callVariants
      .option {
        val job = new CustomVarScan(qscript)
        job.input = alnFile
        job.output = createFile(".raw.vcf.gz")
        job
      }

bow's avatar
bow committed
690
    /** Job for removing ribosomal reads */
bow's avatar
bow committed
691
692
    private def wipeJob: Option[WipeReads] = removeRibosomalReads
      .option {
Peter van 't Hof's avatar
Peter van 't Hof committed
693
        //require(ribosomalRefFlat.isDefined)
bow's avatar
bow committed
694
        val job = new WipeReads(qscript)
695
        job.inputBam = bamFile.get
Peter van 't Hof's avatar
Peter van 't Hof committed
696
        ribosomalRefFlat.foreach(job.intervalFile = _)
697
        job.outputBam = createFile(".cleaned.bam")
bow's avatar
bow committed
698
699
700
701
        job.discardedBam = createFile(".rrna.bam")
        job
      }

bow's avatar
bow committed
702
    /** Super type of Ln and MergeSamFiles */
703
704
705
706
    case class CombineFileJobSet(alnJob: QFunction { def output: File }, idxJob: Option[Ln]) {
      /** Adds all jobs in this jobset */
      def addAll(): Unit = { add(alnJob); idxJob.foreach(add(_)) }
    }
707
708

    /** Ln or MergeSamFile job, depending on how many inputs are supplied */
bow's avatar
bow committed
709
    private def makeCombineJob(inFiles: List[File], outFile: File,
710
                               mergeSortOrder: String = "coordinate"): CombineFileJobSet = {
711
      require(inFiles.nonEmpty, "At least one input files required for combine job")
712
      if (inFiles.size == 1) {
713
714

        val jobBam = new Ln(qscript)
715
        jobBam.input = inFiles.head.getAbsoluteFile
716
717
718
        jobBam.output = outFile

        val jobIdx = new Ln(qscript)
719
720
        jobIdx.input = swapExt(libraries.values.head.libDir, jobBam.input, ".bam", ".bai")
        jobIdx.output = swapExt(sampleDir, jobBam.output, ".bam", ".bai")
721
722

        CombineFileJobSet(jobBam, Some(jobIdx))
723
724
725
726
727
      } else {
        val job = new MergeSamFiles(qscript)
        job.input = inFiles
        job.output = outFile
        job.sortOrder = mergeSortOrder
728
        CombineFileJobSet(job, None)
729
      }
730
731
    }

732
    /** Whether all libraries are paired or not */
733
    def allPaired: Boolean = libraries.values.forall(_.mapping.forall(_.input_R2.isDefined))
734
735

    /** Whether all libraries are single or not */
736
    def allSingle: Boolean = libraries.values.forall(_.mapping.forall(_.input_R2.isEmpty))
737

738
    // TODO: add warnings or other messages for config values that are hard-coded by the pipeline
bow's avatar
bow committed
739
    /** Adds all jobs for the sample */
740
741
    override def addJobs(): Unit = {
      super.addJobs()
742
743
      // TODO: this is our requirement since it's easier to calculate base counts when all libraries are either paired or single
      require(allPaired || allSingle, s"Sample $sampleId contains only single-end or paired-end libraries")
bow's avatar
bow committed
744
      // merge or symlink per-library alignments
bow's avatar
bow committed
745
746
747
748
      // add bigwig output, also per-strand when possible
      addAll(Bam2Wig(qscript, alnFile).functions)
      alnFilePlusStrand.collect { case f => addAll(Bam2Wig(qscript, f).functions) }
      alnFileMinusStrand.collect { case f => addAll(Bam2Wig(qscript, f).functions) }
bow's avatar
bow committed
749
750
751
      // add strand-specific jobs if defined
      alnPlusStrandJobs.foreach(_.addAllJobs())
      alnMinusStrandJobs.foreach(_.addAllJobs())
752
      // add htseq-count jobs, if defined
bow's avatar
bow committed
753
      idSortingJob.foreach(add(_))
754
755
      fragmentsPerGeneJob.foreach(add(_))
      fragmentsPerExonJob.foreach(add(_))
756
      // add custom base count jobs, if defined
bow's avatar
bow committed
757
      rawBaseCountJob.foreach(add(_))
758
759
      basesPerGeneJob.foreach(add(_))
      basesPerExonJob.foreach(add(_))
bow's avatar
bow committed
760
761
      // symlink results with distinct extensions ~ actually to make it easier to use MergeTables on these as well
      // since the Queue argument parser doesn't play nice with Map[_, _] types
bow's avatar
bow committed
762
763
764
      cufflinksStrictJobSet.foreach(_.jobs.foreach(add(_)))
      cufflinksGuidedJobSet.foreach(_.jobs.foreach(add(_)))
      cufflinksBlindJobSet.foreach(_.jobs.foreach(add(_)))
765
766
      // add variant calling job if requested
      varCallJob.foreach(add(_))
767
    }
bow's avatar
bow committed
768
  }
bow's avatar
bow committed
769
}
bow's avatar
bow committed
770

bow's avatar
bow committed
771
object Gentrap extends PipelineCommand {
772

bow's avatar
bow committed
773
774
  /** Enumeration of available expression measures */
  object ExpMeasures extends Enumeration {
bow's avatar
bow committed
775
776
777
    val FragmentsPerGene, FragmentsPerExon, BasesPerGene, BasesPerExon, CufflinksStrict, CufflinksGuided, CufflinksBlind = Value
    //Cuffquant,
    //Rsem = Value
778
  }
bow's avatar
bow committed
779

bow's avatar
bow committed
780
  /** Enumeration of available strandedness */
781
  object StrandProtocol extends Enumeration {
bow's avatar
bow committed
782
    // for now, only non-strand specific and dUTP stranded protocol is supported
bow's avatar
bow committed
783
    // TODO: other strandedness protocol?
bow's avatar
bow committed
784
785
    val NonSpecific, Dutp = Value
  }
786
787
788
789
790
791
792
793

  /** Converts string with underscores into camel-case strings */
  private def camelize(ustring: String): String = ustring
    .split("_")
    .map(_.toLowerCase.capitalize)
    .mkString("")

  /** Conversion from raw user-supplied expression measure string to enum value */
794
  private def makeExpMeasure(rawName: String): Option[ExpMeasures.Value] = {
795
    try {
796
      Some(ExpMeasures.withName(camelize(rawName)))
797
    } catch {
798
799
800
801
      case nse: NoSuchElementException =>
        Logging.addError(s"Invalid expression measure: $rawName")
        None
      case e: Exception => throw e
802
803
804
805
    }
  }

  /** Conversion from raw user-supplied expression measure string to enum value */
806
  private def makeStrandProtocol(rawName: String): Option[StrandProtocol.Value] = {
807
    try {
808
      Some(StrandProtocol.withName(camelize(rawName)))
809
    } catch {
810
811
812
813
      case nse: NoSuchElementException =>
        Logging.addError(s"Invalid strand protocol: $rawName")
        None
      case e: Exception => throw e
814
815
    }
  }
816
}