GearsTrait.scala 12.9 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
 * A dual licensing mode is applied. The source code within this project that are
 * not part of GATK Queue is freely available for non-commercial use under an AGPL
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
package nl.lumc.sasc.biopet.pipelines.gears

import java.io.File

20
import nl.lumc.sasc.biopet.FullVersion
21
22
23
24
25
import htsjdk.samtools.SamReaderFactory
import nl.lumc.sasc.biopet.core.summary.SummaryQScript
import nl.lumc.sasc.biopet.core.MultiSampleQScript
import nl.lumc.sasc.biopet.extensions.Ln
import nl.lumc.sasc.biopet.extensions.kraken.{ KrakenReport, Kraken }
Wai Yi Leung's avatar
Wai Yi Leung committed
26
import nl.lumc.sasc.biopet.extensions.picard.{ MergeSamFiles, AddOrReplaceReadGroups, SamToFastq, MarkDuplicates }
Wai Yi Leung's avatar
Wai Yi Leung committed
27
import nl.lumc.sasc.biopet.extensions.sambamba.SambambaView
28
29
import nl.lumc.sasc.biopet.pipelines.bammetrics.BamMetrics
import nl.lumc.sasc.biopet.pipelines.mapping.Mapping
Wai Yi Leung's avatar
Wai Yi Leung committed
30
import nl.lumc.sasc.biopet.tools.FastqSync
31
import org.broadinstitute.gatk.queue.QScript
Wai Yi Leung's avatar
Wai Yi Leung committed
32
import org.broadinstitute.gatk.queue.function.QFunction
33
import scala.collection.JavaConversions._
34

35
36
37
38
/**
 * This is a trait for the Gears pipeline
 * The ShivaTrait is used as template for this pipeline
 */
39
trait GearsTrait extends MultiSampleQScript with SummaryQScript { qscript =>
40
41
42
43
44
45
46
47
48
49
50

  /** Executed before running the script */
  def init: Unit = {
  }

  /** Method to add jobs */
  def biopetScript: Unit = {
    addSamplesJobs()
    addSummaryJobs
  }

51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
  /** Multisample meta-genome comparison */
  def addMultiSampleJobs: Unit = {
    // generate report from multiple samples, this is:
    // - the TSV
    // - the Spearman correlation plot + table
  }

  /** Location of summary file */
  def summaryFile = new File(outputDir, "gears.summary.json")

  /** Settings of pipeline for summary */
  def summarySettings = Map(
    "version" -> FullVersion
  )

  /** Files for the summary */
  def summaryFiles = Map()

69
70
71
72
73
74
75
76
77
78
79
  /** Method to make a sample */
  def makeSample(id: String) = new Sample(id)

  /** Class that will generate jobs for a sample */
  class Sample(sampleId: String) extends AbstractSample(sampleId) {
    /** Sample specific files to add to summary */
    def summaryFiles: Map[String, File] = {
      preProcessBam match {
        case Some(preProcessBam) => Map("bamFile" -> preProcessBam)
        case _                   => Map()
      }
80
81
82
    } ++ Map(
      "alignment" -> alnFile
    )
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100

    /** Sample specific stats to add to summary */
    def summaryStats: Map[String, Any] = Map()

    /** Method to make a library */
    def makeLibrary(id: String) = new Library(id)

    /** Class to generate jobs for a library */
    class Library(libId: String) extends AbstractLibrary(libId) {
      /** Library specific files to add to the summary */
      def summaryFiles: Map[String, File] = {
        (bamFile, preProcessBam) match {
          case (Some(bamFile), Some(preProcessBam)) => Map("bamFile" -> bamFile, "preProcessBam" -> preProcessBam)
          case (Some(bamFile), _)                   => Map("bamFile" -> bamFile)
          case _                                    => Map()
        }
      }

Wai Yi Leung's avatar
Wai Yi Leung committed
101
      /** Alignment results of this library ~ can only be accessed after addJobs is run! */
102
103
104
105
      def alnFile: File = bamFile match {
        case Some(b) => b
        case _       => throw new IllegalStateException("The bamfile is not generated yet")
      }
Wai Yi Leung's avatar
Wai Yi Leung committed
106

107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
      /** Library specific stats to add to summary */
      def summaryStats: Map[String, Any] = Map()

      /** Method to execute library preprocess */
      def preProcess(input: File): Option[File] = None

      /** Method to make the mapping submodule */
      def makeMapping = {
        val mapping = new Mapping(qscript)
        mapping.sampleId = Some(sampleId)
        mapping.libId = Some(libId)
        mapping.outputDir = libDir
        mapping.outputName = sampleId + "-" + libId
        (Some(mapping), Some(mapping.finalBamFile), preProcess(mapping.finalBamFile))
      }

      /**
       * Determine where where to start the pipeline in cases where both R1 (fastq) and BAM is specified
       */
      lazy val (mapping, bamFile, preProcessBam): (Option[Mapping], Option[File], Option[File]) =
        (config.contains("R1"), config.contains("bam")) match {
          case (true, _) => makeMapping // Default starting from fastq files
          case (false, true) => // Starting from bam file
            config("bam_to_fastq", default = false).asBoolean match {
              case true => makeMapping // bam file will be converted to fastq
              case false => {
                val file = new File(libDir, sampleId + "-" + libId + ".final.bam")
                (None, Some(file), preProcess(file))
              }
            }
          case _ => (None, None, None)
        }

      /** This will add jobs for this library */
      def addJobs(): Unit = {
        (config.contains("R1"), config.contains("bam")) match {
          case (true, _) => mapping.foreach(mapping => {
            mapping.input_R1 = config("R1")
          })
          case (false, true) => config("bam_to_fastq", default = false).asBoolean match {
            case true => {
              val samToFastq = SamToFastq(qscript, config("bam"),
                new File(libDir, sampleId + "-" + libId + ".R1.fastq"),
                new File(libDir, sampleId + "-" + libId + ".R2.fastq"))
              samToFastq.isIntermediate = true
              qscript.add(samToFastq)
              mapping.foreach(mapping => {
                mapping.input_R1 = samToFastq.fastqR1
                mapping.input_R2 = Some(samToFastq.fastqR2)
              })
            }
            case false => {
              val inputSam = SamReaderFactory.makeDefault.open(config("bam"))
              val readGroups = inputSam.getFileHeader.getReadGroups

              val readGroupOke = readGroups.forall(readGroup => {
                if (readGroup.getSample != sampleId) logger.warn("Sample ID readgroup in bam file is not the same")
                if (readGroup.getLibrary != libId) logger.warn("Library ID readgroup in bam file is not the same")
                readGroup.getSample == sampleId && readGroup.getLibrary == libId
              })
              inputSam.close

              if (!readGroupOke) {
                if (config("correct_readgroups", default = false).asBoolean) {
                  logger.info("Correcting readgroups, file:" + config("bam"))
                  val aorrg = AddOrReplaceReadGroups(qscript, config("bam"), bamFile.get)
                  aorrg.RGID = sampleId + "-" + libId
                  aorrg.RGLB = libId
                  aorrg.RGSM = sampleId
                  aorrg.isIntermediate = true
                  qscript.add(aorrg)
                } else throw new IllegalStateException("Sample readgroup and/or library of input bamfile is not correct, file: " + bamFile +
                  "\nPlease note that it is possible to set 'correct_readgroups' to true in the config to automatic fix this")
              } else {
                val oldBamFile: File = config("bam")
                val oldIndex: File = new File(oldBamFile.getAbsolutePath.stripSuffix(".bam") + ".bai")
                val newIndex: File = new File(libDir, oldBamFile.getName.stripSuffix(".bam") + ".bai")
                val baiLn = Ln(qscript, oldIndex, newIndex)
                add(baiLn)

                val bamLn = Ln(qscript, oldBamFile, bamFile.get)
                bamLn.deps :+= baiLn.output
                add(bamLn)
              }

            }
          }
          case _ => logger.warn("Sample: " + sampleId + "  Library: " + libId + ", no reads found")
        }
        mapping.foreach(mapping => {
          mapping.init
          mapping.biopetScript
Wai Yi Leung's avatar
Wai Yi Leung committed
199
          addAll(mapping.functions) // Add functions of mapping to current function pool
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
          addSummaryQScript(mapping)
        })
      }
    }

    /** This will add jobs for the double preprocessing */
    protected def addDoublePreProcess(input: List[File], isIntermediate: Boolean = false): Option[File] = {
      if (input == Nil) None
      else if (input.tail == Nil) {
        val bamFile = new File(sampleDir, input.head.getName)
        val oldIndex: File = new File(input.head.getAbsolutePath.stripSuffix(".bam") + ".bai")
        val newIndex: File = new File(sampleDir, input.head.getName.stripSuffix(".bam") + ".bai")
        val baiLn = Ln(qscript, oldIndex, newIndex)
        add(baiLn)

        val bamLn = Ln(qscript, input.head, bamFile)
        bamLn.deps :+= baiLn.output
        add(bamLn)
        Some(bamFile)
      } else {
        val md = new MarkDuplicates(qscript)
        md.input = input
        md.output = new File(sampleDir, sampleId + ".dedup.bam")
        md.outputMetrics = new File(sampleDir, sampleId + ".dedup.metrics")
        md.isIntermediate = isIntermediate
        md.removeDuplicates = true
        add(md)
        addSummarizable(md, "mark_duplicates")
        Some(md.output)
      }
    }

    lazy val preProcessBam: Option[File] = addDoublePreProcess(libraries.map(lib => {
      (lib._2.bamFile, lib._2.preProcessBam) match {
        case (_, Some(file)) => Some(file)
        case (Some(file), _) => Some(file)
        case _               => None
      }
    }).flatten.toList)
239
    def alnFile: File = sampleBamLinkJob.output
Wai Yi Leung's avatar
Wai Yi Leung committed
240
241

    /** Job for combining all library BAMs */
242
    private def sampleBamLinkJob: Ln =
Wai Yi Leung's avatar
Wai Yi Leung committed
243
244
245
246
      makeCombineJob(libraries.values.map(_.alnFile).toList, createFile(".bam"))

    /** Ln or MergeSamFile job, depending on how many inputs are supplied */
    private def makeCombineJob(inFiles: List[File], outFile: File,
247
                               mergeSortOrder: String = "coordinate"): Ln = {
Wai Yi Leung's avatar
Wai Yi Leung committed
248
      require(inFiles.nonEmpty, "At least one input files for combine job")
249
250
251
252
253
      val input: File = {

        if (inFiles.size == 1) inFiles.head
        else {
          val mergedBam = createFile(".merged.bam")
Wai Yi Leung's avatar
Wai Yi Leung committed
254
255
256
257
258
259
          val mergejob = new MergeSamFiles(qscript)
          mergejob.input = inFiles
          mergejob.output = mergedBam
          mergejob.sortOrder = mergeSortOrder
          add(mergejob)
          mergejob.output
260
        }
Wai Yi Leung's avatar
Wai Yi Leung committed
261
      }
262

263
264
265
266
      val linkJob = new Ln(qscript)
      linkJob.input = input
      linkJob.output = outFile
      linkJob
267

Wai Yi Leung's avatar
Wai Yi Leung committed
268
269
    }

270
271
272
    /** This will add sample jobs */
    def addJobs(): Unit = {
      addPerLibJobs()
Wai Yi Leung's avatar
Wai Yi Leung committed
273
      // merge or symlink per-library alignments
274
      add(sampleBamLinkJob)
275
276
277
278
279
280
281
282
283
284
285
286

      if (preProcessBam.isDefined) {
        val bamMetrics = new BamMetrics(qscript)
        bamMetrics.sampleId = Some(sampleId)
        bamMetrics.inputBam = preProcessBam.get
        bamMetrics.outputDir = sampleDir
        bamMetrics.init
        bamMetrics.biopetScript
        addAll(bamMetrics.functions)
        addSummaryQScript(bamMetrics)
      }

287
      // sambamba view -f bam -F "unmapped or mate_is_unmapped" <alnFile> > <extracted.bam>
Wai Yi Leung's avatar
Wai Yi Leung committed
288
289
290
291
292
293
      val samFilterUnmapped = new SambambaView(qscript)
      samFilterUnmapped.input = alnFile
      samFilterUnmapped.filter = Some("unmapped or mate_is_unmapped")
      samFilterUnmapped.output = createFile(".unmapped.bam")
      samFilterUnmapped.isIntermediate = true
      qscript.add(samFilterUnmapped)
294
295

      // start bam to fastq (only on unaligned reads) also extract the matesam
Wai Yi Leung's avatar
Wai Yi Leung committed
296
      val samToFastq = SamToFastq(qscript, alnFile,
Wai Yi Leung's avatar
Wai Yi Leung committed
297
298
299
        createFile(".unmap.R1.fastq"),
        createFile(".unmap.R2.fastq")
      )
300
301
302
      samToFastq.isIntermediate = true
      qscript.add(samToFastq)

Wai Yi Leung's avatar
Wai Yi Leung committed
303
304
305
306
307
308
309
310
311
      // sync the fastq records
      val fastqsync = new FastqSync(qscript)
      fastqsync.refFastq = samToFastq.fastqR1
      fastqsync.inputFastq1 = samToFastq.fastqR1
      fastqsync.inputFastq2 = samToFastq.fastqR2
      fastqsync.outputFastq1 = createFile(".unmapsynced.R1.fastq.gz")
      fastqsync.outputFastq2 = createFile(".unmapsynced.R2.fastq.gz")
      fastqsync.outputStats = createFile(".syncstats.json")
      qscript.add(fastqsync)
312
313
314

      // start kraken
      val krakenAnalysis = new Kraken(qscript)
Wai Yi Leung's avatar
Wai Yi Leung committed
315
      krakenAnalysis.input = List(fastqsync.outputFastq1, fastqsync.outputFastq2)
316
      krakenAnalysis.output = createFile(".krkn.raw")
Wai Yi Leung's avatar
Wai Yi Leung committed
317
      krakenAnalysis.paired = true
318
319
      krakenAnalysis.classified_out = Option(createFile(".krkn.classified.fastq"))
      krakenAnalysis.unclassified_out = Option(createFile(".krkn.unclassified.fastq"))
320
321
322
323
324
325
326
      qscript.add(krakenAnalysis)

      // create kraken summary file

      val krakenReport = new KrakenReport(qscript)
      krakenReport.input = krakenAnalysis.output
      krakenReport.show_zeros = true
327
      krakenReport.output = createFile(".krkn.full")
328
329
330
331
332
333
      qscript.add(krakenReport)

    }
  }

}