Commit 8d5abea6 authored by pjvan_thof

Better error catching

parent 1dc47660
@@ -2,7 +2,14 @@ package nl.lumc.sasc.biopet.core.pipelinestatus
import java.io.File
import nl.lumc.sasc.biopet.core.pipelinestatus.PipelineStatus.logger
import nl.lumc.sasc.biopet.utils.ConfigUtils
import nl.lumc.sasc.biopet.utils.pim._
import play.api.libs.ws.WSResponse
import play.api.libs.ws.ahc.AhcWSClient
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
/**
* This class can store the deps.json from a pipeline that stores all jobs and files and the connections
@@ -45,6 +52,45 @@ case class Deps(jobs: Map[String, Job], files: Array[Map[String, Any]]) {
}.distinct
}
/** This publishes the graph to a PIM host */
def publishCompressedGraphToPim(host: String, runId: String)(
implicit ws: AhcWSClient): Future[WSResponse] = {
val links: List[Link] = this
.compressOnType()
.flatMap(x => x._2.map(y => Link("link", y, "output", x._1, "input", "test")))
.toList
val run = Run(
runId,
Network("graph",
Nil,
this
.compressOnType()
.map(
x =>
Node(x._1,
"root",
List(Port("input", "input")),
List(Port("output", "output")),
"test"))
.toList,
links),
"Biopet pipeline",
"biopet"
)
val request = ws
.url(s"$host/api/runs/")
.withHeaders("Accept" -> "application/json", "Content-Type" -> "application/json")
.put(run.toString)
request.onFailure { case e => logger.warn("Post workflow failed", e) }
request.onSuccess {
case r if r.status == 200 =>
logger.debug(r)
case r => logger.warn(r)
}
request
}
}
object Deps {
......
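The new publishCompressedGraphToPim method attaches onFailure/onSuccess callbacks to the WS request for logging and still returns the Future, so the caller decides whether and how long to block. A minimal sketch of that pattern, with a plain Future standing in for the play-ws PUT (object and value names here are illustrative, not part of the commit):

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

object FutureCallbackSketch extends App {
  // Stand-in for the HTTP PUT; any Future works for illustrating the pattern.
  val request: Future[Int] = Future {
    200 // pretend this is the response status code
  }

  // Log failures instead of letting them vanish inside the Future.
  request.onFailure { case e => println(s"Post workflow failed: $e") }

  // Only a 200 counts as success; anything else is worth a warning.
  request.onSuccess {
    case status if status == 200 => println("workflow posted")
    case status                  => println(s"unexpected status: $status")
  }

  // The Future is still available to the caller, which decides whether to block.
  Await.result(request, Duration.Inf)
}

Note that Future.onSuccess and Future.onFailure are deprecated in Scala 2.12 and removed in 2.13; onComplete over a Try expresses the same handling in a forward-compatible way.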
@@ -99,6 +99,8 @@ object PipelineStatus extends ToolCommand {
if (cmdArgs.pimHost.isDefined) {
require(pimRunId.isDefined, "Could not auto-generate Pim run ID, please supply --pimRunId")
logger.info(s"Status will be pushed to ${cmdArgs.pimHost.get}/run/${pimRunId.get}")
Await.result(deps.publishCompressedGraphToPim(cmdArgs.pimHost.get, pimRunId.get),
Duration.Inf)
}
writePipelineStatus(
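Publishing the graph is wrapped in Await.result(..., Duration.Inf), so a failed PUT surfaces as an exception at this call site, whereas the per-job updates further down use Await.ready, which only waits for completion and leaves any failure inside the Future. A small sketch of that difference, with a hypothetical failing request standing in for an unreachable PIM host:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success}

object AwaitSketch extends App {
  val failing: Future[Int] = Future.failed(new RuntimeException("PIM host unreachable"))

  // Await.ready only waits for completion; a failed Future does not throw here.
  Await.ready(failing, Duration.Inf)
  failing.value match {
    case Some(Success(v)) => println(s"value: $v")
    case Some(Failure(e)) => println(s"failed: ${e.getMessage}")
    case None             => println("not completed")
  }

  // Await.result rethrows the failure, so this call site must be prepared for it.
  try Await.result(failing, Duration.Inf)
  catch { case e: RuntimeException => println(s"rethrown: ${e.getMessage}") }
}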
@@ -127,16 +129,18 @@ object PipelineStatus extends ToolCommand {
new File(graphDir, "deps.json")
}
def writePipelineStatus(deps: Deps,
outputDir: File,
alreadyDone: Set[String] = Set(),
alreadyFailed: Set[String] = Set(),
follow: Boolean = false,
refreshTime: Int = 30,
plots: Boolean = false,
compressPlots: Boolean = true,
pimHost: Option[String] = None,
pimRunId: Option[String] = None)(implicit ws: AhcWSClient): Unit = {
def writePipelineStatus(
deps: Deps,
outputDir: File,
alreadyDone: Set[String] = Set(),
alreadyFailed: Set[String] = Set(),
follow: Boolean = false,
refreshTime: Int = 30,
plots: Boolean = false,
compressPlots: Boolean = true,
pimHost: Option[String] = None,
pimRunId: Option[String] = None,
pimStatus: Map[String, JobStatus.Value] = Map())(implicit ws: AhcWSClient): Unit = {
val jobDone = jobsDone(deps)
val jobFailed = jobsFailed(deps, jobDone)
@@ -200,63 +204,38 @@ object PipelineStatus extends ToolCommand {
futures.foreach(x => Await.result(x, Duration.Inf))
val runId =
if (pimHost.isDefined)
Some(
pimRunId.getOrElse(throw new IllegalStateException(
"Pim requires a run id, please supply this with --pimRunId")))
else None
pimHost.foreach { host =>
val links: List[Link] = deps
.compressOnType()
.flatMap(x => x._2.map(y => Link("link", y, "output", x._1, "input", "test")))
.toList
val run = Run(
runId.get,
Network("graph",
Nil,
deps
.compressOnType()
.map(
x =>
Node(x._1,
"root",
List(Port("input", "input")),
List(Port("output", "output")),
"test"))
.toList,
links),
"Biopet pipeline",
"biopet"
)
val request = ws
.url(s"$host/api/runs/")
.withHeaders("Accept" -> "application/json", "Content-Type" -> "application/json")
.put(run.toString)
Await.result(request, Duration.Inf) match {
case r if r.status == 200 => logger.debug(r)
case r => logger.warn(r)
}
val putStatuses = pimHost.map { host =>
val runId = pimRunId.getOrElse(
throw new IllegalStateException(
"Pim requires a run id, please supply this with --pimRunId"))
val futures = for (job <- deps.jobs) yield {
val futures = (for (job <- deps.jobs) yield {
val status = job._1 match {
case n if jobsStart.contains(n) => JobStatus.running
case n if jobFailed.contains(n) => JobStatus.failed
case n if jobDone.contains(n) => JobStatus.success
case _ => JobStatus.idle
}
ws.url(s"$host/api/runs/test/jobs/" + job._1)
.withHeaders("Accept" -> "application/json", "Content-Type" -> "application/json")
.put(PimJob(job._1, Job.compressedName(job._1)._1, runId.get, "none", status).toString)
}
if (!pimStatus.get(job._1).contains(status)) {
Thread.sleep(20)
Some(
ws.url(s"$host/api/runs/test/jobs/" + job._1)
.withHeaders("Accept" -> "application/json", "Content-Type" -> "application/json")
.put(PimJob(job._1, Job.compressedName(job._1)._1, runId, "none", status).toString)
.map(job._1 -> (_, status)))
} else None
}).flatten
if (logger.isDebugEnabled) futures.foreach(_.onComplete(logger.debug(_)))
val results = Await.result(Future.sequence(futures), Duration.Inf)
results.foreach {
case s if s.status == 200 => logger.debug(s)
case s => logger.warn(s)
futures.foreach { f =>
f.onFailure { case e => logger.warn("Post job failed", e) }
f.onSuccess {
case r if r._2._1.status == 200 => logger.debug(r)
case r => logger.warn("Post job failed: " + r)
}
}
Await.ready(Future.sequence(futures), Duration.Inf)
futures.flatMap(_.value.flatMap(_.toOption)).map(x => x._1 -> x._2._2).toMap
}
logger.info(
s"Total jobs: $totalJobs, Pending: $totalPending, Ready to run / running: $totalStart, Done: $totalDone, Failed: $totalFailed")
@@ -272,7 +251,8 @@ object PipelineStatus extends ToolCommand {
plots,
compressPlots,
pimHost,
runId)
pimRunId,
pimStatus ++ putStatuses.getOrElse(Map()))
}
}
......
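The core change in writePipelineStatus: a job status is only PUT to PIM when it differs from what was pushed on the previous refresh, each request gets its own failure/success logging, and the statuses that were actually pushed are merged into the map handed to the next iteration via pimStatus ++ putStatuses. A compact sketch of that bookkeeping, with a plain Future standing in for the HTTP PUT and String standing in for JobStatus.Value (all names here are illustrative, and the Thread.sleep(20) throttle from the commit is omitted):

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

object StatusDiffSketch extends App {
  type Status = String

  // Pretend PUT: resolves to the status that was pushed for a job.
  def push(job: String, status: Status): Future[Status] = Future(status)

  def pushChanged(current: Map[String, Status],
                  lastPushed: Map[String, Status]): Map[String, Status] = {
    // Only jobs whose status differs from the last pushed value are sent.
    val futures = current.toSeq.collect {
      case (job, status) if !lastPushed.get(job).contains(status) =>
        push(job, status).map(job -> _)
    }
    Await.ready(Future.sequence(futures), Duration.Inf)
    // Keep only the PUTs that completed successfully and merge them into the known state.
    lastPushed ++ futures.flatMap(_.value.flatMap(_.toOption))
  }

  val first  = pushChanged(Map("jobA" -> "running", "jobB" -> "idle"), Map.empty)
  val second = pushChanged(Map("jobA" -> "success", "jobB" -> "idle"), first)
  println(second) // only jobA changed, so only jobA was re-pushed; jobB is carried over
}

Carrying the merged map forward is what keeps the follow loop from re-posting every job on every refresh.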