MultisampleMapping.scala 14.5 KB
Newer Older
Peter van 't Hof's avatar
Peter van 't Hof committed
1
2
3
4
5
6
7
8
9
10
/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
11
 * A dual licensing mode is applied. The source code within this project is freely available for non-commercial use under an AGPL
Peter van 't Hof's avatar
Peter van 't Hof committed
12
13
14
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
15
16
17
18
19
package nl.lumc.sasc.biopet.pipelines.mapping

import java.io.File

import htsjdk.samtools.SamReaderFactory
20
import htsjdk.samtools.reference.FastaSequenceFile
Peter van 't Hof's avatar
Peter van 't Hof committed
21
import nl.lumc.sasc.biopet.core.report.ReportBuilderExtension
Peter van 't Hof's avatar
Peter van 't Hof committed
22
import nl.lumc.sasc.biopet.core.{ PipelineCommand, Reference, MultiSampleQScript }
23
import nl.lumc.sasc.biopet.extensions.Ln
24
import nl.lumc.sasc.biopet.extensions.picard._
25
import nl.lumc.sasc.biopet.pipelines.bammetrics.BamMetrics
26
import nl.lumc.sasc.biopet.pipelines.bamtobigwig.Bam2Wig
27
import nl.lumc.sasc.biopet.pipelines.gears.GearsSingle
28
import nl.lumc.sasc.biopet.utils.Logging
Peter van 't Hof's avatar
Peter van 't Hof committed
29
import nl.lumc.sasc.biopet.utils.config.Configurable
30
31
import org.broadinstitute.gatk.queue.QScript

Peter van 't Hof's avatar
Peter van 't Hof committed
32
33
import MultisampleMapping.MergeStrategy

34
35
36
import scala.collection.JavaConversions._

/**
Peter van 't Hof's avatar
Peter van 't Hof committed
37
 * Created by pjvanthof on 18/12/15.
Peter van 't Hof's avatar
Peter van 't Hof committed
38
39
 *
 * This trait is meant to extend pipelines from that require a alignment step
Peter van 't Hof's avatar
Peter van 't Hof committed
40
 */
Peter van 't Hof's avatar
Peter van 't Hof committed
41
42
trait MultisampleMappingTrait extends MultiSampleQScript
  with Reference { qscript: QScript =>
43

44
  /** With this method the merge strategy for libraries to samples is defined. This can be overriden to fix the merge strategy. */
Peter van 't Hof's avatar
Peter van 't Hof committed
45
  def mergeStrategy: MergeStrategy.Value = {
Peter van 't Hof's avatar
Peter van 't Hof committed
46
    val value: String = config("merge_strategy", default = "preprocessmarkduplicates")
Peter van 't Hof's avatar
Peter van 't Hof committed
47
    MergeStrategy.values.find(_.toString.toLowerCase == value.toLowerCase) match {
Peter van 't Hof's avatar
Peter van 't Hof committed
48
      case Some(v) => v
Peter van 't Hof's avatar
Peter van 't Hof committed
49
      case _       => throw new IllegalArgumentException(s"merge_strategy '$value' does not exist")
Peter van 't Hof's avatar
Peter van 't Hof committed
50
51
52
    }
  }

53
  def init(): Unit = {
54
55
  }

56
  /** Is there are jobs that needs to be added before the rest of the jobs this methods can be overriden, to let the sample jobs this work the super call should be done also */
57
  def biopetScript(): Unit = {
Peter van 't Hof's avatar
Peter van 't Hof committed
58
    addSamplesJobs()
Peter van 't Hof's avatar
Peter van 't Hof committed
59
    addSummaryJobs()
60
61
  }

62
  /** This is de default multisample mapping report, this can be extended by other pipelines */
Peter van 't Hof's avatar
Peter van 't Hof committed
63
  override def reportClass: Option[ReportBuilderExtension] = {
64
65
    val report = new MultisampleMappingReport(this)
    report.outputDir = new File(outputDir, "report")
66
    report.summaryDbFile = summaryDbFile
67
68
69
    Some(report)
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
70
71
72
73
  override def defaults: Map[String, Any] = super.defaults ++ Map(
    "reordersam" -> Map("allow_incomplete_dict_concordance" -> true),
    "gears" -> Map("skip_flexiprep" -> true)
  )
74

Peter van 't Hof's avatar
Peter van 't Hof committed
75
  override def fixedValues: Map[String, Any] = super.fixedValues ++ Map("gearssingle" -> Map("skip_flexiprep" -> true))
Peter van 't Hof's avatar
Peter van 't Hof committed
76

77
  /** In a default multisample mapping run there are no multsample jobs. This method can be overriden by other pipelines */
78
  def addMultiSampleJobs(): Unit = {
79
80
81
    // this code will be executed after all code of all samples is executed
  }

82
  /** By default only the reference is put in the summary, when extending pippeline specific files can be added */
83
  def summaryFiles: Map[String, File] = Map("referenceFasta" -> referenceFasta())
84

85
  /** By default only the reference is put in the summary, when extending pippeline specific settings can be added */
86
87
88
  def summarySettings: Map[String, Any] = Map(
    "reference" -> referenceSummary,
    "merge_strategy" -> mergeStrategy.toString)
89
90

  def makeSample(id: String) = new Sample(id)
Peter van 't Hof's avatar
Peter van 't Hof committed
91
  class Sample(sampleId: String) extends AbstractSample(sampleId) { sample =>
92

93
94
    def metricsPreprogressBam = true

95
    def makeLibrary(id: String) = new Library(id)
Peter van 't Hof's avatar
Peter van 't Hof committed
96
    class Library(libId: String) extends AbstractLibrary(libId) { lib =>
97
98

      /** By default the bams files are put in the summary, more files can be added here */
Peter van 't Hof's avatar
Peter van 't Hof committed
99
      def summaryFiles: Map[String, File] = (inputR1.map("input_R1" -> _) :: inputR2.map("input_R2" -> _) ::
Peter van 't Hof's avatar
Peter van 't Hof committed
100
        inputBam.map("input_bam" -> _) :: bamFile.map("output_bam" -> _) ::
bow's avatar
bow committed
101
        preProcessBam.map("output_bam_preprocess" -> _) :: Nil).flatten.toMap
102
103
104

      def summaryStats: Map[String, Any] = Map()

Peter van 't Hof's avatar
Peter van 't Hof committed
105
106
      lazy val inputR1: Option[File] = MultisampleMapping.fileMustBeAbsolute(config("R1"))
      lazy val inputR2: Option[File] = MultisampleMapping.fileMustBeAbsolute(config("R2"))
Peter van 't Hof's avatar
Peter van 't Hof committed
107
108
      lazy val qcFastqR1 = mapping.map(_.flexiprep.fastqR1Qc)
      lazy val qcFastqR2 = mapping.flatMap(_.flexiprep.fastqR2Qc)
Peter van 't Hof's avatar
Peter van 't Hof committed
109
      lazy val inputBam: Option[File] = MultisampleMapping.fileMustBeAbsolute(if (inputR1.isEmpty) config("bam") else None)
110
111
112
      lazy val bamToFastq: Boolean = config("bam_to_fastq", default = false)
      lazy val correctReadgroups: Boolean = config("correct_readgroups", default = false)

Peter van 't Hof's avatar
Peter van 't Hof committed
113
      def keepFinalBamfile: Boolean = samples(sampleId).libraries.size == 1
Peter van 't Hof's avatar
Peter van 't Hof committed
114

Peter van 't Hof's avatar
Peter van 't Hof committed
115
116
      lazy val mapping: Option[Mapping] = if (inputR1.isDefined || (inputBam.isDefined && bamToFastq)) {
        val m: Mapping = new Mapping(qscript) {
117
118
          override def configNamespace = "mapping"
          override def defaults: Map[String, Any] = super.defaults ++
Peter van 't Hof's avatar
Peter van 't Hof committed
119
            Map("keep_final_bamfile" -> keepFinalBamfile)
120
        }
121
122
        m.sampleId = Some(sampleId)
        m.libId = Some(libId)
Peter van 't Hof's avatar
Peter van 't Hof committed
123
        m.outputDir = libDir
124
125
126
        Some(m)
      } else None

Peter van 't Hof's avatar
Peter van 't Hof committed
127
      def bamFile: Option[File] = mapping match {
Peter van 't Hof's avatar
Peter van 't Hof committed
128
        case Some(m)                 => Some(m.finalBamFile)
129
        case _ if inputBam.isDefined => Some(new File(libDir, s"$sampleId-$libId.bam"))
Peter van 't Hof's avatar
Peter van 't Hof committed
130
        case _                       => None
131
132
      }

133
      /** By default the preProcessBam is the same as the normal bamFile. A pipeline can extend this is there are preprocess steps */
Peter van 't Hof's avatar
Peter van 't Hof committed
134
      def preProcessBam: Option[File] = bamFile
Peter van 't Hof's avatar
Peter van 't Hof committed
135

136
      /** This method can be extended to add jobs to the pipeline, to do this the super call of this function must be called by the pipelines */
137
      def addJobs(): Unit = {
Peter van 't Hof's avatar
Peter van 't Hof committed
138
139
140
141
        inputR1.foreach(inputFiles :+= new InputFile(_, config("R1_md5")))
        inputR2.foreach(inputFiles :+= new InputFile(_, config("R2_md5")))
        inputBam.foreach(inputFiles :+= new InputFile(_, config("bam_md5")))

142
143
        if (inputR1.isDefined) {
          mapping.foreach { m =>
Sander van der Zeeuw's avatar
Sander van der Zeeuw committed
144
145
            m.inputR1 = inputR1.get
            m.inputR2 = inputR2
146
147
148
149
150
151
152
            add(m)
          }
        } else if (inputBam.isDefined) {
          if (bamToFastq) {
            val samToFastq = SamToFastq(qscript, inputBam.get,
              new File(libDir, sampleId + "-" + libId + ".R1.fq.gz"),
              new File(libDir, sampleId + "-" + libId + ".R2.fq.gz"))
Peter van 't Hof's avatar
Peter van 't Hof committed
153
            samToFastq.isIntermediate = libraries.size > 1
154
155
            qscript.add(samToFastq)
            mapping.foreach(m => {
Sander van der Zeeuw's avatar
Sander van der Zeeuw committed
156
157
              m.inputR1 = samToFastq.fastqR1
              m.inputR2 = Some(samToFastq.fastqR2)
158
159
160
161
              add(m)
            })
          } else {
            val inputSam = SamReaderFactory.makeDefault.open(inputBam.get)
162
163
164
            val header = inputSam.getFileHeader
            val readGroups = header.getReadGroups
            val referenceFile = new FastaSequenceFile(referenceFasta(), true)
Peter van 't Hof's avatar
Peter van 't Hof committed
165
166
167
            val dictOke: Boolean = {
              var oke = true
              try {
Peter van 't Hof's avatar
Peter van 't Hof committed
168
                header.getSequenceDictionary.assertSameDictionary(referenceDict)
Peter van 't Hof's avatar
Peter van 't Hof committed
169
              } catch {
Peter van 't Hof's avatar
Peter van 't Hof committed
170
                case e: AssertionError =>
Peter van 't Hof's avatar
Peter van 't Hof committed
171
                  logger.error(e.getMessage)
Peter van 't Hof's avatar
Peter van 't Hof committed
172
173
174
175
                  oke = false
              }
              oke
            }
176
177
            inputSam.close()
            referenceFile.close()
178
179
180
181
182

            val readGroupOke = readGroups.forall(readGroup => {
              if (readGroup.getSample != sampleId) logger.warn("Sample ID readgroup in bam file is not the same")
              if (readGroup.getLibrary != libId) logger.warn("Library ID readgroup in bam file is not the same")
              readGroup.getSample == sampleId && readGroup.getLibrary == libId
183
            }) && readGroups.nonEmpty
184

185
186
187
            if (!readGroupOke || !dictOke) {
              if (!readGroupOke && !correctReadgroups) Logging.addError(
                "Sample readgroup and/or library of input bamfile is not correct, file: " + bamFile +
Peter van 't Hof's avatar
Peter van 't Hof committed
188
                  "\nPlease note that it is possible to set 'correct_readgroups' to true in the config to automatic fix this")
Peter van 't Hof's avatar
Peter van 't Hof committed
189
              if (!dictOke) Logging.addError(
Peter van 't Hof's avatar
Peter van 't Hof committed
190
                "Sequence dictionary in the bam file is not the same as the reference, file: " + bamFile)
191

Peter van 't Hof's avatar
Peter van 't Hof committed
192
              if (!readGroupOke && correctReadgroups) {
193
                logger.info("Correcting readgroups, file:" + inputBam.get)
Peter van 't Hof's avatar
Peter van 't Hof committed
194
195
196
197
198
199
                val aorrg = AddOrReplaceReadGroups(qscript, inputBam.get, bamFile.get)
                aorrg.RGID = config("rgid", default = s"$sampleId-$libId")
                aorrg.RGLB = libId
                aorrg.RGSM = sampleId
                aorrg.RGPL = config("rgpl", default = "unknown")
                aorrg.RGPU = config("rgpu", default = "na")
Peter van 't Hof's avatar
Peter van 't Hof committed
200
                aorrg.isIntermediate = libraries.size > 1
201
                qscript.add(aorrg)
202
              }
Peter van 't Hof's avatar
Peter van 't Hof committed
203
            } else add(Ln.linkBamFile(qscript, inputBam.get, bamFile.get): _*)
204
205
206
207
208
209

            val bamMetrics = new BamMetrics(qscript)
            bamMetrics.sampleId = Some(sampleId)
            bamMetrics.libId = Some(libId)
            bamMetrics.inputBam = bamFile.get
            bamMetrics.outputDir = new File(libDir, "metrics")
Peter van 't Hof's avatar
Peter van 't Hof committed
210
            add(bamMetrics)
211
212

            if (config("execute_bam2wig", default = true)) add(Bam2Wig(qscript, bamFile.get))
213
214
215
216
217
          }
        } else logger.warn(s"Sample '$sampleId' does not have any input files")
      }
    }

218
    /** By default the bams files are put in the summary, more files can be added here */
Peter van 't Hof's avatar
Peter van 't Hof committed
219
    def summaryFiles: Map[String, File] = (bamFile.map("output_bam" -> _) ::
bow's avatar
bow committed
220
      preProcessBam.map("output_bam_preprocess" -> _) :: Nil).flatten.toMap
221
222
223

    def summaryStats: Map[String, Any] = Map()

224
    /** This is the merged bam file, None if the merged bam file is NA */
Peter van 't Hof's avatar
Peter van 't Hof committed
225
    def bamFile: Option[File] = if (libraries.flatMap(_._2.bamFile).nonEmpty &&
Peter van 't Hof's avatar
Peter van 't Hof committed
226
227
228
229
      mergeStrategy != MultisampleMapping.MergeStrategy.None)
      Some(new File(sampleDir, s"$sampleId.bam"))
    else None

230
    /** By default the preProcessBam is the same as the normal bamFile. A pipeline can extend this is there are preprocess steps */
Peter van 't Hof's avatar
Peter van 't Hof committed
231
    def preProcessBam: Option[File] = bamFile
Peter van 't Hof's avatar
Peter van 't Hof committed
232

233
    /** Default is set to keep the merged files, user can set this in the config. To change the default this method can be overriden */
Peter van 't Hof's avatar
Peter van 't Hof committed
234
235
    def keepMergedFiles: Boolean = config("keep_merged_files", default = true)

Peter van 't Hof's avatar
Peter van 't Hof committed
236
    /**
Peter van 't Hof's avatar
Peter van 't Hof committed
237
238
     * @deprecated
     */
Peter van 't Hof's avatar
Peter van 't Hof committed
239
240
241
242
243
    lazy val unmappedToGears: Boolean = config("unmapped_to_gears", default = false)
    if (config.contains("unmapped_to_gears")) logger.warn("Config value 'unmapped_to_gears' is replaced with 'mapping_to_gears', Assumes default: mapping_to_gears=unmapped")

    lazy val mappingToGears: String = config("mapping_to_gears", default = if (unmappedToGears) "unmapped" else "none")

244
    /** This method can be extended to add jobs to the pipeline, to do this the super call of this function must be called by the pipelines */
Peter van 't Hof's avatar
Peter van 't Hof committed
245
    def addJobs(): Unit = {
246
      addPerLibJobs() // This add jobs for each library
Peter van 't Hof's avatar
Peter van 't Hof committed
247
248

      mergeStrategy match {
Peter van 't Hof's avatar
Peter van 't Hof committed
249
250
        case MergeStrategy.None =>
        case (MergeStrategy.MergeSam | MergeStrategy.MarkDuplicates) if libraries.flatMap(_._2.bamFile).size == 1 =>
Peter van 't Hof's avatar
Peter van 't Hof committed
251
          add(Ln.linkBamFile(qscript, libraries.flatMap(_._2.bamFile).head, bamFile.get): _*)
Peter van 't Hof's avatar
Peter van 't Hof committed
252
        case (MergeStrategy.PreProcessMergeSam | MergeStrategy.PreProcessMarkDuplicates) if libraries.flatMap(_._2.preProcessBam).size == 1 =>
Peter van 't Hof's avatar
Peter van 't Hof committed
253
          add(Ln.linkBamFile(qscript, libraries.flatMap(_._2.preProcessBam).head, bamFile.get): _*)
Peter van 't Hof's avatar
Peter van 't Hof committed
254
        case MergeStrategy.MergeSam =>
Peter van 't Hof's avatar
Peter van 't Hof committed
255
          add(MergeSamFiles(qscript, libraries.flatMap(_._2.bamFile).toList, bamFile.get, isIntermediate = !keepMergedFiles))
Peter van 't Hof's avatar
Peter van 't Hof committed
256
        case MergeStrategy.PreProcessMergeSam =>
Peter van 't Hof's avatar
Peter van 't Hof committed
257
          add(MergeSamFiles(qscript, libraries.flatMap(_._2.preProcessBam).toList, bamFile.get, isIntermediate = !keepMergedFiles))
Peter van 't Hof's avatar
Peter van 't Hof committed
258
        case MergeStrategy.MarkDuplicates =>
Peter van 't Hof's avatar
Peter van 't Hof committed
259
          add(MarkDuplicates(qscript, libraries.flatMap(_._2.bamFile).toList, bamFile.get, isIntermediate = !keepMergedFiles))
Peter van 't Hof's avatar
Peter van 't Hof committed
260
        case MergeStrategy.PreProcessMarkDuplicates =>
Peter van 't Hof's avatar
Peter van 't Hof committed
261
          add(MarkDuplicates(qscript, libraries.flatMap(_._2.preProcessBam).toList, bamFile.get, isIntermediate = !keepMergedFiles))
Peter van 't Hof's avatar
Peter van 't Hof committed
262
263
264
        case _ => throw new IllegalStateException("This should not be possible, unimplemented MergeStrategy?")
      }

265
      if (mergeStrategy != MergeStrategy.None && libraries.flatMap(_._2.bamFile).nonEmpty) {
Peter van 't Hof's avatar
Peter van 't Hof committed
266
267
        val bamMetrics = new BamMetrics(qscript)
        bamMetrics.sampleId = Some(sampleId)
268
        bamMetrics.inputBam = if (metricsPreprogressBam) preProcessBam.get else bamFile.get
Peter van 't Hof's avatar
Peter van 't Hof committed
269
270
        bamMetrics.outputDir = new File(sampleDir, "metrics")
        add(bamMetrics)
271
272

        if (config("execute_bam2wig", default = true)) add(Bam2Wig(qscript, preProcessBam.get))
Peter van 't Hof's avatar
Peter van 't Hof committed
273
      }
274

Peter van 't Hof's avatar
Peter van 't Hof committed
275
276
277
278
279
280
281
282
283
      mappingToGears match {
        case "unmapped" =>
          val gears = new GearsSingle(qscript)
          gears.bamFile = preProcessBam
          gears.sampleId = Some(sampleId)
          gears.outputDir = new File(sampleDir, "gears")
          add(gears)
        case "all" =>
          val gears = new GearsSingle(qscript)
Peter van 't Hof's avatar
Peter van 't Hof committed
284
285
          gears.fastqR1 = libraries.flatMap(_._2.qcFastqR1).toList
          gears.fastqR2 = libraries.flatMap(_._2.qcFastqR2).toList
Peter van 't Hof's avatar
Peter van 't Hof committed
286
287
288
289
          gears.sampleId = Some(sampleId)
          gears.outputDir = new File(sampleDir, "gears")
          add(gears)
        case "none" =>
Peter van 't Hof's avatar
Peter van 't Hof committed
290
        case x      => Logging.addError(s"$x is not a valid value for 'mapping_to_gears'")
291
      }
292
293
294
    }
  }
}
Peter van 't Hof's avatar
Peter van 't Hof committed
295

296
/** This class is the default implementation that can be used on the command line */
Peter van 't Hof's avatar
Peter van 't Hof committed
297
class MultisampleMapping(val parent: Configurable) extends QScript with MultisampleMappingTrait {
Peter van 't Hof's avatar
Peter van 't Hof committed
298
  def this() = this(null)
Peter van 't Hof's avatar
Peter van 't Hof committed
299
300

  def summaryFile: File = new File(outputDir, "MultisamplePipeline.summary.json")
Peter van 't Hof's avatar
Peter van 't Hof committed
301
}
302
303
304
305

object MultisampleMapping extends PipelineCommand {

  object MergeStrategy extends Enumeration {
Peter van 't Hof's avatar
Peter van 't Hof committed
306
    val None, MergeSam, MarkDuplicates, PreProcessMergeSam, PreProcessMarkDuplicates = Value
307
308
  }

309
  /** When file is not absolute an error is raise att the end of the script of a pipeline */
Peter van 't Hof's avatar
Peter van 't Hof committed
310
  def fileMustBeAbsolute(file: Option[File]): Option[File] = {
311
312
313
314
315
316
317
318
    if (file.forall(_.isAbsolute)) file
    else {
      Logging.addError(s"$file should be a absolute file path")
      file.map(_.getAbsoluteFile)
    }
  }

}