/**
  * Biopet is built on top of GATK Queue for building bioinformatic
  * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
  * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
  * should also be able to execute Biopet tools and pipelines.
  *
  * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
  *
  * Contact us at: sasc@lumc.nl
  *
  * A dual licensing mode is applied. The source code within this project is freely available for non-commercial use under an AGPL
  * license; For commercial users or users who do not want to follow the AGPL
  * license, please contact us to obtain a separate license.
  */
package nl.lumc.sasc.biopet.core.pipelinestatus

import java.io.{File, PrintWriter}

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import nl.lumc.sasc.biopet.utils.pim._
import nl.lumc.sasc.biopet.utils.pim.{Job => PimJob}
import nl.lumc.sasc.biopet.utils.{ConfigUtils, ToolCommand}
import play.api.libs.ws.ahc.AhcWSClient

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.sys.process.Process

/**
  * Tool that reports the status of a Biopet pipeline run, based on the deps.json of that run.
  *
  * Created by pjvan_thof on 7-12-16.
  */
object PipelineStatus extends ToolCommand {

  case class Args(pipelineDir: File = null,
                  depsFile: Option[File] = None,
                  outputDir: File = null,
                  follow: Boolean = false,
                  refreshTime: Int = 30,
                  completePlots: Boolean = false,
                  compressPlots: Boolean = true,
                  pimHost: Option[String] = None,
                  pimRunId: Option[String] = None)
      extends AbstractArgs

  class OptParser extends AbstractOptParser {
    opt[File]('d', "pipelineDir") required () maxOccurs 1 valueName "<file>" action { (x, c) =>
      c.copy(pipelineDir = x)
    } text "Output directory of the pipeline"
    opt[File]('o', "outputDir") required () maxOccurs 1 valueName "<file>" action { (x, c) =>
      c.copy(outputDir = x)
    } text "Output directory of this tool"
    opt[File]("depsFile") maxOccurs 1 valueName "<file>" action { (x, c) =>
      c.copy(depsFile = Some(x))
    } text "Location of deps file, not required"
    opt[Unit]('f', "follow") maxOccurs 1 action { (_, c) =>
      c.copy(follow = true)
    } text "Follow the run and keep refreshing the status"
    opt[Int]("refresh") maxOccurs 1 action { (x, c) =>
      c.copy(refreshTime = x)
    } text "Refresh interval in seconds when following a run, default is 30 seconds"
    opt[Unit]("completePlots") maxOccurs 1 action { (_, c) =>
      c.copy(completePlots = true)
    } text "Add complete plots, disabled by default because of performance. " +
      "Complete plots show each job separately, while compressed plots collapse all jobs of the same type together."
    opt[Unit]("skipCompressPlots") maxOccurs 1 action { (_, c) =>
      c.copy(compressPlots = false)
    } text "Disable compressed plots. By default compressed plots are enabled."
    opt[String]("pimHost") maxOccurs 1 action { (x, c) =>
      c.copy(pimHost = Some(x))
    } text "PIM host to publish the status to"
    opt[String]("pimRunId") maxOccurs 1 action { (x, c) =>
      c.copy(pimRunId = Some(x))
    } text "PIM run ID to publish the status to"
  }

  def main(args: Array[String]): Unit = {
    logger.info("Start")

    val argsParser = new OptParser
    val cmdArgs
      : Args = argsParser.parse(args, Args()) getOrElse (throw new IllegalArgumentException)

    implicit lazy val system = ActorSystem()
    implicit lazy val materializer = ActorMaterializer()
    implicit lazy val ws = AhcWSClient()

    val depsFile = cmdArgs.depsFile.getOrElse(getDepsFileFromDir(cmdArgs.pipelineDir))
    val deps = Deps.readDepsFile(depsFile)

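    // Derive a PIM run ID when a PIM host is given but no --pimRunId was supplied: use the name
    // of the run directory that holds the graph dir, otherwise a sanitized path of the deps file.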
    val pimRunId =
      if (cmdArgs.pimHost.isDefined) Some(cmdArgs.pimRunId.getOrElse {
        val graphDir = depsFile.getAbsoluteFile.getParentFile
        if (graphDir.getName == "graph") "biopet_" + graphDir.getParentFile.getName
        else "biopet_" + depsFile.getAbsolutePath.replaceAll("/", "_")
      })
      else None

    if (cmdArgs.pimHost.isDefined) {
      require(pimRunId.isDefined, "Could not auto-generate Pim run ID, please supply --pimRunId")
      logger.info(s"Status will be pushed to ${cmdArgs.pimHost.get}/run/${pimRunId.get}")
    }

    writePipelineStatus(
      deps,
      cmdArgs.outputDir,
      follow = cmdArgs.follow,
      refreshTime = cmdArgs.refreshTime,
      plots = cmdArgs.completePlots,
      compressPlots = cmdArgs.compressPlots,
      pimHost = cmdArgs.pimHost,
      pimRunId = pimRunId
    )
    logger.info("Done")

    ws.close()
    system.terminate()
  }

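  /**
    * Locates the deps.json file of the most recent run in a pipeline output directory,
    * expected at <pipelineDir>/.log/<most recent run>/graph/deps.json.
    */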
  def getDepsFileFromDir(pipelineDir: File): File = {
    require(pipelineDir.exists(), s"pipelineDir does not exist: $pipelineDir")
    val logDir = new File(pipelineDir, ".log")
    require(logDir.exists(), s"No .log dir found in pipelineDir: $pipelineDir")
    val runLogDir = logDir.list().sorted.map(new File(logDir, _)).filter(_.isDirectory).last
    val graphDir = new File(runLogDir, "graph")
    require(graphDir.exists(), s"Graph dir is not found: $graphDir")
    new File(graphDir, "deps.json")
  }

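  /**
    * Writes the current status of a pipeline run to the output directory as JSON files and
    * Graphviz plots, optionally pushes the status to a PIM host, and when `follow` is set
    * repeats this every `refreshTime` seconds.
    */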
  def writePipelineStatus(deps: Deps,
                          outputDir: File,
                          alreadyDone: Set[String] = Set(),
                          alreadyFailed: Set[String] = Set(),
                          follow: Boolean = false,
                          refreshTime: Int = 30,
                          plots: Boolean = false,
                          compressPlots: Boolean = true,
                          pimHost: Option[String] = None,
                          pimRunId: Option[String] = None)(implicit ws: AhcWSClient): Unit = {

    val jobDone = jobsDone(deps)
    val jobFailed = jobsFailed(deps, jobDone)
    val jobsStart = jobsReadyStart(deps, jobDone)

    var futures: List[Future[Any]] = Nil

    val jobsDeps = deps.jobs.map(x =>
      x._1 -> (x._2.dependsOnJobs match {
        case l: List[_] => l.map(_.toString)
      }))
    val jobsWriter = new PrintWriter(new File(outputDir, s"jobs.json"))
    jobsWriter.println(ConfigUtils.mapToJson(jobsDeps).spaces2)
    jobsWriter.close()
    futures :+= writeGraphvizFile(new File(outputDir, s"jobs.gv"),
                                  jobDone,
                                  jobFailed,
                                  jobsStart,
                                  deps,
                                  plots,
                                  plots)
    futures :+= writeGraphvizFile(new File(outputDir, s"compress.jobs.gv"),
                                  jobDone,
                                  jobFailed,
                                  jobsStart,
                                  deps,
                                  compressPlots,
                                  compressPlots,
                                  compress = true)

    val mainJobs = deps.jobs.filter(_._2.mainJob).map {
      case (name, _) => name -> deps.getMainDependencies(name)
    }

    val mainJobsWriter = new PrintWriter(new File(outputDir, s"main_jobs.json"))
    mainJobsWriter.println(ConfigUtils.mapToJson(mainJobs).spaces2)
    mainJobsWriter.close()
    futures :+= writeGraphvizFile(new File(outputDir, s"main_jobs.gv"),
                                  jobDone,
                                  jobFailed,
                                  jobsStart,
                                  deps,
                                  plots,
                                  plots,
                                  main = true)
    futures :+= writeGraphvizFile(new File(outputDir, s"compress.main_jobs.gv"),
                                  jobDone,
                                  jobFailed,
                                  jobsStart,
                                  deps,
                                  compressPlots,
                                  compressPlots,
                                  compress = true,
                                  main = true)

    val totalJobs = deps.jobs.size
    val totalStart = jobsStart.size
    val totalDone = jobDone.size
    val totalFailed = jobFailed.size
    val totalPending = totalJobs - jobsStart.size - jobDone.size - jobFailed.size

    futures.foreach(x => Await.result(x, Duration.Inf))

    val runId =
      if (pimHost.isDefined)
        Some(
          pimRunId.getOrElse(throw new IllegalStateException(
            "Pim requires a run id, please supply this with --pimRunId")))
      else None
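
    // Push the compressed run graph and the status of every job to the configured PIM server.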
    pimHost.foreach { host =>
      val links: List[Link] = deps
        .compressOnType()
        .flatMap(x => x._2.map(y => Link("link", y, "output", x._1, "input", "test")))
        .toList
      val run = Run(
        runId.get,
        Network("graph",
                Nil,
                deps
                  .compressOnType()
                  .map(
                    x =>
                      Node(x._1,
                           "root",
                           List(Port("input", "input")),
                           List(Port("output", "output")),
                           "test"))
                  .toList,
                links),
        "Biopet pipeline",
        "biopet"
      )

      val request = ws
        .url(s"$host/api/runs/")
        .withHeaders("Accept" -> "application/json", "Content-Type" -> "application/json")
        .put(run.toString)

      Await.result(request, Duration.Inf) match {
        case r if r.status == 200 => logger.debug(r)
        case r => logger.warn(r)
      }

      val futures = for (job <- deps.jobs) yield {
        val status = job._1 match {
          case n if jobsStart.contains(n) => JobStatus.running
          case n if jobFailed.contains(n) => JobStatus.failed
          case n if jobDone.contains(n) => JobStatus.success
          case _ => JobStatus.idle
        }
        ws.url(s"$host/api/runs/test/jobs/" + job._1)
          .withHeaders("Accept" -> "application/json", "Content-Type" -> "application/json")
          .put(PimJob(job._1, Job.compressedName(job._1)._1, runId.get, "none", status).toString)
      }
      if (logger.isDebugEnabled) futures.foreach(_.onComplete(logger.debug(_)))
      val results = Await.result(Future.sequence(futures), Duration.Inf)
      results.foreach {
        case s if s.status == 200 => logger.debug(s)
        case s => logger.warn(s)
      }
    }
    logger.info(
      s"Total jobs: $totalJobs, Pending: $totalPending, Ready to run / running: $totalStart, Done: $totalDone, Failed: $totalFailed")

    if (follow) {
      Thread.sleep(refreshTime * 1000)
      writePipelineStatus(deps,
                          outputDir,
                          jobDone,
                          jobFailed,
                          follow,
                          refreshTime,
                          plots,
                          compressPlots,
                          pimHost,
                          runId)
    }
  }

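  /**
    * Writes a Graphviz (.gv) file for the given dependency graph and renders it with `dot`.
    * Nodes are colored green (done), red (failed) or orange (ready/running) and intermediate
    * jobs are dashed; with `compress` all jobs of the same type are collapsed into one node.
    */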
  def writeGraphvizFile(outputFile: File,
                        jobDone: Set[String],
                        jobFailed: Set[String],
                        jobsStart: Set[String],
                        deps: Deps,
                        png: Boolean = true,
                        svg: Boolean = true,
                        compress: Boolean = false,
                        main: Boolean = false): Future[Unit] = Future {
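    // Select which dependency graph to plot: compressed by job type, main jobs only, or all jobs.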
    val graph =
      if (compress && main) deps.compressOnType(main = true)
      else if (compress) deps.compressOnType()
      else if (main) deps.getMainDeps
      else deps.jobs.map(x => x._1 -> x._2.dependsOnJobs)

    val writer = new PrintWriter(outputFile)
    writer.println("digraph graphname {")

    graph.foreach {
      case (job, jobDeps) =>
        // Writing color of node
        val compressTotal =
          if (compress) Some(deps.jobs.keys.filter(Job.compressedName(_)._1 == job)) else None
        val compressDone =
          if (compress) Some(jobDone.filter(Job.compressedName(_)._1 == job)) else None
        val compressFailed =
          if (compress) Some(jobFailed.filter(Job.compressedName(_)._1 == job)) else None
        val compressStart =
          if (compress) Some(jobsStart.filter(Job.compressedName(_)._1 == job)) else None
        val compressIntermediate =
          if (compress)
            Some(
              deps.jobs.filter(x => Job.compressedName(x._1)._1 == job).forall(_._2.intermediate))
          else None

        if (compress) {
          val pend = compressTotal.get.size - compressFailed.get
            .diff(compressStart.get)
            .size - compressStart.get.size - compressDone.get.size
          writer.println(s"""  $job [label = "$job
        |Total: ${compressTotal.get.size}
        |Fail: ${compressFailed.get.size}
        |Pend: $pend
        |Run: ${compressStart.get.diff(compressFailed.get).size}
        |Done: ${compressDone.get.size}"]""".stripMargin)
        }

        if (jobDone.contains(job) || compress && compressTotal == compressDone)
          writer.println(s"  $job [color = green]")
        else if (jobFailed.contains(job) || compress && compressFailed.get.nonEmpty)
          writer.println(s"  $job [color = red]")
        else if (jobsStart.contains(job) || compress && compressTotal == compressStart)
          writer.println(s"  $job [color = orange]")

        // Dashed lines for intermediate jobs
        if ((deps.jobs.contains(job) && deps
              .jobs(job)
              .intermediate) || compressIntermediate.contains(true))
          writer.println(s"  $job [style = dashed]")

        // Writing Node deps
        jobDeps.foreach { dep =>
          if (compress) {
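            // For compressed plots, color each edge as a gradient showing which fraction of the
            // underlying dependencies is failed (red), running (orange), done (green) or pending (black).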
            val depsNames = deps.jobs
              .filter(x => Job.compressedName(x._1)._1 == dep)
              .filter(_._2.outputUsedByJobs.exists(x => Job.compressedName(x)._1 == job))
              .map(x => x._1 -> x._2.outputUsedByJobs.filter(x => Job.compressedName(x)._1 == job))
            val total = depsNames.size
            val done = depsNames
              .map(x => x._2.exists(y => jobDone.contains(x._1)))
              .count(_ == true)
              .toFloat / total
            val fail = depsNames
              .map(x => x._2.exists(y => jobFailed.contains(x._1)))
              .count(_ == true)
              .toFloat / total
            val start = (depsNames
              .map(x => x._2.exists(y => jobsStart.contains(x._1)))
              .count(_ == true)
              .toFloat / total) - fail
            if (total > 0)
              writer.println(
                s"""  $dep -> $job [color="red;%f:orange;%f:green;%f:black;%f"];"""
                  .format(fail, start, done, 1.0f - done - fail - start))
            else writer.println(s"  $dep -> $job;")
          } else writer.println(s"  $dep -> $job;")
        }
    }
    writer.println("}")
    writer.close()

    writeGvToPlot(outputFile, png, svg)
  }

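  /** Renders a Graphviz file to PNG and/or SVG by calling the `dot` executable. */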
  def writeGvToPlot(input: File, png: Boolean = true, svg: Boolean = true): Unit = {
    if (png) Process(Seq("dot", "-Tpng", "-O", input.getAbsolutePath)).run().exitValue()
    if (svg) Process(Seq("dot", "-Tsvg", "-O", input.getAbsolutePath)).run().exitValue()
  }

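  /** Returns all jobs that are not done yet but whose dependencies are all done, i.e. ready to start. */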
  def jobsReadyStart(deps: Deps, jobsDone: Set[String]): Set[String] = {
    deps.jobs
      .filterNot(x => jobsDone.contains(x._1))
      .filter(_._2.dependsOnJobs.forall(jobsDone))
      .keySet
  }

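  /**
    * Returns the names of all jobs that are done. An intermediate job that is not done itself is
    * also counted as done when a downstream job that uses its output has already finished.
    */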
Peter van 't Hof's avatar
Peter van 't Hof committed
385
386
  def jobsDone(deps: Deps, alreadyDone: Set[String] = Set()): Set[String] = {
    val f = deps.jobs.filterNot(x => alreadyDone.contains(x._1)).map(x => x._2 -> x._2.isDone)
    val dones = f
      .map(x => x._1 -> Await.result(x._2, Duration.Inf))
      .filter(_._2)
      .map(_._1.name)
      .toSet ++ alreadyDone
    val f2 = f.map(x =>
      x._1 -> x._2.map { d =>
        if (d || !x._1.intermediate) d
        else upstreamJobDone(x._1, dones, deps)
    })
    val d = f2.map(x => x._1 -> Await.result(x._2, Duration.Inf))
    d.filter(_._2).map(_._1.name).toSet
  }

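  /** True when a job that uses this job's output is done, directly or via another intermediate job. */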
  private def upstreamJobDone(job: Job, dones: Set[String], deps: Deps): Boolean = {
    job.outputUsedByJobs
      .map(deps.jobs)
      .exists(x => dones.contains(x.name) || (x.intermediate && upstreamJobDone(x, dones, deps)))
  }

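  /** Returns the names of all jobs that are not done and report a failed status. */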
  def jobsFailed(deps: Deps, dones: Set[String], alreadyFailed: Set[String] = Set()): Set[String] = {
    val f = deps.jobs
      .filterNot(x => dones.contains(x._1))
      .filterNot(x => alreadyFailed.contains(x._1))
      .map(x => x._1 -> x._2.isFailed)
    f.map(x => x._1 -> Await.result(x._2, Duration.Inf)).filter(_._2).keySet ++ alreadyFailed
  }

}