PipelineStatus.scala 7.53 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package nl.lumc.sasc.biopet.core

import java.io.{File, PrintWriter}

import nl.lumc.sasc.biopet.utils.{ConfigUtils, ToolCommand}

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

/**
  * Reports the status of a pipeline run based on its `deps.json` file.
  *
  * The deps file is either given explicitly (`--depsFile`) or located at
  * `<pipelineDir>/.log/<run dir>/graph/deps.json`, where `<run dir>` is the lexicographically last
  * sub-directory of `.log` (presumably the most recent run - confirm against the naming scheme).
  * Reports (`jobs.json`, `main_jobs.json` and several Graphviz `.gv` files) are written to the
  * output directory.
  *
  * Created by pjvan_thof on 7-12-16.
  */
object PipelineStatus extends ToolCommand {

  /**
    * Command line arguments.
    *
    * @param pipelineDir output directory of the pipeline, used to locate the deps file when `depsFile` is not set
    * @param depsFile    optional explicit location of the deps file
    * @param outputDir   directory this tool writes its reports to
    */
  case class Args(pipelineDir: File = null, depsFile: Option[File] = None, outputDir: File = null) extends AbstractArgs

  class OptParser extends AbstractOptParser {
    opt[File]('d', "pipelineDir") required () maxOccurs 1 valueName "<file>" action { (x, c) =>
      c.copy(pipelineDir = x)
    } text "Output directory of the pipeline"
    opt[File]('o', "outputDir") required () maxOccurs 1 valueName "<file>" action { (x, c) =>
      c.copy(outputDir = x)
    } text "Output directory of this tool"
    opt[File]("depsFile") maxOccurs 1 valueName "<file>" action { (x, c) =>
      c.copy(depsFile = Some(x))
    } text "Location of deps file, not required"
  }

  /** Entry point: resolves the deps file, parses it and writes all status reports. */
  def main(args: Array[String]): Unit = {
    logger.info("Start")

    val argsParser = new OptParser
    val cmdArgs: Args = argsParser.parse(args, Args()) getOrElse (throw new IllegalArgumentException)

    val depsFile = cmdArgs.depsFile.getOrElse(getDepsFileFromDir(cmdArgs.pipelineDir))
    // Fail early with a clear message instead of an opaque parse error further down.
    require(depsFile.exists(), s"Deps file does not exist: $depsFile")
    val deps = readDepsFile(depsFile)
    writePipelineStatus(deps, cmdArgs.outputDir)

    logger.info("Done")
  }

  /**
    * Locates `deps.json` inside the log directory of a pipeline.
    *
    * @param pipelineDir output directory of the pipeline
    * @return `<pipelineDir>/.log/<last run dir>/graph/deps.json`; the file itself is not checked for existence
    * @throws IllegalArgumentException when `pipelineDir`, its `.log` dir, any run dir or the `graph` dir is missing
    */
  def getDepsFileFromDir(pipelineDir: File): File = {
    require(pipelineDir.exists(), s"pipelineDir does not exist: $pipelineDir")
    val logDir = new File(pipelineDir, ".log")
    require(logDir.exists(), s"No .log dir found in pipelineDir")
    // listFiles() returns null when logDir cannot be listed (e.g. it is not a directory).
    val runLogDirs = Option(logDir.listFiles())
      .getOrElse(Array.empty[File])
      .filter(_.isDirectory)
      .sortBy(_.getName)
    require(runLogDirs.nonEmpty, s"No run directories found in: $logDir")
    val graphDir = new File(runLogDirs.last, "graph")
    require(graphDir.exists(), s"Graph dir is not found: $graphDir")
    new File(graphDir, "deps.json")
  }

  /**
    * Parsed content of a deps file.
    *
    * @param jobs  jobs keyed by job name
    * @param files entries of the `files` section, each kept as a raw config map
    */
  case class Deps(jobs: Map[String, Job], files: Array[Map[String, Any]])

  /**
    * Reads a deps file.
    *
    * @param depsFile file with top-level `jobs` (map of job name to job config) and `files` (list) entries
    * @return the parsed [[Deps]]
    */
  def readDepsFile(depsFile: File): Deps = {
    val deps = ConfigUtils.fileToConfigMap(depsFile)

    val jobs = ConfigUtils.any2map(deps("jobs")).map {
      case (name, values) => name -> new Job(name, ConfigUtils.any2map(values))
    }
    val files = ConfigUtils.any2list(deps("files")).map(x => ConfigUtils.any2map(x)).toArray

    Deps(jobs, files)
  }

  /**
    * Computes the job states and writes all status reports to `outputDir`.
    *
    * Written files:
    *  - `jobs.json` / `jobs.gv`: every job with its direct dependencies
    *  - `main_jobs.json` / `main_jobs.gv`: main jobs only, linked through their nearest main-job dependencies
    *  - `compress.jobs.gv` / `compress.main_jobs.gv`: the same graphs with numbered jobs collapsed by name
    *
    * @param deps        parsed deps file
    * @param outputDir   directory to write to; created when missing
    * @param alreadyDone currently unused
    */
  def writePipelineStatus(deps: Deps, outputDir: File, alreadyDone: Set[String] = Set()): Unit = {
    outputDir.mkdirs()

    val jobDone = jobsDone(deps)
    val jobFailed = jobsFailed(deps)
    // Neither done nor failed, but the job has already produced a stdout file.
    val jobsRunning = deps.jobs
      .filterNot { case (name, _) => jobDone(name) || jobFailed(name) }
      .filter { case (_, job) => job.stdoutFile.exists() }
      .keys
      .toList

    val jobsDeps = deps.jobs.map { case (name, job) => name -> job.dependsOnJobs.toList }
    withWriter(new File(outputDir, "jobs.json"))(_.println(ConfigUtils.mapToJson(jobsDeps).spaces2))

    writeGraphvizFile(jobsDeps, new File(outputDir, "jobs.gv"), jobDone, jobFailed, jobsRunning, deps)
    writeGraphvizFile(compressOnType(jobsDeps),
                      new File(outputDir, "compress.jobs.gv"),
                      jobDone,
                      jobFailed,
                      jobsRunning,
                      deps)

    val mainJobs = deps.jobs.collect {
      case (name, job) if job.mainJob => name -> getMainDependencies(name, deps)
    }
    withWriter(new File(outputDir, "main_jobs.json"))(_.println(ConfigUtils.mapToJson(mainJobs).spaces2))

    writeGraphvizFile(mainJobs, new File(outputDir, "main_jobs.gv"), jobDone, jobFailed, jobsRunning, deps)
    writeGraphvizFile(compressOnType(mainJobs),
                      new File(outputDir, "compress.main_jobs.gv"),
                      jobDone,
                      jobFailed,
                      jobsRunning,
                      deps)
  }

  /**
    * Finds the nearest main jobs that `jobName` depends on.
    *
    * Dependencies that are main jobs are returned directly. Non-main dependencies are expanded
    * recursively into their own nearest main-job dependencies.
    *
    * @return distinct names of the nearest main-job dependencies
    */
  def getMainDependencies(jobName: String, deps: Deps): List[String] = {
    deps.jobs(jobName).dependsOnJobs.toList.flatMap { dep =>
      if (deps.jobs(dep).mainJob) List(dep)
      else getMainDependencies(dep, deps)
    }.distinct
  }

  /** Matches numbered job names of the form `<name>_<digits>`. */
  val numberRegex = """(.*)_(\d*)$""".r

  /**
    * Collapses numbered jobs (`<name>_<digits>`) into a single `<name>` node and merges their edges.
    *
    * Names that do not carry a number suffix are kept unchanged. Jobs without dependencies do
    * not appear as keys in the result.
    */
  def compressOnType(jobs: Map[String, List[String]]): Map[String, List[String]] = {
    val edges = for ((job, jobDeps) <- jobs.toSet; dep <- jobDeps)
      yield stripJobNumber(job) -> stripJobNumber(dep)
    edges.groupBy(_._1).map { case (name, group) => name -> group.map(_._2).toList }
  }

  /** Removes a trailing `_<digits>` suffix, or returns the name unchanged when there is none. */
  private def stripJobNumber(jobName: String): String = jobName match {
    case numberRegex(name, _) => name
    case _                    => jobName
  }

  /**
    * Writes a Graphviz digraph containing an edge from each dependency to its job.
    *
    * Node styling: done jobs are green, failed jobs are red, and jobs that are ready to start
    * (all dependencies done, and not themselves done, failed or running) are orange.
    * Intermediate jobs are dashed. Only nodes that are keys of `jobsDeps` are styled.
    */
  def writeGraphvizFile(jobsDeps: Map[String, List[String]],
                        outputFile: File,
                        jobDone: Set[String],
                        jobFailed: Set[String],
                        jobsRunning: List[String],
                        deps: Deps): Unit = {
    val nodes = jobsDeps.keySet
    val running = jobsRunning.toSet
    // Excluding failed jobs matters: a later declaration would override the red color.
    val ready = jobsReadyStart(deps.jobs, jobDone)
      .filter(nodes)
      .filterNot(x => jobDone(x) || jobFailed(x) || running(x))

    withWriter(outputFile) { writer =>
      writer.println("digraph graphname {")
      jobDone.filter(nodes).foreach(x => writer.println(s"  ${dotId(x)} [color = green]"))
      jobFailed.filter(nodes).foreach(x => writer.println(s"  ${dotId(x)} [color = red]"))
      ready.foreach(x => writer.println(s"  ${dotId(x)} [color = orange]"))
      deps.jobs
        .collect { case (name, job) if nodes(name) && job.intermediate => name }
        .foreach(x => writer.println(s"  ${dotId(x)} [style = dashed]"))
      jobsDeps.foreach {
        case (job, jobDeps) => jobDeps.foreach(dep => writer.println(s"  ${dotId(dep)} -> ${dotId(job)};"))
      }
      writer.println("}")
    }
  }

  /** Quotes a job name as a DOT ID, so names with `-`, `.` or a leading digit stay valid. */
  private def dotId(name: String): String = "\"" + name.replace("\"", "\\\"") + "\""

  /** Opens `file`, passes the writer to `f` and always closes it afterwards. */
  private def withWriter(file: File)(f: PrintWriter => Unit): Unit = {
    val writer = new PrintWriter(file)
    try f(writer)
    finally writer.close()
  }

  /** Names of the jobs whose dependencies are all contained in `jobsDone`. */
  def jobsReadyStart(jobs: Map[String, Job], jobsDone: Set[String]): List[String] =
    jobs.collect { case (name, job) if job.dependsOnJobs.forall(jobsDone) => name }.toList

  /**
    * Names of the jobs that count as done.
    *
    * A job is done when all of its done files exist. An intermediate job also counts as done when
    * a job consuming its output is done, looking through intermediate consumers transitively.
    */
  def jobsDone(deps: Deps): Set[String] = {
    // Create every future before awaiting any of them, so the file checks run concurrently.
    val futures = deps.jobs.map { case (_, job) => job -> job.isDone }
    val dones: Map[Job, Boolean] = futures.map { case (job, f) => job -> Await.result(f, Duration.Inf) }
    dones.collect {
      case (job, done) if done || (job.intermediate && upstreamJobDone(job, dones, deps)) => job.name
    }.toSet
  }

  /** True when a job using `job`'s output is done, recursing through intermediate consumers. */
  private def upstreamJobDone(job: Job, dones: Map[Job, Boolean], deps: Deps): Boolean = {
    job.outputUsedByJobs
      .map(deps.jobs)
      .exists(x => dones(x) || (x.intermediate && upstreamJobDone(x, dones, deps)))
  }

  /** Names of the jobs for which at least one fail file exists. */
  def jobsFailed(deps: Deps): Set[String] = {
    val futures = deps.jobs.map { case (name, job) => name -> job.isFailed }
    futures.collect { case (name, f) if Await.result(f, Duration.Inf) => name }.toSet
  }

  /**
    * A single job entry of the deps file, backed by its raw config map.
    *
    * Every accessor reads and converts the underlying map value on each call.
    */
  class Job(val name: String, map: Map[String, Any]) {

    def doneAtStart: Boolean = ConfigUtils.any2boolean(map("done_at_start"))

    def failFiles = ConfigUtils.any2fileList(map("fail_files"))
    def doneFiles = ConfigUtils.any2fileList(map("done_files"))
    def outputUsedByJobs = ConfigUtils.any2stringList(map("output_used_by_jobs"))
    def dependsOnJobs = ConfigUtils.any2stringList(map("depends_on_jobs"))
    def stdoutFile: File = new File(ConfigUtils.any2string(map("stdout_file")))

    def outputsFiles = ConfigUtils.any2fileList(map("outputs"))
    def inputFiles = ConfigUtils.any2fileList(map("inputs"))

    def mainJob: Boolean = ConfigUtils.any2boolean(map("main_job"))
    def intermediate: Boolean = ConfigUtils.any2boolean(map("intermediate"))

    /** Asynchronously checks that every done file exists. */
    def isDone: Future[Boolean] = Future { doneFiles.forall(_.exists()) }

    /** Asynchronously checks whether any fail file exists. */
    def isFailed: Future[Boolean] = Future { failFiles.exists(_.exists()) }
  }
}