Commit 3910e8fe authored by Peter van 't Hof's avatar Peter van 't Hof
Browse files

Added a follow option

parent e7cef5a5
...@@ -12,7 +12,11 @@ import scala.concurrent.duration._ ...@@ -12,7 +12,11 @@ import scala.concurrent.duration._
* Created by pjvan_thof on 7-12-16. * Created by pjvan_thof on 7-12-16.
*/ */
object PipelineStatus extends ToolCommand { object PipelineStatus extends ToolCommand {
case class Args(pipelineDir: File = null, depsFile: Option[File] = None, outputDir: File = null) extends AbstractArgs case class Args(pipelineDir: File = null,
depsFile: Option[File] = None,
outputDir: File = null,
follow: Boolean = false,
refreshTime: Int = 30) extends AbstractArgs
class OptParser extends AbstractOptParser { class OptParser extends AbstractOptParser {
opt[File]('d', "pipelineDir") required () maxOccurs 1 valueName "<file>" action { (x, c) => opt[File]('d', "pipelineDir") required () maxOccurs 1 valueName "<file>" action { (x, c) =>
...@@ -24,6 +28,13 @@ object PipelineStatus extends ToolCommand { ...@@ -24,6 +28,13 @@ object PipelineStatus extends ToolCommand {
opt[File]("depsFile") maxOccurs 1 valueName "<file>" action { (x, c) => opt[File]("depsFile") maxOccurs 1 valueName "<file>" action { (x, c) =>
c.copy(depsFile = Some(x)) c.copy(depsFile = Some(x))
} text "Location of deps file, not required" } text "Location of deps file, not required"
opt[Unit]('f', "follow") maxOccurs 1 action { (x, c) =>
c.copy(follow = true)
} text "This will follow a run"
opt[Int]("refresh") maxOccurs 1 action { (x, c) =>
c.copy(refreshTime = x)
} text "Time to check again, default set on 30 seconds"
} }
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {
...@@ -34,7 +45,7 @@ object PipelineStatus extends ToolCommand { ...@@ -34,7 +45,7 @@ object PipelineStatus extends ToolCommand {
val depsFile = cmdArgs.depsFile.getOrElse(getDepsFileFromDir(cmdArgs.pipelineDir)) val depsFile = cmdArgs.depsFile.getOrElse(getDepsFileFromDir(cmdArgs.pipelineDir))
val deps = readDepsFile(depsFile) val deps = readDepsFile(depsFile)
writePipelineStatus(deps, cmdArgs.outputDir) writePipelineStatus(deps, cmdArgs.outputDir, follow = cmdArgs.follow, refreshTime = cmdArgs.refreshTime)
logger.info("Done") logger.info("Done")
} }
...@@ -61,10 +72,14 @@ object PipelineStatus extends ToolCommand { ...@@ -61,10 +72,14 @@ object PipelineStatus extends ToolCommand {
def writePipelineStatus(deps: Deps, def writePipelineStatus(deps: Deps,
outputDir: File, outputDir: File,
alreadyDone: Set[String] = Set()): Unit = { alreadyDone: Set[String] = Set(),
alreadyFailed: Set[String] = Set(),
follow: Boolean = false,
refreshTime: Int = 30): Unit = {
val jobDone = jobsDone(deps) val jobDone = jobsDone(deps)
val jobFailed = jobsFailed(deps) val jobFailed = jobsFailed(deps, jobDone)
val jobsStart = jobsReadyStart(deps, jobDone)
val jobsDeps = deps.jobs.map(x => x._1 -> (x._2.dependsOnJobs match { val jobsDeps = deps.jobs.map(x => x._1 -> (x._2.dependsOnJobs match {
case l: List[_] => l.map(_.toString) case l: List[_] => l.map(_.toString)
...@@ -73,8 +88,8 @@ object PipelineStatus extends ToolCommand { ...@@ -73,8 +88,8 @@ object PipelineStatus extends ToolCommand {
val jobsWriter = new PrintWriter(new File(outputDir, s"jobs.json")) val jobsWriter = new PrintWriter(new File(outputDir, s"jobs.json"))
jobsWriter.println(ConfigUtils.mapToJson(jobsDeps).spaces2) jobsWriter.println(ConfigUtils.mapToJson(jobsDeps).spaces2)
jobsWriter.close() jobsWriter.close()
writeGraphvizFile(jobsDeps, new File(outputDir, s"jobs.gv"), jobDone, jobFailed, deps) writeGraphvizFile(jobsDeps, new File(outputDir, s"jobs.gv"), jobDone, jobFailed, jobsStart, deps)
writeGraphvizFile(compressOnType(jobsDeps), new File(outputDir, s"compress.jobs.gv"), jobDone, jobFailed, deps) writeGraphvizFile(compressOnType(jobsDeps), new File(outputDir, s"compress.jobs.gv"), jobDone, jobFailed, jobsStart, deps)
val mainJobs = deps.jobs.filter(_._2.mainJob == true).map { val mainJobs = deps.jobs.filter(_._2.mainJob == true).map {
case (name, job) => case (name, job) =>
...@@ -84,11 +99,21 @@ object PipelineStatus extends ToolCommand { ...@@ -84,11 +99,21 @@ object PipelineStatus extends ToolCommand {
val mainJobsWriter = new PrintWriter(new File(outputDir, s"main_jobs.json")) val mainJobsWriter = new PrintWriter(new File(outputDir, s"main_jobs.json"))
mainJobsWriter.println(ConfigUtils.mapToJson(mainJobs).spaces2) mainJobsWriter.println(ConfigUtils.mapToJson(mainJobs).spaces2)
mainJobsWriter.close() mainJobsWriter.close()
writeGraphvizFile(mainJobs, new File(outputDir, s"main_jobs.gv"), jobDone, jobFailed, deps) writeGraphvizFile(mainJobs, new File(outputDir, s"main_jobs.gv"), jobDone, jobFailed, jobsStart, deps)
writeGraphvizFile(compressOnType(mainJobs), new File(outputDir, s"compress.main_jobs.gv"), jobDone, jobFailed, deps) writeGraphvizFile(compressOnType(mainJobs), new File(outputDir, s"compress.main_jobs.gv"), jobDone, jobFailed, jobsStart, deps)
val totalJobs = deps.jobs.size
val totalStart = jobsStart.size
val totalDone = jobDone.size
val totalFailed = jobFailed.size
val totalPending = totalJobs - jobsStart.size - jobDone.size - jobFailed.size
//print(jobsDone(jobs).mkString("\n")) logger.info(s"Total job: ${totalJobs}, Pending: ${totalPending}, Ready to run / running: ${totalStart}, Done: ${totalDone}, Failed ${totalFailed}")
if (follow) {
Thread.sleep(refreshTime * 1000)
writePipelineStatus(deps, outputDir, jobDone, jobFailed, follow)
}
} }
def getMainDependencies(jobName: String, deps: Deps): List[String] = { def getMainDependencies(jobName: String, deps: Deps): List[String] = {
...@@ -116,9 +141,11 @@ object PipelineStatus extends ToolCommand { ...@@ -116,9 +141,11 @@ object PipelineStatus extends ToolCommand {
set.groupBy(_._1).map(x => x._1 -> x._2.map(_._2).toList) set.groupBy(_._1).map(x => x._1 -> x._2.map(_._2).toList)
} }
def writeGraphvizFile(jobsDeps: Map[String, List[String]], outputFile: File, def writeGraphvizFile(jobsDeps: Map[String, List[String]],
outputFile: File,
jobDone: Set[String], jobDone: Set[String],
jobFailed: Set[String], jobFailed: Set[String],
jobsStart: Set[String],
deps: Deps): Unit = { deps: Deps): Unit = {
val writer = new PrintWriter(outputFile) val writer = new PrintWriter(outputFile)
writer.println("digraph graphname {") writer.println("digraph graphname {")
...@@ -128,9 +155,9 @@ object PipelineStatus extends ToolCommand { ...@@ -128,9 +155,9 @@ object PipelineStatus extends ToolCommand {
jobFailed jobFailed
.filter(x => jobsDeps.contains(x)) .filter(x => jobsDeps.contains(x))
.foreach(x => writer.println(s" $x [color = red]")) .foreach(x => writer.println(s" $x [color = red]"))
jobsReadyStart(deps.jobs, jobDone) jobsStart
.filter(x => jobsDeps.contains(x)) .filter(x => jobsDeps.contains(x))
.filterNot(jobDone.contains) .diff(jobDone)
.foreach(x => writer.println(s" $x [color = orange]")) .foreach(x => writer.println(s" $x [color = orange]"))
deps.jobs deps.jobs
.filter(x => jobsDeps.contains(x._1)) .filter(x => jobsDeps.contains(x._1))
...@@ -141,13 +168,13 @@ object PipelineStatus extends ToolCommand { ...@@ -141,13 +168,13 @@ object PipelineStatus extends ToolCommand {
writer.close() writer.close()
} }
def jobsReadyStart(jobs: Map[String, Job], jobsDone: Set[String]): Set[String] = { def jobsReadyStart(deps: Deps, jobsDone: Set[String]): Set[String] = {
jobs.filter(_._2.dependsOnJobs.forall(jobsDone)).map(_._1).toSet deps.jobs.filterNot(x => jobsDone.contains(x._1)).filter(_._2.dependsOnJobs.forall(jobsDone)).keySet
} }
def jobsDone(deps: Deps): Set[String] = { def jobsDone(deps: Deps, alreadyDone: Set[String] = Set()): Set[String] = {
val f = deps.jobs.map(x => x._2 -> x._2.isDone) val f = deps.jobs.filterNot(x => alreadyDone.contains(x._1)).map(x => x._2 -> x._2.isDone)
val dones = f.map(x => x._1 -> Await.result(x._2, Duration.Inf)) val dones = f.map(x => x._1 -> Await.result(x._2, Duration.Inf)).filter(_._2).map(_._1.name).toSet ++ alreadyDone
val f2 = f.map(x => x._1 -> x._2.map{ d => val f2 = f.map(x => x._1 -> x._2.map{ d =>
if (d || !x._1.intermediate) d if (d || !x._1.intermediate) d
else upstreamJobDone(x._1, dones, deps) else upstreamJobDone(x._1, dones, deps)
...@@ -156,14 +183,16 @@ object PipelineStatus extends ToolCommand { ...@@ -156,14 +183,16 @@ object PipelineStatus extends ToolCommand {
d.filter(_._2).map(_._1.name).toSet d.filter(_._2).map(_._1.name).toSet
} }
private def upstreamJobDone(job: Job, dones: Map[Job, Boolean], deps: Deps): Boolean = { private def upstreamJobDone(job: Job, dones: Set[String], deps: Deps): Boolean = {
job.outputUsedByJobs.map(deps.jobs) job.outputUsedByJobs.map(deps.jobs)
.exists(x => dones(x) || (x.intermediate && upstreamJobDone(x, dones, deps))) .exists(x => dones.contains(x.name) || (x.intermediate && upstreamJobDone(x, dones, deps)))
} }
def jobsFailed(deps: Deps): Set[String] = { def jobsFailed(deps: Deps, dones: Set[String], alreadyFailed: Set[String] = Set()): Set[String] = {
val f = deps.jobs.map(x => x._1 -> x._2.isFailed) val f = deps.jobs.filterNot(x => dones.contains(x._1))
f.map(x => x._1 -> Await.result(x._2, Duration.Inf)).filter(_._2).map(_._1).toSet .filterNot(x => alreadyFailed.contains(x._1)).map(x => x._1 -> x._2.isFailed)
val bla = f.map(x => x._1 -> Await.result(x._2, Duration.Inf)).filter(_._2).keySet
bla ++ alreadyFailed
} }
class Job(val name: String, map: Map[String, Any]) { class Job(val name: String, map: Map[String, Any]) {
...@@ -183,6 +212,8 @@ object PipelineStatus extends ToolCommand { ...@@ -183,6 +212,8 @@ object PipelineStatus extends ToolCommand {
def intermediate = ConfigUtils.any2boolean(map("intermediate")) def intermediate = ConfigUtils.any2boolean(map("intermediate"))
def isDone: Future[Boolean] = Future { doneFiles.forall(_.exists()) } def isDone: Future[Boolean] = Future { doneFiles.forall(_.exists()) }
def isFailed: Future[Boolean] = Future { failFiles.exists(_.exists()) } def isFailed: Future[Boolean] = Future {
failFiles.exists(_.exists())
}
} }
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment