Rscript.scala 3.26 KB
Newer Older
Peter van 't Hof's avatar
Peter van 't Hof committed
1
2
3
4
5
6
7
8
9
10
/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
11
 * A dual licensing mode is applied. The source code within this project is freely available for non-commercial use under an AGPL
Peter van 't Hof's avatar
Peter van 't Hof committed
12
13
14
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
15
16
package nl.lumc.sasc.biopet.utils.rscript

Peter van 't Hof's avatar
WIP    
Peter van 't Hof committed
17
import java.io.{File, FileOutputStream}
18
19
20

import nl.lumc.sasc.biopet.utils.Logging
import nl.lumc.sasc.biopet.utils.config.Configurable
21
import nl.lumc.sasc.biopet.utils.process.Sys
Peter van 't Hof's avatar
WIP    
Peter van 't Hof committed
22
import nl.lumc.sasc.biopet.utils.process.Sys.AsyncExecResult
23

Peter van 't Hof's avatar
WIP    
Peter van 't Hof committed
24
25
26
import scala.collection.parallel.mutable.ParSeq
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext}
Peter van 't Hof's avatar
Peter van 't Hof committed
27
import scala.sys.process.ProcessLogger
28
29

/**
Peter van 't Hof's avatar
Peter van 't Hof committed
30
31
 * Trait for R scripts; mix it in to execute an R script on the local system.
 *
32
33
34
35
36
 * Created by pjvanthof on 13/09/15.
 */
trait Rscript extends Configurable {
  /**
   * The R script to run; must be supplied by the implementing class.
   * It is a var because checkScript reassigns it, either to its absolute form
   * or to the copy that was extracted from the classpath.
   */
  protected var script: File

Sander Bollen's avatar
Sander Bollen committed
37
  /**
   * Executable used to run the script.
   * Read from config key "exe" in namespace "rscript"; falls back to "Rscript".
   */
  def rscriptExecutable: String =
    config("exe", default = "Rscript", namespace = "rscript")
38
39
40
41
42
43
44
45
46
47
48
49
50

  /**
   * Default command line: the Rscript executable followed by the absolute path of the script.
   * Override this to add arguments.
   */
  def cmd: Seq[String] = {
    val executable = rscriptExecutable
    val scriptPath = script.getAbsolutePath
    Seq(executable, scriptPath)
  }

  /**
   * Makes sure `script` points to a file that exists on the file system.
   *
   * If `script` already exists it is replaced by its absolute form. Otherwise the script is
   * looked up as a classpath resource (using `script.getPath` as resource name), copied to a
   * file on disk, and `script` is pointed at that copy.
   *
   * @param dir Directory to store the extracted script in; if None or not given,
   *            File.createTempFile is called and the file is deleted when the JVM exits
   * @throws java.io.FileNotFoundException if the script exists neither on the file system
   *                                       nor as a classpath resource
   */
  protected def checkScript(dir: Option[File] = None): Unit = {
    if (script.exists()) {
      script = script.getAbsoluteFile
    } else {
      // getResourceAsStream returns null for a missing resource; resolve it first so that a
      // missing script fails with a clear message before any file is created on disk.
      val is = Option(getClass.getResourceAsStream(script.getPath)).getOrElse(
        throw new java.io.FileNotFoundException(
          "Script not found on file system or classpath: " + script.getPath))

      try {
        val rScript: File = dir match {
          case Some(d) => new File(d, script.getName)
          case _ =>
            val file = File.createTempFile(script.getName, ".R")
            file.deleteOnExit()
            file
        }

        val parent = rScript.getAbsoluteFile.getParentFile
        if (!parent.exists) parent.mkdirs()

        // Close the output stream even when the copy fails.
        val os = new FileOutputStream(rScript)
        try org.apache.commons.io.IOUtils.copy(is, os)
        finally os.close()

        script = rScript
      } finally is.close()
    }
  }

Peter van 't Hof's avatar
WIP    
Peter van 't Hof committed
69
70
  /**
   * Holder for asynchronous execution results.
   * Initialized with an empty ParSeq: `Nil` is a List and does not conform to the declared type.
   * NOTE(review): nothing in this trait adds elements to this collection - confirm whether
   * it is still needed.
   */
  private val cache: ParSeq[AsyncExecResult] = ParSeq.empty[AsyncExecResult]

71
72
73
74
  /**
   * Execute rscript on local system and wait until it has finished.
   *
   * Calls checkScript() first, starts `cmd` through Sys.execAsync, polls once per second
   * while the process is running and finally writes the captured stdout, stderr and exit
   * code to the biopet logger at info level.
   *
   * @param logger How to handle stdout and stderr.
   *               NOTE(review): this argument is not used by the body; all output goes to
   *               Logging.logger regardless of the logger passed in - confirm intent.
   * @param ec Implicit execution context available to the asynchronous calls in the body
   */
  def runLocal(logger: ProcessLogger)(implicit ec: ExecutionContext): Unit = {
    checkScript()

    Logging.logger.info("Running: " + cmd.mkString(" "))

    val results = Sys.execAsync(cmd)

    // Poll until the process has ended before collecting its output.
    while (results.isRunning) {
      Thread.sleep(1000)
    }

    val (exitcode, stdout, stderr) = Await.result(results.map(x => (x._1, x._2, x._3)), Duration.Inf)

    Logging.logger.info("stdout:\n" + stdout + "\n")
    Logging.logger.info("stderr:\n" + stderr)

    Logging.logger.info(exitcode)
  }

  /**
   * Execute rscript on local system, passing a ProcessLogger that forwards
   * every line to the biopet logger at info level.
   */
  def runLocal()(implicit ec: ExecutionContext): Unit = {
    val biopetLogger = ProcessLogger((line: String) => Logging.logger.info(line))
    runLocal(biopetLogger)
  }
}