BiopetCommandLineFunctionTrait.scala 8.65 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
 * A dual licensing mode is applied. The source code within this project that are
 * not part of GATK Queue is freely available for non-commercial use under an AGPL
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
16
17
18
package nl.lumc.sasc.biopet.core

import java.io.File
Peter van 't Hof's avatar
Peter van 't Hof committed
19
import nl.lumc.sasc.biopet.core.config.Configurable
Peter van 't Hof's avatar
Peter van 't Hof committed
20
21
import org.broadinstitute.gatk.queue.QException
import org.broadinstitute.gatk.queue.function.CommandLineFunction
22
23
import org.broadinstitute.gatk.utils.commandline.{ Input, Argument }
import scala.sys.process.{ Process, ProcessLogger }
24
import scala.util.matching.Regex
25
26
import java.io.FileInputStream
import java.security.MessageDigest
27

Peter van 't Hof's avatar
Peter van 't Hof committed
28
/** Biopet command line trait to auto check executable and cluster values */
29
trait BiopetCommandLineFunctionTrait extends CommandLineFunction with Configurable {
Peter van 't Hof's avatar
Peter van 't Hof committed
30
  // Use this function's config name as the (inherited) analysis name.
  analysisName = configName
bow's avatar
bow committed
31
32

  // Optional extra input files for this job (the @Input doc string calls them "deps").
  @Input(doc = "deps", required = false)
33
  var deps: List[File] = Nil
bow's avatar
bow committed
34

35
36
  // Thread count for this job; 0 means "not set yet" and is replaced in freezeFieldValues().
  var threads = 0
  // Fallback handed to getThreads() when `threads` is still 0.
  val defaultThreads = 1
bow's avatar
bow committed
37

38
  // Value of the "h_vmem" resource request, read from config key "vmem"; derived in freezeFieldValues() when empty.
  var vmem: Option[String] = config("vmem")
Peter van 't Hof's avatar
Peter van 't Hof committed
39
  // Default for the "core_memory" config value.
  protected val defaultCoreMemory: Double = 1.0
40
41
42
43
44
  // Multiplier applied when deriving `vmem`. Read from config key "vmem_factor";
  // defaults to 2.5 for BiopetJavaCommandLineFunction instances and to 1.5 otherwise.
  // setupRetry() raises it by 0.5 on every retry.
  var vmemFactor: Double = config("vmem_factor", default =
    this match {
      case _: BiopetJavaCommandLineFunction => 2.5
      case _                                => 1.5
    })
Peter van 't Hof's avatar
Peter van 't Hof committed
45
46
47

  // Memory per thread, read from config key "core_memory": an unset memoryLimit becomes coreMemory * threads.
  // setupRetry() raises it by 1.0 on every retry.
  private var coreMemory: Double = config("core_memory", default = defaultCoreMemory)

Peter van 't Hof's avatar
Peter van 't Hof committed
48
  // Name or path of the executable; preProcesExecutable replaces it with the path resolved through `which`.
  var executable: String = _
bow's avatar
bow committed
49

Peter van 't Hof's avatar
Peter van 't Hof committed
50
51
52
53
  /**
   * Hook for subclasses, does nothing by default.
   * It is executed just before the job is ready to run, so files that only
   * exist at run time of the pipeline can be checked here.
   */
  protected[core] def beforeCmd: Unit = ()
Peter van 't Hof's avatar
Peter van 't Hof committed
55

Peter van 't Hof's avatar
Peter van 't Hof committed
56
  /**
   * Hook for subclasses, does nothing by default.
   * It is executed after the script is done and Queue starts to generate the graph.
   */
  protected[core] def beforeGraph: Unit = ()
Peter van 't Hof's avatar
Peter van 't Hof committed
58

Peter van 't Hof's avatar
Peter van 't Hof committed
59
  /**
   * Sets the default output file, threads, memory limit and vmem for the current job.
   *
   * Runs preProcesExecutable and the beforeGraph hook first, then fills in every value
   * that has not been set yet and finally delegates to the parent implementation.
   */
  override def freezeFieldValues(): Unit = {
    preProcesExecutable
    beforeGraph

    // firstOutput can be null (the jobName expression below already allows for that);
    // in that case jobOutputFile is left untouched instead of failing with a NullPointerException.
    if (jobOutputFile == null && firstOutput != null)
      jobOutputFile = new File(firstOutput.getAbsoluteFile.getParent, "." + firstOutput.getName + "." + configName + ".out")

    if (threads == 0) threads = getThreads(defaultThreads)
    if (threads > 1) nCoresRequest = Option(threads)

    if (memoryLimit.isEmpty) memoryLimit = Some(coreMemory * threads)

    // Derive vmem from the configured core memory (not from its default), so that the
    // "core_memory" config value raises h_vmem together with memoryLimit.
    if (vmem.isEmpty) vmem = Some((coreMemory * vmemFactor) + "G")
    vmem.foreach(request => jobResourceRequests :+= "h_vmem=" + request)
    jobName = configName + ":" + (if (firstOutput != null) firstOutput.getName else jobOutputFile)

    super.freezeFieldValues()
  }

  /**
   * Raises the memory requests before the job is retried: the core memory goes up by 1.0,
   * the vmem factor by 0.5, and memoryLimit and the h_vmem request are recomputed from them.
   */
  override def setupRetry(): Unit = {
    super.setupRetry()
    coreMemory += 1.0
    vmemFactor += 0.5
    memoryLimit = Some(coreMemory * threads)

    // Drop every earlier h_vmem request, also when vmem was changed after the request was added,
    // so the job never carries two conflicting h_vmem values.
    jobResourceRequests = jobResourceRequests.filterNot(_.startsWith("h_vmem="))
    val retryVmem = (coreMemory * vmemFactor) + "G"
    vmem = Some(retryVmem)
    jobResourceRequests :+= "h_vmem=" + retryVmem
  }

88
89
90
  /** Override this value if the executable may not be converted to its canonical path; the absolute path is used instead */
  val executableToCanonicalPath = true

Peter van 't Hof's avatar
Peter van 't Hof committed
91
92
93
  /**
   * Checks the executable: resolves it with `which`, follows it to its canonical path (or to its
   * absolute path when executableToCanonicalPath is false) and stores its md5sum in the job report.
   *
   * Resolved paths and checksums are kept in the caches of the companion object, so an executable
   * that is already cached is not looked up or hashed again. When `which` does not find the
   * executable an error is added to BiopetQScript. The job report binding "md5sum_exe" is set to
   * "None" when no checksum is available.
   */
  protected[core] def preProcesExecutable: Unit = {
    if (executable != null && !BiopetCommandLineFunctionTrait.executableMd5Cache.contains(executable)) {
      try {
        if (!BiopetCommandLineFunctionTrait.executableCache.contains(executable)) {
          val oldExecutable = executable
          val buffer = new StringBuffer()
          val cmd = Seq("which", executable)
          val process = Process(cmd).run(ProcessLogger(buffer.append(_)))
          if (process.exitValue == 0) {
            executable = buffer.toString
            val file = new File(executable)
            executable = if (executableToCanonicalPath) file.getCanonicalPath else file.getAbsolutePath
          } else {
            BiopetQScript.addError("executable: '" + executable + "' not found, please check config")
          }
          // Cache under both the configured and the resolved name, so either one is a hit next time.
          BiopetCommandLineFunctionTrait.executableCache += oldExecutable -> executable
          BiopetCommandLineFunctionTrait.executableCache += executable -> executable
        } else {
          executable = BiopetCommandLineFunctionTrait.executableCache(executable)
        }

        if (!BiopetCommandLineFunctionTrait.executableMd5Cache.contains(executable)) {
          val digest = MessageDigest.getInstance("MD5")
          val is = new FileInputStream(executable)
          // Stream the file in chunks: available() is only an estimate and a single read() may
          // return fewer bytes than requested. The stream is closed even when reading fails.
          try {
            val chunk = new Array[Byte](8192)
            var count = is.read(chunk)
            while (count != -1) {
              digest.update(chunk, 0, count)
              count = is.read(chunk)
            }
          } finally is.close()
          val checksum = digest.digest().map("%02x".format(_)).mkString
          BiopetCommandLineFunctionTrait.executableMd5Cache += executable -> checksum
        }
      } catch {
        case ioe: java.io.IOException => logger.warn("Could not use 'which', check on executable skipped: " + ioe)
      }
    }
    val md5 = BiopetCommandLineFunctionTrait.executableMd5Cache.get(executable)
    addJobReportBinding("md5sum_exe", md5.getOrElse("None"))
  }
bow's avatar
bow committed
132

Peter van 't Hof's avatar
Peter van 't Hof committed
133
  /**
   * Runs the executable check and the beforeCmd hook, then fills the job report
   * with the number of cores (at least 1) and the version of the executable.
   */
  final protected def preCmdInternal: Unit = {
    preProcesExecutable
    beforeCmd

    // Report 1 core when no positive core request is set.
    val cores = nCoresRequest.filter(_ > 0).getOrElse(1)
    addJobReportBinding("cores", cores)
    addJobReportBinding("version", getVersion)
  }
bow's avatar
bow committed
145

Peter van 't Hof's avatar
Peter van 't Hof committed
146
  /** Command to get version of executable */
147
  protected def versionCommand: String = null // null (the default) makes getVersionInternal return None
Peter van 't Hof's avatar
Peter van 't Hof committed
148
149

  /** Regex to get version from version command output */
150
  protected val versionRegex: Regex = null // null (the default) makes getVersionInternal return None
Peter van 't Hof's avatar
Peter van 't Hof committed
151
152
153
154
155

  /** Allowed exit codes for the version command; with any other exit code no version is reported */
  protected val versionExitcode = List(0)

  /**
   * Executes the version command and extracts the version from its output.
   *
   * @return the group captured by versionRegex on the first matching line (stdout lines are tried
   *         before stderr lines); None when the command or regex is not defined, the executable
   *         does not exist, the exit code is not in versionExitcode, or no line matches
   */
  private def getVersionInternal: Option[String] = {
    if (versionCommand == null || versionRegex == null) None
    else if (!new File(versionCommand.trim.split(" ")(0)).exists()) None
    else {
      val stdout = new StringBuffer()
      val stderr = new StringBuffer()
      // Evaluated lazily, so it reflects whatever has been captured when a warning is written.
      def outputLog = "Version command: \n" + versionCommand +
        "\n output log: \n stdout: \n" + stdout.toString +
        "\n stderr: \n" + stderr.toString
      val process = Process(versionCommand).run(ProcessLogger(stdout append _ + "\n", stderr append _ + "\n"))
      val exitCode = process.exitValue

      if (!versionExitcode.contains(exitCode)) {
        logger.warn("getVersion give exit code " + exitCode + ", version not found \n" + outputLog)
        None
      } else {
        val lines = stdout.toString.split("\n") ++ stderr.toString.split("\n")
        val version = lines.collectFirst { case versionRegex(m) => m }
        if (version.isEmpty)
          logger.warn("getVersion give a exit code " + exitCode + " but no version was found, executable correct? \n" + outputLog)
        version
      }
    }
  }
bow's avatar
bow committed
179

Peter van 't Hof's avatar
Peter van 't Hof committed
180
  /**
   * Returns the version of the executable: from the global cache when present, otherwise by
   * executing the version command. Only a version that was actually found is cached.
   */
  def getVersion: Option[String] = {
    // Make sure the executable has been resolved before the version command is looked up.
    if (!BiopetCommandLineFunctionTrait.executableCache.contains(executable)) preProcesExecutable

    val cache = BiopetCommandLineFunctionTrait.versionCache
    cache.get(versionCommand).orElse {
      val detected = getVersionInternal
      detected.foreach(version => cache += versionCommand -> version)
      detected
    }
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
192
193
194
195
196
  /**
   * Reads the number of threads from the config, capped at the configured maximum.
   * @param default value used when "threads" is not found in the config
   * @return the smaller of the config values "threads" and "maxthreads" (the latter defaults to 8)
   */
  def getThreads(default: Int): Int = {
    val limit: Int = config("maxthreads", default = 8)
    val requested: Int = config("threads", default = default)
    math.min(requested, limit)
  }
bow's avatar
bow committed
203

Peter van 't Hof's avatar
Peter van 't Hof committed
204
205
206
207
208
209
  /**
   * Reads the number of threads from the config of a given module, capped at the configured maximum.
   * @param default value used when "threads" is not found in the config
   * @param module submodule to read the config from, when this is different from the default
   * @return the smaller of the config values "threads" and "maxthreads" (the latter defaults to 8)
   */
  def getThreads(default: Int, module: String): Int = {
    val limit: Int = config("maxthreads", default = 8, submodule = module)
    val requested: Int = config("threads", default = default, submodule = module)
    math.min(requested, limit)
  }
}
Peter van 't Hof's avatar
Peter van 't Hof committed
217

Peter van 't Hof's avatar
Peter van 't Hof committed
218
/** Holds the caches that are shared by all command line functions */
object BiopetCommandLineFunctionTrait {
  import scala.collection.mutable

  /** Version command -> version found for it */
  private val versionCache: mutable.Map[String, String] = mutable.Map.empty[String, String]

  /** Executable path -> md5sum of that file */
  private[core] val executableMd5Cache: mutable.Map[String, String] = mutable.Map.empty[String, String]

  /** Executable as configured, and as resolved -> resolved path */
  private val executableCache: mutable.Map[String, String] = mutable.Map.empty[String, String]
}