Flexiprep.scala 10.6 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
 * A dual licensing mode is applied. The source code within this project that are
 * not part of GATK Queue is freely available for non-commercial use under an AGPL
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
16
17
package nl.lumc.sasc.biopet.pipelines.flexiprep

Peter van 't Hof's avatar
Peter van 't Hof committed
18
import nl.lumc.sasc.biopet.core.summary.SummaryQScript
Peter van 't Hof's avatar
Peter van 't Hof committed
19
import nl.lumc.sasc.biopet.core.{ BiopetFifoPipe, PipelineCommand, SampleLibraryTag }
Peter van 't Hof's avatar
Peter van 't Hof committed
20
import nl.lumc.sasc.biopet.extensions.{ Zcat, Gzip }
21
import nl.lumc.sasc.biopet.utils.config.Configurable
22
import nl.lumc.sasc.biopet.utils.IoUtils._
23
import nl.lumc.sasc.biopet.extensions.tools.{ ValidateFastq, SeqStat, FastqSync }
24

Peter van 't Hof's avatar
Peter van 't Hof committed
25
import org.broadinstitute.gatk.queue.QScript
26

27
class Flexiprep(val root: Configurable) extends QScript with SummaryQScript with SampleLibraryTag {
28
29
30
31
32
33
  def this() = this(null)

  @Input(doc = "R1 fastq file (gzipped allowed)", shortName = "R1", required = true)
  var input_R1: File = _

  @Input(doc = "R2 fastq file (gzipped allowed)", shortName = "R2", required = false)
34
  var input_R2: Option[File] = None
35

36
37
  /** Skip Trim fastq files */
  var skipTrim: Boolean = config("skip_trim", default = false)
38

39
40
  /** Skip Clip fastq files */
  var skipClip: Boolean = config("skip_clip", default = false)
41

42
43
44
  /** Make a final fastq files, by default only when flexiprep is the main pipeline */
  var keepQcFastqFiles: Boolean = config("keepQcFastqFiles", default = root == null)

Peter van 't Hof's avatar
Peter van 't Hof committed
45
  /** Location of summary file */
46
  def summaryFile = new File(outputDir, sampleId.getOrElse("x") + "-" + libId.getOrElse("x") + ".qc.summary.json")
47

Peter van 't Hof's avatar
Peter van 't Hof committed
48
  /** Returns files to store in summary */
Peter van 't Hof's avatar
Peter van 't Hof committed
49
  def summaryFiles: Map[String, File] = {
50
51
    Map("input_R1" -> input_R1, "output_R1" -> fastqR1Qc) ++
      (if (paired) Map("input_R2" -> input_R2.get, "output_R2" -> fastqR2Qc.get) else Map())
Peter van 't Hof's avatar
Peter van 't Hof committed
52
  }
53

Peter van 't Hof's avatar
Peter van 't Hof committed
54
  /** returns settings to store in summary */
Peter van 't Hof's avatar
Peter van 't Hof committed
55
  def summarySettings = Map("skip_trim" -> skipTrim, "skip_clip" -> skipClip, "paired" -> paired)
56

57
  var paired: Boolean = input_R2.isDefined
58
59
60
61
62
63
64
65
  var R1_name: String = _
  var R2_name: String = _

  var fastqc_R1: Fastqc = _
  var fastqc_R2: Fastqc = _
  var fastqc_R1_after: Fastqc = _
  var fastqc_R2_after: Fastqc = _

Peter van 't Hof's avatar
Peter van 't Hof committed
66
67
68
69
70
71
72
73
74
75
  override def reportClass = {
    val flexiprepReport = new FlexiprepReport(this)
    flexiprepReport.outputDir = new File(outputDir, "report")
    flexiprepReport.summaryFile = summaryFile
    flexiprepReport.args = Map(
      "sampleId" -> sampleId.getOrElse("."),
      "libId" -> libId.getOrElse("."))
    Some(flexiprepReport)
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
76
  /** Function that's need to be executed before the script is accessed */
77
  def init() {
78
    require(outputDir != null, "Missing output directory on flexiprep module")
79
    require(input_R1 != null, "Missing input R1 on flexiprep module")
Peter van 't Hof's avatar
Peter van 't Hof committed
80
81
    require(sampleId != null, "Missing sample ID on flexiprep module")
    require(libId != null, "Missing library ID on flexiprep module")
82

83
    paired = input_R2.isDefined
84

Peter van 't Hof's avatar
Peter van 't Hof committed
85
86
    inputFiles :+= new InputFile(input_R1)
    input_R2.foreach(inputFiles :+= new InputFile(_))
Peter van 't Hof's avatar
Peter van 't Hof committed
87

88
    R1_name = getUncompressedFileName(input_R1)
89
    input_R2.foreach { fileR2 =>
90
91
      paired = true
      R2_name = getUncompressedFileName(fileR2)
92
93
94
    }
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
95
  /** Script to add jobs */
96
97
98
  def biopetScript() {
    runInitialJobs()

Peter van 't Hof's avatar
Peter van 't Hof committed
99
    if (paired) runTrimClip(input_R1, input_R2, outputDir)
100
    else runTrimClip(input_R1, outputDir)
101
102
103
104
105
106

    val R1_files = for ((k, v) <- outputFiles if k.endsWith("output_R1")) yield v
    val R2_files = for ((k, v) <- outputFiles if k.endsWith("output_R2")) yield v
    runFinalize(R1_files.toList, R2_files.toList)
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
107
  /** Add init non chunkable jobs */
108
  def runInitialJobs() {
109
110
    outputFiles += ("fastq_input_R1" -> input_R1)
    if (paired) outputFiles += ("fastq_input_R2" -> input_R2.get)
111

Peter van 't Hof's avatar
Peter van 't Hof committed
112
    fastqc_R1 = Fastqc(this, input_R1, new File(outputDir, R1_name + ".fastqc/"))
113
    add(fastqc_R1)
114
    addSummarizable(fastqc_R1, "fastqc_R1")
115
116
    outputFiles += ("fastqc_R1" -> fastqc_R1.output)

117
118
119
120
121
122
    val validateFastq = new ValidateFastq(this)
    validateFastq.r1Fastq = input_R1
    validateFastq.r2Fastq = input_R2
    validateFastq.jobOutputFile = new File(outputDir, ".validate_fastq.log.out")
    add(validateFastq)

123
124
125
126
127
128
    if (config("abort_on_corrupt_fastq", default = true)) {
      val checkValidateFastq = new CheckValidateFastq
      checkValidateFastq.inputLogFile = validateFastq.jobOutputFile
      checkValidateFastq.jobOutputFile = new File(outputDir, ".check.validate_fastq.log.out")
      add(checkValidateFastq)
    }
129

130
    if (paired) {
Peter van 't Hof's avatar
Peter van 't Hof committed
131
      fastqc_R2 = Fastqc(this, input_R2.get, new File(outputDir, R2_name + ".fastqc/"))
132
      add(fastqc_R2)
133
      addSummarizable(fastqc_R2, "fastqc_R2")
134
135
      outputFiles += ("fastqc_R2" -> fastqc_R2.output)
    }
Peter van 't Hof's avatar
Peter van 't Hof committed
136
137
138
139
140
141
142
143
144
145
146
147

    val seqstat_R1 = SeqStat(this, input_R1, outputDir)
    seqstat_R1.isIntermediate = true
    add(seqstat_R1)
    addSummarizable(seqstat_R1, "seqstat_R1")

    if (paired) {
      val seqstat_R2 = SeqStat(this, input_R2.get, outputDir)
      seqstat_R2.isIntermediate = true
      add(seqstat_R2)
      addSummarizable(seqstat_R2, "seqstat_R2")
    }
148
149
  }

150
151
152
153
154
155
  def fastqR1Qc = if (paired)
    new File(outputDir, s"${sampleId.getOrElse("x")}-${libId.getOrElse("x")}.R1.qc.sync.fq.gz")
  else new File(outputDir, s"${sampleId.getOrElse("x")}-${libId.getOrElse("x")}.R1.qc.fq.gz")
  def fastqR2Qc = if (paired)
    Some(new File(outputDir, s"${sampleId.getOrElse("x")}-${libId.getOrElse("x")}.R2.qc.sync.fq.gz"))
  else None
Peter van 't Hof's avatar
Peter van 't Hof committed
156

Peter van 't Hof's avatar
Peter van 't Hof committed
157
  /** Adds all chunkable jobs of flexiprep */
Peter van 't Hof's avatar
Peter van 't Hof committed
158
  def runTrimClip(R1_in: File, outDir: File, chunk: String): (File, Option[File]) =
Peter van 't Hof's avatar
Peter van 't Hof committed
159
    runTrimClip(R1_in, None, outDir, chunk)
Peter van 't Hof's avatar
Peter van 't Hof committed
160

Peter van 't Hof's avatar
Peter van 't Hof committed
161
  /** Adds all chunkable jobs of flexiprep */
Peter van 't Hof's avatar
Peter van 't Hof committed
162
  def runTrimClip(R1_in: File, outDir: File): (File, Option[File]) =
Peter van 't Hof's avatar
Peter van 't Hof committed
163
    runTrimClip(R1_in, None, outDir, "")
Peter van 't Hof's avatar
Peter van 't Hof committed
164

Peter van 't Hof's avatar
Peter van 't Hof committed
165
  /** Adds all chunkable jobs of flexiprep */
Peter van 't Hof's avatar
Peter van 't Hof committed
166
  def runTrimClip(R1_in: File, R2_in: Option[File], outDir: File): (File, Option[File]) =
167
    runTrimClip(R1_in, R2_in, outDir, "")
Peter van 't Hof's avatar
Peter van 't Hof committed
168

Peter van 't Hof's avatar
Peter van 't Hof committed
169
  /** Adds all chunkable jobs of flexiprep */
Peter van 't Hof's avatar
Peter van 't Hof committed
170
171
172
173
  def runTrimClip(R1_in: File,
                  R2_in: Option[File],
                  outDir: File,
                  chunkarg: String): (File, Option[File]) = {
174
175
    val chunk = if (chunkarg.isEmpty || chunkarg.endsWith("_")) chunkarg else chunkarg + "_"

Peter van 't Hof's avatar
Peter van 't Hof committed
176
177
    var R1 = R1_in
    var R2 = R2_in
178

179
180
    val qcCmdR1 = new QcCommand(this, fastqc_R1)
    qcCmdR1.input = R1_in
Peter van 't Hof's avatar
Peter van 't Hof committed
181
    qcCmdR1.read = "R1"
Peter van 't Hof's avatar
Peter van 't Hof committed
182
    qcCmdR1.output = if (paired) new File(outDir, fastqR1Qc.getName.stripSuffix(".gz"))
Peter van 't Hof's avatar
Peter van 't Hof committed
183
    else fastqR1Qc
Peter van 't Hof's avatar
Peter van 't Hof committed
184
    qcCmdR1.deps :+= fastqc_R1.output
185
    qcCmdR1.isIntermediate = paired || !keepQcFastqFiles
Peter van 't Hof's avatar
Peter van 't Hof committed
186
    addSummarizable(qcCmdR1, "qc_command_R1")
187
188

    if (paired) {
189
190
      val qcCmdR2 = new QcCommand(this, fastqc_R2)
      qcCmdR2.input = R2_in.get
Peter van 't Hof's avatar
Peter van 't Hof committed
191
      qcCmdR2.output = new File(outDir, fastqR2Qc.get.getName.stripSuffix(".gz"))
Peter van 't Hof's avatar
Peter van 't Hof committed
192
      qcCmdR2.read = "R2"
Peter van 't Hof's avatar
Peter van 't Hof committed
193
      addSummarizable(qcCmdR2, "qc_command_R2")
Peter van 't Hof's avatar
Peter van 't Hof committed
194
195
196

      qcCmdR1.compress = false
      qcCmdR2.compress = false
197
198
199

      val fqSync = new FastqSync(this)
      fqSync.refFastq = R1_in
Peter van 't Hof's avatar
Peter van 't Hof committed
200
201
      fqSync.inputFastq1 = qcCmdR1.output
      fqSync.inputFastq2 = qcCmdR2.output
Peter van 't Hof's avatar
Peter van 't Hof committed
202
203
      fqSync.outputFastq1 = new File(outDir, fastqR1Qc.getName)
      fqSync.outputFastq2 = new File(outDir, fastqR2Qc.get.getName)
204
      fqSync.outputStats = new File(outDir, s"${sampleId.getOrElse("x")}-${libId.getOrElse("x")}.sync.stats")
Peter van 't Hof's avatar
Peter van 't Hof committed
205
206
207
208
209
210

      val pipe = new BiopetFifoPipe(this, fqSync :: Nil) {
        override def configName = "qc-cmd"

        override def beforeGraph(): Unit = {
          fqSync.beforeGraph()
Peter van 't Hof's avatar
Peter van 't Hof committed
211
          commands = qcCmdR1.jobs ::: qcCmdR2.jobs ::: fqSync :: Nil
Peter van 't Hof's avatar
Peter van 't Hof committed
212
213
214
215
216
217
218
219
          super.beforeGraph()
        }

        override def beforeCmd(): Unit = {
          qcCmdR1.beforeCmd()
          qcCmdR2.beforeCmd()
          fqSync.beforeCmd()
          commands = qcCmdR1.jobs ::: qcCmdR2.jobs ::: fqSync :: Nil
Peter van 't Hof's avatar
Peter van 't Hof committed
220
          commands.foreach(addPipeJob)
Peter van 't Hof's avatar
Peter van 't Hof committed
221
222
223
224
225
226
227
228
229
          super.beforeCmd()
        }
      }

      pipe.deps ::= fastqc_R1.output
      pipe.deps ::= fastqc_R2.output
      pipe.isIntermediate = !keepQcFastqFiles
      add(pipe)

230
231
232
233
      addSummarizable(fqSync, "fastq_sync")
      outputFiles += ("syncStats" -> fqSync.outputStats)
      R1 = fqSync.outputFastq1
      R2 = Some(fqSync.outputFastq2)
234
235
236
237
    } else {
      add(qcCmdR1)
      R1 = qcCmdR1.output
    }
238

bow's avatar
bow committed
239
    val seqstat_R1_after = SeqStat(this, R1, outDir)
240
    add(seqstat_R1_after)
241
    addSummarizable(seqstat_R1_after, "seqstat_R1_qc")
242
243

    if (paired) {
bow's avatar
bow committed
244
      val seqstat_R2_after = SeqStat(this, R2.get, outDir)
245
      add(seqstat_R2_after)
246
      addSummarizable(seqstat_R2_after, "seqstat_R2_qc")
247
248
249
    }

    outputFiles += (chunk + "output_R1" -> R1)
Peter van 't Hof's avatar
Peter van 't Hof committed
250
    if (paired) outputFiles += (chunk + "output_R2" -> R2.get)
Peter van 't Hof's avatar
Peter van 't Hof committed
251
    (R1, R2)
252
253
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
254
  /** Adds last non chunkable jobs */
255
  def runFinalize(fastq_R1: List[File], fastq_R2: List[File]) {
256
257
    if (fastq_R1.length != fastq_R2.length && paired)
      throw new IllegalStateException("R1 and R2 file number is not the same")
258

Peter van 't Hof's avatar
Peter van 't Hof committed
259
    if (fastq_R1.length > 1) {
Peter van 't Hof's avatar
Peter van 't Hof committed
260
261
262
263
264
265
266
267
      val zcat = new Zcat(this)
      zcat.input = fastq_R1
      add(zcat | new Gzip(this) > fastqR1Qc)
      if (paired) {
        val zcat = new Zcat(this)
        zcat.input = fastq_R2
        add(zcat | new Gzip(this) > fastqR2Qc.get)
      }
Peter van 't Hof's avatar
Peter van 't Hof committed
268
    }
269

270
271
272
273
274
275
    val validateFastq = new ValidateFastq(this)
    validateFastq.r1Fastq = fastqR1Qc
    validateFastq.r2Fastq = fastqR2Qc
    validateFastq.jobOutputFile = new File(outputDir, ".validate_fastq.qc.log.out")
    add(validateFastq)

276
277
278
279
280
281
    if (config("abort_on_corrupt_fastq", default = true)) {
      val checkValidateFastq = new CheckValidateFastq
      checkValidateFastq.inputLogFile = validateFastq.jobOutputFile
      checkValidateFastq.jobOutputFile = new File(outputDir, ".check.validate_fastq.qc.log.out")
      add(checkValidateFastq)
    }
282

Peter van 't Hof's avatar
Peter van 't Hof committed
283
284
    outputFiles += ("output_R1_gzip" -> fastqR1Qc)
    if (paired) outputFiles += ("output_R2_gzip" -> fastqR2Qc.get)
285

Peter van 't Hof's avatar
Peter van 't Hof committed
286
287
288
    fastqc_R1_after = Fastqc(this, fastqR1Qc, new File(outputDir, R1_name + ".qc.fastqc/"))
    add(fastqc_R1_after)
    addSummarizable(fastqc_R1_after, "fastqc_R1_qc")
Peter van 't Hof's avatar
Peter van 't Hof committed
289

Peter van 't Hof's avatar
Peter van 't Hof committed
290
291
292
293
    if (paired) {
      fastqc_R2_after = Fastqc(this, fastqR2Qc.get, new File(outputDir, R2_name + ".qc.fastqc/"))
      add(fastqc_R2_after)
      addSummarizable(fastqc_R2_after, "fastqc_R2_qc")
294
295
    }

Peter van 't Hof's avatar
Peter van 't Hof committed
296
    addSummaryJobs()
297
298
299
300
  }
}

object Flexiprep extends PipelineCommand