BiopetCommandLineFunction.scala 8.38 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
 * A dual licensing mode is applied. The source code within this project that are
 * not part of GATK Queue is freely available for non-commercial use under an AGPL
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
16
17
package nl.lumc.sasc.biopet.core

18
import java.io.{ PrintWriter, File, FileInputStream }
19
20
import java.security.MessageDigest

21
import nl.lumc.sasc.biopet.utils.Logging
22
import org.broadinstitute.gatk.utils.commandline.{ Output, Input }
23
import org.broadinstitute.gatk.utils.runtime.ProcessSettings
24
import org.ggf.drmaa.JobTemplate
25
26

import scala.collection.mutable
27
import scala.io.Source
28
29
import scala.sys.process.{ Process, ProcessLogger }
import scala.util.matching.Regex
30
import scala.collection.JavaConversions._
31
32

/** Biopet command line trait to auto check executable and cluster values */
33
trait BiopetCommandLineFunction extends CommandLineResources { biopetFunction =>
34
35
36
37
38
  // Use `configName` as the `analysisName` of this job (both are defined outside this trait).
  analysisName = configName

  /**
   * Optional list of extra files registered as inputs of this job; empty by default.
   * NOTE(review): presumably used to force a dependency on other jobs — confirm against callers.
   */
  @Input(doc = "deps", required = false)
  var deps: List[File] = Nil

Peter van 't Hof's avatar
Peter van 't Hof committed
39
40
41
  /** Files registered as outputs of this job through the `@Output` annotation; empty by default. */
  @Output
  var outputFiles: List[File] = Nil

42
43
  /**
   * Name or path of the executable of this job; not set by default.
   * `preProcessExecutable()` overwrites it with the path returned by
   * `BiopetCommandLineFunction.preProcessExecutable`.
   */
  var executable: String = _

44
45
46
47
  /** Default command used to run the job script; used when "remote_command" is not configured. */
  def defaultRemoteCommand = "bash"
  // Value of the "remote_command" config key, falling back to `defaultRemoteCommand`.
  // `updateJobRun` installs it as the command that runs the job script.
  private val remoteCommand: String = config("remote_command", default = defaultRemoteCommand)

48
49
50
51
52
53
54
55
56
  /**
   * Rewrites a job script in place so that it starts with `set -eubf` and
   * `set -o pipefail`, followed by its original content. With these options
   * bash stops on errors, on unset variables and on a failing pipe stage.
   *
   * @param file Script to rewrite; its content is read completely before it is overwritten
   */
  private def changeScript(file: File): Unit = {
    // Read everything first and release the handle: the same file is truncated below
    val source = Source.fromFile(file)
    val lines = try source.getLines().toList finally source.close()

    val writer = new PrintWriter(file)
    try {
      writer.println("set -eubf")
      writer.println("set -o pipefail")
      lines.foreach(writer.println(_))
    } finally writer.close() // also closes the script when writing fails
  }

57
58
  // Queue runs job scripts with "sh" by default. Biopet prepends its shell options to the
  // script and runs it with the configured remote command instead (default "bash").
  updateJobRun = {
    // DRMAA job template: the first argument is taken as the script path
    case template: JobTemplate =>
      val script = new File(template.getArgs.head.toString)
      changeScript(script)
      template.setRemoteCommand(remoteCommand)
    // Process settings: the element after the command is taken as the script path
    case settings: ProcessSettings =>
      val scriptAndArgs = settings.getCommand.tail
      changeScript(new File(scriptAndArgs.head))
      settings.setCommand(Array(remoteCommand) ++ scriptAndArgs)
  }

69
70
71
72
  /**
   * Hook that runs while the command line of this job is being generated (see `preCmdInternal`).
   * Override it to check files that only exist while the pipeline is running.
   * Does nothing by default.
   */
  def beforeCmd(): Unit = {}
74
75

  /**
   * Hook that is executed after the script is done and Queue starts to generate the graph.
   * Override it to finalise the settings of the job. Does nothing by default.
   */
  def beforeGraph(): Unit = {}
77
78
79
80

  /**
   * Prepares this job before its field values are frozen: resolves the executable, runs the
   * `beforeGraph()` hook and `internalBeforeGraph()`, and only then calls the parent implementation.
   */
  override def freezeFieldValues(): Unit = {
    preProcessExecutable()
    beforeGraph()
    internalBeforeGraph()
    super.freezeFieldValues()
  }

  /**
   * Prepares the jobs registered in `pipesJobs`: first `beforeGraph()` is run on all of them,
   * after that `internalBeforeGraph()` is run on all of them.
   */
  final def internalBeforeGraph(): Unit = {
    for (job <- pipesJobs) job.beforeGraph()
    for (job <- pipesJobs) job.internalBeforeGraph()
  }

94
95
96
97
  /**
   * Override this value when the executable may not be converted to a canonical path.
   * The value is not read anywhere in this file.
   * @deprecated
   */
  val executableToCanonicalPath: Boolean = true

  /**
   * Resolves `executable` through `BiopetCommandLineFunction.preProcessExecutable`, stores the
   * resolved path back in `executable` and adds the md5 sum of the executable to the job report
   * as "md5sum_exe" ("N/A" when no md5 sum is available).
   */
  protected[core] def preProcessExecutable(): Unit = {
    val resolved = BiopetCommandLineFunction.preProcessExecutable(executable)
    executable = resolved.path
    val md5 = resolved.md5.getOrElse("N/A")
    addJobReportBinding("md5sum_exe", md5)
  }

  /**
   * Runs `preProcessExecutable()` and the `beforeCmd()` hook, then adds the number of requested
   * cores to the job report as "cores" (1 when nothing, or a value below 1, is requested).
   */
  final protected def preCmdInternal(): Unit = {
    preProcessExecutable()
    beforeCmd()

    val cores = nCoresRequest match {
      case Some(requested) if requested > 0 => requested
      case _                                => 1
    }
    addJobReportBinding("cores", cores)
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
120
  /** Set to true by `|` (on the receiving job) and by `:<:`; false by default. */
  private[core] var _inputAsStdin: Boolean = false

  /** @return true when this job has been set up to read from stdin */
  def inputAsStdin: Boolean = _inputAsStdin

  /** Set to true by `|` (on the sending job) and by `>`; false by default. */
  private[core] var _outputAsStdout: Boolean = false

  /**
   * The name is misspelled but kept as is, because it is part of the public interface.
   * @return true when this job has been set up to write to stdout
   */
  def outputAsStsout: Boolean = _outputAsStdout

Peter van 't Hof's avatar
Peter van 't Hof committed
125
126
127
128
129
  /**
   * Pipes the stdout of this job into the stdin of `that` and combines both into one command
   * line function. Both jobs are flagged and get `beforeGraph()` and `internalBeforeGraph()`
   * run on them (this job first) before the pipe is created.
   * @param that Function that will read from stdin
   * @return A [[BiopetPipe]] that holds this job (or its commands when it is a pipe already) and `that`
   */
  def |(that: BiopetCommandLineFunction): BiopetCommandLineFunction = {
    _outputAsStdout = true
    that._inputAsStdin = true

    for (job <- List(this, that)) {
      job.beforeGraph()
      job.internalBeforeGraph()
    }

    this match {
      // Already a pipe: flag its last command and extend the existing chain instead of nesting pipes
      case pipe: BiopetPipe =>
        pipe.commands.last._outputAsStdout = true
        new BiopetPipe(pipe.commands ::: List(that))
      case _ => new BiopetPipe(List(this, that))
    }
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
146
147
148
149
150
  /**
   * Gives this program a file as stdin. The operator name ends with a colon and is therefore
   * right-associative: write `file :<: job`.
   * @param file File that will become stdin for this program
   * @return This job itself
   */
  def :<:(file: File): BiopetCommandLineFunction = {
    _inputAsStdin = true
    stdinFile = Some(file)
    this
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
157
158
159
160
161
  /**
   * Gives this program a file to write its stdout to.
   * @param file File that will become stdout for this program
   * @return This job itself
   */
  def >(file: File): BiopetCommandLineFunction = {
    _outputAsStdout = true
    stdoutFile = Some(file)
    this
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
168
169
170
171
172
  /** File that receives stdout, set by `>`. When defined, `commandLine` appends " > file" to the command. */
  @Output(required = false)
  private[core] var stdoutFile: Option[File] = None

  /** File that is used as stdin, set by `:<:`. When defined, `commandLine` appends " < file" to the command. */
  @Input(required = false)
  private[core] var stdinFile: Option[File] = None
Peter van 't Hof's avatar
Peter van 't Hof committed
173

Peter van 't Hof's avatar
Peter van 't Hof committed
174
175
176
177
  /**
   * Defines the command that this job executes and must be implemented by every job.
   * `commandLine` appends the stdin/stdout redirects to the value returned here.
   * @return Command to run
   */
  protected[core] def cmdLine: String
Peter van 't Hof's avatar
Peter van 't Hof committed
179
180
181
182
183
184

  /**
   * Final implementation of `commandLine` from org.broadinstitute.gatk.queue.function.CommandLineFunction;
   * jobs implement `cmdLine` instead. Runs `preCmdInternal()`, appends the stdin and stdout redirects
   * (when set) to `cmdLine` and stores the result in the job report as "command".
   * @return Command to run
   */
  override final def commandLine: String = {
    preCmdInternal()
    // Evaluate cmdLine before the redirects, as it is implemented by the job itself
    val base = cmdLine
    val stdinRedirect = stdinFile.map(file => " < " + required(file.getAbsoluteFile)).getOrElse("")
    val stdoutRedirect = stdoutFile.map(file => " > " + required(file.getAbsoluteFile)).getOrElse("")
    val fullCommand = base + stdinRedirect + stdoutRedirect
    addJobReportBinding("command", fullCommand)
    fullCommand
  }
Peter van 't Hof's avatar
Peter van 't Hof committed
193

194
  /** Jobs registered with `addPipeJob`; `internalBeforeGraph()` runs the graph hooks on each of them. */
  private[core] var pipesJobs: List[BiopetCommandLineFunction] = Nil
Peter van 't Hof's avatar
Peter van 't Hof committed
195
196
197
198
  /**
   * Registers a job in `pipesJobs`; a job that is registered already is not added a second time.
   * @param job Job to register
   */
  def addPipeJob(job: BiopetCommandLineFunction): Unit = {
    pipesJobs = (pipesJobs :+ job).distinct
  }
199
}
200
201

/** Stores the global caches that are used to resolve executables and to calculate their md5 sum only once */
object BiopetCommandLineFunction extends Logging {
  /** Md5 sum, or "file_does_not_exist", for each resolved executable path */
  private[core] val executableMd5Cache: mutable.Map[String, String] = mutable.Map()

  /** Resolved path for each executable as it was requested */
  private[core] val executableCache: mutable.Map[String, String] = mutable.Map()

  /**
   * Result of [[preProcessExecutable]]
   * @param path Resolved path of the executable, or the requested value when it could not be resolved
   * @param md5 Md5 sum of the executable; "file_does_not_exist" when the path is not an existing file,
   *            None when no executable was given
   */
  case class Executable(path: String, md5: Option[String])

  /**
   * Resolves an executable with `which` and calculates the md5 sum of the resolved file.
   * Both results are cached, so each executable is looked up and hashed only once.
   *
   * The md5 sum is cached and looked up by the resolved path. This way a call with the plain
   * name and a later call with the resolved path return the same result.
   *
   * @param executable Name or path of the executable, may be null when a job has no executable
   * @return Resolved path and md5 sum; for a null executable the path is null and the md5 is None
   */
  def preProcessExecutable(executable: String): Executable = {
    if (executable == null) Executable(executable, None)
    else {
      val path = executableCache.getOrElseUpdate(executable, resolvePath(executable))
      val md5 = executableMd5Cache.getOrElseUpdate(path, md5sum(path))
      Executable(path, Some(md5))
    }
  }

  /** Looks up the absolute path of an executable with `which`; falls back to the given value when this fails */
  private def resolvePath(executable: String): String = {
    try {
      val buffer = new StringBuffer()
      val process = Process(Seq("which", executable)).run(ProcessLogger(buffer.append(_)))
      // exitValue blocks until `which` is done, so the buffer is complete when it is read
      if (process.exitValue == 0) new File(buffer.toString).getAbsolutePath
      else {
        Logging.addError("executable: '" + executable + "' not found, please check config")
        executable
      }
    } catch {
      case ioe: java.io.IOException =>
        logger.warn(s"Could not use 'which' on '$executable', check on executable skipped: " + ioe)
        executable
    }
  }

  /** Calculates the lower case hex md5 sum of a file, or returns "file_does_not_exist" when the path does not exist */
  private def md5sum(path: String): String = {
    if (!new File(path).exists()) "file_does_not_exist"
    else {
      val digest = MessageDigest.getInstance("MD5")
      val is = new FileInputStream(path)
      try {
        // Read in chunks until end of stream: a single read() is not guaranteed to return the whole file
        val chunk = Array.ofDim[Byte](8192)
        Iterator.continually(is.read(chunk)).takeWhile(_ != -1).foreach(digest.update(chunk, 0, _))
      } finally is.close()
      digest.digest().map("%02X".format(_)).mkString.toLowerCase
    }
  }
}