From 96d44ee9a6c2a4205a8985077fb83e413f919091 Mon Sep 17 00:00:00 2001
From: bow <bow@bow.web.id>
Date: Thu, 29 Jan 2015 17:02:32 +0100
Subject: [PATCH] Use sync tool and update its wrapper

---
 .../nl/lumc/sasc/biopet/tools/FastqSync.scala | 83 ++++++++++++++++++-
 .../pipelines/flexiprep/Flexiprep.scala       | 26 +++---
 .../flexiprep/FlexiprepSummary.scala          | 18 ++--
 3 files changed, 108 insertions(+), 19 deletions(-)

diff --git a/public/biopet-framework/src/main/scala/nl/lumc/sasc/biopet/tools/FastqSync.scala b/public/biopet-framework/src/main/scala/nl/lumc/sasc/biopet/tools/FastqSync.scala
index 4d439f0f4..54ff8d13b 100644
--- a/public/biopet-framework/src/main/scala/nl/lumc/sasc/biopet/tools/FastqSync.scala
+++ b/public/biopet-framework/src/main/scala/nl/lumc/sasc/biopet/tools/FastqSync.scala
@@ -9,10 +9,16 @@
 package nl.lumc.sasc.biopet.tools
 
 import java.io.File
+import scala.io.Source
+import scala.util.matching.Regex
+
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 
+import argonaut._, Argonaut._
+import scalaz._, Scalaz._
 import htsjdk.samtools.fastq.{ BasicFastqWriter, FastqReader, FastqRecord }
+import org.broadinstitute.gatk.utils.commandline.{ Input, Output }
 
 import nl.lumc.sasc.biopet.core.BiopetJavaCommandLineFunction
 import nl.lumc.sasc.biopet.core.ToolCommand
@@ -27,9 +33,60 @@ class FastqSync(val root: Configurable) extends BiopetJavaCommandLineFunction {
 
   javaMainClass = getClass.getName
 
+  @Input(doc = "Original FASTQ file (read 1 or 2)", shortName = "r", required = true)
+  var refFastq: File = _
+
+  @Input(doc = "Input read 1 FASTQ file", shortName = "i", required = true)
+  var inputFastq1: File = _
+
+  @Input(doc = "Input read 2 FASTQ file", shortName = "j", required = true)
+  var inputFastq2: File = _
+
+  @Output(doc = "Output read 1 FASTQ file", shortName = "o", required = true)
+  var outputFastq1: File = _
+
+  @Output(doc = "Output read 2 FASTQ file", shortName = "p", required = true)
+  var outputFastq2: File = _
+
+  var outputStats: File = _
+
+  // executed command line
+  override def commandLine =
+    super.commandLine +
+      required("-r", refFastq) +
+      required("-i", inputFastq1) +
+      required("-j", inputFastq2) +
+      required("-o", outputFastq1) +
+      required("-p", outputFastq2) + " > " +
+      required(outputStats)
+
+  // summary statistics
+  def summary: Json = {
+
+    val regex = new Regex("""Filtered (\d*) reads from first read file.
+                            |Filtered (\d*) reads from second read file.
+                            |Synced read files contain (\d*) reads.""".stripMargin,
+      "R1", "R2", "RL")
+
+    val (countFilteredR1, countFilteredR2, countRLeft) =
+      if (outputStats.exists) {
+        val text = Source
+          .fromFile(outputStats)
+          .getLines()
+          .mkString("\n")
+        regex.findFirstMatchIn(text) match {
+          case None         => (0, 0, 0)
+          case Some(rmatch) => (rmatch.group("R1").toInt, rmatch.group("R2").toInt, rmatch.group("RL").toInt)
+        }
+      } else (0, 0, 0)
+
+    ("num_reads_discarded_R1" := countFilteredR1) ->:
+      ("num_reads_discarded_R2" := countFilteredR2) ->:
+      ("num_reads_kept" := countRLeft) ->:
+      jEmptyObject
+  }
 }
 
-// TODO: implement reading from and writing to gzipped files
 object FastqSync extends ToolCommand {
 
   /**
@@ -139,6 +196,30 @@ object FastqSync extends ToolCommand {
     println("Synced read files contain %d reads.".format(counts.numKept))
   }
 
+  /** Function to merge this tool's summary with summaries from other objects */
+  // TODO: refactor this into the object? At least make it work on the summary object
+  def mergeSummaries(jsons: List[Json]): Json = {
+
+    val (read1FilteredCount, read2FilteredCount, readsLeftCount) = jsons
+      // extract the values we require from each JSON object into tuples
+      .map {
+        case json =>
+          (json.field("num_reads_discarded_R1").get.numberOrZero.toInt,
+            json.field("num_reads_discarded_R2").get.numberOrZero.toInt,
+            json.field("num_reads_kept").get.numberOrZero.toInt)
+      }
+      // reduce the tuples
+      .reduceLeft {
+        (x: (Int, Int, Int), y: (Int, Int, Int)) =>
+          (x._1 + y._1, x._2 + y._2, x._3 + y._3)
+      }
+
+    ("num_reads_discarded_R1" := read1FilteredCount) ->:
+      ("num_reads_discarded_R2" := read2FilteredCount) ->:
+      ("num_reads_kept" := readsLeftCount) ->:
+      jEmptyObject
+  }
+
   case class Args(refFastq: File = new File(""),
                   inputFastq1: File = new File(""),
                   inputFastq2: File = new File(""),
diff --git a/public/flexiprep/src/main/scala/nl/lumc/sasc/biopet/pipelines/flexiprep/Flexiprep.scala b/public/flexiprep/src/main/scala/nl/lumc/sasc/biopet/pipelines/flexiprep/Flexiprep.scala
index e44260588..9f1bf50f3 100644
--- a/public/flexiprep/src/main/scala/nl/lumc/sasc/biopet/pipelines/flexiprep/Flexiprep.scala
+++ b/public/flexiprep/src/main/scala/nl/lumc/sasc/biopet/pipelines/flexiprep/Flexiprep.scala
@@ -21,7 +21,8 @@ import org.broadinstitute.gatk.utils.commandline.{ Input, Argument }
 import nl.lumc.sasc.biopet.core.{ BiopetQScript, PipelineCommand }
 import nl.lumc.sasc.biopet.core.config.Configurable
 import nl.lumc.sasc.biopet.extensions.{ Gzip, Pbzip2, Md5sum, Zcat, Seqstat }
-import nl.lumc.sasc.biopet.scripts.{ FastqSync }
+//import nl.lumc.sasc.biopet.scripts.FastqSync
+import nl.lumc.sasc.biopet.tools.FastqSync
 
 class Flexiprep(val root: Configurable) extends QScript with BiopetQScript {
   def this() = this(null)
@@ -182,15 +183,20 @@ class Flexiprep(val root: Configurable) extends QScript with BiopetQScript {
         R2 = cutadapt_R2.fastq_output
         deps ::= R2
 
-        val fastqSync = FastqSync(this, cutadapt_R1.fastq_input, cutadapt_R1.fastq_output, cutadapt_R2.fastq_output,
-          swapExt(outDir, R1, R1_ext, ".sync" + R1_ext), swapExt(outDir, R2, R2_ext, ".sync" + R2_ext), swapExt(outDir, R1, R1_ext, ".sync.stats"))
-        fastqSync.deps :::= deps
-        fastqSync.isIntermediate = true
-        add(fastqSync)
-        summary.addFastqcSync(fastqSync, chunk)
-        outputFiles += ("syncStats" -> fastqSync.output_stats)
-        R1 = fastqSync.output_R1
-        R2 = fastqSync.output_R2
+        val fqSync = new FastqSync(this)
+        fqSync.refFastq = cutadapt_R1.fastq_input
+        fqSync.inputFastq1 = cutadapt_R1.fastq_output
+        fqSync.inputFastq2 = cutadapt_R2.fastq_output
+        fqSync.outputFastq1 = swapExt(outDir, R1, R1_ext, ".sync" + R1_ext)
+        fqSync.outputFastq2 = swapExt(outDir, R2, R2_ext, ".sync" + R2_ext)
+        fqSync.outputStats = swapExt(outDir, R1, R1_ext, ".sync.stats")
+        fqSync.deps :::= deps
+        add(fqSync)
+
+        summary.addFastqcSync(fqSync, chunk)
+        outputFiles += ("syncStats" -> fqSync.outputStats)
+        R1 = fqSync.outputFastq1
+        R2 = fqSync.outputFastq2
         deps :::= R1 :: R2 :: Nil
       }
     }
diff --git a/public/flexiprep/src/main/scala/nl/lumc/sasc/biopet/pipelines/flexiprep/FlexiprepSummary.scala b/public/flexiprep/src/main/scala/nl/lumc/sasc/biopet/pipelines/flexiprep/FlexiprepSummary.scala
index 855e7288c..84acd1820 100644
--- a/public/flexiprep/src/main/scala/nl/lumc/sasc/biopet/pipelines/flexiprep/FlexiprepSummary.scala
+++ b/public/flexiprep/src/main/scala/nl/lumc/sasc/biopet/pipelines/flexiprep/FlexiprepSummary.scala
@@ -18,7 +18,7 @@ package nl.lumc.sasc.biopet.pipelines.flexiprep
 import java.io.PrintWriter
 import nl.lumc.sasc.biopet.core.config.Configurable
 import nl.lumc.sasc.biopet.extensions.{ Md5sum, Seqstat }
-import nl.lumc.sasc.biopet.scripts.{ FastqSync }
+import nl.lumc.sasc.biopet.tools.FastqSync
 import org.broadinstitute.gatk.queue.function.InProcessFunction
 import org.broadinstitute.gatk.utils.commandline.{ Input, Output }
 import java.io.File
@@ -112,8 +112,8 @@ class FlexiprepSummary(val root: Configurable) extends InProcessFunction with Co
   def addFastqcSync(fastqSync: FastqSync, chunk: String = ""): FastqSync = {
     if (!chunks.contains(chunk)) chunks += (chunk -> new Chunk)
     chunks(chunk).fastqSync = fastqSync
-    deps ::= fastqSync.output_stats
-    return fastqSync
+    deps ::= fastqSync.outputStats
+    fastqSync
   }
   // format: OFF
   override def run {
@@ -223,11 +223,13 @@ class FlexiprepSummary(val root: Configurable) extends InProcessFunction with Co
       jEmptyObject)
   }
 
-  def syncstatSummary(): Option[Json] = {
-    if (flexiprep.skipClip || !flexiprep.paired) return None
-    val s = for ((key, value) <- chunks) yield value.fastqSync.getSummary
-    return Option(FastqSync.mergeSummaries(s.toList))
-  }
+  def syncstatSummary(): Option[Json] =
+    if (flexiprep.skipClip || !flexiprep.paired)
+      None
+    else {
+      val s = for ((key, value) <- chunks) yield value.fastqSync.summary
+      Option(FastqSync.mergeSummaries(s.toList))
+    }
 
   def trimstatSummary(): Option[Json] = {
     if (flexiprep.skipTrim) return None
-- 
GitLab