SummaryDb.scala 16.7 KB
Newer Older
Peter van 't Hof's avatar
Peter van 't Hof committed
1
package nl.lumc.sasc.biopet.utils.summary.db
2

Peter van 't Hof's avatar
Peter van 't Hof committed
3
import nl.lumc.sasc.biopet.utils.ConfigUtils
4
import nl.lumc.sasc.biopet.utils.summary.db.Schema._
Peter van 't Hof's avatar
Peter van 't Hof committed
5
import slick.driver.H2Driver.api._
6
7

import scala.concurrent.ExecutionContext.Implicits.global
Peter van 't Hof's avatar
Peter van 't Hof committed
8
import scala.concurrent.duration.Duration
Peter van 't Hof's avatar
Peter van 't Hof committed
9
import scala.concurrent.{ Await, Future }
10

11
12
import java.io.{ Closeable, File }

13
/**
 * This class interfaces with a summary database
 *
 * Created by pjvanthof on 05/02/2017.
 */
18
19
20
21
class SummaryDb(db: Database) extends Closeable {

  /** Closes the underlying database handle */
  def close(): Unit = {
    db.close()
  }

22
  /**
   * Creates all tables of the summary schema.
   *
   * The schemas of all tables are combined into a single create action. This method
   * blocks until the database has finished executing it; a failure of the action is
   * thrown to the caller by `Await.result`.
   */
  def createTables(): Unit = {
    val fullSchema = runs.schema ++ samples.schema ++
      libraries.schema ++ pipelines.schema ++
      modules.schema ++ stats.schema ++ settings.schema ++
      files.schema ++ executables.schema
    // The former `try` had neither `catch` nor `finally`, so it was equivalent to a plain block
    val setup = DBIO.seq(fullSchema.create)
    Await.result(db.run(setup), Duration.Inf)
  }

36
  /**
   * Creates a new run and returns its runId.
   *
   * The id is the number of rows currently in the runs table; that count is awaited
   * (blocking) before the insert is submitted.
   */
  def createRun(runName: String, outputDir: String): Future[Int] = {
    val rowCount = db.run(runs.size.result)
    val newId = Await.result(rowCount, Duration.Inf)
    val insertRun = runs.forceInsert(Run(newId, runName, outputDir))
    db.run(insertRun).map(_ => newId)
  }

42
43
  /**
   * Returns all runs that match the given criteria.
   *
   * Every argument that is defined becomes an equality condition; the conditions are
   * combined with AND. When no argument is defined all runs are returned.
   */
  def getRuns(runId: Option[Int] = None, runName: Option[String] = None, outputDir: Option[String] = None): Future[Seq[Run]] = {
    val query = runs.filter { run =>
      val conditions = List(
        runId.map(run.id === _),
        runName.map(run.runName === _),
        outputDir.map(run.outputDir === _)
      ).flatten
      conditions.reduceLeftOption(_ && _).getOrElse(true: Rep[Boolean])
    }
    db.run(query.result)
  }

54
  /**
   * Creates a new sample and returns its sampleId.
   *
   * The id is the number of rows currently in the samples table; that count is awaited
   * (blocking) before the insert is submitted.
   */
  def createSample(name: String, runId: Int, tags: Option[String] = None): Future[Int] = {
    val rowCount = db.run(samples.size.result)
    val newId = Await.result(rowCount, Duration.Inf)
    val insertSample = samples.forceInsert(Sample(newId, name, runId, tags))
    db.run(insertSample).map(_ => newId)
  }

60
61
  /**
   * Returns all samples that match the given criteria.
   *
   * Every argument that is defined becomes an equality condition; the conditions are
   * combined with AND. When no argument is defined all samples are returned.
   */
  def getSamples(sampleId: Option[Int] = None, runId: Option[Int] = None, name: Option[String] = None): Future[Seq[Sample]] = {
    val query = samples.filter { sample =>
      val conditions = List(
        sampleId.map(sample.id === _),
        runId.map(sample.runId === _),
        name.map(sample.name === _)
      ).flatten
      conditions.reduceLeftOption(_ && _).getOrElse(true: Rep[Boolean])
    }
    db.run(query.result)
  }

72
73
  /** Returns the sampleId of the first sample found for a specific runId + sampleName, if any */
  def getSampleId(runId: Int, sampleName: String): Future[Option[Int]] = {
    for {
      matches <- getSamples(runId = Some(runId), name = Some(sampleName))
    } yield matches.headOption.map(_.id)
  }

77
  /**
   * Returns the tags of a specific sample as a map.
   *
   * The result is None when no sample has this id or when the sample has no tags stored.
   */
  def getSampleTags(sampleId: Int): Future[Option[Map[String, Any]]] = {
    val tagsQuery = samples.filter(_.id === sampleId).map(_.tags)
    db.run(tagsQuery.result).map { rows =>
      rows.headOption.flatten.map(ConfigUtils.jsonTextToMap)
    }
  }

83
  /**
   * Creates a new library and returns its id.
   *
   * The id is the number of rows currently in the libraries table; that count is awaited
   * (blocking) before the insert is submitted.
   */
  def createLibrary(name: String, runId: Int, sampleId: Int, tags: Option[String] = None): Future[Int] = {
    val rowCount = db.run(libraries.size.result)
    val newId = Await.result(rowCount, Duration.Inf)
    val insertLibrary = libraries.forceInsert(Library(newId, name, runId, sampleId, tags))
    db.run(insertLibrary).map(_ => newId)
  }

89
90
  /**
   * Returns all libraries that match the given criteria.
   *
   * Every argument that is defined becomes an equality condition; the conditions are
   * combined with AND. When no argument is defined all libraries are returned.
   */
  def getLibraries(libId: Option[Int] = None, name: Option[String] = None, runId: Option[Int] = None, sampleId: Option[Int] = None): Future[Seq[Library]] = {
    val query = libraries.filter { lib =>
      val conditions = List(
        libId.map(lib.id === _),
        sampleId.map(lib.sampleId === _),
        runId.map(lib.runId === _),
        name.map(lib.name === _)
      ).flatten
      conditions.reduceLeftOption(_ && _).getOrElse(true: Rep[Boolean])
    }
    db.run(query.result)
  }

102
103
  /** Returns the id of the first library found for a specific runId + sampleId + name, if any */
  def getLibraryId(runId: Int, sampleId: Int, name: String): Future[Option[Int]] = {
    for {
      matches <- getLibraries(runId = Some(runId), sampleId = Some(sampleId), name = Some(name))
    } yield matches.headOption.map(_.id)
  }

107
  /**
   * Returns the tags of a specific library as a map.
   *
   * The result is None when no library has this id or when the library has no tags stored.
   */
  def getLibraryTags(libId: Int): Future[Option[Map[String, Any]]] = {
    val tagsQuery = libraries.filter(_.id === libId).map(_.tags)
    db.run(tagsQuery.result).map { rows =>
      rows.headOption.flatten.map(ConfigUtils.jsonTextToMap)
    }
  }

113
  /**
   * Creates a new pipeline without checking whether it already exists. This may give a
   * database exception.
   *
   * The id is the number of rows currently in the pipelines table; that count is awaited
   * (blocking) before the insert is submitted.
   */
  def forceCreatePipeline(name: String, runId: Int): Future[Int] = {
    val rowCount = db.run(pipelines.size.result)
    val newId = Await.result(rowCount, Duration.Inf)
    val insertPipeline = pipelines.forceInsert(Pipeline(newId, name, runId))
    db.run(insertPipeline).map(_ => newId)
  }

119
  /**
   * Creates a new pipeline if it does not yet exist.
   *
   * @return the id of the first pipeline that already matches name + runId, or the id of
   *         the newly created pipeline when there is no match
   */
  def createPipeline(name: String, runId: Int): Future[Int] = {
    getPipelines(name = Some(name), runId = Some(runId)).flatMap { existing =>
      existing.headOption match {
        // The id is already known, so no task needs to be scheduled for it
        case Some(pipeline) => Future.successful(pipeline.id)
        case None           => forceCreatePipeline(name, runId)
      }
    }
  }

129
130
  /**
   * Returns all pipelines that match the given criteria.
   *
   * Every argument that is defined becomes an equality condition; the conditions are
   * combined with AND. When no argument is defined all pipelines are returned.
   */
  def getPipelines(pipelineId: Option[Int] = None, name: Option[String] = None, runId: Option[Int] = None): Future[Seq[Pipeline]] = {
    val query = pipelines.filter { pipeline =>
      val conditions = List(
        pipelineId.map(pipeline.id === _),
        runId.map(pipeline.runId === _),
        name.map(pipeline.name === _)
      ).flatten
      conditions.reduceLeftOption(_ && _).getOrElse(true: Rep[Boolean])
    }
    db.run(query.result)
  }

141
  /**
   * Creates a new module without checking whether it already exists. This may give a
   * database exception.
   *
   * The id is the number of rows currently in the modules table; that count is awaited
   * (blocking) before the insert is submitted.
   */
  def forceCreateModule(name: String, runId: Int, pipelineId: Int): Future[Int] = {
    val rowCount = db.run(modules.size.result)
    val newId = Await.result(rowCount, Duration.Inf)
    val insertModule = modules.forceInsert(Module(newId, name, runId, pipelineId))
    db.run(insertModule).map(_ => newId)
  }

147
  /**
   * Creates a new module if it does not yet exist.
   *
   * @return the id of the first module that already matches name + runId + pipelineId,
   *         or the id of the newly created module when there is no match
   */
  def createModule(name: String, runId: Int, pipelineId: Int): Future[Int] = {
    getModules(name = Some(name), runId = Some(runId), pipelineId = Some(pipelineId)).flatMap { existing =>
      existing.headOption match {
        // The id is already known, so no task needs to be scheduled for it
        case Some(module) => Future.successful(module.id)
        case None         => forceCreateModule(name, runId, pipelineId)
      }
    }
  }

157
158
  /**
   * Returns all modules that match the given criteria.
   *
   * Every argument that is defined becomes an equality condition; the conditions are
   * combined with AND. When no argument is defined all modules are returned.
   */
  def getModules(moduleId: Option[Int] = None, name: Option[String] = None, runId: Option[Int] = None, pipelineId: Option[Int] = None): Future[Seq[Module]] = {
    val query = modules.filter { module =>
      val conditions = List(
        moduleId.map(module.id === _),
        runId.map(module.runId === _),
        pipelineId.map(module.pipelineId === _),
        name.map(module.name === _)
      ).flatten
      conditions.reduceLeftOption(_ && _).getOrElse(true: Rep[Boolean])
    }
    db.run(query.result)
  }

170
  /**
   * Inserts a new stat into the database. This method does not check whether a stat for
   * this combination already exists; callers need to check that beforehand.
   */
  def createStat(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
                 sampleId: Option[Int] = None, libId: Option[Int] = None, content: String): Future[Int] = {
    val row = Stat(runId, pipelineId, moduleId, sampleId, libId, content)
    val insertStat = stats.forceInsert(row)
    db.run(insertStat)
  }

176
  /**
   * Creates a stat, or updates the content of the stat(s) already stored for this
   * combination of ids.
   *
   * The existence check is awaited (blocking) before the insert or update is submitted.
   */
  def createOrUpdateStat(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
                         sampleId: Option[Int] = None, libId: Option[Int] = None, content: String): Future[Int] = {
    val existing = statsFilter(Some(runId), Some(pipelineId), Some(moduleId), Some(sampleId), Some(libId))
    Await.result(db.run(existing.size.result), Duration.Inf) match {
      case 0 => createStat(runId, pipelineId, moduleId, sampleId, libId, content)
      case _ => db.run(existing.map(_.content).update(content))
    }
  }

185
  /**
   * Returns a Query for [[Stats]].
   *
   * For runId and pipelineId, None means "no restriction". For moduleId, sampleId and
   * libId: None means "no restriction", Some(None) selects rows where the column is
   * empty and Some(Some(id)) selects rows where the column equals that id.
   */
  private def statsFilter(runId: Option[Int] = None, pipelineId: Option[Int] = None, moduleId: Option[Option[Int]] = None,
                          sampleId: Option[Option[Int]] = None, libId: Option[Option[Int]] = None) = {
    type StatsQuery = Query[Stats, Stats#TableElementType, Seq]
    val all: StatsQuery = stats
    val byRun: StatsQuery = runId.fold(all)(id => all.filter(_.runId === id))
    val byPipeline: StatsQuery = pipelineId.fold(byRun)(id => byRun.filter(_.pipelineId === id))
    val byModule: StatsQuery = moduleId match {
      case Some(Some(id)) => byPipeline.filter(_.moduleId === id)
      case Some(None)     => byPipeline.filter(_.moduleId.isEmpty)
      case None           => byPipeline
    }
    val bySample: StatsQuery = sampleId match {
      case Some(Some(id)) => byModule.filter(_.sampleId === id)
      case Some(None)     => byModule.filter(_.sampleId.isEmpty)
      case None           => byModule
    }
    val byLibrary: StatsQuery = libId match {
      case Some(Some(id)) => bySample.filter(_.libraryId === id)
      case Some(None)     => bySample.filter(_.libraryId.isEmpty)
      case None           => bySample
    }
    byLibrary
  }
196

197
  /**
   * Returns all stats that match the given criteria.
   *
   * None means "no restriction"; for moduleId, sampleId and libId, Some(None) selects
   * rows where that column is empty.
   */
  def getStats(runId: Option[Int] = None, pipelineId: Option[Int] = None, moduleId: Option[Option[Int]] = None,
               sampleId: Option[Option[Int]] = None, libId: Option[Option[Int]] = None): Future[Seq[Stat]] = {
    val query = statsFilter(runId, pipelineId, moduleId, sampleId, libId)
    db.run(query.result)
  }

203
  /**
   * Returns a single stat as a `Map[String, Any]`.
   *
   * An undefined moduleId, sampleId or libId selects rows where that column is empty.
   * The content of the first matching row is parsed; None is returned without a match.
   */
  def getStat(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
              sampleId: Option[Int] = None, libId: Option[Int] = None): Future[Option[Map[String, Any]]] = {
    for {
      matches <- getStats(Some(runId), Some(pipelineId), Some(moduleId), Some(sampleId), Some(libId))
    } yield matches.headOption.map(stat => ConfigUtils.jsonTextToMap(stat.content))
  }

210
  /**
   * Inserts a new setting into the database. This method does not check whether a setting
   * for this combination already exists; callers need to check that beforehand.
   */
  def createSetting(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
                    sampleId: Option[Int] = None, libId: Option[Int] = None, content: String): Future[Int] = {
    val row = Setting(runId, pipelineId, moduleId, sampleId, libId, content)
    val insertSetting = settings.forceInsert(row)
    db.run(insertSetting)
  }

216
217
  /**
   * Returns a [[Query]] for [[Settings]].
   *
   * For runId and pipelineId, None means "no restriction". For moduleId, sampleId and
   * libId: None means "no restriction", Some(None) selects rows where the column is
   * empty and Some(Some(id)) selects rows where the column equals that id.
   */
  private def settingsFilter(runId: Option[Int] = None, pipelineId: Option[Int] = None, moduleId: Option[Option[Int]] = None,
                     sampleId: Option[Option[Int]] = None, libId: Option[Option[Int]] = None) = {
    type SettingsQuery = Query[Settings, Settings#TableElementType, Seq]
    val all: SettingsQuery = settings
    val byRun: SettingsQuery = runId.fold(all)(id => all.filter(_.runId === id))
    val byPipeline: SettingsQuery = pipelineId.fold(byRun)(id => byRun.filter(_.pipelineId === id))
    val byModule: SettingsQuery = moduleId match {
      case Some(Some(id)) => byPipeline.filter(_.moduleId === id)
      case Some(None)     => byPipeline.filter(_.moduleId.isEmpty)
      case None           => byPipeline
    }
    val bySample: SettingsQuery = sampleId match {
      case Some(Some(id)) => byModule.filter(_.sampleId === id)
      case Some(None)     => byModule.filter(_.sampleId.isEmpty)
      case None           => byModule
    }
    val byLibrary: SettingsQuery = libId match {
      case Some(Some(id)) => bySample.filter(_.libraryId === id)
      case Some(None)     => bySample.filter(_.libraryId.isEmpty)
      case None           => bySample
    }
    byLibrary
  }
227

228
  /**
   * Creates a setting, or overwrites the setting(s) already stored for this combination
   * of ids.
   *
   * The existence check is awaited (blocking) before the insert or update is submitted.
   */
  def createOrUpdateSetting(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
                            sampleId: Option[Int] = None, libId: Option[Int] = None, content: String): Future[Int] = {
    val existing = settingsFilter(Some(runId), Some(pipelineId), Some(moduleId), Some(sampleId), Some(libId))
    Await.result(db.run(existing.size.result), Duration.Inf) match {
      case 0 => createSetting(runId, pipelineId, moduleId, sampleId, libId, content)
      case _ => db.run(existing.update(Setting(runId, pipelineId, moduleId, sampleId, libId, content)))
    }
  }

237
  /**
   * Returns all settings that match the given criteria.
   *
   * None means "no restriction"; for moduleId, sampleId and libId, Some(None) selects
   * rows where that column is empty.
   */
  def getSettings(runId: Option[Int] = None, pipelineId: Option[Int] = None, moduleId: Option[Option[Int]] = None,
                  sampleId: Option[Option[Int]] = None, libId: Option[Option[Int]] = None): Future[Seq[Setting]] = {
    val query = settingsFilter(runId, pipelineId, moduleId, sampleId, libId)
    db.run(query.result)
  }

243
  /**
   * Returns a specific setting as a `Map[String, Any]`.
   *
   * An undefined moduleId, sampleId or libId selects rows where that column is empty.
   * The content of the first matching row is parsed; None is returned without a match.
   */
  def getSetting(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
                 sampleId: Option[Int] = None, libId: Option[Int] = None): Future[Option[Map[String, Any]]] = {
    for {
      matches <- getSettings(Some(runId), Some(pipelineId), Some(moduleId), Some(sampleId), Some(libId))
    } yield matches.headOption.map(setting => ConfigUtils.jsonTextToMap(setting.content))
  }
Peter van 't Hof's avatar
Peter van 't Hof committed
249

250
251
  /**
   * Returns a [[Query]] for [[Files]].
   *
   * For runId, pipelineId and key, None means "no restriction". For moduleId, sampleId
   * and libId: None means "no restriction", Some(None) selects rows where the column is
   * empty and Some(Some(id)) selects rows where the column equals that id.
   *
   * `moduleId` now defaults to None, matching the stats and settings filters.
   */
  private def filesFilter(runId: Option[Int] = None, pipelineId: Option[Int] = None, moduleId: Option[Option[Int]] = None,
                  sampleId: Option[Option[Int]] = None, libId: Option[Option[Int]] = None,
                  key: Option[String] = None) = {
    type FilesQuery = Query[Files, Files#TableElementType, Seq]
    val all: FilesQuery = files
    val byRun: FilesQuery = runId.fold(all)(id => all.filter(_.runId === id))
    val byPipeline: FilesQuery = pipelineId.fold(byRun)(id => byRun.filter(_.pipelineId === id))
    val byKey: FilesQuery = key.fold(byPipeline)(k => byPipeline.filter(_.key === k))
    val byModule: FilesQuery = moduleId match {
      case Some(Some(id)) => byKey.filter(_.moduleId === id)
      case Some(None)     => byKey.filter(_.moduleId.isEmpty)
      case None           => byKey
    }
    val bySample: FilesQuery = sampleId match {
      case Some(Some(id)) => byModule.filter(_.sampleId === id)
      case Some(None)     => byModule.filter(_.sampleId.isEmpty)
      case None           => byModule
    }
    val byLibrary: FilesQuery = libId match {
      case Some(Some(id)) => bySample.filter(_.libraryId === id)
      case Some(None)     => bySample.filter(_.libraryId.isEmpty)
      case None           => bySample
    }
    byLibrary
  }

264
  /**
   * Returns all [[Files]] rows that match the given criteria.
   *
   * None means "no restriction"; for moduleId, sampleId and libId, Some(None) selects
   * rows where that column is empty. `moduleId` now defaults to None like the other
   * criteria, consistent with getStats and getSettings; existing callers that pass it
   * explicitly are unaffected.
   */
  def getFiles(runId: Option[Int] = None, pipelineId: Option[Int] = None, moduleId: Option[Option[Int]] = None,
               sampleId: Option[Option[Int]] = None, libId: Option[Option[Int]] = None,
               key: Option[String] = None): Future[Seq[Schema.File]] = {
    db.run(filesFilter(runId, pipelineId, moduleId, sampleId, libId, key).result)
  }

271
  /**
   * Inserts a new file record. This method does not check whether the record already
   * exists; the insert will raise an exception if it does.
   */
  def createFile(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
                 sampleId: Option[Int] = None, libId: Option[Int] = None,
                 key: String, path: String, md5: String, link: Boolean = false, size: Long): Future[Int] = {
    val row = Schema.File(runId, pipelineId, moduleId, sampleId, libId, key, path, md5, link, size)
    val insertFile = files.forceInsert(row)
    db.run(insertFile)
  }

278
  /**
   * Creates a file record, or overwrites the record(s) already stored for this
   * combination of ids and key.
   *
   * The existence check is awaited (blocking) before the insert or update is submitted.
   */
  def createOrUpdateFile(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
                         sampleId: Option[Int] = None, libId: Option[Int] = None,
                         key: String, path: String, md5: String, link: Boolean = false, size: Long): Future[Int] = {
    val existing = filesFilter(Some(runId), Some(pipelineId), Some(moduleId), Some(sampleId), Some(libId), Some(key))
    Await.result(db.run(existing.size.result), Duration.Inf) match {
      case 0 => createFile(runId, pipelineId, moduleId, sampleId, libId, key, path, md5, link, size)
      case _ => db.run(existing.update(Schema.File(runId, pipelineId, moduleId, sampleId, libId, key, path, md5, link, size)))
    }
  }
Peter van 't Hof's avatar
Peter van 't Hof committed
287

288
289
  /**
   * Returns a [[Query]] for [[Executables]].
   *
   * Each defined argument adds an equality filter; None means "no restriction".
   */
  private def executablesFilter(runId: Option[Int], toolName: Option[String]) = {
    type ExecutablesQuery = Query[Executables, Executables#TableElementType, Seq]
    val all: ExecutablesQuery = executables
    val byRun: ExecutablesQuery = runId.fold(all)(id => all.filter(_.runId === id))
    val byTool: ExecutablesQuery = toolName.fold(byRun)(tool => byRun.filter(_.toolName === tool))
    byTool
  }

296
297
  /** Returns all executables that match the given criteria; None means "no restriction" */
  def getExecutables(runId: Option[Int] = None, toolName: Option[String] = None): Future[Seq[Executable]] = {
    val query = executablesFilter(runId, toolName)
    db.run(query.result)
  }

301
  /**
   * Inserts a new executable record. This method does not check whether the record
   * already exists; the insert will raise an exception if it does.
   */
  def createExecutable(runId: Int, toolName: String, version: Option[String] = None, path: Option[String] = None,
                       javaVersion: Option[String] = None, exeMd5: Option[String] = None, javaMd5: Option[String] = None, jarPath: Option[String] = None): Future[Int] = {
    val row = Executable(runId, toolName, version, path, javaVersion, exeMd5, javaMd5, jarPath)
    val insertExecutable = executables.forceInsert(row)
    db.run(insertExecutable)
  }

307
  /**
   * Creates an [[Executable]] record, or overwrites the record(s) already stored for this
   * runId + toolName.
   *
   * The existence check is awaited (blocking) before the insert or update is submitted.
   */
  def createOrUpdateExecutable(runId: Int, toolName: String, version: Option[String] = None, path: Option[String] = None,
                               javaVersion: Option[String] = None, exeMd5: Option[String] = None, javaMd5: Option[String] = None, jarPath: Option[String] = None): Future[Int] = {
    val filter = executablesFilter(Some(runId), Some(toolName))
    val r = Await.result(db.run(filter.size.result), Duration.Inf)
    if (r == 0) {
      // Named arguments: the former positional call skipped `path`, which shifted
      // javaVersion/exeMd5/javaMd5 into the wrong columns and dropped path and jarPath.
      createExecutable(runId, toolName, version = version, path = path,
        javaVersion = javaVersion, exeMd5 = exeMd5, javaMd5 = javaMd5, jarPath = jarPath)
    } else db.run(filter.update(Executable(runId, toolName, version, path, javaVersion, exeMd5, javaMd5, jarPath)))
  }
315
}
316
317

object SummaryDb {

  /** Cache of opened summary databases, keyed by the file passed to [[openSqliteSummary]] */
  private var summaryConnections = Map[File, SummaryDb]()

  /** Closes all summaries that are still in the cache and empties the cache */
  def closeAll(): Unit = {
    summaryConnections.foreach(_._2.close())
    summaryConnections = summaryConnections.empty
  }

  /**
   * Opens a sqlite database and creates the tables when the database file did not exist yet.
   *
   * A summary that was already opened for the same `file` is returned from the cache.
   * When creating the tables fails, the newly opened database is closed again and the
   * failure is rethrown, so no open handle is left behind outside the cache.
   */
  def openSqliteSummary(file: File): SummaryDb = {
    summaryConnections.get(file) match {
      case Some(cached) => cached
      case None =>
        val config: org.sqlite.SQLiteConfig = new org.sqlite.SQLiteConfig()
        config.enforceForeignKeys(true)
        config.setBusyTimeout("10000")
        config.setSynchronous(org.sqlite.SQLiteConfig.SynchronousMode.OFF)
        // Must be checked before the connection is made, to know whether tables are needed
        val exist = file.exists()
        val db = Database.forURL(s"jdbc:sqlite:${file.getAbsolutePath}", driver = "org.sqlite.JDBC", prop = config.toProperties, executor = AsyncExecutor("single_thread", 1, 1000))
        val s = new SummaryDb(db)
        if (!exist) {
          try s.createTables()
          catch {
            case scala.util.control.NonFatal(e) =>
              s.close()
              throw e
          }
        }
        summaryConnections += file -> s
        s
    }
  }
}