BiopetCommandLineFunction.scala 10.6 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
11
 * A dual licensing mode is applied. The source code within this project is freely available for non-commercial use under an AGPL
12
13
14
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
15
16
package nl.lumc.sasc.biopet.core

17
import java.io.{ File, FileInputStream, PrintWriter }
18
19
import java.security.MessageDigest

20
import nl.lumc.sasc.biopet.utils.Logging
Peter van 't Hof's avatar
Peter van 't Hof committed
21
import org.broadinstitute.gatk.utils.commandline.{ Input, Output }
22
import org.broadinstitute.gatk.utils.runtime.ProcessSettings
23
import org.ggf.drmaa.JobTemplate
24
25

import scala.collection.mutable
26
import scala.io.Source
27
import scala.sys.process.{ Process, ProcessLogger }
28
import scala.collection.JavaConversions._
29
30

/** Biopet command line trait to auto check executable and cluster values */
31
trait BiopetCommandLineFunction extends CommandLineResources { biopetFunction =>
Sander Bollen's avatar
Sander Bollen committed
32
  // The analysis name of this function is its config namespace.
  analysisName = configNamespace

  /** Optional extra input files for this job */
  @Input(doc = "deps", required = false)
  var deps: List[File] = Nil

  /** Executable of this job; preProcessExecutable replaces it with the resolved path when one is found */
  var executable: String = _

  var mainFunction: Boolean = true

  /** This is the default shell for drmaa jobs */
  def defaultRemoteCommand: String = "bash"

  /** Value of config key "remote_command", falling back to defaultRemoteCommand */
  private val remoteCommand: String = config("remote_command", default = defaultRemoteCommand)

  /** Value of config key "pre_commands"; these lines are placed in front of the command in commandLine */
  val preCommands: List[String] = config("pre_commands", default = Nil, freeVar = false)
Peter van 't Hof's avatar
Peter van 't Hof committed
46

47
48
49
  /**
   * Rewrites a job script in place: a shebang matching the remote command and the
   * shell options "set -eubf" / "set -o pipefail" are put in front of the original
   * lines, and an optional "sleep" is appended when jobDelayTime is set.
   *
   * @param file Script to rewrite
   */
  private def changeScript(file: File): Unit = {
    // Read everything and release the handle before the same file is reopened for writing.
    val source = Source.fromFile(file)
    val lines = try source.getLines().toList finally source.close()

    val writer = new PrintWriter(file)
    try {
      val shebang = remoteCommand match {
        case "bash" => "#!/bin/bash"
        case "sh"   => "#!/bin/sh"
        case other  => s"#!$other"
      }
      writer.println(shebang)
      writer.println("set -eubf")
      writer.println("set -o pipefail")
      lines.foreach(writer.println)
      jobDelayTime.foreach(x => writer.println(s"sleep $x"))
    } finally writer.close() // always closed, also when a write fails half way
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
62
63
64
65
  /**
   *  This value is used to let a job wait a number of seconds after it finishes.
   *  This is only used when having storage delay issues
   */
Peter van 't Hof's avatar
Peter van 't Hof committed
66
67
  var jobDelayTime: Option[Int] = config("job_delay_time")

  // Queue uses "sh" by default. Biopet rewrites the job script and runs it with the
  // configured remote command instead, for which the default is "bash".
  updateJobRun = {
    case template: JobTemplate =>
      // The head of the job template arguments is used as the script path.
      val scriptPath = template.getArgs.head.toString
      changeScript(new File(scriptPath))
      template.setRemoteCommand(remoteCommand)
    case settings: ProcessSettings =>
      // The last element of the process command is used as the script path.
      val command = settings.getCommand
      changeScript(new File(command.last))
      // A command starting with "srun" keeps its original first element.
      val startsWithSrun = command.head == "srun"
      if (!startsWithSrun) settings.setCommand(Array(remoteCommand) ++ command.tail)
  }

79
80
81
82
  /**
   * Can override this method. This is executed just before the job is ready to run.
   * Can check on run time files from pipeline here
   */
Peter van 't Hof's avatar
Peter van 't Hof committed
83
  def beforeCmd(): Unit = {}

  /** Can override this method. This is executed after the script is done and Queue starts to generate the graph */
  def beforeGraph(): Unit = {}
87
88

  /**
   * Adds the reference dict/fai files to deps when this function is a Reference that
   * requires them, then runs preProcessExecutable, beforeGraph and internalBeforeGraph
   * before handing over to the parent implementation.
   */
  override def freezeFieldValues(): Unit = {
    this match {
      case reference: Reference =>
        // Same evaluation order as before: the dict file first, then the fai file.
        val dictFiles = if (reference.dictRequired) List(reference.referenceDictFile) else Nil
        val faiFiles = if (reference.faiRequired) List(reference.referenceFai) else Nil
        deps = (deps ::: dictFiles ::: faiFiles).distinct
      case _ => // not a Reference, deps stay untouched
    }

    preProcessExecutable()
    beforeGraph()
    internalBeforeGraph()

    super.freezeFieldValues()
  }

  /** Runs beforeGraph and internalBeforeGraph on every job in _pipesJobs */
Peter van 't Hof's avatar
Peter van 't Hof committed
106
  final def internalBeforeGraph(): Unit = {
    // Two separate passes: every piped job gets beforeGraph() before any of them
    // gets internalBeforeGraph().
    for (job <- _pipesJobs) job.beforeGraph()
    for (job <- _pipesJobs) job.internalBeforeGraph()
  }

113
114
  /**
   * Can override this value if executable may not be converted to CanonicalPath
Peter van 't Hof's avatar
Peter van 't Hof committed
115
   *
116
117
   * @deprecated
   */
118
119
120
121
122
123
  val executableToCanonicalPath: Boolean = true

  /**
   * Checks executable. Follow full CanonicalPath, checks if it is existing and do a md5sum on it to store in job report
   */
  protected[core] def preProcessExecutable(): Unit = {
    val resolved = BiopetCommandLineFunction.preProcessExecutable(executable, preCommands)
    // Only replace the executable when a resolved path is available.
    for (path <- resolved.path) executable = path
    val checksum = resolved.md5.getOrElse("N/A")
    addJobReportBinding("md5sum_exe", checksum)
  }

  /** Executes preProcessExecutable and beforeCmd, then fills the "cores" value of the job report */
  final protected def preCmdInternal(): Unit = {
    preProcessExecutable()
    beforeCmd()

    // Report the requested number of cores; anything missing or not positive counts as 1.
    val cores = nCoresRequest.filter(_ > 0).getOrElse(1)
    addJobReportBinding("cores", cores)
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
140
  // Switched on by the `|` and `:<:` operators.
  private[core] var _inputAsStdin: Boolean = false

  def inputAsStdin: Boolean = _inputAsStdin

  // Switched on by the `|` and `>` operators.
  private[core] var _outputAsStdout: Boolean = false

  // NOTE(review): the accessor name is misspelled ("Stsout"); it is public, so it is kept as is.
  def outputAsStsout: Boolean = _outputAsStdout

Peter van 't Hof's avatar
Peter van 't Hof committed
145
146
  /**
   * This operator sends stdout to `that` and combine this into 1 command line function
Peter van 't Hof's avatar
Peter van 't Hof committed
147
   *
Peter van 't Hof's avatar
Peter van 't Hof committed
148
149
150
   * @param that Function that will read from stdin
   * @return BiopetPipe function
   */
151
  def |(that: BiopetCommandLineFunction): BiopetCommandLineFunction = {
    _outputAsStdout = true
    that._inputAsStdin = true

    // Both sides get their graph hooks before they are combined.
    beforeGraph()
    internalBeforeGraph()
    that.beforeGraph()
    that.internalBeforeGraph()

    val chain = this match {
      case pipe: BiopetPipe =>
        // Extending an existing pipe: its current last command now also writes to stdout.
        pipe.commands.last._outputAsStdout = true
        pipe.commands ::: that :: Nil
      case _ => List(this, that)
    }
    new BiopetPipe(chain)
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
166
167
  /**
   * This operator can be used to give a program a file as stdin
Peter van 't Hof's avatar
Peter van 't Hof committed
168
   *
Peter van 't Hof's avatar
Peter van 't Hof committed
169
170
171
   * @param file File that will become stdin for this program
   * @return It's own class
   */
Peter van 't Hof's avatar
Peter van 't Hof committed
172
173
174
175
176
177
  def :<:(file: File): BiopetCommandLineFunction = {
    // Remember the file and mark this job as reading from stdin.
    stdinFile = Some(file)
    _inputAsStdin = true
    this
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
178
179
  /**
   * This operator can be used to give a program a file to write its stdout to
Peter van 't Hof's avatar
Peter van 't Hof committed
180
   *
Peter van 't Hof's avatar
Peter van 't Hof committed
181
182
183
   * @param file File that will become stdout for this program
   * @return It's own class
   */
Peter van 't Hof's avatar
Peter van 't Hof committed
184
  def >(file: File): BiopetCommandLineFunction = {
    // Remember the file and mark this job as writing to stdout.
    stdoutFile = Some(file)
    _outputAsStdout = true
    this
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
190
191
192
193
194
195
196
197
198
  /**
   * This method can handle args that have multiple args for 1 arg name
   * @param argName Name of the arg like "-h" or "--help"
   * @param values Values for this arg
   * @param groupSize Values must come in groups of x number, default is 1
   * @param minGroups Minimal number of groups that are required, default is 0; when there are no values the method returns an empty string
   * @param maxGroups Max number of groups that can be filled here
   * @return Command part of this arg
   */
Peter van 't Hof's avatar
Peter van 't Hof committed
199
200
201
202
203
204
205
206
207
208
209
210
  def multiArg(argName: String, values: Iterable[Any], groupSize: Int = 1, minGroups: Int = 0, maxGroups: Int = 0): String = {
    // Size is taken once; on a generic Iterable this can mean a full traversal.
    val count = values.size

    if (groupSize < 1) {
      // A groupSize of 0 would otherwise end in an ArithmeticException on % and /.
      // It is reported the same way as the other validation problems instead.
      Logging.addError(s"Arg '${argName}' has an invalid groupSize of ${groupSize}, it should be at least 1")
    } else {
      if (count % groupSize != 0)
        Logging.addError(s"Arg '${argName}' values: '${values}' does not fit to a groupSize of ${groupSize}")
      val groups = count / groupSize
      if (groups < minGroups)
        Logging.addError(s"Args '${argName}' need atleast $minGroups with size $groupSize")
      // A maxGroups of 0 means there is no upper limit.
      if (maxGroups > 0 && groups > maxGroups)
        Logging.addError(s"Args '${argName}' may only have $maxGroups with size $groupSize")
    }

    // Errors are only collected above; the command part is still generated for any non empty input.
    if (values.nonEmpty) required(argName) + values.map(required(_)).mkString
    else ""
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
211
212
213
214
215
  /** File that receives stdout; set by the `>` operator and added as " > file" in commandLine */
  @Output(required = false)
  private[core] var stdoutFile: Option[File] = None

  /** File that is used as stdin; set by the `:<:` operator and added as " < file" in commandLine */
  @Input(required = false)
  private[core] var stdinFile: Option[File] = None
Peter van 't Hof's avatar
Peter van 't Hof committed
216

Peter van 't Hof's avatar
Peter van 't Hof committed
217
218
  /**
   * This function needs to be implemented to define the command that is executed
Peter van 't Hof's avatar
Peter van 't Hof committed
219
   *
Peter van 't Hof's avatar
Peter van 't Hof committed
220
221
   * @return Command to run
   */
222
  protected[core] def cmdLine: String

  /**
   * Final implementation of commandLine from org.broadinstitute.gatk.queue.function.CommandLineFunction.
   * Implementations need to provide cmdLine instead.
   *
   * The result is build up from the pre commands (each on its own line), the value of
   * cmdLine and, when set, the stdin and stdout redirects.
   *
   * @return Command to run
   */
  override final def commandLine: String = {
    preCmdInternal()

    // Evaluated in the same order as they end up in the command.
    val header = preCommands.mkString("\n", "\n", "\n")
    val command = cmdLine
    val stdinRedirect = stdinFile.fold("")(file => " < " + required(file.getAbsoluteFile))
    val stdoutRedirect = stdoutFile.fold("")(file => " > " + required(file.getAbsoluteFile))

    header + command + stdinRedirect + stdoutRedirect
  }
Peter van 't Hof's avatar
Peter van 't Hof committed
238

Peter van 't Hof's avatar
Peter van 't Hof committed
239
240
  private[core] var _pipesJobs: List[BiopetCommandLineFunction] = Nil

  def pipesJobs: List[BiopetCommandLineFunction] = _pipesJobs

  /** Appends a job to the piped jobs; a job that is already present is not added a second time */
  def addPipeJob(job: BiopetCommandLineFunction): Unit = {
    _pipesJobs = (_pipesJobs :+ job).distinct
  }
245
}
246
247

/** stores global caches */
248
object BiopetCommandLineFunction extends Logging {
  /** md5 (or "file_does_not_exist") per executable, stored under the configured name and under the resolved path */
  private[core] val executableMd5Cache: mutable.Map[String, String] = mutable.Map()

  /** Resolved path per configured executable; the configured value itself when it could not be resolved */
  private[core] val executableCache: mutable.Map[String, String] = mutable.Map()

  /**
   * Result of a lookup of an executable.
   *
   * @param path Cached resolved path for the requested executable, if any
   * @param md5 Cached md5 for the requested executable, if any
   */
  case class Executable(path: Option[String], md5: Option[String])

  /**
   * Resolves an executable with `which` and calculates the md5 of the resolved file.
   * Both results are cached, so the lookup and the hashing are done once per executable.
   *
   * @param executable Executable as configured, a null results in an empty Executable
   * @param pre_commands Commands to run (output discarded) before `which` is called
   * @return Cached path and md5 for this executable
   */
  def preProcessExecutable(executable: String, pre_commands: List[String] = Nil): Executable = {
    if (executable != null && !executableMd5Cache.contains(executable)) {
      val resolved = executableCache.getOrElseUpdate(executable, locateExecutable(executable, pre_commands))
      val resolvedFile = new File(resolved)
      val md5 = if (resolvedFile.exists()) md5Of(resolvedFile) else "file_does_not_exist"

      executableMd5Cache += resolved -> md5
      // The lookups in this method use the configured name. Without this entry a name that
      // resolves to another path never hits the cache: no md5 on the first call and a new
      // hash of the file on each following call.
      executableMd5Cache += executable -> md5
    }
    Executable(executableCache.get(executable), executableMd5Cache.get(executable))
  }

  /** Runs `which` in a temporary bash script; falls back to the given executable when it can not be resolved */
  private def locateExecutable(executable: String, pre_commands: List[String]): String = {
    try {
      val output = new StringBuffer()
      val script = File.createTempFile("which.", ".sh")
      script.deleteOnExit()

      val writer = new PrintWriter(script)
      try {
        pre_commands.foreach(cmd => writer.println(cmd + " > /dev/null 2> /dev/null"))
        writer.println(s"which $executable")
      } finally writer.close() // also closed when writing fails

      val process = Process(Seq("bash", script.getAbsolutePath)).run(ProcessLogger(output.append(_)))
      if (process.exitValue == 0) new File(output.toString).getAbsolutePath
      else {
        Logging.addError("executable: '" + executable + "' not found, please check config")
        executable
      }
    } catch {
      case ioe: java.io.IOException =>
        logger.warn(s"Could not use 'which' on '$executable', check on executable skipped: " + ioe)
        executable
    }
  }

  /** Lower case hex md5 of the complete content of a file */
  private def md5Of(file: File): String = {
    val digest = MessageDigest.getInstance("MD5")
    val in = new FileInputStream(file)
    try {
      // Read until end of stream; a single read sized by available() is not guaranteed
      // to return the whole file.
      val chunk = new Array[Byte](8192)
      var read = in.read(chunk)
      while (read != -1) {
        digest.update(chunk, 0, read)
        read = in.read(chunk)
      }
    } finally in.close()
    digest.digest().map("%02x".format(_)).mkString
  }
}