Commit 5da183c5 authored by Sander Bollen

Merge branch 'fix-depedencies' into 'develop'

Fix dependencies

This will create a JSON file that stores job and file dependencies, written at the start of the pipeline.
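For reference, a sketch of the shape such a file takes, with keys taken from the maps built in the new `WriteDependencies` object below. The job name, command, and paths are hypothetical examples; the misspelled keys (`depens_on_*`, `ouput_used_by_jobs`) are spelled as committed:

```json
{
  "jobs" : {
    "fastqc-1" : {
      "command" : "fastqc --noextract sample_R1.fq.gz",
      "intermediate" : false,
      "depens_on_intermediate" : false,
      "depens_on_jobs" : [],
      "ouput_used_by_jobs" : [],
      "outputs" : ["/out/sample_R1_fastqc.zip"],
      "inputs" : ["/data/sample_R1.fq.gz"],
      "done_at_start" : false,
      "fail_at_start" : false
    }
  },
  "files" : [ {
    "path" : "/data/sample_R1.fq.gz",
    "intermediate" : false,
    "output_jobs" : [],
    "input_jobs" : ["fastqc-1"],
    "exist_at_start" : true,
    "pipeline_input" : true
  } ]
}
```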

See merge request !269
parents dacb3b22 2a64a1d6
@@ -61,7 +61,7 @@ class BamMetricsTest extends TestNGSuite with Matchers {
   }

   @Test(dataProvider = "bammetricsOptions")
-  def testFlexiprep(rois: Int, amplicon: Boolean, rna: Boolean) = {
+  def testBamMetrics(rois: Int, amplicon: Boolean, rna: Boolean) = {
     val map = ConfigUtils.mergeMaps(Map("output_dir" -> BamMetricsTest.outputDir),
       Map(BamMetricsTest.executables.toSeq: _*)) ++
       (if (amplicon) Map("amplicon_bed" -> "amplicon.bed") else Map()) ++
...
@@ -26,8 +26,6 @@ import org.broadinstitute.gatk.queue.function.scattergather.ScatterGatherableFun
 import org.broadinstitute.gatk.queue.util.{ Logging => GatkLogging }
 import org.broadinstitute.gatk.utils.commandline.Argument

-import scala.collection.mutable.ListBuffer
-
 /** Base for biopet pipeline */
 trait BiopetQScript extends Configurable with GatkLogging {
@@ -78,6 +76,13 @@ trait BiopetQScript extends Configurable with GatkLogging {
       case f: ScatterGatherableFunction => f.scatterCount = 1
       case _ =>
     }
+
+    this match {
+      case q: MultiSampleQScript if q.onlySamples.nonEmpty && !q.samples.forall(x => q.onlySamples.contains(x._1)) =>
+        logger.info("Write report is skipped because sample flag is used")
+      case _ => reportClass.foreach(add(_))
+    }
+
     for (function <- functions) function match {
       case f: BiopetCommandLineFunction =>
         f.preProcessExecutable()
@@ -93,14 +98,18 @@ trait BiopetQScript extends Configurable with GatkLogging {
     inputFiles.foreach { i =>
       if (!i.file.exists()) Logging.addError(s"Input file does not exist: ${i.file}")
-      else if (!i.file.canRead()) Logging.addError(s"Input file can not be read: ${i.file}")
+      else if (!i.file.canRead) Logging.addError(s"Input file can not be read: ${i.file}")
     }

-    this match {
-      case q: MultiSampleQScript if q.onlySamples.nonEmpty && !q.samples.forall(x => q.onlySamples.contains(x._1)) =>
-        logger.info("Write report is skipped because sample flag is used")
-      case _ => reportClass.foreach(add(_))
-    }
+    functions.filter(_.jobOutputFile == null).foreach(f => {
+      try {
+        f.jobOutputFile = new File(f.firstOutput.getAbsoluteFile.getParent, "." + f.firstOutput.getName + "." + configName + ".out")
+      } catch {
+        case e: NullPointerException => logger.warn(s"Can't generate a jobOutputFile for $f")
+      }
+    })
+
+    if (logger.isDebugEnabled) WriteDependencies.writeDependencies(functions, new File(outputDir, s".log/${qSettings.runName}.deps.json"))

     Logging.checkErrors()
   }
...
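The fallback added above derives a hidden log file next to each job's first output. A minimal standalone sketch of that naming scheme, assuming a hypothetical output path and a pipeline whose configName is "shiva":

```scala
import java.io.File

object JobOutputFileDemo extends App {
  // Hypothetical values: a job whose first output is /out/sample.bam,
  // owned by a pipeline whose configName is "shiva".
  val firstOutput = new File("/out/sample.bam")
  val configName = "shiva"

  // Same expression as in BiopetQScript: a hidden ".out" file next to the output.
  val jobOutputFile = new File(firstOutput.getAbsoluteFile.getParent,
    "." + firstOutput.getName + "." + configName + ".out")

  println(jobOutputFile) // /out/.sample.bam.shiva.out
}
```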
 package nl.lumc.sasc.biopet.core

-import java.io.File
-
 import nl.lumc.sasc.biopet.utils.config.Configurable
 import org.broadinstitute.gatk.queue.function.CommandLineFunction
@@ -58,9 +56,6 @@ trait CommandLineResources extends CommandLineFunction with Configurable {
       case e: NullPointerException => null
     }

-    if (jobOutputFile == null && firstOutput != null)
-      jobOutputFile = new File(firstOutput.getAbsoluteFile.getParent, "." + firstOutput.getName + "." + configName + ".out")
-
     nCoresRequest = Option(threads)

     /** The 1e retry does not yet upgrade the memory */
...
+package nl.lumc.sasc.biopet.core
+
+import java.io.{ File, PrintWriter }
+
+import nl.lumc.sasc.biopet.utils.config.Configurable
+import nl.lumc.sasc.biopet.utils.{ Logging, ConfigUtils }
+import org.broadinstitute.gatk.queue.function.{ CommandLineFunction, QFunction }
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
+/**
+ * This object will generate with [[WriteDependencies.writeDependencies]] a json file where information about job and file dependencies are stored
+ *
+ * @author Peter van 't Hof <p.j.van_t_hof@lumc.nl>
+ */
+object WriteDependencies extends Logging with Configurable {
+  val root: Configurable = null
+  private val functionNames: mutable.Map[QFunction, String] = mutable.Map()
+
+  private def createFunctionNames(functions: Seq[QFunction]): Unit = {
+    val cache: mutable.Map[String, Int] = mutable.Map()
+    for (function <- functions) {
+      val baseName = function match {
+        case f: Configurable => f.configName
+        case f               => f.getClass.getSimpleName
+      }
+      cache += baseName -> (cache.getOrElse(baseName, 0) + 1)
+      functionNames += function -> s"$baseName-${cache(baseName)}"
+    }
+  }
+
+  /**
+   * This method will generate a json file where information about job and file dependencies are stored
+   *
+   * @param functions This should be all functions that are given to the graph of Queue
+   * @param outputFile Json file to write dependencies to
+   */
+  def writeDependencies(functions: Seq[QFunction], outputFile: File): Unit = {
+    logger.info("Start calculating dependencies")
+
+    val errorOnMissingInput: Boolean = config("error_on_missing_input", false)
+
+    createFunctionNames(functions)
+
+    case class QueueFile(file: File) {
+      private val inputJobs: ListBuffer[QFunction] = ListBuffer()
+      def addInputJob(function: QFunction) = inputJobs += function
+      def inputJobNames = inputJobs.toList.map(functionNames)
+
+      private val outputJobs: ListBuffer[QFunction] = ListBuffer()
+      def addOutputJob(function: QFunction) = {
+        if (outputJobs.nonEmpty) logger.warn(s"File '$file' is found as output of multiple jobs")
+        outputJobs += function
+      }
+      def outputJobNames = outputJobs.toList.map(functionNames)
+
+      def getMap = {
+        val fileExist = file.exists()
+        if (!fileExist && outputJobs.isEmpty) {
+          if (errorOnMissingInput) Logging.addError(s"Input file does not exist: $file")
+          else logger.warn(s"Input file does not exist: $file")
+        }
+        Map(
+          "path" -> file.getAbsolutePath,
+          "intermediate" -> isIntermediate,
+          "output_jobs" -> outputJobNames,
+          "input_jobs" -> inputJobNames,
+          "exist_at_start" -> fileExist,
+          "pipeline_input" -> outputJobs.isEmpty
+        )
+      }
+
+      def isIntermediate = outputJobs.exists(_.isIntermediate)
+    }
+
+    val files: mutable.Map[File, QueueFile] = mutable.Map()
+
+    def outputFiles(function: QFunction) = {
+      if (function.jobErrorFile == null) function.outputs :+ function.jobOutputFile
+      else function.outputs :+ function.jobOutputFile :+ function.jobErrorFile
+    }
+
+    for (function <- functions) {
+      for (input <- function.inputs) {
+        val file = files.getOrElse(input, QueueFile(input))
+        file.addInputJob(function)
+        files += input -> file
+      }
+      for (output <- outputFiles(function)) {
+        val file = files.getOrElse(output, QueueFile(output))
+        file.addOutputJob(function)
+        files += output -> file
+      }
+    }
+
+    val jobs = functionNames.par.map {
+      case (f, name) =>
+        name -> Map("command" -> (f match {
+          case cmd: CommandLineFunction => cmd.commandLine
+          case _                        => None
+        }), "intermediate" -> f.isIntermediate,
+          "depens_on_intermediate" -> f.inputs.exists(files(_).isIntermediate),
+          "depens_on_jobs" -> f.inputs.toList.flatMap(files(_).outputJobNames).distinct,
+          "ouput_used_by_jobs" -> outputFiles(f).toList.flatMap(files(_).inputJobNames).distinct,
+          "outputs" -> outputFiles(f).toList,
+          "inputs" -> f.inputs.toList,
+          "done_at_start" -> f.isDone,
+          "fail_at_start" -> f.isFail)
+    }.toIterator.toMap
+
+    logger.info(s"Writing dependencies to: $outputFile")
+    val writer = new PrintWriter(outputFile)
+    writer.println(ConfigUtils.mapToJson(Map(
+      "jobs" -> jobs.toMap,
+      "files" -> files.values.par.map(_.getMap).toList
+    )).spaces2)
+    writer.close()
+
+    logger.info("done calculating dependencies")
+  }
+}
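Besides the debug-mode hook in BiopetQScript above, the new object can also be called directly. A minimal sketch, assuming the Biopet and GATK Queue classes are on the classpath and that the QFunction list comes from a real pipeline run (here it is left empty so the snippet is self-contained):

```scala
import java.io.File
import nl.lumc.sasc.biopet.core.WriteDependencies
import org.broadinstitute.gatk.queue.function.QFunction

object WriteDepsDemo extends App {
  // Hypothetical: in a real run these come from the pipeline's function graph.
  val functions: Seq[QFunction] = Seq()

  // Jobs are named <configName>-<n> ("fastqc-1", "fastqc-2", ...) per createFunctionNames.
  // The pipeline itself writes to .log/<runName>.deps.json under the output dir.
  WriteDependencies.writeDependencies(functions, new File("myrun.deps.json"))
}
```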
@@ -21,11 +21,6 @@ class CheckChecksum extends InProcessFunction {
   @Argument(required = true)
   var checksum: String = _

-  override def freezeFieldValues(): Unit = {
-    super.freezeFieldValues()
-    jobOutputFile = new File(checksumFile.getParentFile, checksumFile.getName + ".check.out")
-  }
-
   /** Exits whenever the input md5sum is not the same as the output md5sum */
   def run: Unit = {
     val outputChecksum = WriteSummary.parseChecksum(checksumFile).toLowerCase
...
@@ -144,6 +144,7 @@ trait SummaryQScript extends BiopetQScript { qscript =>
             s"Md5 job is not executed, checksum file can't be found for: ${inputFile.file}")
           checkMd5.checksumFile = SummaryQScript.md5sumCache(inputFile.file)
           checkMd5.checksum = checksum
+          checkMd5.jobOutputFile = new File(checkMd5.checksumFile.getParentFile, checkMd5.checksumFile.getName + ".check.out")
           add(checkMd5)
         }
       case _ =>
...
@@ -16,6 +16,7 @@
 package nl.lumc.sasc.biopet.utils

 import java.io.File
+import java.util

 import argonaut.Argonaut._
 import argonaut._
@@ -270,9 +271,10 @@ object ConfigUtils extends Logging {
   def any2set(any: Any): Set[Any] = {
     if (any == null) return null
     any match {
-      case s: Set[_]  => s.toSet
-      case l: List[_] => l.toSet
-      case _          => Set(any)
+      case s: Set[_]            => s.toSet
+      case l: List[_]           => l.toSet
+      case l: util.ArrayList[_] => l.toSet
+      case _                    => Set(any)
     }
   }
...
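The extra case matters because config values parsed from JSON/YAML can arrive as Java collections, which previously fell through to the default case and were wrapped as a single-element set. A standalone sketch of the corrected behaviour, re-implementing `any2set` for illustration; it relies on the same `JavaConversions` implicits the codebase already uses to give `ArrayList` a `toSet`:

```scala
import java.util
import scala.collection.JavaConversions._

object Any2SetDemo extends App {
  // Simplified re-implementation of ConfigUtils.any2set, for illustration only.
  def any2set(any: Any): Set[Any] = {
    if (any == null) return null
    any match {
      case s: Set[_]            => s.toSet
      case l: List[_]           => l.toSet
      case l: util.ArrayList[_] => l.toSet // now flattened instead of becoming Set(list)
      case _                    => Set(any)
    }
  }

  val javaList = new util.ArrayList[String]()
  javaList.add("a")
  javaList.add("b")
  println(any2set(javaList)) // Set(a, b), no longer Set([a, b])
}
```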
@@ -24,6 +24,7 @@ import nl.lumc.sasc.biopet.extensions.picard.{ AddOrReplaceReadGroups, MarkDupli
 import nl.lumc.sasc.biopet.pipelines.bammetrics.BamMetrics
 import nl.lumc.sasc.biopet.pipelines.mapping.Mapping
 import nl.lumc.sasc.biopet.pipelines.toucan.Toucan
+import nl.lumc.sasc.biopet.utils.Logging

 import scala.collection.JavaConversions._
@@ -119,9 +120,17 @@ trait ShivaTrait extends MultiSampleQScript with Reference {
         (Some(mapping), Some(mapping.finalBamFile), preProcess(mapping.finalBamFile))
       }

-    lazy val inputR1: Option[File] = config("R1")
-    lazy val inputR2: Option[File] = config("R2")
-    lazy val inputBam: Option[File] = if (inputR1.isEmpty) config("bam") else None
+    def fileMustBeAbsulute(file: Option[File]): Option[File] = {
+      if (file.forall(_.isAbsolute)) file
+      else {
+        Logging.addError(s"$file for $sampleId / $libId should be a absolute file path")
+        file.map(_.getAbsoluteFile)
+      }
+    }
+
+    lazy val inputR1: Option[File] = fileMustBeAbsulute(config("R1"))
+    lazy val inputR2: Option[File] = fileMustBeAbsulute(config("R2"))
+    lazy val inputBam: Option[File] = fileMustBeAbsulute(if (inputR1.isEmpty) config("bam") else None)

     lazy val (mapping, bamFile, preProcessBam): (Option[Mapping], Option[File], Option[File]) =
       (inputR1.isDefined, inputBam.isDefined) match {
@@ -253,7 +262,8 @@ trait ShivaTrait extends MultiSampleQScript with Reference {
       md.input = input
       md.output = new File(sampleDir, sampleId + ".dedup.bam")
       md.outputMetrics = new File(sampleDir, sampleId + ".dedup.metrics")
-      md.isIntermediate = isIntermediate
+      //FIXME: making this file intermediate make the pipeline restart unnessery jobs
+      //md.isIntermediate = isIntermediate
       add(md)
       addSummarizable(md, "mark_duplicates")
       Some(md.output)
...
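The new helper only flags non-absolute paths as an error and then resolves them anyway against the working directory (the name `fileMustBeAbsulute` is misspelled in the commit itself). A standalone sketch of that behaviour, with hypothetical paths and a simplified stand-in for `Logging.addError`:

```scala
import java.io.File

object AbsolutePathDemo extends App {
  // Simplified stand-in for Logging.addError, for illustration only.
  def addError(msg: String): Unit = println(s"ERROR: $msg")

  def fileMustBeAbsolute(file: Option[File]): Option[File] = {
    if (file.forall(_.isAbsolute)) file // None or already-absolute paths pass through
    else {
      addError(s"$file should be an absolute file path")
      file.map(_.getAbsoluteFile) // resolved against the current working directory
    }
  }

  println(fileMustBeAbsolute(Some(new File("/data/sample_R1.fq.gz")))) // kept as-is
  println(fileMustBeAbsolute(Some(new File("sample_R1.fq.gz"))))       // flagged, then made absolute
}
```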