BiopetCommandLineFunctionTrait.scala 7.89 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
 * A dual licensing mode is applied. The source code within this project that are
 * not part of GATK Queue is freely available for non-commercial use under an AGPL
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
16
17
18
package nl.lumc.sasc.biopet.core

import java.io.File
Peter van 't Hof's avatar
Peter van 't Hof committed
19
import nl.lumc.sasc.biopet.core.config.Configurable
Peter van 't Hof's avatar
Peter van 't Hof committed
20
21
import org.broadinstitute.gatk.queue.QException
import org.broadinstitute.gatk.queue.function.CommandLineFunction
22
23
import org.broadinstitute.gatk.utils.commandline.{ Input, Argument }
import scala.sys.process.{ Process, ProcessLogger }
24
import scala.util.matching.Regex
25
26
import java.io.FileInputStream
import java.security.MessageDigest
27

Peter van 't Hof's avatar
Peter van 't Hof committed
28
/** Biopet command line trait to auto check executable and cluster values */
29
trait BiopetCommandLineFunctionTrait extends CommandLineFunction with Configurable {
Peter van 't Hof's avatar
Peter van 't Hof committed
30
  // Report this job under the name that is also used for its config lookups
  analysisName = configName

  /** Optional extra input files of this job, declared as a Queue `@Input`; empty by default */
  @Input(doc = "deps", required = false)
  var deps: List[File] = Nil

  /** Number of threads for this job; 0 means "not set", in which case freezeFieldValues asks the config */
  var threads: Int = 0

  /** Default that is handed to getThreads when `threads` is still 0 */
  val defaultThreads: Int = 1

  /** Value for the `h_vmem` resource request; when empty it is taken from the config key "vmem" */
  var vmem: Option[String] = None

  /** Fallback for `vmem` when the config has no value; an empty string means no fallback */
  val defaultVmem: String = ""

  /** Executable of this job; starts out as null and is resolved to a full path by preProcesExecutable */
  var executable: String = _
bow's avatar
bow committed
41

Peter van 't Hof's avatar
Peter van 't Hof committed
42
43
44
45
  /**
   * Hook for implementations, does nothing by default.
   * It is called just before the job is ready to run, after the executable has been checked,
   * so files that only exist at run time of the pipeline can be checked here.
   */
  protected[core] def beforeCmd: Unit = {}
Peter van 't Hof's avatar
Peter van 't Hof committed
47

Peter van 't Hof's avatar
Peter van 't Hof committed
48
  /**
   * Hook for implementations, does nothing by default.
   * It is called once the script is done and Queue starts to generate the graph.
   */
  protected[core] def beforeGraph: Unit = {}
Peter van 't Hof's avatar
Peter van 't Hof committed
50

Peter van 't Hof's avatar
Peter van 't Hof committed
51
  /**
   * Sets the default output file, threads, vmem and job name for the current job.
   *
   * In order: checks the executable, runs the `beforeGraph` hook, derives a hidden `.out` file
   * next to the first output, fills in the thread and `h_vmem` requests, builds the job name and
   * finally delegates to the parent implementation.
   */
  override def freezeFieldValues(): Unit = {
    preProcesExecutable
    beforeGraph

    // Only derive the log file when there is a first output. The job name below already allows
    // for a missing firstOutput, and dereferencing it here would throw a NullPointerException.
    // NOTE(review): jobOutputFile then stays null; presumably Queue picks its own default - confirm.
    if (jobOutputFile == null && firstOutput != null)
      jobOutputFile = new File(firstOutput.getAbsoluteFile.getParent, "." + firstOutput.getName + "." + configName + ".out")

    // 0 means the implementation did not set a thread count itself
    if (threads == 0) threads = getThreads(defaultThreads)
    // A core request is only made for multi threaded jobs
    if (threads > 1) nCoresRequest = Option(threads)

    // Precedence for vmem: value set on the job, then config, then defaultVmem (when not empty)
    if (vmem.isEmpty) {
      vmem = config("vmem")
      if (vmem.isEmpty && defaultVmem.nonEmpty) vmem = Some(defaultVmem)
    }
    vmem.foreach(memory => jobResourceRequests :+= "h_vmem=" + memory)

    jobName = configName + ":" + (if (firstOutput != null) firstOutput.getName else jobOutputFile)

    super.freezeFieldValues()
  }
bow's avatar
bow committed
69

70
71
72
  /**
   * Override this value with false if the executable may not be converted to its canonical path;
   * the absolute path is used instead in that case.
   */
  val executableToCanonicalPath: Boolean = true

Peter van 't Hof's avatar
Peter van 't Hof committed
73
74
75
  /**
   * Checks the executable and stores its md5 checksum in the job report as "md5sum_exe".
   *
   * The executable is looked up with `which` and replaced by its canonical path (or its absolute
   * path when `executableToCanonicalPath` is false). When `which` fails an error is added to
   * BiopetQScript. Lookups and checksums are cached in the companion object, so a successfully
   * processed executable is not processed again. When no checksum is available "None" is reported.
   */
  protected[core] def preProcesExecutable: Unit = {
    /** Md5 checksum of the given file as lower-case hex string */
    def md5sum(path: String): String = {
      val digest = MessageDigest.getInstance("MD5")
      val stream = new FileInputStream(path)
      try {
        // Read until end of stream: available() is only an estimate and a single read() may
        // return fewer bytes than requested, which would give a checksum of a partial file
        val chunk = new Array[Byte](8192)
        var count = stream.read(chunk)
        while (count != -1) {
          digest.update(chunk, 0, count)
          count = stream.read(chunk)
        }
      } finally stream.close() // also release the file handle when reading fails
      digest.digest().map("%02x".format(_)).mkString
    }

    if (!BiopetCommandLineFunctionTrait.executableMd5Cache.contains(executable)) {
      try if (executable != null) {
        if (!BiopetCommandLineFunctionTrait.executableCache.contains(executable)) {
          val oldExecutable = executable
          val buffer = new StringBuffer()
          val cmd = Seq("which", executable)
          val process = Process(cmd).run(ProcessLogger(buffer.append(_)))
          if (process.exitValue == 0) {
            val file = new File(buffer.toString)
            executable = if (executableToCanonicalPath) file.getCanonicalPath else file.getAbsolutePath
          } else {
            BiopetQScript.addError("executable: '" + executable + "' not found, please check config")
          }
          // Cache under the configured name and under the resolved path, so both are recognised
          BiopetCommandLineFunctionTrait.executableCache += oldExecutable -> executable
          BiopetCommandLineFunctionTrait.executableCache += executable -> executable
        } else {
          executable = BiopetCommandLineFunctionTrait.executableCache(executable)
        }

        // The executable may have been replaced above, so the md5 cache is checked again
        if (!BiopetCommandLineFunctionTrait.executableMd5Cache.contains(executable))
          BiopetCommandLineFunctionTrait.executableMd5Cache += executable -> md5sum(executable)
      } catch {
        case ioe: java.io.IOException => logger.warn("Could not use 'which', check on executable skipped: " + ioe)
      }
    }

    val md5 = BiopetCommandLineFunctionTrait.executableMd5Cache.get(executable)
    addJobReportBinding("md5sum_exe", md5.getOrElse("None"))
  }
bow's avatar
bow committed
114

Peter van 't Hof's avatar
Peter van 't Hof committed
115
  /** Runs the executable check and the `beforeCmd` hook, then adds cores and version to the job report */
  final protected def preCmdInternal: Unit = {
    preProcesExecutable
    beforeCmd

    // A missing or non-positive core request is reported as a single core
    val cores = nCoresRequest.filter(_ > 0).getOrElse(1)
    addJobReportBinding("cores", cores)
    addJobReportBinding("version", getVersion)
  }
bow's avatar
bow committed
127

Peter van 't Hof's avatar
Peter van 't Hof committed
128
  /** Command that prints the version of the executable; null (the default) disables the version lookup */
  protected def versionCommand: String = null

  /** Regex with one group that captures the version from a line of the version command output; null disables the lookup */
  protected val versionRegex: Regex = null

  /** Exit codes of the version command that are accepted as success */
  protected val versionExitcode: List[Int] = List(0)

  /**
   * Runs the version command and extracts the version from its output.
   *
   * @return the first match of `versionRegex` on a line of stdout or stderr (stdout is searched
   *         first), or "N/A" when no version command or regex is set, the first word of the
   *         command is not an existing file, the exit code is not allowed or no line matches
   */
  private def getVersionInternal: String = {
    val notAvailable = "N/A"

    if (versionCommand == null || versionRegex == null) notAvailable
    else if (!new File(versionCommand.trim.split(" ")(0)).exists()) notAvailable
    else {
      val out = new StringBuffer()
      val err = new StringBuffer()
      // Evaluated lazily, so it contains everything that was captured at the time of logging
      def outputLog = "Version command: \n" + versionCommand +
        "\n output log: \n stdout: \n" + out.toString +
        "\n stderr: \n" + err.toString

      val process = Process(versionCommand).run(ProcessLogger(out append _ + "\n", err append _ + "\n"))
      val exitCode = process.exitValue

      if (!versionExitcode.contains(exitCode)) {
        logger.warn("getVersion give exit code " + exitCode + ", version not found \n" + outputLog)
        notAvailable
      } else {
        val lines = out.toString.split("\n") ++ err.toString.split("\n")
        lines.collectFirst { case versionRegex(found) => found }.getOrElse {
          logger.warn("getVersion give a exit code " + exitCode + " but no version was found, executable correct? \n" + outputLog)
          notAvailable
        }
      }
    }
  }
bow's avatar
bow committed
161

Peter van 't Hof's avatar
Peter van 't Hof committed
162
  /** Returns the version from the cache; the version command is only executed when it is not cached yet */
  def getVersion: String = {
    // Check the executable first, this may replace `executable` by its resolved path
    if (!BiopetCommandLineFunctionTrait.executableCache.contains(executable)) preProcesExecutable

    BiopetCommandLineFunctionTrait.versionCache.getOrElseUpdate(executable, getVersionInternal)
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
171
172
173
174
175
  /**
   * Gets the number of threads from the config, capped by the config value "maxthreads"
   * (8 when not set).
   * @param default value to use when "threads" is not found in the config
   * @return number of threads
   */
  def getThreads(default: Int): Int = {
    val limit: Int = config("maxthreads", default = 8)
    val requested: Int = config("threads", default = default)
    math.min(requested, limit)
  }
bow's avatar
bow committed
182

Peter van 't Hof's avatar
Peter van 't Hof committed
183
184
185
186
187
188
  /**
   * Gets the number of threads from the config of a given module, capped by the config value
   * "maxthreads" (8 when not set).
   * @param default value to use when "threads" is not found in the config
   * @param module submodule to look in, when this is different from the default
   * @return number of threads
   */
  def getThreads(default: Int, module: String): Int = {
    val limit: Int = config("maxthreads", default = 8, submodule = module)
    val requested: Int = config("threads", default = default, submodule = module)
    math.min(requested, limit)
  }
}
Peter van 't Hof's avatar
Peter van 't Hof committed
196

Peter van 't Hof's avatar
Peter van 't Hof committed
197
/** Stores the global caches that are shared by all functions using the trait */
object BiopetCommandLineFunctionTrait {
  import scala.collection.mutable

  /** Version per executable, filled by getVersion */
  private val versionCache: mutable.Map[String, String] = mutable.Map.empty[String, String]

  /** Md5 checksum (lower-case hex) per executable path */
  private[core] val executableMd5Cache: mutable.Map[String, String] = mutable.Map.empty[String, String]

  /** Maps an executable as configured, and its resolved path, to the resolved path */
  private val executableCache: mutable.Map[String, String] = mutable.Map.empty[String, String]
}