PipelineStatus.scala 8.76 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
package nl.lumc.sasc.biopet.core

import java.io.{File, PrintWriter}

import nl.lumc.sasc.biopet.utils.{ConfigUtils, ToolCommand}

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

/**
  * Command line tool that reads the `deps.json` file of a pipeline run and reports the status of
  * every job: a summary line in the log plus JSON and Graphviz (`.gv`) files in the output directory.
  *
  * Created by pjvan_thof on 7-12-16.
  */
object PipelineStatus extends ToolCommand {

  /**
    * Command line arguments.
    *
    * @param pipelineDir Output directory of the pipeline; used to locate the deps file when `depsFile` is not given.
    * @param depsFile    Explicit location of the deps file.
    * @param outputDir   Directory where this tool writes its JSON and Graphviz files.
    * @param follow      When true, the status is refreshed every `refreshTime` seconds.
    * @param refreshTime Seconds to wait between two refreshes in follow mode.
    */
  case class Args(pipelineDir: File = null,
                  depsFile: Option[File] = None,
                  outputDir: File = null,
                  follow: Boolean = false,
                  refreshTime: Int = 30) extends AbstractArgs

  class OptParser extends AbstractOptParser {
    opt[File]('d', "pipelineDir") required () maxOccurs 1 valueName "<file>" action { (x, c) =>
      c.copy(pipelineDir = x)
    } text "Output directory of the pipeline"
    opt[File]('o', "outputDir") required () maxOccurs 1 valueName "<file>" action { (x, c) =>
      c.copy(outputDir = x)
    } text "Output directory of this tool"
    opt[File]("depsFile") maxOccurs 1 valueName "<file>" action { (x, c) =>
      c.copy(depsFile = Some(x))
    } text "Location of deps file, not required"
    opt[Unit]('f', "follow") maxOccurs 1 action { (_, c) =>
      c.copy(follow = true)
    } text "This will follow a run"
    opt[Int]("refresh") maxOccurs 1 action { (x, c) =>
      c.copy(refreshTime = x)
    } text "Time to check again, default set on 30 seconds"
  }

  /** Entry point: parses the arguments, reads the deps file and writes the status report(s). */
  def main(args: Array[String]): Unit = {
    logger.info("Start")

    val argsParser = new OptParser
    val cmdArgs: Args = argsParser.parse(args, Args()) getOrElse (throw new IllegalArgumentException)

    val depsFile = cmdArgs.depsFile.getOrElse(getDepsFileFromDir(cmdArgs.pipelineDir))
    val deps = readDepsFile(depsFile)
    writePipelineStatus(deps, cmdArgs.outputDir, follow = cmdArgs.follow, refreshTime = cmdArgs.refreshTime)

    logger.info("Done")
  }

  /**
    * Locates `deps.json` of the most recent run: `<pipelineDir>/.log/<last run dir>/graph/deps.json`,
    * where the "last" run dir is the last sub directory of `.log` in lexicographic order.
    *
    * @throws IllegalArgumentException when the pipeline dir, the `.log` dir, a run dir or the graph dir is missing.
    */
  def getDepsFileFromDir(pipelineDir: File): File = {
    require(pipelineDir.exists(), s"pipelineDir does not exist: $pipelineDir")
    val logDir = new File(pipelineDir, ".log")
    require(logDir.exists(), s"No .log dir found in pipelineDir")
    // File.list() returns null when the directory cannot be listed; treat that as "no run dirs".
    val runLogDirs = Option(logDir.list())
      .getOrElse(Array.empty[String])
      .sorted
      .map(new File(logDir, _))
      .filter(_.isDirectory)
    require(runLogDirs.nonEmpty, s"No run log dir found in: $logDir")
    val graphDir = new File(runLogDirs.last, "graph")
    require(graphDir.exists(), s"Graph dir is not found: $graphDir")
    new File(graphDir, "deps.json")
  }

  /**
    * Parsed content of a deps file.
    *
    * @param jobs  Jobs keyed by job name.
    * @param files Raw entries of the `files` section.
    */
  case class Deps(jobs: Map[String, Job], files: Array[Map[String, Any]])

  /** Reads a deps file and turns its `jobs` and `files` sections into a [[Deps]]. */
  def readDepsFile(depsFile: File): Deps = {
    val deps = ConfigUtils.fileToConfigMap(depsFile)
    val jobs = ConfigUtils.any2map(deps("jobs")).map {
      case (name, value) => name -> new Job(name, ConfigUtils.any2map(value))
    }
    val files = ConfigUtils.any2list(deps("files")).map(x => ConfigUtils.any2map(x)).toArray
    Deps(jobs, files)
  }

  /**
    * Determines the status of all jobs, writes the JSON/Graphviz reports to `outputDir` and logs a
    * summary. In follow mode this repeats every `refreshTime` seconds and never returns.
    *
    * @param alreadyDone   Jobs known to be done from a previous round; their files are not checked again.
    * @param alreadyFailed Jobs known to have failed in a previous round.
    */
  @scala.annotation.tailrec
  def writePipelineStatus(deps: Deps,
                          outputDir: File,
                          alreadyDone: Set[String] = Set(),
                          alreadyFailed: Set[String] = Set(),
                          follow: Boolean = false,
                          refreshTime: Int = 30): Unit = {
    val jobDone = jobsDone(deps, alreadyDone)
    val jobFailed = jobsFailed(deps, jobDone, alreadyFailed)
    val jobsStart = jobsReadyStart(deps, jobDone)

    val jobsDeps = deps.jobs.map { case (name, job) => name -> job.dependsOnJobs.toList }
    writeToFile(new File(outputDir, s"jobs.json"))(_.println(ConfigUtils.mapToJson(jobsDeps).spaces2))

    writeGraphvizFile(jobsDeps, new File(outputDir, s"jobs.gv"), jobDone, jobFailed, jobsStart, deps)
    writeGraphvizFile(compressOnType(jobsDeps), new File(outputDir, s"compress.jobs.gv"), jobDone, jobFailed, jobsStart, deps)

    val mainJobs = deps.jobs.collect { case (name, job) if job.mainJob => name -> getMainDependencies(name, deps) }
    writeToFile(new File(outputDir, s"main_jobs.json"))(_.println(ConfigUtils.mapToJson(mainJobs).spaces2))

    writeGraphvizFile(mainJobs, new File(outputDir, s"main_jobs.gv"), jobDone, jobFailed, jobsStart, deps)
    writeGraphvizFile(compressOnType(mainJobs), new File(outputDir, s"compress.main_jobs.gv"), jobDone, jobFailed, jobsStart, deps)

    // A failed job normally has all its dependencies done and is therefore also in jobsStart;
    // count it only as failed so the four categories are disjoint and add up to totalJobs.
    val totalJobs = deps.jobs.size
    val totalStart = (jobsStart -- jobFailed).size
    val totalDone = jobDone.size
    val totalFailed = jobFailed.size
    val totalPending = totalJobs - totalStart - totalDone - totalFailed

    logger.info(s"Total job: ${totalJobs}, Pending: ${totalPending}, Ready to run / running: ${totalStart}, Done: ${totalDone}, Failed ${totalFailed}")

    if (follow) {
      Thread.sleep(refreshTime * 1000L)
      // Pass every argument explicitly so refreshTime is not reset to its default.
      writePipelineStatus(deps, outputDir, jobDone, jobFailed, follow, refreshTime)
    }
  }

  /**
    * Returns the nearest main jobs that `jobName` depends on, looking through non-main jobs.
    * The result is in depth-first order without duplicates.
    */
  def getMainDependencies(jobName: String, deps: Deps): List[String] = {
    val found = scala.collection.mutable.LinkedHashSet[String]()
    val visited = scala.collection.mutable.Set[String]()
    // Each non-main job is expanded only once; its main dependencies were already collected on the first visit.
    def walk(name: String): Unit = deps.jobs(name).dependsOnJobs.foreach { dep =>
      if (deps.jobs(dep).mainJob) found += dep
      else if (visited.add(dep)) walk(dep)
    }
    walk(jobName)
    found.toList
  }

  /** Matches job names that end in `_<number>`; group 1 is the name without that suffix. */
  val numberRegex = """(.*)_(\d*)$""".r

  /**
    * Collapses a job dependency map onto job types by stripping the trailing `_<number>` of every name.
    * Names without such a suffix are kept as they are. Only jobs with at least one dependency appear as keys.
    */
  def compressOnType(jobs: Map[String, List[String]]): Map[String, List[String]] = {
    val edges = for ((job, deps) <- jobs.toSet; dep <- deps) yield jobType(job) -> jobType(dep)
    edges.groupBy(_._1).map { case (job, pairs) => job -> pairs.map(_._2).toList }
  }

  /** Strips a trailing `_<number>` from a job name; other names are returned unchanged. */
  private def jobType(jobName: String): String = jobName match {
    case numberRegex(name, _) => name
    case _                    => jobName
  }

  /**
    * Writes a Graphviz digraph with an edge `dependency -> job` for every entry in `jobsDeps`.
    * Done jobs are green, failed jobs red, jobs ready to start orange, and intermediate jobs dashed.
    * Only names that are keys of `jobsDeps` get a colour or style.
    */
  def writeGraphvizFile(jobsDeps: Map[String, List[String]],
                        outputFile: File,
                        jobDone: Set[String],
                        jobFailed: Set[String],
                        jobsStart: Set[String],
                        deps: Deps): Unit = {
    writeToFile(outputFile) { writer =>
      writer.println("digraph graphname {")
      jobDone
        .filter(x => jobsDeps.contains(x))
        .foreach(x => writer.println(s"  $x [color = green]"))
      jobFailed
        .filter(x => jobsDeps.contains(x))
        .foreach(x => writer.println(s"  $x [color = red]"))
      // Failed jobs are excluded; otherwise this later attribute would override their red colour.
      (jobsStart -- jobDone -- jobFailed)
        .filter(x => jobsDeps.contains(x))
        .foreach(x => writer.println(s"  $x [color = orange]"))
      deps.jobs
        .filter { case (name, job) => jobsDeps.contains(name) && job.intermediate }
        .foreach { case (name, _) => writer.println(s"  $name [style = dashed]") }
      jobsDeps.foreach { case (a, b) => b.foreach(c => writer.println(s"  $c -> $a;")) }
      writer.println("}")
    }
  }

  /** Opens `file` for writing, hands the writer to `body` and always closes it afterwards. */
  private def writeToFile(file: File)(body: PrintWriter => Unit): Unit = {
    val writer = new PrintWriter(file)
    try body(writer)
    finally writer.close()
  }

  /** Jobs that are not done yet but whose dependencies are all in `jobsDone`. */
  def jobsReadyStart(deps: Deps, jobsDone: Set[String]): Set[String] = {
    deps.jobs.collect {
      case (name, job) if !jobsDone.contains(name) && job.dependsOnJobs.forall(jobsDone) => name
    }.toSet
  }

  /**
    * Returns the names of all done jobs, including `alreadyDone`.
    *
    * A job is done when its done files exist. An intermediate job whose done files are missing
    * (presumably because its outputs were cleaned up) also counts as done when a job using its
    * output is done.
    */
  def jobsDone(deps: Deps, alreadyDone: Set[String] = Set()): Set[String] = {
    val pending = deps.jobs.toList.collect { case (name, job) if !alreadyDone.contains(name) => job }
    // All file checks run concurrently; wait for all of them.
    val checked = Await.result(Future.sequence(pending.map(job => job.isDone.map(done => job -> done))), Duration.Inf)
    val dones = checked.collect { case (job, true) => job.name }.toSet ++ alreadyDone
    val resolved = Future.sequence(checked.map {
      case (job, done) =>
        if (done || !job.intermediate) Future.successful(job -> done)
        else Future(job -> upstreamJobDone(job, dones, deps))
    })
    Await.result(resolved, Duration.Inf).collect { case (job, true) => job.name }.toSet ++ alreadyDone
  }

  /**
    * True when some job using the output of `job` is done, or is intermediate and itself satisfies
    * this rule. NOTE(review): assumes the output_used_by_jobs graph is acyclic.
    */
  private def upstreamJobDone(job: Job, dones: Set[String], deps: Deps): Boolean = {
    job.outputUsedByJobs
      .map(deps.jobs)
      .exists(x => dones.contains(x.name) || (x.intermediate && upstreamJobDone(x, dones, deps)))
  }

  /**
    * Returns the names of failed jobs: jobs not in `dones` that have a fail file, plus `alreadyFailed`.
    * Jobs in `dones` are never reported as failed, even when they failed in an earlier round.
    */
  def jobsFailed(deps: Deps, dones: Set[String], alreadyFailed: Set[String] = Set()): Set[String] = {
    val pending = deps.jobs.toList.collect {
      case (name, job) if !dones.contains(name) && !alreadyFailed.contains(name) =>
        job.isFailed.map(failed => name -> failed)
    }
    val newlyFailed = Await.result(Future.sequence(pending), Duration.Inf).collect { case (name, true) => name }
    (newlyFailed.toSet ++ alreadyFailed) -- dones
  }

  /**
    * A single job entry of the deps file.
    *
    * @param name Name of the job.
    * @param map  Raw key/value content of the job entry.
    */
  class Job(val name: String, map: Map[String, Any]) {

    def doneAtStart: Boolean = ConfigUtils.any2boolean(map("done_at_start"))

    def failFiles = ConfigUtils.any2fileList(map("fail_files"))
    def doneFiles = ConfigUtils.any2fileList(map("done_files"))
    def outputUsedByJobs = ConfigUtils.any2stringList(map("output_used_by_jobs"))
    def dependsOnJobs = ConfigUtils.any2stringList(map("depends_on_jobs"))
    def stdoutFile = new File(ConfigUtils.any2string(map("stdout_file")))

    def outputsFiles = ConfigUtils.any2fileList(map("outputs"))
    def inputFiles = ConfigUtils.any2fileList(map("inputs"))

    def mainJob = ConfigUtils.any2boolean(map("main_job"))
    def intermediate = ConfigUtils.any2boolean(map("intermediate"))

    /** Checked asynchronously: true when every done file exists (so also when there are none). */
    def isDone: Future[Boolean] = Future { doneFiles.forall(_.exists()) }

    /** Checked asynchronously: true when at least one fail file exists. */
    def isFailed: Future[Boolean] = Future {
      failFiles.exists(_.exists())
    }
  }
}