/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
 * A dual licensing mode is applied. The source code within this project is freely available for non-commercial use under an AGPL
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
package nl.lumc.sasc.biopet.core.summary

Peter van 't Hof's avatar
Peter van 't Hof committed
17
18
import java.io.{File, PrintWriter}
import java.sql.Date
19

20
import nl.lumc.sasc.biopet.core._
Peter van 't Hof's avatar
Peter van 't Hof committed
21
import nl.lumc.sasc.biopet.core.extensions.{CheckChecksum, Md5sum}
Peter van 't Hof's avatar
Peter van 't Hof committed
22
import nl.lumc.sasc.biopet.utils.summary.db.SummaryDb
23
import org.broadinstitute.gatk.queue.QScript
Peter van 't Hof's avatar
Peter van 't Hof committed
24
import nl.lumc.sasc.biopet.LastCommitHash
25

Peter van 't Hof's avatar
Peter van 't Hof committed
26
import scala.collection.mutable
27
28
29
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.io.Source
Peter van 't Hof's avatar
Peter van 't Hof committed
30

31
/**
 * This trait is used for qscripts / pipelines that will produce a summary
 *
 * Created by pjvan_thof on 2/14/15.
 */
36
trait SummaryQScript extends BiopetQScript { qscript: QScript =>
37
38

  /**
   * Summarizables registered for this pipeline.
   *
   * Key is (module name, sample id, library id); a sample or library id of None means
   * that a sample or library is not applicable for that entry.
   */
  private[summary] var summarizables: Map[(String, Option[String], Option[String]), List[Summarizable]] = Map()
Peter van 't Hof's avatar
Peter van 't Hof committed
40
41

  /** Other summary qscripts whose summaries need to be merged into this summary */
  private[summary] var summaryQScripts: List[SummaryQScript] = Nil

Peter van 't Hof's avatar
Peter van 't Hof committed
44
  /** Name of the pipeline in the summary, initialised with the config namespace of this qscript */
  var summaryName = configNamespace
46

Peter van 't Hof's avatar
Peter van 't Hof committed
47
  /** Must return a map with the settings used for this pipeline */
  def summarySettings: Map[String, Any]

Peter van 't Hof's avatar
Peter van 't Hof committed
50
  /** Files to put in the summary for this pipeline, keyed by name */
  def summaryFiles: Map[String, File]

Peter van 't Hof's avatar
Peter van 't Hof committed
53
  /** Summary output file */
  def summaryFile: File

56
  /**
   * Location of the summary database: a file in the output directory of the root qscript,
   * named after the summary name of that root.
   *
   * @throws IllegalStateException when the root is not a [[SummaryQScript]]
   */
  def summaryDbFile: File = root match {
    case s: SummaryQScript => new File(s.outputDir, s"${s.summaryName}.summary.db")
    case _                 => throw new IllegalStateException("Root should be a SummaryQScript")
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
61
62
63
64
65
66
67
68
  /**
   * Add a module to summary for this pipeline
   *
   * Sample and library are auto detected: when this qscript is a [[SampleLibraryTag]] its
   * sampleId and libId are used, otherwise both are left undefined.
   *
   * @param summarizable summarizable to add to summary for this pipeline
   * @param name Name of module
   */
  def addSummarizable(summarizable: Summarizable, name: String): Unit = {
    this match {
      case tag: SampleLibraryTag => addSummarizable(summarizable, name, tag.sampleId, tag.libId)
      case _                     => addSummarizable(summarizable, name, None, None)
    }
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
76
77
78
79
80
  /**
   * Add a module to summary for this pipeline, on sample level (no library)
   *
   * @param summarizable summarizable to add to summary for this pipeline
   * @param name Name of module
   * @param sampleId Id of sample
   */
  def addSummarizable(summarizable: Summarizable, name: String, sampleId: Option[String]): Unit = {
    addSummarizable(summarizable, name, sampleId, None)
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
87
88
89
90
91
  /**
   * Add a module to summary for this pipeline
   *
   * The summarizable is prepended to the list already registered under the same
   * (name, sampleId, libraryId) key.
   *
   * @param summarizable summarizable to add to summary for this pipeline
   * @param name Name of module
   * @param sampleId Id of sample
   * @param libraryId Id of library
   * @throws IllegalArgumentException when a library id is given without a sample id
   */
  def addSummarizable(summarizable: Summarizable, name: String, sampleId: Option[String], libraryId: Option[String]): Unit = {
    // A library always requires a sample
    require(libraryId.isEmpty || sampleId.isDefined, "A library id requires a sample id to be defined")
    val key = (name, sampleId, libraryId)
    summarizables += key -> (summarizable :: summarizables.getOrElse(key, Nil))
  }

100
  /**
   * Add another qscript whose summary must be merged into the output summary
   *
   * @param summaryQScript qscript to append to the list of summaries to merge
   */
  def addSummaryQScript(summaryQScript: SummaryQScript): Unit = {
    summaryQScripts :+= summaryQScript
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
105
106
  /** Set to true at the end of addSummaryJobs(); a second call then fails instead of adding the jobs again */
  private var addedJobs: Boolean = false

107
108
109
110
111
112
113
114
115
116
117
  /**
   * Id of this run in the summary database.
   *
   * When the run id file already exists the id is read from its first line,
   * otherwise a new run is created (which also writes that file).
   */
  final lazy val summaryRunId: Int = {
    if (runIdFile.exists()) {
      val reader = Source.fromFile(runIdFile)
      // Always release the file handle, also when the file is empty or does not hold a number
      try reader.getLines().next().trim.toInt
      finally reader.close()
    } else createRun()
  }

  /**
   * File that stores the run id, located at `.log/summary.runid` inside the output directory of the root qscript
   *
   * @throws IllegalStateException when the root is not a [[SummaryQScript]]
   */
  private def runIdFile: File = root match {
    case s: SummaryQScript => new File(s.outputDir, ".log/summary.runid")
    case _                 => throw new IllegalStateException("Root should be a SummaryQscript")
  }

  /**
   * Creates a new run in the summary database and stores its id in the run id file.
   *
   * Blocks until the database has created the run.
   *
   * @return id of the newly created run
   * @throws IllegalStateException when the root is not a [[BiopetQScript]]
   */
  private def createRun(): Int = {
    val db = SummaryDb.openSqliteSummary(summaryDbFile)
    val dir = root match {
      case q: BiopetQScript => q.outputDir
      case _                => throw new IllegalStateException("Root should be a BiopetQscript")
    }
    val id = Await.result(db.createRun(summaryName, dir.getAbsolutePath, nl.lumc.sasc.biopet.Version,
      LastCommitHash, new Date(System.currentTimeMillis())), Duration.Inf)

    val idFile = runIdFile
    // mkdirs instead of mkdir: the output directory itself may not exist yet at this point
    idFile.getParentFile.mkdirs()
    val writer = new PrintWriter(idFile)
    // Close the writer even when writing fails, so the file handle is not leaked
    try writer.println(id)
    finally writer.close()
    id
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
136
  /**
   * Add jobs to qscript to execute summary, also add checksum jobs
   *
   * Checksum jobs are only added when md5sum is enabled on the summary writer. Each file gets
   * at most one md5sum job; the job outputs are registered as dependencies of the summary writer.
   *
   * @throws IllegalStateException when the summary jobs were already added for this qscript
   */
  def addSummaryJobs(): Unit = {
    if (addedJobs) throw new IllegalStateException("Summary jobs for this QScript are already executed")
    val writeSummary = new WriteSummary(this)

    /** Adds a md5sum job for the file (or reuses the cached one) and makes the summary depend on its output */
    def addChecksum(file: File): Unit = {
      if (writeSummary.md5sum) {
        if (!SummaryQScript.md5sumCache.contains(file)) {
          val md5sum = new Md5sum(this) {
            override def configNamespace = "md5sum"

            // When the md5sum command itself fails a placeholder is written to the output instead
            override def cmdLine: String = super.cmdLine + " || " +
              required("echo") + required("error_on_capture  " + input.toString) + " > " + required(output)
          }
          md5sum.input = file
          md5sum.output = new File(file.getParentFile, file.getName + ".md5")

          // Need to not write a md5 file outside the outputDir.
          // Compared per path element: a plain string prefix check would also accept
          // sibling directories such as "<outputDir>_old".
          if (!file.getAbsoluteFile.toPath.startsWith(outputDir.getAbsoluteFile.toPath))
            md5sum.output = new File(outputDir, ".md5" + file.getAbsolutePath + ".md5")

          writeSummary.deps :+= md5sum.output
          SummaryQScript.md5sumCache += file -> md5sum.output
          add(md5sum)
        } else writeSummary.deps :+= SummaryQScript.md5sumCache(file)
      }
      //TODO: add more checksums types
    }

    // Command line functions get beforeGraph() called before their summary files are read below
    for ((_, summarizableList) <- summarizables; summarizable <- summarizableList) {
      summarizable match {
        case f: BiopetCommandLineFunction => f.beforeGraph()
        case _                            =>
      }
    }

    //Automatic checksums
    // NOTE(review): the jar checksum is requested once per summary file, so a java function
    // without summary files never gets its jar checksummed - confirm this is intended
    for ((_, summarizableList) <- summarizables; summarizable <- summarizableList; (_, file) <- summarizable.summaryFiles) {
      addChecksum(file)
      summarizable match {
        case f: BiopetJavaCommandLineFunction => if (f.jarFile != null) addChecksum(f.jarFile)
        case _                                =>
      }
    }

    // Input files with a known md5 are verified against the checksum file of their md5sum job
    for (inputFile <- inputFiles) {
      inputFile.md5 match {
        case Some(checksum) => {
          val checkMd5 = new CheckChecksum
          checkMd5.inputFile = inputFile.file
          require(SummaryQScript.md5sumCache.contains(inputFile.file),
            s"Md5 job is not executed, checksum file can't be found for: ${inputFile.file}")
          checkMd5.checksumFile = SummaryQScript.md5sumCache(inputFile.file)
          checkMd5.checksum = checksum
          checkMd5.jobOutputFile = new File(checkMd5.checksumFile.getParentFile, checkMd5.checksumFile.getName + ".check.out")
          add(checkMd5)
        }
        case _ =>
      }
    }

    for ((_, file) <- this.summaryFiles)
      addChecksum(file)

    this match {
      // Not all samples are selected, so the summary writer is not added
      case q: MultiSampleQScript if q.onlySamples.nonEmpty && !q.samples.forall(x => q.onlySamples.contains(x._1)) =>
        logger.info("Write summary is skipped because sample flag is used")
      case _ => add(writeSummary)
    }

    addedJobs = true
  }
}
Peter van 't Hof's avatar
Peter van 't Hof committed
209
210

object SummaryQScript {

  /**
   * Cache to have no duplicate jobs: maps a file to the output file of the md5sum job
   * that was already added for it.
   */
  protected[summary] val md5sumCache: mutable.Map[File, File] = mutable.Map()
}