GearsSingle.scala 8.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10
/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
11
 * A dual licensing mode is applied. The source code within this project is freely available for non-commercial use under an AGPL
12 13 14 15 16
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
package nl.lumc.sasc.biopet.pipelines.gears

Peter van 't Hof's avatar
Peter van 't Hof committed
17
import nl.lumc.sasc.biopet.core.summary.SummaryQScript
18
import nl.lumc.sasc.biopet.core.BiopetQScript.InputFile
Peter van 't Hof's avatar
Peter van 't Hof committed
19 20
import nl.lumc.sasc.biopet.core.{ PipelineCommand, SampleLibraryTag }
import nl.lumc.sasc.biopet.extensions.{ Gzip, Zcat }
Peter van 't Hof's avatar
Peter van 't Hof committed
21
import nl.lumc.sasc.biopet.pipelines.flexiprep.Flexiprep
Peter van 't Hof's avatar
Peter van 't Hof committed
22
import nl.lumc.sasc.biopet.utils.Logging
Peter van 't Hof's avatar
Peter van 't Hof committed
23
import nl.lumc.sasc.biopet.utils.config.Configurable
24
import nl.lumc.sasc.biopet.utils.summary.db.SummaryDb
25
import org.broadinstitute.gatk.queue.QScript
26 27 28

import scala.concurrent.Await
import scala.concurrent.duration.Duration
Peter van 't Hof's avatar
Peter van 't Hof committed
29
import scala.concurrent.ExecutionContext.Implicits.global
30

31
/**
Wai Yi Leung's avatar
Wai Yi Leung committed
32
 * Created by wyleung
33
 */
Peter van 't Hof's avatar
Peter van 't Hof committed
34
class GearsSingle(val parent: Configurable) extends QScript with SummaryQScript with SampleLibraryTag {
35
  def this() = this(null)
36

Wai Yi Leung's avatar
Wai Yi Leung committed
37
  @Input(doc = "R1 reads in FastQ format", shortName = "R1", required = false)
38
  var fastqR1: List[File] = Nil
39

Wai Yi Leung's avatar
Wai Yi Leung committed
40
  @Input(doc = "R2 reads in FastQ format", shortName = "R2", required = false)
41
  var fastqR2: List[File] = Nil
42

Wai Yi Leung's avatar
Wai Yi Leung committed
43
  @Input(doc = "All unmapped reads will be extracted from this bam for analysis", shortName = "bam", required = false)
Peter van 't Hof's avatar
Peter van 't Hof committed
44
  var bamFile: Option[File] = None
45

Peter van 't Hof's avatar
Peter van 't Hof committed
46 47
  @Argument(required = false)
  var outputName: String = _
48

Peter van 't Hof's avatar
Peter van 't Hof committed
49 50
  lazy val krakenScript = if (config("gears_use_kraken", default = false)) Some(new GearsKraken(this)) else None
  lazy val centrifugeScript = if (config("gears_use_centrifuge", default = true)) Some(new GearsCentrifuge(this)) else None
Peter van 't Hof's avatar
Peter van 't Hof committed
51 52
  lazy val qiimeRatx = if (config("gears_use_qiime_rtax", default = false)) Some(new GearsQiimeRtax(this)) else None
  lazy val qiimeClosed = if (config("gears_use_qiime_closed", default = false)) Some(new GearsQiimeClosed(this)) else None
53
  lazy val qiimeOpen = if (config("gears_use_qiime_open", default = false)) Some(new GearsQiimeOpen(this)) else None
Peter van 't Hof's avatar
Peter van 't Hof committed
54
  lazy val seqCount = if (config("gears_use_seq_count", default = false)) Some(new GearsSeqCount(this)) else None
55

Peter van 't Hof's avatar
Peter van 't Hof committed
56 57
  /** Executed before running the script */
  def init(): Unit = {
58 59 60
    if (fastqR1.isEmpty && !bamFile.isDefined) Logging.addError("Please specify fastq-file(s) or bam file")
    if (fastqR1.nonEmpty == bamFile.isDefined) Logging.addError("Provide either a bam file or a R1/R2 file")
    if (fastqR2.nonEmpty && fastqR1.size != fastqR2.size) Logging.addError("R1 and R2 has not the same number of files")
Peter van 't Hof's avatar
Peter van 't Hof committed
61
    if (sampleId == null || sampleId == None) Logging.addError("Missing sample ID on GearsSingle module")
Peter van 't Hof's avatar
Peter van 't Hof committed
62

63 64 65 66 67 68 69
    if (!skipFlexiprep) {
      val db = SummaryDb.openSqliteSummary(summaryDbFile)
      val sample = Await.result(db.getSamples(runId = summaryRunId, name = sampleId).map(_.headOption), Duration.Inf)
      val sId = sample.map(_.id).getOrElse(Await.result(db.createSample(sampleId.getOrElse("noSample"), summaryRunId), Duration.Inf))
      val library = Await.result(db.getLibraries(runId = summaryRunId, name = libId, sampleId = Some(sId)).map(_.headOption), Duration.Inf)
      val lId = library.map(_.id).getOrElse(Await.result(db.createLibrary(libId.getOrElse("noLib"), summaryRunId, sId), Duration.Inf))
    }
Peter van 't Hof's avatar
Peter van 't Hof committed
70
    if (outputName == null) {
rhpvorderman's avatar
rhpvorderman committed
71
      outputName = sampleId.getOrElse("noName") + libId.map("-" + _).getOrElse("")
72
    }
73

74
    if (fastqR1.nonEmpty) {
75 76
      fastqR1.foreach(inputFiles :+= InputFile(_))
      fastqR2.foreach(inputFiles :+= InputFile(_))
77
    } else bamFile.foreach(inputFiles :+= InputFile(_))
Peter van 't Hof's avatar
Peter van 't Hof committed
78
  }
79

Wai Yi Leung's avatar
Wai Yi Leung committed
80
  override def reportClass = {
81
    val gears = new GearsSingleReport(this)
Wai Yi Leung's avatar
Wai Yi Leung committed
82
    gears.outputDir = new File(outputDir, "report")
83
    gears.summaryDbFile = summaryDbFile
84 85
    sampleId.foreach(gears.args += "sampleId" -> _)
    libId.foreach(gears.args += "libId" -> _)
Wai Yi Leung's avatar
Wai Yi Leung committed
86 87
    Some(gears)
  }
88

Peter van 't Hof's avatar
Peter van 't Hof committed
89 90
  protected var skipFlexiprep: Boolean = config("skip_flexiprep", default = false)

91 92 93
  protected def executeFlexiprep(r1: List[File], r2: List[File]): (File, Option[File]) = {
    val read1: File = if (r1.size == 1) r1.head else {
      val outputFile = new File(outputDir, "merged.R1.fq.gz")
94
      add(Zcat(this, r1) | new Gzip(this) > outputFile)
95 96 97 98 99
      outputFile
    }

    val read2: Option[File] = if (r2.size <= 1) r2.headOption else {
      val outputFile = new File(outputDir, "merged.R2.fq.gz")
100
      add(Zcat(this, r2) | new Gzip(this) > outputFile)
101 102 103
      Some(outputFile)
    }

104 105 106 107 108 109 110 111 112
    flexiprep.map { f =>
      f.inputR1 = read1
      f.inputR2 = read2
      f.sampleId = Some(sampleId.getOrElse("noSampleName"))
      f.libId = Some(libId.getOrElse("noLibName"))
      f.outputDir = new File(outputDir, "flexiprep")
      add(f)
      (f.fastqR1Qc, f.fastqR2Qc)
    }.getOrElse((read1, read2))
113 114
  }

115 116 117 118
  lazy protected val flexiprep: Option[Flexiprep] = if (!skipFlexiprep) {
    Some(new Flexiprep(this))
  } else None

Peter van 't Hof's avatar
Peter van 't Hof committed
119 120
  /** Method to add jobs */
  def biopetScript(): Unit = {
Peter van 't Hof's avatar
Peter van 't Hof committed
121
    val (r1, r2): (File, Option[File]) = (fastqR1, fastqR2, bamFile) match {
122
      case (r1, _, _) if r1.nonEmpty => executeFlexiprep(r1, fastqR2)
123 124
      case (_, _, Some(bam)) =>
        val extract = new ExtractUnmappedReads(this)
125
        extract.outputDir = outputDir
126
        extract.bamFile = bam
127
        extract.outputName = outputName
Peter van 't Hof's avatar
Peter van 't Hof committed
128
        add(extract)
129
        executeFlexiprep(extract.fastqUnmappedR1 :: Nil, extract.fastqUnmappedR2.toList)
Peter van 't Hof's avatar
Peter van 't Hof committed
130
      case _ => throw new IllegalArgumentException("Missing input files")
131 132
    }

133 134 135
    lazy val combinedFastq = {
      r2 match {
        case Some(r2) =>
136 137 138 139 140 141
          val combineReads = new CombineReads(this)
          combineReads.outputDir = new File(outputDir, "combine_reads")
          combineReads.fastqR1 = r1
          combineReads.fastqR2 = r2
          add(combineReads)
          combineReads.combinedFastq
142 143 144 145
        case _ => r1
      }
    }

Peter van 't Hof's avatar
Peter van 't Hof committed
146
    krakenScript foreach { kraken =>
147
      kraken.outputDir = new File(outputDir, "kraken")
148 149
      kraken.fastqR1 = r1
      kraken.fastqR2 = r2
150
      kraken.outputName = outputName
Peter van 't Hof's avatar
Peter van 't Hof committed
151
      add(kraken)
152 153
    }

154 155 156 157 158 159
    centrifugeScript foreach { centrifuge =>
      centrifuge.outputDir = new File(outputDir, "centrifuge")
      centrifuge.fastqR1 = r1
      centrifuge.fastqR2 = r2
      centrifuge.outputName = outputName
      add(centrifuge)
160
      outputFiles += "centrifuge_output" -> centrifuge.centrifugeOutput
161 162
    }

Peter van 't Hof's avatar
Peter van 't Hof committed
163
    qiimeRatx foreach { qiimeRatx =>
Peter van 't Hof's avatar
Peter van 't Hof committed
164
      qiimeRatx.outputDir = new File(outputDir, "qiime_rtax")
165 166
      qiimeRatx.fastqR1 = r1
      qiimeRatx.fastqR2 = r2
Peter van 't Hof's avatar
Peter van 't Hof committed
167
      add(qiimeRatx)
168
    }
169

Peter van 't Hof's avatar
Peter van 't Hof committed
170
    qiimeClosed foreach { qiimeClosed =>
Peter van 't Hof's avatar
Peter van 't Hof committed
171
      qiimeClosed.outputDir = new File(outputDir, "qiime_closed")
172
      qiimeClosed.fastqInput = combinedFastq
Peter van 't Hof's avatar
Peter van 't Hof committed
173
      add(qiimeClosed)
174
      outputFiles += "qiime_closed_otu_table" -> qiimeClosed.otuTable
175 176
    }

177 178 179 180
    qiimeOpen foreach { qiimeOpen =>
      qiimeOpen.outputDir = new File(outputDir, "qiime_open")
      qiimeOpen.fastqInput = combinedFastq
      add(qiimeOpen)
181
      outputFiles += "qiime_open_otu_table" -> qiimeOpen.otuTable
182 183
    }

Peter van 't Hof's avatar
Peter van 't Hof committed
184 185 186 187
    seqCount.foreach { seqCount =>
      seqCount.fastqInput = combinedFastq
      seqCount.outputDir = new File(outputDir, "seq_count")
      add(seqCount)
188
      outputFiles += "seq_count_count_file" -> seqCount.countFile
Peter van 't Hof's avatar
Peter van 't Hof committed
189 190
    }

Wai Yi Leung's avatar
Wai Yi Leung committed
191
    addSummaryJobs()
192
  }
Peter van 't Hof's avatar
Peter van 't Hof committed
193

194
  /** Pipeline settings shown in the summary file */
Peter van 't Hof's avatar
Peter van 't Hof committed
195 196 197 198
  def summarySettings: Map[String, Any] = Map(
    "skip_flexiprep" -> skipFlexiprep,
    "gears_use_kraken" -> krakenScript.isDefined,
    "gear_use_qiime_rtax" -> qiimeRatx.isDefined,
199 200
    "gear_use_qiime_closed" -> qiimeClosed.isDefined,
    "gear_use_qiime_open" -> qiimeOpen.isDefined
Peter van 't Hof's avatar
Peter van 't Hof committed
201
  )
Peter van 't Hof's avatar
Peter van 't Hof committed
202

203
  /** Statistics shown in the summary file */
204 205
  def summaryFiles: Map[String, File] = Map.empty ++
    (if (bamFile.isDefined) Map("input_bam" -> bamFile.get) else Map()) ++
206 207
    fastqR1.zipWithIndex.map(x => s"input_R1_${x._2}" -> x._1) ++
    fastqR2.zipWithIndex.map(x => s"input_R2_${x._2}" -> x._1) ++
Wai Yi Leung's avatar
Wai Yi Leung committed
208
    outputFiles
209 210 211
}

/** This object give a default main method to the pipelines */
212
object GearsSingle extends PipelineCommand