PipelineStatus.scala 12.5 KB
Newer Older
1
2
package nl.lumc.sasc.biopet.core

Peter van 't Hof's avatar
Peter van 't Hof committed
3
import java.io.{ File, PrintWriter }
4

Peter van 't Hof's avatar
Peter van 't Hof committed
5
import nl.lumc.sasc.biopet.utils.{ ConfigUtils, ToolCommand }
6

Peter van 't Hof's avatar
Peter van 't Hof committed
7
import scala.concurrent.{ Await, Future }
8
9
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
10
import scala.sys.process.Process
11
12

/**
 * Command line tool that reads the `deps.json` of a pipeline run and reports the status of every job.
 * It writes JSON dependency files and Graphviz `.gv` graphs (optionally rendered to png/svg with `dot`)
 * to an output directory and logs a one-line summary. With `--follow` the status is refreshed periodically.
 *
 * Created by pjvan_thof on 7-12-16.
 */
object PipelineStatus extends ToolCommand {

  /**
   * Command line arguments.
   *
   * Note: `complatePlots` is misspelled but kept as-is because it is part of the public case class.
   */
  case class Args(pipelineDir: File = null,
                  depsFile: Option[File] = None,
                  outputDir: File = null,
                  follow: Boolean = false,
                  refreshTime: Int = 30,
                  complatePlots: Boolean = false,
                  compressPlots: Boolean = true) extends AbstractArgs

  class OptParser extends AbstractOptParser {
    opt[File]('d', "pipelineDir") required () maxOccurs 1 valueName "<file>" action { (x, c) =>
      c.copy(pipelineDir = x)
    } text "Output directory of the pipeline"
    opt[File]('o', "outputDir") required () maxOccurs 1 valueName "<file>" action { (x, c) =>
      c.copy(outputDir = x)
    } text "Output directory of this tool"
    opt[File]("depsFile") maxOccurs 1 valueName "<file>" action { (x, c) =>
      c.copy(depsFile = Some(x))
    } text "Location of deps file, not required"
    opt[Unit]('f', "follow") maxOccurs 1 action { (x, c) =>
      c.copy(follow = true)
    } text "This will follow a run"
    opt[Int]("refresh") maxOccurs 1 action { (x, c) =>
      c.copy(refreshTime = x)
    } text "Time to check again, default set on 30 seconds"
    opt[Unit]("completePlots") maxOccurs 1 action { (x, c) =>
      c.copy(complatePlots = true)
    } text "Add complete plots, this is disabled because of performance. " +
      "Complete plots does show each job separated, while compressed plots collapse all jobs of the same type together."
    opt[Unit]("skipCompressPlots") maxOccurs 1 action { (x, c) =>
      c.copy(compressPlots = false)
    } text "Disable compressed plots. By default compressed plots are enabled."
  }

  def main(args: Array[String]): Unit = {
    logger.info("Start")

    val argsParser = new OptParser
    val cmdArgs: Args = argsParser.parse(args, Args()) getOrElse (throw new IllegalArgumentException)

    val depsFile = cmdArgs.depsFile.getOrElse(getDepsFileFromDir(cmdArgs.pipelineDir))
    val deps = readDepsFile(depsFile)
    // All output files are written directly into this directory, so make sure it exists
    cmdArgs.outputDir.mkdirs()
    writePipelineStatus(deps, cmdArgs.outputDir, follow = cmdArgs.follow, refreshTime = cmdArgs.refreshTime,
      plots = cmdArgs.complatePlots, compressPlots = cmdArgs.compressPlots)

    logger.info("Done")
  }

  /**
   * Locates `deps.json` inside a pipeline output directory.
   *
   * It takes the last sub directory (by name) of `<pipelineDir>/.log`, presumably the most recent run,
   * and returns `graph/deps.json` inside it.
   *
   * @throws IllegalArgumentException when one of the expected directories is missing
   */
  def getDepsFileFromDir(pipelineDir: File): File = {
    require(pipelineDir.exists(), s"pipelineDir does not exist: $pipelineDir")
    val logDir = new File(pipelineDir, ".log")
    require(logDir.exists(), s"No .log dir found in pipelineDir")
    // File.list() returns null when logDir is not a readable directory
    val runLogDirs = Option(logDir.list()).getOrElse(Array.empty[String])
      .sorted.map(new File(logDir, _)).filter(_.isDirectory)
    require(runLogDirs.nonEmpty, s"No run log dir found in: $logDir")
    val graphDir = new File(runLogDirs.last, "graph")
    require(graphDir.exists(), s"Graph dir is not found: $graphDir")
    new File(graphDir, "deps.json")
  }

  /** Parsed content of a `deps.json` file: jobs by name, plus the raw "files" section. */
  case class Deps(jobs: Map[String, Job], files: Array[Map[String, Any]])

  /** Reads a `deps.json` file; requires the top-level keys "jobs" and "files". */
  def readDepsFile(depsFile: File): Deps = {
    val deps = ConfigUtils.fileToConfigMap(depsFile)

    val jobs = ConfigUtils.any2map(deps("jobs")).map(x => x._1 -> new Job(x._1, ConfigUtils.any2map(x._2)))
    val files = ConfigUtils.any2list(deps("files")).map(x => ConfigUtils.any2map(x)).toArray

    Deps(jobs, files)
  }

  /**
   * Writes the status of all jobs in `deps` to `outputDir` and logs a summary line.
   *
   * Files written: `jobs.json`, `main_jobs.json` and the Graphviz files `jobs.gv`, `compress.jobs.gv`,
   * `main_jobs.gv` and `compress.main_jobs.gv`. These are rendered with `dot` when the matching plot flag is set.
   *
   * @param alreadyDone   jobs known to be done; their done files are not checked again
   * @param alreadyFailed jobs known to have failed; reported as failed unless they are done now
   * @param follow        when true, refresh the status every `refreshTime` seconds; this never returns
   * @param refreshTime   seconds between two refreshes in follow mode
   * @param plots         render png/svg for the complete (uncompressed) graphs
   * @param compressPlots render png/svg for the graphs where jobs are collapsed per type
   */
  def writePipelineStatus(deps: Deps,
                          outputDir: File,
                          alreadyDone: Set[String] = Set(),
                          alreadyFailed: Set[String] = Set(),
                          follow: Boolean = false,
                          refreshTime: Int = 30,
                          plots: Boolean = false,
                          compressPlots: Boolean = true): Unit = {
    var state = writeStatusSnapshot(deps, outputDir, alreadyDone, alreadyFailed, plots, compressPlots)
    while (follow) {
      // Long arithmetic: refreshTime * 1000 overflows Int for large refresh values
      Thread.sleep(refreshTime * 1000L)
      // Every setting is carried over; only the known done/failed sets are updated between rounds
      state = writeStatusSnapshot(deps, outputDir, state._1, state._2, plots, compressPlots)
    }
  }

  /** Writes one status snapshot and returns the (done, failed) job sets it found. */
  private def writeStatusSnapshot(deps: Deps,
                                  outputDir: File,
                                  alreadyDone: Set[String],
                                  alreadyFailed: Set[String],
                                  plots: Boolean,
                                  compressPlots: Boolean): (Set[String], Set[String]) = {
    val jobDone = jobsDone(deps, alreadyDone)
    val jobFailed = jobsFailed(deps, jobDone, alreadyFailed)
    val jobsStart = jobsReadyStart(deps, jobDone)

    val jobsDeps: Map[String, List[String]] = deps.jobs.map { case (name, job) => name -> job.dependsOnJobs.toList }
    writeJson(jobsDeps, new File(outputDir, "jobs.json"))
    val jobsFutures = List(
      writeGraphvizFile(jobsDeps, new File(outputDir, "jobs.gv"), jobDone, jobFailed, jobsStart, deps, plots, plots),
      writeGraphvizFile(jobsDeps, new File(outputDir, "compress.jobs.gv"), jobDone, jobFailed, jobsStart, deps,
        compressPlots, compressPlots, compress = true))

    val mainJobs: Map[String, List[String]] = deps.jobs.collect {
      case (name, job) if job.mainJob => name -> getMainDependencies(name, deps)
    }
    writeJson(mainJobs, new File(outputDir, "main_jobs.json"))
    val mainFutures = List(
      writeGraphvizFile(mainJobs, new File(outputDir, "main_jobs.gv"), jobDone, jobFailed, jobsStart, deps, plots, plots),
      writeGraphvizFile(mainJobs, new File(outputDir, "compress.main_jobs.gv"), jobDone, jobFailed, jobsStart, deps,
        compressPlots, compressPlots, compress = true))

    val totalJobs = deps.jobs.size
    val totalDone = jobDone.size
    val totalFailed = jobFailed.size
    // A failed job can also be in jobsStart (its dependencies are done); count it only as failed
    val totalStart = (jobsStart -- jobFailed).size
    val totalPending = totalJobs - totalDone - totalFailed - totalStart

    (jobsFutures ++ mainFutures).foreach { future =>
      Await.ready(future, Duration.Inf)
      // Await.ready does not rethrow, so report failures (e.g. `dot` not installed) explicitly
      future.value.flatMap(_.failed.toOption).foreach { e =>
        logger.warn(s"Writing a graph failed: ${e.getMessage}")
      }
    }

    logger.info(s"Total job: ${totalJobs}, Pending: ${totalPending}, Ready to run / running: ${totalStart}, Done: ${totalDone}, Failed ${totalFailed}")

    (jobDone, jobFailed)
  }

  /** Writes `map` as pretty-printed JSON to `file`, closing the file also on failure. */
  private def writeJson(map: Map[String, Any], file: File): Unit = {
    val writer = new PrintWriter(file)
    try writer.println(ConfigUtils.mapToJson(map).spaces2)
    finally writer.close()
  }

  /**
   * Returns the nearest main jobs that `jobName` depends on.
   *
   * Dependencies that are not main jobs are followed recursively. Duplicates are removed, keeping first occurrence.
   */
  def getMainDependencies(jobName: String, deps: Deps): List[String] = {
    deps.jobs(jobName).dependsOnJobs.toList.flatMap { dep =>
      if (deps.jobs(dep).mainJob) List(dep) else getMainDependencies(dep, deps)
    }.distinct
  }

  /** Splits a job name into "<type>_<number>"; the number must have at least one digit. */
  val numberRegex = """(.*)_(\d+)$""".r

  /**
   * Collapses a job dependency map onto job types, using `compressedName`.
   *
   * Every job type present in `jobs` gets an entry, also when none of its jobs have dependencies,
   * so that it still gets a node in the graph.
   */
  def compressOnType(jobs: Map[String, List[String]]): Map[String, List[String]] = {
    val edges = for ((job, jobDeps) <- jobs.toSet; dep <- jobDeps) yield {
      (compressedName(job)._1, compressedName(dep)._1)
    }
    val depsPerType = edges.groupBy(_._1).map { case (jobType, typeEdges) => jobType -> typeEdges.map(_._2).toList }
    jobs.keySet.map(compressedName(_)._1).map(jobType => jobType -> depsPerType.getOrElse(jobType, Nil)).toMap
  }

  /**
   * Returns (type, number) for a job name like "Type_12".
   *
   * A name without a numeric suffix is treated as its own type with number 0.
   */
  def compressedName(jobName: String): (String, Int) = jobName match {
    case numberRegex(name, number) => (name, number.toInt)
    case _                         => (jobName, 0)
  }

  /**
   * Asynchronously writes `jobsDeps` as a Graphviz digraph to `outputFile` and renders it with `dot`.
   *
   * Nodes are coloured green (done), red (failed) or orange (ready to start / running). Intermediate jobs
   * are dashed. With `compress = true`, jobs are collapsed per type: each node gets a label with counts, and
   * each edge gets a colour split showing the state of the upstream jobs.
   */
  def writeGraphvizFile(jobsDeps: Map[String, List[String]],
                        outputFile: File,
                        jobDone: Set[String],
                        jobFailed: Set[String],
                        jobsStart: Set[String],
                        deps: Deps,
                        png: Boolean = true, svg: Boolean = true, compress: Boolean = false): Future[Unit] = Future {
    val graph = if (compress) compressOnType(jobsDeps) else jobsDeps
    val writer = new PrintWriter(outputFile)
    try {
      writer.println("digraph graphname {")
      graph.foreach {
        case (job, jobDeps) =>
          if (compress) writeCompressedNode(writer, job, jobDone, jobFailed, jobsStart, deps)
          else writeNode(writer, job, jobDone, jobFailed, jobsStart, deps)

          jobDeps.foreach { dep =>
            if (compress) writeCompressedEdge(writer, dep, job, jobDone, jobFailed, jobsStart, deps)
            else writer.println(s"  $dep -> $job;")
          }
      }
      writer.println("}")
    } finally writer.close()

    writeGvToPlot(outputFile, png, svg)
  }

  /** Writes colour and style attributes of a single (uncompressed) job node. */
  private def writeNode(writer: PrintWriter, job: String, jobDone: Set[String], jobFailed: Set[String],
                        jobsStart: Set[String], deps: Deps): Unit = {
    if (jobDone.contains(job)) writer.println(s"  $job [color = green]")
    else if (jobFailed.contains(job)) writer.println(s"  $job [color = red]")
    else if (jobsStart.contains(job)) writer.println(s"  $job [color = orange]")

    // Dashed line for intermediate jobs
    if (deps.jobs.get(job).exists(_.intermediate)) writer.println(s"  $job [style = dashed]")
  }

  /** Writes label, colour and style of a node that represents all jobs of type `jobType`. */
  private def writeCompressedNode(writer: PrintWriter, jobType: String, jobDone: Set[String], jobFailed: Set[String],
                                  jobsStart: Set[String], deps: Deps): Unit = {
    def ofType(name: String): Boolean = compressedName(name)._1 == jobType
    val typeJobs = deps.jobs.filter(x => ofType(x._1))
    val total = typeJobs.keySet
    val done = jobDone.filter(ofType)
    val failed = jobFailed.filter(ofType)
    val started = jobsStart.filter(ofType)

    val pend = total.size - failed.filterNot(started.contains).size - started.size - done.size
    writer.println(s"""  $jobType [label = "$jobType
        |Total: ${total.size}
        |Fail: ${failed.size}
        |Pend:${pend}
        |Start/run: ${started.filterNot(failed.contains).size}
        |Done: ${done.size}"]""".stripMargin)

    if (jobDone.contains(jobType) || total == done) writer.println(s"  $jobType [color = green]")
    else if (jobFailed.contains(jobType) || failed.nonEmpty) writer.println(s"  $jobType [color = red]")
    else if (jobsStart.contains(jobType) || total == started) writer.println(s"  $jobType [color = orange]")

    // Dashed line when every job of this type is intermediate
    if (deps.jobs.get(jobType).exists(_.intermediate) || typeJobs.values.forall(_.intermediate))
      writer.println(s"  $jobType [style = dashed]")
  }

  /**
   * Writes the edge `depType -> jobType` of a compressed graph.
   *
   * The edge colour is split red/orange/green/black by the fraction of upstream jobs that failed, are
   * started, are done, or are still pending.
   */
  private def writeCompressedEdge(writer: PrintWriter, depType: String, jobType: String, jobDone: Set[String],
                                  jobFailed: Set[String], jobsStart: Set[String], deps: Deps): Unit = {
    // Jobs of type depType whose output is used by at least one job of type jobType
    val upstream = deps.jobs.collect {
      case (name, job) if compressedName(name)._1 == depType &&
        job.outputUsedByJobs.exists(compressedName(_)._1 == jobType) => name
    }.toList
    val total = upstream.size
    if (total > 0) {
      def fraction(p: String => Boolean): Float = upstream.count(p).toFloat / total
      val done = fraction(jobDone.contains)
      val fail = fraction(jobFailed.contains)
      // Failed jobs can also be in jobsStart; count them only as failed so fractions never go negative
      val start = fraction(name => jobsStart.contains(name) && !jobFailed.contains(name))
      // Clamp to avoid a "-0.000000" share caused by float rounding
      val rest = math.max(0.0f, 1.0f - done - fail - start)
      // Fixed locale: with a ',' decimal separator the colour list would not be valid for Graphviz
      val color = "red;%f:orange;%f:green;%f:black;%f".formatLocal(java.util.Locale.ROOT, fail, start, done, rest)
      writer.println(s"""  $depType -> $jobType [color="$color"];""")
    } else writer.println(s"  $depType -> $jobType;")
  }

  /**
   * Renders a `.gv` file with the external `dot` command, next to the input file (`dot -O`).
   *
   * A non-zero exit code is logged as a warning. Failing to start `dot` throws an IOException.
   */
  def writeGvToPlot(input: File, png: Boolean = true, svg: Boolean = true): Unit = {
    def render(format: String): Unit = {
      val exitCode = Process(Seq("dot", s"-T$format", "-O", input.getAbsolutePath)).run().exitValue()
      if (exitCode != 0) logger.warn(s"dot exited with code $exitCode while rendering $format for: $input")
    }
    if (png) render("png")
    if (svg) render("svg")
  }

  /** Jobs that are not done but whose dependencies are all done; this includes running and failed jobs. */
  def jobsReadyStart(deps: Deps, jobsDone: Set[String]): Set[String] = {
    deps.jobs.filterNot(x => jobsDone.contains(x._1)).filter(_._2.dependsOnJobs.forall(jobsDone)).keySet
  }

  /**
   * Returns the names of all done jobs, including `alreadyDone`.
   *
   * A job is done when all its done files exist. An intermediate job that is not done that way still
   * counts as done when a job using its output is done (see `upstreamJobDone`).
   */
  def jobsDone(deps: Deps, alreadyDone: Set[String] = Set()): Set[String] = {
    // Done-file checks of all jobs not yet known to be done are started in parallel
    val fileChecks = deps.jobs.filterNot(x => alreadyDone.contains(x._1)).map(x => x._2 -> x._2.isDone)
    val doneByFiles = fileChecks.map { case (job, check) => job -> Await.result(check, Duration.Inf) }
    val dones = doneByFiles.filter(_._2).keySet.map(_.name) ++ alreadyDone

    val finalChecks = doneByFiles.map {
      case (job, done) =>
        job -> (if (done || !job.intermediate) Future.successful(done) else Future(upstreamJobDone(job, dones, deps)))
    }
    finalChecks.filter { case (_, check) => Await.result(check, Duration.Inf) }.keySet.map(_.name) ++ alreadyDone
  }

  /** True when a job using the output of `job` is done, following chains of intermediate jobs recursively. */
  private def upstreamJobDone(job: Job, dones: Set[String], deps: Deps): Boolean = {
    job.outputUsedByJobs.map(deps.jobs)
      .exists(x => dones.contains(x.name) || (x.intermediate && upstreamJobDone(x, dones, deps)))
  }

  /**
   * Returns the names of failed jobs: jobs with an existing fail file, plus `alreadyFailed`.
   *
   * Jobs in `dones` are never reported as failed, also when they are in `alreadyFailed`.
   */
  def jobsFailed(deps: Deps, dones: Set[String], alreadyFailed: Set[String] = Set()): Set[String] = {
    val checks = deps.jobs.filterNot(x => dones.contains(x._1) || alreadyFailed.contains(x._1))
      .map(x => x._1 -> x._2.isFailed)
    val newFailed = checks.filter(x => Await.result(x._2, Duration.Inf)).keySet
    (newFailed ++ alreadyFailed) -- dones
  }

  /** A single job entry of `deps.json`; every accessor reads one key from `map`. */
  class Job(val name: String, map: Map[String, Any]) {

    def doneAtStart: Boolean = ConfigUtils.any2boolean(map("done_at_start"))

    def failFiles = ConfigUtils.any2fileList(map("fail_files"))
    def doneFiles = ConfigUtils.any2fileList(map("done_files"))
    def outputUsedByJobs = ConfigUtils.any2stringList(map("output_used_by_jobs"))
    def dependsOnJobs = ConfigUtils.any2stringList(map("depends_on_jobs"))
    def stdoutFile = new File(ConfigUtils.any2string(map("stdout_file")))

    def outputsFiles = ConfigUtils.any2fileList(map("outputs"))
    def inputFiles = ConfigUtils.any2fileList(map("inputs"))

    def mainJob = ConfigUtils.any2boolean(map("main_job"))
    def intermediate = ConfigUtils.any2boolean(map("intermediate"))

    /** Completes with true when all done files exist. */
    def isDone: Future[Boolean] = Future { doneFiles.forall(_.exists()) }
    /** Completes with true when any fail file exists. */
    def isFailed: Future[Boolean] = Future { failFiles.exists(_.exists()) }
  }
}