BiopetCommandLineFunction.scala 9.9 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
 * A dual licensing mode is applied. The source code within this project that are
 * not part of GATK Queue is freely available for non-commercial use under an AGPL
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
16
17
package nl.lumc.sasc.biopet.core

18
import java.io.{ File, FileInputStream, PrintWriter }
19
20
import java.security.MessageDigest

21
import nl.lumc.sasc.biopet.utils.Logging
22
import org.broadinstitute.gatk.utils.commandline.{ Gather, Input, Output }
23
import org.broadinstitute.gatk.utils.runtime.ProcessSettings
24
import org.ggf.drmaa.JobTemplate
25
26

import scala.collection.mutable
27
import scala.io.Source
28
import scala.sys.process.{ Process, ProcessLogger }
29
import scala.collection.JavaConversions._
30
31

/** Biopet command line trait to auto check executable and cluster values */
32
trait BiopetCommandLineFunction extends CommandLineResources { biopetFunction =>
Sander Bollen's avatar
Sander Bollen committed
33
  analysisName = configNamespace
34
35
36
37
38
39

  @Input(doc = "deps", required = false)
  var deps: List[File] = Nil

  var executable: String = _

40
41
42
43
  /** This is the default shell for drmaa jobs */
  def defaultRemoteCommand = "bash"
  private val remoteCommand: String = config("remote_command", default = defaultRemoteCommand)

44
45
46
47
48
  /**
   * Rewrites a generated job script in place.
   *
   * The strict-mode lines `set -eubf` and `set -o pipefail` are written first, followed by the
   * original content of the script. When `jobDelayTime` is set, a `sleep` line is appended.
   *
   * @param file Script file that is rewritten
   */
  private def changeScript(file: File): Unit = {
    // Read the whole script before creating the writer: PrintWriter(File) truncates the file.
    val source = Source.fromFile(file)
    val lines = try source.getLines().toList finally source.close()

    val writer = new PrintWriter(file)
    try {
      writer.println("set -eubf")
      writer.println("set -o pipefail")
      lines.foreach(writer.println)
      jobDelayTime.foreach(x => writer.println(s"sleep $x"))
    } finally writer.close() // release the handle even when a write fails
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
54
55
56
57
  /**
   *  This value is used to let your job wait x seconds after it finishes.
   *  This is only used when having storage delay issues
   */
Peter van 't Hof's avatar
Peter van 't Hof committed
58
59
  var jobDelayTime: Option[Int] = config("job_delay_time")

60
61
  // This overrides the default "sh" from queue. For Biopet the default is "bash".
  // In both cases the job script is rewritten by changeScript before the shell is replaced.
  updateJobRun = {
    case template: JobTemplate =>
      // The first job argument is used as the path of the script
      val scriptPath = template.getArgs.head.toString
      changeScript(new File(scriptPath))
      template.setRemoteCommand(remoteCommand)
    case settings: ProcessSettings =>
      // Everything after the original shell: the script followed by any further arguments
      val scriptAndArgs = settings.getCommand.tail
      changeScript(new File(scriptAndArgs.head))
      settings.setCommand(Array(remoteCommand) ++ scriptAndArgs)
  }

70
71
72
73
  /**
   * Hook that can be overridden, does nothing by default.
   * This is executed just before the job is ready to run.
   * Run time files from the pipeline can be checked here.
   */
  def beforeCmd(): Unit = {}
75
76

  /**
   * Hook that can be overridden, does nothing by default.
   * This is executed after the script is done and Queue starts to generate the graph.
   */
  def beforeGraph(): Unit = {}
78
79

  /**
   * Adds the reference dict and fai files to `deps` when this function is a `Reference` that
   * requires them, then runs the executable check and the graph hooks before delegating to
   * the parent implementation.
   */
  override def freezeFieldValues(): Unit = {
    this match {
      case ref: Reference =>
        val dict = if (ref.dictRequired) List(ref.referenceDict) else Nil
        val fai = if (ref.faiRequired) List(ref.referenceFai) else Nil
        // Existing deps keep their position, duplicates are dropped
        deps = (deps ::: dict ::: fai).distinct
      case _ => // not a Reference, deps stay untouched
    }

    preProcessExecutable()
    beforeGraph()
    internalBeforeGraph()

    super.freezeFieldValues()
  }

  /** Runs `beforeGraph` on all piped jobs, followed by `internalBeforeGraph` on all piped jobs */
  final def internalBeforeGraph(): Unit = {
    // Two separate passes: every job gets its beforeGraph call before any internalBeforeGraph runs
    for (job <- _pipesJobs) job.beforeGraph()
    for (job <- _pipesJobs) job.internalBeforeGraph()
  }

104
105
  /**
   * Can override this value if the executable may not be converted to CanonicalPath
Peter van 't Hof's avatar
Peter van 't Hof committed
106
   *
107
108
   * @deprecated
   */
109
110
111
112
113
114
  val executableToCanonicalPath = true

  /**
   * Checks the executable through the shared lookup of the companion object.
   * When a path is returned it replaces `executable`; the md5sum is stored in the job report,
   * or "N/A" when no md5sum is returned.
   */
  protected[core] def preProcessExecutable(): Unit = {
    val resolved = BiopetCommandLineFunction.preProcessExecutable(executable)
    for (resolvedPath <- resolved.path) executable = resolvedPath
    val checksum = resolved.md5 match {
      case Some(md5) => md5
      case None      => "N/A"
    }
    addJobReportBinding("md5sum_exe", checksum)
  }

  /** Runs the executable check and the `beforeCmd` hook, then stores the number of cores in the job report */
  final protected def preCmdInternal(): Unit = {
    preProcessExecutable()
    beforeCmd()

    // Anything other than a positive request is reported as a single core
    val cores = nCoresRequest.filter(_ > 0).getOrElse(1)
    addJobReportBinding("cores", cores)
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
131
  private[core] var _inputAsStdin = false
Peter van 't Hof's avatar
Peter van 't Hof committed
132
  def inputAsStdin = _inputAsStdin
Peter van 't Hof's avatar
Peter van 't Hof committed
133
134
135
  private[core] var _outputAsStdout = false
  def outputAsStsout = _outputAsStdout

Peter van 't Hof's avatar
Peter van 't Hof committed
136
137
  /**
   * This operator sends the stdout of this function to `that` and combines both into 1 command line function
   *
   * @param that Function that will read from stdin
   * @return BiopetPipe function
   */
  def |(that: BiopetCommandLineFunction): BiopetCommandLineFunction = {
    this._outputAsStdout = true
    that._inputAsStdin = true

    // Run the graph hooks of both sides, left side first, before they are merged into a pipe
    for (side <- List(this, that)) {
      side.beforeGraph()
      side.internalBeforeGraph()
    }

    this match {
      case pipe: BiopetPipe =>
        // Extending an existing pipe: its last command now writes to stdout as well
        pipe.commands.last._outputAsStdout = true
        new BiopetPipe(pipe.commands :+ that)
      case _ => new BiopetPipe(List(this, that))
    }
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
157
158
  /**
   * This operator can be used to give a program a file as stdin
   *
   * @param file File that will become stdin for this program
   * @return This function itself
   */
  def :<:(file: File): BiopetCommandLineFunction = {
    stdinFile = Some(file)
    _inputAsStdin = true
    this
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
169
170
  /**
   * This operator can be used to give a program a file to write its stdout to
   *
   * @param file File that will become stdout for this program
   * @return This function itself
   */
  def >(file: File): BiopetCommandLineFunction = {
    stdoutFile = Some(file)
    _outputAsStdout = true
    this
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
181
182
183
184
185
186
187
188
189
  /**
   * This method can handle args that have multiple values for 1 arg name.
   * Problems with the number of values are reported with `Logging.addError`, the command part
   * is still returned in that case.
   *
   * @param argName Name of the arg like "-h" or "--help"
   * @param values Values for this arg
   * @param groupSize Values must come in groups of x number, default is 1. Must be at least 1
   * @param minGroups Minimal groups that are required, default is 0
   * @param maxGroups Max number of groups that can be filled here, 0 means no limit
   * @return Command part of this arg, an empty string when there are no values
   */
  def multiArg(argName: String, values: Iterable[Any], groupSize: Int = 1, minGroups: Int = 0, maxGroups: Int = 0): String = {
    if (groupSize <= 0) {
      // A modulo/division by this groupSize would throw, report it like the other arg errors
      Logging.addError(s"Arg '${argName}' has an invalid groupSize of ${groupSize}, this must be at least 1")
    } else {
      val nValues = values.size
      if (nValues % groupSize != 0)
        Logging.addError(s"Arg '${argName}' values: '${values}' does not fit to a groupSize of ${groupSize}")
      val groups = nValues / groupSize
      if (groups < minGroups)
        Logging.addError(s"Args '${argName}' need atleast $minGroups with size $groupSize")
      if (maxGroups > 0 && groups > maxGroups)
        Logging.addError(s"Args '${argName}' may only have $maxGroups with size $groupSize")
    }
    if (values.nonEmpty) required(argName) + values.map(required(_)).mkString
    else ""
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
202
203
204
205
206
  @Output(required = false)
  private[core] var stdoutFile: Option[File] = None

  @Input(required = false)
  private[core] var stdinFile: Option[File] = None
Peter van 't Hof's avatar
Peter van 't Hof committed
207

Peter van 't Hof's avatar
Peter van 't Hof committed
208
209
  /**
   * This function needs to be implemented to define the command that is executed
Peter van 't Hof's avatar
Peter van 't Hof committed
210
   *
Peter van 't Hof's avatar
Peter van 't Hof committed
211
212
   * @return Command to run
   */
213
  protected[core] def cmdLine: String
Peter van 't Hof's avatar
Peter van 't Hof committed
214
215
216
217

  /**
   * Final implementation of the commandLine from org.broadinstitute.gatk.queue.function.CommandLineFunction.
   * Users need to implement cmdLine instead; redirects for the stdin and stdout files are
   * appended here and the resulting command is stored in the job report.
   *
   * @return Command to run
   */
  override final def commandLine: String = {
    preCmdInternal()
    val base = cmdLine
    val stdinRedirect = stdinFile match {
      case Some(file) => " < " + required(file.getAbsoluteFile)
      case None       => ""
    }
    val stdoutRedirect = stdoutFile match {
      case Some(file) => " > " + required(file.getAbsoluteFile)
      case None       => ""
    }
    val cmd = base + stdinRedirect + stdoutRedirect
    addJobReportBinding("command", cmd)
    cmd
  }
Peter van 't Hof's avatar
Peter van 't Hof committed
229

Peter van 't Hof's avatar
Peter van 't Hof committed
230
231
  private[core] var _pipesJobs: List[BiopetCommandLineFunction] = Nil
  def pipesJobs = _pipesJobs
Peter van 't Hof's avatar
Peter van 't Hof committed
232
  /** Registers a piped job; a job that is already registered keeps its position and is not added twice */
  def addPipeJob(job: BiopetCommandLineFunction): Unit = {
    _pipesJobs = (_pipesJobs :+ job).distinct
  }
236
}
237
238

/** Stores global caches for the lookup of executables */
object BiopetCommandLineFunction extends Logging {
  /** Md5sums, stored under the name that was looked up and under the path it resolved to */
  private[core] val executableMd5Cache: mutable.Map[String, String] = mutable.Map()
  /** Requested executable -> path it resolved to */
  private[core] val executableCache: mutable.Map[String, String] = mutable.Map()

  /**
   * Result of an executable lookup
   *
   * @param path Path the executable resolved to, if it was resolved under the requested name
   * @param md5 Md5sum known for the requested name, if any
   */
  case class Executable(path: Option[String], md5: Option[String])

  /**
   * Resolves an executable with `which` and calculates the md5sum of the resolved file.
   * Both results are cached, a `null` executable is not processed.
   *
   * @param executable Name or path of the executable
   * @return Cached path and md5sum for `executable`
   */
  def preProcessExecutable(executable: String): Executable = {
    if (executable != null && !executableMd5Cache.contains(executable)) {
      val resolved = executableCache.getOrElseUpdate(executable, resolveExecutable(executable))
      val md5 = executableMd5Cache.getOrElseUpdate(resolved, md5sum(new File(resolved)))
      // Also store the md5sum under the requested name, that is the key used for the lookup below
      executableMd5Cache += executable -> md5
    }
    Executable(executableCache.get(executable), executableMd5Cache.get(executable))
  }

  /** Resolves `executable` with `which`, falls back to `executable` itself when this is not possible */
  private def resolveExecutable(executable: String): String = {
    try {
      // StringBuffer: the logger callbacks are not called from this thread
      val stdout = new StringBuffer()
      val cmd = Seq("which", executable)
      // Only stdout contains the path, stderr is ignored so messages can not end up in the path
      val process = Process(cmd).run(ProcessLogger(
        (line: String) => { stdout.append(line); () },
        (_: String) => ()))
      if (process.exitValue == 0) new File(stdout.toString).getAbsolutePath
      else {
        Logging.addError("executable: '" + executable + "' not found, please check config")
        executable
      }
    } catch {
      case ioe: java.io.IOException =>
        logger.warn(s"Could not use 'which' on '$executable', check on executable skipped: " + ioe)
        executable
    }
  }

  /** Md5sum of `file` as lower case hex, "file_does_not_exist" when the file does not exist */
  private def md5sum(file: File): String = {
    if (!file.exists()) "file_does_not_exist"
    else {
      val digest = MessageDigest.getInstance("MD5")
      val is = new FileInputStream(file)
      try {
        // Read until end of stream: available() and a single read() may return less than the whole file
        val buffer = new Array[Byte](8192)
        var read = is.read(buffer)
        while (read != -1) {
          digest.update(buffer, 0, read)
          read = is.read(buffer)
        }
      } finally is.close()
      digest.digest().map("%02x".format(_)).mkString
    }
  }
}