SummaryDb.scala 16.8 KB
Newer Older
Peter van 't Hof's avatar
Peter van 't Hof committed
1
package nl.lumc.sasc.biopet.utils.summary.db
2

Peter van 't Hof's avatar
Peter van 't Hof committed
3
import nl.lumc.sasc.biopet.utils.ConfigUtils
4
import nl.lumc.sasc.biopet.utils.summary.db.Schema._
Peter van 't Hof's avatar
Peter van 't Hof committed
5
import slick.driver.H2Driver.api._
6
7

import scala.concurrent.ExecutionContext.Implicits.global
Peter van 't Hof's avatar
Peter van 't Hof committed
8
import scala.concurrent.duration.Duration
Peter van 't Hof's avatar
Peter van 't Hof committed
9
10
11
import scala.concurrent.{Await, Future}
import java.io.{Closeable, File}
import java.sql.Date
12

13
/**
14
15
 * This class interfaces with a summary database
 *
Peter van 't Hof's avatar
Peter van 't Hof committed
16
17
 * Created by pjvanthof on 05/02/2017.
 */
18
19
20
21
class SummaryDb(db: Database) extends Closeable {

  /** Closes the underlying database handle */
  def close(): Unit = {
    db.close()
  }

22
  /**
   * Creates all summary tables with a single schema action and blocks until the database
   * has finished. A failure of the create action is rethrown to the caller by the await.
   */
  def createTables(): Unit = {
    // Combine the schema of every table so they are created by one action
    val allSchemas = runs.schema ++ samples.schema ++
      libraries.schema ++ pipelines.schema ++
      modules.schema ++ stats.schema ++ settings.schema ++
      files.schema ++ executables.schema
    Await.result(db.run(DBIO.seq(allSchemas.create)), Duration.Inf)
  }

36
  /**
   * Creates a new run and returns its runId.
   *
   * The id is the number of rows in the runs table at the moment of the call; this count is
   * awaited synchronously before the insert is submitted.
   */
  def createRun(runName: String, outputDir: String, version: String, commitHash: String,
                creationDate: Date): Future[Int] = {
    val newId = Await.result(db.run(runs.size.result), Duration.Inf)
    val insertRun = runs.forceInsert(Run(newId, runName, outputDir, version, commitHash, creationDate))
    db.run(insertRun).map(_ => newId)
  }

43
44
  /** Returns all runs that match the given criteria; a criterion left as None is not applied */
  def getRuns(runId: Option[Int] = None, runName: Option[String] = None, outputDir: Option[String] = None): Future[Seq[Run]] = {
    val matching = runs.filter { run =>
      // Only the criteria that were actually supplied take part in the condition
      val conditions = List(
        runId.map(run.id === _),
        runName.map(run.runName === _),
        outputDir.map(run.outputDir === _)
      ).flatten
      // Without any criteria the condition is a constant true
      conditions.reduceLeftOption(_ && _).getOrElse(true: Rep[Boolean])
    }
    db.run(matching.result)
  }

55
  /**
   * Creates a new sample and returns its sampleId.
   *
   * The id is the number of rows in the samples table at the moment of the call; this count is
   * awaited synchronously before the insert is submitted.
   */
  def createSample(name: String, runId: Int, tags: Option[String] = None): Future[Int] = {
    val newId = Await.result(db.run(samples.size.result), Duration.Inf)
    val insertSample = samples.forceInsert(Sample(newId, name, runId, tags))
    db.run(insertSample).map(_ => newId)
  }

61
62
  /** Returns all samples that match the given criteria; a criterion left as None is not applied */
  def getSamples(sampleId: Option[Int] = None, runId: Option[Int] = None, name: Option[String] = None): Future[Seq[Sample]] = {
    val matching = samples.filter { sample =>
      // Only the criteria that were actually supplied take part in the condition
      val conditions = List(
        sampleId.map(sample.id === _),
        runId.map(sample.runId === _),
        name.map(sample.name === _)
      ).flatten
      // Without any criteria the condition is a constant true
      conditions.reduceLeftOption(_ && _).getOrElse(true: Rep[Boolean])
    }
    db.run(matching.result)
  }

73
74
  /** Returns the sampleId of the first sample found for the given runId and sample name, if any */
  def getSampleId(runId: Int, sampleName: String): Future[Option[Int]] = {
    val found = getSamples(runId = Some(runId), name = Some(sampleName))
    found.map(hits => hits.headOption.map(sample => sample.id))
  }

78
  /**
   * Returns the tags of a specific sample, converted with ConfigUtils.jsonTextToMap.
   * The result is None when the sample is not found or has no tags.
   */
  def getSampleTags(sampleId: Int): Future[Option[Map[String, Any]]] = {
    val tagsQuery = samples.filter(_.id === sampleId).map(_.tags)
    db.run(tagsQuery.result).map { rows =>
      // tags is optional per row, so the first row may still carry no tags
      rows.headOption.flatten.map(ConfigUtils.jsonTextToMap)
    }
  }

84
  /**
   * Creates a new library and returns its id.
   *
   * The id is the number of rows in the libraries table at the moment of the call; this count
   * is awaited synchronously before the insert is submitted.
   */
  def createLibrary(name: String, runId: Int, sampleId: Int, tags: Option[String] = None): Future[Int] = {
    val newId = Await.result(db.run(libraries.size.result), Duration.Inf)
    val insertLibrary = libraries.forceInsert(Library(newId, name, runId, sampleId, tags))
    db.run(insertLibrary).map(_ => newId)
  }

90
91
  /** Returns all libraries that match the given criteria; a criterion left as None is not applied */
  def getLibraries(libId: Option[Int] = None, name: Option[String] = None, runId: Option[Int] = None, sampleId: Option[Int] = None): Future[Seq[Library]] = {
    val matching = libraries.filter { lib =>
      // Only the criteria that were actually supplied take part in the condition
      val conditions = List(
        libId.map(lib.id === _),
        sampleId.map(lib.sampleId === _),
        runId.map(lib.runId === _),
        name.map(lib.name === _)
      ).flatten
      // Without any criteria the condition is a constant true
      conditions.reduceLeftOption(_ && _).getOrElse(true: Rep[Boolean])
    }
    db.run(matching.result)
  }

103
104
  /** Returns the id of the first library found for the given runId, sampleId and name, if any */
  def getLibraryId(runId: Int, sampleId: Int, name: String): Future[Option[Int]] = {
    val found = getLibraries(runId = Some(runId), sampleId = Some(sampleId), name = Some(name))
    found.map(hits => hits.headOption.map(library => library.id))
  }

108
  /**
   * Returns the tags of a specific library, converted with ConfigUtils.jsonTextToMap.
   * The result is None when the library is not found or has no tags.
   */
  def getLibraryTags(libId: Int): Future[Option[Map[String, Any]]] = {
    val tagsQuery = libraries.filter(_.id === libId).map(_.tags)
    db.run(tagsQuery.result).map { rows =>
      // tags is optional per row, so the first row may still carry no tags
      rows.headOption.flatten.map(ConfigUtils.jsonTextToMap)
    }
  }

114
  /**
   * Creates a new pipeline, even if it already exists. This may give a database exception.
   *
   * The id is the number of rows in the pipelines table at the moment of the call; this count
   * is awaited synchronously before the insert is submitted.
   */
  def forceCreatePipeline(name: String, runId: Int): Future[Int] = {
    val newId = Await.result(db.run(pipelines.size.result), Duration.Inf)
    val insertPipeline = pipelines.forceInsert(Pipeline(newId, name, runId))
    db.run(insertPipeline).map(_ => newId)
  }

120
  /**
   * Creates a new pipeline if it does not yet exist.
   *
   * @return the id of the first existing pipeline with this name and runId, otherwise the id of
   *         the pipeline created through forceCreatePipeline
   */
  def createPipeline(name: String, runId: Int): Future[Int] = {
    getPipelines(name = Some(name), runId = Some(runId)).flatMap { existing =>
      existing.headOption match {
        // The id is already known, so no extra task has to be scheduled to wrap it
        case Some(pipeline) => Future.successful(pipeline.id)
        case None => forceCreatePipeline(name, runId)
      }
    }
  }

130
131
  /** Returns all pipelines that match the given criteria; a criterion left as None is not applied */
  def getPipelines(pipelineId: Option[Int] = None, name: Option[String] = None, runId: Option[Int] = None): Future[Seq[Pipeline]] = {
    val matching = pipelines.filter { pipeline =>
      // Only the criteria that were actually supplied take part in the condition
      val conditions = List(
        pipelineId.map(pipeline.id === _),
        runId.map(pipeline.runId === _),
        name.map(pipeline.name === _)
      ).flatten
      // Without any criteria the condition is a constant true
      conditions.reduceLeftOption(_ && _).getOrElse(true: Rep[Boolean])
    }
    db.run(matching.result)
  }

142
  /**
   * Creates a new module, even if it already exists. This may give a database exception.
   *
   * The id is the number of rows in the modules table at the moment of the call; this count is
   * awaited synchronously before the insert is submitted.
   */
  def forceCreateModule(name: String, runId: Int, pipelineId: Int): Future[Int] = {
    val newId = Await.result(db.run(modules.size.result), Duration.Inf)
    val insertModule = modules.forceInsert(Module(newId, name, runId, pipelineId))
    db.run(insertModule).map(_ => newId)
  }

148
  /**
   * Creates a new module if it does not yet exist.
   *
   * @return the id of the first existing module with this name, runId and pipelineId, otherwise
   *         the id of the module created through forceCreateModule
   */
  def createModule(name: String, runId: Int, pipelineId: Int): Future[Int] = {
    getModules(name = Some(name), runId = Some(runId), pipelineId = Some(pipelineId)).flatMap { existing =>
      existing.headOption match {
        // The id is already known, so no extra task has to be scheduled to wrap it
        case Some(module) => Future.successful(module.id)
        case None => forceCreateModule(name, runId, pipelineId)
      }
    }
  }

158
159
  /** Returns all modules that match the given criteria; a criterion left as None is not applied */
  def getModules(moduleId: Option[Int] = None, name: Option[String] = None, runId: Option[Int] = None, pipelineId: Option[Int] = None): Future[Seq[Module]] = {
    val matching = modules.filter { module =>
      // Only the criteria that were actually supplied take part in the condition
      val conditions = List(
        moduleId.map(module.id === _),
        runId.map(module.runId === _),
        pipelineId.map(module.pipelineId === _),
        name.map(module.name === _)
      ).flatten
      // Without any criteria the condition is a constant true
      conditions.reduceLeftOption(_ && _).getOrElse(true: Rep[Boolean])
    }
    db.run(matching.result)
  }

171
  /**
   * Inserts a new stat without checking whether one already exists for this combination;
   * callers need to check beforehand (createOrUpdateStat does so).
   *
   * @return the Int produced by the insert action (presumably the affected row count - TODO confirm)
   */
  def createStat(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
                 sampleId: Option[Int] = None, libId: Option[Int] = None, content: String): Future[Int] = {
    val stat = Stat(runId, pipelineId, moduleId, sampleId, libId, content)
    db.run(stats.forceInsert(stat))
  }

177
  /**
   * Creates a stat, or updates the content of the stat(s) already stored for this exact
   * combination. The number of existing rows is awaited synchronously before deciding.
   */
  def createOrUpdateStat(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
                         sampleId: Option[Int] = None, libId: Option[Int] = None, content: String): Future[Int] = {
    val existing = statsFilter(Some(runId), Some(pipelineId), Some(moduleId), Some(sampleId), Some(libId))
    val existingCount = Await.result(db.run(existing.size.result), Duration.Inf)
    if (existingCount != 0) db.run(existing.map(_.content).update(content))
    else createStat(runId, pipelineId, moduleId, sampleId, libId, content)
  }

186
  /**
   * Builds a Query on [[Stats]] restricted by the supplied criteria.
   *
   * For moduleId, sampleId and libId the outer Option decides whether the column is filtered at
   * all: None leaves it unfiltered, Some(Some(id)) requires that id and Some(None) requires the
   * column to be empty.
   */
  private def statsFilter(runId: Option[Int] = None, pipelineId: Option[Int] = None, moduleId: Option[Option[Int]] = None,
                          sampleId: Option[Option[Int]] = None, libId: Option[Option[Int]] = None) = {
    val all: Query[Stats, Stats#TableElementType, Seq] = stats
    val byRun = runId.fold(all)(id => all.filter(_.runId === id))
    val byPipeline = pipelineId.fold(byRun)(id => byRun.filter(_.pipelineId === id))
    val byModule = moduleId.fold(byPipeline) {
      case Some(id) => byPipeline.filter(_.moduleId === id)
      case None => byPipeline.filter(_.moduleId.isEmpty)
    }
    val bySample = sampleId.fold(byModule) {
      case Some(id) => byModule.filter(_.sampleId === id)
      case None => byModule.filter(_.sampleId.isEmpty)
    }
    val byLibrary = libId.fold(bySample) {
      case Some(id) => bySample.filter(_.libraryId === id)
      case None => bySample.filter(_.libraryId.isEmpty)
    }
    byLibrary
  }
197

198
  /**
   * Returns all stats that match the given criteria. For moduleId, sampleId and libId, None means
   * "do not filter" while Some(None) selects rows where that column is empty.
   */
  def getStats(runId: Option[Int] = None, pipelineId: Option[Int] = None, moduleId: Option[Option[Int]] = None,
               sampleId: Option[Option[Int]] = None, libId: Option[Option[Int]] = None): Future[Seq[Stat]] = {
    val matching = statsFilter(runId, pipelineId, moduleId, sampleId, libId)
    db.run(matching.result)
  }

204
  /**
   * Returns the content of the first stat found for this exact combination, converted with
   * ConfigUtils.jsonTextToMap; None when no stat matches.
   */
  def getStat(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
              sampleId: Option[Int] = None, libId: Option[Int] = None): Future[Option[Map[String, Any]]] = {
    val found = getStats(Some(runId), Some(pipelineId), Some(moduleId), Some(sampleId), Some(libId))
    found.map { hits =>
      hits.headOption.map(stat => ConfigUtils.jsonTextToMap(stat.content))
    }
  }

211
  /**
   * Inserts a new setting without checking whether one already exists for this combination;
   * callers need to check beforehand (createOrUpdateSetting does so).
   *
   * @return the Int produced by the insert action (presumably the affected row count - TODO confirm)
   */
  def createSetting(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
                    sampleId: Option[Int] = None, libId: Option[Int] = None, content: String): Future[Int] = {
    val setting = Setting(runId, pipelineId, moduleId, sampleId, libId, content)
    db.run(settings.forceInsert(setting))
  }

217
218
  /**
   * Builds a [[Query]] on [[Settings]] restricted by the supplied criteria.
   *
   * For moduleId, sampleId and libId the outer Option decides whether the column is filtered at
   * all: None leaves it unfiltered, Some(Some(id)) requires that id and Some(None) requires the
   * column to be empty.
   */
  private def settingsFilter(runId: Option[Int] = None, pipelineId: Option[Int] = None, moduleId: Option[Option[Int]] = None,
                             sampleId: Option[Option[Int]] = None, libId: Option[Option[Int]] = None) = {
    val all: Query[Settings, Settings#TableElementType, Seq] = settings
    val byRun = runId.fold(all)(id => all.filter(_.runId === id))
    val byPipeline = pipelineId.fold(byRun)(id => byRun.filter(_.pipelineId === id))
    val byModule = moduleId.fold(byPipeline) {
      case Some(id) => byPipeline.filter(_.moduleId === id)
      case None => byPipeline.filter(_.moduleId.isEmpty)
    }
    val bySample = sampleId.fold(byModule) {
      case Some(id) => byModule.filter(_.sampleId === id)
      case None => byModule.filter(_.sampleId.isEmpty)
    }
    val byLibrary = libId.fold(bySample) {
      case Some(id) => bySample.filter(_.libraryId === id)
      case None => bySample.filter(_.libraryId.isEmpty)
    }
    byLibrary
  }
228

229
  /**
   * Creates a setting, or overwrites the setting(s) already stored for this exact combination.
   * The number of existing rows is awaited synchronously before deciding.
   */
  def createOrUpdateSetting(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
                            sampleId: Option[Int] = None, libId: Option[Int] = None, content: String): Future[Int] = {
    val existing = settingsFilter(Some(runId), Some(pipelineId), Some(moduleId), Some(sampleId), Some(libId))
    val existingCount = Await.result(db.run(existing.size.result), Duration.Inf)
    if (existingCount != 0) {
      db.run(existing.update(Setting(runId, pipelineId, moduleId, sampleId, libId, content)))
    } else {
      createSetting(runId, pipelineId, moduleId, sampleId, libId, content)
    }
  }

238
  /**
   * Returns all settings that match the given criteria. For moduleId, sampleId and libId, None
   * means "do not filter" while Some(None) selects rows where that column is empty.
   */
  def getSettings(runId: Option[Int] = None, pipelineId: Option[Int] = None, moduleId: Option[Option[Int]] = None,
                  sampleId: Option[Option[Int]] = None, libId: Option[Option[Int]] = None): Future[Seq[Setting]] = {
    val matching = settingsFilter(runId, pipelineId, moduleId, sampleId, libId)
    db.run(matching.result)
  }

244
  /**
   * Returns the content of the first setting found for this exact combination, converted with
   * ConfigUtils.jsonTextToMap; None when no setting matches.
   */
  def getSetting(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
                 sampleId: Option[Int] = None, libId: Option[Int] = None): Future[Option[Map[String, Any]]] = {
    val found = getSettings(Some(runId), Some(pipelineId), Some(moduleId), Some(sampleId), Some(libId))
    found.map { hits =>
      hits.headOption.map(setting => ConfigUtils.jsonTextToMap(setting.content))
    }
  }
Peter van 't Hof's avatar
Peter van 't Hof committed
250

251
252
  /**
   * Builds a [[Query]] on [[Files]] restricted by the supplied criteria.
   *
   * For moduleId, sampleId and libId the outer Option decides whether the column is filtered at
   * all: None leaves it unfiltered, Some(Some(id)) requires that id and Some(None) requires the
   * column to be empty.
   */
  private def filesFilter(runId: Option[Int] = None, pipelineId: Option[Int] = None, moduleId: Option[Option[Int]],
                          sampleId: Option[Option[Int]] = None, libId: Option[Option[Int]] = None,
                          key: Option[String] = None) = {
    val all: Query[Files, Files#TableElementType, Seq] = files
    val byRun = runId.fold(all)(id => all.filter(_.runId === id))
    val byPipeline = pipelineId.fold(byRun)(id => byRun.filter(_.pipelineId === id))
    val byKey = key.fold(byPipeline)(k => byPipeline.filter(_.key === k))
    val byModule = moduleId.fold(byKey) {
      case Some(id) => byKey.filter(_.moduleId === id)
      case None => byKey.filter(_.moduleId.isEmpty)
    }
    val bySample = sampleId.fold(byModule) {
      case Some(id) => byModule.filter(_.sampleId === id)
      case None => byModule.filter(_.sampleId.isEmpty)
    }
    val byLibrary = libId.fold(bySample) {
      case Some(id) => bySample.filter(_.libraryId === id)
      case None => bySample.filter(_.libraryId.isEmpty)
    }
    byLibrary
  }

265
  /**
   * Returns all [[Files]] rows that match the given criteria.
   *
   * For moduleId, sampleId and libId, None means "do not filter" while Some(None) selects rows
   * where that column is empty. moduleId defaults to None like every other criterion, so callers
   * that do not care about the module no longer have to pass it explicitly.
   */
  def getFiles(runId: Option[Int] = None, pipelineId: Option[Int] = None, moduleId: Option[Option[Int]] = None,
               sampleId: Option[Option[Int]] = None, libId: Option[Option[Int]] = None,
               key: Option[String] = None): Future[Seq[Schema.File]] = {
    val matching = filesFilter(runId, pipelineId, moduleId, sampleId, libId, key)
    db.run(matching.result)
  }

272
  /**
   * Inserts a new file record without checking for an existing one. This method will raise an
   * exception if it already exists.
   *
   * @return the Int produced by the insert action (presumably the affected row count - TODO confirm)
   */
  def createFile(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
                 sampleId: Option[Int] = None, libId: Option[Int] = None,
                 key: String, path: String, md5: String, link: Boolean = false, size: Long): Future[Int] = {
    val record = Schema.File(runId, pipelineId, moduleId, sampleId, libId, key, path, md5, link, size)
    db.run(files.forceInsert(record))
  }

279
  /**
   * Creates a file record, or overwrites the record(s) already stored for this exact combination
   * of ids and key. The number of existing rows is awaited synchronously before deciding.
   */
  def createOrUpdateFile(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
                         sampleId: Option[Int] = None, libId: Option[Int] = None,
                         key: String, path: String, md5: String, link: Boolean = false, size: Long): Future[Int] = {
    val existing = filesFilter(Some(runId), Some(pipelineId), Some(moduleId), Some(sampleId), Some(libId), Some(key))
    val existingCount = Await.result(db.run(existing.size.result), Duration.Inf)
    if (existingCount != 0) {
      db.run(existing.update(Schema.File(runId, pipelineId, moduleId, sampleId, libId, key, path, md5, link, size)))
    } else {
      createFile(runId, pipelineId, moduleId, sampleId, libId, key, path, md5, link, size)
    }
  }
Peter van 't Hof's avatar
Peter van 't Hof committed
288

289
290
  /** Builds a [[Query]] on [[Executables]]; a criterion given as None is not applied */
  private def executablesFilter(runId: Option[Int], toolName: Option[String]) = {
    val all: Query[Executables, Executables#TableElementType, Seq] = executables
    val byRun = runId.fold(all)(id => all.filter(_.runId === id))
    val byTool = toolName.fold(byRun)(tool => byRun.filter(_.toolName === tool))
    byTool
  }

297
298
  /** Returns all executables that match the given criteria; a criterion left as None is not applied */
  def getExecutables(runId: Option[Int] = None, toolName: Option[String] = None): Future[Seq[Executable]] = {
    val matching = executablesFilter(runId, toolName)
    db.run(matching.result)
  }

302
  /**
   * Inserts a new executable record without checking for an existing one. This method will raise
   * an exception if it already exists.
   *
   * @return the Int produced by the insert action (presumably the affected row count - TODO confirm)
   */
  def createExecutable(runId: Int, toolName: String, version: Option[String] = None, path: Option[String] = None,
                       javaVersion: Option[String] = None, exeMd5: Option[String] = None, javaMd5: Option[String] = None, jarPath: Option[String] = None): Future[Int] = {
    val record = Executable(runId, toolName, version, path, javaVersion, exeMd5, javaMd5, jarPath)
    db.run(executables.forceInsert(record))
  }

308
  /**
   * Creates an [[Executable]] record, or overwrites the record(s) already stored for this runId
   * and toolName. The number of existing rows is awaited synchronously before deciding.
   */
  def createOrUpdateExecutable(runId: Int, toolName: String, version: Option[String] = None, path: Option[String] = None,
                               javaVersion: Option[String] = None, exeMd5: Option[String] = None, javaMd5: Option[String] = None, jarPath: Option[String] = None): Future[Int] = {
    val filter = executablesFilter(Some(runId), Some(toolName))
    val r = Await.result(db.run(filter.size.result), Duration.Inf)
    // Every field must be forwarded in signature order: leaving out `path` shifts all later
    // positional arguments by one and silently drops javaMd5 and jarPath.
    if (r == 0) createExecutable(runId, toolName, version, path, javaVersion, exeMd5, javaMd5, jarPath)
    else db.run(filter.update(Executable(runId, toolName, version, path, javaVersion, exeMd5, javaMd5, jarPath)))
  }
316
}
317
318

object SummaryDb {
  /** Cache of the summaries opened through openSqliteSummary, keyed by the database file */
  private var summaryConnections = Map[File, SummaryDb]()

  /** Closes every summary that is still in the cache and then empties the cache */
  def closeAll(): Unit = {
    summaryConnections.values.foreach(_.close())
    summaryConnections = Map.empty
  }

  /**
   * Returns the cached summary for this file, opening the sqlite database first when it is not in
   * the cache yet. Tables are created only when the file did not exist before it was opened.
   */
  def openSqliteSummary(file: File): SummaryDb = {
    summaryConnections.getOrElse(file, {
      val sqliteConfig: org.sqlite.SQLiteConfig = new org.sqlite.SQLiteConfig()
      sqliteConfig.enforceForeignKeys(true)
      sqliteConfig.setBusyTimeout("10000")
      sqliteConfig.setSynchronous(org.sqlite.SQLiteConfig.SynchronousMode.OFF)
      // Recorded before the database handle is created, to decide about table creation below
      val existedBefore = file.exists()
      val db = Database.forURL(
        s"jdbc:sqlite:${file.getAbsolutePath}",
        driver = "org.sqlite.JDBC",
        prop = sqliteConfig.toProperties,
        executor = AsyncExecutor("single_thread", 1, 1000))
      val summary = new SummaryDb(db)
      if (!existedBefore) summary.createTables()
      summaryConnections += file -> summary
      summary
    })
  }
}