BiopetCommandLineFunctionTrait.scala 7.74 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
 * A dual licensing mode is applied. The source code within this project that are
 * not part of GATK Queue is freely available for non-commercial use under an AGPL
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
16
17
18
package nl.lumc.sasc.biopet.core

import java.io.File
Peter van 't Hof's avatar
Peter van 't Hof committed
19
import nl.lumc.sasc.biopet.core.config.Configurable
Peter van 't Hof's avatar
Peter van 't Hof committed
20
21
import org.broadinstitute.gatk.queue.QException
import org.broadinstitute.gatk.queue.function.CommandLineFunction
22
23
import org.broadinstitute.gatk.utils.commandline.{ Input, Argument }
import scala.sys.process.{ Process, ProcessLogger }
24
import scala.util.matching.Regex
25
26
import java.io.FileInputStream
import java.security.MessageDigest
27

Peter van 't Hof's avatar
Peter van 't Hof committed
28
29
30
/**
 * Biopet command line trait that resolves and checks the executable, fills in cluster
 * resource values (threads, vmem) from the config and records executable metadata
 * (md5sum, version, cores) in the job report.
 */
trait BiopetCommandLineFunctionTrait extends CommandLineFunction with Configurable {
  analysisName = configName

  @Input(doc = "deps", required = false)
  var deps: List[File] = Nil

  /** Number of threads for this job; 0 means "not set yet", in which case it is read from the config. */
  var threads = 0

  /** Thread count used when the config has no "threads" value. */
  val defaultThreads = 1

  /** Memory request, passed to the scheduler as "h_vmem=<value>"; read from the "vmem" config key when unset. */
  var vmem: Option[String] = None

  /** Fallback vmem used when neither the field nor the config set one; empty string means no fallback. */
  val defaultVmem: String = ""

  /** Executable to run; replaced by its canonical path by preProcesExecutable when `which` finds it. */
  var executable: String = _

  /**
   * Hook executed just before the job is ready to run.
   * Override it to check files that only exist at run time.
   */
  protected[core] def beforeCmd: Unit = {}

  /**
   * Hook executed after the script is done and Queue starts to generate the graph.
   */
  protected[core] def beforeGraph: Unit = {}

  /**
   * Sets the default job output file, threads and vmem for the current job, then
   * delegates to Queue's own freezeFieldValues.
   */
  override def freezeFieldValues(): Unit = {
    preProcesExecutable
    beforeGraph
    if (jobOutputFile == null)
      jobOutputFile = new File(firstOutput.getAbsoluteFile.getParent, "." + firstOutput.getName + "." + configName + ".out")

    if (threads == 0) threads = getThreads(defaultThreads)
    if (threads > 1) nCoresRequest = Option(threads)

    if (vmem.isEmpty) {
      vmem = config("vmem")
      if (vmem.isEmpty && defaultVmem.nonEmpty) vmem = Some(defaultVmem)
    }
    vmem.foreach(v => jobResourceRequests :+= "h_vmem=" + v)

    jobName = configName + ":" + (if (firstOutput != null) firstOutput.getName else jobOutputFile)

    super.freezeFieldValues()
  }

  /**
   * Checks the executable.
   *
   * Resolves `executable` to its canonical path with `which` (results are cached globally)
   * and reports an error when it cannot be found. Stores the md5sum of the executable in the
   * job report as "md5sum_exe" ("None" when it could not be computed).
   */
  protected[core] def preProcesExecutable: Unit = {
    if (executable != null && !BiopetCommandLineFunctionTrait.executableMd5Cache.contains(executable)) {
      val resolved = try {
        if (BiopetCommandLineFunctionTrait.executableCache.contains(executable))
          executable = BiopetCommandLineFunctionTrait.executableCache(executable)
        else
          resolveExecutablePath()
        true
      } catch {
        case ioe: java.io.IOException =>
          logger.warn("Could not use 'which', check on executable skipped: " + ioe)
          false
      }

      // Only checksum a file that actually exists; a missing executable was already reported as an error.
      val exeFile = new File(executable)
      if (resolved && exeFile.isFile && !BiopetCommandLineFunctionTrait.executableMd5Cache.contains(executable)) {
        try BiopetCommandLineFunctionTrait.executableMd5Cache += executable -> md5sumOf(exeFile)
        catch {
          case ioe: java.io.IOException =>
            logger.warn("Could not compute md5sum of executable '" + executable + "': " + ioe)
        }
      }
    }

    val md5 = BiopetCommandLineFunctionTrait.executableMd5Cache.get(executable)
    addJobReportBinding("md5sum_exe", md5.getOrElse("None"))
  }

  /**
   * Resolves `executable` with `which`.
   *
   * Replaces it with the canonical path on success, or reports it via BiopetQScript.addError
   * when `which` exits non-zero. Records both the requested and resulting name in the global cache.
   */
  private def resolveExecutablePath(): Unit = {
    val requested = executable
    val whichOut = new StringBuffer()
    // stderr is discarded so diagnostics printed by `which` can never end up in the path
    val process = Process(Seq("which", requested)).run(ProcessLogger(whichOut.append(_), _ => ()))
    if (process.exitValue == 0) {
      executable = new File(whichOut.toString.trim).getCanonicalPath
    } else {
      BiopetQScript.addError("executable: '" + executable + "' not found, please check config")
    }
    BiopetCommandLineFunctionTrait.executableCache += requested -> executable
    BiopetCommandLineFunctionTrait.executableCache += executable -> executable
  }

  /**
   * Returns the lowercase hex MD5 digest of `file`.
   *
   * The file is read in chunks until EOF, because InputStream.available/read do not guarantee
   * the whole file is returned at once. The stream is always closed.
   */
  private def md5sumOf(file: File): String = {
    val digest = MessageDigest.getInstance("MD5")
    val in = new FileInputStream(file)
    try {
      val chunk = new Array[Byte](8192)
      var n = in.read(chunk)
      while (n != -1) {
        digest.update(chunk, 0, n)
        n = in.read(chunk)
      }
    } finally {
      in.close()
    }
    digest.digest.map("%02x".format(_)).mkString
  }

  /**
   * Runs preProcesExecutable and beforeCmd, then fills the job report with the number
   * of cores (at least 1) and the executable version.
   */
  final protected def preCmdInternal: Unit = {
    preProcesExecutable

    beforeCmd

    addJobReportBinding("cores", nCoresRequest match {
      case Some(n) if n > 0 => n
      case _                => 1
    })
    addJobReportBinding("version", getVersion)
  }

  /**
   * Command to get the version of the executable.
   * @return command line, or null when no version command is available
   */
  protected def versionCommand: String = null

  /** Regex with exactly one capture group that extracts the version from the version command output. */
  protected val versionRegex: Regex = null

  /** Allowed exit codes for the version command. */
  protected val versionExitcode = List(0)

  /**
   * Executes the version command and extracts the version.
   * @return the first regex match found in stdout followed by stderr, or "N/A"
   */
  private def getVersionInternal: String = {
    if (versionCommand == null || versionRegex == null) "N/A"
    else if (!new File(versionCommand.trim.split(" ")(0)).exists()) "N/A"
    else runVersionCommand()
  }

  /** Runs versionCommand and matches its output against versionRegex; "N/A" on any failure. */
  private def runVersionCommand(): String = {
    val stdout = new StringBuffer()
    val stderr = new StringBuffer()
    def outputLog = "Version command: \n" + versionCommand +
      "\n output log: \n stdout: \n" + stdout.toString +
      "\n stderr: \n" + stderr.toString

    val exitCode: Option[Int] = try {
      val logger_ = ProcessLogger(line => stdout.append(line + "\n"), line => stderr.append(line + "\n"))
      Some(Process(versionCommand).run(logger_).exitValue)
    } catch {
      // e.g. the file exists but is not executable: report "N/A" instead of failing the job
      case ioe: java.io.IOException =>
        logger.warn("getVersion could not execute version command: " + ioe + "\n" + outputLog)
        None
    }

    exitCode match {
      case None => "N/A"
      case Some(code) if !versionExitcode.contains(code) =>
        logger.warn("getVersion give exit code " + code + ", version not found \n" + outputLog)
        "N/A"
      case Some(code) =>
        val lines = stdout.toString.split("\n") ++ stderr.toString.split("\n")
        lines.collectFirst { case versionRegex(m) => m }.getOrElse {
          logger.warn("getVersion give a exit code " + code + " but no version was found, executable correct? \n" + outputLog)
          "N/A"
        }
    }
  }

  /** Gets the version from the global cache, otherwise executes the version command and caches the result. */
  def getVersion: String = {
    if (!BiopetCommandLineFunctionTrait.executableCache.contains(executable))
      preProcesExecutable
    BiopetCommandLineFunctionTrait.versionCache.getOrElseUpdate(executable, getVersionInternal)
  }

  /**
   * Gets the number of threads from the config, capped by "maxthreads" (default 8).
   * @param default default when "threads" is not found in the config
   * @return number of threads
   */
  def getThreads(default: Int): Int = {
    val maxThreads: Int = config("maxthreads", default = 8)
    val configured: Int = config("threads", default = default)
    math.min(configured, maxThreads)
  }

  /**
   * Gets the number of threads from the config of a submodule, capped by its "maxthreads" (default 8).
   * @param default default when "threads" is not found in the config
   * @param module submodule to read the config from, when it differs from the default
   * @return number of threads
   */
  def getThreads(default: Int, module: String): Int = {
    val maxThreads: Int = config("maxthreads", default = 8, submodule = module)
    val configured: Int = config("threads", default = default, submodule = module)
    math.min(configured, maxThreads)
  }
}
Peter van 't Hof's avatar
Peter van 't Hof committed
203

Peter van 't Hof's avatar
Peter van 't Hof committed
204
205
206
/**
 * Process-wide caches shared by every BiopetCommandLineFunctionTrait instance.
 * Values are filled lazily by the trait and keyed by the value of `executable`.
 */
object BiopetCommandLineFunctionTrait {
  import scala.collection.mutable

  /** Version string per executable, as returned by the version command (or "N/A"). */
  private val versionCache: mutable.Map[String, String] = mutable.HashMap.empty[String, String]

  /** MD5 digest (lowercase hex) per executable path. */
  private[core] val executableMd5Cache: mutable.Map[String, String] = mutable.HashMap.empty[String, String]

  /** Maps an executable name as requested (and its resolved form) to the resolved path. */
  private val executableCache: mutable.Map[String, String] = mutable.HashMap.empty[String, String]
}