Commit 67716789 authored by Peter van 't Hof's avatar Peter van 't Hof
Browse files

Added dot executable to generate plots

parent 3910e8fe
......@@ -7,6 +7,7 @@ import nl.lumc.sasc.biopet.utils.{ConfigUtils, ToolCommand}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.sys.process.Process
/**
* Created by pjvan_thof on 7-12-16.
......@@ -16,7 +17,8 @@ object PipelineStatus extends ToolCommand {
depsFile: Option[File] = None,
outputDir: File = null,
follow: Boolean = false,
refreshTime: Int = 30) extends AbstractArgs
refreshTime: Int = 30,
plots: Boolean = true) extends AbstractArgs
class OptParser extends AbstractOptParser {
opt[File]('d', "pipelineDir") required () maxOccurs 1 valueName "<file>" action { (x, c) =>
......@@ -34,7 +36,9 @@ object PipelineStatus extends ToolCommand {
opt[Int]("refresh") maxOccurs 1 action { (x, c) =>
c.copy(refreshTime = x)
} text "Time to check again, default set on 30 seconds"
opt[Unit]("skipPlots") maxOccurs 1 action { (x, c) =>
c.copy(plots = false)
} text "This will follow a run"
}
def main(args: Array[String]): Unit = {
......@@ -45,7 +49,7 @@ object PipelineStatus extends ToolCommand {
val depsFile = cmdArgs.depsFile.getOrElse(getDepsFileFromDir(cmdArgs.pipelineDir))
val deps = readDepsFile(depsFile)
writePipelineStatus(deps, cmdArgs.outputDir, follow = cmdArgs.follow, refreshTime = cmdArgs.refreshTime)
writePipelineStatus(deps, cmdArgs.outputDir, follow = cmdArgs.follow, refreshTime = cmdArgs.refreshTime, plots = cmdArgs.plots)
logger.info("Done")
}
......@@ -75,12 +79,15 @@ object PipelineStatus extends ToolCommand {
alreadyDone: Set[String] = Set(),
alreadyFailed: Set[String] = Set(),
follow: Boolean = false,
refreshTime: Int = 30): Unit = {
refreshTime: Int = 30,
plots: Boolean = true): Unit = {
val jobDone = jobsDone(deps)
val jobFailed = jobsFailed(deps, jobDone)
val jobsStart = jobsReadyStart(deps, jobDone)
var futures: List[Future[Any]] = Nil
val jobsDeps = deps.jobs.map(x => x._1 -> (x._2.dependsOnJobs match {
case l: List[_] => l.map(_.toString)
case _ => throw new IllegalStateException("Value 'depends_on_jobs' is not a list")
......@@ -88,8 +95,8 @@ object PipelineStatus extends ToolCommand {
val jobsWriter = new PrintWriter(new File(outputDir, s"jobs.json"))
jobsWriter.println(ConfigUtils.mapToJson(jobsDeps).spaces2)
jobsWriter.close()
writeGraphvizFile(jobsDeps, new File(outputDir, s"jobs.gv"), jobDone, jobFailed, jobsStart, deps)
writeGraphvizFile(compressOnType(jobsDeps), new File(outputDir, s"compress.jobs.gv"), jobDone, jobFailed, jobsStart, deps)
futures :+= writeGraphvizFile(jobsDeps, new File(outputDir, s"jobs.gv"), jobDone, jobFailed, jobsStart, deps, plots)
futures :+= writeGraphvizFile(compressOnType(jobsDeps), new File(outputDir, s"compress.jobs.gv"), jobDone, jobFailed, jobsStart, deps, plots)
val mainJobs = deps.jobs.filter(_._2.mainJob == true).map {
case (name, job) =>
......@@ -99,8 +106,8 @@ object PipelineStatus extends ToolCommand {
val mainJobsWriter = new PrintWriter(new File(outputDir, s"main_jobs.json"))
mainJobsWriter.println(ConfigUtils.mapToJson(mainJobs).spaces2)
mainJobsWriter.close()
writeGraphvizFile(mainJobs, new File(outputDir, s"main_jobs.gv"), jobDone, jobFailed, jobsStart, deps)
writeGraphvizFile(compressOnType(mainJobs), new File(outputDir, s"compress.main_jobs.gv"), jobDone, jobFailed, jobsStart, deps)
futures :+= writeGraphvizFile(mainJobs, new File(outputDir, s"main_jobs.gv"), jobDone, jobFailed, jobsStart, deps, plots)
futures :+= writeGraphvizFile(compressOnType(mainJobs), new File(outputDir, s"compress.main_jobs.gv"), jobDone, jobFailed, jobsStart, deps, plots)
val totalJobs = deps.jobs.size
val totalStart = jobsStart.size
......@@ -110,6 +117,7 @@ object PipelineStatus extends ToolCommand {
logger.info(s"Total job: ${totalJobs}, Pending: ${totalPending}, Ready to run / running: ${totalStart}, Done: ${totalDone}, Failed ${totalFailed}")
futures.foreach(x => Await.ready(x, Duration.Inf))
if (follow) {
Thread.sleep(refreshTime * 1000)
writePipelineStatus(deps, outputDir, jobDone, jobFailed, follow)
......@@ -146,7 +154,8 @@ object PipelineStatus extends ToolCommand {
jobDone: Set[String],
jobFailed: Set[String],
jobsStart: Set[String],
deps: Deps): Unit = {
deps: Deps,
plots: Boolean): Future[Unit] = Future {
val writer = new PrintWriter(outputFile)
writer.println("digraph graphname {")
jobDone
......@@ -166,6 +175,13 @@ object PipelineStatus extends ToolCommand {
jobsDeps.foreach { case (a, b) => b.foreach(c => writer.println(s" $c -> $a;")) }
writer.println("}")
writer.close()
writeGvToPlot(outputFile)
}
def writeGvToPlot(input: File): Unit = {
Process(Seq("dot", "-Tpng", "-O", input.getAbsolutePath)).run()
Process(Seq("dot", "-Tsvg", "-O", input.getAbsolutePath)).run()
}
def jobsReadyStart(deps: Deps, jobsDone: Set[String]): Set[String] = {
......@@ -191,8 +207,7 @@ object PipelineStatus extends ToolCommand {
def jobsFailed(deps: Deps, dones: Set[String], alreadyFailed: Set[String] = Set()): Set[String] = {
val f = deps.jobs.filterNot(x => dones.contains(x._1))
.filterNot(x => alreadyFailed.contains(x._1)).map(x => x._1 -> x._2.isFailed)
val bla = f.map(x => x._1 -> Await.result(x._2, Duration.Inf)).filter(_._2).keySet
bla ++ alreadyFailed
f.map(x => x._1 -> Await.result(x._2, Duration.Inf)).filter(_._2).keySet ++ alreadyFailed
}
class Job(val name: String, map: Map[String, Any]) {
......@@ -212,8 +227,6 @@ object PipelineStatus extends ToolCommand {
def intermediate = ConfigUtils.any2boolean(map("intermediate"))
def isDone: Future[Boolean] = Future { doneFiles.forall(_.exists()) }
def isFailed: Future[Boolean] = Future {
failFiles.exists(_.exists())
}
def isFailed: Future[Boolean] = Future { failFiles.exists(_.exists()) }
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment