Commit da1870e4 authored by Peter van 't Hof's avatar Peter van 't Hof

Made a limit on number of processes in Sys

parent c6b8bddc
...@@ -17,9 +17,9 @@ package nl.lumc.sasc.biopet.core.report ...@@ -17,9 +17,9 @@ package nl.lumc.sasc.biopet.core.report
import java.io._ import java.io._
import nl.lumc.sasc.biopet.core.ToolCommandFunction import nl.lumc.sasc.biopet.core.ToolCommandFunction
import nl.lumc.sasc.biopet.utils.summary.db.Schema.{Library, Module, Pipeline, Sample} import nl.lumc.sasc.biopet.utils.summary.db.Schema.{ Library, Module, Pipeline, Sample }
import nl.lumc.sasc.biopet.utils.summary.db.SummaryDb import nl.lumc.sasc.biopet.utils.summary.db.SummaryDb
import nl.lumc.sasc.biopet.utils.{IoUtils, Logging, ToolCommand} import nl.lumc.sasc.biopet.utils.{ IoUtils, Logging, ToolCommand }
import org.broadinstitute.gatk.utils.commandline.Input import org.broadinstitute.gatk.utils.commandline.Input
import org.fusesource.scalate.TemplateEngine import org.fusesource.scalate.TemplateEngine
......
...@@ -14,13 +14,13 @@ ...@@ -14,13 +14,13 @@
*/ */
package nl.lumc.sasc.biopet.tools package nl.lumc.sasc.biopet.tools
import java.io.{File, PrintWriter} import java.io.{ File, PrintWriter }
import java.util import java.util
import htsjdk.variant.vcf.VCFFileReader import htsjdk.variant.vcf.VCFFileReader
import nl.lumc.sasc.biopet.utils.ToolCommand import nl.lumc.sasc.biopet.utils.ToolCommand
import nl.lumc.sasc.biopet.utils.rscript.ScatterPlot import nl.lumc.sasc.biopet.utils.rscript.ScatterPlot
import nl.lumc.sasc.biopet.utils.intervals.{BedRecord, BedRecordList} import nl.lumc.sasc.biopet.utils.intervals.{ BedRecord, BedRecordList }
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
import scala.collection.mutable import scala.collection.mutable
......
...@@ -14,10 +14,10 @@ ...@@ -14,10 +14,10 @@
*/ */
package nl.lumc.sasc.biopet.tools.bamstats package nl.lumc.sasc.biopet.tools.bamstats
import java.io.{File, IOException, PrintWriter} import java.io.{ File, IOException, PrintWriter }
import nl.lumc.sasc.biopet.utils.rscript.LinePlot import nl.lumc.sasc.biopet.utils.rscript.LinePlot
import nl.lumc.sasc.biopet.utils.{Logging, sortAnyAny} import nl.lumc.sasc.biopet.utils.{ Logging, sortAnyAny }
import scala.collection.mutable import scala.collection.mutable
import scala.concurrent.ExecutionContext import scala.concurrent.ExecutionContext
......
...@@ -19,7 +19,9 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SO ...@@ -19,7 +19,9 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SO
import nl.lumc.sasc.biopet.utils.Logging import nl.lumc.sasc.biopet.utils.Logging
import scala.concurrent.{ ExecutionContext, Future, Promise } import scala.collection.parallel.mutable.ParMap
import scala.concurrent.duration.Duration
import scala.concurrent._
import scala.sys.process.{ Process, ProcessLogger } import scala.sys.process.{ Process, ProcessLogger }
import scala.util.Try import scala.util.Try
...@@ -72,6 +74,10 @@ object Sys extends Sys { ...@@ -72,6 +74,10 @@ object Sys extends Sys {
trait Sys { trait Sys {
import Sys._ import Sys._
private val cache: ParMap[Seq[String], AsyncExecResult] = ParMap()
var maxRunningProcesses: Int = 5
def exec(cmd: String): ExecResult = exec(cmd.split(" ")) def exec(cmd: String): ExecResult = exec(cmd.split(" "))
/** /**
...@@ -102,7 +108,22 @@ trait Sys { ...@@ -102,7 +108,22 @@ trait Sys {
* @return [[AsyncExecResult]] * @return [[AsyncExecResult]]
*/ */
def execAsync(cmd: Seq[String])(implicit ec: ExecutionContext): AsyncExecResult = { def execAsync(cmd: Seq[String])(implicit ec: ExecutionContext): AsyncExecResult = {
new AsyncExecResult { while (cache.size >= maxRunningProcesses) {
for ((cmd, c) <- cache.toList) {
val results = Option(c)
if (!results.map(_.isRunning).getOrElse(true)) try {
cache -= cmd
} catch {
case e: NullPointerException =>
}
else try {
results.foreach(x => Await.ready(x.get, Duration.fromNanos(100000)))
} catch {
case e: TimeoutException =>
}
}
}
val results = new AsyncExecResult {
val (fut, cancelFut) = runAsync(cmd) val (fut, cancelFut) = runAsync(cmd)
override def map[T](f: ExecResult => T): Future[T] = fut.map(f) override def map[T](f: ExecResult => T): Future[T] = fut.map(f)
...@@ -117,6 +138,8 @@ trait Sys { ...@@ -117,6 +138,8 @@ trait Sys {
override def get: Future[ExecResult] = fut override def get: Future[ExecResult] = fut
} }
cache += cmd -> results
results
} }
// helper for 'execAsync' - runs the given cmd asynchronous. // helper for 'execAsync' - runs the given cmd asynchronous.
......
...@@ -14,16 +14,14 @@ ...@@ -14,16 +14,14 @@
*/ */
package nl.lumc.sasc.biopet.utils.rscript package nl.lumc.sasc.biopet.utils.rscript
import java.io.{File, FileOutputStream} import java.io.{ File, FileOutputStream }
import nl.lumc.sasc.biopet.utils.Logging import nl.lumc.sasc.biopet.utils.Logging
import nl.lumc.sasc.biopet.utils.config.Configurable import nl.lumc.sasc.biopet.utils.config.Configurable
import nl.lumc.sasc.biopet.utils.process.Sys import nl.lumc.sasc.biopet.utils.process.Sys
import nl.lumc.sasc.biopet.utils.process.Sys.AsyncExecResult
import scala.collection.parallel.mutable.ParSeq
import scala.concurrent.duration.Duration import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext} import scala.concurrent.{ Await, ExecutionContext }
import scala.sys.process.ProcessLogger import scala.sys.process.ProcessLogger
/** /**
...@@ -66,8 +64,6 @@ trait Rscript extends Configurable { ...@@ -66,8 +64,6 @@ trait Rscript extends Configurable {
} }
} }
private val cache: ParSeq[AsyncExecResult] = Nil
/** /**
* Execute rscript on local system * Execute rscript on local system
* @param logger How to handle stdout and stderr * @param logger How to handle stdout and stderr
...@@ -77,14 +73,8 @@ trait Rscript extends Configurable { ...@@ -77,14 +73,8 @@ trait Rscript extends Configurable {
Logging.logger.info("Running: " + cmd.mkString(" ")) Logging.logger.info("Running: " + cmd.mkString(" "))
while (cache.size > 5) for (c <- cache) Thread.
val results = Sys.execAsync(cmd) val results = Sys.execAsync(cmd)
while(results.isRunning) {
Thread.sleep(1000)
}
val (exitcode, stdout, stderr) = Await.result(results.map(x => (x._1, x._2, x._3)), Duration.Inf) val (exitcode, stdout, stderr) = Await.result(results.map(x => (x._1, x._2, x._3)), Duration.Inf)
Logging.logger.info("stdout:\n" + stdout + "\n") Logging.logger.info("stdout:\n" + stdout + "\n")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment