Commit 8ae539f7 authored by Peter van 't Hof
Browse files

Switch to fifo pipes

parent 9230f543
...@@ -85,7 +85,7 @@ trait BiopetCommandLineFunction extends CommandLineFunction with Configurable { ...@@ -85,7 +85,7 @@ trait BiopetCommandLineFunction extends CommandLineFunction with Configurable {
* Can override this method. This is executed just before the job is ready to run. * Can override this method. This is executed just before the job is ready to run.
* Can check on run time files from pipeline here * Can check on run time files from pipeline here
*/ */
protected[core] def beforeCmd() {} def beforeCmd() {}
/** Can override this method. This is executed after the script is done en queue starts to generate the graph */ /** Can override this method. This is executed after the script is done en queue starts to generate the graph */
def beforeGraph() {} def beforeGraph() {}
...@@ -101,7 +101,7 @@ trait BiopetCommandLineFunction extends CommandLineFunction with Configurable { ...@@ -101,7 +101,7 @@ trait BiopetCommandLineFunction extends CommandLineFunction with Configurable {
} }
/** Set default output file, threads and vmem for current job */ /** Set default output file, threads and vmem for current job */
private[core] def internalBeforeGraph(): Unit = { final def internalBeforeGraph(): Unit = {
val firstOutput = try { val firstOutput = try {
this.firstOutput this.firstOutput
} catch { } catch {
...@@ -342,46 +342,6 @@ trait BiopetCommandLineFunction extends CommandLineFunction with Configurable { ...@@ -342,46 +342,6 @@ trait BiopetCommandLineFunction extends CommandLineFunction with Configurable {
pipesJobs :+= job pipesJobs :+= job
pipesJobs = pipesJobs.distinct pipesJobs = pipesJobs.distinct
} }
def requiredInput(prefix: String, arg: Either[File, BiopetCommandLineFunction]): String = {
arg match {
case Left(file) => {
deps :+= file
required(prefix, file)
}
case Right(cmd) => {
cmd._outputAsStdout = true
addPipeJob(cmd)
try {
if (cmd.outputs != null) outputFiles ++= cmd.outputs
if (cmd.inputs != null) deps ++= cmd.inputs
} catch {
case e: NullPointerException =>
}
s"'${prefix}' <( ${cmd.commandLine} ) "
}
}
}
def requiredOutput(prefix: String, arg: Either[File, BiopetCommandLineFunction]): String = {
arg match {
case Left(file) => {
deps :+= file
required(prefix, file)
}
case Right(cmd) => {
cmd._inputAsStdin = true
addPipeJob(cmd)
try {
if (cmd.outputs != null) outputFiles ++= cmd.outputs
if (cmd.inputs != null) deps ++= cmd.inputs
} catch {
case e: NullPointerException =>
}
s"'${prefix}' >( ${cmd.commandLine} ) "
}
}
}
} }
/** stores global caches */ /** stores global caches */
......
package nl.lumc.sasc.biopet.core
import java.io.File
import nl.lumc.sasc.biopet.utils.config.Configurable
/**
 * Job that runs a list of commands in the background of one shell script and lets them
 * exchange data through named pipes (fifos).
 *
 * A file counts as a fifo when it is listed as an output of one command in `commands` and as
 * an input of a different command in `commands`. Those files are created with `mkfifo` before
 * the commands are started and removed afterwards; all remaining inputs and outputs become the
 * `deps` and `outputFiles` of this job.
 *
 * Created by pjvan_thof on 9/29/15.
 *
 * @param root     exposed as the `root` member of this job
 * @param commands commands that are started together; protected and mutable so a subclass can
 *                 replace the list before the command line is generated
 */
class BiopetFifoPipe(val root: Configurable,
                     protected var commands: List[BiopetCommandLineFunction]) extends BiopetCommandLineFunction {

  /**
   * Output files of every command, keyed by command.
   * Falls back to an empty map when reading the outputs throws a NullPointerException
   * (presumably because a command is not fully configured yet -- TODO confirm).
   */
  private def outputsPerCommand: Map[BiopetCommandLineFunction, Seq[File]] =
    try commands.map(cmd => cmd -> cmd.outputs).toMap
    catch { case _: NullPointerException => Map() }

  /**
   * Input files of every command, keyed by command.
   * Falls back to an empty map when reading the inputs throws a NullPointerException.
   */
  private def inputsPerCommand: Map[BiopetCommandLineFunction, Seq[File]] =
    try commands.map(cmd => cmd -> cmd.inputs).toMap
    catch { case _: NullPointerException => Map() }

  /**
   * Files that are written by one command and read by another command of this pipe.
   *
   * @return every shared file exactly once, in order of first occurrence
   */
  def fifos: List[File] = {
    val outputs = outputsPerCommand
    val inputs = inputsPerCommand
    val shared = for {
      producer <- commands
      consumer <- commands
      if producer != consumer
      produced <- outputs.getOrElse(producer, Nil)
      if inputs.getOrElse(consumer, Nil).contains(produced)
    } yield produced
    // A file consumed by several commands (or listed twice as input) must still be created
    // and removed only once; a second `mkfifo` on the same path fails.
    shared.distinct
  }

  /** Sets `outputFiles` and `deps` to the distinct outputs / inputs of all commands, excluding the fifos. */
  override def beforeGraph(): Unit = {
    val fifoFiles = fifos
    def withoutFifos(files: Iterable[Seq[File]]): List[File] =
      files.toList.flatten.filterNot(file => fifoFiles.contains(file)).distinct

    outputFiles = withoutFifos(outputsPerCommand.values)
    deps = withoutFifos(inputsPerCommand.values)
  }

  /** Runs the graph and command preparation hooks of every command, in list order. */
  override def beforeCmd(): Unit = {
    commands.foreach { cmd =>
      cmd.beforeGraph()
      cmd.internalBeforeGraph()
      cmd.beforeCmd()
    }
  }

  /**
   * Builds the script: remove stale fifos, create the fifos, start every command in the
   * background, wait for all of them, remove the fifos and report the number of failures.
   */
  def cmdLine: String = {
    val fifosFiles = this.fifos
    // NOTE(review): `exists()` is evaluated while this string is built, not when the script
    // runs -- confirm this is close enough to execution time for stale fifos to be caught.
    fifosFiles.filter(_.exists()).map(required("rm", _)).mkString("\n\n", " \n", " \n\n") +
      fifosFiles.map(required("mkfifo", _)).mkString("\n\n", "\n", "\n\n") +
      commands.map(_.commandLine).mkString("\n\n", " & \n", " & \n\n") +
      BiopetFifoPipe.waitScript +
      fifosFiles.map(required("rm", _)).mkString("\n\n", " \n", " \n\n") +
      BiopetFifoPipe.endScript
  }
}
/** Shell fragments shared by every fifo pipe job. */
object BiopetFifoPipe {

  /**
   * Waits for every background job of the current shell and counts the ones that exit
   * non-zero in the shell variable `FAIL`.
   *
   * Uses POSIX arithmetic expansion instead of the bash-only `let`, so the counter is also
   * incremented when the script is executed by a plain `sh`.
   */
  val waitScript: String =
    """
      |
      |FAIL="0"
      |
      |for job in `jobs -p`
      |do
      |echo $job
      |  wait $job || FAIL=$(($FAIL+1))
      |done
      |
      |echo $FAIL
      |
    """.stripMargin

  /**
   * Reports the result: prints a done message when `FAIL` is 0, otherwise prints a failure
   * message and exits with the number of failed jobs.
   *
   * The comparison uses `=`; `==` inside `[ ]` is not part of POSIX `test`.
   */
  val endScript: String =
    """
      |echo $FAIL
      |
      |if [ "$FAIL" = "0" ];
      |then
      |echo "BiopetFifoPipe Done"
      |else
      |echo BiopetFifoPipe "FAIL! ($FAIL)"
      |exit $FAIL
      |fi
      |
      |
    """.stripMargin
}
\ No newline at end of file
...@@ -29,7 +29,7 @@ trait BiopetJavaCommandLineFunction extends JavaCommandLineFunction with BiopetC ...@@ -29,7 +29,7 @@ trait BiopetJavaCommandLineFunction extends JavaCommandLineFunction with BiopetC
/** Constructs java opts, this adds scala threads */ /** Constructs java opts, this adds scala threads */
override def javaOpts = super.javaOpts + override def javaOpts = super.javaOpts +
optional("-Dscala.concurrent.context.numThreads=", threads, spaceSeparated = false, escape = false) optional("-Dscala.concurrent.context.numThreads=", threads, spaceSeparated = false)
override def beforeGraph(): Unit = { override def beforeGraph(): Unit = {
if (javaMemoryLimit.isEmpty && memoryLimit.isDefined) if (javaMemoryLimit.isEmpty && memoryLimit.isDefined)
......
...@@ -10,6 +10,11 @@ trait ToolCommandFuntion extends BiopetJavaCommandLineFunction { ...@@ -10,6 +10,11 @@ trait ToolCommandFuntion extends BiopetJavaCommandLineFunction {
override def getVersion = Some("Biopet " + FullVersion) override def getVersion = Some("Biopet " + FullVersion)
/** Sets `javaMainClass` from the class of `toolObject`, then runs the parent `beforeGraph`. */
override def beforeGraph(): Unit = {
// Keep only the part of the runtime class name before the first '$'.
javaMainClass = toolObject.getClass.getName.takeWhile(_ != '$')
super.beforeGraph()
}
override def freezeFieldValues(): Unit = { override def freezeFieldValues(): Unit = {
javaMainClass = toolObject.getClass.getName.takeWhile(_ != '$') javaMainClass = toolObject.getClass.getName.takeWhile(_ != '$')
super.freezeFieldValues() super.freezeFieldValues()
......
...@@ -33,7 +33,8 @@ class Cutadapt(val root: Configurable) extends BiopetCommandLineFunction with Su ...@@ -33,7 +33,8 @@ class Cutadapt(val root: Configurable) extends BiopetCommandLineFunction with Su
@Input(doc = "Input fastq file") @Input(doc = "Input fastq file")
var fastq_input: File = _ var fastq_input: File = _
var fastq_output: Either[File, BiopetCommandLineFunction] = _ @Output
var fastq_output: File = _
@Output(doc = "Output statistics file") @Output(doc = "Output statistics file")
var stats_output: File = _ var stats_output: File = _
...@@ -62,7 +63,7 @@ class Cutadapt(val root: Configurable) extends BiopetCommandLineFunction with Su ...@@ -62,7 +63,7 @@ class Cutadapt(val root: Configurable) extends BiopetCommandLineFunction with Su
optional("-M", opt_maximum_length) + optional("-M", opt_maximum_length) +
// input / output // input / output
required(fastq_input) + required(fastq_input) +
(if (outputAsStsout) "" else requiredOutput("--output", fastq_output) + (if (outputAsStsout) "" else required("--output", fastq_output) +
" > " + required(stats_output)) " > " + required(stats_output))
/** Output summary stats */ /** Output summary stats */
......
...@@ -36,7 +36,8 @@ class Sickle(val root: Configurable) extends BiopetCommandLineFunction with Summ ...@@ -36,7 +36,8 @@ class Sickle(val root: Configurable) extends BiopetCommandLineFunction with Summ
@Input(doc = "R2 input", required = false) @Input(doc = "R2 input", required = false)
var input_R2: File = _ var input_R2: File = _
var output_R1: Either[File, BiopetCommandLineFunction] = _ @Output(doc = "R1 output", required = false)
var output_R1: File = _
@Output(doc = "R2 output", required = false) @Output(doc = "R2 output", required = false)
var output_R2: File = _ var output_R2: File = _
...@@ -75,7 +76,7 @@ class Sickle(val root: Configurable) extends BiopetCommandLineFunction with Summ ...@@ -75,7 +76,7 @@ class Sickle(val root: Configurable) extends BiopetCommandLineFunction with Summ
cmd + cmd +
(if (inputAsStdin) required("-f", new File("/dev/stdin")) else required("-f", input_R1)) + (if (inputAsStdin) required("-f", new File("/dev/stdin")) else required("-f", input_R1)) +
required("-t", qualityType) + required("-t", qualityType) +
(if (outputAsStsout) required("-o", new File("/dev/stdout")) else requiredOutput("-o", output_R1)) + (if (outputAsStsout) required("-o", new File("/dev/stdout")) else required("-o", output_R1)) +
optional("-q", qualityThreshold) + optional("-q", qualityThreshold) +
optional("-l", lengthThreshold) + optional("-l", lengthThreshold) +
conditional(noFiveprime, "-x") + conditional(noFiveprime, "-x") +
......
...@@ -37,9 +37,11 @@ class FastqSync(val root: Configurable) extends ToolCommandFuntion with Summariz ...@@ -37,9 +37,11 @@ class FastqSync(val root: Configurable) extends ToolCommandFuntion with Summariz
@Input(doc = "Original FASTQ file (read 1 or 2)", shortName = "r", required = true) @Input(doc = "Original FASTQ file (read 1 or 2)", shortName = "r", required = true)
var refFastq: File = null var refFastq: File = null
var inputFastq1: Either[File, BiopetCommandLineFunction] = null @Input(required = true)
var inputFastq1: File = null
var inputFastq2: Either[File, BiopetCommandLineFunction] = null @Input(required = true)
var inputFastq2: File = null
@Output(doc = "Output read 1 FASTQ file", shortName = "o", required = true) @Output(doc = "Output read 1 FASTQ file", shortName = "o", required = true)
var outputFastq1: File = null var outputFastq1: File = null
...@@ -52,23 +54,11 @@ class FastqSync(val root: Configurable) extends ToolCommandFuntion with Summariz ...@@ -52,23 +54,11 @@ class FastqSync(val root: Configurable) extends ToolCommandFuntion with Summariz
override def defaultCoreMemory = 4.0 override def defaultCoreMemory = 4.0
override def beforeGraph(): Unit = {
super.beforeGraph()
inputFastq1 match {
case Right(job) => addPipeJob(job)
case _ =>
}
inputFastq2 match {
case Right(job) => addPipeJob(job)
case _ =>
}
}
override def cmdLine = override def cmdLine =
super.cmdLine + super.cmdLine +
required("-r", refFastq) + required("-r", refFastq) +
requiredInput("-i", inputFastq1) + required("-i", inputFastq1) +
requiredInput("-j", inputFastq2) + required("-j", inputFastq2) +
required("-o", outputFastq1) + required("-o", outputFastq1) +
required("-p", outputFastq2) + " > " + required("-p", outputFastq2) + " > " +
required(outputStats) required(outputStats)
......
...@@ -18,7 +18,7 @@ package nl.lumc.sasc.biopet.pipelines.flexiprep ...@@ -18,7 +18,7 @@ package nl.lumc.sasc.biopet.pipelines.flexiprep
import java.io.File import java.io.File
import nl.lumc.sasc.biopet.core.summary.SummaryQScript import nl.lumc.sasc.biopet.core.summary.SummaryQScript
import nl.lumc.sasc.biopet.core.{ PipelineCommand, SampleLibraryTag } import nl.lumc.sasc.biopet.core.{ BiopetFifoPipe, PipelineCommand, SampleLibraryTag }
import nl.lumc.sasc.biopet.extensions.{ Pbzip2, Zcat, Gzip, Sickle } import nl.lumc.sasc.biopet.extensions.{ Pbzip2, Zcat, Gzip, Sickle }
import nl.lumc.sasc.biopet.utils.config.Configurable import nl.lumc.sasc.biopet.utils.config.Configurable
import nl.lumc.sasc.biopet.extensions.tools.{ SeqStat, FastqSync } import nl.lumc.sasc.biopet.extensions.tools.{ SeqStat, FastqSync }
...@@ -180,7 +180,7 @@ class Flexiprep(val root: Configurable) extends QScript with SummaryQScript with ...@@ -180,7 +180,7 @@ class Flexiprep(val root: Configurable) extends QScript with SummaryQScript with
val qcCmdR1 = new QcCommand(this, fastqc_R1) val qcCmdR1 = new QcCommand(this, fastqc_R1)
qcCmdR1.input = R1_in qcCmdR1.input = R1_in
qcCmdR1.read = "R1" qcCmdR1.read = "R1"
qcCmdR1.output = if (paired) new File("/dev/stdout") qcCmdR1.output = if (paired) new File(fastqR1Qc.getAbsolutePath.stripSuffix(".gz"))
else fastqR1Qc else fastqR1Qc
qcCmdR1.isIntermediate = paired || !keepQcFastqFiles qcCmdR1.isIntermediate = paired || !keepQcFastqFiles
addSummarizable(qcCmdR1, "qc_command_R1") addSummarizable(qcCmdR1, "qc_command_R1")
...@@ -188,7 +188,7 @@ class Flexiprep(val root: Configurable) extends QScript with SummaryQScript with ...@@ -188,7 +188,7 @@ class Flexiprep(val root: Configurable) extends QScript with SummaryQScript with
if (paired) { if (paired) {
val qcCmdR2 = new QcCommand(this, fastqc_R2) val qcCmdR2 = new QcCommand(this, fastqc_R2)
qcCmdR2.input = R2_in.get qcCmdR2.input = R2_in.get
qcCmdR2.output = new File("/dev/stdout") qcCmdR2.output = new File(fastqR2Qc.get.getAbsolutePath.stripSuffix(".gz"))
qcCmdR2.read = "R2" qcCmdR2.read = "R2"
addSummarizable(qcCmdR2, "qc_command_R2") addSummarizable(qcCmdR2, "qc_command_R2")
...@@ -197,15 +197,38 @@ class Flexiprep(val root: Configurable) extends QScript with SummaryQScript with ...@@ -197,15 +197,38 @@ class Flexiprep(val root: Configurable) extends QScript with SummaryQScript with
val fqSync = new FastqSync(this) val fqSync = new FastqSync(this)
fqSync.refFastq = R1_in fqSync.refFastq = R1_in
fqSync.inputFastq1 = Right(qcCmdR1) fqSync.inputFastq1 = qcCmdR1.output
fqSync.inputFastq2 = Right(qcCmdR2) fqSync.inputFastq2 = qcCmdR2.output
fqSync.outputFastq1 = fastqR1Qc fqSync.outputFastq1 = fastqR1Qc
fqSync.outputFastq2 = fastqR2Qc.get fqSync.outputFastq2 = fastqR2Qc.get
fqSync.outputStats = new File(outDir, s"${sampleId.getOrElse("x")}-${libId.getOrElse("x")}.sync.stats") fqSync.outputStats = new File(outDir, s"${sampleId.getOrElse("x")}-${libId.getOrElse("x")}.sync.stats")
fqSync.isIntermediate = !keepQcFastqFiles //add(fqSync)
fqSync.deps ::= fastqc_R1.output
fqSync.deps ::= fastqc_R2.output val pipe = new BiopetFifoPipe(this, fqSync :: Nil) {
add(fqSync)
override def defaultThreads = 4
override def defaultCoreMemory = 4.0
override def configName = "qc-cmd"
override def beforeGraph(): Unit = {
fqSync.beforeGraph()
super.beforeGraph()
}
override def beforeCmd(): Unit = {
qcCmdR1.beforeCmd()
qcCmdR2.beforeCmd()
fqSync.beforeCmd()
commands = qcCmdR1.jobs ::: qcCmdR2.jobs ::: fqSync :: Nil
super.beforeCmd()
}
}
pipe.deps ::= fastqc_R1.output
pipe.deps ::= fastqc_R2.output
pipe.isIntermediate = !keepQcFastqFiles
add(pipe)
addSummarizable(fqSync, "fastq_sync") addSummarizable(fqSync, "fastq_sync")
outputFiles += ("syncStats" -> fqSync.outputStats) outputFiles += ("syncStats" -> fqSync.outputStats)
R1 = fqSync.outputFastq1 R1 = fqSync.outputFastq1
......
...@@ -3,7 +3,7 @@ package nl.lumc.sasc.biopet.pipelines.flexiprep ...@@ -3,7 +3,7 @@ package nl.lumc.sasc.biopet.pipelines.flexiprep
import java.io.File import java.io.File
import nl.lumc.sasc.biopet.core.summary.{ SummaryQScript, Summarizable } import nl.lumc.sasc.biopet.core.summary.{ SummaryQScript, Summarizable }
import nl.lumc.sasc.biopet.core.{ BiopetCommandLineFunction, BiopetPipe } import nl.lumc.sasc.biopet.core.{ BiopetFifoPipe, BiopetCommandLineFunction, BiopetPipe }
import nl.lumc.sasc.biopet.extensions.{ Cat, Gzip, Sickle, Cutadapt } import nl.lumc.sasc.biopet.extensions.{ Cat, Gzip, Sickle, Cutadapt }
import nl.lumc.sasc.biopet.extensions.seqtk.SeqtkSeq import nl.lumc.sasc.biopet.extensions.seqtk.SeqtkSeq
import nl.lumc.sasc.biopet.utils.config.Configurable import nl.lumc.sasc.biopet.utils.config.Configurable
...@@ -35,6 +35,9 @@ class QcCommand(val root: Configurable, val fastqc: Fastqc) extends BiopetComman ...@@ -35,6 +35,9 @@ class QcCommand(val root: Configurable, val fastqc: Fastqc) extends BiopetComman
val seqtk = new SeqtkSeq(root) val seqtk = new SeqtkSeq(root)
var clip: Option[Cutadapt] = None var clip: Option[Cutadapt] = None
var trim: Option[Sickle] = None var trim: Option[Sickle] = None
var outputCommand: BiopetCommandLineFunction = null
/**
 * Commands that make up this QC step, in execution order: seqtk, the optional clip and trim
 * commands, and the output command.
 *
 * `outputCommand` starts out as `null`; `Option(...)` leaves it out of the list until it has
 * been assigned, instead of producing a list that contains a null element.
 */
def jobs = (Some(seqtk) :: clip :: trim :: Option(outputCommand) :: Nil).flatten
def summaryFiles = Map() def summaryFiles = Map()
...@@ -60,6 +63,7 @@ class QcCommand(val root: Configurable, val fastqc: Fastqc) extends BiopetComman ...@@ -60,6 +63,7 @@ class QcCommand(val root: Configurable, val fastqc: Fastqc) extends BiopetComman
override def beforeCmd(): Unit = { override def beforeCmd(): Unit = {
seqtk.input = input seqtk.input = input
seqtk.output = new File(output.getParentFile, input.getName + ".seqtk.fq")
seqtk.Q = fastqc.encoding match { seqtk.Q = fastqc.encoding match {
case null => None case null => None
case enc if enc.contains("Sanger / Illumina 1.9") => None case enc if enc.contains("Sanger / Illumina 1.9") => None
...@@ -73,7 +77,9 @@ class QcCommand(val root: Configurable, val fastqc: Fastqc) extends BiopetComman ...@@ -73,7 +77,9 @@ class QcCommand(val root: Configurable, val fastqc: Fastqc) extends BiopetComman
clip = if (!flexiprep.skipClip) { clip = if (!flexiprep.skipClip) {
val foundAdapters = fastqc.foundAdapters.map(_.seq) val foundAdapters = fastqc.foundAdapters.map(_.seq)
if (foundAdapters.nonEmpty) { if (foundAdapters.nonEmpty) {
val cutadept = new nl.lumc.sasc.biopet.extensions.Cutadapt(root) val cutadept = new Cutadapt(root)
cutadept.fastq_input = seqtk.output
cutadept.fastq_output = new File(output.getParentFile, input.getName + ".cutadept.fq")
cutadept.stats_output = new File(flexiprep.outputDir, s"${flexiprep.sampleId.getOrElse("x")}-${flexiprep.libId.getOrElse("x")}.$read.clip.stats") cutadept.stats_output = new File(flexiprep.outputDir, s"${flexiprep.sampleId.getOrElse("x")}-${flexiprep.libId.getOrElse("x")}.$read.clip.stats")
if (cutadept.default_clip_mode == "3") cutadept.opt_adapter ++= foundAdapters if (cutadept.default_clip_mode == "3") cutadept.opt_adapter ++= foundAdapters
else if (cutadept.default_clip_mode == "5") cutadept.opt_front ++= foundAdapters else if (cutadept.default_clip_mode == "5") cutadept.opt_front ++= foundAdapters
...@@ -83,35 +89,53 @@ class QcCommand(val root: Configurable, val fastqc: Fastqc) extends BiopetComman ...@@ -83,35 +89,53 @@ class QcCommand(val root: Configurable, val fastqc: Fastqc) extends BiopetComman
} else None } else None
trim = if (!flexiprep.skipTrim) { trim = if (!flexiprep.skipTrim) {
val sickle = new nl.lumc.sasc.biopet.extensions.Sickle(root) val sickle = new Sickle(root)
sickle.output_stats = new File(flexiprep.outputDir, s"${flexiprep.sampleId.getOrElse("x")}-${flexiprep.libId.getOrElse("x")}.$read.trim.stats") sickle.output_stats = new File(flexiprep.outputDir, s"${flexiprep.sampleId.getOrElse("x")}-${flexiprep.libId.getOrElse("x")}.$read.trim.stats")
sickle.input_R1 = clip match {
case Some(clip) => clip.fastq_output
case _ => seqtk.output
}
sickle.output_R1 = new File(output.getParentFile, input.getName + ".sickle.fq")
Some(sickle) Some(sickle)
} else None } else None
val outputFile = (clip, trim) match {
case (_, Some(trim)) => trim.output_R1
case (Some(clip), _) => clip.fastq_output
case _ => seqtk.output
}
if (compress) outputCommand = {
val gzip = new Gzip(root)
gzip.input = outputFile :: Nil
gzip.output = output
gzip
}
else outputCommand = {
val cat = new Cat(root)
cat.input = outputFile :: Nil
cat.output = output
cat
}
seqtk.beforeGraph()
clip.foreach(_.beforeGraph())
trim.foreach(_.beforeGraph())
outputCommand.beforeGraph()
seqtk.beforeCmd()
clip.foreach(_.beforeCmd())
trim.foreach(_.beforeCmd())
outputCommand.beforeCmd()
} }
def cmdLine = { def cmdLine = {
val outputCommand = {
if (compress) new Gzip(root)
else new Cat(root)
}
val cmd = (clip, trim) match { val cmd = (clip, trim) match {
case (Some(clip), Some(trim)) => { case (Some(clip), Some(trim)) => new BiopetFifoPipe(root, seqtk :: clip :: trim :: outputCommand :: Nil)
clip.fastq_output = Right(trim) case (Some(clip), _) => new BiopetFifoPipe(root, seqtk :: clip :: outputCommand :: Nil)
trim.output_R1 = Right(outputCommand > output) case (_, Some(trim)) => new BiopetFifoPipe(root, seqtk :: trim :: outputCommand :: Nil)
seqtk | clip case _ => new BiopetFifoPipe(root, seqtk :: outputCommand :: Nil)
}
case (Some(clip), _) => {
clip.fastq_output = Right(outputCommand > output)
seqtk | clip
}
case (_, Some(trim)) => {
trim.output_R1 = Right(outputCommand > output)
seqtk | trim
}
case _ => {
seqtk | outputCommand > output
}
} }
//val cmds = (Some(seqtk) :: clip :: trim :: Some(new Gzip(root)) :: Nil).flatten