BiopetCommandLineFunction.scala 8.56 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
 * A dual licensing mode is applied. The source code within this project that are
 * not part of GATK Queue is freely available for non-commercial use under an AGPL
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
16
17
package nl.lumc.sasc.biopet.core

18
import java.io.{ PrintWriter, File, FileInputStream }
19
20
import java.security.MessageDigest

21
import nl.lumc.sasc.biopet.utils.Logging
22
import org.broadinstitute.gatk.utils.commandline.{ Output, Input }
23
import org.broadinstitute.gatk.utils.runtime.ProcessSettings
24
import org.ggf.drmaa.JobTemplate
25
26

import scala.collection.mutable
27
import scala.io.Source
28
import scala.sys.process.{ Process, ProcessLogger }
29
import scala.collection.JavaConversions._
30
31

/**
 * Biopet command line trait to auto check executable and cluster values.
 *
 * Implementations provide [[cmdLine]]; the final [[commandLine]] wraps it with optional
 * stdin/stdout redirection and records the command in the job report. Jobs can be chained
 * with `|`, fed from a file with `:<:` and written to a file with `>`.
 */
trait BiopetCommandLineFunction extends CommandLineResources { biopetFunction =>
  analysisName = configNamespace

  /** Extra input files of this job; [[freezeFieldValues]] appends the reference dict/fai when required */
  @Input(doc = "deps", required = false)
  var deps: List[File] = Nil

  /** Files registered as outputs of this job */
  @Output
  var outputFiles: List[File] = Nil

  /** Executable to run; replaced by its resolved path in [[preProcessExecutable]] when one is found */
  var executable: String = _

  /** This is the default shell for drmaa jobs */
  def defaultRemoteCommand: String = "bash"

  /** Shell used to run the job script, read from config key "remote_command" */
  private val remoteCommand: String = config("remote_command", default = defaultRemoteCommand)

  /**
   * Rewrites the given script in place, prepending strict shell options
   * ("set -eubf" and "set -o pipefail") to its existing content.
   *
   * @param file Script file to rewrite
   */
  private def changeScript(file: File): Unit = {
    val source = Source.fromFile(file)
    // All lines must be materialised (and the source closed) before the writer truncates the file
    val lines = try source.getLines().toList finally source.close()
    val writer = new PrintWriter(file)
    try {
      writer.println("set -eubf")
      writer.println("set -o pipefail")
      lines.foreach(line => writer.println(line))
    } finally writer.close()
  }

  // This overrides the default "sh" from queue. For Biopet the default is "bash"
  updateJobRun = {
    case jt: JobTemplate =>
      // The first argument of the job template is treated as the path of the script
      changeScript(new File(jt.getArgs.head.toString))
      jt.setRemoteCommand(remoteCommand)
    case ps: ProcessSettings =>
      // The second element of the command is treated as the path of the script;
      // only the first element (the shell) is replaced
      changeScript(new File(ps.getCommand.tail.head))
      ps.setCommand(Array(remoteCommand) ++ ps.getCommand.tail)
  }

  /**
   * Can override this method. This is executed just before the job is ready to run.
   * Can check on run time files from pipeline here
   */
  def beforeCmd(): Unit = {}

  /** Can override this method. This is executed after the script is done and Queue starts to generate the graph */
  def beforeGraph(): Unit = {}

  /**
   * Adds the reference dict/fai to [[deps]] when this job is a [[Reference]] that requires them,
   * resolves the executable and runs the before-graph hooks before delegating to the parent.
   */
  override def freezeFieldValues(): Unit = {
    this match {
      case r: Reference =>
        if (r.dictRequired) deps :+= r.referenceDict
        if (r.faiRequired) deps :+= r.referenceFai
        deps = deps.distinct
      case _ =>
    }

    preProcessExecutable()
    beforeGraph()
    internalBeforeGraph()

    super.freezeFieldValues()
  }

  /** Runs `beforeGraph` and then `internalBeforeGraph` on every job registered with [[addPipeJob]] */
  final def internalBeforeGraph(): Unit = {
    _pipesJobs.foreach(_.beforeGraph())
    _pipesJobs.foreach(_.internalBeforeGraph())
  }

  /**
   * Can override this value if executable may not be converted to CanonicalPath.
   * This value is not read anywhere inside this trait.
   *
   * @deprecated
   */
  val executableToCanonicalPath: Boolean = true

  /**
   * Checks executable. Resolves it through the shared cache in the companion object, replaces
   * [[executable]] with the resolved path when one is returned and stores the md5sum
   * (or "N/A" when none is available) in the job report under "md5sum_exe".
   */
  protected[core] def preProcessExecutable(): Unit = {
    val exe = BiopetCommandLineFunction.preProcessExecutable(executable)
    exe.path.foreach(executable = _)
    addJobReportBinding("md5sum_exe", exe.md5.getOrElse("N/A"))
  }

  /** Executes [[preProcessExecutable]] and [[beforeCmd]], then fills the "cores" value of the job report */
  final protected def preCmdInternal(): Unit = {
    preProcessExecutable()
    beforeCmd()

    // A missing or non-positive core request is reported as 1 core
    addJobReportBinding("cores", nCoresRequest match {
      case Some(n) if n > 0 => n
      case _                => 1
    })
  }

  private[core] var _inputAsStdin = false

  /** True when this job is marked to read its input from stdin */
  def inputAsStdin: Boolean = _inputAsStdin

  private[core] var _outputAsStdout = false

  /** True when this job is marked to write its output to stdout (the misspelled name is kept for callers) */
  def outputAsStsout: Boolean = _outputAsStdout

  /**
   * This operator sends stdout to `that` and combines both into 1 command line function.
   * The before-graph hooks of both sides are executed before the pipe is created.
   *
   * @param that Function that will read from stdin
   * @return BiopetPipe function
   */
  def |(that: BiopetCommandLineFunction): BiopetCommandLineFunction = {
    this._outputAsStdout = true
    that._inputAsStdin = true
    this.beforeGraph()
    this.internalBeforeGraph()
    that.beforeGraph()
    that.internalBeforeGraph()
    this match {
      case p: BiopetPipe =>
        // Extending an existing pipe: its current last command now also writes to stdout
        p.commands.last._outputAsStdout = true
        new BiopetPipe(p.commands ::: that :: Nil)
      case _ => new BiopetPipe(List(this, that))
    }
  }

  /**
   * This operator can be used to give a program a file as stdin
   *
   * @param file File that will become stdin for this program
   * @return This instance
   */
  def :<:(file: File): BiopetCommandLineFunction = {
    this._inputAsStdin = true
    this.stdinFile = Some(file)
    this
  }

  /**
   * This operator can be used to give a program a file to write its stdout to
   *
   * @param file File that will become stdout for this program
   * @return This instance
   */
  def >(file: File): BiopetCommandLineFunction = {
    this._outputAsStdout = true
    this.stdoutFile = Some(file)
    this
  }

  /** File that stdout is redirected to, set by `>` */
  @Output(required = false)
  private[core] var stdoutFile: Option[File] = None

  /** File that is used as stdin, set by `:<:` */
  @Input(required = false)
  private[core] var stdinFile: Option[File] = None

  /**
   * This function needs to be implemented to define the command that is executed
   *
   * @return Command to run
   */
  protected[core] def cmdLine: String

  /**
   * implementing a final version of the commandLine from org.broadinstitute.gatk.queue.function.CommandLineFunction
   * User needs to implement cmdLine instead.
   * Appends the stdin/stdout redirections when set and stores the result in the job report under "command".
   *
   * @return Command to run
   */
  override final def commandLine: String = {
    preCmdInternal()
    val cmd = cmdLine +
      stdinFile.map(file => " < " + required(file.getAbsoluteFile)).getOrElse("") +
      stdoutFile.map(file => " > " + required(file.getAbsoluteFile)).getOrElse("")
    addJobReportBinding("command", cmd)
    cmd
  }

  private[core] var _pipesJobs: List[BiopetCommandLineFunction] = Nil

  /** Jobs registered through [[addPipeJob]] */
  def pipesJobs: List[BiopetCommandLineFunction] = _pipesJobs

  /**
   * Registers a job whose before-graph hooks are run by [[internalBeforeGraph]]; duplicates are dropped.
   *
   * @param job Job to register
   */
  def addPipeJob(job: BiopetCommandLineFunction): Unit = {
    _pipesJobs :+= job
    _pipesJobs = _pipesJobs.distinct
  }
}
212
213

/** Stores the global caches used to resolve executables and their md5 checksums */
object BiopetCommandLineFunction extends Logging {
  /** md5 checksums, stored under both the executable as requested and its resolved path */
  private[core] val executableMd5Cache: mutable.Map[String, String] = mutable.Map()

  /** Resolved paths, keyed by the executable as requested */
  private[core] val executableCache: mutable.Map[String, String] = mutable.Map()

  /**
   * Result of resolving an executable.
   *
   * @param path Resolved path, when the executable was looked up under this name
   * @param md5 md5 checksum of the resolved file, or "file_does_not_exist" when the file is missing
   */
  case class Executable(path: Option[String], md5: Option[String])

  /**
   * Resolves the executable with `which` and calculates the md5 checksum of the resolved file.
   * Both results are cached, so `which` and the checksum run only once per executable.
   *
   * The checksum is stored under the requested name as well as the resolved path, so it is
   * available on the first call and also when the resolved path is passed in again later.
   *
   * @param executable Name or path of the executable, may be null
   * @return Cached path and md5 for this executable; both empty when `executable` is null
   */
  def preProcessExecutable(executable: String): Executable = {
    if (executable != null && !executableMd5Cache.contains(executable)) {
      val resolved = executableCache.getOrElseUpdate(executable, resolveExecutable(executable))
      val md5 = executableMd5Cache.getOrElseUpdate(resolved, md5OfExecutable(resolved))
      executableMd5Cache += executable -> md5
    }
    Executable(executableCache.get(executable), executableMd5Cache.get(executable))
  }

  /**
   * Looks up the executable with `which`. Falls back to the executable as given when `which`
   * exits with a non-zero code (an error is registered) or cannot be started (a warning is logged).
   */
  private def resolveExecutable(executable: String): String = {
    try {
      val buffer = new StringBuffer()
      val cmd = Seq("which", executable)
      val process = Process(cmd).run(ProcessLogger(buffer.append(_)))
      // exitValue blocks until `which` has finished
      if (process.exitValue == 0) new File(buffer.toString).getAbsolutePath
      else {
        Logging.addError("executable: '" + executable + "' not found, please check config")
        executable
      }
    } catch {
      case ioe: java.io.IOException =>
        logger.warn(s"Could not use 'which' on '$executable', check on executable skipped: " + ioe)
        executable
    }
  }

  /** Lower-case hex md5 of the complete file content, or "file_does_not_exist" when the file is missing */
  private def md5OfExecutable(path: String): String = {
    val file = new File(path)
    if (file.exists()) {
      // readAllBytes reads the whole file and closes it, also when reading fails
      val bytes = java.nio.file.Files.readAllBytes(file.toPath)
      MessageDigest.getInstance("MD5").digest(bytes).map("%02X".format(_)).mkString.toLowerCase
    } else "file_does_not_exist"
  }
}