WriteSummary.scala 11 KB
Newer Older
bow's avatar
bow committed
1 2 3 4 5 6 7 8 9 10
/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
11
 * A dual licensing mode is applied. The source code within this project is freely available for non-commercial use under an AGPL
bow's avatar
bow committed
12 13 14
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
15 16
package nl.lumc.sasc.biopet.core.summary

17
import java.io.File
18

Peter van 't Hof's avatar
Peter van 't Hof committed
19
import nl.lumc.sasc.biopet.utils.config.Configurable
20
import nl.lumc.sasc.biopet.core._
21
import nl.lumc.sasc.biopet.utils.ConfigUtils
22
import nl.lumc.sasc.biopet.utils.summary.db.{ SummaryDb, SummaryDbWrite }
Peter van 't Hof's avatar
Peter van 't Hof committed
23
import org.broadinstitute.gatk.queue.function.{ InProcessFunction, QFunction }
24
import org.broadinstitute.gatk.utils.commandline.Input
Peter van 't Hof's avatar
Peter van 't Hof committed
25 26 27

import scala.collection.mutable
import scala.io.Source
28 29
import scala.concurrent.Await
import scala.concurrent.duration.Duration
Peter van 't Hof's avatar
Peter van 't Hof committed
30
import scala.concurrent.ExecutionContext.Implicits.global
31

32
/**
33 34
 * This will collect and write the summary
 *
35 36
 * Created by pjvan_thof on 2/14/15.
 */
37
class WriteSummary(val parent: SummaryQScript) extends InProcessFunction with Configurable {
38 39
  // Queue reports this job under the simple class name.
  analysisName = getClass.getSimpleName

  // A summary can only be written for an existing qscript.
  require(parent != null)

  /** The qscript this summary job belongs to; this is the same instance as `parent`. */
  val qscript: SummaryQScript = parent

  /** Optional input files of this job; filled by `init()` from the registered summarizables. */
  @Input(doc = "deps", required = false)
  var deps: List[File] = Nil

  /** When true, `parseFile` adds an "md5" entry; read from config key "summary_md5", default true. */
  var md5sum: Boolean = config("summary_md5", default = true)
  //TODO: add more checksums types

  /**
   * Runs `init()` first, so `deps` and `jobOutputFile` are filled in,
   * and then delegates to the inherited implementation.
   */
  override def freezeFieldValues(): Unit = {
    init()
    super.freezeFieldValues()
  }

  /**
   * Prepares this job: registers summarizables, initializes the summary
   * database and collects the dependencies of this job.
   *
   * Steps, in order:
   *  - every summarizable that is not yet flagged `addToQscriptSummaryDone`
   *    is added to the qscript summary, then flagged;
   *  - when the qscript is the root, the sample/library rows are prepared;
   *  - a pipeline row and one module row per summarizable key are created;
   *  - `deps` is extended with the `summaryDeps` of every summarizable and,
   *    for summarizables that are functions of this qscript, their first output;
   *  - `jobOutputFile` is set to a hidden file inside the qscript output dir.
   *
   * All database calls block until they complete.
   */
  def init(): Unit = {
    qscript.summarizables.foreach {
      case (_, group) =>
        group.foreach { s =>
          if (!s.addToQscriptSummaryDone) s.addToQscriptSummary(qscript)
          s.addToQscriptSummaryDone = true
        }
    }

    val db = SummaryDb.openSqliteSummary(qscript.summaryDbFile)
    if (qscript == root) { // This initializes the database
      qscript match {
        case s: MultiSampleQScript => s.initSummaryDb
        case t: SampleLibraryTag =>
          t.sampleId.foreach { sampleName =>
            // Reuse the id of an existing sample, create the sample otherwise
            val existingSampleId = Await.result(
              db.getSamples(name = Some(sampleName), runId = Some(qscript.summaryRunId)).map(_.headOption.map(_.id)),
              Duration.Inf)
            val sampleId = existingSampleId.getOrElse {
              Await.result(db.createOrUpdateSample(sampleName, qscript.summaryRunId), Duration.Inf)
            }
            t.libId.foreach { libName =>
              // The lookup must go to the libraries, not the samples: querying
              // samples by the library name would skip the creation whenever a
              // sample happens to carry the same name as the library.
              val libraryExists = Await.result(
                db.getLibraries(runId = Some(qscript.summaryRunId), name = Some(libName), sampleId = Some(sampleId))
                  .map(_.nonEmpty),
                Duration.Inf)
              if (!libraryExists)
                Await.result(db.createOrUpdateLibrary(libName, qscript.summaryRunId, sampleId), Duration.Inf)
            }
          }
        // Only evaluates summaryRunId, the value itself is not used here
        case _ => qscript.summaryRunId
      }
    }

    val pipelineId = Await.result(db.createPipeline(qscript.summaryName, qscript.summaryRunId), Duration.Inf)
    // One createModule call per summarizable key; the first key element is the module name
    qscript.summarizables.foreach {
      case ((moduleName, _, _), _) =>
        Await.result(db.createModule(moduleName, qscript.summaryRunId, pipelineId), Duration.Inf)
    }

    for ((_, l) <- qscript.summarizables; s <- l) {
      deps :::= s.summaryDeps
      s match {
        case f: QFunction if qscript.functions.contains(f) =>
          try {
            deps :+= f.firstOutput
          } catch {
            // firstOutput can fail when Queue has not yet filled in the function's values
            case _: NullPointerException => logger.debug("Queue values are not initialized")
          }
        case _ =>
      }
    }

    jobOutputFile = new File(qscript.outputDir, s".${qscript.summaryName}.summary.out")
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
98
  /**
   * Writes the summary of the qscript to the summary database.
   *
   * In order this stores:
   *  - per summarizable key (module, sample, library): the stats as JSON
   *    and the summary files of the first summarizable of that key;
   *  - the summary files and settings of the qscript itself, and for a
   *    [[MultiSampleQScript]] also those of every sample and library;
   *  - the executables (with version) of all functions of the qscript,
   *    including the jobs of piped functions.
   *
   * Every write of stats, files and settings is awaited with `Await.result`,
   * so a failed write makes this job fail instead of being dropped silently.
   *
   * @throws IllegalStateException when a sample or library of a
   *                               [[MultiSampleQScript]] is missing in the database
   */
  def run(): Unit = {
    val db = SummaryDb.openSqliteSummary(qscript.summaryDbFile)

    val outputDir = new File(Await.result(db.getRuns(runId = Some(qscript.summaryRunId)).map(_.head.outputDir), Duration.Inf))

    val pipelineId = Await.result(db.getPipelines(name = Some(qscript.summaryName), runId = Some(qscript.summaryRunId)).map(_.head.id), Duration.Inf)

    // Stores the files first and the settings afterwards for one level (global,
    // sample or library). `settings` is by-name so it is only rendered after the
    // files are written, the same order as when this was written out inline.
    // NOTE(review): assumes createOrUpdateSetting returns a Future like the
    // other SummaryDbWrite calls used in this file - confirm.
    def writeFilesAndSettings(sampleId: Option[Int], libId: Option[Int], files: Map[String, File], settings: => String): Unit = {
      for ((key, file) <- files.par)
        Await.result(WriteSummary.createFile(db, qscript.summaryRunId, pipelineId, None, sampleId, libId, key, file, outputDir), Duration.Inf)
      Await.result(db.createOrUpdateSetting(qscript.summaryRunId, pipelineId, None, sampleId, libId, settings), Duration.Inf)
    }

    for (((moduleName, sampleName, libName), summarizables) <- qscript.summarizables.par) {
      require(summarizables.nonEmpty)
      // A single summarizable is used as is, multiple ones are merged under a temporary "stats" key
      val stats = ConfigUtils.anyToJson(if (summarizables.size == 1) summarizables.head.summaryStats
      else {
        val s = summarizables.map(_.summaryStats)
        s.tail.foldLeft(Map("stats" -> s.head))((a, b) =>
          ConfigUtils.mergeMaps(a, Map("stats" -> b), summarizables.head.resolveSummaryConflict))("stats")
      })
      val moduleId = Await.result(db.getModules(name = Some(moduleName), runId = Some(qscript.summaryRunId), pipelineId = Some(pipelineId))
        .map(_.head.id), Duration.Inf)
      val sampleId = sampleName.map(sample => Await.result(db.getSamples(runId = Some(qscript.summaryRunId), name = Some(sample)).map(_.head.id), Duration.Inf))
      val libId = libName.map(library => Await.result(db.getLibraries(runId = Some(qscript.summaryRunId), name = Some(library),
        sampleId = sampleId).map(_.head.id), Duration.Inf))
      // NOTE(review): assumes createOrUpdateStat returns a Future like the other
      // SummaryDbWrite calls used in this file - confirm.
      Await.result(db.createOrUpdateStat(qscript.summaryRunId, pipelineId, Some(moduleId),
        sampleId, libId, stats.nospaces), Duration.Inf)

      // NOTE(review): only the files of the first summarizable of a key are stored - confirm this is intended
      for ((key, file) <- summarizables.head.summaryFiles.par)
        Await.result(WriteSummary.createFile(db, qscript.summaryRunId, pipelineId, Some(moduleId), sampleId, libId, key, file, outputDir), Duration.Inf)
    }

    qscript match {
      case tag: SampleLibraryTag =>
        val sampleId = tag.sampleId.flatMap(name => Await.result(db.getSampleId(qscript.summaryRunId, name), Duration.Inf))
        val libId = tag.libId.flatMap(name => sampleId.flatMap(sampleId => Await.result(db.getLibraryId(qscript.summaryRunId, sampleId, name), Duration.Inf)))
        writeFilesAndSettings(sampleId, libId, qscript.summaryFiles, ConfigUtils.mapToJson(tag.summarySettings).nospaces)
      case q: MultiSampleQScript =>
        // Global level
        writeFilesAndSettings(None, None, qscript.summaryFiles, ConfigUtils.mapToJson(q.summarySettings).nospaces)

        for ((sampleName, sample) <- q.samples) {
          // Sample level
          val sampleId = Await.result(db.getSampleId(qscript.summaryRunId, sampleName), Duration.Inf).getOrElse(throw new IllegalStateException("Sample should already exist in database"))
          writeFilesAndSettings(Some(sampleId), None, sample.summaryFiles, ConfigUtils.mapToJson(sample.summarySettings).nospaces)

          for ((libName, lib) <- sample.libraries) {
            // Library level
            val libId = Await.result(db.getLibraryId(qscript.summaryRunId, sampleId, libName), Duration.Inf).getOrElse(throw new IllegalStateException("Library should already exist in database"))
            writeFilesAndSettings(Some(sampleId), Some(libId), lib.summaryFiles, ConfigUtils.mapToJson(lib.summarySettings).nospaces)
          }
        }
      case q =>
        writeFilesAndSettings(None, None, q.summaryFiles, ConfigUtils.mapToJson(q.summarySettings).nospaces)
    }

    // Jobs that run inside a pipe are not listed in qscript.functions themselves
    val pipeFunctions = qscript.functions.flatMap {
      case f: BiopetCommandLineFunction => f.pipesJobs
      case _                            => Nil
    }
    // All futures are created first and awaited afterwards.
    // Await.ready only waits for completion, a failed future is not rethrown here.
    (qscript.functions ++ pipeFunctions).flatMap {
      case f: BiopetJavaCommandLineFunction with Version =>
        List(db.createOrUpdateExecutable(qscript.summaryRunId, f.configNamespace, f.getVersion, f.getJavaVersion,
          javaMd5 = BiopetCommandLineFunction.executableMd5Cache.get(f.executable), jarPath = Option(f.jarFile).map(_.getAbsolutePath)))
      case f: BiopetCommandLineFunction with Version =>
        List(db.createOrUpdateExecutable(qscript.summaryRunId, f.configNamespace, f.getVersion, Option(f.executable)))
      case f: Configurable with Version =>
        List(db.createOrUpdateExecutable(qscript.summaryRunId, f.configNamespace, f.getVersion))
      case _ => List()
    }.foreach(Await.ready(_, Duration.Inf))
  }
177

178 179
  /**
   * Nests a map under its sample and, when given, its library.
   *
   * Without a sample the map is returned unchanged, also when a library is
   * given. With a sample the result is `"samples" -> sample -> content`, where
   * content is the map itself or `"libraries" -> library -> map`.
   *
   * @param map       content to nest
   * @param sampleId  optional name of the sample
   * @param libraryId optional name of the library, only used together with a sample
   * @return the nested map
   */
  def prefixSampleLibrary(map: Map[String, Any], sampleId: Option[String], libraryId: Option[String]): Map[String, Any] = {
    sampleId.fold(map) { sample =>
      val content: Map[String, Any] =
        libraryId.fold(map)(library => Map("libraries" -> Map(library -> map)))
      Map("samples" -> Map(sample -> content))
    }
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
188
  /**
   * Converts a summarizable to a summary map.
   *
   * The result always holds `"stats" -> name -> stats`. A `"files" -> name -> files`
   * entry is only added when the summarizable has summary files.
   *
   * @param summarizable source of the stats and files
   * @param name         key under which the stats and files are stored
   */
  def parseSummarizable(summarizable: Summarizable, name: String) = {
    val stats = summarizable.summaryStats
    val files = parseFiles(summarizable.summaryFiles)

    val filesSection: Map[String, Any] =
      if (files.isEmpty) Map.empty
      else Map("files" -> Map(name -> files))

    Map("stats" -> Map(name -> stats)) ++ filesSection
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
197
  /**
   * Parses a map of files to a summary map.
   *
   * @param files files by key
   * @return for each key the result of `parseFile` on its file
   */
  def parseFiles(files: Map[String, File]): Map[String, Map[String, Any]] = {
    files.map { case (key, file) => key -> parseFile(file) }
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
202
  /**
   * Parses a single file to a summary map.
   *
   * @param file file to describe
   * @return map with the absolute "path" and, when `md5sum` is enabled, the
   *         "md5" read from the checksum file found in `SummaryQScript.md5sumCache`
   */
  def parseFile(file: File): Map[String, Any] = {
    val pathEntry = Map[String, Any]("path" -> file.getAbsolutePath)
    if (md5sum) pathEntry + ("md5" -> WriteSummary.parseChecksum(SummaryQScript.md5sumCache(file)))
    else pathEntry
  }
Peter van 't Hof's avatar
Peter van 't Hof committed
209
}
210

Peter van 't Hof's avatar
Peter van 't Hof committed
211
object WriteSummary {
Peter van 't Hof's avatar
Peter van 't Hof committed
212
  /**
   * Retrieves the checksum from a checksum file.
   *
   * Only the first line is read; the checksum is the text before the first
   * space on that line. The file is closed again before returning, also when
   * reading fails.
   *
   * @param checksumFile file with the checksum as first field of the first line
   * @return the checksum
   * @throws NoSuchElementException when the file has no lines
   */
  def parseChecksum(checksumFile: File): String = {
    val source = Source.fromFile(checksumFile)
    try {
      val lines = source.getLines()
      if (!lines.hasNext) throw new NoSuchElementException(s"Checksum file is empty: $checksumFile")
      lines.next().split(" ")(0)
    } finally {
      // Source.fromFile keeps the file handle open until it is closed explicitly
      source.close()
    }
  }
Peter van 't Hof's avatar
Peter van 't Hof committed
216

217
  /**
   * Stores a single file in the summary database.
   *
   * The path is stored relative to the output dir ("./...") when the file is
   * located inside it, otherwise the absolute path is stored. The md5 comes
   * from the checksum file in `SummaryQScript.md5sumCache`; without an entry
   * "checksum_not_found" is stored. For a file that does not exist the size
   * is 0 and it is not marked as a symbolic link.
   *
   * @return the result of `db.createOrUpdateFile`
   */
  def createFile(db: SummaryDbWrite, runId: Int, pipelineId: Int, moduleId: Option[Int], sampleId: Option[Int], libId: Option[Int], key: String, file: File, outputDir: File) = {
    val absolutePath = file.getAbsolutePath
    val outputRoot = outputDir.getAbsolutePath
    val path =
      if (absolutePath.startsWith(outputRoot + File.separator)) "." + absolutePath.stripPrefix(outputRoot)
      else absolutePath
    val md5 = SummaryQScript.md5sumCache.get(file)
      .map(WriteSummary.parseChecksum(_))
      .getOrElse("checksum_not_found")
    val exists = file.exists()
    val size = if (exists) file.length() else 0L
    val link = exists && java.nio.file.Files.isSymbolicLink(file.toPath)
    db.createOrUpdateFile(runId, pipelineId, moduleId, sampleId, libId, key, path, md5, link, size)
  }
226
}