PipelineStatus.scala 13.1 KB
Newer Older
Peter van 't Hof's avatar
Peter van 't Hof committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
 * A dual licensing mode is applied. The source code within this project is freely available for non-commercial use under an AGPL
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
15 16
package nl.lumc.sasc.biopet.core

Peter van 't Hof's avatar
Peter van 't Hof committed
17
import java.io.{ File, PrintWriter }
18

Peter van 't Hof's avatar
Peter van 't Hof committed
19
import nl.lumc.sasc.biopet.utils.{ ConfigUtils, ToolCommand }
20

Peter van 't Hof's avatar
Peter van 't Hof committed
21
import scala.concurrent.{ Await, Future }
22 23
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
24
import scala.sys.process.Process
25 26

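/**
 * Command line tool that reads the deps.json file of a Biopet/Queue pipeline run and writes the status of all
 * jobs as json and graphviz files, optionally rendered to png/svg.
 *
 * Created by pjvan_thof on 7-12-16.
 */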
object PipelineStatus extends ToolCommand {

  /**
   * Command line arguments of this tool.
   *
   * @param pipelineDir   Output directory of the pipeline that is inspected
   * @param depsFile      Explicit location of the deps file; when empty it is looked up inside `pipelineDir`
   * @param outputDir     Directory where the json, graphviz and plot files of this tool are written
   * @param follow        Keep refreshing the status (the tool then does not stop by itself)
   * @param refreshTime   Seconds to wait between two refreshes when `follow` is set
   * @param complatePlots Render the uncompressed graphs to png/svg (misspelled name kept for compatibility)
   * @param compressPlots Render the compressed graphs to png/svg
   */
  case class Args(pipelineDir: File = null,
                  depsFile: Option[File] = None,
                  outputDir: File = null,
                  follow: Boolean = false,
                  refreshTime: Int = 30,
                  complatePlots: Boolean = false,
                  compressPlots: Boolean = true) extends AbstractArgs

  /** Command line parser that fills an [[Args]] instance. */
  class OptParser extends AbstractOptParser {
    opt[File]('d', "pipelineDir") required () maxOccurs 1 valueName "<file>" action { (x, c) =>
      c.copy(pipelineDir = x)
    } text "Output directory of the pipeline"
    opt[File]('o', "outputDir") required () maxOccurs 1 valueName "<file>" action { (x, c) =>
      c.copy(outputDir = x)
    } text "Output directory of this tool"
    opt[File]("depsFile") maxOccurs 1 valueName "<file>" action { (x, c) =>
      c.copy(depsFile = Some(x))
    } text "Location of deps file, not required"
    opt[Unit]('f', "follow") maxOccurs 1 action { (_, c) =>
      c.copy(follow = true)
    } text "This will follow a run"
    opt[Int]("refresh") maxOccurs 1 action { (x, c) =>
      c.copy(refreshTime = x)
    } text "Time to check again, default set on 30 seconds"
    opt[Unit]("completePlots") maxOccurs 1 action { (_, c) =>
      c.copy(complatePlots = true)
    } text "Add complete plots, this is disabled because of performance. " +
      "Complete plots does show each job separated, while compressed plots collapse all jobs of the same type together."
    opt[Unit]("skipCompressPlots") maxOccurs 1 action { (_, c) =>
      c.copy(compressPlots = false)
    } text "Disable compressed plots. By default compressed plots are enabled."
  }

  /** Entry point: parses the arguments, reads the deps file and writes the pipeline status. */
  def main(args: Array[String]): Unit = {
    logger.info("Start")

    val argsParser = new OptParser
    val cmdArgs: Args = argsParser.parse(args, Args()) getOrElse (throw new IllegalArgumentException)

    val depsFile = cmdArgs.depsFile.getOrElse(getDepsFileFromDir(cmdArgs.pipelineDir))
    val deps = readDepsFile(depsFile)

    // All status files are written into this directory, so make sure it exists
    cmdArgs.outputDir.mkdirs()

    writePipelineStatus(deps, cmdArgs.outputDir, follow = cmdArgs.follow, refreshTime = cmdArgs.refreshTime,
      plots = cmdArgs.complatePlots, compressPlots = cmdArgs.compressPlots)
    logger.info("Done")
  }

  /**
   * Locates `deps.json` of the last run inside a pipeline output directory.
   *
   * Run log directories under `<pipelineDir>/.log` are sorted by name and the last one is used
   * (presumably the most recent run; this depends on how the run directories are named).
   *
   * @throws IllegalArgumentException when the pipeline dir, `.log` dir, a run dir or its `graph` dir is missing
   */
  def getDepsFileFromDir(pipelineDir: File): File = {
    require(pipelineDir.exists(), s"pipelineDir does not exist: $pipelineDir")
    val logDir = new File(pipelineDir, ".log")
    require(logDir.exists(), s"No .log dir found in pipelineDir")
    // listFiles returns null when logDir is not a readable directory
    val runLogDirs = Option(logDir.listFiles()).getOrElse(Array.empty[File]).filter(_.isDirectory).sortBy(_.getName)
    require(runLogDirs.nonEmpty, s"No run log dir found in: $logDir")
    val graphDir = new File(runLogDirs.last, "graph")
    require(graphDir.exists(), s"Graph dir is not found: $graphDir")
    new File(graphDir, "deps.json")
  }

  /** Parsed content of a deps file: all jobs by name and the raw file entries. */
  case class Deps(jobs: Map[String, Job], files: Array[Map[String, Any]])

  /** Reads the `jobs` and `files` sections of a deps json file. */
  def readDepsFile(depsFile: File): Deps = {
    val deps = ConfigUtils.fileToConfigMap(depsFile)

    val jobs = ConfigUtils.any2map(deps("jobs")).map {
      case (name, jobConfig) => name -> new Job(name, ConfigUtils.any2map(jobConfig))
    }
    val files = ConfigUtils.any2list(deps("files")).map(x => ConfigUtils.any2map(x)).toArray

    Deps(jobs, files)
  }

  /**
   * Determines the status of all jobs and writes `jobs.json`, `main_jobs.json` and their graphviz files
   * (plain and compressed) to `outputDir`, then logs a summary.
   *
   * @param deps          Parsed deps file
   * @param outputDir     Directory to write the status files to
   * @param alreadyDone   Jobs known to be done from a previous iteration; these are not checked again
   * @param alreadyFailed Jobs known to have failed in a previous iteration; they are dropped once they are done
   * @param follow        When true, repeat every `refreshTime` seconds; the method then never returns
   * @param refreshTime   Seconds between two iterations when following
   * @param plots         Render the uncompressed graphs to png and svg
   * @param compressPlots Render the compressed graphs to png and svg
   */
  @scala.annotation.tailrec
  def writePipelineStatus(deps: Deps,
                          outputDir: File,
                          alreadyDone: Set[String] = Set(),
                          alreadyFailed: Set[String] = Set(),
                          follow: Boolean = false,
                          refreshTime: Int = 30,
                          plots: Boolean = false,
                          compressPlots: Boolean = true): Unit = {

    val jobDone = jobsDone(deps, alreadyDone)
    val jobFailed = jobsFailed(deps, jobDone, alreadyFailed)
    val jobsStart = jobsReadyStart(deps, jobDone)

    val jobsDeps: Map[String, List[String]] = deps.jobs.map { case (name, job) => name -> job.dependsOnJobs.toList }
    writeJson(jobsDeps, new File(outputDir, "jobs.json"))
    val jobFutures = List(
      writeGraphvizFile(jobsDeps, new File(outputDir, "jobs.gv"), jobDone, jobFailed, jobsStart, deps, plots, plots),
      writeGraphvizFile(jobsDeps, new File(outputDir, "compress.jobs.gv"), jobDone, jobFailed, jobsStart, deps,
        compressPlots, compressPlots, compress = true))

    val mainJobs: Map[String, List[String]] = deps.jobs.collect {
      case (name, job) if job.mainJob => name -> getMainDependencies(name, deps)
    }
    writeJson(mainJobs, new File(outputDir, "main_jobs.json"))
    val mainFutures = List(
      writeGraphvizFile(mainJobs, new File(outputDir, "main_jobs.gv"), jobDone, jobFailed, jobsStart, deps, plots, plots),
      writeGraphvizFile(mainJobs, new File(outputDir, "compress.main_jobs.gv"), jobDone, jobFailed, jobsStart, deps,
        compressPlots, compressPlots, compress = true))

    val totalJobs = deps.jobs.size
    val totalDone = jobDone.size
    val totalFailed = jobFailed.size
    // A failed job can also be "ready to start" (all dependencies done); count it only once, as failed
    val totalStart = (jobsStart -- jobFailed).size
    val totalPending = totalJobs - totalStart - totalDone - totalFailed

    (jobFutures ++ mainFutures).foreach { future =>
      Await.ready(future, Duration.Inf)
      // A failing graph (e.g. `dot` not installed) should not stop the status report, but must not be silent
      future.value.foreach(_.failed.foreach(e => logger.warn(s"Writing graph failed: ${e.getMessage}")))
    }

    logger.info(s"Total job: $totalJobs, Pending: $totalPending, Ready to run / running: $totalStart, Done: $totalDone, Failed $totalFailed")

    if (follow) {
      Thread.sleep(refreshTime * 1000L)
      writePipelineStatus(deps, outputDir, jobDone, jobFailed, follow, refreshTime, plots, compressPlots)
    }
  }

  /** Writes `content` as pretty printed json to `file`, closing the file even when writing fails. */
  private def writeJson(content: Map[String, Any], file: File): Unit = {
    val writer = new PrintWriter(file)
    try writer.println(ConfigUtils.mapToJson(content).spaces2)
    finally writer.close()
  }

  /** Main jobs that `jobName` depends on, looking through any non-main jobs in between. */
  def getMainDependencies(jobName: String, deps: Deps): List[String] = {
    deps.jobs(jobName).dependsOnJobs.toList.flatMap { dep =>
      if (deps.jobs(dep).mainJob) List(dep)
      else getMainDependencies(dep, deps)
    }.distinct
  }

  /** Splits a job name into its type and trailing number, e.g. `Job_12` -> (`Job`, `12`). */
  val numberRegex = """(.*)_(\d+)$""".r

  /**
   * Collapses a job graph to a graph of job types (see [[compressedName]]).
   *
   * @param jobs Job name -> names of the jobs it depends on
   * @return Job type -> job types it depends on; types without any dependency map to `Nil`
   */
  def compressOnType(jobs: Map[String, List[String]]): Map[String, List[String]] = {
    val edges = for ((job, jobDeps) <- jobs.toSet; dep <- jobDeps) yield {
      compressedName(job)._1 -> compressedName(dep)._1
    }
    // Collapse the Set[(String, String)] of edges to a Map[String, List[String]]
    val withDeps = edges.groupBy(_._1).map { case (jobType, typeEdges) => jobType -> typeEdges.map(_._2).toList }
    // Types whose jobs have no dependencies still need a node. The edge map is applied last so the
    // dependencies of other jobs of the same type are not overwritten by an empty list.
    val withoutDeps = jobs.collect { case (job, jobDeps) if jobDeps.isEmpty => compressedName(job)._1 -> List.empty[String] }
    withoutDeps ++ withDeps
  }

  /**
   * Returns the job type (the name without its trailing `_<number>`) and that number.
   * Names without such a suffix are returned unchanged with number 0.
   */
  def compressedName(jobName: String): (String, Int) = jobName match {
    case numberRegex(name, number) => (name, number.toInt)
    case _                         => (jobName, 0)
  }

  /**
   * Writes a graphviz file of `jobsDeps`, colored by job status, and optionally renders it to png/svg.
   *
   * Green nodes are done, red nodes failed, orange nodes are ready to run / running; intermediate jobs are
   * dashed. With `compress` the graph is collapsed to job types, nodes get a status count label and edges
   * are colored by the fraction of failed / running / done upstream jobs.
   *
   * @return Future that completes when the file is written and rendered
   */
  def writeGraphvizFile(jobsDeps: Map[String, List[String]],
                        outputFile: File,
                        jobDone: Set[String],
                        jobFailed: Set[String],
                        jobsStart: Set[String],
                        deps: Deps,
                        png: Boolean = true, svg: Boolean = true, compress: Boolean = false): Future[Unit] = Future {
    val graph = if (compress) compressOnType(jobsDeps) else jobsDeps

    def typeOf(jobName: String): String = compressedName(jobName)._1

    /* Writes color, style and edges of a single job. */
    def writeJobNode(writer: PrintWriter, job: String, jobDeps: List[String]): Unit = {
      if (jobDone.contains(job)) writer.println(s"  $job [color = green]")
      else if (jobFailed.contains(job)) writer.println(s"  $job [color = red]")
      else if (jobsStart.contains(job)) writer.println(s"  $job [color = orange]")

      // Dashed line for intermediate jobs
      if (deps.jobs.get(job).exists(_.intermediate)) writer.println(s"  $job [style = dashed]")

      jobDeps.foreach(dep => writer.println(s"  $dep -> $job;"))
    }

    /* Writes label, color, style and colored edges of a node that stands for all jobs of one type. */
    def writeTypeNode(writer: PrintWriter, jobType: String, jobDeps: List[String]): Unit = {
      val typeJobs = deps.jobs.filter { case (name, _) => typeOf(name) == jobType }
      val done = jobDone.filter(typeOf(_) == jobType)
      val failed = jobFailed.filter(typeOf(_) == jobType)
      val started = jobsStart.filter(typeOf(_) == jobType)
      val running = started.filterNot(failed.contains).size
      // A failed job can also be in `started`; it is only counted once, as failed
      val pending = typeJobs.size - failed.filterNot(started.contains).size - started.size - done.size

      val label = Seq(jobType, s"Total: ${typeJobs.size}", s"Fail: ${failed.size}", s"Pend:$pending",
        s"Run: $running", s"Done: ${done.size}").mkString("\n")
      writer.println(s"""  $jobType [label = "$label"]""")

      if (jobDone.contains(jobType) || typeJobs.keySet == done) writer.println(s"  $jobType [color = green]")
      else if (jobFailed.contains(jobType) || failed.nonEmpty) writer.println(s"  $jobType [color = red]")
      else if (jobsStart.contains(jobType) || typeJobs.keySet == started) writer.println(s"  $jobType [color = orange]")

      // Dashed line when every job of this type is intermediate
      if (deps.jobs.get(jobType).exists(_.intermediate) || typeJobs.values.forall(_.intermediate))
        writer.println(s"  $jobType [style = dashed]")

      jobDeps.foreach { dep =>
        // Jobs of type `dep` whose output is used by at least one job of type `jobType`
        val depJobs = deps.jobs.filter {
          case (name, job) => typeOf(name) == dep && job.outputUsedByJobs.exists(typeOf(_) == jobType)
        }.keySet
        if (depJobs.nonEmpty) {
          def fraction(p: String => Boolean): Float = depJobs.count(p).toFloat / depJobs.size
          val doneFrac = fraction(jobDone.contains)
          val failFrac = fraction(jobFailed.contains)
          val runFrac = fraction(name => jobsStart.contains(name) && !jobFailed.contains(name))
          // Locale.ROOT keeps '.' as decimal separator; graphviz cannot parse e.g. "0,500000"
          val colors = "red;%f:orange;%f:green;%f:black;%f"
            .formatLocal(java.util.Locale.ROOT, failFrac, runFrac, doneFrac, 1.0f - doneFrac - failFrac - runFrac)
          writer.println(s"""  $dep -> $jobType [color="$colors"];""")
        } else writer.println(s"  $dep -> $jobType;")
      }
    }

    val writer = new PrintWriter(outputFile)
    try {
      writer.println("digraph graphname {")
      graph.foreach {
        case (node, nodeDeps) =>
          if (compress) writeTypeNode(writer, node, nodeDeps) else writeJobNode(writer, node, nodeDeps)
      }
      writer.println("}")
    } finally writer.close()

    writeGvToPlot(outputFile, png, svg)
  }

  /** Renders a graphviz file next to itself with `dot -O`; a non-zero exit code is logged as a warning. */
  def writeGvToPlot(input: File, png: Boolean = true, svg: Boolean = true): Unit = {
    def render(format: String): Unit = {
      val exitCode = Process(Seq("dot", s"-T$format", "-O", input.getAbsolutePath)).!
      if (exitCode != 0) logger.warn(s"dot exited with code $exitCode while rendering $format for: $input")
    }
    if (png) render("png")
    if (svg) render("svg")
  }

  /** Jobs that are not done while all their dependencies are done (ready to run or running). */
  def jobsReadyStart(deps: Deps, jobsDone: Set[String]): Set[String] = {
    deps.jobs.collect {
      case (name, job) if !jobsDone.contains(name) && job.dependsOnJobs.forall(jobsDone.contains) => name
    }.toSet
  }

  /**
   * Jobs that are done: all their done files exist, or they are intermediate and a job that uses their
   * output is done (presumably because intermediate outputs may already have been removed).
   *
   * @param alreadyDone Jobs known to be done; they are not checked again and are always part of the result
   */
  def jobsDone(deps: Deps, alreadyDone: Set[String] = Set()): Set[String] = {
    // Start all done-file checks in parallel before waiting on any of them
    val checks: List[(Job, Future[Boolean])] = deps.jobs.collect {
      case (name, job) if !alreadyDone.contains(name) => job -> job.isDone
    }.toList
    val done = checks.collect { case (job, check) if Await.result(check, Duration.Inf) => job.name }.toSet ++ alreadyDone

    val resolved = checks.map {
      case (job, check) => job -> check.map(isDone => isDone || (job.intermediate && upstreamJobDone(job, done, deps)))
    }
    resolved.collect { case (job, check) if Await.result(check, Duration.Inf) => job.name }.toSet ++ alreadyDone
  }

  /** True when a job that uses the output of `job` is done, following chains of intermediate jobs. */
  private def upstreamJobDone(job: Job, dones: Set[String], deps: Deps): Boolean = {
    job.outputUsedByJobs.flatMap(name => deps.jobs.get(name))
      .exists(user => dones.contains(user.name) || (user.intermediate && upstreamJobDone(user, dones, deps)))
  }

  /**
   * Jobs that are not done and have at least one existing fail file.
   *
   * @param alreadyFailed Jobs known to have failed; not checked again, but dropped when they are in `dones`
   */
  def jobsFailed(deps: Deps, dones: Set[String], alreadyFailed: Set[String] = Set()): Set[String] = {
    val checks = deps.jobs
      .filterNot { case (name, _) => dones.contains(name) || alreadyFailed.contains(name) }
      .map { case (name, job) => name -> job.isFailed }
    checks.collect { case (name, check) if Await.result(check, Duration.Inf) => name }.toSet ++ (alreadyFailed -- dones)
  }

  /**
   * A single job entry of the deps file. Config values are parsed once on first access;
   * `isDone` and `isFailed` check the file system again on every call.
   */
  class Job(val name: String, map: Map[String, Any]) {

    lazy val doneAtStart: Boolean = ConfigUtils.any2boolean(map("done_at_start"))

    lazy val failFiles = ConfigUtils.any2fileList(map("fail_files"))
    lazy val doneFiles = ConfigUtils.any2fileList(map("done_files"))
    lazy val outputUsedByJobs = ConfigUtils.any2stringList(map("output_used_by_jobs"))
    lazy val dependsOnJobs = ConfigUtils.any2stringList(map("depends_on_jobs"))
    lazy val stdoutFile = new File(ConfigUtils.any2string(map("stdout_file")))

    lazy val outputsFiles = ConfigUtils.any2fileList(map("outputs"))
    lazy val inputFiles = ConfigUtils.any2fileList(map("inputs"))

    lazy val mainJob = ConfigUtils.any2boolean(map("main_job"))
    lazy val intermediate = ConfigUtils.any2boolean(map("intermediate"))

    /** Done when every done file exists. */
    def isDone: Future[Boolean] = Future { doneFiles.forall(_.exists()) }
    /** Failed when at least one fail file exists. */
    def isFailed: Future[Boolean] = Future { failFiles.exists(_.exists()) }
  }
}