MultisampleMapping.scala 16.8 KB
Newer Older
Peter van 't Hof's avatar
Peter van 't Hof committed
1
/**
2 3 4 5 6 7 8 9 10 11 12 13 14
  * Biopet is built on top of GATK Queue for building bioinformatic
  * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
  * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
  * should also be able to execute Biopet tools and pipelines.
  *
  * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
  *
  * Contact us at: sasc@lumc.nl
  *
  * A dual licensing mode is applied. The source code within this project is freely available for non-commercial use under an AGPL
  * license; For commercial users or users who do not want to follow the AGPL
  * license, please contact us to obtain a separate license.
  */
15 16 17 18 19
package nl.lumc.sasc.biopet.pipelines.mapping

import java.io.File

import htsjdk.samtools.SamReaderFactory
20
import htsjdk.samtools.reference.FastaSequenceFile
Peter van 't Hof's avatar
Peter van 't Hof committed
21
import nl.lumc.sasc.biopet.core.report.ReportBuilderExtension
22
import nl.lumc.sasc.biopet.core.{MultiSampleQScript, PipelineCommand, Reference}
23
import nl.lumc.sasc.biopet.extensions.Ln
24
import nl.lumc.sasc.biopet.extensions.picard._
25
import nl.lumc.sasc.biopet.pipelines.bammetrics.BamMetrics
26
import nl.lumc.sasc.biopet.pipelines.bamtobigwig.Bam2Wig
27
import nl.lumc.sasc.biopet.pipelines.gears.GearsSingle
28
import nl.lumc.sasc.biopet.utils.Logging
Peter van 't Hof's avatar
Peter van 't Hof committed
29
import nl.lumc.sasc.biopet.utils.config.Configurable
30
import org.broadinstitute.gatk.queue.QScript
Peter van 't Hof's avatar
Peter van 't Hof committed
31
import MultisampleMapping.MergeStrategy
32
import nl.lumc.sasc.biopet.extensions.sambamba.{SambambaMarkdup, SambambaMerge}
Peter van 't Hof's avatar
Peter van 't Hof committed
33

34 35 36
import scala.collection.JavaConversions._

/**
37 38 39 40 41
  * Created by pjvanthof on 18/12/15.
  *
  * This trait is meant to extend pipelines from that require a alignment step
  */
trait MultisampleMappingTrait extends MultiSampleQScript with Reference { qscript: QScript =>
42

43
  /** With this method the merge strategy for libraries to samples is defined. This can be overriden to fix the merge strategy. */
Peter van 't Hof's avatar
Peter van 't Hof committed
44
  def mergeStrategy: MergeStrategy.Value = {
Peter van 't Hof's avatar
Peter van 't Hof committed
45
    val value: String = config("merge_strategy", default = "preprocessmarkduplicates")
Peter van 't Hof's avatar
Peter van 't Hof committed
46
    MergeStrategy.values.find(_.toString.toLowerCase == value.toLowerCase) match {
Peter van 't Hof's avatar
Peter van 't Hof committed
47
      case Some(v) => v
48
      case _ => throw new IllegalArgumentException(s"merge_strategy '$value' does not exist")
Peter van 't Hof's avatar
Peter van 't Hof committed
49 50 51
    }
  }

52
  def init(): Unit = {}
53

54
  /** Is there are jobs that needs to be added before the rest of the jobs this methods can be overriden, to let the sample jobs this work the super call should be done also */
55
  def biopetScript(): Unit = {
Peter van 't Hof's avatar
Peter van 't Hof committed
56
    addSamplesJobs()
Peter van 't Hof's avatar
Peter van 't Hof committed
57
    addSummaryJobs()
58 59
  }

60
  /** This is de default multisample mapping report, this can be extended by other pipelines */
Peter van 't Hof's avatar
Peter van 't Hof committed
61
  override def reportClass: Option[ReportBuilderExtension] = {
62 63
    val report = new MultisampleMappingReport(this)
    report.outputDir = new File(outputDir, "report")
64
    report.summaryDbFile = summaryDbFile
65 66 67
    Some(report)
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
68 69 70 71
  override def defaults: Map[String, Any] = super.defaults ++ Map(
    "reordersam" -> Map("allow_incomplete_dict_concordance" -> true),
    "gears" -> Map("skip_flexiprep" -> true)
  )
72

73 74
  override def fixedValues: Map[String, Any] =
    super.fixedValues ++ Map("gearssingle" -> Map("skip_flexiprep" -> true))
Peter van 't Hof's avatar
Peter van 't Hof committed
75

76
  /** In a default multisample mapping run there are no multsample jobs. This method can be overriden by other pipelines */
77
  def addMultiSampleJobs(): Unit = {
78 79 80
    // this code will be executed after all code of all samples is executed
  }

81
  /** By default only the reference is put in the summary, when extending pippeline specific files can be added */
82
  def summaryFiles: Map[String, File] = Map("referenceFasta" -> referenceFasta())
83

84
  /** By default only the reference is put in the summary, when extending pippeline specific settings can be added */
85 86
  def summarySettings: Map[String, Any] =
    Map("reference" -> referenceSummary, "merge_strategy" -> mergeStrategy.toString)
87 88

  def makeSample(id: String) = new Sample(id)
Peter van 't Hof's avatar
Peter van 't Hof committed
89
  class Sample(sampleId: String) extends AbstractSample(sampleId) { sample =>
90

Sander Bollen's avatar
Sander Bollen committed
91
    val gearsPipeline: Option[GearsSingle] = mappingToGears match {
92 93 94 95 96 97 98 99 100 101 102
      case "unmapped" =>
        val gears = new GearsSingle(qscript)
        gears.sampleId = Some(sampleId)
        gears.outputDir = new File(sampleDir, "gears")
        Some(gears)
      case "all" =>
        val gears = new GearsSingle(qscript)
        gears.sampleId = Some(sampleId)
        gears.outputDir = new File(sampleDir, "gears")
        Some(gears)
      case "none" => None
Peter van 't Hof's avatar
Peter van 't Hof committed
103
      case x =>
104 105 106 107
        Logging.addError(s"$x is not a valid value for 'mapping_to_gears'")
        None
    }

108 109
    def metricsPreprogressBam = true

110
    def makeLibrary(id: String) = new Library(id)
Peter van 't Hof's avatar
Peter van 't Hof committed
111
    class Library(libId: String) extends AbstractLibrary(libId) { lib =>
112 113

      /** By default the bams files are put in the summary, more files can be added here */
114 115 116 117
      def summaryFiles: Map[String, File] =
        (inputR1.map("input_R1" -> _) :: inputR2.map("input_R2" -> _) ::
          inputBam.map("input_bam" -> _) :: bamFile.map("output_bam" -> _) ::
          preProcessBam.map("output_bam_preprocess" -> _) :: Nil).flatten.toMap
118 119 120

      def summaryStats: Map[String, Any] = Map()

Peter van 't Hof's avatar
Peter van 't Hof committed
121 122
      lazy val inputR1: Option[File] = MultisampleMapping.fileMustBeAbsolute(config("R1"))
      lazy val inputR2: Option[File] = MultisampleMapping.fileMustBeAbsolute(config("R2"))
Peter van 't Hof's avatar
Peter van 't Hof committed
123 124
      lazy val qcFastqR1: Option[File] = mapping.map(_.flexiprep.fastqR1Qc)
      lazy val qcFastqR2: Option[File] = mapping.flatMap(_.flexiprep.fastqR2Qc)
125 126
      lazy val inputBam: Option[File] =
        MultisampleMapping.fileMustBeAbsolute(if (inputR1.isEmpty) config("bam") else None)
127 128 129
      lazy val bamToFastq: Boolean = config("bam_to_fastq", default = false)
      lazy val correctReadgroups: Boolean = config("correct_readgroups", default = false)

Peter van 't Hof's avatar
Peter van 't Hof committed
130
      def keepFinalBamfile: Boolean = samples(sampleId).libraries.size == 1
Peter van 't Hof's avatar
Peter van 't Hof committed
131

132 133 134 135 136 137 138 139 140 141 142
      lazy val mapping: Option[Mapping] =
        if (inputR1.isDefined || (inputBam.isDefined && bamToFastq)) {
          val m: Mapping = new Mapping(qscript) {
            override def configNamespace = "mapping"
            override def defaults: Map[String, Any] =
              super.defaults ++
                Map("keep_final_bamfile" -> keepFinalBamfile)
          }
          m.sampleId = Some(sampleId)
          m.libId = Some(libId)
          m.outputDir = libDir
Sander Bollen's avatar
Sander Bollen committed
143
          m.centrifugeKreport =
Sander Bollen's avatar
Sander Bollen committed
144
            gearsPipeline.flatMap(g => g.centrifugeScript.map(c => c.centrifugeNonUniqueKReport))
Sander Bollen's avatar
Sander Bollen committed
145
          m.centrifugeOutputFile =
Sander Bollen's avatar
Sander Bollen committed
146
            gearsPipeline.flatMap(g => g.centrifugeScript.map(c => c.centrifugeOutput))
147 148
          Some(m)
        } else None
149

Peter van 't Hof's avatar
Peter van 't Hof committed
150
      def bamFile: Option[File] = mapping match {
151
        case Some(m) => Some(m.mergedBamFile)
152
        case _ if inputBam.isDefined => Some(new File(libDir, s"$sampleId-$libId.bam"))
153
        case _ => None
154 155
      }

156
      /** By default the preProcessBam is the same as the normal bamFile. A pipeline can extend this is there are preprocess steps */
Peter van 't Hof's avatar
Peter van 't Hof committed
157
      def preProcessBam: Option[File] = bamFile
Peter van 't Hof's avatar
Peter van 't Hof committed
158

159
      /** This method can be extended to add jobs to the pipeline, to do this the super call of this function must be called by the pipelines */
160
      def addJobs(): Unit = {
Peter van 't Hof's avatar
Peter van 't Hof committed
161 162 163 164
        inputR1.foreach(inputFiles :+= new InputFile(_, config("R1_md5")))
        inputR2.foreach(inputFiles :+= new InputFile(_, config("R2_md5")))
        inputBam.foreach(inputFiles :+= new InputFile(_, config("bam_md5")))

165 166
        if (inputR1.isDefined) {
          mapping.foreach { m =>
Sander van der Zeeuw's avatar
Sander van der Zeeuw committed
167 168
            m.inputR1 = inputR1.get
            m.inputR2 = inputR2
169 170 171 172
            add(m)
          }
        } else if (inputBam.isDefined) {
          if (bamToFastq) {
173 174 175 176
            val samToFastq = SamToFastq(qscript,
                                        inputBam.get,
                                        new File(libDir, sampleId + "-" + libId + ".R1.fq.gz"),
                                        new File(libDir, sampleId + "-" + libId + ".R2.fq.gz"))
Peter van 't Hof's avatar
Peter van 't Hof committed
177
            samToFastq.isIntermediate = libraries.size > 1
178 179
            qscript.add(samToFastq)
            mapping.foreach(m => {
Sander van der Zeeuw's avatar
Sander van der Zeeuw committed
180 181
              m.inputR1 = samToFastq.fastqR1
              m.inputR2 = Some(samToFastq.fastqR2)
182 183 184 185
              add(m)
            })
          } else {
            val inputSam = SamReaderFactory.makeDefault.open(inputBam.get)
186 187 188
            val header = inputSam.getFileHeader
            val readGroups = header.getReadGroups
            val referenceFile = new FastaSequenceFile(referenceFasta(), true)
Peter van 't Hof's avatar
Peter van 't Hof committed
189 190 191
            val dictOke: Boolean = {
              var oke = true
              try {
Peter van 't Hof's avatar
Peter van 't Hof committed
192
                header.getSequenceDictionary.assertSameDictionary(referenceDict)
Peter van 't Hof's avatar
Peter van 't Hof committed
193
              } catch {
Peter van 't Hof's avatar
Peter van 't Hof committed
194
                case e: AssertionError =>
Peter van 't Hof's avatar
Peter van 't Hof committed
195
                  logger.error(e.getMessage)
Peter van 't Hof's avatar
Peter van 't Hof committed
196 197 198 199
                  oke = false
              }
              oke
            }
200 201
            inputSam.close()
            referenceFile.close()
202 203

            val readGroupOke = readGroups.forall(readGroup => {
204 205 206 207
              if (readGroup.getSample != sampleId)
                logger.warn("Sample ID readgroup in bam file is not the same")
              if (readGroup.getLibrary != libId)
                logger.warn("Library ID readgroup in bam file is not the same")
208
              readGroup.getSample == sampleId && readGroup.getLibrary == libId
209
            }) && readGroups.nonEmpty
210

211
            if (!readGroupOke || !dictOke) {
212 213 214 215 216 217 218
              if (!readGroupOke && !correctReadgroups)
                Logging.addError(
                  "Sample readgroup and/or library of input bamfile is not correct, file: " + bamFile +
                    "\nPlease note that it is possible to set 'correct_readgroups' to true in the config to automatic fix this")
              if (!dictOke)
                Logging.addError(
                  "Sequence dictionary in the bam file is not the same as the reference, file: " + bamFile)
219

Peter van 't Hof's avatar
Peter van 't Hof committed
220
              if (!readGroupOke && correctReadgroups) {
221
                logger.info("Correcting readgroups, file:" + inputBam.get)
Peter van 't Hof's avatar
Peter van 't Hof committed
222 223 224 225 226 227
                val aorrg = AddOrReplaceReadGroups(qscript, inputBam.get, bamFile.get)
                aorrg.RGID = config("rgid", default = s"$sampleId-$libId")
                aorrg.RGLB = libId
                aorrg.RGSM = sampleId
                aorrg.RGPL = config("rgpl", default = "unknown")
                aorrg.RGPU = config("rgpu", default = "na")
Peter van 't Hof's avatar
Peter van 't Hof committed
228
                aorrg.isIntermediate = libraries.size > 1
229
                qscript.add(aorrg)
230
              }
Peter van 't Hof's avatar
Peter van 't Hof committed
231
            } else add(Ln.linkBamFile(qscript, inputBam.get, bamFile.get): _*)
232
          }
233

Peter van 't Hof's avatar
Peter van 't Hof committed
234 235 236 237 238 239 240 241 242
          if (!bamToFastq) {
            val bamMetrics = new BamMetrics(qscript)
            bamMetrics.sampleId = Some(sampleId)
            bamMetrics.libId = Some(libId)
            bamMetrics.inputBam = bamFile.get
            bamMetrics.outputDir = new File(libDir, "metrics")
            bamMetrics.paired = inputR2.isDefined || inputBam.isDefined
            add(bamMetrics)
          }
243
          if (config("execute_bam2wig", default = true)) add(Bam2Wig(qscript, bamFile.get))
244

245 246 247 248
        } else logger.warn(s"Sample '$sampleId' does not have any input files")
      }
    }

249
    /** By default the bams files are put in the summary, more files can be added here */
250 251 252
    def summaryFiles: Map[String, File] =
      (bamFile.map("output_bam" -> _) ::
        preProcessBam.map("output_bam_preprocess" -> _) :: Nil).flatten.toMap
253 254 255

    def summaryStats: Map[String, Any] = Map()

256
    /** This is the merged bam file, None if the merged bam file is NA */
257 258 259 260 261
    def bamFile: Option[File] =
      if (libraries.flatMap(_._2.bamFile).nonEmpty &&
          mergeStrategy != MultisampleMapping.MergeStrategy.None)
        Some(new File(sampleDir, s"$sampleId.bam"))
      else None
Peter van 't Hof's avatar
Peter van 't Hof committed
262

263
    /** By default the preProcessBam is the same as the normal bamFile. A pipeline can extend this is there are preprocess steps */
Peter van 't Hof's avatar
Peter van 't Hof committed
264
    def preProcessBam: Option[File] = bamFile
Peter van 't Hof's avatar
Peter van 't Hof committed
265

266
    /** Default is set to keep the merged files, user can set this in the config. To change the default this method can be overriden */
Peter van 't Hof's avatar
Peter van 't Hof committed
267 268
    def keepMergedFiles: Boolean = config("keep_merged_files", default = true)

Peter van 't Hof's avatar
Peter van 't Hof committed
269
    /**
270 271
      * @deprecated
      */
Peter van 't Hof's avatar
Peter van 't Hof committed
272
    lazy val unmappedToGears: Boolean = config("unmapped_to_gears", default = false)
273 274 275
    if (config.contains("unmapped_to_gears"))
      logger.warn(
        "Config value 'unmapped_to_gears' is replaced with 'mapping_to_gears', Assumes default: mapping_to_gears=unmapped")
Peter van 't Hof's avatar
Peter van 't Hof committed
276

277 278
    lazy val mappingToGears: String =
      config("mapping_to_gears", default = if (unmappedToGears) "unmapped" else "none")
Peter van 't Hof's avatar
Peter van 't Hof committed
279

280
    /** This method can be extended to add jobs to the pipeline, to do this the super call of this function must be called by the pipelines */
Peter van 't Hof's avatar
Peter van 't Hof committed
281
    def addJobs(): Unit = {
282
      addPerLibJobs() // This add jobs for each library
Peter van 't Hof's avatar
Peter van 't Hof committed
283 284

      mergeStrategy match {
Peter van 't Hof's avatar
Peter van 't Hof committed
285
        case MergeStrategy.None =>
286
        case (MergeStrategy.MergeSam) if libraries.flatMap(_._2.bamFile).size == 1 =>
Peter van 't Hof's avatar
Peter van 't Hof committed
287
          add(Ln.linkBamFile(qscript, libraries.flatMap(_._2.bamFile).head, bamFile.get): _*)
288 289
        case (MergeStrategy.PreProcessMergeSam)
            if libraries.flatMap(_._2.preProcessBam).size == 1 =>
Peter van 't Hof's avatar
Peter van 't Hof committed
290
          add(Ln.linkBamFile(qscript, libraries.flatMap(_._2.preProcessBam).head, bamFile.get): _*)
Peter van 't Hof's avatar
Peter van 't Hof committed
291
        case MergeStrategy.MergeSam =>
292 293 294 295 296
          add(
            MergeSamFiles(qscript,
                          libraries.flatMap(_._2.bamFile).toList,
                          bamFile.get,
                          isIntermediate = !keepMergedFiles))
Peter van 't Hof's avatar
Peter van 't Hof committed
297
        case MergeStrategy.PreProcessMergeSam =>
298 299 300 301 302
          add(
            MergeSamFiles(qscript,
                          libraries.flatMap(_._2.preProcessBam).toList,
                          bamFile.get,
                          isIntermediate = !keepMergedFiles))
Peter van 't Hof's avatar
Peter van 't Hof committed
303
        case MergeStrategy.MarkDuplicates =>
304 305 306 307 308
          add(
            MarkDuplicates(qscript,
                           libraries.flatMap(_._2.bamFile).toList,
                           bamFile.get,
                           isIntermediate = !keepMergedFiles))
Peter van 't Hof's avatar
Peter van 't Hof committed
309
        case MergeStrategy.PreProcessMarkDuplicates =>
310 311 312 313 314
          add(
            MarkDuplicates(qscript,
                           libraries.flatMap(_._2.preProcessBam).toList,
                           bamFile.get,
                           isIntermediate = !keepMergedFiles))
315
        case MergeStrategy.PreProcessSambambaMarkdup =>
Peter van 't Hof's avatar
Peter van 't Hof committed
316
          val mergedBam = if (libraries.flatMap(_._2.bamFile).size == 1) {
317 318 319 320
            add(
              Ln.linkBamFile(qscript,
                             libraries.flatMap(_._2.preProcessBam).head,
                             new File(sampleDir, "merged.bam")): _*)
Peter van 't Hof's avatar
Peter van 't Hof committed
321
            libraries.flatMap(_._2.preProcessBam).head
322 323 324
          } else {
            val merge = new SambambaMerge(qscript)
            merge.input = libraries.flatMap(_._2.preProcessBam).toList
Peter van 't Hof's avatar
Peter van 't Hof committed
325
            merge.output = new File(sampleDir, "merged.bam")
Peter van 't Hof's avatar
Peter van 't Hof committed
326
            merge.isIntermediate = true
327
            add(merge)
Peter van 't Hof's avatar
Peter van 't Hof committed
328
            merge.output
329 330
          }
          add(SambambaMarkdup(qscript, mergedBam, bamFile.get, isIntermediate = !keepMergedFiles))
331 332 333 334 335 336 337
          add(
            Ln(qscript,
               bamFile.get + ".bai",
               bamFile.get.getAbsolutePath.stripSuffix(".bam") + ".bai"))
        case _ =>
          throw new IllegalStateException(
            "This should not be possible, unimplemented MergeStrategy?")
Peter van 't Hof's avatar
Peter van 't Hof committed
338 339
      }

340
      if (mergeStrategy != MergeStrategy.None && libraries.flatMap(_._2.bamFile).nonEmpty) {
Peter van 't Hof's avatar
Peter van 't Hof committed
341 342
        val bamMetrics = new BamMetrics(qscript)
        bamMetrics.sampleId = Some(sampleId)
343
        bamMetrics.inputBam = if (metricsPreprogressBam) preProcessBam.get else bamFile.get
Peter van 't Hof's avatar
Peter van 't Hof committed
344
        bamMetrics.outputDir = new File(sampleDir, "metrics")
Peter van 't Hof's avatar
Peter van 't Hof committed
345 346
        bamMetrics.paired =
          libraries.exists(x => x._2.inputR1.isDefined || x._2.inputBam.isDefined)
Peter van 't Hof's avatar
Peter van 't Hof committed
347
        add(bamMetrics)
348 349

        if (config("execute_bam2wig", default = true)) add(Bam2Wig(qscript, preProcessBam.get))
Peter van 't Hof's avatar
Peter van 't Hof committed
350
      }
351

Sander Bollen's avatar
Sander Bollen committed
352 353
      mappingToGears match {
        case "unmapped" =>
Sander Bollen's avatar
Sander Bollen committed
354 355
          gearsPipeline.get.bamFile = preProcessBam
          add(gearsPipeline.get)
Sander Bollen's avatar
Sander Bollen committed
356
        case "all" =>
Sander Bollen's avatar
Sander Bollen committed
357 358 359
          gearsPipeline.get.fastqR1 = libraries.flatMap(_._2.qcFastqR1).toList
          gearsPipeline.get.fastqR2 = libraries.flatMap(_._2.qcFastqR2).toList
          add(gearsPipeline.get)
360
        case _ =>
361
      }
362 363 364
    }
  }
}
Peter van 't Hof's avatar
Peter van 't Hof committed
365

366
/** This class is the default implementation that can be used on the command line */
Peter van 't Hof's avatar
Peter van 't Hof committed
367
class MultisampleMapping(val parent: Configurable) extends QScript with MultisampleMappingTrait {
Peter van 't Hof's avatar
Peter van 't Hof committed
368 369
  def this() = this(null)
}
370 371 372 373

object MultisampleMapping extends PipelineCommand {

  object MergeStrategy extends Enumeration {
374 375
    val None, MergeSam, MarkDuplicates, PreProcessMergeSam, PreProcessMarkDuplicates,
    PreProcessSambambaMarkdup = Value
376 377
  }

378
  /** When file is not absolute an error is raise att the end of the script of a pipeline */
Peter van 't Hof's avatar
Peter van 't Hof committed
379
  def fileMustBeAbsolute(file: Option[File]): Option[File] = {
380 381 382 383 384 385 386
    if (file.forall(_.isAbsolute)) file
    else {
      Logging.addError(s"$file should be a absolute file path")
      file.map(_.getAbsoluteFile)
    }
  }
}