MultisampleMapping.scala 14.3 KB
Newer Older
Peter van 't Hof's avatar
Peter van 't Hof committed
1
2
3
4
5
6
7
8
9
10
/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
11
 * A dual licensing mode is applied. The source code within this project is freely available for non-commercial use under an AGPL
Peter van 't Hof's avatar
Peter van 't Hof committed
12
13
14
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
15
16
17
18
19
package nl.lumc.sasc.biopet.pipelines.mapping

import java.io.File

import htsjdk.samtools.SamReaderFactory
20
import htsjdk.samtools.reference.FastaSequenceFile
Peter van 't Hof's avatar
Peter van 't Hof committed
21
import nl.lumc.sasc.biopet.core.report.ReportBuilderExtension
Peter van 't Hof's avatar
Peter van 't Hof committed
22
import nl.lumc.sasc.biopet.core.{ PipelineCommand, Reference, MultiSampleQScript }
23
import nl.lumc.sasc.biopet.extensions.Ln
24
import nl.lumc.sasc.biopet.extensions.picard._
25
import nl.lumc.sasc.biopet.pipelines.bammetrics.BamMetrics
26
import nl.lumc.sasc.biopet.pipelines.bamtobigwig.Bam2Wig
27
import nl.lumc.sasc.biopet.pipelines.gears.GearsSingle
28
import nl.lumc.sasc.biopet.utils.Logging
Peter van 't Hof's avatar
Peter van 't Hof committed
29
import nl.lumc.sasc.biopet.utils.config.Configurable
30
31
import org.broadinstitute.gatk.queue.QScript

Peter van 't Hof's avatar
Peter van 't Hof committed
32
33
import MultisampleMapping.MergeStrategy

34
35
36
import scala.collection.JavaConversions._

/**
Peter van 't Hof's avatar
Peter van 't Hof committed
37
 * Created by pjvanthof on 18/12/15.
Peter van 't Hof's avatar
Peter van 't Hof committed
38
39
 *
 * This trait is meant to extend pipelines from that require a alignment step
Peter van 't Hof's avatar
Peter van 't Hof committed
40
 */
Peter van 't Hof's avatar
Peter van 't Hof committed
41
42
trait MultisampleMappingTrait extends MultiSampleQScript
  with Reference { qscript: QScript =>
43

44
  /** With this method the merge strategy for libraries to samples is defined. This can be overriden to fix the merge strategy. */
Peter van 't Hof's avatar
Peter van 't Hof committed
45
  def mergeStrategy: MergeStrategy.Value = {
Peter van 't Hof's avatar
Peter van 't Hof committed
46
    val value: String = config("merge_strategy", default = "preprocessmarkduplicates")
Peter van 't Hof's avatar
Peter van 't Hof committed
47
    MergeStrategy.values.find(_.toString.toLowerCase == value.toLowerCase) match {
Peter van 't Hof's avatar
Peter van 't Hof committed
48
      case Some(v) => v
Peter van 't Hof's avatar
Peter van 't Hof committed
49
      case _       => throw new IllegalArgumentException(s"merge_strategy '$value' does not exist")
Peter van 't Hof's avatar
Peter van 't Hof committed
50
51
52
    }
  }

53
  def init(): Unit = {
54
55
  }

56
  /** Is there are jobs that needs to be added before the rest of the jobs this methods can be overriden, to let the sample jobs this work the super call should be done also */
57
  def biopetScript(): Unit = {
Peter van 't Hof's avatar
Peter van 't Hof committed
58
    addSamplesJobs()
Peter van 't Hof's avatar
Peter van 't Hof committed
59
    addSummaryJobs()
60
61
  }

62
  /** This is de default multisample mapping report, this can be extended by other pipelines */
Peter van 't Hof's avatar
Peter van 't Hof committed
63
  override def reportClass: Option[ReportBuilderExtension] = {
64
65
66
67
68
69
    val report = new MultisampleMappingReport(this)
    report.outputDir = new File(outputDir, "report")
    report.summaryFile = summaryFile
    Some(report)
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
70
  override def defaults: Map[String, Any] = super.defaults ++ Map("reordersam" -> Map("allow_incomplete_dict_concordance" -> true))
71

Peter van 't Hof's avatar
Peter van 't Hof committed
72
  override def fixedValues: Map[String, Any] = super.fixedValues ++ Map("gearssingle" -> Map("skip_flexiprep" -> true))
Peter van 't Hof's avatar
Peter van 't Hof committed
73

74
  /** In a default multisample mapping run there are no multsample jobs. This method can be overriden by other pipelines */
75
  def addMultiSampleJobs(): Unit = {
76
77
78
    // this code will be executed after all code of all samples is executed
  }

79
  /** By default only the reference is put in the summary, when extending pippeline specific files can be added */
80
  def summaryFiles: Map[String, File] = Map("referenceFasta" -> referenceFasta())
81

82
  /** By default only the reference is put in the summary, when extending pippeline specific settings can be added */
83
84
85
  def summarySettings: Map[String, Any] = Map(
    "reference" -> referenceSummary,
    "merge_strategy" -> mergeStrategy.toString)
86
87

  def makeSample(id: String) = new Sample(id)
Peter van 't Hof's avatar
Peter van 't Hof committed
88
  class Sample(sampleId: String) extends AbstractSample(sampleId) { sample =>
89
90

    def makeLibrary(id: String) = new Library(id)
Peter van 't Hof's avatar
Peter van 't Hof committed
91
    class Library(libId: String) extends AbstractLibrary(libId) { lib =>
92
93

      /** By default the bams files are put in the summary, more files can be added here */
Peter van 't Hof's avatar
Peter van 't Hof committed
94
      def summaryFiles: Map[String, File] = (inputR1.map("input_R1" -> _) :: inputR2.map("input_R2" -> _) ::
Peter van 't Hof's avatar
Peter van 't Hof committed
95
        inputBam.map("input_bam" -> _) :: bamFile.map("output_bam" -> _) ::
bow's avatar
bow committed
96
        preProcessBam.map("output_bam_preprocess" -> _) :: Nil).flatten.toMap
97
98
99

      def summaryStats: Map[String, Any] = Map()

Peter van 't Hof's avatar
Peter van 't Hof committed
100
101
102
      lazy val inputR1: Option[File] = MultisampleMapping.fileMustBeAbsolute(config("R1"))
      lazy val inputR2: Option[File] = MultisampleMapping.fileMustBeAbsolute(config("R2"))
      lazy val inputBam: Option[File] = MultisampleMapping.fileMustBeAbsolute(if (inputR1.isEmpty) config("bam") else None)
103
104
105
      lazy val bamToFastq: Boolean = config("bam_to_fastq", default = false)
      lazy val correctReadgroups: Boolean = config("correct_readgroups", default = false)

Peter van 't Hof's avatar
Peter van 't Hof committed
106
      def keepFinalBamfile: Boolean = samples(sampleId).libraries.size == 1
Peter van 't Hof's avatar
Peter van 't Hof committed
107

Peter van 't Hof's avatar
Peter van 't Hof committed
108
109
      lazy val mapping: Option[Mapping] = if (inputR1.isDefined || (inputBam.isDefined && bamToFastq)) {
        val m: Mapping = new Mapping(qscript) {
110
111
          override def configNamespace = "mapping"
          override def defaults: Map[String, Any] = super.defaults ++
Peter van 't Hof's avatar
Peter van 't Hof committed
112
            Map("keep_final_bamfile" -> keepFinalBamfile)
113
        }
114
115
        m.sampleId = Some(sampleId)
        m.libId = Some(libId)
Peter van 't Hof's avatar
Peter van 't Hof committed
116
        m.outputDir = libDir
117
118
119
        Some(m)
      } else None

Peter van 't Hof's avatar
Peter van 't Hof committed
120
      def bamFile: Option[File] = mapping match {
Peter van 't Hof's avatar
Peter van 't Hof committed
121
        case Some(m)                 => Some(m.finalBamFile)
122
        case _ if inputBam.isDefined => Some(new File(libDir, s"$sampleId-$libId.bam"))
Peter van 't Hof's avatar
Peter van 't Hof committed
123
        case _                       => None
124
125
      }

126
      /** By default the preProcessBam is the same as the normal bamFile. A pipeline can extend this is there are preprocess steps */
Peter van 't Hof's avatar
Peter van 't Hof committed
127
      def preProcessBam: Option[File] = bamFile
Peter van 't Hof's avatar
Peter van 't Hof committed
128

129
      /** This method can be extended to add jobs to the pipeline, to do this the super call of this function must be called by the pipelines */
130
      def addJobs(): Unit = {
Peter van 't Hof's avatar
Peter van 't Hof committed
131
132
133
134
        inputR1.foreach(inputFiles :+= new InputFile(_, config("R1_md5")))
        inputR2.foreach(inputFiles :+= new InputFile(_, config("R2_md5")))
        inputBam.foreach(inputFiles :+= new InputFile(_, config("bam_md5")))

135
136
        if (inputR1.isDefined) {
          mapping.foreach { m =>
Sander van der Zeeuw's avatar
Sander van der Zeeuw committed
137
138
            m.inputR1 = inputR1.get
            m.inputR2 = inputR2
139
140
141
142
143
144
145
            add(m)
          }
        } else if (inputBam.isDefined) {
          if (bamToFastq) {
            val samToFastq = SamToFastq(qscript, inputBam.get,
              new File(libDir, sampleId + "-" + libId + ".R1.fq.gz"),
              new File(libDir, sampleId + "-" + libId + ".R2.fq.gz"))
Peter van 't Hof's avatar
Peter van 't Hof committed
146
            samToFastq.isIntermediate = libraries.size > 1
147
148
            qscript.add(samToFastq)
            mapping.foreach(m => {
Sander van der Zeeuw's avatar
Sander van der Zeeuw committed
149
150
              m.inputR1 = samToFastq.fastqR1
              m.inputR2 = Some(samToFastq.fastqR2)
151
152
153
154
              add(m)
            })
          } else {
            val inputSam = SamReaderFactory.makeDefault.open(inputBam.get)
155
156
157
            val header = inputSam.getFileHeader
            val readGroups = header.getReadGroups
            val referenceFile = new FastaSequenceFile(referenceFasta(), true)
Peter van 't Hof's avatar
Peter van 't Hof committed
158
159
160
            val dictOke: Boolean = {
              var oke = true
              try {
Peter van 't Hof's avatar
Peter van 't Hof committed
161
                header.getSequenceDictionary.assertSameDictionary(referenceDict)
Peter van 't Hof's avatar
Peter van 't Hof committed
162
              } catch {
Peter van 't Hof's avatar
Peter van 't Hof committed
163
                case e: AssertionError =>
Peter van 't Hof's avatar
Peter van 't Hof committed
164
                  logger.error(e.getMessage)
Peter van 't Hof's avatar
Peter van 't Hof committed
165
166
167
168
                  oke = false
              }
              oke
            }
169
170
            inputSam.close()
            referenceFile.close()
171
172
173
174
175

            val readGroupOke = readGroups.forall(readGroup => {
              if (readGroup.getSample != sampleId) logger.warn("Sample ID readgroup in bam file is not the same")
              if (readGroup.getLibrary != libId) logger.warn("Library ID readgroup in bam file is not the same")
              readGroup.getSample == sampleId && readGroup.getLibrary == libId
176
            }) && readGroups.nonEmpty
177

178
179
180
            if (!readGroupOke || !dictOke) {
              if (!readGroupOke && !correctReadgroups) Logging.addError(
                "Sample readgroup and/or library of input bamfile is not correct, file: " + bamFile +
Peter van 't Hof's avatar
Peter van 't Hof committed
181
                  "\nPlease note that it is possible to set 'correct_readgroups' to true in the config to automatic fix this")
Peter van 't Hof's avatar
Peter van 't Hof committed
182
              if (!dictOke) Logging.addError(
Peter van 't Hof's avatar
Peter van 't Hof committed
183
                "Sequence dictionary in the bam file is not the same as the reference, file: " + bamFile)
184

Peter van 't Hof's avatar
Peter van 't Hof committed
185
              if (!readGroupOke && correctReadgroups) {
186
                logger.info("Correcting readgroups, file:" + inputBam.get)
Peter van 't Hof's avatar
Peter van 't Hof committed
187
188
189
190
191
192
                val aorrg = AddOrReplaceReadGroups(qscript, inputBam.get, bamFile.get)
                aorrg.RGID = config("rgid", default = s"$sampleId-$libId")
                aorrg.RGLB = libId
                aorrg.RGSM = sampleId
                aorrg.RGPL = config("rgpl", default = "unknown")
                aorrg.RGPU = config("rgpu", default = "na")
Peter van 't Hof's avatar
Peter van 't Hof committed
193
                aorrg.isIntermediate = libraries.size > 1
194
                qscript.add(aorrg)
195
              }
Peter van 't Hof's avatar
Peter van 't Hof committed
196
            } else add(Ln.linkBamFile(qscript, inputBam.get, bamFile.get): _*)
197
198
199
200
201
202

            val bamMetrics = new BamMetrics(qscript)
            bamMetrics.sampleId = Some(sampleId)
            bamMetrics.libId = Some(libId)
            bamMetrics.inputBam = bamFile.get
            bamMetrics.outputDir = new File(libDir, "metrics")
Peter van 't Hof's avatar
Peter van 't Hof committed
203
            add(bamMetrics)
204
205

            if (config("execute_bam2wig", default = true)) add(Bam2Wig(qscript, bamFile.get))
206
207
208
209
210
          }
        } else logger.warn(s"Sample '$sampleId' does not have any input files")
      }
    }

211
    /** By default the bams files are put in the summary, more files can be added here */
Peter van 't Hof's avatar
Peter van 't Hof committed
212
    def summaryFiles: Map[String, File] = (bamFile.map("output_bam" -> _) ::
bow's avatar
bow committed
213
      preProcessBam.map("output_bam_preprocess" -> _) :: Nil).flatten.toMap
214
215
216

    def summaryStats: Map[String, Any] = Map()

217
    /** This is the merged bam file, None if the merged bam file is NA */
Peter van 't Hof's avatar
Peter van 't Hof committed
218
    def bamFile: Option[File] = if (libraries.flatMap(_._2.bamFile).nonEmpty &&
Peter van 't Hof's avatar
Peter van 't Hof committed
219
220
221
222
      mergeStrategy != MultisampleMapping.MergeStrategy.None)
      Some(new File(sampleDir, s"$sampleId.bam"))
    else None

223
    /** By default the preProcessBam is the same as the normal bamFile. A pipeline can extend this is there are preprocess steps */
Peter van 't Hof's avatar
Peter van 't Hof committed
224
    def preProcessBam: Option[File] = bamFile
Peter van 't Hof's avatar
Peter van 't Hof committed
225

226
    /** Default is set to keep the merged files, user can set this in the config. To change the default this method can be overriden */
Peter van 't Hof's avatar
Peter van 't Hof committed
227
228
    def keepMergedFiles: Boolean = config("keep_merged_files", default = true)

Peter van 't Hof's avatar
Peter van 't Hof committed
229
    /**
Peter van 't Hof's avatar
Peter van 't Hof committed
230
231
     * @deprecated
     */
Peter van 't Hof's avatar
Peter van 't Hof committed
232
233
234
235
236
    lazy val unmappedToGears: Boolean = config("unmapped_to_gears", default = false)
    if (config.contains("unmapped_to_gears")) logger.warn("Config value 'unmapped_to_gears' is replaced with 'mapping_to_gears', Assumes default: mapping_to_gears=unmapped")

    lazy val mappingToGears: String = config("mapping_to_gears", default = if (unmappedToGears) "unmapped" else "none")

237
    /** This method can be extended to add jobs to the pipeline, to do this the super call of this function must be called by the pipelines */
Peter van 't Hof's avatar
Peter van 't Hof committed
238
    def addJobs(): Unit = {
239
      addPerLibJobs() // This add jobs for each library
Peter van 't Hof's avatar
Peter van 't Hof committed
240
241

      mergeStrategy match {
Peter van 't Hof's avatar
Peter van 't Hof committed
242
243
        case MergeStrategy.None =>
        case (MergeStrategy.MergeSam | MergeStrategy.MarkDuplicates) if libraries.flatMap(_._2.bamFile).size == 1 =>
Peter van 't Hof's avatar
Peter van 't Hof committed
244
          add(Ln.linkBamFile(qscript, libraries.flatMap(_._2.bamFile).head, bamFile.get): _*)
Peter van 't Hof's avatar
Peter van 't Hof committed
245
        case (MergeStrategy.PreProcessMergeSam | MergeStrategy.PreProcessMarkDuplicates) if libraries.flatMap(_._2.preProcessBam).size == 1 =>
Peter van 't Hof's avatar
Peter van 't Hof committed
246
          add(Ln.linkBamFile(qscript, libraries.flatMap(_._2.preProcessBam).head, bamFile.get): _*)
Peter van 't Hof's avatar
Peter van 't Hof committed
247
        case MergeStrategy.MergeSam =>
Peter van 't Hof's avatar
Peter van 't Hof committed
248
          add(MergeSamFiles(qscript, libraries.flatMap(_._2.bamFile).toList, bamFile.get, isIntermediate = !keepMergedFiles))
Peter van 't Hof's avatar
Peter van 't Hof committed
249
        case MergeStrategy.PreProcessMergeSam =>
Peter van 't Hof's avatar
Peter van 't Hof committed
250
          add(MergeSamFiles(qscript, libraries.flatMap(_._2.preProcessBam).toList, bamFile.get, isIntermediate = !keepMergedFiles))
Peter van 't Hof's avatar
Peter van 't Hof committed
251
        case MergeStrategy.MarkDuplicates =>
Peter van 't Hof's avatar
Peter van 't Hof committed
252
          add(MarkDuplicates(qscript, libraries.flatMap(_._2.bamFile).toList, bamFile.get, isIntermediate = !keepMergedFiles))
Peter van 't Hof's avatar
Peter van 't Hof committed
253
        case MergeStrategy.PreProcessMarkDuplicates =>
Peter van 't Hof's avatar
Peter van 't Hof committed
254
          add(MarkDuplicates(qscript, libraries.flatMap(_._2.preProcessBam).toList, bamFile.get, isIntermediate = !keepMergedFiles))
Peter van 't Hof's avatar
Peter van 't Hof committed
255
256
257
        case _ => throw new IllegalStateException("This should not be possible, unimplemented MergeStrategy?")
      }

258
      if (mergeStrategy != MergeStrategy.None && libraries.flatMap(_._2.bamFile).nonEmpty) {
Peter van 't Hof's avatar
Peter van 't Hof committed
259
260
261
262
263
        val bamMetrics = new BamMetrics(qscript)
        bamMetrics.sampleId = Some(sampleId)
        bamMetrics.inputBam = preProcessBam.get
        bamMetrics.outputDir = new File(sampleDir, "metrics")
        add(bamMetrics)
264
265

        if (config("execute_bam2wig", default = true)) add(Bam2Wig(qscript, preProcessBam.get))
Peter van 't Hof's avatar
Peter van 't Hof committed
266
      }
267

Peter van 't Hof's avatar
Peter van 't Hof committed
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
      mappingToGears match {
        case "unmapped" =>
          val gears = new GearsSingle(qscript)
          gears.bamFile = preProcessBam
          gears.sampleId = Some(sampleId)
          gears.outputDir = new File(sampleDir, "gears")
          add(gears)
        case "all" =>
          val gears = new GearsSingle(qscript)
          gears.fastqR1 = libraries.flatMap(_._2.inputR1).toList
          gears.fastqR2 = libraries.flatMap(_._2.inputR2).toList
          gears.sampleId = Some(sampleId)
          gears.outputDir = new File(sampleDir, "gears")
          add(gears)
        case "none" =>
Peter van 't Hof's avatar
Peter van 't Hof committed
283
        case x      => Logging.addError(s"$x is not a valid value for 'mapping_to_gears'")
284
      }
285
286
287
    }
  }
}
Peter van 't Hof's avatar
Peter van 't Hof committed
288

289
/** This class is the default implementation that can be used on the command line */
Peter van 't Hof's avatar
Peter van 't Hof committed
290
291
class MultisampleMapping(val root: Configurable) extends QScript with MultisampleMappingTrait {
  def this() = this(null)
Peter van 't Hof's avatar
Peter van 't Hof committed
292
293

  def summaryFile: File = new File(outputDir, "MultisamplePipeline.summary.json")
Peter van 't Hof's avatar
Peter van 't Hof committed
294
}
295
296
297
298

object MultisampleMapping extends PipelineCommand {

  object MergeStrategy extends Enumeration {
Peter van 't Hof's avatar
Peter van 't Hof committed
299
    val None, MergeSam, MarkDuplicates, PreProcessMergeSam, PreProcessMarkDuplicates = Value
300
301
  }

302
  /** When file is not absolute an error is raise att the end of the script of a pipeline */
Peter van 't Hof's avatar
Peter van 't Hof committed
303
  def fileMustBeAbsolute(file: Option[File]): Option[File] = {
304
305
306
307
308
309
310
311
    if (file.forall(_.isAbsolute)) file
    else {
      Logging.addError(s"$file should be a absolute file path")
      file.map(_.getAbsoluteFile)
    }
  }

}