BiopetCommandLineFunctionTrait.scala 7.64 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
 * A dual licensing mode is applied. The source code within this project that are
 * not part of GATK Queue is freely available for non-commercial use under an AGPL
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
16
17
package nl.lumc.sasc.biopet.core

18
//import java.io.BufferedInputStream
19
import java.io.File
Peter van 't Hof's avatar
Peter van 't Hof committed
20
import nl.lumc.sasc.biopet.core.config.Configurable
Peter van 't Hof's avatar
Peter van 't Hof committed
21
22
import org.broadinstitute.gatk.queue.QException
import org.broadinstitute.gatk.queue.function.CommandLineFunction
23
import org.broadinstitute.gatk.utils.commandline.{ Input, Argument }
24
//import scala.io.Source
25
import scala.sys.process.{ Process, ProcessLogger }
26
import scala.util.matching.Regex
27
28
import java.io.FileInputStream
import java.security.MessageDigest
29

Peter van 't Hof's avatar
Peter van 't Hof committed
30
31
32
/**
 * Biopet command line trait that checks the executable and fills in cluster
 * related values (threads, vmem, job name, job output file) before a job is run.
 */
trait BiopetCommandLineFunctionTrait extends CommandLineFunction with Configurable {
  analysisName = configName

  /** Extra files registered as inputs of this function (presumably to force job ordering; verify against callers). */
  @Input(doc = "deps", required = false)
  var deps: List[File] = Nil

  /** Number of threads for this job; 0 means "not set" and is resolved in [[freezeFieldValues]]. */
  var threads: Int = 0

  /** Thread count used when the config does not provide one. */
  val defaultThreads: Int = 1

  /** Requested h_vmem value; when empty it is looked up in the config in [[freezeFieldValues]]. */
  var vmem: Option[String] = None

  /** Fallback vmem value; an empty string means no fallback. */
  val defaultVmem: String = ""

  /** Executable name or path; replaced by its canonical path in [[checkExecutable]]. */
  var executable: String = _

  /**
   * Can override this method. This is executed just before the job is ready to run.
   * Can check on run time files from pipeline here.
   */
  protected[core] def beforeCmd: Unit = {}

  /**
   * Can override this method. This is called from [[freezeFieldValues]], i.e. after the
   * script is done and Queue starts to generate the graph.
   */
  //TODO: rename to beforeGraph
  protected[core] def afterGraph: Unit = {}

  /**
   * Set default output file, threads, vmem and job name for the current job.
   */
  override def freezeFieldValues(): Unit = {
    checkExecutable
    afterGraph
    jobOutputFile = new File(firstOutput.getParent + "/." + firstOutput.getName + "." + configName + ".out")

    if (threads == 0) threads = getThreads(defaultThreads)
    if (threads > 1) nCoresRequest = Option(threads)

    if (vmem.isEmpty) {
      vmem = config("vmem")
      if (vmem.isEmpty && defaultVmem.nonEmpty) vmem = Some(defaultVmem)
    }
    // vmem is an Option: unwrap it so the request reads "h_vmem=4G" instead of
    // "h_vmem=Some(4G)", and skip the request entirely when nothing is set.
    for (requested <- vmem) jobResourceRequests :+= "h_vmem=" + requested
    jobName = configName + ":" + firstOutput.getName

    super.freezeFieldValues()
  }

  /**
   * Checks the executable: resolves it with `which`, follows it to its canonical path and
   * stores the md5sum of the file in the job report ("None" when no checksum is available).
   *
   * @throws QException when `which` cannot find the executable
   */
  protected def checkExecutable: Unit = {
    val md5Cache = BiopetCommandLineFunctionTrait.executableMd5Cache
    if (executable != null && !md5Cache.contains(executable)) {
      try {
        executable = resolveExecutable(executable)
        if (!md5Cache.contains(executable)) md5Cache += executable -> md5Hex(new File(executable))
      } catch {
        case ioe: java.io.IOException =>
          logger.warn("Could not use 'which', check on executable skipped: " + ioe)
      }
    }

    // The checksum may be missing (no executable set, or the check was skipped above),
    // so look it up safely instead of assuming the cache entry exists.
    val md5 = Option(executable).flatMap(md5Cache.get)
    addJobReportBinding("md5sum_exe", md5.getOrElse("None"))
  }

  /**
   * Resolves an executable name to its canonical path, using the global cache when possible.
   *
   * @param name executable as configured
   * @return canonical path reported by `which`
   * @throws QException when `which` exits with a non-zero exit code
   */
  private def resolveExecutable(name: String): String = {
    val cache = BiopetCommandLineFunctionTrait.executableCache
    cache.get(name) match {
      case Some(resolved) => resolved
      case None =>
        val output = new StringBuffer()
        val process = Process(Seq("which", name)).run(ProcessLogger(output.append(_)))
        if (process.exitValue != 0) {
          val message = "executable: '" + name + "' not found, please check config"
          logger.error(message)
          throw new QException(message)
        }
        val resolved = new File(output.toString).getCanonicalPath
        // Cache under both the configured name and the canonical path, so a second
        // lookup with either of them does not spawn `which` again.
        cache += name -> resolved
        cache += resolved -> resolved
        resolved
    }
  }

  /**
   * Computes the lower case hexadecimal MD5 checksum of a file.
   * The file is streamed in chunks and the stream is always closed.
   */
  private def md5Hex(file: File): String = {
    val digest = MessageDigest.getInstance("MD5")
    val in = new FileInputStream(file)
    try {
      val chunk = new Array[Byte](8192)
      var read = in.read(chunk)
      while (read != -1) {
        digest.update(chunk, 0, read)
        read = in.read(chunk)
      }
    } finally in.close()
    digest.digest().map("%02x".format(_)).mkString
  }

  /**
   * Executes [[checkExecutable]] and [[beforeCmd]] and fills the job report with the
   * number of cores and the version of the executable.
   */
  final protected def preCmdInternal: Unit = {
    checkExecutable

    beforeCmd

    // nCoresRequest is only set in this trait when more than one thread is used,
    // so fall back to a single core when it is empty or not positive.
    addJobReportBinding("cores", nCoresRequest.filter(_ > 0).getOrElse(1))
    addJobReportBinding("version", getVersion)
  }

  /**
   * Command to get version of executable
   * @return command line to execute, or null when no version can be requested
   */
  protected def versionCommand: String = null

  /** Regex to get version from version command output; must have exactly one capture group */
  protected val versionRegex: Regex = null

  /** Allowed exit codes for the version command */
  protected val versionExitcode: List[Int] = List(0)

  /**
   * Executes the version command and extracts the version with [[versionRegex]].
   *
   * @return first match found in stdout, then stderr; "N/A" when no command or regex is
   *         defined, the exit code is not allowed or no line matches
   */
  private def getVersionInternal: String =
    if (versionCommand == null || versionRegex == null) "N/A"
    else {
      val stdout = new StringBuffer()
      val stderr = new StringBuffer()
      def outputLog = "Version command: \n" + versionCommand +
        "\n output log: \n stdout: \n" + stdout.toString +
        "\n stderr: \n" + stderr.toString
      val process = Process(versionCommand).run(
        ProcessLogger(line => stdout.append(line + "\n"), line => stderr.append(line + "\n")))
      // exitValue blocks until the process is done, so both buffers are complete afterwards
      val exitCode = process.exitValue
      if (!versionExitcode.contains(exitCode)) {
        logger.warn("getVersion give exit code " + exitCode + ", version not found \n" + outputLog)
        "N/A"
      } else {
        val lines = stdout.toString.split("\n") ++ stderr.toString.split("\n")
        lines.collectFirst { case versionRegex(version) => version }.getOrElse {
          logger.warn("getVersion give a exit code " + exitCode + " but no version was found, executable correct? \n" + outputLog)
          "N/A"
        }
      }
    }

  /** Get version from cache otherwise execute the version command  */
  def getVersion: String =
    BiopetCommandLineFunctionTrait.versionCache.getOrElseUpdate(executable, getVersionInternal)

  /**
   * Get threads from config, capped by the "maxthreads" config value (8 when not configured)
   * @param default default when not found in config
   * @return number of threads
   */
  def getThreads(default: Int): Int = {
    val maxThreads: Int = config("maxthreads", default = 8)
    val configured: Int = config("threads", default = default)
    math.min(configured, maxThreads)
  }

  /**
   * Get threads from config, capped by the "maxthreads" config value (8 when not configured)
   * @param default default when not found in config
   * @param module Module when this is different from default
   * @return number of threads
   */
  def getThreads(default: Int, module: String): Int = {
    val maxThreads: Int = config("maxthreads", default = 8, submodule = module)
    val configured: Int = config("threads", default = default, submodule = module)
    math.min(configured, maxThreads)
  }
}
Peter van 't Hof's avatar
Peter van 't Hof committed
202

Peter van 't Hof's avatar
Peter van 't Hof committed
203
204
205
/**
 * Companion object that stores the global caches used by the trait of the same name.
 */
object BiopetCommandLineFunctionTrait {
  import scala.collection.mutable

  /** Version string per executable, filled by `getVersion` */
  private val versionCache: mutable.Map[String, String] = mutable.Map.empty[String, String]

  /** md5sum per executable path, filled by `checkExecutable` */
  private val executableMd5Cache: mutable.Map[String, String] = mutable.Map.empty[String, String]

  /** Resolved path per executable, filled by `checkExecutable` */
  private val executableCache: mutable.Map[String, String] = mutable.Map.empty[String, String]
}