Commit eb3a77c1 authored by Peter van 't Hof

Added files

parent d3723ba8
......@@ -14,15 +14,15 @@
*/
package nl.lumc.sasc.biopet.core.summary
import java.io.{ File, PrintWriter }
import java.io.{File, PrintWriter}
import nl.lumc.sasc.biopet.utils.config.Configurable
import nl.lumc.sasc.biopet.core._
import nl.lumc.sasc.biopet.utils.ConfigUtils
import nl.lumc.sasc.biopet.LastCommitHash
import nl.lumc.sasc.biopet.utils.summary.SummaryDb
import org.broadinstitute.gatk.queue.function.{ InProcessFunction, QFunction }
import org.broadinstitute.gatk.utils.commandline.{ Input, Output }
import org.broadinstitute.gatk.queue.function.{InProcessFunction, QFunction}
import org.broadinstitute.gatk.utils.commandline.{Input, Output}
import scala.collection.mutable
import scala.io.Source
......@@ -93,13 +93,23 @@ class WriteSummary(val parent: SummaryQScript) extends InProcessFunction with Co
jobOutputFile = new File(qscript.summaryDbFile.getParentFile, "." + qscript.summaryDbFile.getName.stripSuffix(".db") + ".out")
}
/**
 * Registers one summary file in the summary database.
 *
 * The stored path is made relative to `outputDir` (prefixed with ".") when the file lives
 * inside that directory; otherwise the absolute path is stored unchanged.
 *
 * @param db summary database to write to
 * @param runId id of the run the file belongs to
 * @param pipelineId id of the pipeline the file belongs to
 * @param moduleId optional module id
 * @param sampleId optional sample id
 * @param libId optional library id
 * @param key key under which the file is stored
 * @param file the file to register; size is 0 and link is false when it does not exist
 * @param outputDir output directory of the run, used to relativize the path
 * @return whatever `db.createOrUpdateFile` returns
 */
def createFile(db: SummaryDb, runId: Int, pipelineId: Int, moduleId: Option[Int], sampleId: Option[Int], libId: Option[Int], key: String, file: File, outputDir: File) = {
  val absolutePath = file.getAbsolutePath
  val outputPath = outputDir.getAbsolutePath
  // Match on a whole directory prefix so that e.g. "/out" does not match "/output/x".
  val outputPrefix = if (outputPath.endsWith(File.separator)) outputPath else outputPath + File.separator
  // String.replace is a literal replacement (not a regex), so the previous "^<outputDir>"
  // pattern never matched and the path was always stored absolute.
  val path =
    if (absolutePath == outputPath) "."
    else if (absolutePath.startsWith(outputPrefix)) "." + File.separator + absolutePath.stripPrefix(outputPrefix)
    else absolutePath
  val md5 = WriteSummary.parseChecksum(SummaryQScript.md5sumCache(file))
  val exists = file.exists()
  val size = if (exists) file.length() else 0L
  val link = exists && java.nio.file.Files.isSymbolicLink(file.toPath)
  // Use the runId that was passed in instead of silently ignoring the parameter.
  db.createOrUpdateFile(runId, pipelineId, moduleId, sampleId, libId, key, path, md5, link, size)
}
/** Function to create summary */
def run(): Unit = {
val db = SummaryDb.openSqliteSummary(qscript.summaryDbFile)
val outputDir = new File(Await.result(db.getRuns(runId = Some(qscript.summaryRunId)).map(_.head.outputDir), Duration.Inf))
val pipelineId = Await.result(db.getPipelines(name = Some(qscript.summaryName), runId = Some(qscript.summaryRunId)).map(_.head.id), Duration.Inf)
for (((name, sampleName, libName), summarizables) <- qscript.summarizables) {
for (((name, sampleName, libName), summarizables) <- qscript.summarizables.par) {
require(summarizables.nonEmpty)
val stats = ConfigUtils.anyToJson(if (summarizables.size == 1) summarizables.head.summaryStats
else {
......@@ -107,15 +117,16 @@ class WriteSummary(val parent: SummaryQScript) extends InProcessFunction with Co
s.tail.foldLeft(Map("stats" -> s.head))((a, b) =>
ConfigUtils.mergeMaps(a, Map("stats" -> b), summarizables.head.resolveSummaryConflict))("stats")
})
val moduleId = db.getModules(name = Some(name), runId = Some(qscript.summaryRunId), pipelineId = Some(pipelineId))
.map(_.head.id)
val sampleId = sampleName.map(name => db.getSamples(runId = Some(qscript.summaryRunId), name = Some(name)).map(_.head.id))
val libId = libName.map(name => db.getLibraries(runId = Some(qscript.summaryRunId), name = Some(name),
sampleId = sampleId.map(Await.result(_, Duration.Inf))).map(_.head.id))
db.createOrUpdateStat(qscript.summaryRunId, pipelineId, Some(Await.result(moduleId, Duration.Inf)),
sampleId.map(Await.result(_, Duration.Inf)), libId.map(Await.result(_, Duration.Inf)), stats.nospaces)
//TODO: Add Files
val moduleId = Await.result(db.getModules(name = Some(name), runId = Some(qscript.summaryRunId), pipelineId = Some(pipelineId))
.map(_.head.id), Duration.Inf)
val sampleId = sampleName.map(name => Await.result(db.getSamples(runId = Some(qscript.summaryRunId), name = Some(name)).map(_.head.id), Duration.Inf))
val libId = libName.map(name => Await.result(db.getLibraries(runId = Some(qscript.summaryRunId), name = Some(name),
sampleId = sampleId).map(_.head.id), Duration.Inf))
db.createOrUpdateStat(qscript.summaryRunId, pipelineId, Some(moduleId),
sampleId, libId, stats.nospaces)
for ((key, file) <- summarizables.head.summaryFiles.par)
Await.result(createFile(db, qscript.summaryRunId, pipelineId, Some(moduleId), sampleId, libId, key, file, outputDir), Duration.Inf)
}
//TODO: Add executables
......@@ -124,28 +135,33 @@ class WriteSummary(val parent: SummaryQScript) extends InProcessFunction with Co
case tag: SampleLibraryTag =>
val sampleId = tag.sampleId.flatMap(name => Await.result(db.getSampleId(qscript.summaryRunId, name), Duration.Inf))
val libId = tag.libId.flatMap(name => sampleId.flatMap(sampleId => Await.result(db.getLibraryId(qscript.summaryRunId, sampleId, name), Duration.Inf)))
//TODO: Add files
for ((key, file) <- qscript.summaryFiles.par)
Await.result(createFile(db, qscript.summaryRunId, pipelineId, None, sampleId, libId, key, file, outputDir), Duration.Inf)
db.createOrUpdateSetting(qscript.summaryRunId, pipelineId, None, sampleId, libId, ConfigUtils.mapToJson(tag.summarySettings).nospaces)
case q: MultiSampleQScript =>
// Global level
//TODO: Add files
for ((key, file) <- qscript.summaryFiles.par)
Await.result(createFile(db, q.summaryRunId, pipelineId, None, None, None, key, file, outputDir), Duration.Inf)
db.createOrUpdateSetting(qscript.summaryRunId, pipelineId, None, None, None, ConfigUtils.mapToJson(q.summarySettings).nospaces)
for ((sampleName, sample) <- q.samples) {
// Sample level
val sampleId = Await.result(db.getSampleId(qscript.summaryRunId, sampleName), Duration.Inf).getOrElse(throw new IllegalStateException("Sample should already exist in database"))
//TODO: Add files
for ((key, file) <- sample.summaryFiles.par)
Await.result(createFile(db, q.summaryRunId, pipelineId, Some(sampleId), None, None, key, file, outputDir), Duration.Inf)
db.createOrUpdateSetting(qscript.summaryRunId, pipelineId, None, Some(sampleId), None, ConfigUtils.mapToJson(sample.summarySettings).nospaces)
for ((libName, lib) <- sample.libraries) {
// Library level
val libId = Await.result(db.getLibraryId(qscript.summaryRunId, sampleId, libName), Duration.Inf).getOrElse(throw new IllegalStateException("Library should already exist in database"))
//TODO: Add files
for ((key, file) <- lib.summaryFiles.par)
Await.result(createFile(db, q.summaryRunId, pipelineId, Some(sampleId), Some(libId), None, key, file, outputDir), Duration.Inf)
db.createOrUpdateSetting(qscript.summaryRunId, pipelineId, None, Some(sampleId), Some(libId), ConfigUtils.mapToJson(lib.summarySettings).nospaces)
}
}
case q =>
//TODO: Add files
for ((key, file) <- q.summaryFiles.par)
Await.result(createFile(db, qscript.summaryRunId, pipelineId, None, None, None, key, file, outputDir), Duration.Inf)
db.createOrUpdateSetting(qscript.summaryRunId, pipelineId, None, None, None, ConfigUtils.mapToJson(q.summarySettings).nospaces)
}
......
......@@ -237,6 +237,41 @@ class SummaryDb(db: Database) extends Closeable {
val q = l.foldLeft(settings.subquery)((a, b) => b(a))
db.run(q.map(_.content).result).map(_.headOption.map(ConfigUtils.jsonTextToMap))
}
/**
 * Builds a query on the files table, applying only the filters that are supplied.
 *
 * For `moduleId`, `sampleId` and `libId` the nested option is a tri-state:
 * `None` applies no filter, `Some(None)` requires the column to be empty and
 * `Some(Some(id))` requires the column to equal `id`.
 *
 * @param runId optional run id to filter on
 * @param pipelineId optional pipeline id to filter on
 * @param moduleId optional module filter (tri-state, see above)
 * @param sampleId optional sample filter (tri-state, see above)
 * @param libId optional library filter (tri-state, see above)
 * @param key optional file key to filter on
 * @return the filtered query, not yet executed
 */
def filesFilter(runId: Option[Int] = None, pipelineId: Option[Int] = None, moduleId: Option[Option[Int]] = None,
                sampleId: Option[Option[Int]] = None, libId: Option[Option[Int]] = None,
                key: Option[String] = None) = {
  // Each defined argument contributes one query transformation; undefined ones contribute nothing.
  val l: List[Option[Query[Files, Files#TableElementType, Seq] => Query[Files, Files#TableElementType, Seq]]] = List(
    runId.map(x => y => y.filter(_.runId === x)),
    pipelineId.map(x => y => y.filter(_.pipelineId === x)),
    moduleId.map(x => y => (if (x.isDefined) y.filter(_.moduleId === x) else y.filter(_.moduleId.isEmpty))),
    sampleId.map(x => y => (if (x.isDefined) y.filter(_.sampleId === x) else y.filter(_.sampleId.isEmpty))),
    libId.map(x => y => (if (x.isDefined) y.filter(_.libraryId === x) else y.filter(_.libraryId.isEmpty))),
    key.map(x => y => y.filter(_.key === x))
  )
  // Apply the collected transformations one after another, starting from the whole table.
  l.flatten.foldLeft(files.subquery)((a, b) => b(a))
}
/**
 * Runs the query built by `filesFilter` with the given arguments and returns the matching rows.
 *
 * All arguments are optional and are passed to `filesFilter` unchanged.
 *
 * @return the result of `db.run` on the filtered query
 */
def getFiles(runId: Option[Int] = None, pipelineId: Option[Int] = None, moduleId: Option[Option[Int]] = None,
             sampleId: Option[Option[Int]] = None, libId: Option[Option[Int]] = None,
             key: Option[String] = None) = {
  db.run(filesFilter(runId, pipelineId, moduleId, sampleId, libId, key).result)
}
/**
 * Inserts a new row into the files table.
 *
 * No check for an existing row is made here; see `createOrUpdateFile` for that.
 *
 * @return the result of `db.run` on the insert action
 */
def createFile(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
               sampleId: Option[Int] = None, libId: Option[Int] = None,
               key:String, path: String, md5: String, link: Boolean = false, size: Long) = {
  val row = File(runId, pipelineId, moduleId, sampleId, libId, key, path, md5, link, size)
  val insertAction = files.forceInsert(row)
  db.run(insertAction)
}
/**
 * Inserts a file row, or updates the existing row when one already matches
 * the run, pipeline, module, sample, library and key.
 *
 * The existence check and the write are chained on the returned future instead of
 * blocking the calling thread with `Await.result`.
 *
 * NOTE(review): the count and the following write are still two separate database
 * actions, so concurrent calls for the same key are not atomic.
 *
 * @return a future completing with the result of the insert or update
 */
def createOrUpdateFile(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
                       sampleId: Option[Int] = None, libId: Option[Int] = None,
                       key:String, path: String, md5: String, link: Boolean = false, size: Long) = {
  val filter = filesFilter(Some(runId), Some(pipelineId), Some(moduleId), Some(sampleId), Some(libId), Some(key))
  db.run(filter.size.result).flatMap { count =>
    if (count == 0) createFile(runId, pipelineId, moduleId, sampleId, libId, key, path, md5, link, size)
    else db.run(filter.update(File(runId, pipelineId, moduleId, sampleId, libId, key, path, md5, link, size)))
  }
}
}
object SummaryDb {
......
......@@ -107,7 +107,7 @@ object Schema {
def sampleId = column[Option[Int]]("sampleId")
def libraryId = column[Option[Int]]("libraryId")
def key = column[String]("key")
def path = column[String]("path")
def path = column[String]("path") // This should be relative to the outputDir
def md5 = column[String]("md5")
def link = column[Boolean]("link", O.Default(false))
def size = column[Long]("size")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment