Commit bef6dd89 authored by Peter van 't Hof's avatar Peter van 't Hof
Browse files

Refactor methods

parent 2295a711
...@@ -32,65 +32,78 @@ object PipelineStatus extends ToolCommand { ...@@ -32,65 +32,78 @@ object PipelineStatus extends ToolCommand {
val argsParser = new OptParser val argsParser = new OptParser
val cmdArgs: Args = argsParser.parse(args, Args()) getOrElse (throw new IllegalArgumentException) val cmdArgs: Args = argsParser.parse(args, Args()) getOrElse (throw new IllegalArgumentException)
val depsFile = cmdArgs.depsFile.getOrElse { val depsFile = cmdArgs.depsFile.getOrElse(getDepsFileFromDir(cmdArgs.pipelineDir))
require(cmdArgs.pipelineDir.exists(), s"pipelineDir does not exist: ${cmdArgs.pipelineDir}") val deps = readDepsFile(depsFile)
val logDir = new File(cmdArgs.pipelineDir, ".log") writePipelineStatus(deps, cmdArgs.outputDir)
require(logDir.exists(), s"No .log dir found in pipelineDir")
val runLogDir = logDir.list().sorted.map(new File(logDir, _)).filter(_.isDirectory).last
val graphDir = new File(runLogDir, "graph")
require(graphDir.exists(), s"Graph dir is not found: $graphDir")
new File(graphDir, "deps.json")
}
writePipelineStatus(depsFile, cmdArgs.outputDir)
logger.info("Done") logger.info("Done")
} }
def writePipelineStatus(depsFile: File, outputDir: File): Unit = { def getDepsFileFromDir(pipelineDir: File): File = {
require(pipelineDir.exists(), s"pipelineDir does not exist: $pipelineDir")
val logDir = new File(pipelineDir, ".log")
require(logDir.exists(), s"No .log dir found in pipelineDir")
val runLogDir = logDir.list().sorted.map(new File(logDir, _)).filter(_.isDirectory).last
val graphDir = new File(runLogDir, "graph")
require(graphDir.exists(), s"Graph dir is not found: $graphDir")
new File(graphDir, "deps.json")
}
case class Deps(jobs: Map[String, Job], files: Array[Map[String, Any]])
def readDepsFile(depsFile: File) = {
val deps = ConfigUtils.fileToConfigMap(depsFile) val deps = ConfigUtils.fileToConfigMap(depsFile)
val jobs = ConfigUtils.any2map(deps("jobs")).map(x => x._1 -> new Job(x._1, ConfigUtils.any2map(x._2))) val jobs = ConfigUtils.any2map(deps("jobs")).map(x => x._1 -> new Job(x._1, ConfigUtils.any2map(x._2)))
val files = ConfigUtils.any2list(deps("files")).map(x => ConfigUtils.any2map(x)).toArray
val jobDone = jobsDone(jobs) Deps(jobs, files)
val jobFailed = jobsFailed(jobs) }
val jobsRunning = jobs
def writePipelineStatus(deps: Deps,
outputDir: File,
alreadyDone: Set[String] = Set()): Unit = {
val jobDone = jobsDone(deps)
val jobFailed = jobsFailed(deps)
val jobsRunning = deps.jobs
.filterNot(x => jobDone(x._1)) .filterNot(x => jobDone(x._1))
.filterNot(x => jobFailed(x._1)) .filterNot(x => jobFailed(x._1))
.filter(_._2.stdoutFile.exists()).map(_._1).toList .filter(_._2.stdoutFile.exists()).map(_._1).toList
val jobsDeps = jobs.map(x => x._1 -> (x._2.dependsOnJobs match { val jobsDeps = deps.jobs.map(x => x._1 -> (x._2.dependsOnJobs match {
case l: List[_] => l.map(_.toString) case l: List[_] => l.map(_.toString)
case _ => throw new IllegalStateException("Value 'depends_on_jobs' is not a list") case _ => throw new IllegalStateException("Value 'depends_on_jobs' is not a list")
})) }))
val jobsWriter = new PrintWriter(new File(outputDir, s"jobs.json")) val jobsWriter = new PrintWriter(new File(outputDir, s"jobs.json"))
jobsWriter.println(ConfigUtils.mapToJson(jobsDeps).spaces2) jobsWriter.println(ConfigUtils.mapToJson(jobsDeps).spaces2)
jobsWriter.close() jobsWriter.close()
writeGraphvizFile(jobsDeps, new File(outputDir, s"jobs.gv"), jobDone, jobFailed, jobsRunning, jobs) writeGraphvizFile(jobsDeps, new File(outputDir, s"jobs.gv"), jobDone, jobFailed, jobsRunning, deps)
writeGraphvizFile(compressOnType(jobsDeps), new File(outputDir, s"compress.jobs.gv"), jobDone, jobFailed, jobsRunning, jobs) writeGraphvizFile(compressOnType(jobsDeps), new File(outputDir, s"compress.jobs.gv"), jobDone, jobFailed, jobsRunning, deps)
val mainJobs = jobs.filter(_._2.mainJob == true).map { val mainJobs = deps.jobs.filter(_._2.mainJob == true).map {
case (name, job) => case (name, job) =>
name -> getMainDependencies(name, jobs) name -> getMainDependencies(name, deps)
} }
val mainJobsWriter = new PrintWriter(new File(outputDir, s"main_jobs.json")) val mainJobsWriter = new PrintWriter(new File(outputDir, s"main_jobs.json"))
mainJobsWriter.println(ConfigUtils.mapToJson(mainJobs).spaces2) mainJobsWriter.println(ConfigUtils.mapToJson(mainJobs).spaces2)
mainJobsWriter.close() mainJobsWriter.close()
writeGraphvizFile(mainJobs, new File(outputDir, s"main_jobs.gv"), jobDone, jobFailed, jobsRunning, jobs) writeGraphvizFile(mainJobs, new File(outputDir, s"main_jobs.gv"), jobDone, jobFailed, jobsRunning, deps)
writeGraphvizFile(compressOnType(mainJobs), new File(outputDir, s"compress.main_jobs.gv"), jobDone, jobFailed, jobsRunning, jobs) writeGraphvizFile(compressOnType(mainJobs), new File(outputDir, s"compress.main_jobs.gv"), jobDone, jobFailed, jobsRunning, deps)
//print(jobsDone(jobs).mkString("\n")) //print(jobsDone(jobs).mkString("\n"))
} }
def getMainDependencies(jobName: String, jobsMap: Map[String, Job]): List[String] = { def getMainDependencies(jobName: String, deps: Deps): List[String] = {
val job = jobsMap(jobName) val job = deps.jobs(jobName)
val dependencies = job.dependsOnJobs match { val dependencies = job.dependsOnJobs match {
case l: List[_] => l.map(_.toString) case l: List[_] => l.map(_.toString)
} }
dependencies.flatMap { dep => dependencies.flatMap { dep =>
jobsMap(dep).mainJob match { deps.jobs(dep).mainJob match {
case true => List(dep) case true => List(dep)
case false => getMainDependencies(dep, jobsMap) case false => getMainDependencies(dep, deps)
} }
}.distinct }.distinct
} }
...@@ -108,26 +121,24 @@ object PipelineStatus extends ToolCommand { ...@@ -108,26 +121,24 @@ object PipelineStatus extends ToolCommand {
} }
def writeGraphvizFile(jobsDeps: Map[String, List[String]], outputFile: File, def writeGraphvizFile(jobsDeps: Map[String, List[String]], outputFile: File,
jobDone: Map[String, Boolean], jobDone: Set[String],
jobFailed: Map[String, Boolean], jobFailed: Set[String],
jobsRunning: List[String], jobsRunning: List[String],
jobs: Map[String, Job]): Unit = { deps: Deps): Unit = {
val writer = new PrintWriter(outputFile) val writer = new PrintWriter(outputFile)
writer.println("digraph graphname {") writer.println("digraph graphname {")
jobDone jobDone
.filter(x => jobsDeps.contains(x._1)) .filter(x => jobsDeps.contains(x))
.filter(_._2) .foreach(x => writer.println(s" $x [color = green]"))
.foreach(x => writer.println(s" ${x._1} [color = green]"))
jobFailed jobFailed
.filter(x => jobsDeps.contains(x._1)) .filter(x => jobsDeps.contains(x))
.filter(_._2) .foreach(x => writer.println(s" $x [color = red]"))
.foreach(x => writer.println(s" ${x._1} [color = red]")) jobsReadyStart(deps.jobs, jobDone)
jobsReadyStart(jobs, jobDone)
.filter(jobsDeps.contains) .filter(jobsDeps.contains)
.filterNot(jobDone) .filterNot(jobDone)
.filterNot(jobsRunning.contains) .filterNot(jobsRunning.contains)
.foreach(x => writer.println(s" $x [color = orange]")) .foreach(x => writer.println(s" $x [color = orange]"))
jobs deps.jobs
.filter(x => jobsDeps.contains(x._1)) .filter(x => jobsDeps.contains(x._1))
.filter(_._2.intermediate) .filter(_._2.intermediate)
.foreach(x => writer.println(s" ${x._1} [style = dashed]")) .foreach(x => writer.println(s" ${x._1} [style = dashed]"))
...@@ -136,28 +147,29 @@ object PipelineStatus extends ToolCommand { ...@@ -136,28 +147,29 @@ object PipelineStatus extends ToolCommand {
writer.close() writer.close()
} }
def jobsReadyStart(jobs: Map[String, Job], jobsDone: Map[String, Boolean]): List[String] = { def jobsReadyStart(jobs: Map[String, Job], jobsDone: Set[String]): List[String] = {
jobs.filter(_._2.dependsOnJobs.forall(jobsDone)).map(_._1).toList jobs.filter(_._2.dependsOnJobs.forall(jobsDone)).map(_._1).toList
} }
def jobsDone(jobs: Map[String, Job]): Map[String, Boolean] = { def jobsDone(deps: Deps): Set[String] = {
val f = jobs.map(x => x._2 -> x._2.isDone) val f = deps.jobs.map(x => x._2 -> x._2.isDone)
val dones = f.map(x => x._1 -> Await.result(x._2, Duration.Inf)) val dones = f.map(x => x._1 -> Await.result(x._2, Duration.Inf))
val f2 = f.map(x => x._1 -> x._2.map{ d => val f2 = f.map(x => x._1 -> x._2.map{ d =>
if (d || !x._1.intermediate) d if (d || !x._1.intermediate) d
else upstreamJobDone(x._1, dones, jobs) else upstreamJobDone(x._1, dones, deps)
}) })
f2.map(x => x._1.name -> Await.result(x._2, Duration.Inf)) val d = f2.map(x => x._1 -> Await.result(x._2, Duration.Inf))
d.filter(_._2).map(_._1.name).toSet
} }
private def upstreamJobDone(job: Job, dones: Map[Job, Boolean], allJobs: Map[String, Job]): Boolean = { private def upstreamJobDone(job: Job, dones: Map[Job, Boolean], deps: Deps): Boolean = {
job.outputUsedByJobs.map(allJobs) job.outputUsedByJobs.map(deps.jobs)
.exists(x => dones(x) || (x.intermediate && upstreamJobDone(x, dones, allJobs))) .exists(x => dones(x) || (x.intermediate && upstreamJobDone(x, dones, deps)))
} }
def jobsFailed(jobs: Map[String, Job]): Map[String, Boolean] = { def jobsFailed(deps: Deps): Set[String] = {
val f = jobs.map(x => x._1 -> x._2.isFailed) val f = deps.jobs.map(x => x._1 -> x._2.isFailed)
f.map(x => x._1 -> Await.result(x._2, Duration.Inf)) f.map(x => x._1 -> Await.result(x._2, Duration.Inf)).filter(_._2).map(_._1).toSet
} }
class Job(val name: String, map: Map[String, Any]) { class Job(val name: String, map: Map[String, Any]) {
......
...@@ -139,7 +139,7 @@ object WriteDependencies extends Logging with Configurable { ...@@ -139,7 +139,7 @@ object WriteDependencies extends Logging with Configurable {
)).spaces2) )).spaces2)
writer.close() writer.close()
PipelineStatus.writePipelineStatus(outputFile, outputDir) PipelineStatus.writePipelineStatus(PipelineStatus.readDepsFile(outputFile), outputDir)
logger.info("done calculating dependencies") logger.info("done calculating dependencies")
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment