BiopetCommandLineFunctionTrait.scala 9.14 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
 * A dual licensing mode is applied. The source code within this project that are
 * not part of GATK Queue is freely available for non-commercial use under an AGPL
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
16 17
package nl.lumc.sasc.biopet.core

Peter van 't Hof's avatar
Peter van 't Hof committed
18 19 20
import java.io.{ File, FileInputStream }
import java.security.MessageDigest

Peter van 't Hof's avatar
Peter van 't Hof committed
21
import nl.lumc.sasc.biopet.core.config.Configurable
Peter van 't Hof's avatar
Peter van 't Hof committed
22
import org.broadinstitute.gatk.queue.function.CommandLineFunction
23
import org.broadinstitute.gatk.utils.commandline.Input
Peter van 't Hof's avatar
Peter van 't Hof committed
24

25
import scala.collection.mutable
26
import scala.sys.process.{ Process, ProcessLogger }
27 28
import scala.util.matching.Regex

Peter van 't Hof's avatar
Peter van 't Hof committed
29
/** Biopet command line trait to auto check executable and cluster values */
30
trait BiopetCommandLineFunctionTrait extends CommandLineFunction with Configurable {
  analysisName = configName

  /** Extra input files this job depends on; registered as an optional Queue input. */
  @Input(doc = "deps", required = false)
  var deps: List[File] = Nil

  /** Number of threads; 0 means "not set", in which case it is read from config in freezeFieldValues. */
  var threads = 0

  /** Thread count used when neither the function nor the config sets one. */
  def defaultThreads = 1

  /** Value for the `h_vmem` resource request; from config key `vmem`, otherwise derived in freezeFieldValues. */
  var vmem: Option[String] = config("vmem")

  /** Per-core memory used when config key `core_memory` is absent. */
  protected def defaultCoreMemory: Double = 1.0

  /** Factor used when config key `vmem_factor` is absent. */
  protected def defaultVmemFactor: Double = 1.4

  /** Multiplier on the per-core memory used to derive `vmem` (the derived value gets a "G" suffix). */
  var vmemFactor: Double = config("vmem_factor", default = defaultVmemFactor)

  /** Multiplier on the per-core memory used to derive the resident limit. */
  var residentFactor: Double = config("resident_factor", default = 1.2)

  /** Per-core memory of the current attempt, recomputed on every freeze. */
  private var coreMemory: Double = _

  /** Executable to run; resolved to a full path (and md5-summed) by preProcessExecutable. */
  var executable: String = _

  /**
   * Can override this method. This is executed just before the job is ready to run.
   * Can check on run time files from pipeline here
   */
  protected[core] def beforeCmd(): Unit = {}

  /** Can override this method. This is executed after the script is done and queue starts to generate the graph */
  protected[core] def beforeGraph(): Unit = {}

  /**
   * Set default output file, threads and memory/vmem requests for the current job.
   *
   * Also runs when setupRetry calls freeze(); every retry adds 0.5 to the per-core memory
   * so a job that failed (presumably because it ran out of memory) asks for more.
   */
  override def freezeFieldValues(): Unit = {
    preProcessExecutable()
    beforeGraph()
    // NOTE(review): assumes firstOutput is non-null when no jobOutputFile was set; jobName below does guard for null
    if (jobOutputFile == null) jobOutputFile = new File(firstOutput.getAbsoluteFile.getParent, "." + firstOutput.getName + "." + configName + ".out")

    if (threads == 0) threads = getThreads(defaultThreads)
    if (threads > 1) nCoresRequest = Option(threads)

    coreMemory = config("core_memory", default = defaultCoreMemory).asDouble + (0.5 * retry)

    if (config.contains("memory_limit")) memoryLimit = config("memory_limit")
    else memoryLimit = Some(coreMemory * threads)

    // coreMemory already contains the retry bonus; it is deliberately applied once more here
    // (and in the vmem factor below) so these requests grow faster on each retry
    if (config.contains("resident_limit")) residentLimit = config("resident_limit")
    else residentLimit = Some((coreMemory + (0.5 * retry)) * residentFactor)

    if (!config.contains("vmem")) vmem = Some((coreMemory * (vmemFactor + (0.5 * retry))) + "G")
    vmem.foreach(v => jobResourceRequests :+= "h_vmem=" + v)
    jobName = configName + ":" + (if (firstOutput != null) firstOutput.getName else jobOutputFile)

    super.freezeFieldValues()
  }

  /** Number of retries done so far; raises the memory requests computed in freezeFieldValues. */
  var retry = 0

  /** Drops the previous `h_vmem` request, bumps the retry counter and re-freezes to recompute memory. */
  override def setupRetry(): Unit = {
    super.setupRetry()
    if (vmem.isDefined) jobResourceRequests = jobResourceRequests.filterNot(_.contains("h_vmem="))
    logger.info("Auto raise memory on retry")
    retry += 1
    this.freeze()
  }

  /**
   * When true the resolved executable is stored as its canonical path (which resolves symlinks);
   * override with false when the executable must not be converted and the absolute path should be kept.
   */
  val executableToCanonicalPath = true

  /**
   * Checks executable. Resolves it with `which` to a full (canonical or absolute) path, reports an
   * error when it cannot be found, and stores an md5sum of the file in the job report.
   * Results are cached globally so each executable is resolved and hashed only once.
   */
  protected[core] def preProcessExecutable(): Unit = {
    if (executable != null && !BiopetCommandLineFunctionTrait.executableMd5Cache.contains(executable)) {
      try {
        if (!BiopetCommandLineFunctionTrait.executableCache.contains(executable)) {
          val oldExecutable = executable
          val buffer = new StringBuffer()
          // Only stdout carries the path; stderr is discarded so it can not corrupt the result
          val process = Process(Seq("which", executable))
            .run(ProcessLogger((line: String) => buffer.append(line), (_: String) => ()))
          if (process.exitValue == 0) {
            val file = new File(buffer.toString.trim)
            executable = if (executableToCanonicalPath) file.getCanonicalPath else file.getAbsolutePath
          } else {
            BiopetQScript.addError("executable: '" + executable + "' not found, please check config")
          }
          BiopetCommandLineFunctionTrait.executableCache += oldExecutable -> executable
          BiopetCommandLineFunctionTrait.executableCache += executable -> executable
        } else {
          executable = BiopetCommandLineFunctionTrait.executableCache(executable)
        }

        if (!BiopetCommandLineFunctionTrait.executableMd5Cache.contains(executable)) {
          val file = new File(executable)
          // A missing executable was already reported above; don't try to hash it
          if (file.isFile) BiopetCommandLineFunctionTrait.executableMd5Cache += executable -> calculateMd5(file)
          else logger.warn("Executable '" + executable + "' is not a file, md5sum skipped")
        }
      } catch {
        case ioe: java.io.IOException => logger.warn("Could not use 'which', check on executable skipped: " + ioe)
      }
    }

    val md5 = BiopetCommandLineFunctionTrait.executableMd5Cache.get(executable)
    addJobReportBinding("md5sum_exe", md5.getOrElse("None"))
  }

  /**
   * Streams `file` through an MD5 digest and returns it as lowercase hex.
   * Reads in chunks until EOF (a single read may return fewer bytes than requested) and always closes the stream.
   */
  private def calculateMd5(file: File): String = {
    val digest = MessageDigest.getInstance("MD5")
    val is = new FileInputStream(file)
    try {
      val chunk = new Array[Byte](8192)
      var n = is.read(chunk)
      while (n != -1) {
        digest.update(chunk, 0, n)
        n = is.read(chunk)
      }
    } finally is.close()
    digest.digest().map("%02x".format(_)).mkString
  }

  /** Resolves the executable, runs beforeCmd and fills the job report with cores and version */
  final protected def preCmdInternal(): Unit = {
    preProcessExecutable()
    beforeCmd()

    addJobReportBinding("cores", nCoresRequest.filter(_ > 0).getOrElse(1))
    addJobReportBinding("version", getVersion)
  }

  /** Command to get version of executable */
  protected def versionCommand: String = null

  /** Regex to get version from version command output; its single capture group is the version */
  protected def versionRegex: Regex = null

  /** Allowed exit codes for the version command */
  protected def versionExitcode = List(0)

  /** Executes the version command of this function; None when no command or regex is defined */
  private[core] def getVersionInternal: Option[String] = getVersionInternal(versionCommand, versionRegex)

  /**
   * Executes a version command and extracts the version from its output.
   *
   * @param versionCommand command line; its first word must be an existing file
   * @param versionRegex regex with one capture group, matched against each stdout/stderr line
   * @return the first captured version, or None when the command is missing, can not be started,
   *         exits with a code not in versionExitcode, or prints no matching line
   */
  private[core] def getVersionInternal(versionCommand: String, versionRegex: Regex): Option[String] = {
    if (versionCommand == null || versionRegex == null) None
    else if (!new File(versionCommand.trim.split(" ")(0)).exists()) None
    else {
      val stdout = new StringBuffer()
      val stderr = new StringBuffer()
      def outputLog = "Version command: \n" + versionCommand +
        "\n output log: \n stdout: \n" + stdout.toString +
        "\n stderr: \n" + stderr.toString

      // Version detection is best-effort: a command that can not be started must not fail the job
      val exitValue: Option[Int] = try {
        val process = Process(versionCommand).run(ProcessLogger(
          (line: String) => stdout.append(line + "\n"),
          (line: String) => stderr.append(line + "\n")))
        Some(process.exitValue())
      } catch {
        case ioe: java.io.IOException =>
          logger.warn("getVersion could not execute version command: " + ioe + "\n" + outputLog)
          None
      }

      exitValue.flatMap { code =>
        if (!versionExitcode.contains(code)) {
          logger.warn("getVersion give exit code " + code + ", version not found \n" + outputLog)
          None
        } else {
          val lines = stdout.toString.split("\n") ++ stderr.toString.split("\n")
          val version = lines.collectFirst { case versionRegex(m) => m }
          if (version.isEmpty)
            logger.warn("getVersion give a exit code " + code + " but no version was found, executable correct? \n" + outputLog)
          version
        }
      }
    }
  }

  /** Get version from cache otherwise execute the version command; only found versions are cached */
  def getVersion: Option[String] = {
    if (!BiopetCommandLineFunctionTrait.executableCache.contains(executable))
      preProcessExecutable()
    if (!BiopetCommandLineFunctionTrait.versionCache.contains(versionCommand))
      getVersionInternal match {
        case Some(version) => BiopetCommandLineFunctionTrait.versionCache += versionCommand -> version
        case _             =>
      }
    BiopetCommandLineFunctionTrait.versionCache.get(versionCommand)
  }

  /** Get threads from config, falling back to defaultThreads */
  def getThreads: Int = getThreads(defaultThreads)

  /**
   * Get threads from config, capped by config key `maxthreads` (default 24)
   * @param default default when not found in config
   * @return number of threads
   */
  def getThreads(default: Int): Int = {
    val maxThreads: Int = config("maxthreads", default = 24)
    val requested: Int = config("threads", default = default)
    math.min(requested, maxThreads)
  }

  /**
   * Get threads from config of a submodule, capped by its `maxthreads` (default 24)
   * @param default default when not found in config
   * @param module Module when this is different from default
   * @return number of threads
   */
  def getThreads(default: Int, module: String): Int = {
    val maxThreads: Int = config("maxthreads", default = 24, submodule = module)
    val requested: Int = config("threads", default = default, submodule = module)
    math.min(requested, maxThreads)
  }
}
Peter van 't Hof's avatar
Peter van 't Hof committed
229

Peter van 't Hof's avatar
Peter van 't Hof committed
230
/** stores global caches */
Peter van 't Hof's avatar
Peter van 't Hof committed
231
object BiopetCommandLineFunctionTrait {
  /** Version command -> version string extracted from its output. */
  private[core] val versionCache: mutable.Map[String, String] = mutable.Map.empty[String, String]

  /** Resolved executable path -> md5sum of that file. */
  private[core] val executableMd5Cache: mutable.Map[String, String] = mutable.Map.empty[String, String]

  /** Executable name or path -> resolved executable path. */
  private[core] val executableCache: mutable.Map[String, String] = mutable.Map.empty[String, String]
}