Commit bf7a1163 authored by Peter van 't Hof's avatar Peter van 't Hof

Split read and write methods to another class, multi threaded reading enabled for report

parent 0d9298f4
......@@ -164,7 +164,7 @@ trait ReportBuilder extends ToolCommand {
require(cmdArgs.outputDir.exists(), "Output dir does not exist")
require(cmdArgs.outputDir.isDirectory, "Output dir is not a directory")
setSummary = SummaryDb.openSqliteSummary(cmdArgs.summaryDbFile)
setSummary = SummaryDb.openReadOnlySqliteSummary(cmdArgs.summaryDbFile)
setRunId = cmdArgs.runId
cmdArgs.pageArgs.get("sampleId") match {
......
......@@ -14,15 +14,14 @@
*/
package nl.lumc.sasc.biopet.core.summary
import java.io.{ File, PrintWriter }
import java.io.File
import nl.lumc.sasc.biopet.utils.config.Configurable
import nl.lumc.sasc.biopet.core._
import nl.lumc.sasc.biopet.utils.ConfigUtils
import nl.lumc.sasc.biopet.LastCommitHash
import nl.lumc.sasc.biopet.utils.summary.db.SummaryDb
import nl.lumc.sasc.biopet.utils.summary.db.{ SummaryDb, SummaryDbWrite }
import org.broadinstitute.gatk.queue.function.{ InProcessFunction, QFunction }
import org.broadinstitute.gatk.utils.commandline.{ Input, Output }
import org.broadinstitute.gatk.utils.commandline.Input
import scala.collection.mutable
import scala.io.Source
......@@ -215,7 +214,7 @@ object WriteSummary {
Source.fromFile(checksumFile).getLines().toList.head.split(" ")(0)
}
def createFile(db: SummaryDb, runId: Int, pipelineId: Int, moduleId: Option[Int], sampleId: Option[Int], libId: Option[Int], key: String, file: File, outputDir: File) = {
def createFile(db: SummaryDbWrite, runId: Int, pipelineId: Int, moduleId: Option[Int], sampleId: Option[Int], libId: Option[Int], key: String, file: File, outputDir: File) = {
val path = if (file.getAbsolutePath.startsWith(outputDir.getAbsolutePath + File.separator)) {
"." + file.getAbsolutePath.stripPrefix(s"${outputDir.getAbsolutePath}")
} else file.getAbsolutePath
......
......@@ -10,6 +10,9 @@ import scala.concurrent.{ Await, Future }
import java.io.{ Closeable, File }
import java.sql.Date
import nl.lumc.sasc.biopet.utils.summary.db.SummaryDb._
import nl.lumc.sasc.biopet.utils.summary.db.SummaryDb.Implicts._
import scala.language.implicitConversions
/**
......@@ -17,34 +20,12 @@ import scala.language.implicitConversions
*
* Created by pjvanthof on 05/02/2017.
*/
class SummaryDb(val db: Database) extends Closeable {
trait SummaryDb extends Closeable {
import nl.lumc.sasc.biopet.utils.summary.db.SummaryDb._
import nl.lumc.sasc.biopet.utils.summary.db.SummaryDb.Implicts._
def db: Database
def close(): Unit = db.close()
/** This method will create all tables */
def createTables(): Unit = {
try {
val setup = DBIO.seq(
(runs.schema ++ samples.schema ++
libraries.schema ++ pipelines.schema ++
modules.schema ++ stats.schema ++ settings.schema ++
files.schema ++ executables.schema).create
)
val setupFuture = db.run(setup)
Await.result(setupFuture, Duration.Inf)
}
}
/** This method will create a new run and return the runId */
def createRun(runName: String, outputDir: String, version: String, commitHash: String,
creationDate: Date): Future[Int] = {
val id = Await.result(db.run(runs.size.result), Duration.Inf)
db.run(runs.forceInsert(Run(id, runName, outputDir, version, commitHash, creationDate))).map(_ => id)
}
/** This will return all runs that match the critiria given */
def getRuns(runId: Option[Int] = None, runName: Option[String] = None, outputDir: Option[String] = None): Future[Seq[Run]] = {
val q = runs.filter { run =>
......@@ -57,21 +38,6 @@ class SummaryDb(val db: Database) extends Closeable {
db.run(q.result)
}
/** This creates a new sample and return the sampleId */
def createSample(name: String, runId: Int, tags: Option[String] = None): Future[Int] = {
val id = Await.result(db.run(samples.size.result), Duration.Inf)
db.run(samples.forceInsert(Sample(id, name, runId, tags))).map(_ => id)
}
def createOrUpdateSample(name: String, runId: Int, tags: Option[String] = None): Future[Int] = {
getSampleId(runId, name).flatMap(_ match {
case Some(id: Int) =>
db.run(samples.filter(_.name === name).filter(_.id === id).map(_.tags).update(tags))
.map(_ => id)
case _ => createSample(name, runId, tags)
})
}
/** This will return all samples that match given criteria */
def getSamples(sampleId: Option[Int] = None, runId: Option[Int] = None, name: Option[String] = None): Future[Seq[Sample]] = {
val q = samples.filter { sample =>
......@@ -100,21 +66,6 @@ class SummaryDb(val db: Database) extends Closeable {
.map(_.headOption.flatten.map(ConfigUtils.jsonTextToMap))
}
/** This will create a new library */
def createLibrary(name: String, runId: Int, sampleId: Int, tags: Option[String] = None): Future[Int] = {
val id = Await.result(db.run(libraries.size.result), Duration.Inf)
db.run(libraries.forceInsert(Library(id, name, runId, sampleId, tags))).map(_ => id)
}
def createOrUpdateLibrary(name: String, runId: Int, sampleId: Int, tags: Option[String] = None): Future[Int] = {
getLibraryId(runId, sampleId, name).flatMap(_ match {
case Some(id: Int) =>
db.run(libraries.filter(_.name === name).filter(_.id === id).filter(_.sampleId === sampleId).map(_.tags).update(tags))
.map(_ => id)
case _ => createLibrary(name, runId, sampleId, tags)
})
}
/** This returns all libraries that match the given criteria */
def getLibraries(libId: Option[Int] = None, name: Option[String] = None, runId: Option[Int] = None, sampleId: Option[Int] = None): Future[Seq[Library]] = {
val q = libraries.filter { lib =>
......@@ -144,22 +95,6 @@ class SummaryDb(val db: Database) extends Closeable {
.map(_.headOption.flatten.map(ConfigUtils.jsonTextToMap))
}
/** Creates a new pipeline, even if it already exist. This may give a database exeption */
def forceCreatePipeline(name: String, runId: Int): Future[Int] = {
val id = Await.result(db.run(pipelines.size.result), Duration.Inf)
db.run(pipelines.forceInsert(Pipeline(id, name, runId))).map(_ => id)
}
/** Creates a new pipeline if it does not yet exist */
def createPipeline(name: String, runId: Int): Future[Int] = {
getPipelines(name = Some(name), runId = Some(runId))
.flatMap {
m =>
if (m.isEmpty) forceCreatePipeline(name, runId)
else Future(m.head.id)
}
}
/** Get all pipelines that match given criteria */
def getPipelines(pipelineId: Option[Int] = None, name: Option[String] = None, runId: Option[Int] = None): Future[Seq[Pipeline]] = {
val q = pipelines.filter { lib =>
......@@ -182,22 +117,6 @@ class SummaryDb(val db: Database) extends Closeable {
getPipelines(pipelineId = Some(pipelineId)).map(_.headOption.map(_.name))
}
/** Creates a new module, even if it already exist. This may give a database exeption */
def forceCreateModule(name: String, runId: Int, pipelineId: Int): Future[Int] = {
val id = Await.result(db.run(modules.size.result), Duration.Inf)
db.run(modules.forceInsert(Module(id, name, runId, pipelineId))).map(_ => id)
}
/** Creates a new module if it does not yet exist */
def createModule(name: String, runId: Int, pipelineId: Int): Future[Int] = {
getModules(name = Some(name), runId = Some(runId), pipelineId = Some(pipelineId))
.flatMap {
m =>
if (m.isEmpty) forceCreateModule(name, runId, pipelineId)
else Future(m.head.id)
}
}
/** Return all module with given criteria */
def getModules(moduleId: Option[Int] = None, name: Option[String] = None, runId: Option[Int] = None, pipelineId: Option[Int] = None): Future[Seq[Module]] = {
val q = modules.filter { lib =>
......@@ -221,22 +140,6 @@ class SummaryDb(val db: Database) extends Closeable {
getModules(pipelineId = Some(pipelineId), moduleId = Some(moduleId)).map(_.headOption.map(_.name))
}
/** Create a new stat in the database, This method is need checking before */
def createStat(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
sampleId: Option[Int] = None, libId: Option[Int] = None, content: String): Future[Int] = {
db.run(stats.forceInsert(Stat(runId, pipelineId, moduleId, sampleId, libId, content)))
}
/** This create or update a stat */
def createOrUpdateStat(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
sampleId: Option[Int] = None, libId: Option[Int] = None, content: String): Future[Int] = {
val filter = statsFilter(Some(runId), pipelineId, Some(moduleId.map(ModuleId(_)).getOrElse(NoModule)),
Some(sampleId.map(SampleId(_)).getOrElse(NoSample)), Some(libId.map(LibraryId(_)).getOrElse(NoLibrary)))
val r = Await.result(db.run(filter.size.result), Duration.Inf)
if (r == 0) createStat(runId, pipelineId, moduleId, sampleId, libId, content)
else db.run(filter.map(_.content).update(content))
}
/** Return a Query for [[Stats]] */
def statsFilter(runId: Option[Int] = None, pipeline: Option[PipelineQuery] = None, module: Option[ModuleQuery] = None,
sample: Option[SampleQuery] = None, library: Option[LibraryQuery] = None,
......@@ -322,12 +225,6 @@ class SummaryDb(val db: Database) extends Closeable {
}).toMap
}
/** This method creates a new setting. This method need checking before */
def createSetting(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
sampleId: Option[Int] = None, libId: Option[Int] = None, content: String): Future[Int] = {
db.run(settings.forceInsert(Setting(runId, pipelineId, moduleId, sampleId, libId, content)))
}
def settingsFilter(runId: Option[Int] = None, pipeline: Option[PipelineQuery] = None, module: Option[ModuleQuery] = None,
sample: Option[SampleQuery] = None, library: Option[LibraryQuery] = None,
mustHaveSample: Boolean = false, mustHaveLibrary: Boolean = false) = {
......@@ -362,15 +259,6 @@ class SummaryDb(val db: Database) extends Closeable {
f
}
/** This method creates or update a setting. */
def createOrUpdateSetting(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
sampleId: Option[Int] = None, libId: Option[Int] = None, content: String): Future[Int] = {
val filter = settingsFilter(Some(runId), PipelineId(pipelineId), moduleId.map(ModuleId), sampleId.map(SampleId), libId.map(LibraryId))
val r = Await.result(db.run(filter.size.result), Duration.Inf)
if (r == 0) createSetting(runId, pipelineId, moduleId, sampleId, libId, content)
else db.run(filter.update(Setting(runId, pipelineId, moduleId, sampleId, libId, content)))
}
/** Return all settings that match the given criteria */
def getSettings(runId: Option[Int] = None, pipeline: Option[PipelineQuery] = None, module: Option[ModuleQuery] = None,
sample: Option[SampleQuery] = None, library: Option[LibraryQuery] = None): Future[Seq[Setting]] = {
......@@ -458,6 +346,138 @@ class SummaryDb(val db: Database) extends Closeable {
db.run(filesFilter(Some(runId), Some(pipeline), Some(module), Some(sample), Some(library), Some(key)).result).map(_.headOption)
}
/** Returns a [[Query]] for [[Executables]] */
def executablesFilter(runId: Option[Int], toolName: Option[String]) = {
var q: Query[Executables, Executables#TableElementType, Seq] = executables
runId.foreach(r => q = q.filter(_.runId === r))
toolName.foreach(r => q = q.filter(_.toolName === r))
q
}
/** Return all executables with given criteria */
def getExecutables(runId: Option[Int] = None, toolName: Option[String] = None): Future[Seq[Executable]] = {
db.run(executablesFilter(runId, toolName).result)
}
}
class SummaryDbReadOnly(val db: Database) extends SummaryDb
class SummaryDbWrite(val db: Database) extends SummaryDb {
/** This method will create all tables */
def createTables(): Unit = {
try {
val setup = DBIO.seq(
(runs.schema ++ samples.schema ++
libraries.schema ++ pipelines.schema ++
modules.schema ++ stats.schema ++ settings.schema ++
files.schema ++ executables.schema).create
)
val setupFuture = db.run(setup)
Await.result(setupFuture, Duration.Inf)
}
}
/** This method will create a new run and return the runId */
def createRun(runName: String, outputDir: String, version: String, commitHash: String,
creationDate: Date): Future[Int] = {
val id = Await.result(db.run(runs.size.result), Duration.Inf)
db.run(runs.forceInsert(Run(id, runName, outputDir, version, commitHash, creationDate))).map(_ => id)
}
/** This creates a new sample and return the sampleId */
def createSample(name: String, runId: Int, tags: Option[String] = None): Future[Int] = {
val id = Await.result(db.run(samples.size.result), Duration.Inf)
db.run(samples.forceInsert(Sample(id, name, runId, tags))).map(_ => id)
}
def createOrUpdateSample(name: String, runId: Int, tags: Option[String] = None): Future[Int] = {
getSampleId(runId, name).flatMap(_ match {
case Some(id: Int) =>
db.run(samples.filter(_.name === name).filter(_.id === id).map(_.tags).update(tags))
.map(_ => id)
case _ => createSample(name, runId, tags)
})
}
/** This will create a new library */
def createLibrary(name: String, runId: Int, sampleId: Int, tags: Option[String] = None): Future[Int] = {
val id = Await.result(db.run(libraries.size.result), Duration.Inf)
db.run(libraries.forceInsert(Library(id, name, runId, sampleId, tags))).map(_ => id)
}
def createOrUpdateLibrary(name: String, runId: Int, sampleId: Int, tags: Option[String] = None): Future[Int] = {
getLibraryId(runId, sampleId, name).flatMap(_ match {
case Some(id: Int) =>
db.run(libraries.filter(_.name === name).filter(_.id === id).filter(_.sampleId === sampleId).map(_.tags).update(tags))
.map(_ => id)
case _ => createLibrary(name, runId, sampleId, tags)
})
}
/** Creates a new pipeline, even if it already exist. This may give a database exeption */
def forceCreatePipeline(name: String, runId: Int): Future[Int] = {
val id = Await.result(db.run(pipelines.size.result), Duration.Inf)
db.run(pipelines.forceInsert(Pipeline(id, name, runId))).map(_ => id)
}
/** Creates a new pipeline if it does not yet exist */
def createPipeline(name: String, runId: Int): Future[Int] = {
getPipelines(name = Some(name), runId = Some(runId))
.flatMap {
m =>
if (m.isEmpty) forceCreatePipeline(name, runId)
else Future(m.head.id)
}
}
/** Creates a new module, even if it already exist. This may give a database exeption */
def forceCreateModule(name: String, runId: Int, pipelineId: Int): Future[Int] = {
val id = Await.result(db.run(modules.size.result), Duration.Inf)
db.run(modules.forceInsert(Module(id, name, runId, pipelineId))).map(_ => id)
}
/** Creates a new module if it does not yet exist */
def createModule(name: String, runId: Int, pipelineId: Int): Future[Int] = {
getModules(name = Some(name), runId = Some(runId), pipelineId = Some(pipelineId))
.flatMap {
m =>
if (m.isEmpty) forceCreateModule(name, runId, pipelineId)
else Future(m.head.id)
}
}
/** Create a new stat in the database, This method is need checking before */
def createStat(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
sampleId: Option[Int] = None, libId: Option[Int] = None, content: String): Future[Int] = {
db.run(stats.forceInsert(Stat(runId, pipelineId, moduleId, sampleId, libId, content)))
}
/** This create or update a stat */
def createOrUpdateStat(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
sampleId: Option[Int] = None, libId: Option[Int] = None, content: String): Future[Int] = {
val filter = statsFilter(Some(runId), pipelineId, Some(moduleId.map(ModuleId(_)).getOrElse(NoModule)),
Some(sampleId.map(SampleId(_)).getOrElse(NoSample)), Some(libId.map(LibraryId(_)).getOrElse(NoLibrary)))
val r = Await.result(db.run(filter.size.result), Duration.Inf)
if (r == 0) createStat(runId, pipelineId, moduleId, sampleId, libId, content)
else db.run(filter.map(_.content).update(content))
}
/** This method creates a new setting. This method need checking before */
def createSetting(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
sampleId: Option[Int] = None, libId: Option[Int] = None, content: String): Future[Int] = {
db.run(settings.forceInsert(Setting(runId, pipelineId, moduleId, sampleId, libId, content)))
}
/** This method creates or update a setting. */
def createOrUpdateSetting(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
sampleId: Option[Int] = None, libId: Option[Int] = None, content: String): Future[Int] = {
val filter = settingsFilter(Some(runId), PipelineId(pipelineId), moduleId.map(ModuleId), sampleId.map(SampleId), libId.map(LibraryId))
val r = Await.result(db.run(filter.size.result), Duration.Inf)
if (r == 0) createSetting(runId, pipelineId, moduleId, sampleId, libId, content)
else db.run(filter.update(Setting(runId, pipelineId, moduleId, sampleId, libId, content)))
}
/** Creates a file. This method will raise expection if it already exist */
def createFile(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
sampleId: Option[Int] = None, libId: Option[Int] = None,
......@@ -475,19 +495,6 @@ class SummaryDb(val db: Database) extends Closeable {
else db.run(filter.update(Schema.File(runId, pipelineId, moduleId, sampleId, libId, key, path, md5, link, size)))
}
/** Returns a [[Query]] for [[Executables]] */
def executablesFilter(runId: Option[Int], toolName: Option[String]) = {
var q: Query[Executables, Executables#TableElementType, Seq] = executables
runId.foreach(r => q = q.filter(_.runId === r))
toolName.foreach(r => q = q.filter(_.toolName === r))
q
}
/** Return all executables with given criteria */
def getExecutables(runId: Option[Int] = None, toolName: Option[String] = None): Future[Seq[Executable]] = {
db.run(executablesFilter(runId, toolName).result)
}
/** Creates a exeutable. This method will raise expection if it already exist */
def createExecutable(runId: Int, toolName: String, version: Option[String] = None, path: Option[String] = None,
javaVersion: Option[String] = None, exeMd5: Option[String] = None, javaMd5: Option[String] = None, jarPath: Option[String] = None): Future[Int] = {
......@@ -502,6 +509,7 @@ class SummaryDb(val db: Database) extends Closeable {
if (r == 0) createExecutable(runId, toolName, version, javaVersion, exeMd5, javaMd5)
else db.run(filter.update(Executable(runId, toolName, version, path, javaVersion, exeMd5, javaMd5, jarPath)))
}
}
object SummaryDb {
......@@ -552,7 +560,7 @@ object SummaryDb {
implicit def libraryQueryToOptionLibraryQuery(x: LibraryQuery): Option[LibraryQuery] = Some(x)
}
private var summaryConnections = Map[File, SummaryDb]()
private var summaryConnections = Map[File, SummaryDbWrite]()
/** This closing all summary that are still in the cache */
def closeAll(): Unit = {
......@@ -561,18 +569,29 @@ object SummaryDb {
}
/** This will open a sqlite database and create tables when the database did not exist yet */
def openSqliteSummary(file: File): SummaryDb = {
def openSqliteSummary(file: File): SummaryDbWrite = {
if (!summaryConnections.contains(file)) {
val config: org.sqlite.SQLiteConfig = new org.sqlite.SQLiteConfig()
config.enforceForeignKeys(true)
config.setBusyTimeout("10000")
config.setSynchronous(org.sqlite.SQLiteConfig.SynchronousMode.OFF)
config.setSynchronous(org.sqlite.SQLiteConfig.SynchronousMode.FULL)
val exist = file.exists()
val db = Database.forURL(s"jdbc:sqlite:${file.getAbsolutePath}", driver = "org.sqlite.JDBC", prop = config.toProperties, executor = AsyncExecutor("single_thread", 1, 1000))
val s = new SummaryDb(db)
val s = new SummaryDbWrite(db)
if (!exist) s.createTables()
summaryConnections += file -> s
}
summaryConnections(file)
}
def openReadOnlySqliteSummary(file: File): SummaryDbReadOnly = {
require(file.exists(), s"File does not exist: $file")
val config: org.sqlite.SQLiteConfig = new org.sqlite.SQLiteConfig()
config.enforceForeignKeys(true)
config.setBusyTimeout("10000")
config.setSynchronous(org.sqlite.SQLiteConfig.SynchronousMode.FULL)
config.setReadOnly(true)
val db = Database.forURL(s"jdbc:sqlite:${file.getAbsolutePath}", driver = "org.sqlite.JDBC", prop = config.toProperties)
new SummaryDbReadOnly(db)
}
}
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment