Commit dd6dc7b7 authored by Peter van 't Hof's avatar Peter van 't Hof
Browse files

Aggregate threads and corememory for pipes

parent bdd9ce38
......@@ -84,16 +84,19 @@ trait BiopetCommandLineFunction extends CommandLineFunction with Configurable {
val firstOutput = try {
this.firstOutput
} catch {
case e:NullPointerException => null
case e: NullPointerException => null
}
pipesJobs.foreach(_.beforeGraph())
pipesJobs.foreach(_.internalBeforeGraph())
if (jobOutputFile == null && firstOutput != null)
jobOutputFile = new File(firstOutput.getAbsoluteFile.getParent, "." + firstOutput.getName + "." + configName + ".out")
if (threads == 0) threads = getThreads(defaultThreads)
if (threads == 0) threads = getThreads(defaultThreads) + pipesJobs.map(_.threads).sum
if (threads > 1) nCoresRequest = Option(threads)
_coreMemory = config("core_memory", default = defaultCoreMemory).asDouble + (0.5 * retry)
_coreMemory = config("core_memory", default = defaultCoreMemory + pipesJobs.map(_.coreMemeory).sum).asDouble + (0.5 * retry)
if (config.contains("memory_limit")) memoryLimit = config("memory_limit")
else memoryLimit = Some(_coreMemory * threads)
......@@ -311,6 +314,8 @@ trait BiopetCommandLineFunction extends CommandLineFunction with Configurable {
cmd
}
private[core] var pipesJobs: List[BiopetCommandLineFunction] = Nil
def requiredInput(prefix: String, arg: Either[File, BiopetCommandLineFunction]): String = {
arg match {
case Left(file) => {
......@@ -319,11 +324,7 @@ trait BiopetCommandLineFunction extends CommandLineFunction with Configurable {
}
case Right(cmd) => {
cmd._outputAsStdout = true
cmd.beforeGraph()
cmd.internalBeforeGraph()
this.beforeGraph()
this.internalBeforeGraph()
this.threads += cmd.threads
pipesJobs :+= cmd
try {
if (cmd.outputs != null) outputFiles ++= cmd.outputs
if (cmd.inputs != null) deps ++= cmd.inputs
......@@ -343,11 +344,7 @@ trait BiopetCommandLineFunction extends CommandLineFunction with Configurable {
}
case Right(cmd) => {
cmd._inputAsStdin = true
cmd.beforeGraph()
cmd.internalBeforeGraph()
this.beforeGraph()
this.internalBeforeGraph()
this.threads += cmd.threads
pipesJobs :+= cmd
try {
if (cmd.outputs != null) outputFiles ++= cmd.outputs
if (cmd.inputs != null) deps ++= cmd.inputs
......
......@@ -26,10 +26,10 @@ class BiopetPipe(val commands: List[BiopetCommandLineFunction]) extends BiopetCo
case e: Exception => Nil
}
pipesJobs :::= commands
override def beforeGraph() {
super.beforeGraph()
commands.foreach(_.beforeGraph())
commands.foreach(_.internalBeforeGraph())
stdoutFile = stdoutFile.map(_.getAbsoluteFile)
stdinFile = stdinFile.map(_.getAbsoluteFile)
......@@ -45,14 +45,8 @@ class BiopetPipe(val commands: List[BiopetCommandLineFunction]) extends BiopetCo
require(inputOutput.isEmpty, "File found as input and output in the same job, files: " + inputOutput.mkString(", "))
}
override def defaultCoreMemory = {
(for (command <- commands) yield {
val threads = command.getThreads(command.defaultThreads)
val totalThreads = defaultThreads
(threads.toDouble / totalThreads.toDouble) * command.defaultCoreMemory
}).sum
}
override def defaultThreads = commands.map(c => c.getThreads(c.defaultThreads)).sum
override def defaultCoreMemory = 0.0
override def defaultThreads = 0
val root: Configurable = commands.head.root
override def configName = commands.map(_.configName).mkString("-")
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment