Commit bc9b4ce6 authored by bow's avatar bow
Browse files

Bypass intermediate Stream creation in FastqSync to fix StackOverflowError

parent 9e88effb
/**
* Copyright (c) 2014 Leiden University Medical Center - Sequencing Analysis Support Core <sasc@lumc.nl>
* Copyright (c) 2014 Leiden University Medical Center - Streamuencing Analysis Support Core <sasc@lumc.nl>
* @author Wibowo Arindrarto <w.arindrarto@lumc.nl>
*
* This tool is a port of a Python implementation written by Martijn Vermaat[1]
......@@ -90,25 +90,11 @@ class FastqSync(val root: Configurable) extends BiopetJavaCommandLineFunction {
object FastqSync extends ToolCommand {
/**
* Implicit class to allow for lazy retrieval of FastqRecord ID
* without any read pair mark
*
* @param fq FastqRecord
*/
/** Implicit class to allow for lazy retrieval of FastqRecord ID without any read pair mark */
private implicit class FastqPair(fq: FastqRecord) {
lazy val fragId = fq.getReadHeader.split("[_/][12]\\s??|\\s")(0)
}
/**
* Counts from syncing FastqRecords
*
* @param numDiscard1 Number of reads discarded from the initial read 1
* @param numDiscard2 Number of reads discarded from the initial read 2
* @param numKept Number of items in result
*/
case class SyncCounts(numDiscard1: Int, numDiscard2: Int, numKept: Int)
/**
* Filters out FastqRecord that are not present in the input iterators, using
* a reference sequence object
......@@ -118,7 +104,8 @@ object FastqSync extends ToolCommand {
* @param seqB FastqReader over read 2
* @return
*/
def syncFastq(pre: FastqReader, seqA: FastqReader, seqB: FastqReader): (Stream[(FastqRecord, FastqRecord)], SyncCounts) = {
def syncFastq(pre: FastqReader, seqA: FastqReader, seqB: FastqReader,
seqOutA: AsyncFastqWriter, seqOutB: AsyncFastqWriter): (Long, Long, Long) = {
// counters for discarded A and B seqections + total kept
// NOTE: we are reasigning values to these variables in the recursion below
var (numDiscA, numDiscB, numKept) = (0, 0, 0)
......@@ -129,26 +116,11 @@ object FastqSync extends ToolCommand {
* @param pre Reference sequence, assumed to be a superset of both seqA and seqB
* @param seqA Sequence over read 1
* @param seqB Sequence over read 2
* @param acc Stream containing pairs which are present in read 1 and read 2
* @return
*/
@tailrec def syncIter(pre: Stream[FastqRecord],
seqA: Stream[FastqRecord], seqB: Stream[FastqRecord],
acc: Stream[(FastqRecord, FastqRecord)]): Stream[(FastqRecord, FastqRecord)] =
seqA: Stream[FastqRecord], seqB: Stream[FastqRecord]): Unit =
(pre.headOption, seqA.headOption, seqB.headOption) match {
// recursion base case: both iterators have been exhausted
case (_, None, None) => acc
// illegal state: reference sequence exhausted but not seqA or seqB
case (None, Some(_), _) | (None, _, Some(_)) =>
throw new NoSuchElementException("Reference record stream shorter than expected")
// keep recursion going if A still has items (we want to count how many)
case (_, _, None) =>
numDiscA += 1
syncIter(pre.tail, seqA.tail, Stream(), acc)
// like above but for B
case (_, None, _) =>
numDiscB += 1
syncIter(pre.tail, Stream(), seqB.tail, acc)
// where the magic happens!
case (Some(r), Some(a), Some(b)) =>
// value of A iterator in the next recursion
......@@ -165,36 +137,31 @@ object FastqSync extends ToolCommand {
if (a.fragId == r.fragId) numDiscA += 1
seqB
} else seqB.tail
// value of accumulator in the next recursion
val nextAcc =
// keep accumulator unchanged if any of the two post streams
// have different elements compared to the reference stream
if (a.fragId != r.fragId || b.fragId != r.fragId) acc
// otherwise, grow accumulator
else {
numKept += 1
acc ++ Stream((a, b))
}
syncIter(pre.tail, nextA, nextB, nextAcc)
// write only if all IDs are equal
if (a.fragId == r.fragId && b.fragId == r.fragId) {
numKept += 1
seqOutA.write(a)
seqOutB.write(b)
}
syncIter(pre.tail, nextA, nextB)
// recursion base case: both iterators have been exhausted
case (_, None, None) => ;
// illegal state: reference sequence exhausted but not seqA or seqB
case (None, Some(_), _) | (None, _, Some(_)) =>
throw new NoSuchElementException("Reference record stream shorter than expected")
// keep recursion going if A still has items (we want to count how many)
case (_, _, None) =>
numDiscA += 1
syncIter(pre.tail, seqA.tail, seqB)
// like above but for B
case (_, None, _) =>
numDiscB += 1
syncIter(pre.tail, seqA, seqB.tail)
}
(syncIter(pre.iterator.asScala.toStream, seqA.iterator.asScala.toStream, seqB.iterator.asScala.toStream,
Stream.empty[(FastqRecord, FastqRecord)]),
SyncCounts(numDiscA, numDiscB, numKept))
}
syncIter(pre.iterator.asScala.toStream, seqA.iterator.asScala.toStream, seqB.iterator.asScala.toStream)
def writeSyncedFastq(sync: Stream[(FastqRecord, FastqRecord)],
counts: SyncCounts,
outputFastq1: AsyncFastqWriter,
outputFastq2: AsyncFastqWriter): Unit = {
sync.foreach {
case (rec1, rec2) =>
outputFastq1.write(rec1)
outputFastq2.write(rec2)
}
println("Filtered %d reads from first read file.".format(counts.numDiscard1))
println("Filtered %d reads from second read file.".format(counts.numDiscard2))
println("Synced read files contain %d reads.".format(counts.numKept))
(numDiscA, numDiscB, numKept)
}
/** Function to merge this tool's summary with summaries from other objects */
......@@ -229,7 +196,6 @@ object FastqSync extends ToolCommand {
class OptParser extends AbstractOptParser {
// TODO: make output format independent from input format?
head(
s"""
|$commandName - Sync paired-end FASTQ files.
......@@ -279,15 +245,15 @@ object FastqSync extends ToolCommand {
val commandArgs: Args = parseArgs(args)
val (synced, counts) = syncFastq(
val (numDiscA, numDiscB, numKept) = syncFastq(
new FastqReader(commandArgs.refFastq),
new FastqReader(commandArgs.inputFastq1),
new FastqReader(commandArgs.inputFastq2))
writeSyncedFastq(synced, counts,
// using 3000 for queue size to approximate NFS buffer
new FastqReader(commandArgs.inputFastq2),
new AsyncFastqWriter(new BasicFastqWriter(commandArgs.outputFastq1), 3000),
new AsyncFastqWriter(new BasicFastqWriter(commandArgs.outputFastq2), 3000)
)
new AsyncFastqWriter(new BasicFastqWriter(commandArgs.outputFastq2), 3000))
println(s"Filtered $numDiscA reads from first read file.")
println(s"Filtered $numDiscB reads from second read file.")
println(s"Synced $numKept files contain %d reads.")
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment