Yamsvp.scala 5.51 KB
Newer Older
Peter van 't Hof's avatar
Peter van 't Hof committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
/*
 * Structural variation calling
 */

package nl.lumc.sasc.biopet.pipelines.yamsvp

import nl.lumc.sasc.biopet.core.config.Configurable
import nl.lumc.sasc.biopet.core.MultiSampleQScript
import nl.lumc.sasc.biopet.core.PipelineCommand

import nl.lumc.sasc.biopet.extensions.sambamba.{ SambambaIndex, SambambaMerge }
import nl.lumc.sasc.biopet.extensions.svcallers.pindel.Pindel
import nl.lumc.sasc.biopet.extensions.svcallers.{ Breakdancer, Clever }

import nl.lumc.sasc.biopet.pipelines.mapping.Mapping

import org.broadinstitute.gatk.queue.QScript
import org.broadinstitute.gatk.queue.function._
import org.broadinstitute.gatk.queue.engine.JobRunInfo

class Yamsvp(val root: Configurable) extends QScript with MultiSampleQScript {
  def this() = this(null)

  var reference: File = config("reference", required = true)
  var finalBamFiles: List[File] = Nil

  class LibraryOutput extends AbstractLibraryOutput {
    var mappedBamFile: File = _
  }

  class SampleOutput extends AbstractSampleOutput {
    var vcf: Map[String, List[File]] = Map()
    var mappedBamFile: File = _
  }

  override def init() {
    if (outputDir == null)
      throw new IllegalStateException("Output directory is not specified in the config / argument")
    else if (!outputDir.endsWith("/"))
      outputDir += "/"
  }

  def biopetScript() {
    // write the pipeline here
    // start with QC, alignment, call sambamba, call sv callers, reporting

    // read config and set all parameters for the pipeline
    logger.info("Starting YAM SV Pipeline")
    runSamplesJobs
    //

  }

  override def onExecutionDone(jobs: Map[QFunction, JobRunInfo], success: Boolean) {
    logger.info("YAM SV Pipeline has run .......................")
  }

  def runSingleSampleJobs(sampleConfig: Map[String, Any]): SampleOutput = {
    val sampleOutput = new SampleOutput
    var libraryBamfiles: List[File] = List()
    var outputFiles: Map[String, List[File]] = Map()
    var libraryFastqFiles: List[File] = List()
    val sampleID: String = sampleConfig("ID").toString
    val sampleDir: String = outputDir + sampleID + "/"
    val alignmentDir: String = sampleDir + "alignment/"

    val svcallingDir: String = sampleDir + "svcalls/"

    sampleOutput.libraries = runLibraryJobs(sampleConfig)
    for ((libraryID, libraryOutput) <- sampleOutput.libraries) {
      // this is extending the libraryBamfiles list like '~=' in D or .append in Python or .push_back in C++
      libraryBamfiles ++= List(libraryOutput.mappedBamFile)
    }

    val bamFile: File =
      if (libraryBamfiles.size == 1) libraryBamfiles.head
      else if (libraryBamfiles.size > 1) {
        val mergeSamFiles = new SambambaMerge(root)
        mergeSamFiles.input = libraryBamfiles
        mergeSamFiles.output = alignmentDir + sampleID + ".merged.bam"
        add(mergeSamFiles)
        mergeSamFiles.output
      } else null

    val bamIndex = SambambaIndex(root, bamFile)
    add(bamIndex)

    /// bamfile will be used as input for the SV callers. First run Clever
    //    val cleverVCF : File = sampleDir + "/" + sampleID + ".clever.vcf"

    val cleverDir = svcallingDir + sampleID + ".clever/"
    val clever = Clever(this, bamFile, this.reference, svcallingDir, cleverDir)
    clever.deps = List(bamIndex.output)
    sampleOutput.vcf += ("clever" -> List(clever.outputvcf))
    add(clever)

    val breakdancerDir = svcallingDir + sampleID + ".breakdancer/"
    val breakdancer = Breakdancer(this, bamFile, this.reference, breakdancerDir)
    sampleOutput.vcf += ("breakdancer" -> List(breakdancer.outputvcf))
    addAll(breakdancer.functions)

    // for pindel we should use per library config collected into one config file
    //    val pindelDir = svcallingDir + sampleID + ".pindel/"
    //    val pindel = Pindel(this, bamFile, this.reference, pindelDir)
    //    sampleOutput.vcf += ("pindel" -> List(pindel.outputvcf))
    //    addAll(pindel.functions)
    //
    //    
    return sampleOutput
  }

  // Called for each run from a sample

  def runSingleLibraryJobs(runConfig: Map[String, Any], sampleConfig: Map[String, Any]): LibraryOutput = {
    val libraryOutput = new LibraryOutput

    val runID: String = runConfig("ID").toString
    val sampleID: String = sampleConfig("ID").toString
    val alignmentDir: String = outputDir + sampleID + "/alignment/"
    val runDir: String = alignmentDir + "run_" + runID + "/"

    if (runConfig.contains("R1")) {
      val mapping = new Mapping(this)

      mapping.aligner = config("aligner", default = "stampy")
      mapping.skipFlexiprep = false
      mapping.skipMarkduplicates = true // we do the dedup marking using Sambamba

      if (runConfig.contains("R1")) mapping.input_R1 = new File(runConfig("R1").toString)
      if (runConfig.contains("R2")) mapping.input_R2 = new File(runConfig("R2").toString)
      mapping.paired = (mapping.input_R2 != null)
      mapping.RGLB = runConfig("ID").toString
      mapping.RGSM = sampleConfig("ID").toString
      if (runConfig.contains("PL")) mapping.RGPL = runConfig("PL").toString
      if (runConfig.contains("PU")) mapping.RGPU = runConfig("PU").toString
      if (runConfig.contains("CN")) mapping.RGCN = runConfig("CN").toString
      mapping.outputDir = runDir

      mapping.init
      mapping.biopetScript
      addAll(mapping.functions)

      // start sambamba dedup

      libraryOutput.mappedBamFile = mapping.outputFiles("finalBamFile")
    } else this.logger.error("Sample: " + sampleID + ": No R1 found for run: " + runConfig)
    return libraryOutput
    //    logger.debug(outputFiles)
    //    return outputFiles
  }
}

object Yamsvp extends PipelineCommand