PipelineStatus.scala 10.2 KB
Newer Older
1
2
package nl.lumc.sasc.biopet.core

Peter van 't Hof's avatar
Peter van 't Hof committed
3
import java.io.{ File, PrintWriter }
4

Peter van 't Hof's avatar
Peter van 't Hof committed
5
import nl.lumc.sasc.biopet.utils.{ ConfigUtils, ToolCommand }
6

Peter van 't Hof's avatar
Peter van 't Hof committed
7
import scala.concurrent.{ Await, Future }
8
9
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
10
import scala.sys.process.Process
11
12

/**
 * Created by pjvan_thof on 7-12-16.
 */
15
object PipelineStatus extends ToolCommand {
Peter van 't Hof's avatar
Peter van 't Hof committed
16
17
18
19
  /**
   * Command line arguments of this tool.
   *
   * @param pipelineDir   output directory of the pipeline, set by `--pipelineDir`
   * @param depsFile      explicit location of the deps file, set by `--depsFile`
   * @param outputDir     output directory of this tool, set by `--outputDir`
   * @param follow        set by `--follow`
   * @param refreshTime   set by `--refresh`, 30 by default
   * @param complatePlots set by `--completePlots` (the field name keeps its historical spelling)
   * @param compressPlots switched off by `--skipCompressPlots`
   */
  case class Args(
    pipelineDir: File = null,
    depsFile: Option[File] = None,
    outputDir: File = null,
    follow: Boolean = false,
    refreshTime: Int = 30,
    complatePlots: Boolean = false,
    compressPlots: Boolean = true
  ) extends AbstractArgs
23
24
25
26
27
28
29
30
31
32
33

  /** Command line parser; every option copies its value into the [[Args]] being built. */
  class OptParser extends AbstractOptParser {
    opt[File]('d', "pipelineDir")
      .required()
      .maxOccurs(1)
      .valueName("<file>")
      .action { (dir, conf) => conf.copy(pipelineDir = dir) }
      .text("Output directory of the pipeline")

    opt[File]('o', "outputDir")
      .required()
      .maxOccurs(1)
      .valueName("<file>")
      .action { (dir, conf) => conf.copy(outputDir = dir) }
      .text("Output directory of this tool")

    opt[File]("depsFile")
      .maxOccurs(1)
      .valueName("<file>")
      .action { (file, conf) => conf.copy(depsFile = Some(file)) }
      .text("Location of deps file, not required")

    opt[Unit]('f', "follow")
      .maxOccurs(1)
      .action { (_, conf) => conf.copy(follow = true) }
      .text("This will follow a run")

    opt[Int]("refresh")
      .maxOccurs(1)
      .action { (seconds, conf) => conf.copy(refreshTime = seconds) }
      .text("Time to check again, default set on 30 seconds")

    opt[Unit]("completePlots")
      .maxOccurs(1)
      .action { (_, conf) => conf.copy(complatePlots = true) }
      .text("Add complete plots, this is disabled because of performance. " +
        "Complete plots does show each job separated, while compressed plots collapse all jobs of the same type together.")

    opt[Unit]("skipCompressPlots")
      .maxOccurs(1)
      .action { (_, conf) => conf.copy(compressPlots = false) }
      .text("Disable compressed plots. By default compressed plots are enabled.")
  }

  /**
   * Entry point of the tool: parses the arguments, locates and reads the deps file and
   * hands everything to [[writePipelineStatus]].
   *
   * @param args raw command line arguments
   * @throws IllegalArgumentException when the arguments can not be parsed
   */
  def main(args: Array[String]): Unit = {
    logger.info("Start")

    val parser = new OptParser
    val parsed: Args = parser.parse(args, Args()) getOrElse (throw new IllegalArgumentException)

    // An explicit deps file wins; otherwise it is looked up inside the pipeline directory
    val depsLocation = parsed.depsFile match {
      case Some(file) => file
      case None       => getDepsFileFromDir(parsed.pipelineDir)
    }

    writePipelineStatus(
      readDepsFile(depsLocation),
      parsed.outputDir,
      follow = parsed.follow,
      refreshTime = parsed.refreshTime,
      plots = parsed.complatePlots,
      compressPlots = parsed.compressPlots)

    logger.info("Done")
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
62
63
64
65
66
67
68
69
70
71
72
73
74
  /**
   * Locates the `deps.json` of the last run inside a pipeline output directory.
   *
   * The run directories are the sub directories of `<pipelineDir>/.log`; the one whose name
   * sorts last is used and the file is expected at `<run dir>/graph/deps.json`.
   * The existence of `deps.json` itself is not checked here.
   *
   * @param pipelineDir output directory of the pipeline
   * @return the (possibly not yet existing) `deps.json` location
   * @throws IllegalArgumentException when the pipeline dir, the `.log` dir, a run directory
   *                                  or the `graph` dir is missing
   */
  def getDepsFileFromDir(pipelineDir: File): File = {
    require(pipelineDir.exists(), s"pipelineDir does not exist: $pipelineDir")
    val logDir = new File(pipelineDir, ".log")
    require(logDir.exists(), s"No .log dir found in pipelineDir")

    // listFiles() returns null when logDir is not a directory or can not be read
    val runLogDirs = Option(logDir.listFiles()).map(_.toList).getOrElse(Nil).filter(_.isDirectory)
    require(runLogDirs.nonEmpty, s"No run directories found in log dir: $logDir")

    // Names are unique within one directory, so the maximum equals "sort by name, take last"
    val runLogDir = runLogDirs.maxBy(_.getName)
    val graphDir = new File(runLogDir, "graph")
    require(graphDir.exists(), s"Graph dir is not found: $graphDir")
    new File(graphDir, "deps.json")
  }

  /**
   * Parsed content of a deps file.
   *
   * @param jobs  all jobs, keyed by job name
   * @param files entries of the "files" section, kept as untyped maps
   */
  case class Deps(jobs: Map[String, Job], files: Array[Map[String, Any]])

  /**
   * Reads a deps file into a [[Deps]] instance.
   *
   * The file must contain a "jobs" section (job name -> job description) and a "files" section
   * (list of file descriptions).
   *
   * @param depsFile file to read
   * @return the parsed jobs and files
   * @throws NoSuchElementException when the "jobs" or "files" section is missing
   */
  def readDepsFile(depsFile: File): Deps = {
    val content = ConfigUtils.fileToConfigMap(depsFile)

    // Same exception type as a plain map lookup, but naming the file and the missing section
    def section(key: String): Any = content.getOrElse(
      key,
      throw new NoSuchElementException(s"Key '$key' not found in deps file: $depsFile"))

    val jobs = ConfigUtils.any2map(section("jobs")).map {
      case (name, description) => name -> new Job(name, ConfigUtils.any2map(description))
    }
    val files = ConfigUtils.any2list(section("files")).map(x => ConfigUtils.any2map(x)).toArray

    Deps(jobs, files)
  }

  /**
   * Writes the current status of a pipeline run to `outputDir`.
   *
   * Produces `jobs.json` / `main_jobs.json` plus the graphviz files `jobs.gv`,
   * `compress.jobs.gv`, `main_jobs.gv` and `compress.main_jobs.gv`, and logs a summary line.
   * When `follow` is set the method sleeps `refreshTime` seconds and starts over with the
   * same settings, so it does not return.
   *
   * @param deps          parsed deps file
   * @param outputDir     directory to write to
   * @param alreadyDone   jobs found done in the previous round; accepted for compatibility,
   *                      the status is recomputed from the files on every round
   * @param alreadyFailed jobs found failed in the previous round; handled like `alreadyDone`
   * @param follow        keep refreshing
   * @param refreshTime   seconds between two rounds
   * @param plots         render png and svg for the complete graphs
   * @param compressPlots render png and svg for the compressed graphs
   */
  def writePipelineStatus(deps: Deps,
                          outputDir: File,
                          alreadyDone: Set[String] = Set(),
                          alreadyFailed: Set[String] = Set(),
                          follow: Boolean = false,
                          refreshTime: Int = 30,
                          plots: Boolean = false,
                          compressPlots: Boolean = true): Unit = {

    // Writes a job -> dependencies map as json; the file is closed even when writing fails
    def writeJson(graph: Map[String, List[String]], fileName: String): Unit = {
      val writer = new PrintWriter(new File(outputDir, fileName))
      try writer.println(ConfigUtils.mapToJson(graph).spaces2)
      finally writer.close()
    }

    val jobDone = jobsDone(deps)
    val jobFailed = jobsFailed(deps, jobDone)
    val jobsStart = jobsReadyStart(deps, jobDone)

    val jobsDeps = deps.jobs.map(x => x._1 -> (x._2.dependsOnJobs match {
      case l: List[_] => l.map(_.toString)
      case _          => throw new IllegalStateException("Value 'depends_on_jobs' is not a list")
    }))
    writeJson(jobsDeps, "jobs.json")
    val jobsPlots = List(
      writeGraphvizFile(jobsDeps, new File(outputDir, "jobs.gv"),
        jobDone, jobFailed, jobsStart, deps, plots, plots),
      writeGraphvizFile(jobsDeps, new File(outputDir, "compress.jobs.gv"),
        jobDone, jobFailed, jobsStart, deps, compressPlots, compressPlots, compress = true))

    val mainJobs = deps.jobs.collect {
      case (name, job) if job.mainJob => name -> getMainDependencies(name, deps)
    }
    writeJson(mainJobs, "main_jobs.json")
    val mainJobsPlots = List(
      writeGraphvizFile(mainJobs, new File(outputDir, "main_jobs.gv"),
        jobDone, jobFailed, jobsStart, deps, plots, plots),
      writeGraphvizFile(mainJobs, new File(outputDir, "compress.main_jobs.gv"),
        jobDone, jobFailed, jobsStart, deps, compressPlots, compressPlots, compress = true))

    val totalJobs = deps.jobs.size
    val totalDone = jobDone.size
    val totalFailed = jobFailed.size
    // jobsReadyStart only excludes done jobs, so a failed job can be listed there as well;
    // count it as failed only, otherwise it is subtracted twice from the pending total
    val totalStart = (jobsStart -- jobFailed).size
    val totalPending = totalJobs - totalStart - totalDone - totalFailed

    (jobsPlots ::: mainJobsPlots).foreach { plot =>
      Await.ready(plot, Duration.Inf)
      // Await.ready does not rethrow, report a failed graph instead of dropping it silently
      plot.value.flatMap(_.failed.toOption)
        .foreach(e => logger.warn(s"Writing a graph failed: ${e.getMessage}"))
    }

    logger.info(s"Total job: ${totalJobs}, Pending: ${totalPending}, Ready to run / running: ${totalStart}, Done: ${totalDone}, Failed ${totalFailed}")

    if (follow) {
      // Long arithmetic: refreshTime * 1000 overflows Int for large refresh times
      Thread.sleep(refreshTime * 1000L)
      // Pass every setting on, the defaults must not replace the values the caller asked for
      writePipelineStatus(deps, outputDir, jobDone, jobFailed, follow, refreshTime, plots, compressPlots)
    }
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
135
136
  /**
   * Collects the nearest main jobs a job depends on.
   *
   * Dependencies that are main jobs are reported directly; dependencies that are not main jobs
   * are looked through until a main job is reached. Every name is reported once, in order of
   * first encounter.
   *
   * Each non-main job is expanded only once, so shared dependencies are not walked again for
   * every path leading to them and a cyclic dependency can not recurse forever.
   *
   * @param jobName job to start from, must be a key of `deps.jobs`
   * @param deps    parsed deps file
   * @return names of the main jobs found
   */
  def getMainDependencies(jobName: String, deps: Deps): List[String] = {
    import scala.collection.mutable

    // Non-main jobs whose dependencies were already walked
    val expanded = mutable.Set[String](jobName)
    // Main jobs found so far, insertion order is the order of first encounter
    val found = mutable.LinkedHashSet[String]()

    def walk(name: String): Unit = {
      deps.jobs(name).dependsOnJobs.foreach { dep =>
        if (deps.jobs(dep).mainJob) found += dep
        else if (expanded.add(dep)) walk(dep)
      }
    }

    walk(jobName)
    found.toList
  }

  /** Splits a job name of the form `<type>_<number>` into its type and number. */
  val numberRegex = """(.*)_(\d*)$""".r

  /**
   * Collapses a job graph to one node per job type.
   *
   * The `_<number>` suffix is stripped from every job and dependency name, names without such
   * a suffix are kept unchanged. Jobs without dependencies do not produce an entry. The order
   * of the dependencies within one list is not defined.
   *
   * @param jobs job name -> names of the jobs it depends on
   * @return job type -> distinct job types it depends on
   */
  def compressOnType(jobs: Map[String, List[String]]): Map[String, List[String]] = {
    // Falling back to the full name avoids a MatchError on names without a numeric suffix
    def typeName(jobName: String): String = jobName match {
      case numberRegex(name, _) => name
      case _                    => jobName
    }

    val edges = for ((job, jobDeps) <- jobs.toSet; dep <- jobDeps) yield (typeName(job), typeName(dep))

    // This will collapse a Set[(String, String)] to a Map[String, List[String]]
    edges.groupBy(_._1).map { case (job, grouped) => job -> grouped.map(_._2).toList }
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
161
162
  /**
   * Writes a job graph as graphviz file and optionally renders it, all inside a [[Future]].
   *
   * Done jobs are coloured green, failed jobs red and jobs ready to start orange (checked in
   * that order); intermediate jobs get a dashed outline. Node names are written as quoted DOT
   * ids, so names that are not plain identifiers still give a valid file.
   *
   * @param jobsDeps   job name -> names of the jobs it depends on
   * @param outputFile graphviz file to write
   * @param jobDone    names of the done jobs
   * @param jobFailed  names of the failed jobs
   * @param jobsStart  names of the jobs that are ready to start
   * @param deps       parsed deps file, used to look up intermediate jobs
   * @param png        render a png next to the graphviz file
   * @param svg        render a svg next to the graphviz file
   * @param compress   collapse the graph with [[compressOnType]] before writing
   * @return future that completes when the file is written and rendered
   */
  def writeGraphvizFile(jobsDeps: Map[String, List[String]],
                        outputFile: File,
                        jobDone: Set[String],
                        jobFailed: Set[String],
                        jobsStart: Set[String],
                        deps: Deps,
                        png: Boolean = true, svg: Boolean = true, compress: Boolean = false): Future[Unit] = Future {
    val graph = if (compress) compressOnType(jobsDeps) else jobsDeps

    // A double quote is the only character that needs escaping inside a quoted DOT id
    def id(name: String): String = "\"" + name.replace("\"", "\\\"") + "\""

    val writer = new PrintWriter(outputFile)
    try {
      writer.println("digraph graphname {")

      graph.foreach { case (job, jobDeps) =>
        val node = id(job)

        // Writing color of node
        if (jobDone.contains(job)) writer.println(s"  $node [color = green]")
        else if (jobFailed.contains(job)) writer.println(s"  $node [color = red]")
        else if (jobsStart.contains(job)) writer.println(s"  $node [color = orange]")

        // Dashed lined for intermediate jobs
        if (deps.jobs.get(job).exists(_.intermediate)) writer.println(s"  $node [style = dashed]")

        // Writing Node deps
        jobDeps.foreach(c => writer.println(s"  ${id(c)} -> $node;"))
      }

      writer.println("}")
    } finally writer.close() // also release the file when a lookup above throws

    writeGvToPlot(outputFile, png, svg)
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
190
191
192
  /**
   * Renders a graphviz file with the `dot` executable; `dot -O` derives the output name from
   * the input file. Blocks until the rendering is finished.
   *
   * A non zero exit code of `dot` is logged as warning and does not abort the tool.
   *
   * @param input graphviz file to render
   * @param png   render a png
   * @param svg   render a svg
   */
  def writeGvToPlot(input: File, png: Boolean = true, svg: Boolean = true): Unit = {
    def render(format: String): Unit = {
      val exitCode = Process(Seq("dot", s"-T$format", "-O", input.getAbsolutePath)).run().exitValue()
      if (exitCode != 0) logger.warn(s"dot exited with code $exitCode while rendering $format for $input")
    }

    if (png) render("png")
    if (svg) render("svg")
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
195
196
  /**
   * Selects the jobs that are not done themselves while every job they depend on is done.
   *
   * @param deps     parsed deps file
   * @param jobsDone names of the done jobs
   * @return names of the jobs that are ready to start or already running
   */
  def jobsReadyStart(deps: Deps, jobsDone: Set[String]): Set[String] = {
    val ready = deps.jobs.collect {
      case (name, job) if !jobsDone.contains(name) && job.dependsOnJobs.forall(jobsDone) => name
    }
    ready.toSet
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
199
200
201
  /**
   * Determines which jobs count as done.
   *
   * A job is done when its own done check succeeds. An intermediate job whose check fails
   * still counts as done when [[upstreamJobDone]] finds a done job further along the jobs
   * using its output.
   *
   * @param deps        parsed deps file
   * @param alreadyDone jobs known to be done; they are not checked again and are always part
   *                    of the result
   * @return names of all done jobs, including `alreadyDone`
   */
  def jobsDone(deps: Deps, alreadyDone: Set[String] = Set()): Set[String] = {
    // Create every check before awaiting the first one, so the checks are not run one by one
    val checks = deps.jobs.toList.collect {
      case (name, job) if !alreadyDone.contains(name) => job -> job.isDone
    }
    val results = checks.map { case (job, check) => job -> Await.result(check, Duration.Inf) }

    val dones = results.collect { case (job, true) => job.name }.toSet ++ alreadyDone

    val inferred = results.collect {
      case (job, false) if job.intermediate && upstreamJobDone(job, dones, deps) => job.name
    }

    // dones already contains alreadyDone, which was skipped by the checks above
    dones ++ inferred
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
210
  /**
   * Checks whether a job that uses the output of `job` is done.
   *
   * Despite the name this walks the jobs listed in `outputUsedByJobs`: such a job counts when
   * it is in `dones`, or when it is intermediate itself and the same holds for one of the jobs
   * using its output.
   *
   * @param job   job to start from
   * @param dones names of the jobs known to be done
   * @param deps  parsed deps file
   */
  private def upstreamJobDone(job: Job, dones: Set[String], deps: Deps): Boolean = {
    // Resolve all names first, an unknown job name fails before anything is evaluated
    val consumers = job.outputUsedByJobs.map(deps.jobs)
    consumers.exists { consumer =>
      if (dones.contains(consumer.name)) true
      else consumer.intermediate && upstreamJobDone(consumer, dones, deps)
    }
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
215
216
217
  /**
   * Determines which jobs are failed.
   *
   * Done jobs and jobs in `alreadyFailed` are not checked; `alreadyFailed` is always part of
   * the result.
   *
   * @param deps          parsed deps file
   * @param dones         names of the done jobs
   * @param alreadyFailed jobs known to be failed
   * @return names of all failed jobs, including `alreadyFailed`
   */
  def jobsFailed(deps: Deps, dones: Set[String], alreadyFailed: Set[String] = Set()): Set[String] = {
    // Building the map starts all checks, they are awaited afterwards
    val checks = deps.jobs.collect {
      case (name, job) if !dones.contains(name) && !alreadyFailed.contains(name) => name -> job.isFailed
    }
    val newlyFailed = checks.collect {
      case (name, check) if Await.result(check, Duration.Inf) => name
    }
    newlyFailed.toSet ++ alreadyFailed
  }

221
222
223
224
225
226
227
228
229
230
231
  /**
   * One job of the deps file. Every accessor converts the matching entry of the raw job
   * description on each call.
   *
   * @param name name of the job
   * @param map  raw job description as read from the deps file
   */
  class Job(val name: String, map: Map[String, Any]) {

    // Typed views on a single key of the raw description
    private def flag(key: String) = ConfigUtils.any2boolean(map(key))
    private def files(key: String) = ConfigUtils.any2fileList(map(key))
    private def names(key: String) = ConfigUtils.any2stringList(map(key))

    /** Value of "done_at_start". */
    def doneAtStart: Boolean = flag("done_at_start")

    /** Files listed under "fail_files", see [[isFailed]]. */
    def failFiles = files("fail_files")
    /** Files listed under "done_files", see [[isDone]]. */
    def doneFiles = files("done_files")
    /** Names of the jobs listed under "output_used_by_jobs". */
    def outputUsedByJobs = names("output_used_by_jobs")
    /** Names of the jobs listed under "depends_on_jobs". */
    def dependsOnJobs = names("depends_on_jobs")
    /** File given by "stdout_file". */
    def stdoutFile = new File(ConfigUtils.any2string(map("stdout_file")))

    /** Files listed under "outputs". */
    def outputsFiles = files("outputs")
    /** Files listed under "inputs". */
    def inputFiles = files("inputs")

    /** Value of "main_job". */
    def mainJob = flag("main_job")
    /** Value of "intermediate". */
    def intermediate = flag("intermediate")

    /** Completes with true when every done file exists (also when there are none). */
    def isDone: Future[Boolean] = Future { doneFiles.forall(_.exists()) }
    /** Completes with true when at least one fail file exists. */
    def isFailed: Future[Boolean] = Future { failFiles.exists(_.exists()) }
  }
}