SummaryDb.scala 17 KB
Newer Older
Peter van 't Hof's avatar
Peter van 't Hof committed
1
package nl.lumc.sasc.biopet.utils.summary.db
2

Peter van 't Hof's avatar
Peter van 't Hof committed
3
import nl.lumc.sasc.biopet.utils.ConfigUtils
4
import nl.lumc.sasc.biopet.utils.summary.db.Schema._
Peter van 't Hof's avatar
Peter van 't Hof committed
5
import slick.driver.H2Driver.api._
6 7

import scala.concurrent.ExecutionContext.Implicits.global
Peter van 't Hof's avatar
Peter van 't Hof committed
8
import scala.concurrent.duration.Duration
9 10
import scala.concurrent.{ Await, Future }
import java.io.{ Closeable, File }
Peter van 't Hof's avatar
Peter van 't Hof committed
11
import java.sql.Date
12

13
/**
14 15
 * This class interfaces with a summary database
 *
Peter van 't Hof's avatar
Peter van 't Hof committed
16 17
 * Created by pjvanthof on 05/02/2017.
 */
18
class SummaryDb(val db: Database) extends Closeable {
19 20 21

  /** Closes the underlying database handle. */
  def close(): Unit = {
    db.close()
  }

22
  /**
   * Creates the tables for all schemas used by the summary: runs, samples, libraries,
   * pipelines, modules, stats, settings, files and executables.
   *
   * The call blocks until the database has finished. No `try` is used here: a `try`
   * without `catch`/`finally` has no effect, and any failure is simply propagated to
   * the caller by `Await.result`.
   */
  def createTables(): Unit = {
    val allSchemas =
      runs.schema ++ samples.schema ++
        libraries.schema ++ pipelines.schema ++
        modules.schema ++ stats.schema ++ settings.schema ++
        files.schema ++ executables.schema
    val setup = DBIO.seq(allSchemas.create)
    Await.result(db.run(setup), Duration.Inf)
  }

36
  /**
   * Inserts a new run and returns its id.
   *
   * The id is the current number of rows in the runs table. That count is awaited
   * synchronously before the insert is submitted, so this method blocks the caller
   * for the duration of the count query.
   *
   * @return future that completes with the id of the new run once the insert is done
   */
  def createRun(runName: String, outputDir: String, version: String, commitHash: String,
                creationDate: Date): Future[Int] = {
    val rowCount = db.run(runs.size.result)
    val newId = Await.result(rowCount, Duration.Inf)
    val newRun = Run(newId, runName, outputDir, version, commitHash, creationDate)
    db.run(runs.forceInsert(newRun)).map(_ => newId)
  }

43 44
  /**
   * Returns all runs that match the given criteria.
   *
   * Only criteria that are defined take part in the query and they are combined with AND;
   * when none is given every run is returned.
   */
  def getRuns(runId: Option[Int] = None, runName: Option[String] = None, outputDir: Option[String] = None): Future[Seq[Run]] = {
    val query = runs.filter { run =>
      val conditions = List(
        runId.map(run.id === _),
        runName.map(run.runName === _),
        outputDir.map(run.outputDir === _)
      ).flatten
      // No condition at all means "match everything".
      conditions.reduceLeftOption(_ && _).getOrElse(true: Rep[Boolean])
    }
    db.run(query.result)
  }

55
  /**
   * Inserts a new sample and returns its id.
   *
   * The id is the current number of rows in the samples table; that count is awaited
   * synchronously before the insert is submitted.
   *
   * @return future that completes with the id of the new sample once the insert is done
   */
  def createSample(name: String, runId: Int, tags: Option[String] = None): Future[Int] = {
    val rowCount = db.run(samples.size.result)
    val newId = Await.result(rowCount, Duration.Inf)
    val newSample = Sample(newId, name, runId, tags)
    db.run(samples.forceInsert(newSample)).map(_ => newId)
  }

61 62
  /**
   * Returns all samples that match the given criteria.
   *
   * Only criteria that are defined take part in the query and they are combined with AND;
   * when none is given every sample is returned.
   */
  def getSamples(sampleId: Option[Int] = None, runId: Option[Int] = None, name: Option[String] = None): Future[Seq[Sample]] = {
    val query = samples.filter { sample =>
      val conditions = List(
        sampleId.map(sample.id === _),
        runId.map(sample.runId === _),
        name.map(sample.name === _)
      ).flatten
      // No condition at all means "match everything".
      conditions.reduceLeftOption(_ && _).getOrElse(true: Rep[Boolean])
    }
    db.run(query.result)
  }

73 74
  /**
   * Returns the id of the sample with the given name inside the given run.
   *
   * When several samples match, the id of the first one returned by the query is used;
   * when none matches the result is `None`.
   */
  def getSampleId(runId: Int, sampleName: String): Future[Option[Int]] = {
    val matches = getSamples(runId = Some(runId), name = Some(sampleName))
    matches.map(found => found.headOption.map(sample => sample.id))
  }

78 79 80 81 82
  /**
   * Returns the name of the sample with the given id, or `None` when no such sample exists.
   *
   * NOTE(review): this previously mapped the row to `_.id` and was typed
   * `Future[Option[Int]]`, so it could only echo back the id that was passed in.
   * It now returns the sample's name; callers that only test for presence are unaffected.
   */
  def getSampleName(sampleId: Int): Future[Option[String]] = {
    getSamples(sampleId = Some(sampleId)).map(_.headOption.map(_.name))
  }

83
  /**
   * Returns the tags of a sample, converted to a map with `ConfigUtils.jsonTextToMap`.
   *
   * The result is `None` when no sample has this id or when the sample has no tags stored.
   */
  def getSampleTags(sampleId: Int): Future[Option[Map[String, Any]]] = {
    val tagsQuery = samples.filter(_.id === sampleId).map(_.tags)
    db.run(tagsQuery.result).map { rows =>
      for {
        storedTags <- rows.headOption
        text <- storedTags
      } yield ConfigUtils.jsonTextToMap(text)
    }
  }

89
  /**
   * Inserts a new library and returns its id.
   *
   * The id is the current number of rows in the libraries table; that count is awaited
   * synchronously before the insert is submitted.
   *
   * @return future that completes with the id of the new library once the insert is done
   */
  def createLibrary(name: String, runId: Int, sampleId: Int, tags: Option[String] = None): Future[Int] = {
    val rowCount = db.run(libraries.size.result)
    val newId = Await.result(rowCount, Duration.Inf)
    val newLibrary = Library(newId, name, runId, sampleId, tags)
    db.run(libraries.forceInsert(newLibrary)).map(_ => newId)
  }

95 96
  /**
   * Returns all libraries that match the given criteria.
   *
   * Only criteria that are defined take part in the query and they are combined with AND;
   * when none is given every library is returned.
   */
  def getLibraries(libId: Option[Int] = None, name: Option[String] = None, runId: Option[Int] = None, sampleId: Option[Int] = None): Future[Seq[Library]] = {
    val query = libraries.filter { library =>
      val conditions = List(
        libId.map(library.id === _),
        sampleId.map(library.sampleId === _),
        runId.map(library.runId === _),
        name.map(library.name === _)
      ).flatten
      // No condition at all means "match everything".
      conditions.reduceLeftOption(_ && _).getOrElse(true: Rep[Boolean])
    }
    db.run(query.result)
  }

108 109
  /**
   * Returns the id of the library with the given name for the given run and sample.
   *
   * When several libraries match, the id of the first one returned by the query is used;
   * when none matches the result is `None`.
   */
  def getLibraryId(runId: Int, sampleId: Int, name: String): Future[Option[Int]] = {
    val matches = getLibraries(runId = Some(runId), sampleId = Some(sampleId), name = Some(name))
    matches.map(found => found.headOption.map(library => library.id))
  }

113
  /**
   * Returns the tags of a library, converted to a map with `ConfigUtils.jsonTextToMap`.
   *
   * The result is `None` when no library has this id or when the library has no tags stored.
   */
  def getLibraryTags(libId: Int): Future[Option[Map[String, Any]]] = {
    val tagsQuery = libraries.filter(_.id === libId).map(_.tags)
    db.run(tagsQuery.result).map { rows =>
      for {
        storedTags <- rows.headOption
        text <- storedTags
      } yield ConfigUtils.jsonTextToMap(text)
    }
  }

119
  /**
   * Inserts a new pipeline without checking whether it already exists, which may result
   * in a database exception.
   *
   * The id is the current number of rows in the pipelines table; that count is awaited
   * synchronously before the insert is submitted.
   *
   * @return future that completes with the id of the new pipeline once the insert is done
   */
  def forceCreatePipeline(name: String, runId: Int): Future[Int] = {
    val rowCount = db.run(pipelines.size.result)
    val newId = Await.result(rowCount, Duration.Inf)
    val newPipeline = Pipeline(newId, name, runId)
    db.run(pipelines.forceInsert(newPipeline)).map(_ => newId)
  }

125
  /**
   * Returns the id of the pipeline with this name in this run, creating the pipeline
   * first when it does not exist yet.
   *
   * When several pipelines match, the id of the first one returned by the query is used.
   */
  def createPipeline(name: String, runId: Int): Future[Int] = {
    getPipelines(name = Some(name), runId = Some(runId)).flatMap { existing =>
      existing.headOption match {
        // The id is already known, so no extra task needs to be scheduled for it.
        case Some(pipeline) => Future.successful(pipeline.id)
        case None           => forceCreatePipeline(name, runId)
      }
    }
  }

135 136
  /**
   * Returns all pipelines that match the given criteria.
   *
   * Only criteria that are defined take part in the query and they are combined with AND;
   * when none is given every pipeline is returned.
   */
  def getPipelines(pipelineId: Option[Int] = None, name: Option[String] = None, runId: Option[Int] = None): Future[Seq[Pipeline]] = {
    val query = pipelines.filter { pipeline =>
      val conditions = List(
        pipelineId.map(pipeline.id === _),
        runId.map(pipeline.runId === _),
        name.map(pipeline.name === _)
      ).flatten
      // No condition at all means "match everything".
      conditions.reduceLeftOption(_ && _).getOrElse(true: Rep[Boolean])
    }
    db.run(query.result)
  }

147
  /**
   * Inserts a new module without checking whether it already exists, which may result
   * in a database exception.
   *
   * The id is the current number of rows in the modules table; that count is awaited
   * synchronously before the insert is submitted.
   *
   * @return future that completes with the id of the new module once the insert is done
   */
  def forceCreateModule(name: String, runId: Int, pipelineId: Int): Future[Int] = {
    val rowCount = db.run(modules.size.result)
    val newId = Await.result(rowCount, Duration.Inf)
    val newModule = Module(newId, name, runId, pipelineId)
    db.run(modules.forceInsert(newModule)).map(_ => newId)
  }

153
  /**
   * Returns the id of the module with this name in this run and pipeline, creating the
   * module first when it does not exist yet.
   *
   * When several modules match, the id of the first one returned by the query is used.
   */
  def createModule(name: String, runId: Int, pipelineId: Int): Future[Int] = {
    getModules(name = Some(name), runId = Some(runId), pipelineId = Some(pipelineId)).flatMap { existing =>
      existing.headOption match {
        // The id is already known, so no extra task needs to be scheduled for it.
        case Some(module) => Future.successful(module.id)
        case None         => forceCreateModule(name, runId, pipelineId)
      }
    }
  }

163 164
  /**
   * Returns all modules that match the given criteria.
   *
   * Only criteria that are defined take part in the query and they are combined with AND;
   * when none is given every module is returned.
   */
  def getModules(moduleId: Option[Int] = None, name: Option[String] = None, runId: Option[Int] = None, pipelineId: Option[Int] = None): Future[Seq[Module]] = {
    val query = modules.filter { module =>
      val conditions = List(
        moduleId.map(module.id === _),
        runId.map(module.runId === _),
        pipelineId.map(module.pipelineId === _),
        name.map(module.name === _)
      ).flatten
      // No condition at all means "match everything".
      conditions.reduceLeftOption(_ && _).getOrElse(true: Rep[Boolean])
    }
    db.run(query.result)
  }

176
  /**
   * Inserts a new stat row. No existence check is done here, so callers need to verify
   * beforehand that the stat does not exist yet (see [[createOrUpdateStat]]).
   *
   * @return future with the `Int` produced by the insert action (presumably the number
   *         of affected rows)
   */
  def createStat(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
                 sampleId: Option[Int] = None, libId: Option[Int] = None, content: String): Future[Int] = {
    val row = Stat(runId, pipelineId, moduleId, sampleId, libId, content)
    db.run(stats.forceInsert(row))
  }

182
  /**
   * Stores the content of a stat: inserts a new row when none exists for the combination of
   * run, pipeline, module, sample and library, otherwise overwrites the content of the
   * matching rows.
   *
   * An undefined `moduleId`, `sampleId` or `libId` matches rows where that column is empty.
   * The existence check is awaited synchronously, so this method blocks while counting.
   */
  def createOrUpdateStat(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
                         sampleId: Option[Int] = None, libId: Option[Int] = None, content: String): Future[Int] = {
    val existing = statsFilter(Some(runId), Some(pipelineId), Some(moduleId), Some(sampleId), Some(libId))
    val matchCount = Await.result(db.run(existing.size.result), Duration.Inf)
    if (matchCount != 0) db.run(existing.map(_.content).update(content))
    else createStat(runId, pipelineId, moduleId, sampleId, libId, content)
  }

191
  /**
   * Builds a query on [[Stats]] restricted by the given criteria.
   *
   * For every parameter `None` means "do not filter on this column". For the nested options
   * `Some(Some(id))` selects rows with that id and `Some(None)` selects rows where the
   * column is empty.
   */
  def statsFilter(runId: Option[Int] = None, pipelineId: Option[Int] = None, moduleId: Option[Option[Int]] = None,
                  sampleId: Option[Option[Int]] = None, libId: Option[Option[Int]] = None) = {
    val all: Query[Stats, Stats#TableElementType, Seq] = stats
    val byRun = runId.fold(all)(id => all.filter(_.runId === id))
    val byPipeline = pipelineId.fold(byRun)(id => byRun.filter(_.pipelineId === id))
    val byModule = moduleId.fold(byPipeline) {
      case Some(id) => byPipeline.filter(_.moduleId === id)
      case None     => byPipeline.filter(_.moduleId.isEmpty)
    }
    val bySample = sampleId.fold(byModule) {
      case Some(id) => byModule.filter(_.sampleId === id)
      case None     => byModule.filter(_.sampleId.isEmpty)
    }
    libId.fold(bySample) {
      case Some(id) => bySample.filter(_.libraryId === id)
      case None     => bySample.filter(_.libraryId.isEmpty)
    }
  }
202

203
  /**
   * Returns all stats that match the given criteria; the criteria have the same meaning
   * as in [[statsFilter]].
   */
  def getStats(runId: Option[Int] = None, pipelineId: Option[Int] = None, moduleId: Option[Option[Int]] = None,
               sampleId: Option[Option[Int]] = None, libId: Option[Option[Int]] = None): Future[Seq[Stat]] = {
    val query = statsFilter(runId, pipelineId, moduleId, sampleId, libId)
    db.run(query.result)
  }

209
  /**
   * Returns the content of a single stat, converted to a map with
   * `ConfigUtils.jsonTextToMap`.
   *
   * An undefined `moduleId`, `sampleId` or `libId` matches rows where that column is empty.
   * The result is `None` when no stat matches; when several match the first one is used.
   */
  def getStat(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
              sampleId: Option[Int] = None, libId: Option[Int] = None): Future[Option[Map[String, Any]]] = {
    val matches = getStats(Some(runId), Some(pipelineId), Some(moduleId), Some(sampleId), Some(libId))
    matches.map { found =>
      found.headOption.map(stat => ConfigUtils.jsonTextToMap(stat.content))
    }
  }

216
  /**
   * Inserts a new setting row. No existence check is done here, so callers need to verify
   * beforehand that the setting does not exist yet (see [[createOrUpdateSetting]]).
   *
   * @return future with the `Int` produced by the insert action (presumably the number
   *         of affected rows)
   */
  def createSetting(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
                    sampleId: Option[Int] = None, libId: Option[Int] = None, content: String): Future[Int] = {
    val row = Setting(runId, pipelineId, moduleId, sampleId, libId, content)
    db.run(settings.forceInsert(row))
  }

222
  /**
   * Builds a [[Query]] on [[Settings]] restricted by the given criteria.
   *
   * For every parameter `None` means "do not filter on this column". For the nested options
   * `Some(Some(id))` selects rows with that id and `Some(None)` selects rows where the
   * column is empty.
   */
  def settingsFilter(runId: Option[Int] = None, pipelineId: Option[Int] = None, moduleId: Option[Option[Int]] = None,
                     sampleId: Option[Option[Int]] = None, libId: Option[Option[Int]] = None) = {
    val all: Query[Settings, Settings#TableElementType, Seq] = settings
    val byRun = runId.fold(all)(id => all.filter(_.runId === id))
    val byPipeline = pipelineId.fold(byRun)(id => byRun.filter(_.pipelineId === id))
    val byModule = moduleId.fold(byPipeline) {
      case Some(id) => byPipeline.filter(_.moduleId === id)
      case None     => byPipeline.filter(_.moduleId.isEmpty)
    }
    val bySample = sampleId.fold(byModule) {
      case Some(id) => byModule.filter(_.sampleId === id)
      case None     => byModule.filter(_.sampleId.isEmpty)
    }
    libId.fold(bySample) {
      case Some(id) => bySample.filter(_.libraryId === id)
      case None     => bySample.filter(_.libraryId.isEmpty)
    }
  }
233

234
  /**
   * Stores a setting: inserts a new row when none exists for the combination of run,
   * pipeline, module, sample and library, otherwise replaces the matching rows.
   *
   * An undefined `moduleId`, `sampleId` or `libId` matches rows where that column is empty.
   * The existence check is awaited synchronously, so this method blocks while counting.
   */
  def createOrUpdateSetting(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
                            sampleId: Option[Int] = None, libId: Option[Int] = None, content: String): Future[Int] = {
    val existing = settingsFilter(Some(runId), Some(pipelineId), Some(moduleId), Some(sampleId), Some(libId))
    val matchCount = Await.result(db.run(existing.size.result), Duration.Inf)
    if (matchCount != 0) {
      val replacement = Setting(runId, pipelineId, moduleId, sampleId, libId, content)
      db.run(existing.update(replacement))
    } else createSetting(runId, pipelineId, moduleId, sampleId, libId, content)
  }

243
  /**
   * Returns all settings that match the given criteria; the criteria have the same meaning
   * as in [[settingsFilter]].
   */
  def getSettings(runId: Option[Int] = None, pipelineId: Option[Int] = None, moduleId: Option[Option[Int]] = None,
                  sampleId: Option[Option[Int]] = None, libId: Option[Option[Int]] = None): Future[Seq[Setting]] = {
    val query = settingsFilter(runId, pipelineId, moduleId, sampleId, libId)
    db.run(query.result)
  }

249
  /**
   * Returns the content of a single setting, converted to a map with
   * `ConfigUtils.jsonTextToMap`.
   *
   * An undefined `moduleId`, `sampleId` or `libId` matches rows where that column is empty.
   * The result is `None` when no setting matches; when several match the first one is used.
   */
  def getSetting(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
                 sampleId: Option[Int] = None, libId: Option[Int] = None): Future[Option[Map[String, Any]]] = {
    val matches = getSettings(Some(runId), Some(pipelineId), Some(moduleId), Some(sampleId), Some(libId))
    matches.map { found =>
      found.headOption.map(setting => ConfigUtils.jsonTextToMap(setting.content))
    }
  }
Peter van 't Hof's avatar
Peter van 't Hof committed
255

256
  /**
   * Builds a [[Query]] on [[Files]] restricted by the given criteria.
   *
   * For every parameter `None` means "do not filter on this column". For the nested options
   * `Some(Some(id))` selects rows with that id and `Some(None)` selects rows where the
   * column is empty.
   *
   * `moduleId` defaults to `None` like all other criteria, matching the signatures of
   * `statsFilter` and `settingsFilter`.
   */
  def filesFilter(runId: Option[Int] = None, pipelineId: Option[Int] = None, moduleId: Option[Option[Int]] = None,
                  sampleId: Option[Option[Int]] = None, libId: Option[Option[Int]] = None,
                  key: Option[String] = None) = {
    val all: Query[Files, Files#TableElementType, Seq] = files
    val byRun = runId.fold(all)(id => all.filter(_.runId === id))
    val byPipeline = pipelineId.fold(byRun)(id => byRun.filter(_.pipelineId === id))
    val byKey = key.fold(byPipeline)(k => byPipeline.filter(_.key === k))
    val byModule = moduleId.fold(byKey) {
      case Some(id) => byKey.filter(_.moduleId === id)
      case None     => byKey.filter(_.moduleId.isEmpty)
    }
    val bySample = sampleId.fold(byModule) {
      case Some(id) => byModule.filter(_.sampleId === id)
      case None     => byModule.filter(_.sampleId.isEmpty)
    }
    libId.fold(bySample) {
      case Some(id) => bySample.filter(_.libraryId === id)
      case None     => bySample.filter(_.libraryId.isEmpty)
    }
  }

270
  /**
   * Returns all [[Files]] rows that match the given criteria.
   *
   * For every parameter `None` means "do not filter on this column". For the nested options
   * `Some(Some(id))` selects rows with that id and `Some(None)` selects rows where the
   * column is empty. `moduleId` defaults to `None` like all other criteria.
   */
  def getFiles(runId: Option[Int] = None, pipelineId: Option[Int] = None, moduleId: Option[Option[Int]] = None,
               sampleId: Option[Option[Int]] = None, libId: Option[Option[Int]] = None,
               key: Option[String] = None): Future[Seq[Schema.File]] = {
    db.run(filesFilter(runId, pipelineId, moduleId, sampleId, libId, key).result)
  }

277
  /**
   * Inserts a new file row. No existence check is done here; presumably the database
   * rejects the insert with an exception when the file is already registered (confirm
   * against the schema).
   *
   * @return future with the `Int` produced by the insert action (presumably the number
   *         of affected rows)
   */
  def createFile(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
                 sampleId: Option[Int] = None, libId: Option[Int] = None,
                 key: String, path: String, md5: String, link: Boolean = false, size: Long): Future[Int] = {
    val row = Schema.File(runId, pipelineId, moduleId, sampleId, libId, key, path, md5, link, size)
    db.run(files.forceInsert(row))
  }

284
  /**
   * Stores a file entry: inserts a new row when none exists for the combination of run,
   * pipeline, module, sample, library and key, otherwise replaces the matching rows.
   *
   * An undefined `moduleId`, `sampleId` or `libId` matches rows where that column is empty.
   * The existence check is awaited synchronously, so this method blocks while counting.
   */
  def createOrUpdateFile(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
                         sampleId: Option[Int] = None, libId: Option[Int] = None,
                         key: String, path: String, md5: String, link: Boolean = false, size: Long): Future[Int] = {
    val existing = filesFilter(Some(runId), Some(pipelineId), Some(moduleId), Some(sampleId), Some(libId), Some(key))
    val matchCount = Await.result(db.run(existing.size.result), Duration.Inf)
    if (matchCount != 0) {
      val replacement = Schema.File(runId, pipelineId, moduleId, sampleId, libId, key, path, md5, link, size)
      db.run(existing.update(replacement))
    } else createFile(runId, pipelineId, moduleId, sampleId, libId, key, path, md5, link, size)
  }
Peter van 't Hof's avatar
Peter van 't Hof committed
293

294
  /**
   * Builds a [[Query]] on [[Executables]] restricted by run and/or tool name.
   * A criterion that is `None` is not applied.
   */
  def executablesFilter(runId: Option[Int], toolName: Option[String]) = {
    val all: Query[Executables, Executables#TableElementType, Seq] = executables
    val byRun = runId.fold(all)(id => all.filter(_.runId === id))
    toolName.fold(byRun)(tool => byRun.filter(_.toolName === tool))
  }

302 303
  /**
   * Returns all executables that match the given run and/or tool name; a criterion that
   * is `None` is not applied.
   */
  def getExecutables(runId: Option[Int] = None, toolName: Option[String] = None): Future[Seq[Executable]] = {
    val query = executablesFilter(runId, toolName)
    db.run(query.result)
  }

307
  /**
   * Inserts a new executable row. No existence check is done here; presumably the database
   * rejects the insert with an exception when the executable is already registered (confirm
   * against the schema).
   *
   * @return future with the `Int` produced by the insert action (presumably the number
   *         of affected rows)
   */
  def createExecutable(runId: Int, toolName: String, version: Option[String] = None, path: Option[String] = None,
                       javaVersion: Option[String] = None, exeMd5: Option[String] = None, javaMd5: Option[String] = None, jarPath: Option[String] = None): Future[Int] = {
    val row = Executable(runId, toolName, version, path, javaVersion, exeMd5, javaMd5, jarPath)
    db.run(executables.forceInsert(row))
  }

313
  /**
   * Stores an [[Executable]]: inserts a new row when none exists for this run and tool name,
   * otherwise replaces the matching rows.
   *
   * The existence check is awaited synchronously, so this method blocks while counting.
   */
  def createOrUpdateExecutable(runId: Int, toolName: String, version: Option[String] = None, path: Option[String] = None,
                               javaVersion: Option[String] = None, exeMd5: Option[String] = None, javaMd5: Option[String] = None, jarPath: Option[String] = None): Future[Int] = {
    val filter = executablesFilter(Some(runId), Some(toolName))
    val r = Await.result(db.run(filter.size.result), Duration.Inf)
    // Arguments are passed by name: the optional fields all share the type Option[String],
    // so a positional call that skips one (as happened with `path`) silently shifts the
    // remaining values into the wrong columns and drops `jarPath`.
    if (r == 0) createExecutable(runId, toolName, version = version, path = path,
      javaVersion = javaVersion, exeMd5 = exeMd5, javaMd5 = javaMd5, jarPath = jarPath)
    else db.run(filter.update(Executable(runId, toolName, version, path, javaVersion, exeMd5, javaMd5, jarPath)))
  }
321
}
322 323

object SummaryDb {
  /** Summaries opened through [[openSqliteSummary]], keyed by the file they were opened from. */
  private var summaryConnections = Map[File, SummaryDb]()

  /** Closes every summary that is still in the cache and empties the cache. */
  def closeAll(): Unit = {
    summaryConnections.values.foreach(_.close())
    summaryConnections = summaryConnections.empty
  }

  /**
   * Returns the summary backed by the given SQLite file.
   *
   * The first call for a file opens the database, creates the tables when the file did not
   * exist before, and caches the result; later calls for the same file return the cached
   * instance.
   */
  def openSqliteSummary(file: File): SummaryDb = {
    summaryConnections.get(file) match {
      case Some(cached) => cached
      case None =>
        val sqliteConfig = new org.sqlite.SQLiteConfig()
        sqliteConfig.enforceForeignKeys(true)
        sqliteConfig.setBusyTimeout("10000")
        sqliteConfig.setSynchronous(org.sqlite.SQLiteConfig.SynchronousMode.OFF)
        // Recorded before the database handle is set up; presumably because opening the
        // database may create the file, which would hide that the tables are still missing.
        val existedBefore = file.exists()
        val database = Database.forURL(
          s"jdbc:sqlite:${file.getAbsolutePath}",
          driver = "org.sqlite.JDBC",
          prop = sqliteConfig.toProperties,
          executor = AsyncExecutor("single_thread", 1, 1000))
        val summary = new SummaryDb(database)
        if (!existedBefore) summary.createTables()
        summaryConnections += file -> summary
        summary
    }
  }
}