From bc9b4ce67fd949af4638340311df08708e392bda Mon Sep 17 00:00:00 2001
From: bow <bow@bow.web.id>
Date: Sun, 15 Feb 2015 17:35:48 +0100
Subject: [PATCH] Bypass intermediate Stream creation in FastqSync to fix
 StackOverflowError

---
 .../nl/lumc/sasc/biopet/tools/FastqSync.scala | 102 ++++++------------
 1 file changed, 34 insertions(+), 68 deletions(-)

diff --git a/public/biopet-framework/src/main/scala/nl/lumc/sasc/biopet/tools/FastqSync.scala b/public/biopet-framework/src/main/scala/nl/lumc/sasc/biopet/tools/FastqSync.scala
index aaa2797b6..fe3ab7f60 100644
--- a/public/biopet-framework/src/main/scala/nl/lumc/sasc/biopet/tools/FastqSync.scala
+++ b/public/biopet-framework/src/main/scala/nl/lumc/sasc/biopet/tools/FastqSync.scala
@@ -1,5 +1,5 @@
 /**
- * Copyright (c) 2014 Leiden University Medical Center - Sequencing Analysis Support Core <sasc@lumc.nl>
+ * Copyright (c) 2014 Leiden University Medical Center - Streamuencing Analysis Support Core <sasc@lumc.nl>
  * @author Wibowo Arindrarto <w.arindrarto@lumc.nl>
  *
  * This tool is a port of a Python implementation written by Martijn Vermaat[1]
@@ -90,25 +90,11 @@ class FastqSync(val root: Configurable) extends BiopetJavaCommandLineFunction {
 
 object FastqSync extends ToolCommand {
 
-  /**
-   * Implicit class to allow for lazy retrieval of FastqRecord ID
-   * without any read pair mark
-   *
-   * @param fq FastqRecord
-   */
+  /** Implicit class to allow for lazy retrieval of FastqRecord ID without any read pair mark */
   private implicit class FastqPair(fq: FastqRecord) {
     lazy val fragId = fq.getReadHeader.split("[_/][12]\\s??|\\s")(0)
   }
 
-  /**
-   * Counts from syncing FastqRecords
-   *
-   * @param numDiscard1 Number of reads discarded from the initial read 1
-   * @param numDiscard2 Number of reads discarded from the initial read 2
-   * @param numKept Number of items in result
-   */
-  case class SyncCounts(numDiscard1: Int, numDiscard2: Int, numKept: Int)
-
   /**
    * Filters out FastqRecord that are not present in the input iterators, using
    * a reference sequence object
@@ -118,7 +104,8 @@ object FastqSync extends ToolCommand {
    * @param seqB FastqReader over read 2
    * @return
    */
-  def syncFastq(pre: FastqReader, seqA: FastqReader, seqB: FastqReader): (Stream[(FastqRecord, FastqRecord)], SyncCounts) = {
+  def syncFastq(pre: FastqReader, seqA: FastqReader, seqB: FastqReader,
+                seqOutA: AsyncFastqWriter, seqOutB: AsyncFastqWriter): (Long, Long, Long) = {
     // counters for discarded A and B seqections + total kept
     // NOTE: we are reasigning values to these variables in the recursion below
     var (numDiscA, numDiscB, numKept) = (0, 0, 0)
@@ -129,26 +116,11 @@ object FastqSync extends ToolCommand {
      * @param pre Reference sequence, assumed to be a superset of both seqA and seqB
      * @param seqA Sequence over read 1
      * @param seqB Sequence over read 2
-     * @param acc Stream containing pairs which are present in read 1 and read 2
      * @return
      */
     @tailrec def syncIter(pre: Stream[FastqRecord],
-                          seqA: Stream[FastqRecord], seqB: Stream[FastqRecord],
-                          acc: Stream[(FastqRecord, FastqRecord)]): Stream[(FastqRecord, FastqRecord)] =
+                          seqA: Stream[FastqRecord], seqB: Stream[FastqRecord]): Unit =
       (pre.headOption, seqA.headOption, seqB.headOption) match {
-        // recursion base case: both iterators have been exhausted
-        case (_, None, None) => acc
-        // illegal state: reference sequence exhausted but not seqA or seqB
-        case (None, Some(_), _) | (None, _, Some(_)) =>
-          throw new NoSuchElementException("Reference record stream shorter than expected")
-        // keep recursion going if A still has items (we want to count how many)
-        case (_, _, None) =>
-          numDiscA += 1
-          syncIter(pre.tail, seqA.tail, Stream(), acc)
-        // like above but for B
-        case (_, None, _) =>
-          numDiscB += 1
-          syncIter(pre.tail, Stream(), seqB.tail, acc)
         // where the magic happens!
         case (Some(r), Some(a), Some(b)) =>
           // value of A iterator in the next recursion
@@ -165,36 +137,31 @@ object FastqSync extends ToolCommand {
               if (a.fragId == r.fragId) numDiscA += 1
               seqB
             } else seqB.tail
-          // value of accumulator in the next recursion
-          val nextAcc =
-            // keep accumulator unchanged if any of the two post streams
-            // have different elements compared to the reference stream
-            if (a.fragId != r.fragId || b.fragId != r.fragId) acc
-            // otherwise, grow accumulator
-            else {
-              numKept += 1
-              acc ++ Stream((a, b))
-            }
-          syncIter(pre.tail, nextA, nextB, nextAcc)
+          // write only if all IDs are equal
+          if (a.fragId == r.fragId && b.fragId == r.fragId) {
+            numKept += 1
+            seqOutA.write(a)
+            seqOutB.write(b)
+          }
+          syncIter(pre.tail, nextA, nextB)
+        // recursion base case: both iterators have been exhausted
+        case (_, None, None) => ;
+        // illegal state: reference sequence exhausted but not seqA or seqB
+        case (None, Some(_), _) | (None, _, Some(_)) =>
+          throw new NoSuchElementException("Reference record stream shorter than expected")
+        // keep recursion going if A still has items (we want to count how many)
+        case (_, _, None) =>
+          numDiscA += 1
+          syncIter(pre.tail, seqA.tail, seqB)
+        // like above but for B
+        case (_, None, _) =>
+          numDiscB += 1
+          syncIter(pre.tail, seqA, seqB.tail)
       }
 
-    (syncIter(pre.iterator.asScala.toStream, seqA.iterator.asScala.toStream, seqB.iterator.asScala.toStream,
-      Stream.empty[(FastqRecord, FastqRecord)]),
-      SyncCounts(numDiscA, numDiscB, numKept))
-  }
+    syncIter(pre.iterator.asScala.toStream, seqA.iterator.asScala.toStream, seqB.iterator.asScala.toStream)
 
-  def writeSyncedFastq(sync: Stream[(FastqRecord, FastqRecord)],
-                       counts: SyncCounts,
-                       outputFastq1: AsyncFastqWriter,
-                       outputFastq2: AsyncFastqWriter): Unit = {
-    sync.foreach {
-      case (rec1, rec2) =>
-        outputFastq1.write(rec1)
-        outputFastq2.write(rec2)
-    }
-    println("Filtered %d reads from first read file.".format(counts.numDiscard1))
-    println("Filtered %d reads from second read file.".format(counts.numDiscard2))
-    println("Synced read files contain %d reads.".format(counts.numKept))
+    (numDiscA, numDiscB, numKept)
   }
 
   /** Function to merge this tool's summary with summaries from other objects */
@@ -229,7 +196,6 @@ object FastqSync extends ToolCommand {
 
   class OptParser extends AbstractOptParser {
 
-    // TODO: make output format independent from input format?
     head(
       s"""
         |$commandName - Sync paired-end FASTQ files.
@@ -279,15 +245,15 @@ object FastqSync extends ToolCommand {
 
     val commandArgs: Args = parseArgs(args)
 
-    val (synced, counts) = syncFastq(
+    val (numDiscA, numDiscB, numKept) = syncFastq(
       new FastqReader(commandArgs.refFastq),
       new FastqReader(commandArgs.inputFastq1),
-      new FastqReader(commandArgs.inputFastq2))
-
-    writeSyncedFastq(synced, counts,
-      // using 3000 for queue size to approximate NFS buffer
+      new FastqReader(commandArgs.inputFastq2),
       new AsyncFastqWriter(new BasicFastqWriter(commandArgs.outputFastq1), 3000),
-      new AsyncFastqWriter(new BasicFastqWriter(commandArgs.outputFastq2), 3000)
-    )
+      new AsyncFastqWriter(new BasicFastqWriter(commandArgs.outputFastq2), 3000))
+
+    println(s"Filtered $numDiscA reads from first read file.")
+    println(s"Filtered $numDiscB reads from second read file.")
+    println(s"Synced $numKept files contain %d reads.")
   }
 }
-- 
GitLab