/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
 * A dual licensing mode is applied. The source code within this project that are
 * not part of GATK Queue is freely available for non-commercial use under an AGPL
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
package nl.lumc.sasc.biopet.pipelines.gears

import htsjdk.samtools.SamReaderFactory
import nl.lumc.sasc.biopet.core.{ PipelineCommand, MultiSampleQScript }
import nl.lumc.sasc.biopet.utils.config.Configurable
import nl.lumc.sasc.biopet.extensions.Ln
import nl.lumc.sasc.biopet.extensions.kraken.{ Kraken, KrakenReport }
import nl.lumc.sasc.biopet.extensions.picard.{ AddOrReplaceReadGroups, MarkDuplicates, MergeSamFiles, SamToFastq }
import nl.lumc.sasc.biopet.extensions.sambamba.SambambaView
import nl.lumc.sasc.biopet.pipelines.bammetrics.BamMetrics
import nl.lumc.sasc.biopet.pipelines.mapping.Mapping
import nl.lumc.sasc.biopet.extensions.tools.FastqSync
import org.broadinstitute.gatk.queue.QScript

import scala.collection.JavaConversions._

/**
 * This is a trait for the Gears pipeline
 * The ShivaTrait is used as template for this pipeline
 */
class Gears(val root: Configurable) extends QScript with MultiSampleQScript { qscript =>
  /** No-arg auxiliary constructor required by Queue; the config root is injected later. */
  def this() = this(null)

  /** Executed before running the script; Gears needs no pipeline-level initialisation. */
  def init(): Unit = {
  }

  /**
   * Method to add jobs: queues the per-sample jobs and the summary jobs.
   * Both helpers are provided by the MultiSampleQScript trait.
   */
  def biopetScript(): Unit = {
    addSamplesJobs()
    addSummaryJobs()
  }

  /** Multisample meta-genome comparison — not implemented yet (intentionally a no-op). */
  def addMultiSampleJobs(): Unit = {
    // generate report from multiple samples, this is:
    // - the TSV
    // - the Spearman correlation plot + table
  }

  /** Location of the summary file produced by this pipeline. Explicit return type added for a public member. */
  def summaryFile: File = new File(outputDir, "gears.summary.json")

  /** Settings of pipeline for summary; Gears records no pipeline-level settings yet. Explicit type added for a public member. */
  def summarySettings: Map[String, Any] = Map()

  /** Files for the summary; Gears adds no pipeline-level files yet. Explicit type added for a public member. */
  def summaryFiles: Map[String, File] = Map()

  /** Factory for sample objects; invoked by MultiSampleQScript for each configured sample id. */
  def makeSample(id: String) = new Sample(id)

  /** Class that will generate jobs for a sample */
  class Sample(sampleId: String) extends AbstractSample(sampleId) {
    /** Sample specific files to add to summary: the preprocessed bam (when present) plus the alignment. */
    def summaryFiles: Map[String, File] = {
      val bamEntry = preProcessBam.fold(Map[String, File]())(pb => Map("bamFile" -> pb))
      bamEntry ++ Map("alignment" -> alnFile)
    }

    /** Sample specific stats to add to summary; none are collected yet. */
    def summaryStats: Map[String, Any] = Map()

    /** Factory for library objects; invoked by the sample framework for each configured library id. */
    def makeLibrary(id: String) = new Library(id)

    /** Class to generate jobs for a library */
    class Library(libId: String) extends AbstractLibrary(libId) {
      /**
       * Library specific files to add to the summary.
       * Note: the preprocessed bam is only reported when a bam file is present too
       * (same behaviour as before: no bamFile means an empty map).
       */
      def summaryFiles: Map[String, File] = bamFile match {
        case None => Map()
        case Some(b) =>
          preProcessBam match {
            case Some(pb) => Map("bamFile" -> b, "preProcessBam" -> pb)
            case None     => Map("bamFile" -> b)
          }
      }

      /** Alignment results of this library ~ can only be accessed after addJobs is run! */
      def alnFile: File =
        bamFile.getOrElse(throw new IllegalStateException("The bamfile is not generated yet"))

      /** Library specific stats to add to summary; none are collected yet. */
      def summaryStats: Map[String, Any] = Map()

      /** Hook for library-level bam post-processing; Gears performs none, so this always returns None. */
      def preProcess(input: File): Option[File] = None

      /**
       * Builds the Mapping submodule for this library and returns the triple
       * (mapping job, final bam, preprocessed bam) used by the lazy input-selection val.
       */
      def makeMapping = {
        val mappingJob = new Mapping(qscript)
        mappingJob.sampleId = Some(sampleId)
        mappingJob.libId = Some(libId)
        mappingJob.outputDir = libDir
        mappingJob.outputName = sampleId + "-" + libId
        (Some(mappingJob), Some(mappingJob.finalBamFile), preProcess(mappingJob.finalBamFile))
      }

      /**
       * Determine where to start the pipeline in cases where both R1 (fastq) and BAM is specified.
       * Priority: fastq input wins; a bam input is either converted back to fastq
       * (config key "bam_to_fastq") or used in place; with neither input nothing is produced.
       * Rewritten from a `match` on Boolean literals to idiomatic if/else.
       */
      lazy val (mapping, bamFile, preProcessBam): (Option[Mapping], Option[File], Option[File]) =
        if (config.contains("R1")) makeMapping // Default starting from fastq files
        else if (config.contains("bam")) { // Starting from bam file
          if (config("bam_to_fastq", default = false).asBoolean) makeMapping // bam file will be converted to fastq
          else {
            val file = new File(libDir, sampleId + "-" + libId + ".final.bam")
            (None, Some(file), preProcess(file))
          }
        } else (None, None, None)

      /**
       * This will add jobs for this library.
       * Input selection mirrors the lazy (mapping, bamFile, preProcessBam) triple above:
       * fastq input feeds the Mapping submodule directly; bam input is either converted
       * back to fastq or validated and linked in place.
       */
      def addJobs(): Unit = {
        (config.contains("R1"), config.contains("bam")) match {
          case (true, _) => mapping.foreach(mapping => {
            mapping.input_R1 = config("R1")
          })
          case (false, true) => config("bam_to_fastq", default = false).asBoolean match {
            case true =>
              // Convert the supplied bam back to paired fastq and feed both reads to the mapping job
              val samToFastq = SamToFastq(qscript, config("bam"),
                new File(libDir, sampleId + "-" + libId + ".R1.fastq"),
                new File(libDir, sampleId + "-" + libId + ".R2.fastq"))
              samToFastq.isIntermediate = true
              qscript.add(samToFastq)
              mapping.foreach(mapping => {
                mapping.input_R1 = samToFastq.fastqR1
                mapping.input_R2 = Some(samToFastq.fastqR2)
              })
            case false =>
              // Use the bam as-is, but first verify every read group matches this sample/library
              val inputSam = SamReaderFactory.makeDefault.open(config("bam"))
              val readGroups = inputSam.getFileHeader.getReadGroups

              val readGroupOke = readGroups.forall(readGroup => {
                if (readGroup.getSample != sampleId) logger.warn("Sample ID readgroup in bam file is not the same")
                if (readGroup.getLibrary != libId) logger.warn("Library ID readgroup in bam file is not the same")
                readGroup.getSample == sampleId && readGroup.getLibrary == libId
              })
              inputSam.close()

              if (!readGroupOke) {
                if (config("correct_readgroups", default = false).asBoolean) {
                  // Read groups are wrong but the user opted in to automatic correction
                  logger.info("Correcting readgroups, file:" + config("bam"))
                  val aorrg = AddOrReplaceReadGroups(qscript, config("bam"), bamFile.get)
                  aorrg.RGID = sampleId + "-" + libId
                  aorrg.RGLB = libId
                  aorrg.RGSM = sampleId
                  aorrg.isIntermediate = true
                  qscript.add(aorrg)
                } else throw new IllegalStateException("Sample readgroup and/or library of input bamfile is not correct, file: " + bamFile +
                  "\nPlease note that it is possible to set 'correct_readgroups' to true in the config to automatic fix this")
              } else {
                // Read groups are fine: symlink bam + index into the library dir
                // (the bam link depends on the index link via bamLn.deps)
                val oldBamFile: File = config("bam")
                val oldIndex: File = new File(oldBamFile.getAbsolutePath.stripSuffix(".bam") + ".bai")
                val newIndex: File = new File(libDir, oldBamFile.getName.stripSuffix(".bam") + ".bai")
                val baiLn = Ln(qscript, oldIndex, newIndex)
                add(baiLn)

                val bamLn = Ln(qscript, oldBamFile, bamFile.get)
                bamLn.deps :+= baiLn.output
                add(bamLn)
              }
          }
          case _ => logger.warn("Sample: " + sampleId + "  Library: " + libId + ", no reads found")
        }
        // Run the mapping submodule (if any) inline and absorb its jobs and summary
        mapping.foreach(mapping => {
          mapping.init()
          mapping.biopetScript()
          addAll(mapping.functions) // Add functions of mapping to current function pool
          addSummaryQScript(mapping)
        })
      }
    }

    /**
     * This will add jobs for the double preprocessing.
     * Zero inputs yield nothing; a single input is symlinked into the sample dir;
     * multiple inputs are deduplicated into one merged bam with MarkDuplicates.
     */
    protected def addDoublePreProcess(input: List[File], isIntermediate: Boolean = false): Option[File] = {
      input match {
        case Nil => None
        case one :: Nil =>
          // One library: link the bam and its .bai into the sample dir (bam link depends on the index link)
          val linkedBam = new File(sampleDir, one.getName)
          val sourceIndex: File = new File(one.getAbsolutePath.stripSuffix(".bam") + ".bai")
          val targetIndex: File = new File(sampleDir, one.getName.stripSuffix(".bam") + ".bai")
          val baiLn = Ln(qscript, sourceIndex, targetIndex)
          add(baiLn)

          val bamLn = Ln(qscript, one, linkedBam)
          bamLn.deps :+= baiLn.output
          add(bamLn)
          Some(linkedBam)
        case _ =>
          // Several libraries: merge + deduplicate them into a single bam
          val md = new MarkDuplicates(qscript)
          md.input = input
          md.output = new File(sampleDir, sampleId + ".dedup.bam")
          md.outputMetrics = new File(sampleDir, sampleId + ".dedup.metrics")
          md.isIntermediate = isIntermediate
          md.removeDuplicates = true
          add(md)
          addSummarizable(md, "mark_duplicates")
          Some(md.output)
      }
    }

    /** Per-library bam fed into double preprocessing: the preprocessed bam when present, otherwise the raw bam. */
    lazy val preProcessBam: Option[File] = addDoublePreProcess(
      libraries.values.flatMap(lib => lib.preProcessBam.orElse(lib.bamFile)).toList)
    /** Alignment file for this sample: output of the job that combines/links the per-library bams. */
    def alnFile: File = sampleBamLinkJob.output

    /**
     * Job for combining all library BAMs.
     * Must be a lazy val, not a def: `alnFile` and `addJobs` both reach this job, and
     * re-evaluating makeCombineJob on every access would queue a duplicate MergeSamFiles
     * job (in the multi-library case) and create distinct Ln instances for the same output.
     */
    private lazy val sampleBamLinkJob: Ln =
      makeCombineJob(libraries.values.map(_.alnFile).toList, createFile(".bam"))

    /**
     * Ln or MergeSamFile job, depending on how many inputs are supplied.
     * With a single input the file is linked directly; with several, a MergeSamFiles job
     * is queued first and its output is linked. The returned Ln is NOT added here —
     * the caller decides when to add it.
     */
    private def makeCombineJob(inFiles: List[File], outFile: File,
                               mergeSortOrder: String = "coordinate"): Ln = {
      require(inFiles.nonEmpty, "At least one input files for combine job")

      val linkSource: File =
        if (inFiles.size == 1) inFiles.head
        else {
          val merge = new MergeSamFiles(qscript)
          merge.input = inFiles
          merge.output = createFile(".merged.bam")
          merge.sortOrder = mergeSortOrder
          add(merge)
          merge.output
        }

      val link = new Ln(qscript)
      link.input = linkSource
      link.output = outFile
      link
    }

    /**
     * This will add sample jobs:
     *  - per-library jobs, then the combine/link job for the sample bam
     *  - BamMetrics on the preprocessed bam (when available)
     *  - extraction of unmapped reads, bam→fastq, fastq sync, Kraken classification and report.
     */
    def addJobs(): Unit = {
      addPerLibJobs()
      // merge or symlink per-library alignments
      add(sampleBamLinkJob)

      if (preProcessBam.isDefined) {
        val bamMetrics = new BamMetrics(qscript)
        bamMetrics.sampleId = Some(sampleId)
        bamMetrics.inputBam = preProcessBam.get
        bamMetrics.outputDir = sampleDir
        bamMetrics.init()
        bamMetrics.biopetScript()
        addAll(bamMetrics.functions)
        addSummaryQScript(bamMetrics)
      }

      // sambamba view -f bam -F "unmapped or mate_is_unmapped" <alnFile> > <extracted.bam>
      val samFilterUnmapped = new SambambaView(qscript)
      samFilterUnmapped.input = alnFile
      samFilterUnmapped.filter = Some("unmapped or mate_is_unmapped")
      samFilterUnmapped.output = createFile(".unmapped.bam")
      samFilterUnmapped.isIntermediate = true
      qscript.add(samFilterUnmapped)

      // start bam to fastq (only on unaligned reads) also extract the matesam
      // FIX: read from the filtered (unmapped-only) bam. Previously this consumed alnFile,
      // which converted ALL reads and left samFilterUnmapped.output unused (a dead job).
      val samToFastq = SamToFastq(qscript, samFilterUnmapped.output,
        createFile(".unmap.R1.fastq"),
        createFile(".unmap.R2.fastq")
      )
      samToFastq.isIntermediate = true
      qscript.add(samToFastq)

      // sync the fastq records so R1/R2 stay paired
      val fastqSync = new FastqSync(qscript)
      fastqSync.refFastq = samToFastq.fastqR1
      fastqSync.inputFastq1 = samToFastq.fastqR1
      fastqSync.inputFastq2 = samToFastq.fastqR2
      fastqSync.outputFastq1 = createFile(".unmapsynced.R1.fastq.gz")
      fastqSync.outputFastq2 = createFile(".unmapsynced.R2.fastq.gz")
      fastqSync.outputStats = createFile(".syncstats.json")
      qscript.add(fastqSync)

      // start kraken: classify the synced unmapped read pairs
      val krakenAnalysis = new Kraken(qscript)
      krakenAnalysis.input = List(fastqSync.outputFastq1, fastqSync.outputFastq2)
      krakenAnalysis.output = createFile(".krkn.raw")
      krakenAnalysis.paired = true
      krakenAnalysis.classified_out = Option(createFile(".krkn.classified.fastq"))
      krakenAnalysis.unclassified_out = Option(createFile(".krkn.unclassified.fastq"))
      qscript.add(krakenAnalysis)

      // create kraken summary file
      val krakenReport = new KrakenReport(qscript)
      krakenReport.input = krakenAnalysis.output
      krakenReport.show_zeros = true
      krakenReport.output = createFile(".krkn.full")
      qscript.add(krakenReport)
    }
  }
}

/** Companion object giving the pipeline its default command-line entry point via PipelineCommand. */
object Gears extends PipelineCommand