GearsSingle.scala 10.1 KB
Newer Older
1
/**
2
3
4
5
6
7
8
9
10
11
12
13
14
  * Biopet is built on top of GATK Queue for building bioinformatic
  * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
  * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
  * should also be able to execute Biopet tools and pipelines.
  *
  * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
  *
  * Contact us at: sasc@lumc.nl
  *
  * A dual licensing mode is applied. The source code within this project is freely available for non-commercial use under an AGPL
  * license; For commercial users or users who do not want to follow the AGPL
  * license, please contact us to obtain a separate license.
  */
15
16
package nl.lumc.sasc.biopet.pipelines.gears

Peter van 't Hof's avatar
Peter van 't Hof committed
17
import nl.lumc.sasc.biopet.core.summary.SummaryQScript
Peter van 't Hof's avatar
Peter van 't Hof committed
18
import nl.lumc.sasc.biopet.core.BiopetQScript.InputFile
19
import nl.lumc.sasc.biopet.core.{PipelineCommand, SampleLibraryTag}
20
import nl.lumc.sasc.biopet.extensions.qiime.SplitLibrariesFastq
21
import nl.lumc.sasc.biopet.extensions.{Gzip, Zcat}
Peter van 't Hof's avatar
Peter van 't Hof committed
22
import nl.lumc.sasc.biopet.pipelines.flexiprep.Flexiprep
Peter van 't Hof's avatar
Peter van 't Hof committed
23
import nl.lumc.sasc.biopet.utils.Logging
Peter van 't Hof's avatar
Peter van 't Hof committed
24
import nl.lumc.sasc.biopet.utils.config.Configurable
25
import nl.lumc.sasc.biopet.utils.summary.db.SummaryDb
26
import org.broadinstitute.gatk.queue.QScript
27

28
import scala.concurrent.{Await, Future}
29
import scala.concurrent.duration.Duration
pjvan_thof's avatar
pjvan_thof committed
30
import scala.concurrent.ExecutionContext.Implicits.global
31

32
/**
33
34
35
36
37
38
  * Created by wyleung
  */
class GearsSingle(val parent: Configurable)
    extends QScript
    with SummaryQScript
    with SampleLibraryTag {
39
  def this() = this(null)
40

Wai Yi Leung's avatar
Wai Yi Leung committed
41
  @Input(doc = "R1 reads in FastQ format", shortName = "R1", required = false)
Peter van 't Hof's avatar
Peter van 't Hof committed
42
  var fastqR1: List[File] = Nil
43

Wai Yi Leung's avatar
Wai Yi Leung committed
44
  @Input(doc = "R2 reads in FastQ format", shortName = "R2", required = false)
Peter van 't Hof's avatar
Peter van 't Hof committed
45
  var fastqR2: List[File] = Nil
46

47
48
49
  @Input(doc = "All unmapped reads will be extracted from this bam for analysis",
         shortName = "bam",
         required = false)
Peter van 't Hof's avatar
Peter van 't Hof committed
50
  var bamFile: Option[File] = None
51

Peter van 't Hof's avatar
Peter van 't Hof committed
52
53
  @Argument(required = false)
  var outputName: String = _
54

55
56
57
58
59
60
  override def defaults = Map(
    "splitlibrariesfastq" -> Map(
      "barcode_type" -> "not-barcoded"
    )
  )

Peter van 't Hof's avatar
Peter van 't Hof committed
61
  def getOutputName: String = {
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
    if (outputName == null) {
      sampleId.getOrElse("noName") + libId.map("-" + _).getOrElse("")
    } else outputName
  }

  def mergedR1: File = {
    if (fastqR1.size <= 1) {
      fastqR1.head
    } else {
      new File(outputDir, "merged.R1.fq.gz")
    }
  }

  def mergedR2: Option[File] = {
    if (fastqR2.size <= 1) {
      fastqR2.headOption
    } else {
      Some(new File(outputDir, "merged.R2.fq.gz"))
    }

  }

Peter van 't Hof's avatar
Peter van 't Hof committed
84
  lazy val krakenScript: Option[GearsKraken] = {
85
86
87
88
89
90
91
92
    if (config("gears_use_kraken", default = false)) {
      val kraken = new GearsKraken(this)
      kraken.outputDir = new File(outputDir, "kraken")
      kraken.outputName = getOutputName
      Some(kraken)
    } else None
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
93
  lazy val centrifugeScript: Option[GearsCentrifuge] = {
94
95
96
97
98
99
100
101
    if (config("gears_use_centrifuge", default = true)) {
      val centrifuge = new GearsCentrifuge(this)
      centrifuge.outputDir = new File(outputDir, "centrifuge")
      centrifuge.outputName = getOutputName
      Some(centrifuge)
    } else None
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
102
  lazy val qiimeRatx: Option[GearsQiimeRtax] = {
103
104
105
106
107
108
109
    if (config("gears_use_qiime_rtax", default = false)) {
      val qiimeRatx = new GearsQiimeRtax(this)
      qiimeRatx.outputDir = new File(outputDir, "qiime_rtax")
      Some(qiimeRatx)
    } else None
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
110
  lazy val qiimeClosed: Option[GearsQiimeClosed] =
111
112
    if (config("gears_use_qiime_closed", default = false)) Some(new GearsQiimeClosed(this))
    else None
Peter van 't Hof's avatar
Peter van 't Hof committed
113
  lazy val qiimeOpen: Option[GearsQiimeOpen] =
114
    if (config("gears_use_qiime_open", default = false)) Some(new GearsQiimeOpen(this)) else None
Peter van 't Hof's avatar
Peter van 't Hof committed
115
  lazy val seqCount: Option[GearsSeqCount] =
116
    if (config("gears_use_seq_count", default = false)) Some(new GearsSeqCount(this)) else None
Peter van 't Hof's avatar
Peter van 't Hof committed
117

Peter van 't Hof's avatar
Peter van 't Hof committed
118
119
  /** Executed before running the script */
  def init(): Unit = {
Peter van 't Hof's avatar
Peter van 't Hof committed
120
    if (fastqR1.isEmpty && bamFile.isEmpty)
121
122
123
124
125
      Logging.addError("Please specify fastq-file(s) or bam file")
    if (fastqR1.nonEmpty == bamFile.isDefined)
      Logging.addError("Provide either a bam file or a R1/R2 file")
    if (fastqR2.nonEmpty && fastqR1.size != fastqR2.size)
      Logging.addError("R1 and R2 has not the same number of files")
Peter van 't Hof's avatar
Peter van 't Hof committed
126
    if (sampleId == null || sampleId.isEmpty)
127
      Logging.addError("Missing sample ID on GearsSingle module")
Peter van 't Hof's avatar
Peter van 't Hof committed
128

129
130
    if (!skipFlexiprep) {
      val db = SummaryDb.openSqliteSummary(summaryDbFile)
pjvan_thof's avatar
pjvan_thof committed
131
      val future = for {
pjvan_thof's avatar
pjvan_thof committed
132
        sample <- db.getSamples(runId = summaryRunId, name = sampleId).map(_.headOption)
133
134
        sId <- sample
          .map(s => Future.successful(s.id))
pjvan_thof's avatar
pjvan_thof committed
135
          .getOrElse(db.createSample(sampleId.getOrElse("noSampleName"), summaryRunId))
136
137
138
139
140
        library <- db
          .getLibraries(runId = summaryRunId, name = libId, sampleId = Some(sId))
          .map(_.headOption)
        lId <- library
          .map(l => Future.successful(l.id))
pjvan_thof's avatar
pjvan_thof committed
141
          .getOrElse(db.createLibrary(libId.getOrElse("noLibName"), summaryRunId, sId))
pjvan_thof's avatar
pjvan_thof committed
142
143
      } yield lId
      Await.result(future, Duration.Inf)
144
    }
Peter van 't Hof's avatar
Peter van 't Hof committed
145
    if (outputName == null) {
Ruben Vorderman's avatar
Ruben Vorderman committed
146
      outputName = sampleId.getOrElse("noName") + libId.map("-" + _).getOrElse("")
147
    }
Peter van 't Hof's avatar
Peter van 't Hof committed
148

Sander Bollen's avatar
Sander Bollen committed
149
    if (fastqR1.nonEmpty) {
Peter van 't Hof's avatar
Peter van 't Hof committed
150
151
      fastqR1.foreach(inputFiles :+= InputFile(_))
      fastqR2.foreach(inputFiles :+= InputFile(_))
Peter van 't Hof's avatar
Peter van 't Hof committed
152
    } else bamFile.foreach(inputFiles :+= InputFile(_))
Peter van 't Hof's avatar
Peter van 't Hof committed
153
  }
154

Peter van 't Hof's avatar
Peter van 't Hof committed
155
  override def reportClass: Some[GearsSingleReport] = {
156
    val gears = new GearsSingleReport(this)
Wai Yi Leung's avatar
Wai Yi Leung committed
157
    gears.outputDir = new File(outputDir, "report")
158
    gears.summaryDbFile = summaryDbFile
159
160
    sampleId.foreach(gears.args += "sampleId" -> _)
    libId.foreach(gears.args += "libId" -> _)
Wai Yi Leung's avatar
Wai Yi Leung committed
161
162
    Some(gears)
  }
163

Peter van 't Hof's avatar
Peter van 't Hof committed
164
165
  protected var skipFlexiprep: Boolean = config("skip_flexiprep", default = false)

Peter van 't Hof's avatar
Peter van 't Hof committed
166
  protected def executeFlexiprep(r1: List[File], r2: List[File]): (File, Option[File]) = {
167
    val read1: File =
168
      if (r1.size == 1) mergedR1
169
      else {
170
171
172
        val merger = Zcat(this, r1) | new Gzip(this) > mergedR1
        merger.isIntermediate = true
        add(merger)
173
        mergedR1
174
      }
Peter van 't Hof's avatar
Peter van 't Hof committed
175

176
    val read2: Option[File] =
177
      if (r2.size <= 1) mergedR2
178
      else {
179
180
181
        val merger = Zcat(this, r2) | new Gzip(this) > mergedR2.get
        merger.isIntermediate = true
        add(merger)
182
        mergedR2
183
      }
Peter van 't Hof's avatar
Peter van 't Hof committed
184

185
186
187
188
189
190
191
192
193
194
195
    flexiprep
      .map { f =>
        f.inputR1 = read1
        f.inputR2 = read2
        f.sampleId = Some(sampleId.getOrElse("noSampleName"))
        f.libId = Some(libId.getOrElse("noLibName"))
        f.outputDir = new File(outputDir, "flexiprep")
        add(f)
        (f.fastqR1Qc, f.fastqR2Qc)
      }
      .getOrElse((read1, read2))
196
197
  }

198
199
200
201
  lazy protected val flexiprep: Option[Flexiprep] = if (!skipFlexiprep) {
    Some(new Flexiprep(this))
  } else None

Peter van 't Hof's avatar
Peter van 't Hof committed
202
203
  /** Method to add jobs */
  def biopetScript(): Unit = {
Peter van 't Hof's avatar
Peter van 't Hof committed
204
    val (r1, r2): (File, Option[File]) = (fastqR1, fastqR2, bamFile) match {
Peter van 't Hof's avatar
Peter van 't Hof committed
205
      case (r1, _, _) if r1.nonEmpty => executeFlexiprep(r1, fastqR2)
Peter van 't Hof's avatar
Peter van 't Hof committed
206
207
      case (_, _, Some(bam)) =>
        val extract = new ExtractUnmappedReads(this)
Peter van 't Hof's avatar
Peter van 't Hof committed
208
        extract.outputDir = outputDir
Peter van 't Hof's avatar
Peter van 't Hof committed
209
        extract.bamFile = bam
Peter van 't Hof's avatar
Peter van 't Hof committed
210
        extract.outputName = outputName
Peter van 't Hof's avatar
Peter van 't Hof committed
211
        add(extract)
Sander Bollen's avatar
Sander Bollen committed
212
213
        fastqR1 = extract.fastqUnmappedR1 :: Nil
        fastqR2 = extract.fastqUnmappedR2.toList
Peter van 't Hof's avatar
Peter van 't Hof committed
214
        executeFlexiprep(extract.fastqUnmappedR1 :: Nil, extract.fastqUnmappedR2.toList)
Peter van 't Hof's avatar
Peter van 't Hof committed
215
      case _ => throw new IllegalArgumentException("Missing input files")
Peter van 't Hof's avatar
Peter van 't Hof committed
216
217
    }

Peter van 't Hof's avatar
Peter van 't Hof committed
218
219
220
    lazy val combinedFastq = {
      r2 match {
        case Some(r2) =>
221
222
223
224
225
226
          val combineReads = new CombineReads(this)
          combineReads.outputDir = new File(outputDir, "combine_reads")
          combineReads.fastqR1 = r1
          combineReads.fastqR2 = r2
          add(combineReads)
          combineReads.combinedFastq
Peter van 't Hof's avatar
Peter van 't Hof committed
227
228
229
230
        case _ => r1
      }
    }

231
232
233
234
235
236
237
238
239
240
    lazy val splitLibrariesFastq: File = {
      val splitLib = new SplitLibrariesFastq(this)
      splitLib.input :+= combinedFastq
      splitLib.outputDir = new File(outputDir, "split_libraries_fastq")
      sampleId.foreach(splitLib.sampleIds :+= _.replaceAll("_", "-"))
      splitLib.isIntermediate = true
      add(splitLib)
      splitLib.outputSeqs
    }

Peter van 't Hof's avatar
Peter van 't Hof committed
241
    krakenScript foreach { kraken =>
Sander Bollen's avatar
Sander Bollen committed
242
243
      kraken.fastqR1 = mergedR1
      kraken.fastqR2 = mergedR2
Peter van 't Hof's avatar
Peter van 't Hof committed
244
      add(kraken)
Peter van 't Hof's avatar
Peter van 't Hof committed
245
246
    }

Peter van 't Hof's avatar
Peter van 't Hof committed
247
    centrifugeScript foreach { centrifuge =>
Sander Bollen's avatar
Sander Bollen committed
248
249
      centrifuge.fastqR1 = mergedR1
      centrifuge.fastqR2 = mergedR2
Peter van 't Hof's avatar
Peter van 't Hof committed
250
      add(centrifuge)
251
      outputFiles += "centrifuge_output" -> centrifuge.centrifugeOutput
Peter van 't Hof's avatar
Peter van 't Hof committed
252
253
    }

Peter van 't Hof's avatar
Peter van 't Hof committed
254
    qiimeRatx foreach { qiimeRatx =>
Sander Bollen's avatar
Sander Bollen committed
255
256
      qiimeRatx.fastqR1 = mergedR1
      qiimeRatx.fastqR2 = mergedR2
Peter van 't Hof's avatar
Peter van 't Hof committed
257
      add(qiimeRatx)
Peter van 't Hof's avatar
Peter van 't Hof committed
258
    }
259

Peter van 't Hof's avatar
Peter van 't Hof committed
260
    qiimeClosed foreach { qiimeClosed =>
Peter van 't Hof's avatar
Peter van 't Hof committed
261
      qiimeClosed.outputDir = new File(outputDir, "qiime_closed")
262
      qiimeClosed.fastaInput = splitLibrariesFastq
Peter van 't Hof's avatar
Peter van 't Hof committed
263
      add(qiimeClosed)
264
      outputFiles += "qiime_closed_otu_table" -> qiimeClosed.otuTable
265
266
    }

Peter van 't Hof's avatar
Peter van 't Hof committed
267
268
    qiimeOpen foreach { qiimeOpen =>
      qiimeOpen.outputDir = new File(outputDir, "qiime_open")
269
      qiimeOpen.fastaInput = splitLibrariesFastq
Peter van 't Hof's avatar
Peter van 't Hof committed
270
      add(qiimeOpen)
271
      outputFiles += "qiime_open_otu_table" -> qiimeOpen.otuTable
Peter van 't Hof's avatar
Peter van 't Hof committed
272
273
    }

Peter van 't Hof's avatar
Peter van 't Hof committed
274
275
276
277
    seqCount.foreach { seqCount =>
      seqCount.fastqInput = combinedFastq
      seqCount.outputDir = new File(outputDir, "seq_count")
      add(seqCount)
278
      outputFiles += "seq_count_count_file" -> seqCount.countFile
Peter van 't Hof's avatar
Peter van 't Hof committed
279
280
    }

Wai Yi Leung's avatar
Wai Yi Leung committed
281
    addSummaryJobs()
282
  }
Peter van 't Hof's avatar
Peter van 't Hof committed
283

284
  /** Pipeline settings shown in the summary file */
Peter van 't Hof's avatar
Peter van 't Hof committed
285
286
287
288
  def summarySettings: Map[String, Any] = Map(
    "skip_flexiprep" -> skipFlexiprep,
    "gears_use_kraken" -> krakenScript.isDefined,
    "gear_use_qiime_rtax" -> qiimeRatx.isDefined,
Peter van 't Hof's avatar
Peter van 't Hof committed
289
290
    "gear_use_qiime_closed" -> qiimeClosed.isDefined,
    "gear_use_qiime_open" -> qiimeOpen.isDefined
Peter van 't Hof's avatar
Peter van 't Hof committed
291
  )
Peter van 't Hof's avatar
Peter van 't Hof committed
292

293
  /** Statistics shown in the summary file */
294
295
296
297
298
299
  def summaryFiles: Map[String, File] =
    Map.empty ++
      (if (bamFile.isDefined) Map("input_bam" -> bamFile.get) else Map()) ++
      fastqR1.zipWithIndex.map(x => s"input_R1_${x._2}" -> x._1) ++
      fastqR2.zipWithIndex.map(x => s"input_R2_${x._2}" -> x._1) ++
      outputFiles
300
301
302
}

/** This object give a default main method to the pipelines */
303
object GearsSingle extends PipelineCommand