GatkPipeline.scala 10.6 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
package nl.lumc.sasc.biopet.pipelines.gatk

import nl.lumc.sasc.biopet.core.MultiSampleQScript
import nl.lumc.sasc.biopet.core.PipelineCommand
import nl.lumc.sasc.biopet.core.config.Configurable
import htsjdk.samtools.SamReaderFactory
import scala.collection.JavaConversions._
import java.io.File
import nl.lumc.sasc.biopet.extensions.gatk.{ CombineVariants, CombineGVCFs }
import nl.lumc.sasc.biopet.extensions.picard.AddOrReplaceReadGroups
import nl.lumc.sasc.biopet.extensions.picard.SamToFastq
import nl.lumc.sasc.biopet.pipelines.bammetrics.BamMetrics
import nl.lumc.sasc.biopet.pipelines.mapping.Mapping
import org.broadinstitute.gatk.queue.QScript
import org.broadinstitute.gatk.utils.commandline.{ Argument }

class GatkPipeline(val root: Configurable) extends QScript with MultiSampleQScript {
  def this() = this(null)

  @Argument(doc = "Only Sample", shortName = "sample", required = false)
  val onlySample: List[String] = Nil

  @Argument(doc = "Skip Genotyping step", shortName = "skipgenotyping", required = false)
  var skipGenotyping: Boolean = false

  @Argument(doc = "Merge gvcfs", shortName = "mergegvcfs", required = false)
  var mergeGvcfs: Boolean = false

  @Argument(doc = "Joint variantcalling", shortName = "jointVariantCalling", required = false)
  var jointVariantcalling: Boolean = config("joint_variantcalling", default = false)

  @Argument(doc = "Joint genotyping", shortName = "jointGenotyping", required = false)
  var jointGenotyping: Boolean = config("joint_genotyping", default = false)

  var singleSampleCalling = config("single_sample_calling", default = true)
  var reference: File = config("reference", required = true)
  var dbsnp: File = config("dbsnp")
  var gvcfFiles: List[File] = Nil
  var finalBamFiles: List[File] = Nil
  var useAllelesOption: Boolean = config("use_alleles_option", default = false)

  class LibraryOutput extends AbstractLibraryOutput {
    var mappedBamFile: File = _
    var variantcalling: GatkVariantcalling.ScriptOutput = _
  }

  class SampleOutput extends AbstractSampleOutput {
    var variantcalling: GatkVariantcalling.ScriptOutput = _
  }

  def init() {
    if (config.contains("target_bed")) {
      defaults ++= Map("gatk" -> Map(("intervals" -> config("target_bed").asStringList)))
    }
    if (config.contains("gvcfFiles"))
      for (file <- config("gvcfFiles").asList)
        gvcfFiles :+= file.toString
    if (outputDir == null) throw new IllegalStateException("Missing Output directory on gatk module")
    else if (!outputDir.endsWith("/")) outputDir += "/"
  }

  val multisampleVariantcalling = new GatkVariantcalling(this) {
    override protected lazy val configName = "gatkvariantcalling"
    override def configPath: List[String] = "multisample" :: super.configPath
  }

  def biopetScript() {
    if (onlySample.isEmpty) {
      runSamplesJobs

      //SampleWide jobs
      if (mergeGvcfs && gvcfFiles.size > 0) {
        val newFile = outputDir + "merged.gvcf.vcf.gz"
        add(CombineGVCFs(this, gvcfFiles, newFile))
        gvcfFiles = List(newFile)
      }

      if (!skipGenotyping && gvcfFiles.size > 0) {
        if (jointGenotyping) {
          val gatkGenotyping = new GatkGenotyping(this)
          gatkGenotyping.inputGvcfs = gvcfFiles
          gatkGenotyping.outputDir = outputDir + "genotyping/"
          gatkGenotyping.init
          gatkGenotyping.biopetScript
          addAll(gatkGenotyping.functions)
          var vcfFile = gatkGenotyping.outputFile
        }
      } else logger.warn("No gVCFs to genotype")

      if (jointVariantcalling) {
        val allBamfiles = for (
          (sampleID, sampleOutput) <- samplesOutput;
          file <- sampleOutput.variantcalling.bamFiles
        ) yield file
        val allRawVcfFiles = for ((sampleID, sampleOutput) <- samplesOutput) yield sampleOutput.variantcalling.rawFilterVcfFile

        val gatkVariantcalling = new GatkVariantcalling(this) {
          override protected lazy val configName = "gatkvariantcalling"
          override def configPath: List[String] = "multisample" :: super.configPath
        }

        if (gatkVariantcalling.useMpileup) {
          val cvRaw = CombineVariants(this, allRawVcfFiles.toList, outputDir + "variantcalling/multisample.raw.vcf.gz")
          add(cvRaw)
          gatkVariantcalling.rawVcfInput = cvRaw.out
        }

        multisampleVariantcalling.preProcesBams = false
        multisampleVariantcalling.doublePreProces = false
        multisampleVariantcalling.inputBams = allBamfiles.toList
        multisampleVariantcalling.outputDir = outputDir + "variantcalling"
        multisampleVariantcalling.outputName = "multisample"
        multisampleVariantcalling.init
        multisampleVariantcalling.biopetScript
        addAll(multisampleVariantcalling.functions)

        if (config("inputtype", default = "dna").asString != "rna" && config("recalibration", default = false).asBoolean) {
          val recalibration = new GatkVariantRecalibration(this)
          recalibration.inputVcf = multisampleVariantcalling.scriptOutput.finalVcfFile
          recalibration.bamFiles = finalBamFiles
          recalibration.outputDir = outputDir + "recalibration/"
          recalibration.init
          recalibration.biopetScript
        }
      }
    } else for (sample <- onlySample) runSingleSampleJobs(sample)
  }

  // Called for each sample
  def runSingleSampleJobs(sampleConfig: Map[String, Any]): SampleOutput = {
    val sampleOutput = new SampleOutput
    var libraryBamfiles: List[File] = List()
    val sampleID: String = sampleConfig("ID").toString
    sampleOutput.libraries = runLibraryJobs(sampleConfig)
    val sampleDir = globalSampleDir + sampleID
    for ((libraryID, libraryOutput) <- sampleOutput.libraries) {
      libraryBamfiles ++= libraryOutput.variantcalling.bamFiles
    }

    if (libraryBamfiles.size > 0) {
      finalBamFiles ++= libraryBamfiles
      val gatkVariantcalling = new GatkVariantcalling(this)
      gatkVariantcalling.inputBams = libraryBamfiles
      gatkVariantcalling.outputDir = sampleDir + "/variantcalling/"
      gatkVariantcalling.preProcesBams = false
      if (!singleSampleCalling) {
        gatkVariantcalling.useHaplotypecaller = false
        gatkVariantcalling.useUnifiedGenotyper = false
      }
      gatkVariantcalling.sampleID = sampleID
      gatkVariantcalling.init
      gatkVariantcalling.biopetScript
      addAll(gatkVariantcalling.functions)
      sampleOutput.variantcalling = gatkVariantcalling.scriptOutput
      gvcfFiles :+= gatkVariantcalling.scriptOutput.gvcfFile
    } else logger.warn("No bamfiles for variant calling for sample: " + sampleID)
    return sampleOutput
  }

  // Called for each run from a sample
  def runSingleLibraryJobs(runConfig: Map[String, Any], sampleConfig: Map[String, Any]): LibraryOutput = {
    val libraryOutput = new LibraryOutput
    val runID: String = runConfig("ID").toString
    val sampleID: String = sampleConfig("ID").toString
    val runDir: String = globalSampleDir + sampleID + "/run_" + runID + "/"
    var inputType = ""
    if (runConfig.contains("inputtype")) inputType = runConfig("inputtype").toString
    else inputType = config("inputtype", default = "dna").toString
    if (runConfig.contains("R1")) {
      val mapping = Mapping.loadFromLibraryConfig(this, runConfig, sampleConfig, runDir)
      addAll(mapping.functions) // Add functions of mapping to curent function pool
      libraryOutput.mappedBamFile = mapping.outputFiles("finalBamFile")
    } else if (runConfig.contains("bam")) {
      var bamFile = new File(runConfig("bam").toString)
      if (!bamFile.exists) throw new IllegalStateException("Bam in config does not exist, file: " + bamFile)

      if (config("bam_to_fastq", default = false).asBoolean) {
        val samToFastq = SamToFastq(this, bamFile, runDir + sampleID + "-" + runID + ".R1.fastq",
          runDir + sampleID + "-" + runID + ".R2.fastq")
        add(samToFastq, isIntermediate = true)
        val mapping = Mapping.loadFromLibraryConfig(this, runConfig, sampleConfig, runDir, startJobs = false)
        mapping.input_R1 = samToFastq.fastqR1
        mapping.input_R2 = samToFastq.fastqR2
        mapping.init
        mapping.biopetScript
        addAll(mapping.functions) // Add functions of mapping to curent function pool
        libraryOutput.mappedBamFile = mapping.outputFiles("finalBamFile")
      } else {
        var readGroupOke = true
        val inputSam = SamReaderFactory.makeDefault.open(bamFile)
        val header = inputSam.getFileHeader.getReadGroups
        for (readGroup <- inputSam.getFileHeader.getReadGroups) {
          if (readGroup.getSample != sampleID) logger.warn("Sample ID readgroup in bam file is not the same")
          if (readGroup.getLibrary != runID) logger.warn("Library ID readgroup in bam file is not the same")
          if (readGroup.getSample != sampleID || readGroup.getLibrary != runID) readGroupOke = false
        }
        inputSam.close

        if (!readGroupOke) {
          if (config("correct_readgroups", default = false)) {
            logger.info("Correcting readgroups, file:" + bamFile)
            val aorrg = AddOrReplaceReadGroups(this, bamFile, new File(runDir + sampleID + "-" + runID + ".bam"))
            aorrg.RGID = sampleID + "-" + runID
            aorrg.RGLB = runID
            aorrg.RGSM = sampleID
            if (runConfig.contains("PL")) aorrg.RGPL = runConfig("PL").toString
            else aorrg.RGPL = "illumina"
            if (runConfig.contains("PU")) aorrg.RGPU = runConfig("PU").toString
            else aorrg.RGPU = "na"
            if (runConfig.contains("CN")) aorrg.RGCN = runConfig("CN").toString
            add(aorrg, isIntermediate = true)
            bamFile = aorrg.output
213
214
          } else throw new IllegalStateException("Sample readgroup and/or library of input bamfile is not correct, file: " + bamFile +
            "\nPlease note that it is possible to set 'correct_readgroups' to true in the config to automatic fix this")
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
        }
        addAll(BamMetrics(this, bamFile, runDir + "metrics/").functions)

        libraryOutput.mappedBamFile = bamFile
      }
    } else logger.error("Sample: " + sampleID + ": No R1 found for run: " + runConfig)

    val gatkVariantcalling = new GatkVariantcalling(this)
    gatkVariantcalling.inputBams = List(libraryOutput.mappedBamFile)
    gatkVariantcalling.outputDir = runDir
    gatkVariantcalling.variantcalling = config("library_variantcalling", default = false)
    gatkVariantcalling.preProcesBams = true
    gatkVariantcalling.sampleID = sampleID
    gatkVariantcalling.init
    gatkVariantcalling.biopetScript
    addAll(gatkVariantcalling.functions)
    libraryOutput.variantcalling = gatkVariantcalling.scriptOutput

    return libraryOutput
  }
}

object GatkPipeline extends PipelineCommand