WriteSummary.scala 11 KB
Newer Older
bow's avatar
bow committed
1
2
3
4
5
6
7
8
9
10
/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
 * A dual licensing mode is applied. The source code within this project is freely available for non-commercial use under an AGPL
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */

package nl.lumc.sasc.biopet.core.summary

Peter van 't Hof's avatar
Peter van 't Hof committed
17
import java.io.{ File, PrintWriter }
18

Peter van 't Hof's avatar
Peter van 't Hof committed
19
import nl.lumc.sasc.biopet.utils.config.Configurable
20
import nl.lumc.sasc.biopet.core._
21
import nl.lumc.sasc.biopet.utils.ConfigUtils
Peter van 't Hof's avatar
Peter van 't Hof committed
22
import nl.lumc.sasc.biopet.LastCommitHash
Peter van 't Hof's avatar
Peter van 't Hof committed
23
import nl.lumc.sasc.biopet.utils.summary.db.SummaryDb
Peter van 't Hof's avatar
Peter van 't Hof committed
24
25
import org.broadinstitute.gatk.queue.function.{ InProcessFunction, QFunction }
import org.broadinstitute.gatk.utils.commandline.{ Input, Output }
Peter van 't Hof's avatar
Peter van 't Hof committed
26
27
28

import scala.collection.mutable
import scala.io.Source
29
30
import scala.concurrent.Await
import scala.concurrent.duration.Duration
Peter van 't Hof's avatar
Peter van 't Hof committed
31
import scala.concurrent.ExecutionContext.Implicits.global
32

/**
 * Collects the summary (stats, files, settings and executables) of a [[SummaryQScript]]
 * and writes it to the summary database.
 *
 * Created by pjvan_thof on 2/14/15.
 */
class WriteSummary(val parent: SummaryQScript) extends InProcessFunction with Configurable {
  this.analysisName = getClass.getSimpleName

  require(parent != null)

  /** To access qscript for this summary */
  val qscript = parent

  /** Summary deps and first outputs of the summarized jobs; filled in by `init()` */
  @Input(doc = "deps", required = false)
  var deps: List[File] = Nil

  /** When true, `parseFile` adds an md5 checksum; read from config key `summary_md5` (default true) */
  var md5sum: Boolean = config("summary_md5", default = true)
  //TODO: add more checksums types

  /** Runs `init()` first so the fields it sets (`deps`, `jobOutputFile`) are in place before the freeze. */
  override def freezeFieldValues(): Unit = {
    init()
    super.freezeFieldValues()
  }

  /**
   * Prepares this job and the summary database.
   *
   * Lets every summarizable register itself on the qscript (only once), initializes the
   * sample/library records when this qscript is the root, creates the pipeline and module
   * records, collects `deps` and sets `jobOutputFile`.
   */
  def init(): Unit = {
    qscript.summarizables.foreach(_._2.foreach { s =>
      if (!s.addToQscriptSummaryDone) s.addToQscriptSummary(qscript)
      s.addToQscriptSummaryDone = true
    })

    val db = SummaryDb.openSqliteSummary(qscript.summaryDbFile)
    if (qscript == root) { // This initialize the database
      qscript match {
        case s: MultiSampleQScript => s.initSummaryDb
        case t: SampleLibraryTag => t.sampleId.foreach { sampleName =>
          // Reuse the sample record when it already exists, otherwise create it
          val sampleId = Await.result(db.getSamples(name = Some(sampleName), runId = Some(qscript.summaryRunId))
            .map(_.headOption.map(_.id)), Duration.Inf).getOrElse {
            Await.result(db.createOrUpdateSample(sampleName, qscript.summaryRunId), Duration.Inf)
          }
          t.libId.foreach { libName =>
            // Look the library up among the libraries of this sample (not among the samples),
            // and only create it when it is missing
            val libraryExists = Await.result(db.getLibraries(name = Some(libName), runId = Some(qscript.summaryRunId),
              sampleId = Some(sampleId)).map(_.nonEmpty), Duration.Inf)
            if (!libraryExists) Await.result(db.createOrUpdateLibrary(libName, qscript.summaryRunId, sampleId), Duration.Inf)
          }
        }
        case _ => qscript.summaryRunId
      }
    }

    val pipelineId = Await.result(db.createPipeline(qscript.summaryName, qscript.summaryRunId), Duration.Inf)
    // One module record per summarizables key (module name is the first element of the key)
    qscript.summarizables.keys.foreach {
      case (moduleName, _, _) => Await.result(db.createModule(moduleName, qscript.summaryRunId, pipelineId), Duration.Inf)
    }

    // Depend on the declared summary deps and, for jobs that belong to this qscript, on their first output
    for ((_, summarizableList) <- qscript.summarizables; s <- summarizableList) {
      deps :::= s.summaryDeps
      s match {
        case f: QFunction if qscript.functions.contains(f) =>
          try {
            deps :+= f.firstOutput
          } catch {
            case _: NullPointerException => logger.debug("Queue values are not initialized")
          }
        case _ =>
      }
    }

    jobOutputFile = new File(qscript.outputDir, s".${qscript.summaryName}.summary.out")
  }

  /**
   * Function to create summary.
   *
   * Writes the stats and files of every summarizable, the files and settings of the qscript
   * (and, for a [[MultiSampleQScript]], of its samples and libraries) and the executables used
   * by its jobs to the summary database. Stat, file and setting writes are awaited (failures are
   * rethrown); executable writes are waited for with `Await.ready`.
   */
  def run(): Unit = {
    val db = SummaryDb.openSqliteSummary(qscript.summaryDbFile)
    // NOTE(review): db is never closed here; confirm whether SummaryDb needs an explicit close

    val outputDir = new File(Await.result(db.getRuns(runId = Some(qscript.summaryRunId)).map(_.head.outputDir), Duration.Inf))

    val pipelineId = Await.result(db.getPipelines(name = Some(qscript.summaryName), runId = Some(qscript.summaryRunId)).map(_.head.id), Duration.Inf)

    for (((moduleName, sampleName, libName), summarizables) <- qscript.summarizables.par) {
      require(summarizables.nonEmpty)
      // Several summarizables under one key get their stats merged with the first one's conflict resolver
      val stats = ConfigUtils.anyToJson(if (summarizables.size == 1) summarizables.head.summaryStats
      else {
        val s = summarizables.map(_.summaryStats)
        s.tail.foldLeft(Map("stats" -> s.head))((a, b) =>
          ConfigUtils.mergeMaps(a, Map("stats" -> b), summarizables.head.resolveSummaryConflict))("stats")
      })
      val moduleId = Await.result(db.getModules(name = Some(moduleName), runId = Some(qscript.summaryRunId), pipelineId = Some(pipelineId))
        .map(_.head.id), Duration.Inf)
      val sampleId = sampleName.map(sample =>
        Await.result(db.getSamples(runId = Some(qscript.summaryRunId), name = Some(sample)).map(_.head.id), Duration.Inf))
      val libId = libName.map(lib => Await.result(db.getLibraries(runId = Some(qscript.summaryRunId), name = Some(lib),
        sampleId = sampleId).map(_.head.id), Duration.Inf))
      // Awaited like the file writes, so the stat is stored (or its failure surfaces) before run() returns
      Await.result(db.createOrUpdateStat(qscript.summaryRunId, pipelineId, Some(moduleId),
        sampleId, libId, stats.nospaces), Duration.Inf)

      for ((key, file) <- summarizables.head.summaryFiles.par)
        Await.result(WriteSummary.createFile(db, qscript.summaryRunId, pipelineId, Some(moduleId), sampleId, libId, key, file, outputDir), Duration.Inf)
    }

    qscript match {
      case tag: SampleLibraryTag =>
        val sampleId = tag.sampleId.flatMap(sample => Await.result(db.getSampleId(qscript.summaryRunId, sample), Duration.Inf))
        val libId = tag.libId.flatMap(lib => sampleId.flatMap(sId => Await.result(db.getLibraryId(qscript.summaryRunId, sId, lib), Duration.Inf)))
        for ((key, file) <- qscript.summaryFiles.par)
          Await.result(WriteSummary.createFile(db, qscript.summaryRunId, pipelineId, None, sampleId, libId, key, file, outputDir), Duration.Inf)
        Await.result(db.createOrUpdateSetting(qscript.summaryRunId, pipelineId, None, sampleId, libId,
          ConfigUtils.mapToJson(tag.summarySettings).nospaces), Duration.Inf)
      case q: MultiSampleQScript =>
        // Global level
        for ((key, file) <- qscript.summaryFiles.par)
          Await.result(WriteSummary.createFile(db, q.summaryRunId, pipelineId, None, None, None, key, file, outputDir), Duration.Inf)
        Await.result(db.createOrUpdateSetting(qscript.summaryRunId, pipelineId, None, None, None,
          ConfigUtils.mapToJson(q.summarySettings).nospaces), Duration.Inf)

        for ((sampleName, sample) <- q.samples) {
          // Sample level
          val sampleId = Await.result(db.getSampleId(qscript.summaryRunId, sampleName), Duration.Inf)
            .getOrElse(throw new IllegalStateException("Sample should already exist in database"))
          for ((key, file) <- sample.summaryFiles.par)
            Await.result(WriteSummary.createFile(db, q.summaryRunId, pipelineId, None, Some(sampleId), None, key, file, outputDir), Duration.Inf)
          Await.result(db.createOrUpdateSetting(qscript.summaryRunId, pipelineId, None, Some(sampleId), None,
            ConfigUtils.mapToJson(sample.summarySettings).nospaces), Duration.Inf)

          for ((libName, lib) <- sample.libraries) {
            // Library level
            val libId = Await.result(db.getLibraryId(qscript.summaryRunId, sampleId, libName), Duration.Inf)
              .getOrElse(throw new IllegalStateException("Library should already exist in database"))
            for ((key, file) <- lib.summaryFiles.par)
              Await.result(WriteSummary.createFile(db, q.summaryRunId, pipelineId, None, Some(sampleId), Some(libId), key, file, outputDir), Duration.Inf)
            Await.result(db.createOrUpdateSetting(qscript.summaryRunId, pipelineId, None, Some(sampleId), Some(libId),
              ConfigUtils.mapToJson(lib.summarySettings).nospaces), Duration.Inf)
          }
        }
      case q =>
        for ((key, file) <- q.summaryFiles.par)
          Await.result(WriteSummary.createFile(db, qscript.summaryRunId, pipelineId, None, None, None, key, file, outputDir), Duration.Inf)
        Await.result(db.createOrUpdateSetting(qscript.summaryRunId, pipelineId, None, None, None,
          ConfigUtils.mapToJson(q.summarySettings).nospaces), Duration.Inf)
    }

    // Jobs piped inside a BiopetCommandLineFunction also report their executables
    val pipeFunctions = (for (f <- qscript.functions) yield f match {
      case f: BiopetCommandLineFunction => f.pipesJobs
      case _                            => Nil
    }).flatten
    // Await.ready only waits: a failed executable write does not fail this job
    (for (f <- qscript.functions ++ pipeFunctions) yield {
      f match {
        case f: BiopetJavaCommandLineFunction with Version =>
          List(db.createOrUpdateExecutable(qscript.summaryRunId, f.configNamespace, f.getVersion, f.getJavaVersion,
            javaMd5 = BiopetCommandLineFunction.executableMd5Cache.get(f.executable), jarPath = Option(f.jarFile).map(_.getAbsolutePath)))
        case f: BiopetCommandLineFunction with Version =>
          List(db.createOrUpdateExecutable(qscript.summaryRunId, f.configNamespace, f.getVersion, Option(f.executable)))
        case f: Configurable with Version =>
          List(db.createOrUpdateExecutable(qscript.summaryRunId, f.configNamespace, f.getVersion))
        case _ => List()
      }
    }).flatten.foreach(Await.ready(_, Duration.Inf))
  }

  /**
   * Nests `map` under `samples -> <sampleId>` and, when a library is given as well, under
   * `samples -> <sampleId> -> libraries -> <libraryId>`. Without a sample, `map` is returned as is
   * (the library is then ignored).
   */
  def prefixSampleLibrary(map: Map[String, Any], sampleId: Option[String], libraryId: Option[String]): Map[String, Any] = {
    sampleId match {
      case Some(s) => Map("samples" -> Map(s -> (libraryId match {
        case Some(l) => Map("libraries" -> Map(l -> map))
        case _       => map
      })))
      case _ => map
    }
  }

  /** Convert summarizable to a summary map: `stats -> name -> stats`, plus `files -> name -> files` when it has files */
  def parseSummarizable(summarizable: Summarizable, name: String) = {
    val stats = summarizable.summaryStats
    val files = parseFiles(summarizable.summaryFiles)

    Map("stats" -> Map(name -> stats)) ++
      (if (files.isEmpty) Map[String, Any]() else Map("files" -> Map(name -> files)))
  }

  /** Parse files map to summary map, keeping the keys and parsing every file with `parseFile` */
  def parseFiles(files: Map[String, File]): Map[String, Map[String, Any]] = {
    for ((key, file) <- files) yield key -> parseFile(file)
  }

  /** Parse single file summary map: its absolute `path` and, when `md5sum` is enabled, its `md5` */
  def parseFile(file: File): Map[String, Any] = {
    val map: mutable.Map[String, Any] = mutable.Map()
    map += "path" -> file.getAbsolutePath
    if (md5sum) map += "md5" -> WriteSummary.parseChecksum(SummaryQScript.md5sumCache(file))
    map.toMap
  }
}
211

Peter van 't Hof's avatar
Peter van 't Hof committed
212
object WriteSummary {

  /**
   * Retrieve the checksum from a checksum file (md5sum style): the text before the first space
   * on the file's first line.
   *
   * @param checksumFile file written by a checksum tool
   * @return the checksum string
   * @throws java.util.NoSuchElementException when the file is empty
   */
  def parseChecksum(checksumFile: File): String = {
    val source = Source.fromFile(checksumFile)
    // Close the underlying reader even when the file is empty or unreadable
    try source.getLines().next().split(" ")(0)
    finally source.close()
  }

  /**
   * Stores a single file record in the summary database.
   *
   * Files below `outputDir` are stored with a path relative to it (`./...`); other files keep their
   * absolute path. The md5 comes from `SummaryQScript.md5sumCache` ("checksum_not_found" when absent),
   * and size/symlink flag are 0/false for files that do not exist.
   *
   * @return the result of `db.createOrUpdateFile`
   */
  def createFile(db: SummaryDb, runId: Int, pipelineId: Int, moduleId: Option[Int], sampleId: Option[Int], libId: Option[Int], key: String, file: File, outputDir: File) = {
    val absolutePath = file.getAbsolutePath
    val outputDirPath = outputDir.getAbsolutePath
    val path =
      if (absolutePath.startsWith(outputDirPath + File.separator)) "." + absolutePath.stripPrefix(outputDirPath)
      else absolutePath
    val md5 = SummaryQScript.md5sumCache.get(file).map(parseChecksum(_))
    val exists = file.exists()
    val size = if (exists) file.length() else 0L
    val link = exists && java.nio.file.Files.isSymbolicLink(file.toPath)
    db.createOrUpdateFile(runId, pipelineId, moduleId, sampleId, libId, key, path, md5.getOrElse("checksum_not_found"), link, size)
  }
}