BiopetCommandLineFunctionTrait.scala 9.13 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
 * A dual licensing mode is applied. The source code within this project that are
 * not part of GATK Queue is freely available for non-commercial use under an AGPL
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
16
17
18
package nl.lumc.sasc.biopet.core

import java.io.File
Peter van 't Hof's avatar
Peter van 't Hof committed
19
import nl.lumc.sasc.biopet.core.config.Configurable
Peter van 't Hof's avatar
Peter van 't Hof committed
20
21
import org.broadinstitute.gatk.queue.QException
import org.broadinstitute.gatk.queue.function.CommandLineFunction
22
23
import org.broadinstitute.gatk.utils.commandline.{ Input, Argument }
import scala.sys.process.{ Process, ProcessLogger }
24
import scala.util.matching.Regex
25
26
import java.io.FileInputStream
import java.security.MessageDigest
27

Peter van 't Hof's avatar
Peter van 't Hof committed
28
/** Biopet command line trait to auto check executable and cluster values */
29
trait BiopetCommandLineFunctionTrait extends CommandLineFunction with Configurable {
Peter van 't Hof's avatar
Peter van 't Hof committed
30
  // Use the config name of this function as its analysis name.
  analysisName = configName

  /** Extra files this job depends on; exposed as an optional input through the @Input annotation. */
  @Input(doc = "deps", required = false)
  var deps: List[File] = Nil

  /** Thread count for this job; 0 means "not set" and is replaced with getThreads(defaultThreads) in freezeFieldValues. */
  var threads: Int = 0

  /** Fallback thread count handed to getThreads when `threads` is still 0. */
  def defaultThreads: Int = 1

  /** Value of the "vmem" config key; when that key is absent freezeFieldValues computes a value instead. */
  var vmem: Option[String] = config("vmem")

  /** Default for the "core_memory" config key (used in freezeFieldValues). */
  protected def defaultCoreMemory: Double = 1.0

  /** Default for the "vmem_factor" config key. */
  protected def defaultVmemFactor: Double = 1.4

  /** Multiplier applied to the core memory when freezeFieldValues computes `vmem`. */
  var vmemFactor: Double = config("vmem_factor", default = defaultVmemFactor)

  /** Multiplier applied to the core memory when freezeFieldValues computes the resident limit. */
  var residentFactor: Double = config("resident_factor", default = 1.2)

  /** Memory per core; assigned in freezeFieldValues from the "core_memory" config key plus a retry bonus. */
  private var coreMemory: Double = _

  /** Executable to run; preProcesExecutable replaces it with the resolved path found through `which`. */
  var executable: String = _
bow's avatar
bow committed
48

Peter van 't Hof's avatar
Peter van 't Hof committed
49
50
51
52
  /**
   * Hook that may be overridden; does nothing by default.
   * It is invoked from preCmdInternal right after the executable has been checked,
   * so files that only exist at run time can be inspected here.
   */
  protected[core] def beforeCmd: Unit = {}
Peter van 't Hof's avatar
Peter van 't Hof committed
54

Peter van 't Hof's avatar
Peter van 't Hof committed
55
  /**
   * Hook that may be overridden; does nothing by default.
   * It is invoked from freezeFieldValues, i.e. after the script is done and Queue starts to generate the graph.
   */
  protected[core] def beforeGraph: Unit = {}
Peter van 't Hof's avatar
Peter van 't Hof committed
57

Peter van 't Hof's avatar
Peter van 't Hof committed
58
  /**
   * Fills in the default output file, threads and memory settings for this job and then
   * delegates to the parent implementation.
   *
   * Steps, in order: resolve the executable, run the beforeGraph hook, derive a hidden
   * ".<first output>.<configName>.out" job output file next to the first output (only when
   * none was set), set the thread count and core request, compute the memory, resident and
   * vmem limits (values present in the config win over the computed ones) and name the job.
   */
  override def freezeFieldValues() {
    preProcesExecutable
    beforeGraph

    // The job name below already allows for a missing first output, so guard here as well
    // instead of dereferencing null. Without a first output jobOutputFile is left untouched.
    if (jobOutputFile == null && firstOutput != null) {
      val logName = "." + firstOutput.getName + "." + configName + ".out"
      jobOutputFile = new File(firstOutput.getAbsoluteFile.getParent, logName)
    }

    if (threads == 0) threads = getThreads(defaultThreads)
    if (threads > 1) nCoresRequest = Option(threads)

    // Every retry adds 0.5 to the per-core memory.
    coreMemory = config("core_memory", default = defaultCoreMemory).asDouble + (0.5 * retry)

    if (config.contains("memory_limit")) memoryLimit = config("memory_limit")
    else memoryLimit = Some(coreMemory * threads)

    // NOTE(review): coreMemory already contains the retry bonus and it is added once more
    // for the resident limit and the vmem factor below - confirm this is intended.
    if (config.contains("resident_limit")) residentLimit = config("resident_limit")
    else residentLimit = Some((coreMemory + (0.5 * retry)) * residentFactor)

    if (!config.contains("vmem")) vmem = Some((coreMemory * (vmemFactor + (0.5 * retry))) + "G")
    vmem.foreach(value => jobResourceRequests :+= "h_vmem=" + value)

    jobName = configName + ":" + (if (firstOutput != null) firstOutput.getName else jobOutputFile)

    super.freezeFieldValues()
  }
bow's avatar
bow committed
81

82
83
  /** Number of retries so far; incremented by setupRetry and used by freezeFieldValues to raise the memory settings. */
  var retry = 0

Peter van 't Hof's avatar
Peter van 't Hof committed
84
85
  /**
   * Prepares this job for another attempt with more memory.
   * Any existing "h_vmem=" resource request is dropped, the retry counter is increased and
   * the function is frozen again so that freezeFieldValues recomputes the memory settings.
   */
  override def setupRetry(): Unit = {
    super.setupRetry()
    if (vmem.nonEmpty) {
      // Remove the old request; freezing again appends a fresh one.
      jobResourceRequests = jobResourceRequests.filter(request => !request.contains("h_vmem="))
    }
    logger.info("Auto raise memory on retry")
    retry = retry + 1
    freeze()
  }

92
93
94
  /**
   * Override with false when the executable must not be converted to its canonical path;
   * preProcesExecutable then uses the absolute path instead.
   */
  val executableToCanonicalPath = true

Peter van 't Hof's avatar
Peter van 't Hof committed
95
96
97
  /**
   * Checks the executable and stores its MD5 checksum in the job report.
   *
   * The executable is looked up with `which`; on success it is replaced by its canonical
   * path (or absolute path when executableToCanonicalPath is false), otherwise an error is
   * registered with BiopetQScript. Resolved paths and checksums are kept in the global caches
   * so each executable is only processed once. The report binding "md5sum_exe" is set to the
   * checksum, or to "None" when no checksum is available.
   */
  protected[core] def preProcesExecutable: Unit = {
    /** Streams the whole file through an MD5 digest and returns it as lower-case hex. */
    def md5Hex(path: String): String = {
      val digest = MessageDigest.getInstance("MD5")
      val stream = new FileInputStream(path)
      try {
        // A single read() is not guaranteed to return the complete file, so read until EOF.
        val chunk = new Array[Byte](8192)
        var count = stream.read(chunk)
        while (count != -1) {
          digest.update(chunk, 0, count)
          count = stream.read(chunk)
        }
      } finally stream.close() // also release the handle when reading fails
      digest.digest().map("%02x".format(_)).mkString
    }

    if (!BiopetCommandLineFunctionTrait.executableMd5Cache.contains(executable)) {
      try {
        if (executable != null) {
          if (!BiopetCommandLineFunctionTrait.executableCache.contains(executable)) {
            val oldExecutable = executable
            val buffer = new StringBuffer()
            val cmd = Seq("which", executable)
            val process = Process(cmd).run(ProcessLogger(buffer.append(_)))
            if (process.exitValue == 0) {
              executable = buffer.toString
              val file = new File(executable)
              if (executableToCanonicalPath) executable = file.getCanonicalPath
              else executable = file.getAbsolutePath
            } else {
              BiopetQScript.addError("executable: '" + executable + "' not found, please check config")
            }
            // Cache under both the configured and the resolved name.
            BiopetCommandLineFunctionTrait.executableCache += oldExecutable -> executable
            BiopetCommandLineFunctionTrait.executableCache += executable -> executable
          } else {
            executable = BiopetCommandLineFunctionTrait.executableCache(executable)
          }

          if (!BiopetCommandLineFunctionTrait.executableMd5Cache.contains(executable)) {
            BiopetCommandLineFunctionTrait.executableMd5Cache += executable -> md5Hex(executable)
          }
        }
      } catch {
        case ioe: java.io.IOException => logger.warn("Could not use 'which', check on executable skipped: " + ioe)
      }
    }

    val md5 = BiopetCommandLineFunctionTrait.executableMd5Cache.get(executable)
    addJobReportBinding("md5sum_exe", md5.getOrElse("None"))
  }
bow's avatar
bow committed
136

Peter van 't Hof's avatar
Peter van 't Hof committed
137
  /**
   * Runs preProcesExecutable and the beforeCmd hook, then adds the number of cores
   * (at least 1) and the executable version to the job report.
   */
  final protected def preCmdInternal: Unit = {
    preProcesExecutable
    beforeCmd

    // Report a single core when no positive core request is set.
    val cores = nCoresRequest.filter(_ > 0).getOrElse(1)
    addJobReportBinding("cores", cores)
    addJobReportBinding("version", getVersion)
  }
bow's avatar
bow committed
149

Peter van 't Hof's avatar
Peter van 't Hof committed
150
  /** Command that prints the version of the executable; null (the default) disables version detection. */
  protected def versionCommand: String = null

  /** Regex whose single capture group extracts the version from the command output; null disables version detection. */
  protected def versionRegex: Regex = null

  /** Exit codes of the version command that are accepted as success. */
  protected def versionExitcode: List[Int] = List(0)
Peter van 't Hof's avatar
Peter van 't Hof committed
158
159

  /**
   * Runs the version command of this function.
   * @return the detected version, or None when versionCommand or versionRegex is not defined
   */
  private[core] def getVersionInternal(): Option[String] =
    if (versionCommand != null && versionRegex != null) getVersionInternal(versionCommand, versionRegex)
    else None

  /**
   * Runs the given version command and extracts the version from its output.
   *
   * @param versionCommand command line to execute; its first word must be an existing file
   * @param versionRegex regex with one capture group, matched against each line of stdout and then stderr
   * @return the first captured version, or None when an argument is null, the executable does
   *         not exist, the exit code is not in versionExitcode or no line matches
   */
  private[core] def getVersionInternal(versionCommand: String, versionRegex: Regex): Option[String] = {
    if (versionCommand == null || versionRegex == null) None
    else if (!new File(versionCommand.trim.split(" ")(0)).exists()) None
    else {
      val stdout = new StringBuffer()
      val stderr = new StringBuffer()
      // Evaluated lazily so it reflects the output collected at logging time.
      def outputLog = "Version command: \n" + versionCommand +
        "\n output log: \n stdout: \n" + stdout.toString +
        "\n stderr: \n" + stderr.toString

      val process = Process(versionCommand).run(
        ProcessLogger(line => stdout.append(line + "\n"), line => stderr.append(line + "\n")))
      val exitCode = process.exitValue()

      if (!versionExitcode.contains(exitCode)) {
        logger.warn("getVersion give exit code " + exitCode + ", version not found \n" + outputLog)
        None
      } else {
        val lines = stdout.toString.split("\n") ++ stderr.toString.split("\n")
        val version = lines.collectFirst { case versionRegex(m) => m }
        if (version.isEmpty)
          logger.warn("getVersion give a exit code " + exitCode + " but no version was found, executable correct? \n" + outputLog)
        version
      }
    }
  }
bow's avatar
bow committed
189

Peter van 't Hof's avatar
Peter van 't Hof committed
190
  /**
   * Returns the version of the executable, running the version command only when the
   * version cache has no entry for it yet. The executable is resolved first if needed.
   */
  def getVersion: Option[String] = {
    val caches = BiopetCommandLineFunctionTrait
    if (!caches.executableCache.contains(executable)) preProcesExecutable

    if (!caches.versionCache.contains(versionCommand)) {
      // Only successful lookups are cached, so a failed lookup is tried again next time.
      getVersionInternal().foreach(version => caches.versionCache += (versionCommand -> version))
    }
    caches.versionCache.get(versionCommand)
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
202
203
204
205
206
  /**
   * Reads the thread count from the config, capped by the "maxthreads" config value (default 8).
   * @param default value used when "threads" is not found in the config
   * @return the smaller of the configured threads and the configured maximum
   */
  def getThreads(default: Int): Int = {
    val upperBound: Int = config("maxthreads", default = 8)
    val requested: Int = config("threads", default = default)
    math.min(requested, upperBound)
  }
bow's avatar
bow committed
213

Peter van 't Hof's avatar
Peter van 't Hof committed
214
215
216
217
218
219
  /**
   * Reads the thread count from the config for a specific submodule, capped by the
   * "maxthreads" config value (default 8).
   * @param default value used when "threads" is not found in the config
   * @param module submodule to look the values up for, when this is different from the default
   * @return the smaller of the configured threads and the configured maximum
   */
  def getThreads(default: Int, module: String): Int = {
    val upperBound: Int = config("maxthreads", default = 8, submodule = module)
    val requested: Int = config("threads", default = default, submodule = module)
    math.min(requested, upperBound)
  }
}
Peter van 't Hof's avatar
Peter van 't Hof committed
227

Peter van 't Hof's avatar
Peter van 't Hof committed
228
/** Global caches shared by all functions that mix in the trait of the same name. */
object BiopetCommandLineFunctionTrait {
  import scala.collection.mutable

  /** Detected version, keyed by the version command that produced it. */
  private[core] val versionCache: mutable.Map[String, String] = mutable.Map.empty

  /** MD5 checksum (hex), keyed by the resolved executable path. */
  private[core] val executableMd5Cache: mutable.Map[String, String] = mutable.Map.empty

  /** Resolved executable path, keyed by both the configured name and the resolved path. */
  private[core] val executableCache: mutable.Map[String, String] = mutable.Map.empty
}