Gears.scala 6.47 KB
Newer Older
Peter van 't Hof's avatar
Peter van 't Hof committed
1 2 3 4 5 6 7 8 9 10
/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
11
 * A dual licensing mode is applied. The source code within this project is freely available for non-commercial use under an AGPL
Peter van 't Hof's avatar
Peter van 't Hof committed
12 13 14
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
15 16
package nl.lumc.sasc.biopet.pipelines.gears

Peter van 't Hof's avatar
Peter van 't Hof committed
17
import nl.lumc.sasc.biopet.core.BiopetQScript.InputFile
18
import nl.lumc.sasc.biopet.core.{MultiSampleQScript, PipelineCommand}
19
import nl.lumc.sasc.biopet.extensions.tools.MergeOtuMaps
20
import nl.lumc.sasc.biopet.extensions.{Bgzip, Gzip, Ln, Zcat}
21
import nl.lumc.sasc.biopet.extensions.qiime.MergeOtuTables
22
import nl.lumc.sasc.biopet.extensions.seqtk.SeqtkSample
23
import nl.lumc.sasc.biopet.pipelines.flexiprep.Flexiprep
24 25 26 27
import nl.lumc.sasc.biopet.utils.config.Configurable
import org.broadinstitute.gatk.queue.QScript

/**
Peter van 't Hof's avatar
Peter van 't Hof committed
28 29
 * Created by pjvanthof on 03/12/15.
 */
30 31 32
class Gears(val root: Configurable) extends QScript with MultiSampleQScript { qscript =>
  def this() = this(null)

Peter van 't Hof's avatar
Peter van 't Hof committed
33 34 35 36 37 38 39
  override def reportClass = {
    val gearsReport = new GearsReport(this)
    gearsReport.outputDir = new File(outputDir, "report")
    gearsReport.summaryFile = summaryFile
    Some(gearsReport)
  }

40 41
  override def fixedValues = Map("gearssingle" -> Map("skip_flexiprep" -> true))

42 43 44 45 46 47 48 49 50 51 52 53 54
  /** Init for pipeline */
  def init(): Unit = {
  }

  /** Name of summary output file */
  def summaryFile: File = new File(outputDir, "gears.summary.json")

  /** Pipeline itself */
  def biopetScript(): Unit = {
    addSamplesJobs()
    addSummaryJobs()
  }

55 56 57 58 59 60 61 62 63 64
  def qiimeClosedDir: Option[File] = {
    if (samples.values.flatMap(_.gs.qiimeClosed).nonEmpty) {
      Some(new File(outputDir, "qiime_closed_reference"))
    } else None

  }

  def qiimeClosedOtuTable: Option[File] = qiimeClosedDir.map(new File(_, "otu_table.biom"))
  def qiimeClosedOtuMap: Option[File] = qiimeClosedDir.map(new File(_, "otu_map.txt"))

65
  /**
Peter van 't Hof's avatar
Peter van 't Hof committed
66 67
   * Method where the multisample jobs should be added, this will be executed only when running the -sample argument is not given.
   */
68
  def addMultiSampleJobs(): Unit = {
69 70 71
    val gss = samples.values.flatMap(_.gs.qiimeClosed).toList
    val closedOtuTables = gss.map(_.otuTable)
    val closedOtuMaps = gss.map(_.otuMap)
72 73 74 75 76
    require(closedOtuTables.size == closedOtuMaps.size)
    if (closedOtuTables.nonEmpty) {
      if (closedOtuTables.size > 1) {
        val mergeTables = new MergeOtuTables(qscript)
        mergeTables.input = closedOtuTables
77
        mergeTables.outputFile = qiimeClosedOtuTable.get
78 79
        add(mergeTables)

Peter van 't Hof's avatar
Peter van 't Hof committed
80
        val mergeMaps = new MergeOtuMaps(qscript)
81
        mergeMaps.input = closedOtuMaps
82
        mergeMaps.output = qiimeClosedOtuMap.get
83 84 85
        add(mergeMaps)

      } else {
86 87
        add(Ln(qscript, closedOtuMaps.head, qiimeClosedOtuMap.get))
        add(Ln(qscript, closedOtuTables.head, qiimeClosedOtuTable.get))
88 89 90 91 92
      }

      //TODO: Plots

    }
93 94 95
  }

  /**
Peter van 't Hof's avatar
Peter van 't Hof committed
96 97 98 99
   * Factory method for Sample class
   * @param id SampleId
   * @return Sample class
   */
100 101 102 103
  def makeSample(id: String): Sample = new Sample(id)

  class Sample(sampleId: String) extends AbstractSample(sampleId) {
    /**
Peter van 't Hof's avatar
Peter van 't Hof committed
104 105 106 107
     * Factory method for Library class
     * @param id SampleId
     * @return Sample class
     */
108 109 110
    def makeLibrary(id: String): Library = new Library(id)

    class Library(libId: String) extends AbstractLibrary(libId) {
111

112 113 114
      lazy val flexiprep = new Flexiprep(qscript)
      flexiprep.sampleId = Some(sampleId)
      flexiprep.libId = Some(libId)
Sander van der Zeeuw's avatar
Sander van der Zeeuw committed
115
      flexiprep.inputR1 = config("R1")
116
      flexiprep.inputR2 = config("R2")
117 118
      flexiprep.outputDir = new File(libDir, "flexiprep")

119 120 121
      lazy val gs = new GearsSingle(qscript)
      gs.sampleId = Some(sampleId)
      gs.libId = Some(libId)
Peter van 't Hof's avatar
Peter van 't Hof committed
122
      gs.outputDir = libDir
123 124 125

      /** Function that add library jobs */
      protected def addJobs(): Unit = {
Sander van der Zeeuw's avatar
Sander van der Zeeuw committed
126
        inputFiles :+= InputFile(flexiprep.inputR1, config("R1_md5"))
127
        flexiprep.inputR2.foreach(inputFiles :+= InputFile(_, config("R2_md5")))
128 129
        add(flexiprep)

130 131
        gs.fastqR1 = Some(addDownsample(flexiprep.fastqR1Qc, gs.outputDir))
        gs.fastqR2 = flexiprep.fastqR2Qc.map(addDownsample(_, gs.outputDir))
132
        add(gs)
133 134 135 136 137 138 139 140 141
      }

      /** Must return files to store into summary */
      def summaryFiles: Map[String, File] = Map()

      /** Must returns stats to store into summary */
      def summaryStats = Map()
    }

142 143 144
    lazy val gs = new GearsSingle(qscript)
    gs.sampleId = Some(sampleId)
    gs.outputDir = sampleDir
145 146 147 148

    /** Function to add sample jobs */
    protected def addJobs(): Unit = {
      addPerLibJobs()
Peter van 't Hof's avatar
Peter van 't Hof committed
149

150 151 152 153 154
      val flexipreps = libraries.values.map(_.flexiprep).toList

      val mergeR1: File = new File(sampleDir, s"$sampleId.R1.fq.gz")
      add(Zcat(qscript, flexipreps.map(_.fastqR1Qc)) | new Gzip(qscript) > mergeR1)

Peter van 't Hof's avatar
Peter van 't Hof committed
155
      val mergeR2 = if (flexipreps.exists(_.paired)) Some(new File(sampleDir, s"$sampleId.R2.fq.gz")) else None
156 157
      mergeR2.foreach { file =>
        add(Zcat(qscript, flexipreps.flatMap(_.fastqR2Qc)) | new Gzip(qscript) > file)
158
      }
159

160 161
      gs.fastqR1 = Some(addDownsample(mergeR1, gs.outputDir))
      gs.fastqR2 = mergeR2.map(addDownsample(_, gs.outputDir))
162
      add(gs)
163 164
    }

165 166 167 168 169 170 171
    /** Must return files to store into summary */
    def summaryFiles: Map[String, File] = Map()

    /** Must returns stats to store into summary */
    def summaryStats: Any = Map()
  }

172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
  val downSample: Option[Double] = config("downsample")

  def addDownsample(input: File, dir: File): File = {
    downSample match {
      case Some(x) =>
        val output = new File(dir, input.getName + ".fq.gz")
        val seqtk = new SeqtkSample(this)
        seqtk.input = input
        seqtk.sample = x
        add(seqtk | new Bgzip(this) > output)
        output
      case _ => input
    }
  }

187
  /** Must return a map with used settings for this pipeline */
188
  def summarySettings: Map[String, Any] = Map("downsample" -> downSample)
189 190

  /** File to put in the summary for thie pipeline */
191
  def summaryFiles: Map[String, File] = (
Peter van 't Hof's avatar
Peter van 't Hof committed
192 193 194
    qiimeClosedOtuTable.map("qiime_closed_otu_table" -> _) ++
    qiimeClosedOtuMap.map("qiime_closed_otu_map" -> _)
  ).toMap
195 196 197
}

object Gears extends PipelineCommand