Commit 710f37d2 authored by Peter van 't Hof's avatar Peter van 't Hof
Browse files

Fixing closing issue

parent ce794c01
......@@ -16,11 +16,14 @@ package nl.lumc.sasc.biopet.core
import java.io.{File, PrintWriter}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import nl.lumc.sasc.biopet.core.pipelinestatus.{Deps, PipelineStatus}
import nl.lumc.sasc.biopet.core.summary.WriteSummary
import nl.lumc.sasc.biopet.utils.config.Configurable
import nl.lumc.sasc.biopet.utils.{ConfigUtils, Logging}
import org.broadinstitute.gatk.queue.function.{CommandLineFunction, QFunction}
import play.api.libs.ws.ahc.AhcWSClient
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
......@@ -48,7 +51,7 @@ object WriteDependencies extends Logging with Configurable {
* This method will generate a json file where information about job and file dependencies are stored
*
* @param functions This should be all functions that are given to the graph of Queue
* @param outputDir
* @param outputDir Output where the files will be placed
*/
def writeDependencies(functions: Seq[QFunction], outputDir: File): Unit = {
outputDir.mkdirs()
......@@ -60,17 +63,17 @@ object WriteDependencies extends Logging with Configurable {
case class QueueFile(file: File) {
private val inputJobs: ListBuffer[QFunction] = ListBuffer()
def addInputJob(function: QFunction) = inputJobs += function
def inputJobNames = inputJobs.toList.map(functionNames)
def addInputJob(function: QFunction): inputJobs.type = inputJobs += function
def inputJobNames: List[String] = inputJobs.toList.map(functionNames)
private val outputJobs: ListBuffer[QFunction] = ListBuffer()
def addOutputJob(function: QFunction) = {
def addOutputJob(function: QFunction): outputJobs.type = {
if (outputJobs.nonEmpty) logger.warn(s"File '$file' is found as output of multiple jobs")
outputJobs += function
}
def outputJobNames = outputJobs.toList.map(functionNames)
def outputJobNames: List[String] = outputJobs.toList.map(functionNames)
def getMap = {
def getMap: Map[String, Any] = {
val fileExist = file.exists()
if (!fileExist && outputJobs.isEmpty) {
if (errorOnMissingInput) Logging.addError(s"Input file does not exist: $file")
......@@ -86,7 +89,7 @@ object WriteDependencies extends Logging with Configurable {
)
}
def isIntermediate = outputJobs.exists(_.isIntermediate)
def isIntermediate: Boolean = outputJobs.exists(_.isIntermediate)
}
val files: mutable.Map[File, QueueFile] = mutable.Map()
......@@ -162,8 +165,15 @@ object WriteDependencies extends Logging with Configurable {
.spaces2)
writer.close()
PipelineStatus.writePipelineStatus(Deps.readDepsFile(outputFile), outputDir)
logger.info("done calculating dependencies")
implicit lazy val system = ActorSystem()
implicit lazy val materializer = ActorMaterializer()
implicit lazy val ws = AhcWSClient()
PipelineStatus.writePipelineStatus(Deps.readDepsFile(outputFile), outputDir)
ws.close()
system.terminate()
}
}
......@@ -15,7 +15,6 @@
package nl.lumc.sasc.biopet.core.pipelinestatus
import java.io.{File, PrintWriter}
import java.util.Date
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
......@@ -75,10 +74,6 @@ object PipelineStatus extends ToolCommand {
} text "Pim run Id to publish status to"
}
implicit lazy val system = ActorSystem()
implicit lazy val materializer = ActorMaterializer()
lazy val ws = AhcWSClient()
def main(args: Array[String]): Unit = {
logger.info("Start")
......@@ -86,6 +81,10 @@ object PipelineStatus extends ToolCommand {
val cmdArgs
: Args = argsParser.parse(args, Args()) getOrElse (throw new IllegalArgumentException)
implicit lazy val system = ActorSystem()
implicit lazy val materializer = ActorMaterializer()
implicit lazy val ws = AhcWSClient()
val depsFile = cmdArgs.depsFile.getOrElse(getDepsFileFromDir(cmdArgs.pipelineDir))
val deps = Deps.readDepsFile(depsFile)
......@@ -97,6 +96,11 @@ object PipelineStatus extends ToolCommand {
})
else None
if (cmdArgs.pimHost.isDefined) {
require(pimRunId.isDefined, "Could not auto genrate Pim run ID, please supply --pimRunId")
logger.info(s"Status will be pushed to ${cmdArgs.pimHost.get}/run/${pimRunId.get}")
}
writePipelineStatus(
deps,
cmdArgs.outputDir,
......@@ -108,7 +112,9 @@ object PipelineStatus extends ToolCommand {
pimRunId = pimRunId
)
logger.info("Done")
sys.exit()
ws.close()
system.terminate()
}
def getDepsFileFromDir(pipelineDir: File): File = {
......@@ -130,7 +136,7 @@ object PipelineStatus extends ToolCommand {
plots: Boolean = false,
compressPlots: Boolean = true,
pimHost: Option[String] = None,
pimRunId: Option[String] = None): Unit = {
pimRunId: Option[String] = None)(implicit ws: AhcWSClient): Unit = {
val jobDone = jobsDone(deps)
val jobFailed = jobsFailed(deps, jobDone)
......@@ -196,7 +202,12 @@ object PipelineStatus extends ToolCommand {
futures.foreach(x => Await.ready(x, Duration.Inf))
val runId = if (pimHost.isDefined) Some(pimRunId.getOrElse("biopet_" + new Date())) else None
val runId =
if (pimHost.isDefined)
Some(
pimRunId.getOrElse(throw new IllegalStateException(
"Pim requires a run id, please supply this with --pimRunId")))
else None
pimHost.foreach { host =>
val links: List[Link] = deps.compressOnType
.flatMap(x => x._2.map(y => Link("link", y, "output", x._1, "input", "test")))
......@@ -218,7 +229,7 @@ object PipelineStatus extends ToolCommand {
"Biopet pipeline",
"biopet"
)
println(run.toString)
val request = ws
.url(s"$host/api/runs/")
.withHeaders("Accept" -> "application/json", "Content-Type" -> "application/json")
......@@ -226,8 +237,7 @@ object PipelineStatus extends ToolCommand {
val content = Await.result(request, Duration.Inf)
println(content.status)
println(content.body)
println(content)
val futures = for (job <- deps.jobs) yield {
val status = job._1 match {
......@@ -240,7 +250,9 @@ object PipelineStatus extends ToolCommand {
.withHeaders("Accept" -> "application/json", "Content-Type" -> "application/json")
.put(PimJob(job._1, Job.compressedName(job._1)._1, runId.get, "none", status).toString)
}
Await.result(Future.sequence(futures), Duration.Inf).foreach(println)
if (logger.isDebugEnabled) futures.foreach(_.onComplete(logger.debug(_)))
val results = Await.result(Future.sequence(futures), Duration.Inf)
results.filter(_.status != 200).foreach(println)
}
logger.info(
s"Total job: $totalJobs, Pending: $totalPending, Ready to run / running: $totalStart, Done: $totalDone, Failed $totalFailed")
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment