MultisampleMapping.scala 14.5 KB
Newer Older
Peter van 't Hof's avatar
Peter van 't Hof committed
1 2 3 4 5 6 7 8 9 10
/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
11
 * A dual licensing mode is applied. The source code within this project is freely available for non-commercial use under an AGPL
Peter van 't Hof's avatar
Peter van 't Hof committed
12 13 14
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
15 16 17 18 19
package nl.lumc.sasc.biopet.pipelines.mapping

import java.io.File

import htsjdk.samtools.SamReaderFactory
20
import htsjdk.samtools.reference.FastaSequenceFile
Peter van 't Hof's avatar
Peter van 't Hof committed
21
import nl.lumc.sasc.biopet.core.report.ReportBuilderExtension
Peter van 't Hof's avatar
Peter van 't Hof committed
22
import nl.lumc.sasc.biopet.core.{ PipelineCommand, Reference, MultiSampleQScript }
23
import nl.lumc.sasc.biopet.extensions.Ln
24
import nl.lumc.sasc.biopet.extensions.picard._
25
import nl.lumc.sasc.biopet.pipelines.bammetrics.BamMetrics
26
import nl.lumc.sasc.biopet.pipelines.bamtobigwig.Bam2Wig
27
import nl.lumc.sasc.biopet.pipelines.gears.GearsSingle
28
import nl.lumc.sasc.biopet.utils.Logging
Peter van 't Hof's avatar
Peter van 't Hof committed
29
import nl.lumc.sasc.biopet.utils.config.Configurable
30 31
import org.broadinstitute.gatk.queue.QScript

Peter van 't Hof's avatar
Peter van 't Hof committed
32 33
import MultisampleMapping.MergeStrategy

34 35 36
import scala.collection.JavaConversions._

/**
Peter van 't Hof's avatar
Peter van 't Hof committed
37
 * Created by pjvanthof on 18/12/15.
Peter van 't Hof's avatar
Peter van 't Hof committed
38 39
 *
 * This trait is meant to extend pipelines from that require a alignment step
Peter van 't Hof's avatar
Peter van 't Hof committed
40
 */
Peter van 't Hof's avatar
Peter van 't Hof committed
41 42
trait MultisampleMappingTrait extends MultiSampleQScript
  with Reference { qscript: QScript =>
43

44
  /** With this method the merge strategy for libraries to samples is defined. This can be overriden to fix the merge strategy. */
Peter van 't Hof's avatar
Peter van 't Hof committed
45
  def mergeStrategy: MergeStrategy.Value = {
Peter van 't Hof's avatar
Peter van 't Hof committed
46
    val value: String = config("merge_strategy", default = "preprocessmarkduplicates")
Peter van 't Hof's avatar
Peter van 't Hof committed
47
    MergeStrategy.values.find(_.toString.toLowerCase == value.toLowerCase) match {
Peter van 't Hof's avatar
Peter van 't Hof committed
48
      case Some(v) => v
Peter van 't Hof's avatar
Peter van 't Hof committed
49
      case _       => throw new IllegalArgumentException(s"merge_strategy '$value' does not exist")
Peter van 't Hof's avatar
Peter van 't Hof committed
50 51 52
    }
  }

53
  def init(): Unit = {
54 55
  }

56
  /** Is there are jobs that needs to be added before the rest of the jobs this methods can be overriden, to let the sample jobs this work the super call should be done also */
57
  def biopetScript(): Unit = {
Peter van 't Hof's avatar
Peter van 't Hof committed
58
    addSamplesJobs()
Peter van 't Hof's avatar
Peter van 't Hof committed
59
    addSummaryJobs()
60 61
  }

62
  /** This is de default multisample mapping report, this can be extended by other pipelines */
Peter van 't Hof's avatar
Peter van 't Hof committed
63
  override def reportClass: Option[ReportBuilderExtension] = {
64 65 66 67 68 69
    val report = new MultisampleMappingReport(this)
    report.outputDir = new File(outputDir, "report")
    report.summaryFile = summaryFile
    Some(report)
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
70 71 72 73
  override def defaults: Map[String, Any] = super.defaults ++ Map(
    "reordersam" -> Map("allow_incomplete_dict_concordance" -> true),
    "gears" -> Map("skip_flexiprep" -> true)
  )
74

Peter van 't Hof's avatar
Peter van 't Hof committed
75
  override def fixedValues: Map[String, Any] = super.fixedValues ++ Map("gearssingle" -> Map("skip_flexiprep" -> true))
76

77
  /** In a default multisample mapping run there are no multsample jobs. This method can be overriden by other pipelines */
78
  def addMultiSampleJobs(): Unit = {
79 80 81
    // this code will be executed after all code of all samples is executed
  }

82
  /** By default only the reference is put in the summary, when extending pippeline specific files can be added */
83
  def summaryFiles: Map[String, File] = Map("referenceFasta" -> referenceFasta())
84

85
  /** By default only the reference is put in the summary, when extending pippeline specific settings can be added */
86 87 88
  def summarySettings: Map[String, Any] = Map(
    "reference" -> referenceSummary,
    "merge_strategy" -> mergeStrategy.toString)
89 90

  def makeSample(id: String) = new Sample(id)
Peter van 't Hof's avatar
Peter van 't Hof committed
91
  class Sample(sampleId: String) extends AbstractSample(sampleId) { sample =>
92 93

    def makeLibrary(id: String) = new Library(id)
Peter van 't Hof's avatar
Peter van 't Hof committed
94
    class Library(libId: String) extends AbstractLibrary(libId) { lib =>
95 96

      /** By default the bams files are put in the summary, more files can be added here */
Peter van 't Hof's avatar
Peter van 't Hof committed
97
      def summaryFiles: Map[String, File] = (inputR1.map("input_R1" -> _) :: inputR2.map("input_R2" -> _) ::
Peter van 't Hof's avatar
Peter van 't Hof committed
98
        inputBam.map("input_bam" -> _) :: bamFile.map("output_bam" -> _) ::
bow's avatar
bow committed
99
        preProcessBam.map("output_bam_preprocess" -> _) :: Nil).flatten.toMap
100 101 102

      def summaryStats: Map[String, Any] = Map()

Peter van 't Hof's avatar
Peter van 't Hof committed
103 104
      lazy val inputR1: Option[File] = MultisampleMapping.fileMustBeAbsolute(config("R1"))
      lazy val inputR2: Option[File] = MultisampleMapping.fileMustBeAbsolute(config("R2"))
Peter van 't Hof's avatar
Peter van 't Hof committed
105 106
      lazy val qcFastqR1 = mapping.map(_.flexiprep.fastqR1Qc)
      lazy val qcFastqR2 = mapping.flatMap(_.flexiprep.fastqR2Qc)
Peter van 't Hof's avatar
Peter van 't Hof committed
107
      lazy val inputBam: Option[File] = MultisampleMapping.fileMustBeAbsolute(if (inputR1.isEmpty) config("bam") else None)
108 109 110
      lazy val bamToFastq: Boolean = config("bam_to_fastq", default = false)
      lazy val correctReadgroups: Boolean = config("correct_readgroups", default = false)

Peter van 't Hof's avatar
Peter van 't Hof committed
111
      def keepFinalBamfile: Boolean = samples(sampleId).libraries.size == 1
Peter van 't Hof's avatar
Peter van 't Hof committed
112

Peter van 't Hof's avatar
Peter van 't Hof committed
113 114
      lazy val mapping: Option[Mapping] = if (inputR1.isDefined || (inputBam.isDefined && bamToFastq)) {
        val m: Mapping = new Mapping(qscript) {
115 116
          override def configNamespace = "mapping"
          override def defaults: Map[String, Any] = super.defaults ++
Peter van 't Hof's avatar
Peter van 't Hof committed
117
            Map("keep_final_bamfile" -> keepFinalBamfile)
118
        }
119 120
        m.sampleId = Some(sampleId)
        m.libId = Some(libId)
Peter van 't Hof's avatar
Peter van 't Hof committed
121
        m.outputDir = libDir
122 123 124
        Some(m)
      } else None

Peter van 't Hof's avatar
Peter van 't Hof committed
125
      def bamFile: Option[File] = mapping match {
Peter van 't Hof's avatar
Peter van 't Hof committed
126
        case Some(m)                 => Some(m.finalBamFile)
127
        case _ if inputBam.isDefined => Some(new File(libDir, s"$sampleId-$libId.bam"))
Peter van 't Hof's avatar
Peter van 't Hof committed
128
        case _                       => None
129 130
      }

131
      /** By default the preProcessBam is the same as the normal bamFile. A pipeline can extend this is there are preprocess steps */
Peter van 't Hof's avatar
Peter van 't Hof committed
132
      def preProcessBam: Option[File] = bamFile
Peter van 't Hof's avatar
Peter van 't Hof committed
133

134
      /** This method can be extended to add jobs to the pipeline, to do this the super call of this function must be called by the pipelines */
135
      def addJobs(): Unit = {
Peter van 't Hof's avatar
Peter van 't Hof committed
136 137 138 139
        inputR1.foreach(inputFiles :+= new InputFile(_, config("R1_md5")))
        inputR2.foreach(inputFiles :+= new InputFile(_, config("R2_md5")))
        inputBam.foreach(inputFiles :+= new InputFile(_, config("bam_md5")))

140 141
        if (inputR1.isDefined) {
          mapping.foreach { m =>
142 143
            m.inputR1 = inputR1.get
            m.inputR2 = inputR2
144 145 146 147 148 149 150
            add(m)
          }
        } else if (inputBam.isDefined) {
          if (bamToFastq) {
            val samToFastq = SamToFastq(qscript, inputBam.get,
              new File(libDir, sampleId + "-" + libId + ".R1.fq.gz"),
              new File(libDir, sampleId + "-" + libId + ".R2.fq.gz"))
Peter van 't Hof's avatar
Peter van 't Hof committed
151
            samToFastq.isIntermediate = libraries.size > 1
152 153
            qscript.add(samToFastq)
            mapping.foreach(m => {
154 155
              m.inputR1 = samToFastq.fastqR1
              m.inputR2 = Some(samToFastq.fastqR2)
156 157 158 159
              add(m)
            })
          } else {
            val inputSam = SamReaderFactory.makeDefault.open(inputBam.get)
160 161 162
            val header = inputSam.getFileHeader
            val readGroups = header.getReadGroups
            val referenceFile = new FastaSequenceFile(referenceFasta(), true)
Peter van 't Hof's avatar
Peter van 't Hof committed
163 164 165
            val dictOke: Boolean = {
              var oke = true
              try {
Peter van 't Hof's avatar
Peter van 't Hof committed
166
                header.getSequenceDictionary.assertSameDictionary(referenceDict)
Peter van 't Hof's avatar
Peter van 't Hof committed
167
              } catch {
Peter van 't Hof's avatar
Peter van 't Hof committed
168
                case e: AssertionError =>
Peter van 't Hof's avatar
Peter van 't Hof committed
169
                  logger.error(e.getMessage)
Peter van 't Hof's avatar
Peter van 't Hof committed
170 171 172 173
                  oke = false
              }
              oke
            }
174 175
            inputSam.close()
            referenceFile.close()
176 177 178 179 180

            val readGroupOke = readGroups.forall(readGroup => {
              if (readGroup.getSample != sampleId) logger.warn("Sample ID readgroup in bam file is not the same")
              if (readGroup.getLibrary != libId) logger.warn("Library ID readgroup in bam file is not the same")
              readGroup.getSample == sampleId && readGroup.getLibrary == libId
181
            }) && readGroups.nonEmpty
182

183 184 185
            if (!readGroupOke || !dictOke) {
              if (!readGroupOke && !correctReadgroups) Logging.addError(
                "Sample readgroup and/or library of input bamfile is not correct, file: " + bamFile +
Peter van 't Hof's avatar
Peter van 't Hof committed
186
                  "\nPlease note that it is possible to set 'correct_readgroups' to true in the config to automatic fix this")
Peter van 't Hof's avatar
Peter van 't Hof committed
187
              if (!dictOke) Logging.addError(
Peter van 't Hof's avatar
Peter van 't Hof committed
188
                "Sequence dictionary in the bam file is not the same as the reference, file: " + bamFile)
189

Peter van 't Hof's avatar
Peter van 't Hof committed
190
              if (!readGroupOke && correctReadgroups) {
191
                logger.info("Correcting readgroups, file:" + inputBam.get)
Peter van 't Hof's avatar
Peter van 't Hof committed
192 193 194 195 196 197
                val aorrg = AddOrReplaceReadGroups(qscript, inputBam.get, bamFile.get)
                aorrg.RGID = config("rgid", default = s"$sampleId-$libId")
                aorrg.RGLB = libId
                aorrg.RGSM = sampleId
                aorrg.RGPL = config("rgpl", default = "unknown")
                aorrg.RGPU = config("rgpu", default = "na")
Peter van 't Hof's avatar
Peter van 't Hof committed
198
                aorrg.isIntermediate = libraries.size > 1
199
                qscript.add(aorrg)
200
              }
Peter van 't Hof's avatar
Peter van 't Hof committed
201
            } else add(Ln.linkBamFile(qscript, inputBam.get, bamFile.get): _*)
202 203 204 205 206 207

            val bamMetrics = new BamMetrics(qscript)
            bamMetrics.sampleId = Some(sampleId)
            bamMetrics.libId = Some(libId)
            bamMetrics.inputBam = bamFile.get
            bamMetrics.outputDir = new File(libDir, "metrics")
Peter van 't Hof's avatar
Peter van 't Hof committed
208
            add(bamMetrics)
209 210

            if (config("execute_bam2wig", default = true)) add(Bam2Wig(qscript, bamFile.get))
211 212 213 214 215
          }
        } else logger.warn(s"Sample '$sampleId' does not have any input files")
      }
    }

216
    /** By default the bams files are put in the summary, more files can be added here */
Peter van 't Hof's avatar
Peter van 't Hof committed
217
    def summaryFiles: Map[String, File] = (bamFile.map("output_bam" -> _) ::
bow's avatar
bow committed
218
      preProcessBam.map("output_bam_preprocess" -> _) :: Nil).flatten.toMap
219 220 221

    def summaryStats: Map[String, Any] = Map()

222
    /** This is the merged bam file, None if the merged bam file is NA */
Peter van 't Hof's avatar
Peter van 't Hof committed
223
    def bamFile: Option[File] = if (libraries.flatMap(_._2.bamFile).nonEmpty &&
Peter van 't Hof's avatar
Peter van 't Hof committed
224 225 226 227
      mergeStrategy != MultisampleMapping.MergeStrategy.None)
      Some(new File(sampleDir, s"$sampleId.bam"))
    else None

228
    /** By default the preProcessBam is the same as the normal bamFile. A pipeline can extend this is there are preprocess steps */
Peter van 't Hof's avatar
Peter van 't Hof committed
229
    def preProcessBam: Option[File] = bamFile
Peter van 't Hof's avatar
Peter van 't Hof committed
230

231
    /** Default is set to keep the merged files, user can set this in the config. To change the default this method can be overriden */
Peter van 't Hof's avatar
Peter van 't Hof committed
232 233
    def keepMergedFiles: Boolean = config("keep_merged_files", default = true)

Peter van 't Hof's avatar
Peter van 't Hof committed
234
    /**
Peter van 't Hof's avatar
Peter van 't Hof committed
235 236
     * @deprecated
     */
Peter van 't Hof's avatar
Peter van 't Hof committed
237 238 239 240 241
    lazy val unmappedToGears: Boolean = config("unmapped_to_gears", default = false)
    if (config.contains("unmapped_to_gears")) logger.warn("Config value 'unmapped_to_gears' is replaced with 'mapping_to_gears', Assumes default: mapping_to_gears=unmapped")

    lazy val mappingToGears: String = config("mapping_to_gears", default = if (unmappedToGears) "unmapped" else "none")

242
    /** This method can be extended to add jobs to the pipeline, to do this the super call of this function must be called by the pipelines */
Peter van 't Hof's avatar
Peter van 't Hof committed
243
    def addJobs(): Unit = {
244
      addPerLibJobs() // This add jobs for each library
Peter van 't Hof's avatar
Peter van 't Hof committed
245 246

      mergeStrategy match {
Peter van 't Hof's avatar
Peter van 't Hof committed
247 248
        case MergeStrategy.None =>
        case (MergeStrategy.MergeSam | MergeStrategy.MarkDuplicates) if libraries.flatMap(_._2.bamFile).size == 1 =>
Peter van 't Hof's avatar
Peter van 't Hof committed
249
          add(Ln.linkBamFile(qscript, libraries.flatMap(_._2.bamFile).head, bamFile.get): _*)
Peter van 't Hof's avatar
Peter van 't Hof committed
250
        case (MergeStrategy.PreProcessMergeSam | MergeStrategy.PreProcessMarkDuplicates) if libraries.flatMap(_._2.preProcessBam).size == 1 =>
Peter van 't Hof's avatar
Peter van 't Hof committed
251
          add(Ln.linkBamFile(qscript, libraries.flatMap(_._2.preProcessBam).head, bamFile.get): _*)
Peter van 't Hof's avatar
Peter van 't Hof committed
252
        case MergeStrategy.MergeSam =>
Peter van 't Hof's avatar
Peter van 't Hof committed
253
          add(MergeSamFiles(qscript, libraries.flatMap(_._2.bamFile).toList, bamFile.get, isIntermediate = !keepMergedFiles))
Peter van 't Hof's avatar
Peter van 't Hof committed
254
        case MergeStrategy.PreProcessMergeSam =>
Peter van 't Hof's avatar
Peter van 't Hof committed
255
          add(MergeSamFiles(qscript, libraries.flatMap(_._2.preProcessBam).toList, bamFile.get, isIntermediate = !keepMergedFiles))
Peter van 't Hof's avatar
Peter van 't Hof committed
256
        case MergeStrategy.MarkDuplicates =>
Peter van 't Hof's avatar
Peter van 't Hof committed
257
          add(MarkDuplicates(qscript, libraries.flatMap(_._2.bamFile).toList, bamFile.get, isIntermediate = !keepMergedFiles))
Peter van 't Hof's avatar
Peter van 't Hof committed
258
        case MergeStrategy.PreProcessMarkDuplicates =>
Peter van 't Hof's avatar
Peter van 't Hof committed
259
          add(MarkDuplicates(qscript, libraries.flatMap(_._2.preProcessBam).toList, bamFile.get, isIntermediate = !keepMergedFiles))
Peter van 't Hof's avatar
Peter van 't Hof committed
260 261 262
        case _ => throw new IllegalStateException("This should not be possible, unimplemented MergeStrategy?")
      }

263
      if (mergeStrategy != MergeStrategy.None && libraries.flatMap(_._2.bamFile).nonEmpty) {
Peter van 't Hof's avatar
Peter van 't Hof committed
264 265 266 267 268
        val bamMetrics = new BamMetrics(qscript)
        bamMetrics.sampleId = Some(sampleId)
        bamMetrics.inputBam = preProcessBam.get
        bamMetrics.outputDir = new File(sampleDir, "metrics")
        add(bamMetrics)
269 270

        if (config("execute_bam2wig", default = true)) add(Bam2Wig(qscript, preProcessBam.get))
Peter van 't Hof's avatar
Peter van 't Hof committed
271
      }
272

Peter van 't Hof's avatar
Peter van 't Hof committed
273 274 275 276 277 278 279 280 281
      mappingToGears match {
        case "unmapped" =>
          val gears = new GearsSingle(qscript)
          gears.bamFile = preProcessBam
          gears.sampleId = Some(sampleId)
          gears.outputDir = new File(sampleDir, "gears")
          add(gears)
        case "all" =>
          val gears = new GearsSingle(qscript)
Peter van 't Hof's avatar
Peter van 't Hof committed
282 283
          gears.fastqR1 = libraries.flatMap(_._2.qcFastqR1).toList
          gears.fastqR2 = libraries.flatMap(_._2.qcFastqR2).toList
Peter van 't Hof's avatar
Peter van 't Hof committed
284 285 286 287
          gears.sampleId = Some(sampleId)
          gears.outputDir = new File(sampleDir, "gears")
          add(gears)
        case "none" =>
Peter van 't Hof's avatar
Peter van 't Hof committed
288
        case x      => Logging.addError(s"$x is not a valid value for 'mapping_to_gears'")
289
      }
290 291 292
    }
  }
}
Peter van 't Hof's avatar
Peter van 't Hof committed
293

294
/** This class is the default implementation that can be used on the command line */
Peter van 't Hof's avatar
Peter van 't Hof committed
295 296
class MultisampleMapping(val root: Configurable) extends QScript with MultisampleMappingTrait {
  def this() = this(null)
Peter van 't Hof's avatar
Peter van 't Hof committed
297 298

  def summaryFile: File = new File(outputDir, "MultisamplePipeline.summary.json")
Peter van 't Hof's avatar
Peter van 't Hof committed
299
}
300 301 302 303

object MultisampleMapping extends PipelineCommand {

  object MergeStrategy extends Enumeration {
Peter van 't Hof's avatar
Peter van 't Hof committed
304
    val None, MergeSam, MarkDuplicates, PreProcessMergeSam, PreProcessMarkDuplicates = Value
305 306
  }

307
  /** When file is not absolute an error is raise att the end of the script of a pipeline */
Peter van 't Hof's avatar
Peter van 't Hof committed
308
  def fileMustBeAbsolute(file: Option[File]): Option[File] = {
309 310 311 312 313 314 315 316
    if (file.forall(_.isAbsolute)) file
    else {
      Logging.addError(s"$file should be a absolute file path")
      file.map(_.getAbsoluteFile)
    }
  }

}