Commit 2511965c authored by Peter van 't Hof

Added first version of PIM support

parent cd17b601
@@ -16,6 +16,7 @@ package nl.lumc.sasc.biopet.core
import java.io.{File, PrintWriter}
import nl.lumc.sasc.biopet.core.pipelinestatus.{Deps, PipelineStatus}
import nl.lumc.sasc.biopet.core.summary.WriteSummary
import nl.lumc.sasc.biopet.utils.config.Configurable
import nl.lumc.sasc.biopet.utils.{ConfigUtils, Logging}
@@ -161,7 +162,7 @@ object WriteDependencies extends Logging with Configurable {
.spaces2)
writer.close()
PipelineStatus.writePipelineStatus(PipelineStatus.readDepsFile(outputFile), outputDir)
PipelineStatus.writePipelineStatus(Deps.readDepsFile(outputFile), outputDir)
logger.info("done calculating dependencies")
}
......
package nl.lumc.sasc.biopet.core.pipelinestatus
import java.io.File
import nl.lumc.sasc.biopet.utils.ConfigUtils
/**
* Created by pjvanthof on 24/06/2017.
*/
case class Deps(jobs: Map[String, Job], files: Array[Map[String, Any]]) {
lazy val compressOnType: Map[String, List[String]] = {
Deps.compressOnType(jobs.map(x => x._1 -> x._2.dependsOnJobs))
}
def getMainDependencies(jobName: String): List[String] = {
val job = this.jobs(jobName)
val dependencies = job.dependsOnJobs match {
case l: List[_] => l.map(_.toString)
}
dependencies.flatMap { dep =>
if (this.jobs(dep).mainJob) List(dep)
else getMainDependencies(dep)
}.distinct
}
}
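// A sketch of how getMainDependencies resolves through intermediate jobs
// (hypothetical names, assuming "bwa_1" has main_job = true and "fastqc_1" does not):
//   deps.getMainDependencies("gatk_1")
//   // "gatk_1" depends on "fastqc_1"; "fastqc_1" is not a main job, so its own
//   // dependency "bwa_1" is returned instead: List("bwa_1")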
object Deps {
def readDepsFile(depsFile: File): Deps = {
val deps = ConfigUtils.fileToConfigMap(depsFile)
val jobs =
ConfigUtils.any2map(deps("jobs")).map(x => x._1 -> new Job(x._1, ConfigUtils.any2map(x._2)))
val files = ConfigUtils.any2list(deps("files")).map(x => ConfigUtils.any2map(x)).toArray
Deps(jobs, files)
}
def compressOnType(jobs: Map[String, List[String]]): Map[String, List[String]] = {
val set = for ((job, deps) <- jobs.toSet; dep <- deps) yield {
(Job.compressedName(job)._1, Job.compressedName(dep)._1)
}
// This will collapse a Set[(String, String)] to a Map[String, List[String]]
set.groupBy(_._1).map(x => x._1 -> x._2.map(_._2).toList) ++ jobs
.filter(_._2.isEmpty)
.map(job => Job.compressedName(job._1)._1 -> Nil)
}
}
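// A sketch of compressOnType with hypothetical job names: numeric suffixes are
// stripped via Job.compressedName and duplicate edges collapse through the Set.
//   Deps.compressOnType(Map(
//     "bwa_1" -> List("fastqc_1"),
//     "bwa_2" -> List("fastqc_2"),
//     "fastqc_1" -> Nil,
//     "fastqc_2" -> Nil))
//   // -> Map("bwa" -> List("fastqc"), "fastqc" -> Nil)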
package nl.lumc.sasc.biopet.core.pipelinestatus
import java.io.File
import nl.lumc.sasc.biopet.utils.ConfigUtils
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.matching.Regex
/**
* Created by pjvanthof on 24/06/2017.
*/
class Job(val name: String, map: Map[String, Any]) {
def doneAtStart: Boolean = ConfigUtils.any2boolean(map("done_at_start"))
def failFiles: List[File] = ConfigUtils.any2fileList(map("fail_files"))
def doneFiles: List[File] = ConfigUtils.any2fileList(map("done_files"))
def outputUsedByJobs: List[String] = ConfigUtils.any2stringList(map("output_used_by_jobs"))
def dependsOnJobs: List[String] = ConfigUtils.any2stringList(map("depends_on_jobs"))
def stdoutFile = new File(ConfigUtils.any2string(map("stdout_file")))
def outputsFiles: List[File] = ConfigUtils.any2fileList(map("outputs"))
def inputFiles: List[File] = ConfigUtils.any2fileList(map("inputs"))
def mainJob: Boolean = ConfigUtils.any2boolean(map("main_job"))
def intermediate: Boolean = ConfigUtils.any2boolean(map("intermediate"))
def isDone: Future[Boolean] = Future { doneFiles.forall(_.exists()) }
def isFailed: Future[Boolean] = Future { failFiles.exists(_.exists()) }
def compressedName: (String, Int) = Job.compressedName(name)
}
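// isDone and isFailed only test file existence, but returning Futures lets a
// whole run be polled concurrently, e.g.:
//   Future.sequence(deps.jobs.values.map(_.isDone))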
object Job {
val numberRegex: Regex = """(.*)_(\d*)$""".r
def compressedName(jobName: String): (String, Int) = jobName match {
case Job.numberRegex(name, number) => (name, number.toInt)
}
}
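// compressedName assumes every job name ends in "_<number>"; anything else
// fails the match:
//   Job.compressedName("bwa_mem_12") // -> ("bwa_mem", 12)
//   Job.compressedName("fastqc_1")   // -> ("fastqc", 1)
//   Job.compressedName("fastqc")     // throws a MatchError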
@@ -12,15 +12,20 @@
* license; For commercial users or users who do not want to follow the AGPL
* license, please contact us to obtain a separate license.
*/
package nl.lumc.sasc.biopet.core
package nl.lumc.sasc.biopet.core.pipelinestatus
import java.io.{File, PrintWriter}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import nl.lumc.sasc.biopet.utils.pim._
import nl.lumc.sasc.biopet.utils.pim.{Job => PimJob}
import nl.lumc.sasc.biopet.utils.{ConfigUtils, ToolCommand}
import play.api.libs.ws.ahc.AhcWSClient
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.sys.process.Process
/**
@@ -33,7 +38,8 @@ object PipelineStatus extends ToolCommand {
follow: Boolean = false,
refreshTime: Int = 30,
complatePlots: Boolean = false,
compressPlots: Boolean = true)
compressPlots: Boolean = true,
pimHost: Option[String] = None)
extends AbstractArgs
class OptParser extends AbstractOptParser {
@@ -46,21 +52,28 @@ object PipelineStatus extends ToolCommand {
opt[File]("depsFile") maxOccurs 1 valueName "<file>" action { (x, c) =>
c.copy(depsFile = Some(x))
} text "Location of deps file, not required"
opt[Unit]('f', "follow") maxOccurs 1 action { (x, c) =>
opt[Unit]('f', "follow") maxOccurs 1 action { (_, c) =>
c.copy(follow = true)
} text "This will follow a run"
opt[Int]("refresh") maxOccurs 1 action { (x, c) =>
c.copy(refreshTime = x)
} text "Time to check again, default set on 30 seconds"
opt[Unit]("completePlots") maxOccurs 1 action { (x, c) =>
opt[Unit]("completePlots") maxOccurs 1 action { (_, c) =>
c.copy(complatePlots = true)
} text "Add complete plots, this is disabled because of performance. " +
"Complete plots does show each job separated, while compressed plots collapse all jobs of the same type together."
opt[Unit]("skipCompressPlots") maxOccurs 1 action { (x, c) =>
opt[Unit]("skipCompressPlots") maxOccurs 1 action { (_, c) =>
c.copy(compressPlots = false)
} text "Disable compressed plots. By default compressed plots are enabled."
opt[String]("pimHost") maxOccurs 1 action { (x, c) =>
c.copy(pimHost = Some(x))
} text "Pim host to publish status to"
}
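// With the new option a run can be published to a PIM instance, e.g.
// (hypothetical host, other arguments elided):
//   biopet tool PipelineStatus ... --pimHost http://localhost:9000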
implicit lazy val system = ActorSystem()
implicit lazy val materializer = ActorMaterializer()
lazy val ws = AhcWSClient()
def main(args: Array[String]): Unit = {
logger.info("Start")
@@ -69,14 +82,16 @@ object PipelineStatus extends ToolCommand {
: Args = argsParser.parse(args, Args()) getOrElse (throw new IllegalArgumentException)
val depsFile = cmdArgs.depsFile.getOrElse(getDepsFileFromDir(cmdArgs.pipelineDir))
val deps = readDepsFile(depsFile)
val deps = Deps.readDepsFile(depsFile)
writePipelineStatus(deps,
cmdArgs.outputDir,
follow = cmdArgs.follow,
refreshTime = cmdArgs.refreshTime,
plots = cmdArgs.complatePlots,
compressPlots = cmdArgs.compressPlots)
compressPlots = cmdArgs.compressPlots,
pimHost = cmdArgs.pimHost)
logger.info("Done")
sys.exit()
}
def getDepsFileFromDir(pipelineDir: File): File = {
@@ -89,18 +104,6 @@ object PipelineStatus extends ToolCommand {
new File(graphDir, "deps.json")
}
case class Deps(jobs: Map[String, Job], files: Array[Map[String, Any]])
def readDepsFile(depsFile: File) = {
val deps = ConfigUtils.fileToConfigMap(depsFile)
val jobs =
ConfigUtils.any2map(deps("jobs")).map(x => x._1 -> new Job(x._1, ConfigUtils.any2map(x._2)))
val files = ConfigUtils.any2list(deps("files")).map(x => ConfigUtils.any2map(x)).toArray
Deps(jobs, files)
}
def writePipelineStatus(deps: Deps,
outputDir: File,
alreadyDone: Set[String] = Set(),
@@ -108,7 +111,8 @@
follow: Boolean = false,
refreshTime: Int = 30,
plots: Boolean = false,
compressPlots: Boolean = true): Unit = {
compressPlots: Boolean = true,
pimHost: Option[String] = None): Unit = {
val jobDone = jobsDone(deps)
val jobFailed = jobsFailed(deps, jobDone)
@@ -142,8 +146,7 @@
compress = true)
val mainJobs = deps.jobs.filter(_._2.mainJob == true).map {
case (name, job) =>
name -> getMainDependencies(name, deps)
case (name, _) => name -> deps.getMainDependencies(name)
}
val mainJobsWriter = new PrintWriter(new File(outputDir, s"main_jobs.json"))
@@ -175,41 +178,41 @@
futures.foreach(x => Await.ready(x, Duration.Inf))
pimHost.foreach { host =>
val links: List[Link] = deps.compressOnType.flatMap(x => x._2.map(y => Link("test", y, "output", x._1, "input", "test"))).toList
val run = Run("test", Network(
"test", Nil, deps.compressOnType.map(x => Node(x._1, "root", List(Port("input", "test")), List(Port("output", "test")), "test")).toList,
links), "test", "biopet")
println(run.toString)
val request = ws.url(s"$host/api/runs/")
.withHeaders("Accept" -> "application/json", "Content-Type" -> "application/json")
.put(run.toString)
val content = Await.result(request, Duration.Inf)
println(content.status)
println(content.body)
val futures = for (job <- deps.jobs) yield {
val status = job._1 match {
case n if jobsStart.contains(n) => JobStatus.running
case n if jobFailed.contains(n) => JobStatus.failed
case n if jobDone.contains(n) => JobStatus.success
case _ => JobStatus.idle
}
ws.url(s"$host/api/runs/test/jobs/" + job._1)
.withHeaders("Accept" -> "application/json", "Content-Type" -> "application/json")
.put(PimJob(job._1, Job.compressedName(job._1)._1, "test", "test", status).toString)
}
Await.result(Future.sequence(futures), Duration.Inf).foreach(println)
}
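// Endpoints used above; the run id is hard-coded to "test" in this first version:
//   PUT $host/api/runs/                 <- the network layout as a Run document
//   PUT $host/api/runs/test/jobs/<job>  <- one status update per job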
logger.info(
s"Total job: ${totalJobs}, Pending: ${totalPending}, Ready to run / running: ${totalStart}, Done: ${totalDone}, Failed ${totalFailed}")
s"Total job: $totalJobs, Pending: $totalPending, Ready to run / running: $totalStart, Done: $totalDone, Failed $totalFailed")
if (follow) {
Thread.sleep(refreshTime * 1000)
writePipelineStatus(deps, outputDir, jobDone, jobFailed, follow)
writePipelineStatus(deps, outputDir, jobDone, jobFailed, follow, refreshTime, plots, compressPlots, pimHost)
}
}
def getMainDependencies(jobName: String, deps: Deps): List[String] = {
val job = deps.jobs(jobName)
val dependencies = job.dependsOnJobs match {
case l: List[_] => l.map(_.toString)
}
dependencies.flatMap { dep =>
deps.jobs(dep).mainJob match {
case true => List(dep)
case false => getMainDependencies(dep, deps)
}
}.distinct
}
val numberRegex = """(.*)_(\d*)$""".r
def compressOnType(jobs: Map[String, List[String]]): Map[String, List[String]] = {
val set = for ((job, deps) <- jobs.toSet; dep <- deps) yield {
(compressedName(job)._1, compressedName(dep)._1)
}
// This will collapse a Set[(String, String)] to a Map[String, List[String]]
set.groupBy(_._1).map(x => x._1 -> x._2.map(_._2).toList) ++ jobs
.filter(_._2.isEmpty)
.map(job => compressedName(job._1)._1 -> Nil)
}
def compressedName(jobName: String) = jobName match {
case numberRegex(name, number) => (name, number.toInt)
}
def writeGraphvizFile(jobsDeps: Map[String, List[String]],
@@ -221,7 +224,7 @@
png: Boolean = true,
svg: Boolean = true,
compress: Boolean = false): Future[Unit] = Future {
val graph = if (compress) compressOnType(jobsDeps) else jobsDeps
val graph = if (compress) deps.compressOnType else jobsDeps
val writer = new PrintWriter(outputFile)
writer.println("digraph graphname {")
@@ -229,27 +232,26 @@
case (job, jobDeps) =>
// Writing color of node
val compressTotal =
if (compress) Some(deps.jobs.keys.filter(compressedName(_)._1 == job)) else None
if (compress) Some(deps.jobs.keys.filter(Job.compressedName(_)._1 == job)) else None
val compressDone =
if (compress) Some(jobDone.filter(compressedName(_)._1 == job)) else None
if (compress) Some(jobDone.filter(Job.compressedName(_)._1 == job)) else None
val compressFailed =
if (compress) Some(jobFailed.filter(compressedName(_)._1 == job)) else None
if (compress) Some(jobFailed.filter(Job.compressedName(_)._1 == job)) else None
val compressStart =
if (compress) Some(jobsStart.filter(compressedName(_)._1 == job)) else None
if (compress) Some(jobsStart.filter(Job.compressedName(_)._1 == job)) else None
val compressIntermediate =
if (compress)
Some(deps.jobs.filter(x => compressedName(x._1)._1 == job).forall(_._2.intermediate))
Some(deps.jobs.filter(x => Job.compressedName(x._1)._1 == job).forall(_._2.intermediate))
else None
if (compress) {
val pend = compressTotal.get.size - compressFailed.get
.filterNot(compressStart.get.contains(_))
val pend = compressTotal.get.size - compressFailed.get.diff(compressStart.get)
.size - compressStart.get.size - compressDone.get.size
writer.println(s""" $job [label = "$job
|Total: ${compressTotal.get.size}
|Fail: ${compressFailed.get.size}
|Pend:${pend}
|Run: ${compressStart.get.filterNot(compressFailed.get.contains(_)).size}
|Pend:$pend
|Run: ${compressStart.get.diff(compressFailed.get).size}
|Done: ${compressDone.get.size}"]""".stripMargin)
}
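// The pending count subtracts each job exactly once:
//   pend = total - failed.diff(started).size - started.size - done.size
// so a job that both started and failed is subtracted via "started", and the
// "Run" label correspondingly shows started.diff(failed).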
@@ -263,16 +265,16 @@
// Dashed lined for intermediate jobs
if ((deps.jobs.contains(job) && deps
.jobs(job)
.intermediate) || (compressIntermediate == Some(true)))
.intermediate) || compressIntermediate.contains(true))
writer.println(s" $job [style = dashed]")
// Writing Node deps
jobDeps.foreach { dep =>
if (compress) {
val depsNames = deps.jobs
.filter(x => compressedName(x._1)._1 == dep)
.filter(_._2.outputUsedByJobs.exists(x => compressedName(x)._1 == job))
.map(x => x._1 -> x._2.outputUsedByJobs.filter(x => compressedName(x)._1 == job))
.filter(x => Job.compressedName(x._1)._1 == dep)
.filter(_._2.outputUsedByJobs.exists(x => Job.compressedName(x)._1 == job))
.map(x => x._1 -> x._2.outputUsedByJobs.filter(x => Job.compressedName(x)._1 == job))
val total = depsNames.size
val done = depsNames
.map(x => x._2.exists(y => jobDone.contains(x._1)))
@@ -342,23 +344,4 @@
f.map(x => x._1 -> Await.result(x._2, Duration.Inf)).filter(_._2).keySet ++ alreadyFailed
}
class Job(val name: String, map: Map[String, Any]) {
def doneAtStart: Boolean = ConfigUtils.any2boolean(map("done_at_start"))
def failFiles = ConfigUtils.any2fileList(map("fail_files"))
def doneFiles = ConfigUtils.any2fileList(map("done_files"))
def outputUsedByJobs = ConfigUtils.any2stringList(map("output_used_by_jobs"))
def dependsOnJobs = ConfigUtils.any2stringList(map("depends_on_jobs"))
def stdoutFile = new File(ConfigUtils.any2string(map("stdout_file")))
def outputsFiles = ConfigUtils.any2fileList(map("outputs"))
def inputFiles = ConfigUtils.any2fileList(map("inputs"))
def mainJob = ConfigUtils.any2boolean(map("main_job"))
def intermediate = ConfigUtils.any2boolean(map("intermediate"))
def isDone: Future[Boolean] = Future { doneFiles.forall(_.exists()) }
def isFailed: Future[Boolean] = Future { failFiles.exists(_.exists()) }
}
}
@@ -21,6 +21,7 @@ import org.scalatest.Matchers
import org.scalatest.testng.TestNGSuite
import org.testng.annotations.Test
import PipelineStatusTest.Status
import nl.lumc.sasc.biopet.core.pipelinestatus.{Deps, PipelineStatus}
import nl.lumc.sasc.biopet.utils.IoUtils._
import org.apache.commons.io.FileUtils
@@ -103,7 +104,7 @@ class PipelineStatusTest extends TestNGSuite with Matchers {
val depsFile = File.createTempFile("deps.", ".json")
depsFile.deleteOnExit()
PipelineStatusTest.writeDeps(depsFile, new File("/tmp"))
val deps = PipelineStatus.readDepsFile(depsFile)
val deps = Deps.readDepsFile(depsFile)
deps.jobs.size shouldBe 3
deps.files.length shouldBe 5
......
@@ -14,7 +14,7 @@
*/
package nl.lumc.sasc.biopet
import nl.lumc.sasc.biopet.core.PipelineStatus
import nl.lumc.sasc.biopet.core.pipelinestatus.PipelineStatus
import nl.lumc.sasc.biopet.utils.{BiopetExecutable, MainCommand}
object BiopetExecutableMain extends BiopetExecutable {
......
@@ -32,8 +32,8 @@
<dependencies>
<dependency>
<groupId>com.typesafe.play</groupId>
<artifactId>play-ws_2.10</artifactId>
<version>2.4.11</version>
<artifactId>play-ws_2.11</artifactId>
<version>2.5.15</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
......
package nl.lumc.sasc.biopet.utils.pim
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
import play.api.libs.ws._
import play.api.libs.ws.WSClient
import scala.concurrent.Future
import argonaut.Json
import nl.lumc.sasc.biopet.utils.ConfigUtils
import play.api.Play.current
import scala.concurrent.ExecutionContext.Implicits.global
/**
* Created by pjvanthof on 17/03/2017.
@@ -17,14 +9,17 @@ import scala.concurrent.ExecutionContext.Implicits.global
trait PimClasses {
def toMap: Map[String, Any]
def toJson = ConfigUtils.mapToJson(toMap)
def toJson: Json = ConfigUtils.mapToJson(toMap)
override def toString: String = toJson.nospaces
}
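// Every payload below serializes through toMap into compact argonaut JSON; a
// sketch for Port (keys assumed by analogy with the other toMap definitions):
//   Port("input", "test").toString
//   // {"id":"input","description":"test"}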
case class Run(id: String ,
case class Run(id: String,
network: Network,
description: String,
workflowEngine: String,
collapse: Boolean = false) extends PimClasses {
collapse: Boolean = false)
extends PimClasses {
def toMap = Map(
"id" -> id,
"network" -> network.toMap,
@@ -34,7 +29,8 @@ case class Run(id: String ,
)
}
case class Network(description: String, groups: List[Group], nodes: List[Node], links: List[Link]) extends PimClasses {
case class Network(description: String, groups: List[Group], nodes: List[Node], links: List[Link])
extends PimClasses {
def toMap = Map(
"description" -> description,
"groups" -> groups.map(_.toMap),
@@ -50,7 +46,12 @@ case class Group(description: String, id: String, parentGroup: String) extends P
"parent_group" -> parentGroup
)
}
case class Node(id: String, groupId: Group, inPorts: List[Port], outPorts: List[Port], nodeType: String) extends PimClasses {
case class Node(id: String,
groupId: String,
inPorts: List[Port],
outPorts: List[Port],
nodeType: String)
extends PimClasses {
def toMap = Map(
"id" -> id,
"group_id" -> groupId,
@@ -59,7 +60,13 @@ case class Node(id: String, groupId: Group, inPorts: List[Port], outPorts: List[
"type" -> nodeType
)
}
case class Link(id:String, fromNode: String, fromPort: String, toNode: String, toPort: String, linkType: String) extends PimClasses {
case class Link(id: String,
fromNode: String,
fromPort: String,
toNode: String,
toPort: String,
linkType: String)
extends PimClasses {
def toMap = Map(
"id" -> id,
"from_node" -> fromNode,
@@ -76,8 +83,13 @@ case class Port(id: String, description: String) extends PimClasses {
)
}
case class Job(id: String, nodeId: String, runId: String, sampleId: String, status: JobStatus.Value) extends PimClasses {
def toMap() = Map(
case class Job(id: String,
nodeId: String,
runId: String,
sampleId: String,
status: JobStatus.Value)
extends PimClasses {
def toMap = Map(
"id" -> id,
"node_id" -> nodeId,
"run_id" -> runId,
@@ -86,26 +98,6 @@ case class Job(id: String, nodeId: String, runId: String, sampleId: String, stat
)
}
object JobStatus extends Enumeration {
val idle, running, success, failed = Value
}
object Pim {
def createRun(run: Run): Unit = {
// implicit val system = ActorSystem()
// system.registerOnTermination {
// System.exit(0)
// }
//implicit val materializer = ActorMaterializer()
// Create the standalone WS client
// no argument defaults to a AhcWSClientConfig created from
// "AhcWSClientConfigFactory.forConfig(ConfigFactory.load, this.getClass.getClassLoader)"
// val wsClient = WSClientConfigFactory.forConfig(ConfigFactory.load, this.getClass.getClassLoader)
// wsClient.url("http://www.google.com")
//
// println(wsClient.body)
}
}
\ No newline at end of file