/**
  * Biopet is built on top of GATK Queue for building bioinformatic
  * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
  * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
  * should also be able to execute Biopet tools and pipelines.
  *
  * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
  *
  * Contact us at: sasc@lumc.nl
  *
  * A dual licensing mode is applied. The source code within this project is freely available for non-commercial use under an AGPL
  * license; For commercial users or users who do not want to follow the AGPL
  * license, please contact us to obtain a separate license.
  */
package nl.lumc.sasc.biopet.core.summary

import java.io.File
import nl.lumc.sasc.biopet.utils.config.Configurable
import nl.lumc.sasc.biopet.core._
import nl.lumc.sasc.biopet.utils.ConfigUtils
import nl.lumc.sasc.biopet.utils.summary.db.{SummaryDb, SummaryDbWrite}
import org.broadinstitute.gatk.queue.function.{InProcessFunction, QFunction}
import org.broadinstitute.gatk.utils.commandline.Input

import scala.collection.mutable
import scala.io.Source
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

/**
  * In-process Queue function that collects the summary of a [[SummaryQScript]] and writes it
  * to the summary database of that qscript.
  *
  * Created by pjvan_thof on 2/14/15.
  */
class WriteSummary(val parent: SummaryQScript) extends InProcessFunction with Configurable {
  this.analysisName = getClass.getSimpleName

  require(parent != null)

  /** To access qscript for this summary */
  val qscript: SummaryQScript = parent

  /** Inputs this job waits for: summary deps plus outputs of the summarized Queue functions. */
  @Input(doc = "deps", required = false)
  var deps: List[File] = Nil

  /** When true, [[parseFile]] adds an "md5" entry (config key `summary_md5`, default true). */
  var md5sum: Boolean = config("summary_md5", default = true)
  //TODO: add more checksums types

  /** Runs [[init]] before Queue freezes the field values of this function. */
  override def freezeFieldValues(): Unit = {
    init()
    super.freezeFieldValues()
  }

  /**
    * Prepares the database and the dependencies of this job.
    *
    * Lets each summarizable register itself with the qscript (once), initializes samples and
    * libraries when the qscript is the root, creates the pipeline entry and one module entry per
    * distinct summarizable name, collects `deps` and sets the job output file.
    */
  def init(): Unit = {
    qscript.summarizables.foreach(_._2.foreach { s =>
      if (!s.addToQscriptSummaryDone) s.addToQscriptSummary(qscript)
      s.addToQscriptSummaryDone = true
    })

    // NOTE(review): the connection is not closed here; presumably SummaryDb manages the
    // connection lifetime (run() opens it again) — confirm before adding a close().
    val db = SummaryDb.openSqliteSummary(qscript.summaryDbFile)
    val runId = qscript.summaryRunId

    // Only the root qscript initializes the database
    if (qscript == root) initSamplesAndLibraries(db, runId)

    val pipelineId = Await.result(db.createPipeline(qscript.summaryName, runId), Duration.Inf)
    // Several keys can share a module name; register each name once. Only the side effect matters.
    qscript.summarizables
      .map(_._1._1)
      .toSet
      .foreach((moduleName: String) =>
        Await.result(db.createModule(moduleName, runId, pipelineId), Duration.Inf))

    for ((_, summarizables) <- qscript.summarizables; s <- summarizables) {
      deps :::= s.summaryDeps
      s match {
        case f: QFunction if qscript.functions.contains(f) =>
          try {
            deps :+= f.firstOutput
          } catch {
            case _: NullPointerException => logger.debug("Queue values are not initialized")
          }
        case _ =>
      }
    }

    jobOutputFile = new File(qscript.outputDir, s".${qscript.summaryName}.summary.out")
  }

  /**
    * Creates the database sample/library entries of the root qscript.
    *
    * A [[MultiSampleQScript]] does this itself via `initSummaryDb()`. A [[SampleLibraryTag]]
    * gets its sample created if missing and, when a library is tagged, that library is created
    * under the sample if missing. Other qscripts have nothing to register.
    */
  private def initSamplesAndLibraries(db: SummaryDbWrite, runId: Int): Unit = qscript match {
    case s: MultiSampleQScript => s.initSummaryDb()
    case t: SampleLibraryTag =>
      t.sampleId.foreach { sampleName =>
        val sampleId = Await
          .result(db.getSamples(name = Some(sampleName), runId = Some(runId))
                    .map(_.headOption.map(_.id)),
                  Duration.Inf)
          .getOrElse(Await.result(db.createOrUpdateSample(sampleName, runId), Duration.Inf))
        t.libId.foreach { libName =>
          // Look the name up among the libraries of this sample (not among samples)
          val existingLibId = Await.result(
            db.getLibraries(name = Some(libName), runId = Some(runId), sampleId = Some(sampleId))
              .map(_.headOption.map(_.id)),
            Duration.Inf)
          if (existingLibId.isEmpty)
            Await.result(db.createOrUpdateLibrary(libName, runId, sampleId), Duration.Inf)
        }
      }
    case _ => // No samples or libraries to register
  }

  /**
    * Function to create summary.
    *
    * Writes, for the current run, the stats and files of every summarizable, the files and
    * settings of the qscript (and of its samples/libraries for a [[MultiSampleQScript]]) and the
    * executables of the qscript functions. Every database write is waited for before returning.
    */
  def run(): Unit = {
    val db = SummaryDb.openSqliteSummary(qscript.summaryDbFile)
    val runId = qscript.summaryRunId

    val outputDir = new File(
      Await.result(db.getRuns(runId = Some(runId)).map(_.head.outputDir), Duration.Inf))

    val pipelineId = Await.result(
      db.getPipelines(name = Some(qscript.summaryName), runId = Some(runId)).map(_.head.id),
      Duration.Inf)

    writeModuleSummaries(db, runId, pipelineId, outputDir)
    writeQscriptSummary(db, runId, pipelineId, outputDir)
    writeExecutables(db, runId)
  }

  /** Writes the (merged) stats and the files of every summarizable key as a module entry. */
  private def writeModuleSummaries(db: SummaryDbWrite,
                                   runId: Int,
                                   pipelineId: Int,
                                   outputDir: File): Unit = {
    for (((name, sampleName, libName), summarizables) <- qscript.summarizables.par) {
      require(summarizables.nonEmpty)
      // Several summarizables under one key are merged with the first one's conflict resolver
      val stats = ConfigUtils.anyToJson(
        if (summarizables.size == 1) summarizables.head.summaryStats
        else {
          val s = summarizables.map(_.summaryStats)
          s.tail.foldLeft(Map("stats" -> s.head))((a, b) =>
            ConfigUtils
              .mergeMaps(a, Map("stats" -> b), summarizables.head.resolveSummaryConflict))("stats")
        })
      val moduleId = Await.result(db.getModules(name = Some(name),
                                                runId = Some(runId),
                                                pipelineId = Some(pipelineId))
                                    .map(_.head.id),
                                  Duration.Inf)
      val sampleId = sampleName.map(
        sample =>
          Await.result(db.getSamples(runId = Some(runId), name = Some(sample)).map(_.head.id),
                       Duration.Inf))
      val libId = libName.map(
        lib =>
          Await.result(db.getLibraries(runId = Some(runId), name = Some(lib), sampleId = sampleId)
                         .map(_.head.id),
                       Duration.Inf))
      // Await so a failed write surfaces and the stat is stored before this job finishes
      Await.result(
        db.createOrUpdateStat(runId, pipelineId, Some(moduleId), sampleId, libId, stats.nospaces),
        Duration.Inf)
      writeFiles(db,
                 runId,
                 pipelineId,
                 Some(moduleId),
                 sampleId,
                 libId,
                 summarizables.head.summaryFiles,
                 outputDir)
    }
  }

  /**
    * Writes the files and settings of the qscript itself; for a [[MultiSampleQScript]] also of
    * each of its samples and libraries, which must already exist in the database.
    */
  private def writeQscriptSummary(db: SummaryDbWrite,
                                  runId: Int,
                                  pipelineId: Int,
                                  outputDir: File): Unit = qscript match {
    case tag: SampleLibraryTag =>
      val sampleId =
        tag.sampleId.flatMap(name => Await.result(db.getSampleId(runId, name), Duration.Inf))
      val libId = tag.libId.flatMap(name =>
        sampleId.flatMap(sid => Await.result(db.getLibraryId(runId, sid, name), Duration.Inf)))
      tag.sampleId.foreach(name =>
        require(sampleId.isDefined, s"Sample '$name' is not found in database yet"))
      tag.libId.foreach(
        name =>
          require(
            libId.isDefined,
            s"Library '$name' for '${tag.sampleId.getOrElse("")}' is not found in database yet"))
      writeFiles(db, runId, pipelineId, None, sampleId, libId, qscript.summaryFiles, outputDir)
      writeSettings(db, runId, pipelineId, sampleId, libId, tag.summarySettings)

    case q: MultiSampleQScript =>
      // Global level
      writeFiles(db, runId, pipelineId, None, None, None, q.summaryFiles, outputDir)
      writeSettings(db, runId, pipelineId, None, None, q.summarySettings)

      for ((sampleName, sample) <- q.samples) {
        // Sample level
        val sampleId = Await
          .result(db.getSampleId(runId, sampleName), Duration.Inf)
          .getOrElse(throw new IllegalStateException("Sample should already exist in database"))
        writeFiles(db, runId, pipelineId, None, Some(sampleId), None, sample.summaryFiles, outputDir)
        writeSettings(db, runId, pipelineId, Some(sampleId), None, sample.summarySettings)

        for ((libName, lib) <- sample.libraries) {
          // Library level
          val libId = Await
            .result(db.getLibraryId(runId, sampleId, libName), Duration.Inf)
            .getOrElse(
              throw new IllegalStateException("Library should already exist in database"))
          writeFiles(db,
                     runId,
                     pipelineId,
                     None,
                     Some(sampleId),
                     Some(libId),
                     lib.summaryFiles,
                     outputDir)
          writeSettings(db, runId, pipelineId, Some(sampleId), Some(libId), lib.summarySettings)
        }
      }

    case q =>
      writeFiles(db, runId, pipelineId, None, None, None, q.summaryFiles, outputDir)
      writeSettings(db, runId, pipelineId, None, None, q.summarySettings)
  }

  /** Registers the executable of every versioned qscript function, including piped jobs. */
  private def writeExecutables(db: SummaryDbWrite, runId: Int): Unit = {
    val pipeFunctions = (for (f <- qscript.functions)
      yield
        f match {
          case f: BiopetCommandLineFunction => f.pipesJobs
          case _ => Nil
        }).flatten

    val writes = (for (f <- qscript.functions ++ pipeFunctions) yield {
      f match {
        case f: BiopetJavaCommandLineFunction with Version =>
          List(
            db.createOrUpdateExecutable(
              runId,
              f.configNamespace,
              f.getVersion,
              f.getJavaVersion,
              javaMd5 = BiopetCommandLineFunction.executableMd5Cache.get(f.executable),
              jarPath = Option(f.jarFile).map(_.getAbsolutePath)
            ))
        case f: BiopetCommandLineFunction with Version =>
          List(
            db.createOrUpdateExecutable(runId,
                                        f.configNamespace,
                                        f.getVersion,
                                        Option(f.executable)))
        case f: Configurable with Version =>
          List(db.createOrUpdateExecutable(runId, f.configNamespace, f.getVersion))
        case _ => List()
      }
    }).flatten

    // Await.ready waits for completion but does not rethrow a failed write; presumably
    // executable registration is intended as best-effort — confirm.
    writes.foreach(Await.ready(_, Duration.Inf))
  }

  /** Registers every entry of `files` in the database and waits for each write. */
  private def writeFiles(db: SummaryDbWrite,
                         runId: Int,
                         pipelineId: Int,
                         moduleId: Option[Int],
                         sampleId: Option[Int],
                         libId: Option[Int],
                         files: Map[String, File],
                         outputDir: File): Unit =
    for ((key, file) <- files.par)
      Await.result(WriteSummary.createFile(db,
                                           runId,
                                           pipelineId,
                                           moduleId,
                                           sampleId,
                                           libId,
                                           key,
                                           file,
                                           outputDir),
                   Duration.Inf)

  /** Stores `settings` as JSON (no module) for the given sample/library level and waits for it. */
  private def writeSettings(db: SummaryDbWrite,
                            runId: Int,
                            pipelineId: Int,
                            sampleId: Option[Int],
                            libId: Option[Int],
                            settings: Map[String, Any]): Unit =
    Await.result(db.createOrUpdateSetting(runId,
                                          pipelineId,
                                          None,
                                          sampleId,
                                          libId,
                                          ConfigUtils.mapToJson(settings).nospaces),
                 Duration.Inf)

  /**
    * Nests `map` under `"samples" -> sampleId` and, when a library is given too, under
    * `"libraries" -> libraryId` inside that sample. Without a sample, `map` is returned unchanged.
    */
  def prefixSampleLibrary(map: Map[String, Any],
                          sampleId: Option[String],
                          libraryId: Option[String]): Map[String, Any] = {
    sampleId match {
      case Some(s) =>
        Map("samples" -> Map(s -> (libraryId match {
          case Some(l) => Map("libraries" -> Map(l -> map))
          case _ => map
        })))
      case _ => map
    }
  }

  /** Convert summarizable to a summary map; the "files" entry is omitted when there are none. */
  def parseSummarizable(summarizable: Summarizable, name: String): Map[String, Any] = {
    val stats = summarizable.summaryStats
    val files = parseFiles(summarizable.summaryFiles)

    Map("stats" -> Map(name -> stats)) ++
      (if (files.isEmpty) Map[String, Any]() else Map("files" -> Map(name -> files)))
  }

  /** Parse files map to summary map */
  def parseFiles(files: Map[String, File]): Map[String, Map[String, Any]] = {
    for ((key, file) <- files) yield key -> parseFile(file)
  }

  /**
    * Parse single file summary map: the absolute "path" and, when [[md5sum]] is enabled, the
    * "md5" checksum. A file without a cached checksum gets "checksum_not_found", the same
    * fallback `WriteSummary.createFile` uses, instead of failing the lookup.
    */
  def parseFile(file: File): Map[String, Any] = {
    val base: Map[String, Any] = Map("path" -> file.getAbsolutePath)
    if (md5sum) {
      val checksum = SummaryQScript.md5sumCache
        .get(file)
        .map(WriteSummary.parseChecksum)
        .getOrElse("checksum_not_found")
      base + ("md5" -> checksum)
    } else base
  }
}
object WriteSummary {

  /**
    * Retrieve checksum from file.
    *
    * Returns the text before the first space on the first line, which fits the
    * `<hash>  <filename>` layout of md5sum output (presumably the format of the cached checksum
    * files — confirm).
    *
    * @param checksumFile file whose first line starts with the checksum
    * @return the checksum text
    * @throws NoSuchElementException when the file has no lines
    */
  def parseChecksum(checksumFile: File): String = {
    val source = Source.fromFile(checksumFile)
    // Only the first line is needed; always release the file handle, even if reading fails
    val firstLine =
      try {
        val lines = source.getLines()
        if (lines.hasNext) lines.next()
        else throw new NoSuchElementException(s"Checksum file is empty: $checksumFile")
      } finally source.close()
    firstLine.split(" ")(0)
  }

  /**
    * Registers a file in the summary database.
    *
    * A file inside `outputDir` is stored with a path relative to it (prefixed with "."), any
    * other file with its absolute path. The md5 comes from `SummaryQScript.md5sumCache` and
    * falls back to "checksum_not_found". A missing file is stored with size 0 and as not a link.
    *
    * @param db database to write to
    * @param runId run the file belongs to
    * @param pipelineId pipeline the file belongs to
    * @param moduleId optional module the file belongs to
    * @param sampleId optional sample the file belongs to
    * @param libId optional library the file belongs to
    * @param key name of the file within its level
    * @param file the file to register
    * @param outputDir output directory of the run, used to relativize `file`
    * @return the future returned by `db.createOrUpdateFile`
    */
  def createFile(db: SummaryDbWrite,
                 runId: Int,
                 pipelineId: Int,
                 moduleId: Option[Int],
                 sampleId: Option[Int],
                 libId: Option[Int],
                 key: String,
                 file: File,
                 outputDir: File): Future[Int] = {
    val absolutePath = file.getAbsolutePath
    val outputPrefix = outputDir.getAbsolutePath
    val path =
      if (absolutePath.startsWith(outputPrefix + File.separator))
        "." + absolutePath.stripPrefix(outputPrefix)
      else absolutePath
    val md5 = SummaryQScript.md5sumCache.get(file).map(parseChecksum)
    val exists = file.exists()
    val size = if (exists) file.length() else 0L
    val link = exists && java.nio.file.Files.isSymbolicLink(file.toPath)
    db.createOrUpdateFile(runId,
                          pipelineId,
                          moduleId,
                          sampleId,
                          libId,
                          key,
                          path,
                          md5.getOrElse("checksum_not_found"),
                          link,
                          size)
  }
}