BiopetCommandLineFunction.scala 10.3 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
 * A dual licensing mode is applied. The source code within this project is freely available for non-commercial use under an AGPL
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
15
16
package nl.lumc.sasc.biopet.core

17
import java.io.{ File, FileInputStream, PrintWriter }
18
19
import java.security.MessageDigest

20
import nl.lumc.sasc.biopet.utils.Logging
Peter van 't Hof's avatar
Peter van 't Hof committed
21
import org.broadinstitute.gatk.utils.commandline.{ Input, Output }
22
import org.broadinstitute.gatk.utils.runtime.ProcessSettings
23
import org.ggf.drmaa.JobTemplate
24
25

import scala.collection.mutable
26
import scala.io.Source
27
import scala.sys.process.{ Process, ProcessLogger }
28
import scala.collection.JavaConversions._
29
30

/** Biopet command line trait to auto check executable and cluster values */
31
trait BiopetCommandLineFunction extends CommandLineResources { biopetFunction =>
Sander Bollen's avatar
Sander Bollen committed
32
  analysisName = configNamespace
33
34
35
36
37
38

  /** Extra files this job depends on; freezeFieldValues appends the reference dict/fai files when required */
  @Input(doc = "deps", required = false)
  var deps: List[File] = Nil

  /** Name or path of the program to run; preProcessExecutable replaces it by the resolved path when one is found */
  var executable: String = _

39
  var mainFunction = false
40

41
42
43
44
  /** This is the default shell for drmaa jobs, used when the config has no "remote_command" value */
  def defaultRemoteCommand = "bash"
  /** Command set on the job by updateJobRun; read from config key "remote_command" */
  private val remoteCommand: String = config("remote_command", default = defaultRemoteCommand)

Peter van 't Hof's avatar
Peter van 't Hof committed
45
  /**
   * Commands read from config key "pre_commands" (default: none). commandLine puts them on their
   * own lines in front of cmdLine, and they are also passed along when the executable is resolved.
   */
  val preCommands: List[String] = config("pre_commands", default = Nil, freeVar = false)
Peter van 't Hof's avatar
Peter van 't Hof committed
46

47
48
49
50
51
  /**
   * Rewrites the given job script in place: "set -eubf" and "set -o pipefail" are put in front of
   * the original lines and, when jobDelayTime is set, a "sleep" line is appended.
   *
   * @param file Script file that is rewritten
   */
  private def changeScript(file: File): Unit = {
    // Read everything up front and release the handle before the same file is reopened for writing
    val source = Source.fromFile(file)
    val lines = try source.getLines().toList finally source.close()

    val writer = new PrintWriter(file)
    try {
      writer.println("set -eubf")
      writer.println("set -o pipefail")
      lines.foreach(writer.println)
      jobDelayTime.foreach(x => writer.println(s"sleep $x"))
    } finally writer.close() // close even when a write fails, so the handle is never leaked
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
57
58
59
60
  /**
   *  This value is used to let your job wait x seconds after it finishes.
   *  This is only used when having storage delay issues
   */
Peter van 't Hof's avatar
Peter van 't Hof committed
61
62
  var jobDelayTime: Option[Int] = config("job_delay_time")

63
64
  // This overrides the default "sh" from queue. For Biopet the default is "bash"
  updateJobRun = {
    case template: JobTemplate =>
      // The first job argument is treated as the script file that gets rewritten
      val script = new File(template.getArgs.head.toString)
      changeScript(script)
      template.setRemoteCommand(remoteCommand)
    case settings: ProcessSettings =>
      // Everything after the original first element is kept; its head is the script file
      val scriptAndArgs = settings.getCommand.tail
      changeScript(new File(scriptAndArgs.head))
      settings.setCommand(Array(remoteCommand) ++ scriptAndArgs)
  }

73
74
75
76
  /**
   * Hook that can be overridden. It is called from preCmdInternal, after the executable has been
   * checked and before the command line is built, so run time files from the pipeline can be
   * checked here.
   */
  def beforeCmd(): Unit = {}
78
79

  /**
   * Hook that can be overridden. It is called from freezeFieldValues and when functions are
   * piped together, i.e. after the script is done and before queue generates the graph.
   */
  def beforeGraph(): Unit = {}
81
82

  /**
   * Adds the reference dict/fai files to deps when this function is a Reference that requires
   * them, checks the executable and runs the beforeGraph hooks, then delegates to the parent
   * implementation.
   */
  override def freezeFieldValues(): Unit = {
    this match {
      case reference: Reference =>
        val dictFiles = if (reference.dictRequired) List(reference.referenceDictFile) else Nil
        val faiFiles = if (reference.faiRequired) List(reference.referenceFai) else Nil
        deps = (deps ++ dictFiles ++ faiFiles).distinct
      case _ => // not a reference function, nothing to add
    }

    preProcessExecutable()
    beforeGraph()
    internalBeforeGraph()

    super.freezeFieldValues()
  }

  /** Runs beforeGraph on every registered pipe job, and after that internalBeforeGraph on each of them */
  final def internalBeforeGraph(): Unit = {
    for (job <- _pipesJobs) job.beforeGraph()
    for (job <- _pipesJobs) job.internalBeforeGraph()
  }

107
108
  /**
   * Can override this value if the executable may not be converted to CanonicalPath
   *
   * @deprecated
   */
112
113
114
115
116
117
  val executableToCanonicalPath = true

  /**
   * Checks the executable through the companion object. When a resolved path is returned it
   * replaces executable, and the md5 sum ("N/A" when absent) is stored in the job report under
   * "md5sum_exe".
   */
  protected[core] def preProcessExecutable(): Unit = {
    val resolved = BiopetCommandLineFunction.preProcessExecutable(executable, preCommands)
    for (path <- resolved.path) executable = path
    val checksum = resolved.md5.getOrElse("N/A")
    addJobReportBinding("md5sum_exe", checksum)
  }

  /** Runs the executable check and the beforeCmd hook, then stores the requested cores (at least 1) in the job report */
  final protected def preCmdInternal(): Unit = {
    preProcessExecutable()
    beforeCmd()

    // A missing or non-positive request is reported as a single core
    val cores = nCoresRequest.filter(_ > 0).getOrElse(1)
    addJobReportBinding("cores", cores)
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
134
  /** True once this function is set to read from stdin, either as the right side of a pipe or through :<: */
  private[core] var _inputAsStdin = false
Peter van 't Hof's avatar
Peter van 't Hof committed
135
  /** Read-only access to the stdin flag */
  def inputAsStdin = _inputAsStdin
Peter van 't Hof's avatar
Peter van 't Hof committed
136
137
138
  /** True once this function is set to write to stdout, either as the left side of a pipe or through > */
  private[core] var _outputAsStdout = false
  /** Read-only access to the stdout flag. NOTE(review): the name looks like a typo of "outputAsStdout"; callers may depend on it */
  def outputAsStsout = _outputAsStdout

Peter van 't Hof's avatar
Peter van 't Hof committed
139
140
  /**
   * Pipe operator: the stdout of this function becomes the stdin of `that`, and both are
   * combined into a single command line function.
   *
   * @param that Function that will read from stdin
   * @return BiopetPipe function holding the whole chain
   */
  def |(that: BiopetCommandLineFunction): BiopetCommandLineFunction = {
    _outputAsStdout = true
    that._inputAsStdin = true

    // Both sides get their graph hooks run before they are wrapped, left side first
    for (side <- List(this, that)) {
      side.beforeGraph()
      side.internalBeforeGraph()
    }

    val chain: List[BiopetCommandLineFunction] = this match {
      case pipe: BiopetPipe =>
        // Extending an existing pipe: its last command now also writes to stdout
        pipe.commands.last._outputAsStdout = true
        pipe.commands :+ that
      case _ => List(this, that)
    }
    new BiopetPipe(chain)
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
160
161
  /**
   * Gives this program a file as stdin.
   *
   * @param file File that will become stdin for this program
   * @return This function, with its stdin set
   */
  def :<:(file: File): BiopetCommandLineFunction = {
    _inputAsStdin = true
    stdinFile = Some(file)
    this
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
172
173
  /**
   * Gives this program a file to write its stdout to.
   *
   * @param file File that will become stdout for this program
   * @return This function, with its stdout set
   */
  def >(file: File): BiopetCommandLineFunction = {
    _outputAsStdout = true
    stdoutFile = Some(file)
    this
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
184
185
186
187
188
189
190
191
192
  /**
   * This method can handle args that have multiple values for 1 arg name. Validation problems
   * are reported through Logging.addError; the command part is still built from the values.
   *
   * @param argName Name of the arg like "-h" or "--help"
   * @param values Values for this arg
   * @param groupSize Values must come in groups of x number, default is 1. Must be at least 1
   * @param minGroups Minimal groups that are required, default is 0
   * @param maxGroups Max number of groups that can be filled here, 0 means no limit
   * @return Command part of this arg, or an empty string when there are no values
   */
  def multiArg(argName: String, values: Iterable[Any], groupSize: Int = 1, minGroups: Int = 0, maxGroups: Int = 0): String = {
    // Guard first: a groupSize of 0 would throw an ArithmeticException in the checks below
    if (groupSize <= 0) {
      Logging.addError(s"Arg '${argName}' has an invalid groupSize of ${groupSize}, it must be at least 1")
    } else {
      val valueCount = values.size
      if (valueCount % groupSize != 0)
        Logging.addError(s"Arg '${argName}' values: '${values}' does not fit to a groupSize of ${groupSize}")
      val groups = valueCount / groupSize
      if (groups < minGroups)
        Logging.addError(s"Args '${argName}' need atleast $minGroups with size $groupSize")
      if (maxGroups > 0 && groups > maxGroups)
        Logging.addError(s"Args '${argName}' may only have $maxGroups with size $groupSize")
    }
    if (values.nonEmpty) required(argName) + values.map(required(_)).mkString
    else ""
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
205
206
207
208
209
  /** File that receives stdout; set by the > operator and added to the command as " > file" in commandLine */
  @Output(required = false)
  private[core] var stdoutFile: Option[File] = None

  /** File that is used as stdin; set by the :<: operator and added to the command as " < file" in commandLine */
  @Input(required = false)
  private[core] var stdinFile: Option[File] = None
Peter van 't Hof's avatar
Peter van 't Hof committed
210

Peter van 't Hof's avatar
Peter van 't Hof committed
211
212
  /**
   * This function needs to be implemented to define the command that is executed
   *
   * @return Command to run
   */
216
  protected[core] def cmdLine: String
Peter van 't Hof's avatar
Peter van 't Hof committed
217
218
219
220

  /**
   * Final implementation of commandLine from org.broadinstitute.gatk.queue.function.CommandLineFunction.
   * Implement cmdLine instead; this method puts the pre commands in front of it and appends the
   * stdin/stdout redirects when those files are set.
   *
   * @return Command to run
   */
  override final def commandLine: String = {
    preCmdInternal()

    // Parts are evaluated in the order they appear in the final command
    val prefix = preCommands.mkString("\n", "\n", "\n")
    val command = cmdLine
    val stdinRedirect = stdinFile.fold("")(file => " < " + required(file.getAbsoluteFile))
    val stdoutRedirect = stdoutFile.fold("")(file => " > " + required(file.getAbsoluteFile))
    prefix + command + stdinRedirect + stdoutRedirect
  }
Peter van 't Hof's avatar
Peter van 't Hof committed
232

Peter van 't Hof's avatar
Peter van 't Hof committed
233
234
  /** Jobs registered through addPipeJob; internalBeforeGraph runs the graph hooks of each of them */
  private[core] var _pipesJobs: List[BiopetCommandLineFunction] = Nil
  /** Read-only access to the registered pipe jobs */
  def pipesJobs = _pipesJobs
Peter van 't Hof's avatar
Peter van 't Hof committed
235
  /** Registers a pipe job for this function; the list of pipe jobs is kept free of duplicates */
  def addPipeJob(job: BiopetCommandLineFunction): Unit = {
    _pipesJobs = (_pipesJobs :+ job).distinct
  }
239
}
240
241

/** Stores global caches of resolved executable paths and their md5 sums */
object BiopetCommandLineFunction extends Logging {
  /** md5 sum per executable, stored under both the requested name and the resolved path */
  private[core] val executableMd5Cache: mutable.Map[String, String] = mutable.Map()
  /** Resolved path per requested executable name */
  private[core] val executableCache: mutable.Map[String, String] = mutable.Map()

  /**
   * Result of an executable check.
   *
   * @param path Resolved path, when the given name is present in the path cache
   * @param md5 md5 sum, when the given name is present in the md5 cache
   */
  case class Executable(path: Option[String], md5: Option[String])

  /**
   * Resolves an executable with `which` and computes the md5 sum of the resolved file. Both
   * results are cached, so the work is done once per executable name.
   *
   * @param executable Name or path of the executable, a null value is not processed
   * @param pre_commands Commands that are executed before `which`, their output is discarded
   * @return Cached path and md5 sum for the given name
   */
  def preProcessExecutable(executable: String, pre_commands: List[String] = Nil): Executable = {
    if (executable != null && !executableMd5Cache.contains(executable)) {
      if (!executableCache.contains(executable))
        executableCache += executable -> resolveExecutable(executable, pre_commands)

      val newExe = executableCache(executable)
      val md5 = md5OfFile(new File(newExe))
      executableMd5Cache += newExe -> md5
      // Also store under the requested name: otherwise the first lookup by that name returns no
      // md5 and the file is hashed again for every job using the same executable
      executableMd5Cache += executable -> md5
    }
    Executable(executableCache.get(executable), executableMd5Cache.get(executable))
  }

  /** Runs `which` in a temporary bash script; falls back to the given name when it can not be resolved */
  private def resolveExecutable(executable: String, preCommands: List[String]): String = {
    try {
      val tempFile = File.createTempFile("which.", ".sh")
      try {
        val writer = new PrintWriter(tempFile)
        try {
          preCommands.foreach(cmd => writer.println(cmd + " > /dev/null 2> /dev/null"))
          writer.println(s"which $executable")
        } finally writer.close()

        val buffer = new StringBuffer()
        val cmd = Seq("bash", tempFile.getAbsolutePath)
        val process = Process(cmd).run(ProcessLogger(buffer.append(_)))
        if (process.exitValue() == 0) new File(buffer.toString).getAbsolutePath
        else {
          Logging.addError("executable: '" + executable + "' not found, please check config")
          executable
        }
      } finally tempFile.delete() // the script is only needed for this single lookup
    } catch {
      case ioe: java.io.IOException =>
        logger.warn(s"Could not use 'which' on '$executable', check on executable skipped: " + ioe)
        executable
    }
  }

  /** Lower case hex md5 sum of the file content, or "file_does_not_exist" when the file is missing */
  private def md5OfFile(file: File): String = {
    if (file.exists()) {
      val digest = MessageDigest.getInstance("MD5")
      val is = new FileInputStream(file)
      try {
        // Read in chunks until EOF: a single read() is not guaranteed to return the whole file
        val chunk = new Array[Byte](8192)
        Iterator.continually(is.read(chunk)).takeWhile(_ != -1).foreach(n => digest.update(chunk, 0, n))
      } finally is.close()
      digest.digest().map("%02x".format(_)).mkString
    } else "file_does_not_exist"
  }
}