SummaryDb.scala 15.2 KB
Newer Older
Peter van 't Hof's avatar
Peter van 't Hof committed
1
package nl.lumc.sasc.biopet.utils.summary.db
2

Peter van 't Hof's avatar
Peter van 't Hof committed
3
import java.io.{ Closeable, File }
4

Peter van 't Hof's avatar
Peter van 't Hof committed
5
import nl.lumc.sasc.biopet.utils.ConfigUtils
6
import nl.lumc.sasc.biopet.utils.summary.db.Schema._
Peter van 't Hof's avatar
Peter van 't Hof committed
7
import slick.driver.H2Driver.api._
8
9

import scala.concurrent.ExecutionContext.Implicits.global
Peter van 't Hof's avatar
Peter van 't Hof committed
10
import scala.concurrent.duration.Duration
Peter van 't Hof's avatar
Peter van 't Hof committed
11
import scala.concurrent.{ Await, Future }
12
13

/**
Peter van 't Hof's avatar
Peter van 't Hof committed
14
15
 * Created by pjvanthof on 05/02/2017.
 */
16
17
18
19
class SummaryDb(db: Database) extends Closeable {

  def close(): Unit = db.close()

20
21
22
23
24
  /** This method will create all table */
  def createTables(): Unit = {
    try {
      val setup = DBIO.seq(
        (runs.schema ++ samples.schema ++
25
26
          libraries.schema ++ pipelines.schema ++
          modules.schema ++ stats.schema ++ settings.schema ++
27
28
29
30
31
32
33
34
35
          files.schema ++ executables.schema).create
      )
      val setupFuture = db.run(setup)
      Await.result(setupFuture, Duration.Inf)
    }
  }

  def createRun(runName: String, outputDir: String): Future[Int] = {
    val id = Await.result(db.run(runs.size.result), Duration.Inf)
36
    db.run(runs.forceInsert(Run(id, runName, outputDir))).map(_ => id)
37
38
39
40
41
42
43
  }

  def getRuns(runId: Option[Int] = None, runName: Option[String] = None, outputDir: Option[String] = None) = {
    val q = runs.filter { run =>
      List(
        runId.map(run.id === _),
        runName.map(run.runName === _),
44
        outputDir.map(run.outputDir === _)
45
46
47
48
49
      ).collect({ case Some(criteria) => criteria }).reduceLeftOption(_ && _).getOrElse(true: Rep[Boolean])
    }
    db.run(q.result)
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
50
  def createSample(name: String, runId: Int, tags: Option[String] = None): Future[Int] = {
51
    val id = Await.result(db.run(samples.size.result), Duration.Inf)
52
    db.run(samples.forceInsert(Sample(id, name, runId, tags))).map(_ => id)
53
54
55
56
57
58
59
  }

  def getSamples(sampleId: Option[Int] = None, runId: Option[Int] = None, name: Option[String] = None) = {
    val q = samples.filter { sample =>
      List(
        sampleId.map(sample.id === _),
        runId.map(sample.runId === _),
60
        name.map(sample.name === _)
61
62
      ).collect({ case Some(criteria) => criteria }).reduceLeftOption(_ && _).getOrElse(true: Rep[Boolean])
    }
63
    db.run(q.result)
64
65
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
66
67
68
69
  def getSampleId(runId: Int, sampleName: String) = {
    getSamples(runId = Some(runId), name = Some(sampleName)).map(_.headOption.map(_.id))
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
70
71
72
73
74
75
76
  def getSampleTags(sampleId: Int): Future[Option[Map[String, Any]]] = {
    db.run(samples.filter(_.id === sampleId).map(_.tags).result)
      .map(_.headOption.flatten.map(ConfigUtils.jsonTextToMap))
  }

  def createLibrary(name: String, runId: Int, sampleId: Int, tags: Option[String] = None): Future[Int] = {
    val id = Await.result(db.run(libraries.size.result), Duration.Inf)
77
    db.run(libraries.forceInsert(Library(id, name, runId, sampleId, tags))).map(_ => id)
Peter van 't Hof's avatar
Peter van 't Hof committed
78
79
80
81
82
83
84
85
86
87
88
  }

  def getLibraries(libId: Option[Int] = None, name: Option[String] = None, runId: Option[Int] = None, sampleId: Option[Int] = None) = {
    val q = libraries.filter { lib =>
      List(
        libId.map(lib.id === _),
        sampleId.map(lib.sampleId === _),
        runId.map(lib.runId === _),
        name.map(lib.name === _) // not a condition as `criteriaRoast` evaluates to `None`
      ).collect({ case Some(criteria) => criteria }).reduceLeftOption(_ && _).getOrElse(true: Rep[Boolean])
    }
89
    db.run(q.result)
Peter van 't Hof's avatar
Peter van 't Hof committed
90
91
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
92
93
94
95
  def getLibraryId(runId: Int, sampleId: Int, name: String) = {
    getLibraries(runId = Some(runId), sampleId = Some(sampleId), name = Some(name)).map(_.headOption.map(_.id))
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
96
97
98
  def getLibraryTags(libId: Int): Future[Option[Map[String, Any]]] = {
    db.run(libraries.filter(_.id === libId).map(_.tags).result)
      .map(_.headOption.flatten.map(ConfigUtils.jsonTextToMap))
99
100
  }

101
  def forceCreatePipeline(name: String, runId: Int): Future[Int] = {
102
103
104
105
    val id = Await.result(db.run(pipelines.size.result), Duration.Inf)
    db.run(pipelines.forceInsert(Pipeline(id, name, runId))).map(_ => id)
  }

106
107
  def createPipeline(name: String, runId: Int): Future[Int] = {
    getPipelines(name = Some(name), runId = Some(runId))
Peter van 't Hof's avatar
Peter van 't Hof committed
108
109
110
111
      .flatMap {
        case m =>
          if (m.isEmpty) forceCreatePipeline(name, runId)
          else Future(m.head.id)
112
113
114
      }
  }

115
116
117
118
119
  def getPipelines(pipelineId: Option[Int] = None, name: Option[String] = None, runId: Option[Int] = None) = {
    val q = pipelines.filter { lib =>
      List(
        pipelineId.map(lib.id === _),
        runId.map(lib.runId === _),
120
        name.map(lib.name === _)
121
122
      ).collect({ case Some(criteria) => criteria }).reduceLeftOption(_ && _).getOrElse(true: Rep[Boolean])
    }
123
    db.run(q.result)
124
125
  }

126
  def forceCreateModule(name: String, runId: Int, pipelineId: Int): Future[Int] = {
127
128
129
130
    val id = Await.result(db.run(modules.size.result), Duration.Inf)
    db.run(modules.forceInsert(Module(id, name, runId, pipelineId))).map(_ => id)
  }

131
132
  def createModule(name: String, runId: Int, pipelineId: Int): Future[Int] = {
    getModules(name = Some(name), runId = Some(runId), pipelineId = Some(pipelineId))
Peter van 't Hof's avatar
Peter van 't Hof committed
133
134
135
136
      .flatMap {
        case m =>
          if (m.isEmpty) forceCreateModule(name, runId, pipelineId)
          else Future(m.head.id)
137
138
139
      }
  }

140
141
142
143
144
145
  def getModules(moduleId: Option[Int] = None, name: Option[String] = None, runId: Option[Int] = None, pipelineId: Option[Int] = None) = {
    val q = modules.filter { lib =>
      List(
        moduleId.map(lib.id === _),
        runId.map(lib.runId === _),
        pipelineId.map(lib.pipelineId === _),
146
        name.map(lib.name === _)
147
148
      ).collect({ case Some(criteria) => criteria }).reduceLeftOption(_ && _).getOrElse(true: Rep[Boolean])
    }
149
    db.run(q.result)
150
151
  }

152
153
154
155
156
  def createStat(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
                 sampleId: Option[Int] = None, libId: Option[Int] = None, content: String) = {
    db.run(stats.forceInsert(Stat(runId, pipelineId, moduleId, sampleId, libId, content)))
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
157
158
159
160
161
162
163
164
165
  def createOrUpdateStat(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
                         sampleId: Option[Int] = None, libId: Option[Int] = None, content: String) = {
    val filter = statsFilter(Some(runId), Some(pipelineId), Some(moduleId), Some(sampleId), Some(libId))
    val r = Await.result(db.run(filter.size.result), Duration.Inf)
    if (r == 0) createStat(runId, pipelineId, moduleId, sampleId, libId, content)
    else db.run(filter.map(_.content).update(content))
  }

  private def statsFilter(runId: Option[Int] = None, pipelineId: Option[Int] = None, moduleId: Option[Option[Int]] = None,
Peter van 't Hof's avatar
Peter van 't Hof committed
166
                          sampleId: Option[Option[Int]] = None, libId: Option[Option[Int]] = None) = {
Peter van 't Hof's avatar
Peter van 't Hof committed
167
168
    var f: Query[Stats, Stats#TableElementType, Seq] = stats
    runId.foreach(r => f = f.filter(_.runId === r))
169
170
171
172
173
    pipelineId.foreach(r => f = f.filter(_.pipelineId === r))
    moduleId.foreach(r => f = (if (r.isDefined) f.filter(_.moduleId === r.get) else f.filter(_.moduleId.isEmpty)))
    sampleId.foreach(r => f = (if (r.isDefined) f.filter(_.sampleId === r.get) else f.filter(_.sampleId.isEmpty)))
    libId.foreach(r => f = (if (r.isDefined) f.filter(_.libraryId === r.get) else f.filter(_.libraryId.isEmpty)))
    f
Peter van 't Hof's avatar
Peter van 't Hof committed
174
  }
175

Peter van 't Hof's avatar
Peter van 't Hof committed
176
177
178
  def getStats(runId: Option[Int] = None, pipelineId: Option[Int] = None, moduleId: Option[Option[Int]] = None,
               sampleId: Option[Option[Int]] = None, libId: Option[Option[Int]] = None) = {
    db.run(statsFilter(runId, pipelineId, moduleId, sampleId, libId).result)
179
180
181
182
  }

  def getStat(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
              sampleId: Option[Int] = None, libId: Option[Int] = None): Future[Option[Map[String, Any]]] = {
Peter van 't Hof's avatar
Peter van 't Hof committed
183
184
185
186
187
188
189
    val l: List[Query[Stats, Stats#TableElementType, Seq] => Query[Stats, Stats#TableElementType, Seq]] = List(
      y => y.filter(_.runId === runId),
      y => y.filter(_.pipelineId === pipelineId),
      y => (if (moduleId.isDefined) y.filter(_.moduleId === moduleId) else y.filter(_.moduleId.isEmpty)),
      y => (if (sampleId.isDefined) y.filter(_.sampleId === sampleId) else y.filter(_.sampleId.isEmpty)),
      y => (if (libId.isDefined) y.filter(_.libraryId === libId) else y.filter(_.libraryId.isEmpty))
    )
Peter van 't Hof's avatar
Peter van 't Hof committed
190
    val q = l.foldLeft(stats.subquery)((a, b) => b(a))
191
192
193
194
195

    db.run(q.map(_.content).result).map(_.headOption.map(ConfigUtils.jsonTextToMap))
  }

  def createSetting(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
Peter van 't Hof's avatar
Peter van 't Hof committed
196
                    sampleId: Option[Int] = None, libId: Option[Int] = None, content: String) = {
197
198
199
    db.run(settings.forceInsert(Setting(runId, pipelineId, moduleId, sampleId, libId, content)))
  }

200
201
  def settingsFilter(runId: Option[Int] = None, pipelineId: Option[Int] = None, moduleId: Option[Option[Int]] = None,
                     sampleId: Option[Option[Int]] = None, libId: Option[Option[Int]] = None) = {
Peter van 't Hof's avatar
Peter van 't Hof committed
202
203
204
    var f: Query[Settings, Settings#TableElementType, Seq] = settings
    runId.foreach(r => f = f.filter(_.runId === r))
    pipelineId.foreach(r => f = f.filter(_.pipelineId === r))
205
206
207
208
    moduleId.foreach(r => f = (if (r.isDefined) f.filter(_.moduleId === r.get) else f.filter(_.moduleId.isEmpty)))
    sampleId.foreach(r => f = (if (r.isDefined) f.filter(_.sampleId === r.get) else f.filter(_.sampleId.isEmpty)))
    libId.foreach(r => f = (if (r.isDefined) f.filter(_.libraryId === r.get) else f.filter(_.libraryId.isEmpty)))
    f
209
  }
210

211
  def createOrUpdateSetting(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
Peter van 't Hof's avatar
Peter van 't Hof committed
212
                            sampleId: Option[Int] = None, libId: Option[Int] = None, content: String) = {
213
214
215
    val filter = settingsFilter(Some(runId), Some(pipelineId), Some(moduleId), Some(sampleId), Some(libId))
    val r = Await.result(db.run(filter.size.result), Duration.Inf)
    if (r == 0) createSetting(runId, pipelineId, moduleId, sampleId, libId, content)
216
    else db.run(filter.update(Setting(runId, pipelineId, moduleId, sampleId, libId, content)))
217
218
219
220
221
  }

  def getSettings(runId: Option[Int] = None, pipelineId: Option[Int] = None, moduleId: Option[Option[Int]] = None,
                  sampleId: Option[Option[Int]] = None, libId: Option[Option[Int]] = None) = {
    db.run(settingsFilter(runId, pipelineId, moduleId, sampleId, libId).result)
222
223
224
  }

  def getSetting(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
Peter van 't Hof's avatar
Peter van 't Hof committed
225
                 sampleId: Option[Int] = None, libId: Option[Int] = None): Future[Option[Map[String, Any]]] = {
Peter van 't Hof's avatar
Peter van 't Hof committed
226
227
228
229
230
231
232
233
    val l: List[Query[Settings, Settings#TableElementType, Seq] => Query[Settings, Settings#TableElementType, Seq]] = List(
      _.filter(_.runId === runId),
      _.filter(_.pipelineId === pipelineId),
      y => (if (moduleId.isDefined) y.filter(_.moduleId === moduleId) else y.filter(_.moduleId.isEmpty)),
      y => (if (sampleId.isDefined) y.filter(_.sampleId === sampleId) else y.filter(_.sampleId.isEmpty)),
      y => (if (libId.isDefined) y.filter(_.libraryId === libId) else y.filter(_.libraryId.isEmpty))
    )

Peter van 't Hof's avatar
Peter van 't Hof committed
234
    val q = l.foldLeft(settings.subquery)((a, b) => b(a))
235
236
    db.run(q.map(_.content).result).map(_.headOption.map(ConfigUtils.jsonTextToMap))
  }
Peter van 't Hof's avatar
Peter van 't Hof committed
237
238
239
240

  def filesFilter(runId: Option[Int] = None, pipelineId: Option[Int] = None, moduleId: Option[Option[Int]],
                  sampleId: Option[Option[Int]] = None, libId: Option[Option[Int]] = None,
                  key: Option[String] = None) = {
Peter van 't Hof's avatar
Peter van 't Hof committed
241
242
243
244
    var f: Query[Files, Files#TableElementType, Seq] = files
    runId.foreach(r => f = f.filter(_.runId === r))
    pipelineId.foreach(r => f = f.filter(_.pipelineId === r))
    key.foreach(r => f = f.filter(_.key === r))
245
246
247
248
    moduleId.foreach(r => f = (if (r.isDefined) f.filter(_.moduleId === r.get) else f.filter(_.moduleId.isEmpty)))
    sampleId.foreach(r => f = (if (r.isDefined) f.filter(_.sampleId === r.get) else f.filter(_.sampleId.isEmpty)))
    libId.foreach(r => f = (if (r.isDefined) f.filter(_.libraryId === r.get) else f.filter(_.libraryId.isEmpty)))
    f
Peter van 't Hof's avatar
Peter van 't Hof committed
249
250
251
252
253
254
255
256
257
258
  }

  def getFiles(runId: Option[Int] = None, pipelineId: Option[Int] = None, moduleId: Option[Option[Int]],
               sampleId: Option[Option[Int]] = None, libId: Option[Option[Int]] = None,
               key: Option[String] = None) = {
    db.run(filesFilter(runId, pipelineId, moduleId, sampleId, libId, key).result)
  }

  def createFile(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
                 sampleId: Option[Int] = None, libId: Option[Int] = None,
Peter van 't Hof's avatar
Peter van 't Hof committed
259
                 key: String, path: String, md5: String, link: Boolean = false, size: Long) = {
Peter van 't Hof's avatar
Peter van 't Hof committed
260
    db.run(files.forceInsert(Schema.File(runId, pipelineId, moduleId, sampleId, libId, key, path, md5, link, size)))
Peter van 't Hof's avatar
Peter van 't Hof committed
261
262
263
264
  }

  def createOrUpdateFile(runId: Int, pipelineId: Int, moduleId: Option[Int] = None,
                         sampleId: Option[Int] = None, libId: Option[Int] = None,
Peter van 't Hof's avatar
Peter van 't Hof committed
265
                         key: String, path: String, md5: String, link: Boolean = false, size: Long) = {
Peter van 't Hof's avatar
Peter van 't Hof committed
266
267
268
    val filter = filesFilter(Some(runId), Some(pipelineId), Some(moduleId), Some(sampleId), Some(libId), Some(key))
    val r = Await.result(db.run(filter.size.result), Duration.Inf)
    if (r == 0) createFile(runId, pipelineId, moduleId, sampleId, libId, key, path, md5, link, size)
Peter van 't Hof's avatar
Peter van 't Hof committed
269
    else db.run(filter.update(Schema.File(runId, pipelineId, moduleId, sampleId, libId, key, path, md5, link, size)))
Peter van 't Hof's avatar
Peter van 't Hof committed
270
  }
Peter van 't Hof's avatar
Peter van 't Hof committed
271
272

  def executablesFilter(runId: Option[Int], toolName: Option[String]) = {
Peter van 't Hof's avatar
Peter van 't Hof committed
273
274
275
276
    var q: Query[Executables, Executables#TableElementType, Seq] = executables
    runId.foreach(r => q = q.filter(_.runId === r))
    toolName.foreach(r => q = q.filter(_.toolName === r))
    q
Peter van 't Hof's avatar
Peter van 't Hof committed
277
278
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
279
  def getExecutables(runId: Option[Int] = None, toolName: Option[String] = None) = {
Peter van 't Hof's avatar
Peter van 't Hof committed
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
    db.run(executablesFilter(runId, toolName).result)
  }

  def createExecutable(runId: Int, toolName: String, version: Option[String] = None, path: Option[String] = None,
                       javaVersion: Option[String] = None, exeMd5: Option[String] = None, javaMd5: Option[String] = None, jarPath: Option[String] = None) = {
    db.run(executables.forceInsert(Executable(runId, toolName, version, path, javaVersion, exeMd5, javaMd5, jarPath)))
  }

  def createOrUpdateExecutable(runId: Int, toolName: String, version: Option[String] = None, path: Option[String] = None,
                               javaVersion: Option[String] = None, exeMd5: Option[String] = None, javaMd5: Option[String] = None, jarPath: Option[String] = None) = {
    val filter = executablesFilter(Some(runId), Some(toolName))
    val r = Await.result(db.run(filter.size.result), Duration.Inf)
    if (r == 0) createExecutable(runId, toolName, version, javaVersion, exeMd5, javaMd5)
    else db.run(filter.update(Executable(runId, toolName, version, path, javaVersion, exeMd5, javaMd5, jarPath)))
  }
295
}
296
297

object SummaryDb {
298
299
300
301
302
303
  private var summaryConnections = Map[File, SummaryDb]()

  def closeAll(): Unit = {
    summaryConnections.foreach(_._2.close())
  }

304
  def openSqliteSummary(file: File): SummaryDb = {
305
    if (!summaryConnections.contains(file)) {
Peter van 't Hof's avatar
Peter van 't Hof committed
306
307
308
309
      val config: org.sqlite.SQLiteConfig = new org.sqlite.SQLiteConfig()
      config.enforceForeignKeys(true)
      config.setBusyTimeout("10000")
      config.setSynchronous(org.sqlite.SQLiteConfig.SynchronousMode.OFF)
310
      val exist = file.exists()
Peter van 't Hof's avatar
Peter van 't Hof committed
311
      val db = Database.forURL(s"jdbc:sqlite:${file.getAbsolutePath}", driver = "org.sqlite.JDBC", prop = config.toProperties, executor = AsyncExecutor("single_thread", 1, 1000))
312
313
314
315
316
      val s = new SummaryDb(db)
      if (!exist) s.createTables()
      summaryConnections += file -> s
    }
    summaryConnections(file)
317
318
  }
}