Commit 23f8f923 authored by Peter van 't Hof, committed by GitHub

Merge pull request #150 from biopet/fix-BIOPET-492

First implementation of PIM in biopet
parents 6edd8e7b 8d5abea6
@@ -16,10 +16,14 @@ package nl.lumc.sasc.biopet.core
import java.io.{File, PrintWriter}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import nl.lumc.sasc.biopet.core.pipelinestatus.{Deps, PipelineStatus}
import nl.lumc.sasc.biopet.core.summary.WriteSummary
import nl.lumc.sasc.biopet.utils.config.Configurable
import nl.lumc.sasc.biopet.utils.{ConfigUtils, Logging}
import org.broadinstitute.gatk.queue.function.{CommandLineFunction, QFunction}
import play.api.libs.ws.ahc.AhcWSClient
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
@@ -47,7 +51,7 @@ object WriteDependencies extends Logging with Configurable {
* This method will generate a JSON file where information about job and file dependencies is stored
*
* @param functions This should be all functions that are given to the graph of Queue
* @param outputDir
* @param outputDir Output directory where the files will be placed
*/
def writeDependencies(functions: Seq[QFunction], outputDir: File): Unit = {
outputDir.mkdirs()
@@ -59,17 +63,17 @@ object WriteDependencies extends Logging with Configurable {
case class QueueFile(file: File) {
private val inputJobs: ListBuffer[QFunction] = ListBuffer()
def addInputJob(function: QFunction) = inputJobs += function
def inputJobNames = inputJobs.toList.map(functionNames)
def addInputJob(function: QFunction): Unit = inputJobs += function
def inputJobNames: List[String] = inputJobs.toList.map(functionNames)
private val outputJobs: ListBuffer[QFunction] = ListBuffer()
def addOutputJob(function: QFunction) = {
def addOutputJob(function: QFunction): Unit = {
if (outputJobs.nonEmpty) logger.warn(s"File '$file' is found as output of multiple jobs")
outputJobs += function
}
def outputJobNames = outputJobs.toList.map(functionNames)
def outputJobNames: List[String] = outputJobs.toList.map(functionNames)
def getMap = {
def getMap: Map[String, Any] = {
val fileExist = file.exists()
if (!fileExist && outputJobs.isEmpty) {
if (errorOnMissingInput) Logging.addError(s"Input file does not exist: $file")
@@ -85,7 +89,7 @@ object WriteDependencies extends Logging with Configurable {
)
}
def isIntermediate = outputJobs.exists(_.isIntermediate)
def isIntermediate: Boolean = outputJobs.exists(_.isIntermediate)
}
val files: mutable.Map[File, QueueFile] = mutable.Map()
@@ -161,8 +165,15 @@ object WriteDependencies extends Logging with Configurable {
.spaces2)
writer.close()
PipelineStatus.writePipelineStatus(PipelineStatus.readDepsFile(outputFile), outputDir)
logger.info("done calculating dependencies")
implicit lazy val system = ActorSystem()
implicit lazy val materializer = ActorMaterializer()
implicit lazy val ws = AhcWSClient()
PipelineStatus.writePipelineStatus(Deps.readDepsFile(outputFile), outputDir)
ws.close()
system.terminate()
}
}
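The lines above wire an Akka actor system and a Play-WS client into WriteDependencies so the status writer can push the run graph to PIM over HTTP. As a minimal sketch of that client lifecycle, assuming the Play-WS 2.5-era API used in this codebase and a hypothetical endpoint:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import play.api.libs.ws.ahc.AhcWSClient
import scala.concurrent.Await
import scala.concurrent.duration._

object WsLifecycleSketch extends App {
  // AhcWSClient() requires an implicit materializer, which in turn requires an ActorSystem
  implicit val system: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  val ws = AhcWSClient()
  try {
    // Hypothetical endpoint, standing in for the PIM requests made elsewhere in this commit
    val response = Await.result(ws.url("http://localhost:9000/api/runs/").get(), 30.seconds)
    println(response.status)
  } finally {
    ws.close()         // release the HTTP client first...
    system.terminate() // ...then stop the actor system, mirroring writeDependencies above
  }
}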
package nl.lumc.sasc.biopet.core.pipelinestatus
import java.io.File
import nl.lumc.sasc.biopet.core.pipelinestatus.PipelineStatus.logger
import nl.lumc.sasc.biopet.utils.ConfigUtils
import nl.lumc.sasc.biopet.utils.pim._
import play.api.libs.ws.WSResponse
import play.api.libs.ws.ahc.AhcWSClient
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
/**
* This class stores the deps.json of a pipeline: all jobs and files and the connections between them
*
* Created by pjvanthof on 24/06/2017.
*/
case class Deps(jobs: Map[String, Job], files: Array[Map[String, Any]]) {
/**
* This method will compress the graph by combining all jobs that share a base name
* @param main When true, non-main jobs are skipped in the graph
* @return Map from compressed job name to its compressed dependencies
*/
def compressOnType(main: Boolean = false): Map[String, List[String]] = {
(for ((_, job) <- jobs.toSet if !main || job.mainJob) yield {
job.name -> (if (main) getMainDependencies(job.name).map(Job.compressedName(_)._1)
else job.dependsOnJobs.map(Job.compressedName(_)._1))
}).groupBy(x => Job.compressedName(x._1)._1)
.map(x => x._1 -> x._2.flatMap(_._2).toList.distinct)
}
/** This returns the main dependencies of all main jobs */
def getMainDeps: Map[String, List[String]] = {
jobs.filter(_._2.mainJob).map(x => x._1 -> getMainDependencies(x._1))
}
/**
* This returns the main dependencies of a single job.
* When a job depends on a non-main job, that job's dependencies are followed until a main dependency is found
*/
def getMainDependencies(jobName: String): List[String] = {
val job = this.jobs(jobName)
val dependencies = job.dependsOnJobs match {
case l: List[_] => l.map(_.toString)
}
dependencies.flatMap { dep =>
if (this.jobs(dep).mainJob) List(dep)
else getMainDependencies(dep)
}.distinct
}
/** This publishes the compressed graph to a PIM host */
def publishCompressedGraphToPim(host: String, runId: String)(
implicit ws: AhcWSClient): Future[WSResponse] = {
val links: List[Link] = this
.compressOnType()
.flatMap(x => x._2.map(y => Link("link", y, "output", x._1, "input", "test")))
.toList
val run = Run(
runId,
Network("graph",
Nil,
this
.compressOnType()
.map(
x =>
Node(x._1,
"root",
List(Port("input", "input")),
List(Port("output", "output")),
"test"))
.toList,
links),
"Biopet pipeline",
"biopet"
)
val request = ws
.url(s"$host/api/runs/")
.withHeaders("Accept" -> "application/json", "Content-Type" -> "application/json")
.put(run.toString)
request.onFailure { case e => logger.warn("Posting workflow failed", e) }
request.onSuccess {
case r if r.status == 200 =>
logger.debug(r)
case r => logger.warn(r)
}
request
}
}
object Deps {
/** This reads a deps.json file and returns it as a [[Deps]] instance */
def readDepsFile(depsFile: File): Deps = {
val deps = ConfigUtils.fileToConfigMap(depsFile)
val jobs =
ConfigUtils.any2map(deps("jobs")).map(x => x._1 -> new Job(x._1, ConfigUtils.any2map(x._2)))
val files = ConfigUtils.any2list(deps("files")).map(x => ConfigUtils.any2map(x)).toArray
Deps(jobs, files)
}
}
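To make the deps.json shape concrete, the sketch below invents a two-job graph whose field names match the accessors of the Job class that follows, then reads it back and compresses it. Paths, job names, and the printed results are illustrative only:

import java.io.File
import nl.lumc.sasc.biopet.core.pipelinestatus.Deps

object DepsSketch extends App {
  // Hypothetical graph/deps.json (field names inferred from the Job accessors):
  // {
  //   "jobs": {
  //     "fastqc_1": { "done_at_start": true, "fail_files": [], "done_files": [".fastqc_1.done"],
  //                   "output_used_by_jobs": ["bwa_1"], "depends_on_jobs": [],
  //                   "stdout_file": "fastqc_1.out", "outputs": ["s1_fastqc.zip"],
  //                   "inputs": ["s1.fq"], "main_job": false, "intermediate": true },
  //     "bwa_1":    { "done_at_start": false, "fail_files": [], "done_files": [".bwa_1.done"],
  //                   "output_used_by_jobs": [], "depends_on_jobs": ["fastqc_1"],
  //                   "stdout_file": "bwa_1.out", "outputs": ["s1.bam"],
  //                   "inputs": ["s1.fq"], "main_job": true, "intermediate": false }
  //   },
  //   "files": []
  // }
  val deps = Deps.readDepsFile(new File("graph/deps.json"))
  println(deps.compressOnType()) // Map("fastqc" -> List(), "bwa" -> List("fastqc"))
  println(deps.getMainDeps)      // Map("bwa_1" -> List()): fastqc_1 is not a main job and has no main ancestors
}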
package nl.lumc.sasc.biopet.core.pipelinestatus
import java.io.File
import nl.lumc.sasc.biopet.utils.ConfigUtils
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.matching.Regex
/**
* This class stores a single job from a deps.json
*
* Created by pjvanthof on 24/06/2017.
*/
class Job(val name: String, map: Map[String, Any]) {
/** When true, this job was already done when the deps.json was created */
def doneAtStart: Boolean = ConfigUtils.any2boolean(map("done_at_start"))
/** If one of these files exists, the job is marked as failed */
def failFiles: List[File] = ConfigUtils.any2fileList(map("fail_files"))
/** If all of these files exist, the job is marked as done */
def doneFiles: List[File] = ConfigUtils.any2fileList(map("done_files"))
/** Returns a list of jobs that depend on this job */
def outputUsedByJobs: List[String] = ConfigUtils.any2stringList(map("output_used_by_jobs"))
/** Returns a list of jobs that this job depends on */
def dependsOnJobs: List[String] = ConfigUtils.any2stringList(map("depends_on_jobs"))
/** Location of the stdout file of this job */
def stdoutFile = new File(ConfigUtils.any2string(map("stdout_file")))
/** All output files of this job */
def outputsFiles: List[File] = ConfigUtils.any2fileList(map("outputs"))
/** All input files of this job */
def inputFiles: List[File] = ConfigUtils.any2fileList(map("inputs"))
/** When true this job is marked as a main job in the graph */
def mainJob: Boolean = ConfigUtils.any2boolean(map("main_job"))
/** When true this job is marked as an intermediate job */
def intermediate: Boolean = ConfigUtils.any2boolean(map("intermediate"))
/** Returns a `Future[Boolean]` to check if the job is done */
def isDone: Future[Boolean] = Future { doneFiles.forall(_.exists()) }
/** Returns a `Future[Boolean]` to check if the job has failed */
def isFailed: Future[Boolean] = Future { failFiles.exists(_.exists()) }
/** Returns the compressed name of this job */
def compressedName: (String, Int) = Job.compressedName(name)
}
object Job {
val numberRegex: Regex = """(.*)_(\d*)$""".r
/** This splits a job name from its ID */
def compressedName(jobName: String): (String, Int) = jobName match {
case Job.numberRegex(name, number) => (name, number.toInt)
}
}
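The graph compression above leans on the `<name>_<number>` suffix that Biopet job names carry. A small sketch of Job.compressedName with invented names; note that the match is partial, so a name without a trailing `_<digits>` throws a MatchError:

import nl.lumc.sasc.biopet.core.pipelinestatus.Job

object CompressedNameSketch extends App {
  println(Job.compressedName("bwa_12"))    // ("bwa", 12)
  println(Job.compressedName("bwa_mem_3")) // ("bwa_mem", 3): (.*) is greedy, so only the last _<digits> is split off
  // Job.compressedName("cleanup")         // would throw a MatchError: no trailing _<digits>
}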
@@ -12,15 +12,20 @@
* license; For commercial users or users who do not want to follow the AGPL
* license, please contact us to obtain a separate license.
*/
package nl.lumc.sasc.biopet.core
package nl.lumc.sasc.biopet.core.pipelinestatus
import java.io.{File, PrintWriter}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import nl.lumc.sasc.biopet.utils.pim._
import nl.lumc.sasc.biopet.utils.pim.{Job => PimJob}
import nl.lumc.sasc.biopet.utils.{ConfigUtils, ToolCommand}
import play.api.libs.ws.ahc.AhcWSClient
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.sys.process.Process
/**
@@ -33,7 +38,9 @@ object PipelineStatus extends ToolCommand {
follow: Boolean = false,
refreshTime: Int = 30,
complatePlots: Boolean = false,
compressPlots: Boolean = true)
compressPlots: Boolean = true,
pimHost: Option[String] = None,
pimRunId: Option[String] = None)
extends AbstractArgs
class OptParser extends AbstractOptParser {
@@ -46,19 +53,25 @@ object PipelineStatus extends ToolCommand {
opt[File]("depsFile") maxOccurs 1 valueName "<file>" action { (x, c) =>
c.copy(depsFile = Some(x))
} text "Location of deps file, not required"
opt[Unit]('f', "follow") maxOccurs 1 action { (x, c) =>
opt[Unit]('f', "follow") maxOccurs 1 action { (_, c) =>
c.copy(follow = true)
} text "This will follow a run"
opt[Int]("refresh") maxOccurs 1 action { (x, c) =>
c.copy(refreshTime = x)
} text "Time to check again, default set on 30 seconds"
opt[Unit]("completePlots") maxOccurs 1 action { (x, c) =>
opt[Unit]("completePlots") maxOccurs 1 action { (_, c) =>
c.copy(complatePlots = true)
} text "Add complete plots, this is disabled because of performance. " +
"Complete plots does show each job separated, while compressed plots collapse all jobs of the same type together."
opt[Unit]("skipCompressPlots") maxOccurs 1 action { (x, c) =>
opt[Unit]("skipCompressPlots") maxOccurs 1 action { (_, c) =>
c.copy(compressPlots = false)
} text "Disable compressed plots. By default compressed plots are enabled."
opt[String]("pimHost") maxOccurs 1 action { (x, c) =>
c.copy(pimHost = Some(x))
} text "Pim host to publish status to"
opt[String]("pimRunId") maxOccurs 1 action { (x, c) =>
c.copy(pimRunId = Some(x))
} text "Pim run Id to publish status to"
}
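// Hedged usage sketch of the new options. The pipeline-dir and output-dir options
// are defined in a hunk elided above, so only the flags visible in this diff are
// shown; host and run ID are invented:
//   ... --follow --refresh 60 --pimHost http://pim.example:9000 --pimRunId biopet_myrun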
def main(args: Array[String]): Unit = {
@@ -68,15 +81,42 @@ object PipelineStatus extends ToolCommand {
val cmdArgs
: Args = argsParser.parse(args, Args()) getOrElse (throw new IllegalArgumentException)
implicit lazy val system = ActorSystem()
implicit lazy val materializer = ActorMaterializer()
implicit lazy val ws = AhcWSClient()
val depsFile = cmdArgs.depsFile.getOrElse(getDepsFileFromDir(cmdArgs.pipelineDir))
val deps = readDepsFile(depsFile)
writePipelineStatus(deps,
cmdArgs.outputDir,
follow = cmdArgs.follow,
refreshTime = cmdArgs.refreshTime,
plots = cmdArgs.complatePlots,
compressPlots = cmdArgs.compressPlots)
val deps = Deps.readDepsFile(depsFile)
val pimRunId =
if (cmdArgs.pimHost.isDefined) Some(cmdArgs.pimRunId.getOrElse {
val graphDir = depsFile.getAbsoluteFile.getParentFile
if (graphDir.getName == "graph") "biopet_" + graphDir.getParentFile.getName
else "biopet_" + depsFile.getAbsolutePath.replaceAll("/", "_")
})
else None
if (cmdArgs.pimHost.isDefined) {
require(pimRunId.isDefined, "Could not auto-generate Pim run ID, please supply --pimRunId")
logger.info(s"Status will be pushed to ${cmdArgs.pimHost.get}/run/${pimRunId.get}")
Await.result(deps.publishCompressedGraphToPim(cmdArgs.pimHost.get, pimRunId.get),
Duration.Inf)
}
writePipelineStatus(
deps,
cmdArgs.outputDir,
follow = cmdArgs.follow,
refreshTime = cmdArgs.refreshTime,
plots = cmdArgs.complatePlots,
compressPlots = cmdArgs.compressPlots,
pimHost = cmdArgs.pimHost,
pimRunId = pimRunId
)
logger.info("Done")
ws.close()
system.terminate()
}
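// Hedged illustration of the default run-ID rule in main above (paths invented):
//   deps file /runs/myrun/graph/deps.json -> pimRunId "biopet_myrun"
//   deps file /tmp/deps.json              -> pimRunId "biopet__tmp_deps.json"
//     (the leading "/" of the absolute path also becomes an underscore)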
def getDepsFileFromDir(pipelineDir: File): File = {
@@ -89,26 +129,18 @@
new File(graphDir, "deps.json")
}
case class Deps(jobs: Map[String, Job], files: Array[Map[String, Any]])
def readDepsFile(depsFile: File) = {
val deps = ConfigUtils.fileToConfigMap(depsFile)
val jobs =
ConfigUtils.any2map(deps("jobs")).map(x => x._1 -> new Job(x._1, ConfigUtils.any2map(x._2)))
val files = ConfigUtils.any2list(deps("files")).map(x => ConfigUtils.any2map(x)).toArray
Deps(jobs, files)
}
def writePipelineStatus(deps: Deps,
outputDir: File,
alreadyDone: Set[String] = Set(),
alreadyFailed: Set[String] = Set(),
follow: Boolean = false,
refreshTime: Int = 30,
plots: Boolean = false,
compressPlots: Boolean = true): Unit = {
def writePipelineStatus(
deps: Deps,
outputDir: File,
alreadyDone: Set[String] = Set(),
alreadyFailed: Set[String] = Set(),
follow: Boolean = false,
refreshTime: Int = 30,
plots: Boolean = false,
compressPlots: Boolean = true,
pimHost: Option[String] = None,
pimRunId: Option[String] = None,
pimStatus: Map[String, JobStatus.Value] = Map())(implicit ws: AhcWSClient): Unit = {
val jobDone = jobsDone(deps)
val jobFailed = jobsFailed(deps, jobDone)
@@ -123,16 +155,14 @@
val jobsWriter = new PrintWriter(new File(outputDir, s"jobs.json"))
jobsWriter.println(ConfigUtils.mapToJson(jobsDeps).spaces2)
jobsWriter.close()
futures :+= writeGraphvizFile(jobsDeps,
new File(outputDir, s"jobs.gv"),
futures :+= writeGraphvizFile(new File(outputDir, s"jobs.gv"),
jobDone,
jobFailed,
jobsStart,
deps,
plots,
plots)
futures :+= writeGraphvizFile(jobsDeps,
new File(outputDir, s"compress.jobs.gv"),
futures :+= writeGraphvizFile(new File(outputDir, s"compress.jobs.gv"),
jobDone,
jobFailed,
jobsStart,
@@ -142,30 +172,29 @@
compress = true)
val mainJobs = deps.jobs.filter(_._2.mainJob == true).map {
case (name, job) =>
name -> getMainDependencies(name, deps)
case (name, _) => name -> deps.getMainDependencies(name)
}
val mainJobsWriter = new PrintWriter(new File(outputDir, s"main_jobs.json"))
mainJobsWriter.println(ConfigUtils.mapToJson(mainJobs).spaces2)
mainJobsWriter.close()
futures :+= writeGraphvizFile(mainJobs,
new File(outputDir, s"main_jobs.gv"),
futures :+= writeGraphvizFile(new File(outputDir, s"main_jobs.gv"),
jobDone,
jobFailed,
jobsStart,
deps,
plots,
plots)
futures :+= writeGraphvizFile(mainJobs,
new File(outputDir, s"compress.main_jobs.gv"),
plots,
main = true)
futures :+= writeGraphvizFile(new File(outputDir, s"compress.main_jobs.gv"),
jobDone,
jobFailed,
jobsStart,
deps,
compressPlots,
compressPlots,
compress = true)
compress = true,
main = true)
val totalJobs = deps.jobs.size
val totalStart = jobsStart.size
@@ -173,55 +202,75 @@
val totalFailed = jobFailed.size
val totalPending = totalJobs - jobsStart.size - jobDone.size - jobFailed.size
futures.foreach(x => Await.ready(x, Duration.Inf))
futures.foreach(x => Await.result(x, Duration.Inf))
logger.info(
s"Total job: ${totalJobs}, Pending: ${totalPending}, Ready to run / running: ${totalStart}, Done: ${totalDone}, Failed ${totalFailed}")
val putStatuses = pimHost.map { host =>
val runId = pimRunId.getOrElse(
throw new IllegalStateException(
"Pim requires a run id, please supply this with --pimRunId"))
if (follow) {
Thread.sleep(refreshTime * 1000)
writePipelineStatus(deps, outputDir, jobDone, jobFailed, follow)
}
}
val futures = (for (job <- deps.jobs) yield {
val status = job._1 match {
case n if jobsStart.contains(n) => JobStatus.running
case n if jobFailed.contains(n) => JobStatus.failed
case n if jobDone.contains(n) => JobStatus.success
case _ => JobStatus.idle
}
def getMainDependencies(jobName: String, deps: Deps): List[String] = {
val job = deps.jobs(jobName)
val dependencies = job.dependsOnJobs match {
case l: List[_] => l.map(_.toString)
}
dependencies.flatMap { dep =>
deps.jobs(dep).mainJob match {
case true => List(dep)
case false => getMainDependencies(dep, deps)
if (!pimStatus.get(job._1).contains(status)) {
Thread.sleep(20)
Some(
ws.url(s"$host/api/runs/test/jobs/" + job._1)
.withHeaders("Accept" -> "application/json", "Content-Type" -> "application/json")
.put(PimJob(job._1, Job.compressedName(job._1)._1, runId, "none", status).toString)
.map(job._1 -> (_, status)))
} else None
}).flatten
if (logger.isDebugEnabled) futures.foreach(_.onComplete(logger.debug(_)))
futures.foreach { f =>
f.onFailure { case e => logger.warn("Posting job failed", e) }
f.onSuccess {
case r if r._2._1.status == 200 => logger.debug(r)
case r => logger.warn("Post job did fail: " + r)
}
}
}.distinct
}
val numberRegex = """(.*)_(\d*)$""".r
def compressOnType(jobs: Map[String, List[String]]): Map[String, List[String]] = {
val set = for ((job, deps) <- jobs.toSet; dep <- deps) yield {
(compressedName(job)._1, compressedName(dep)._1)
Await.ready(Future.sequence(futures), Duration.Inf)
futures.flatMap(_.value.flatMap(_.toOption)).map(x => x._1 -> x._2._2).toMap
}
// This will collapse a Set[(String, String)] to a Map[String, List[String]]
set.groupBy(_._1).map(x => x._1 -> x._2.map(_._2).toList) ++ jobs
.filter(_._2.isEmpty)
.map(job => compressedName(job._1)._1 -> Nil)
}
logger.info(
s"Total job: $totalJobs, Pending: $totalPending, Ready to run / running: $totalStart, Done: $totalDone, Failed $totalFailed")
def compressedName(jobName: String) = jobName match {
case numberRegex(name, number) => (name, number.toInt)
if (follow) {
Thread.sleep(refreshTime * 1000)
writePipelineStatus(deps,
outputDir,
jobDone,
jobFailed,
follow,
refreshTime,
plots,
compressPlots,
pimHost,
pimRunId,
pimStatus ++ putStatuses.getOrElse(Map()))
}
}
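// Hedged caller sketch: writePipelineStatus now takes an implicit AhcWSClient even
// when pimHost is None (setup mirrors main above; names invented):
//   implicit val system = ActorSystem()
//   implicit val materializer = ActorMaterializer()
//   implicit val ws = AhcWSClient()
//   writePipelineStatus(Deps.readDepsFile(depsFile), outputDir)
//   ws.close(); system.terminate()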
def writeGraphvizFile(jobsDeps: Map[String, List[String]],
outputFile: File,
def writeGraphvizFile(outputFile: File,
jobDone: Set[String],
jobFailed: Set[String],
jobsStart: Set[String],
deps: Deps,
png: Boolean = true,
svg: Boolean = true,
compress: Boolean = false): Future[Unit] = Future {
val graph = if (compress) compressOnType(jobsDeps) else jobsDeps
compress: Boolean = false,
main: Boolean = false): Future[Unit] = Future {
val graph =
if (compress && main) deps.compressOnType(main = true)
else if (compress) deps.compressOnType()
else if (main) deps.getMainDeps
else deps.jobs.map(x => x._1 -> x._2.dependsOnJobs)
val writer = new PrintWriter(outputFile)
writer.println("digraph graphname {")