From e031256a9bfca4c2da87cd6ccda3eb7295cb310b Mon Sep 17 00:00:00 2001 From: Peter van 't Hof <p.j.van_t_hof@lumc.nl> Date: Thu, 12 Jun 2014 21:09:56 +0200 Subject: [PATCH] Skipped compiling step at init phase of pipeline --- .../sasc/biopet/core/BiopetQCommandLine.scala | 298 ++++++++++++++++++ .../sasc/biopet/core/PipelineCommand.scala | 22 +- .../pipelines/flexiprep/Flexiprep.scala | 2 +- .../sasc/biopet/pipelines/gatk/Gatk.scala | 2 +- .../biopet/pipelines/mapping/Mapping.scala | 2 +- 5 files changed, 313 insertions(+), 13 deletions(-) create mode 100644 biopet-framework/src/main/java/nl/lumc/sasc/biopet/core/BiopetQCommandLine.scala diff --git a/biopet-framework/src/main/java/nl/lumc/sasc/biopet/core/BiopetQCommandLine.scala b/biopet-framework/src/main/java/nl/lumc/sasc/biopet/core/BiopetQCommandLine.scala new file mode 100644 index 000000000..00362623a --- /dev/null +++ b/biopet-framework/src/main/java/nl/lumc/sasc/biopet/core/BiopetQCommandLine.scala @@ -0,0 +1,298 @@ + /* +* Copyright (c) 2012 The Broad Institute +* +* Permission is hereby granted, free of charge, to any person +* obtaining a copy of this software and associated documentation +* files (the "Software"), to deal in the Software without +* restriction, including without limitation the rights to use, +* copy, modify, merge, publish, distribute, sublicense, and/or sell +* copies of the Software, and to permit persons to whom the +* Software is furnished to do so, subject to the following +* conditions: +* +* The above copyright notice and this permission notice shall be +* included in all copies or substantial portions of the Software. +* +* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +* OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +* HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR +* THE USE OR OTHER DEALINGS IN THE SOFTWARE. +*/ + +package nl.lumc.sasc.biopet.core + +import java.io.File +import org.broadinstitute.sting.commandline._ +import org.broadinstitute.sting.queue.util._ +import org.broadinstitute.sting.queue.QCommandPlugin +import org.broadinstitute.sting.queue.QScript +import org.broadinstitute.sting.queue.QScriptManager +import org.broadinstitute.sting.queue.engine.{QStatusMessenger, QGraphSettings, QGraph} +import collection.JavaConversions._ +import org.broadinstitute.sting.utils.classloader.PluginManager +import org.broadinstitute.sting.utils.exceptions.UserException +import org.broadinstitute.sting.utils.io.IOUtils +import org.broadinstitute.sting.utils.help.ApplicationDetails +import java.net.URL +import java.util.{ResourceBundle, Arrays} +import org.broadinstitute.sting.utils.text.TextFormattingUtils +import org.apache.commons.io.FilenameUtils + +/** + * Entry point of Queue. Compiles and runs QScripts passed in to the command line. + */ +object BiopetQCommandLine extends Logging { + /** + * Main. + * @param argv Arguments. + */ + def main(argv: Array[String]) { + val qCommandLine = new BiopetQCommandLine + + val shutdownHook = new Thread { + override def run() { + logger.info("Shutting down jobs. Please wait...") + qCommandLine.shutdown() + } + } + + Runtime.getRuntime.addShutdownHook(shutdownHook) + + try { + CommandLineProgram.start(qCommandLine, argv) + try { + Runtime.getRuntime.removeShutdownHook(shutdownHook) + qCommandLine.shutdown() + } catch { + case e: Exception => /* ignore, example 'java.lang.IllegalStateException: Shutdown in progress' */ + } + if (CommandLineProgram.result != 0) + System.exit(CommandLineProgram.result) + } catch { + case e: Exception => CommandLineProgram.exitSystemWithError(e) + } + } +} + +/** + * Entry point of Queue. Compiles and runs QScripts passed in to the command line. + */ +class BiopetQCommandLine extends CommandLineProgram with Logging { + @Input(fullName="script", shortName="S", doc="QScript scala file", required=false) + @ClassType(classOf[File]) + var scripts: Seq[File] = Nil + + @ArgumentCollection + val settings = new QGraphSettings + + private val qScriptManager = new QScriptManager + private val qGraph = new QGraph + private var qScriptClasses: File = _ + private var shuttingDown = false + + private lazy val qScriptPluginManager = { + //qScriptClasses = IOUtils.tempDir("Q-Classes-", "", settings.qSettings.tempDirectory) + //qScriptManager.loadScripts(scripts, qScriptClasses) + var temp: Seq[URL] = Seq() + for (t <- scripts) temp :+= this.getClass.getResource(t.toString) + new PluginManager[QScript](qPluginType, temp) + } + + private lazy val qCommandPlugin = { + new PluginManager[QCommandPlugin](classOf[QCommandPlugin]) + } + + private lazy val allCommandPlugins = qCommandPlugin.createAllTypes() + + private lazy val qPluginType: Class[_ <: QScript] = { + allCommandPlugins.map(_.qScriptClass).headOption.getOrElse(classOf[QScript]) + } + + /** + * Takes the QScripts passed in, runs their script() methods, retrieves their generated + * functions, and then builds and runs a QGraph based on the dependencies. + */ + def execute = { + var success = false + var result = 1 + var functionsAndStatusSize = 0 + try { + ClassFieldCache.parsingEngine = this.parser + + if (settings.qSettings.runName == null) + settings.qSettings.runName = "runname" + if (IOUtils.isDefaultTempDir(settings.qSettings.tempDirectory)) + settings.qSettings.tempDirectory = IOUtils.absolute(settings.qSettings.runDirectory, ".queue/tmp") + qGraph.initializeWithSettings(settings) + + for (commandPlugin <- allCommandPlugins) { + loadArgumentsIntoObject(commandPlugin) + } + + for (commandPlugin <- allCommandPlugins) { + if (commandPlugin.statusMessenger != null) + commandPlugin.statusMessenger.started() + } + + qGraph.messengers = allCommandPlugins.filter(_.statusMessenger != null).map(_.statusMessenger).toSeq + + // TODO: Default command plugin argument? + val remoteFileConverter = ( + for (commandPlugin <- allCommandPlugins if (commandPlugin.remoteFileConverter != null)) + yield commandPlugin.remoteFileConverter + ).headOption.getOrElse(null) + + if (remoteFileConverter != null) + loadArgumentsIntoObject(remoteFileConverter) + + val allQScripts = qScriptPluginManager.createAllTypes() + for (script <- allQScripts) { + logger.info("Scripting " + qScriptPluginManager.getName(script.getClass.asSubclass(classOf[QScript]))) + loadArgumentsIntoObject(script) + allCommandPlugins.foreach(_.initScript(script)) + // TODO: Pulling inputs can be time/io expensive! Some scripts are using the files to generate functions-- even for dry runs-- so pull it all down for now. + //if (settings.run) + script.pullInputs() + script.qSettings = settings.qSettings + try { + script.script() + } catch { + case e: Exception => + throw new UserException.CannotExecuteQScript(script.getClass.getSimpleName + ".script() threw the following exception: " + e, e) + } + + if (remoteFileConverter != null) { + if (remoteFileConverter.convertToRemoteEnabled) + script.mkRemoteOutputs(remoteFileConverter) + } + + script.functions.foreach(qGraph.add(_)) + logger.info("Added " + script.functions.size + " functions") + } + // Execute the job graph + qGraph.run() + + val functionsAndStatus = qGraph.getFunctionsAndStatus + + // walk over each script, calling onExecutionDone + for (script <- allQScripts) { + val scriptFunctions = functionsAndStatus.filterKeys(f => script.functions.contains(f)) + script.onExecutionDone(scriptFunctions, success) + } + functionsAndStatusSize = functionsAndStatus.size + + // write the final complete job report + logger.info("Writing final jobs report...") + qGraph.writeJobsReport() + + if (qGraph.success) { + if (settings.run) { + allQScripts.foreach(_.pushOutputs()) + for (commandPlugin <- allCommandPlugins) + if (commandPlugin.statusMessenger != null) { + val allInputs = allQScripts.map(_.remoteInputs) + val allOutputs = allQScripts.map(_.remoteOutputs) + commandPlugin.statusMessenger.done(allInputs, allOutputs) + } + } + success = true + result = 0 + } + } finally { + if (!success) { + logger.info("Done with errors") + qGraph.logFailed() + if (settings.run) { + for (commandPlugin <- allCommandPlugins) + if (commandPlugin.statusMessenger != null) + commandPlugin.statusMessenger.exit("Done with errors: %s".format(qGraph.formattedStatusCounts)) + } + } + } + logger.info("Script %s with %d total jobs".format(if (success) "completed successfully" else "failed", functionsAndStatusSize)) + result + } + + /** + * Returns true as QScripts are located and compiled. + * @return true + */ + override def canAddArgumentsDynamically = true + + /** + * Returns the list of QScripts passed in via -S and other plugins + * so that their arguments can be inspected before QScript.script is called. + * @return Array of dynamic sources + */ + override def getArgumentSources = { + var plugins = Seq.empty[Class[_]] + plugins ++= qScriptPluginManager.getPlugins + plugins ++= qCommandPlugin.getPlugins + plugins.toArray + } + + /** + * Returns the name of a script/plugin + * @return The name of a script/plugin + */ + override def getArgumentSourceName(source: Class[_]) = { + if (classOf[QScript].isAssignableFrom(source)) + qScriptPluginManager.getName(source.asSubclass(classOf[QScript])) + else if (classOf[QCommandPlugin].isAssignableFrom(source)) + qCommandPlugin.getName(source.asSubclass(classOf[QCommandPlugin])) + else + null + } + + /** + * Returns a ScalaCompoundArgumentTypeDescriptor that can parse argument sources into scala collections. + * @return a ScalaCompoundArgumentTypeDescriptor + */ + override def getArgumentTypeDescriptors = + Arrays.asList(new ScalaCompoundArgumentTypeDescriptor) + + override def getApplicationDetails : ApplicationDetails = { + new ApplicationDetails(createQueueHeader(), + Seq.empty[String], + ApplicationDetails.createDefaultRunningInstructions(getClass.asInstanceOf[Class[CommandLineProgram]]), + "") + } + + private def createQueueHeader() : Seq[String] = { + Seq(String.format("Queue v%s, Compiled %s", getQueueVersion, getBuildTimestamp), + "Copyright (c) 2012 The Broad Institute", + "For support and documentation go to http://www.broadinstitute.org/gatk") + } + + private def getQueueVersion : String = { + val stingResources : ResourceBundle = TextFormattingUtils.loadResourceBundle("StingText") + + if ( stingResources.containsKey("org.broadinstitute.sting.queue.QueueVersion.version") ) { + stingResources.getString("org.broadinstitute.sting.queue.QueueVersion.version") + } + else { + "<unknown>" + } + } + + private def getBuildTimestamp : String = { + val stingResources : ResourceBundle = TextFormattingUtils.loadResourceBundle("StingText") + + if ( stingResources.containsKey("build.timestamp") ) { + stingResources.getString("build.timestamp") + } + else { + "<unknown>" + } + } + + def shutdown() = { + shuttingDown = true + qGraph.shutdown() + if (qScriptClasses != null) IOUtils.tryDelete(qScriptClasses) + } +} diff --git a/biopet-framework/src/main/java/nl/lumc/sasc/biopet/core/PipelineCommand.scala b/biopet-framework/src/main/java/nl/lumc/sasc/biopet/core/PipelineCommand.scala index ba76acc2a..b2cc5050f 100644 --- a/biopet-framework/src/main/java/nl/lumc/sasc/biopet/core/PipelineCommand.scala +++ b/biopet-framework/src/main/java/nl/lumc/sasc/biopet/core/PipelineCommand.scala @@ -7,23 +7,25 @@ package nl.lumc.sasc.biopet.core import java.io.FileOutputStream -import org.broadinstitute.sting.queue.QCommandLine +//import org.broadinstitute.sting.queue.QCommandLine import org.broadinstitute.sting.queue.util.Logging trait PipelineCommand extends Logging { - val src = "" - val extension = ".scala" +// val src = "" +// val extension = ".scala" + val pipeline = "" def main(args: Array[String]): Unit = { - val tempFile = java.io.File.createTempFile(src + ".", extension) - val is = getClass.getResourceAsStream(src + extension) - val os = new FileOutputStream(tempFile) - org.apache.commons.io.IOUtils.copy(is, os) - os.close() +// val tempFile = java.io.File.createTempFile(src + ".", extension) +// val is = getClass.getResourceAsStream(src + extension) +// val os = new FileOutputStream(tempFile) +// org.apache.commons.io.IOUtils.copy(is, os) +// os.close() var argv: Array[String] = Array() - argv ++= Array("-S", tempFile.getAbsolutePath) + //argv ++= Array("-S", tempFile.getAbsolutePath) + argv ++= Array("-S", pipeline) argv ++= args - return QCommandLine.main(argv) + return BiopetQCommandLine.main(argv) } } \ No newline at end of file diff --git a/flexiprep/src/main/java/nl/lumc/sasc/biopet/pipelines/flexiprep/Flexiprep.scala b/flexiprep/src/main/java/nl/lumc/sasc/biopet/pipelines/flexiprep/Flexiprep.scala index af4476e95..c46e4f14b 100644 --- a/flexiprep/src/main/java/nl/lumc/sasc/biopet/pipelines/flexiprep/Flexiprep.scala +++ b/flexiprep/src/main/java/nl/lumc/sasc/biopet/pipelines/flexiprep/Flexiprep.scala @@ -271,5 +271,5 @@ class Flexiprep(private var globalConfig: Config) extends QScript with BiopetQSc } object Flexiprep extends PipelineCommand { - override val src = "Flexiprep" + override val pipeline = "/nl/lumc/sasc/biopet/pipelines/flexiprep/" } diff --git a/gatk/src/main/java/nl/lumc/sasc/biopet/pipelines/gatk/Gatk.scala b/gatk/src/main/java/nl/lumc/sasc/biopet/pipelines/gatk/Gatk.scala index 400ed4ad2..a0402a2b2 100644 --- a/gatk/src/main/java/nl/lumc/sasc/biopet/pipelines/gatk/Gatk.scala +++ b/gatk/src/main/java/nl/lumc/sasc/biopet/pipelines/gatk/Gatk.scala @@ -310,5 +310,5 @@ class Gatk(private var globalConfig: Config) extends QScript with BiopetQScript } object Gatk extends PipelineCommand { - override val src = "Gatk" + override val pipeline = "/nl/lumc/sasc/biopet/pipelines/gatk/" } diff --git a/mapping/src/main/java/nl/lumc/sasc/biopet/pipelines/mapping/Mapping.scala b/mapping/src/main/java/nl/lumc/sasc/biopet/pipelines/mapping/Mapping.scala index db9681488..bd774d431 100644 --- a/mapping/src/main/java/nl/lumc/sasc/biopet/pipelines/mapping/Mapping.scala +++ b/mapping/src/main/java/nl/lumc/sasc/biopet/pipelines/mapping/Mapping.scala @@ -192,5 +192,5 @@ class Mapping(private var globalConfig: Config) extends QScript with BiopetQScri } object Mapping extends PipelineCommand { - override val src = "Mapping" + override val pipeline = "/nl/lumc/sasc/biopet/pipelines/mapping/" } -- GitLab