package nl.lumc.sasc.biopet.core

import java.io.{File, PrintWriter}

import nl.lumc.sasc.biopet.utils.{ConfigUtils, ToolCommand}

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

/**
  * Created by pjvan_thof on 7-12-16.
  */
object PipelineStatus extends ToolCommand {
  /**
    * Command line arguments of this tool.
    *
    * @param pipelineDir Output directory of the pipeline
    * @param depsFile Location of the deps file, not required
    * @param outputDir Output directory of this tool
    */
  case class Args(pipelineDir: File = null,
                  depsFile: Option[File] = None,
                  outputDir: File = null)
      extends AbstractArgs

  /** Command line parser that fills an [[Args]] instance from the given options. */
  class OptParser extends AbstractOptParser {
    opt[File]('d', "pipelineDir")
      .required()
      .maxOccurs(1)
      .valueName("<file>")
      .action((dir, parsed) => parsed.copy(pipelineDir = dir))
      .text("Output directory of the pipeline")

    opt[File]('o', "outputDir")
      .required()
      .maxOccurs(1)
      .valueName("<file>")
      .action((dir, parsed) => parsed.copy(outputDir = dir))
      .text("Output directory of this tool")

    opt[File]("depsFile")
      .maxOccurs(1)
      .valueName("<file>")
      .action((file, parsed) => parsed.copy(depsFile = Some(file)))
      .text("Location of deps file, not required")
  }

  /**
    * Entry point: resolves the deps file, reads it and writes the pipeline status files.
    *
    * @param args Command line arguments, see [[OptParser]]
    * @throws IllegalArgumentException when the arguments cannot be parsed
    */
  def main(args: Array[String]): Unit = {
    logger.info("Start")

    val argsParser = new OptParser
    val cmdArgs: Args = argsParser.parse(args, Args()).getOrElse(throw new IllegalArgumentException)

    // Without an explicit deps file, look it up inside the pipeline directory
    val depsFile = cmdArgs.depsFile.getOrElse(getDepsFileFromDir(cmdArgs.pipelineDir))
    val deps = readDepsFile(depsFile)
    writePipelineStatus(deps, cmdArgs.outputDir)

    logger.info("Done")
  }

  /**
    * Locates the deps file of the last run inside a pipeline output directory.
    *
    * The entries of `<pipelineDir>/.log` are sorted by name and the last directory among them
    * is taken as the run to inspect.
    *
    * @param pipelineDir Output directory of the pipeline
    * @return `graph/deps.json` inside the selected run directory; the file itself is not checked
    * @throws IllegalArgumentException when `pipelineDir`, its `.log` directory, a run directory
    *                                  or the `graph` directory cannot be found
    */
  def getDepsFileFromDir(pipelineDir: File): File = {
    require(pipelineDir.exists(), s"pipelineDir does not exist: $pipelineDir")
    val logDir = new File(pipelineDir, ".log")
    require(logDir.exists(), s"No .log dir found in pipelineDir")

    // File.list() returns null when logDir is not a directory or cannot be read
    val entries = Option(logDir.list()).map(_.toList).getOrElse(Nil)
    val runLogDirs = entries.sorted.map(new File(logDir, _)).filter(_.isDirectory)
    val runLogDir = runLogDirs.lastOption.getOrElse(
      throw new IllegalArgumentException(s"No run directory found in log dir: $logDir"))

    val graphDir = new File(runLogDir, "graph")
    require(graphDir.exists(), s"Graph dir is not found: $graphDir")
    new File(graphDir, "deps.json")
  }

  /**
    * Content of a deps file.
    *
    * @param jobs Jobs keyed by their name
    * @param files Raw entries of the "files" list of the deps file
    */
  case class Deps(
      jobs: Map[String, Job],
      files: Array[Map[String, Any]]
  )

  /**
    * Reads a deps file into a [[Deps]] instance.
    *
    * @param depsFile File that holds a "jobs" map and a "files" list
    * @return The jobs keyed by name, together with the raw file entries
    */
  def readDepsFile(depsFile: File): Deps = {
    val deps = ConfigUtils.fileToConfigMap(depsFile)

    // Each entry of "jobs" is turned into a Job that carries its own name
    val jobs = ConfigUtils.any2map(deps("jobs")).map {
      case (name, content) => name -> new Job(name, ConfigUtils.any2map(content))
    }
    val files = ConfigUtils.any2list(deps("files")).map(x => ConfigUtils.any2map(x)).toArray

    Deps(jobs, files)
  }

  /**
    * Writes the job dependencies of a pipeline as JSON and Graphviz files.
    *
    * The files `jobs.json`, `jobs.gv`, `compress.jobs.gv`, `main_jobs.json`, `main_jobs.gv` and
    * `compress.main_jobs.gv` are written to `outputDir`.
    *
    * @param deps Content of the deps file
    * @param outputDir Directory to write the files to
    * @param alreadyDone Currently not used by this method
    * @throws IllegalStateException when the dependencies of a job are not a list
    */
  def writePipelineStatus(deps: Deps,
                          outputDir: File,
                          alreadyDone: Set[String] = Set()): Unit = {

    val jobDone = jobsDone(deps)
    val jobFailed = jobsFailed(deps)

    val jobsDeps = deps.jobs.map(x => x._1 -> (x._2.dependsOnJobs match {
      case l: List[_] => l.map(_.toString)
      case _          => throw new IllegalStateException("Value 'depends_on_jobs' is not a list")
    }))
    writeJsonFile(jobsDeps, new File(outputDir, "jobs.json"))
    writeGraphvizFile(jobsDeps, new File(outputDir, "jobs.gv"), jobDone, jobFailed, deps)
    writeGraphvizFile(compressOnType(jobsDeps), new File(outputDir, "compress.jobs.gv"), jobDone, jobFailed, deps)

    // Main jobs only depend on other main jobs in this view
    val mainJobs = deps.jobs.filter(_._2.mainJob).map {
      case (name, _) => name -> getMainDependencies(name, deps)
    }
    writeJsonFile(mainJobs, new File(outputDir, "main_jobs.json"))
    writeGraphvizFile(mainJobs, new File(outputDir, "main_jobs.gv"), jobDone, jobFailed, deps)
    writeGraphvizFile(compressOnType(mainJobs), new File(outputDir, "compress.main_jobs.gv"), jobDone, jobFailed, deps)
  }

  /** Writes `content` as indented JSON to `outputFile`; the writer is closed even on failure. */
  private def writeJsonFile(content: Map[String, List[String]], outputFile: File): Unit = {
    val writer = new PrintWriter(outputFile)
    try writer.println(ConfigUtils.mapToJson(content).spaces2)
    finally writer.close()
  }

  /**
    * Collects the main jobs a job depends on, looking through jobs that are not main jobs.
    *
    * A dependency that is a main job is returned as is; for any other dependency its own main
    * dependencies are collected recursively. NOTE(review): a cycle in the dependencies would
    * recurse without end, presumably the deps file is acyclic.
    *
    * @param jobName Name of the job, must be a key of `deps.jobs`
    * @param deps Content of the deps file
    * @return Names of the main jobs found, without duplicates
    * @throws IllegalStateException when the dependencies of a job are not a list
    */
  def getMainDependencies(jobName: String, deps: Deps): List[String] = {
    val job = deps.jobs(jobName)
    val dependencies = job.dependsOnJobs match {
      case l: List[_] => l.map(_.toString)
      case _          => throw new IllegalStateException("Value 'depends_on_jobs' is not a list")
    }
    dependencies.flatMap { dep =>
      if (deps.jobs(dep).mainJob) List(dep)
      else getMainDependencies(dep, deps)
    }.distinct
  }

  /** Matches a name that ends with an underscore followed by digits, like `name_12`. */
  val numberRegex: scala.util.matching.Regex = """(.*)_(\d*)$""".r

  /**
    * Merges numbered jobs into one node per name, so `name_1` and `name_2` both become `name`.
    *
    * Names that do not match [[numberRegex]] are kept unchanged. Jobs without dependencies do
    * not show up as key in the result, they only appear as dependency of other jobs.
    *
    * @param jobs Dependencies keyed by job name
    * @return Dependencies keyed by merged name, each list without duplicates
    */
  def compressOnType(jobs: Map[String, List[String]]): Map[String, List[String]] = {
    def typeName(jobName: String): String = jobName match {
      case numberRegex(name, _) => name
      case _                    => jobName
    }

    // A set removes the duplicate edges that appear once the numbers are stripped
    val edges = for ((job, deps) <- jobs.toSet; dep <- deps) yield (typeName(job), typeName(dep))
    edges.groupBy(_._1).map { case (job, grouped) => job -> grouped.map(_._2).toList }
  }

  /**
    * Writes a dependency graph in Graphviz format.
    *
    * Only nodes that are a key of `jobsDeps` get attributes: green for done jobs, red for failed
    * jobs, orange for jobs that are ready to start but not done, and a dashed style for
    * intermediate jobs. Each dependency is written as an edge from the dependency to the job.
    *
    * @param jobsDeps Dependencies keyed by job name
    * @param outputFile File to write the graph to
    * @param jobDone Names of the jobs that are done
    * @param jobFailed Names of the jobs that failed
    * @param deps Content of the deps file
    */
  def writeGraphvizFile(jobsDeps: Map[String, List[String]], outputFile: File,
                        jobDone: Set[String],
                        jobFailed: Set[String],
                        deps: Deps): Unit = {
    val writer = new PrintWriter(outputFile)
    // Close the writer even when one of the lookups below fails
    try {
      writer.println("digraph graphname {")
      jobDone
        .filter(jobsDeps.contains)
        .foreach(x => writer.println(s"  $x [color = green]"))
      jobFailed
        .filter(jobsDeps.contains)
        .foreach(x => writer.println(s"  $x [color = red]"))
      jobsReadyStart(deps.jobs, jobDone)
        .filter(jobsDeps.contains)
        .filterNot(jobDone.contains)
        .foreach(x => writer.println(s"  $x [color = orange]"))
      deps.jobs
        .filter(x => jobsDeps.contains(x._1))
        .filter(_._2.intermediate)
        .foreach(x => writer.println(s"  ${x._1} [style = dashed]"))
      jobsDeps.foreach { case (a, b) => b.foreach(c => writer.println(s"  $c -> $a;")) }
      writer.println("}")
    } finally writer.close()
  }

  /**
    * Selects the jobs of which every dependency is done.
    *
    * Jobs that are done themselves are not excluded here.
    *
    * @param jobs Jobs keyed by name
    * @param jobsDone Names of the jobs that are done
    * @return Names of the jobs without a pending dependency
    */
  def jobsReadyStart(jobs: Map[String, Job], jobsDone: Set[String]): Set[String] = {
    jobs.collect { case (name, job) if job.dependsOnJobs.forall(jobsDone) => name }.toSet
  }

  /**
    * Determines which jobs are done.
    *
    * A job is done when its own check succeeds. An intermediate job whose check fails still
    * counts as done when [[upstreamJobDone]] holds for it.
    *
    * @param deps Content of the deps file
    * @return Names of the jobs that are done
    */
  def jobsDone(deps: Deps): Set[String] = {
    // All checks are created before the first one is awaited
    val checks = deps.jobs.map(x => x._2 -> x._2.isDone)
    val dones = checks.map(x => x._1 -> Await.result(x._2, Duration.Inf))

    // Second stage, only intermediate jobs that are not done need the lookup through other jobs
    val resolved = dones.map {
      case (job, done) =>
        val result =
          if (done || !job.intermediate) Future.successful(done)
          else Future(upstreamJobDone(job, dones, deps))
        job -> result
    }
    resolved.collect { case (job, result) if Await.result(result, Duration.Inf) => job.name }.toSet
  }

  /**
    * Checks if any job listed in `outputUsedByJobs` of this job is done.
    *
    * A listed job that is intermediate and not done is checked recursively in the same way.
    *
    * @param job Job to check
    * @param dones Result of the done check for each job
    * @param deps Content of the deps file, used to look up the listed jobs by name
    * @return true when at least one listed job counts as done
    */
  private def upstreamJobDone(job: Job, dones: Map[Job, Boolean], deps: Deps): Boolean = {
    job.outputUsedByJobs
      .map(deps.jobs)
      .exists(x => dones(x) || (x.intermediate && upstreamJobDone(x, dones, deps)))
  }

  /**
    * Determines which jobs have failed.
    *
    * @param deps Content of the deps file
    * @return Names of the jobs for which the failed check succeeds
    */
  def jobsFailed(deps: Deps): Set[String] = {
    // All checks are created before the first one is awaited
    val checks = deps.jobs.map { case (name, job) => name -> job.isFailed }
    checks.collect { case (name, failed) if Await.result(failed, Duration.Inf) => name }.toSet
  }

  /**
    * A single job of the deps file, backed by the raw values of that job.
    *
    * Every accessor reads its key from `map` on each call and fails when the key is missing.
    *
    * @param name Name of the job
    * @param map Raw values of the job as read from the deps file
    */
  class Job(val name: String, map: Map[String, Any]) {

    def doneAtStart: Boolean = ConfigUtils.any2boolean(map("done_at_start"))

    def failFiles = ConfigUtils.any2fileList(map("fail_files"))
    def doneFiles = ConfigUtils.any2fileList(map("done_files"))
    def outputUsedByJobs = ConfigUtils.any2stringList(map("output_used_by_jobs"))
    def dependsOnJobs = ConfigUtils.any2stringList(map("depends_on_jobs"))
    def stdoutFile: File = new File(ConfigUtils.any2string(map("stdout_file")))

    def outputsFiles = ConfigUtils.any2fileList(map("outputs"))
    def inputFiles = ConfigUtils.any2fileList(map("inputs"))

    def mainJob: Boolean = ConfigUtils.any2boolean(map("main_job"))
    def intermediate: Boolean = ConfigUtils.any2boolean(map("intermediate"))

    /** Completes with true when every done file exists, also when there are no done files. */
    def isDone: Future[Boolean] = Future { doneFiles.forall(_.exists()) }

    /** Completes with true when at least one fail file exists. */
    def isFailed: Future[Boolean] = Future { failFiles.exists(_.exists()) }
  }
}