Commit dfb1ece5 authored by Wai Yi Leung's avatar Wai Yi Leung
Browse files

Merge branch 'patch-fastq_sync_stream' into 'develop'

Patch fastq sync stream

The current implementation of `FastqSync` creates an intermediate Stream that is then passed to a writing function. Apparently, the `Stream` is not evaluated as lazily (or not in the way) as I thought. This led to weird StackOverflow errors (not yet documented).

This fix merges the Stream creation and writing function, so that no intermediate containers are created and no errors are encountered. Tests have been updated accordingly.

See merge request !95
parents 4731b21d a913c55e
......@@ -33,23 +33,25 @@ class FastqSync(val root: Configurable) extends BiopetJavaCommandLineFunction {
javaMainClass = getClass.getName
override val defaultVmem: String = "5G"
@Input(doc = "Original FASTQ file (read 1 or 2)", shortName = "r", required = true)
var refFastq: File = _
var refFastq: File = null
@Input(doc = "Input read 1 FASTQ file", shortName = "i", required = true)
var inputFastq1: File = _
var inputFastq1: File = null
@Input(doc = "Input read 2 FASTQ file", shortName = "j", required = true)
var inputFastq2: File = _
var inputFastq2: File = null
@Output(doc = "Output read 1 FASTQ file", shortName = "o", required = true)
var outputFastq1: File = _
var outputFastq1: File = null
@Output(doc = "Output read 2 FASTQ file", shortName = "p", required = true)
var outputFastq2: File = _
var outputFastq2: File = null
@Output(doc = "Sync statistics", required = true)
var outputStats: File = _
var outputStats: File = null
override val defaultVmem = "5G"
......@@ -92,25 +94,14 @@ class FastqSync(val root: Configurable) extends BiopetJavaCommandLineFunction {
object FastqSync extends ToolCommand {
/**
* Implicit class to allow for lazy retrieval of FastqRecord ID
* without any read pair mark
*
* @param fq FastqRecord
*/
/** Regex for capturing read ID ~ taking into account its read pair mark (if present) */
private val idRegex = "[_/][12]\\s??|\\s".r
/** Implicit class to allow for lazy retrieval of FastqRecord ID without any read pair mark */
private implicit class FastqPair(fq: FastqRecord) {
lazy val fragId = fq.getReadHeader.split("[_/][12]\\s??|\\s")(0)
lazy val fragId = idRegex.split(fq.getReadHeader)(0)
}
/**
* Counts from syncing FastqRecords
*
* @param numDiscard1 Number of reads discarded from the initial read 1
* @param numDiscard2 Number of reads discarded from the initial read 2
* @param numKept Number of items in result
*/
case class SyncCounts(numDiscard1: Int, numDiscard2: Int, numKept: Int)
/**
* Filters out FastqRecord that are not present in the input iterators, using
* a reference sequence object
......@@ -120,7 +111,8 @@ object FastqSync extends ToolCommand {
* @param seqB FastqReader over read 2
* @return
*/
def syncFastq(pre: FastqReader, seqA: FastqReader, seqB: FastqReader): (Stream[(FastqRecord, FastqRecord)], SyncCounts) = {
def syncFastq(pre: FastqReader, seqA: FastqReader, seqB: FastqReader,
seqOutA: AsyncFastqWriter, seqOutB: AsyncFastqWriter): (Long, Long, Long) = {
// counters for discarded A and B seqections + total kept
// NOTE: we are reasigning values to these variables in the recursion below
var (numDiscA, numDiscB, numKept) = (0, 0, 0)
......@@ -131,72 +123,50 @@ object FastqSync extends ToolCommand {
* @param pre Reference sequence, assumed to be a superset of both seqA and seqB
* @param seqA Sequence over read 1
* @param seqB Sequence over read 2
* @param acc Stream containing pairs which are present in read 1 and read 2
* @return
*/
@tailrec def syncIter(pre: Stream[FastqRecord],
seqA: Stream[FastqRecord], seqB: Stream[FastqRecord],
acc: Stream[(FastqRecord, FastqRecord)]): Stream[(FastqRecord, FastqRecord)] =
seqA: Stream[FastqRecord], seqB: Stream[FastqRecord]): Unit =
(pre.headOption, seqA.headOption, seqB.headOption) match {
// where the magic happens!
case (Some(r), Some(a), Some(b)) =>
val (nextA, nextB) = (a.fragId == r.fragId, b.fragId == r.fragId) match {
// all IDs are equal to ref
case (true, true) =>
numKept += 1
seqOutA.write(a)
seqOutB.write(b)
(seqA.tail, seqB.tail)
// B not equal to ref and A is equal, then we discard A and progress
case (true, false) =>
numDiscA += 1
(seqA.tail, seqB)
// A not equal to ref and B is equal, then we discard B and progress
case (false, true) =>
numDiscB += 1
(seqA, seqB.tail)
case (false, false) =>
(seqA, seqB)
}
syncIter(pre.tail, nextA, nextB)
// recursion base case: both iterators have been exhausted
case (_, None, None) => acc
case (_, None, None) => ;
// illegal state: reference sequence exhausted but not seqA or seqB
case (None, Some(_), _) | (None, _, Some(_)) =>
throw new NoSuchElementException("Reference record stream shorter than expected")
// keep recursion going if A still has items (we want to count how many)
case (_, _, None) =>
numDiscA += 1
syncIter(pre.tail, seqA.tail, Stream(), acc)
syncIter(pre.tail, seqA.tail, seqB)
// like above but for B
case (_, None, _) =>
numDiscB += 1
syncIter(pre.tail, Stream(), seqB.tail, acc)
// where the magic happens!
case (Some(r), Some(a), Some(b)) =>
// value of A iterator in the next recursion
val nextA =
// hold A if its head is not equal to reference
if (a.fragId != r.fragId) {
if (b.fragId == r.fragId) numDiscB += 1
seqA
// otherwise, go to next item
} else seqA.tail
// like A above
val nextB =
if (b.fragId != r.fragId) {
if (a.fragId == r.fragId) numDiscA += 1
seqB
} else seqB.tail
// value of accumulator in the next recursion
val nextAcc =
// keep accumulator unchanged if any of the two post streams
// have different elements compared to the reference stream
if (a.fragId != r.fragId || b.fragId != r.fragId) acc
// otherwise, grow accumulator
else {
numKept += 1
acc ++ Stream((a, b))
}
syncIter(pre.tail, nextA, nextB, nextAcc)
syncIter(pre.tail, seqA, seqB.tail)
}
(syncIter(pre.iterator.asScala.toStream, seqA.iterator.asScala.toStream, seqB.iterator.asScala.toStream,
Stream.empty[(FastqRecord, FastqRecord)]),
SyncCounts(numDiscA, numDiscB, numKept))
}
syncIter(pre.iterator.asScala.toStream, seqA.iterator.asScala.toStream, seqB.iterator.asScala.toStream)
def writeSyncedFastq(sync: Stream[(FastqRecord, FastqRecord)],
counts: SyncCounts,
outputFastq1: AsyncFastqWriter,
outputFastq2: AsyncFastqWriter): Unit = {
sync.foreach {
case (rec1, rec2) =>
outputFastq1.write(rec1)
outputFastq2.write(rec2)
}
println("Filtered %d reads from first read file.".format(counts.numDiscard1))
println("Filtered %d reads from second read file.".format(counts.numDiscard2))
println("Synced read files contain %d reads.".format(counts.numKept))
(numDiscA, numDiscB, numKept)
}
/** Function to merge this tool's summary with summaries from other objects */
......@@ -231,7 +201,6 @@ object FastqSync extends ToolCommand {
class OptParser extends AbstractOptParser {
// TODO: make output format independent from input format?
head(
s"""
|$commandName - Sync paired-end FASTQ files.
......@@ -281,15 +250,23 @@ object FastqSync extends ToolCommand {
val commandArgs: Args = parseArgs(args)
val (synced, counts) = syncFastq(
new FastqReader(commandArgs.refFastq),
new FastqReader(commandArgs.inputFastq1),
new FastqReader(commandArgs.inputFastq2))
writeSyncedFastq(synced, counts,
// using 3000 for queue size to approximate NFS buffer
new AsyncFastqWriter(new BasicFastqWriter(commandArgs.outputFastq1), 3000),
new AsyncFastqWriter(new BasicFastqWriter(commandArgs.outputFastq2), 3000)
)
val refReader = new FastqReader(commandArgs.refFastq)
val AReader = new FastqReader(commandArgs.inputFastq1)
val BReader = new FastqReader(commandArgs.inputFastq2)
val AWriter = new AsyncFastqWriter(new BasicFastqWriter(commandArgs.outputFastq1), 3000)
val BWriter = new AsyncFastqWriter(new BasicFastqWriter(commandArgs.outputFastq2), 3000)
try {
val (numDiscA, numDiscB, numKept) = syncFastq(refReader, AReader, BReader, AWriter, BWriter)
println(s"Filtered $numDiscA reads from first read file.")
println(s"Filtered $numDiscB reads from second read file.")
println(s"Synced files contain $numKept reads.")
} finally {
refReader.close()
AReader.close()
BReader.close()
AWriter.close()
BWriter.close()
}
}
}
......@@ -30,187 +30,229 @@ class FastqSyncTest extends TestNGSuite with MockitoSugar with Matchers {
.map(x => new FastqRecord(x, "A", "", "H"))
.toIterator.asJava
@DataProvider(name = "mockReaderProvider")
def mockReaderProvider() =
@DataProvider(name = "mockProvider")
def mockProvider() =
Array(
Array(mock[FastqReader], mock[FastqReader], mock[FastqReader]))
Array(mock[FastqReader], mock[FastqReader], mock[FastqReader],
mock[AsyncFastqWriter], mock[AsyncFastqWriter])
)
@Test(dataProvider = "mockProvider")
def testDefault(refMock: FastqReader, aMock: FastqReader, bMock: FastqReader,
aOutMock: AsyncFastqWriter, bOutMock: AsyncFastqWriter) = {
@Test(dataProvider = "mockReaderProvider")
def testDefault(refMock: FastqReader, aMock: FastqReader, bMock: FastqReader) = {
when(refMock.iterator) thenReturn recordsOver("1", "2", "3")
when(aMock.iterator) thenReturn recordsOver("1", "2", "3")
when(bMock.iterator) thenReturn recordsOver("1", "2", "3")
val obs = inOrd(aOutMock, bOutMock)
val exp = recordsOver("1", "2", "3").asScala.toSeq
val (numDiscard1, numDiscard2, numKept) = syncFastq(refMock, aMock, bMock, aOutMock, bOutMock)
obs.verify(aOutMock).write(exp(0))
obs.verify(bOutMock).write(exp(0))
obs.verify(aOutMock).write(exp(1))
obs.verify(bOutMock).write(exp(1))
obs.verify(aOutMock).write(exp(2))
obs.verify(bOutMock).write(exp(2))
val (sync, counts) = syncFastq(refMock, aMock, bMock)
sync.length shouldBe 3
sync(0) shouldBe (new FastqRecord("1", "A", "", "H"), new FastqRecord("1", "A", "", "H"))
sync(1) shouldBe (new FastqRecord("2", "A", "", "H"), new FastqRecord("2", "A", "", "H"))
sync(2) shouldBe (new FastqRecord("3", "A", "", "H"), new FastqRecord("3", "A", "", "H"))
counts.numDiscard1 shouldBe 0
counts.numDiscard2 shouldBe 0
counts.numKept shouldBe 3
numDiscard1 shouldBe 0
numDiscard2 shouldBe 0
numKept shouldBe 3
}
@Test(dataProvider = "mockReaderProvider")
def testRefTooShort(refMock: FastqReader, aMock: FastqReader, bMock: FastqReader) = {
@Test(dataProvider = "mockProvider")
def testRefTooShort(refMock: FastqReader, aMock: FastqReader, bMock: FastqReader,
aOutMock: AsyncFastqWriter, bOutMock: AsyncFastqWriter) = {
when(refMock.iterator) thenReturn recordsOver("1", "2")
when(aMock.iterator) thenReturn recordsOver("1", "2", "3")
when(bMock.iterator) thenReturn recordsOver("1", "2", "3")
val thrown = intercept[NoSuchElementException] {
syncFastq(refMock, aMock, bMock)
syncFastq(refMock, aMock, bMock, aOutMock, bOutMock)
}
thrown.getMessage should ===("Reference record stream shorter than expected")
}
@Test(dataProvider = "mockReaderProvider")
def testSeqAEmpty(refMock: FastqReader, aMock: FastqReader, bMock: FastqReader) = {
@Test(dataProvider = "mockProvider")
def testSeqAEmpty(refMock: FastqReader, aMock: FastqReader, bMock: FastqReader,
aOutMock: AsyncFastqWriter, bOutMock: AsyncFastqWriter) = {
when(refMock.iterator) thenReturn recordsOver("1", "2", "3")
when(aMock.iterator) thenReturn recordsOver()
when(bMock.iterator) thenReturn recordsOver("1", "2", "3")
val (sync, counts) = syncFastq(refMock, aMock, bMock)
sync.length shouldBe 0
counts.numDiscard1 shouldBe 0
counts.numDiscard2 shouldBe 3
counts.numKept shouldBe 0
val (numDiscard1, numDiscard2, numKept) = syncFastq(refMock, aMock, bMock, aOutMock, bOutMock)
numDiscard1 shouldBe 0
numDiscard2 shouldBe 3
numKept shouldBe 0
}
@Test(dataProvider = "mockReaderProvider")
def testSeqBEmpty(refMock: FastqReader, aMock: FastqReader, bMock: FastqReader) = {
@Test(dataProvider = "mockProvider")
def testSeqBEmpty(refMock: FastqReader, aMock: FastqReader, bMock: FastqReader,
aOutMock: AsyncFastqWriter, bOutMock: AsyncFastqWriter) = {
when(refMock.iterator) thenReturn recordsOver("1", "2", "3")
when(aMock.iterator) thenReturn recordsOver("1", "2", "3")
when(bMock.iterator) thenReturn recordsOver()
val (sync, counts) = syncFastq(refMock, aMock, bMock)
sync.length shouldBe 0
counts.numDiscard1 shouldBe 3
counts.numDiscard2 shouldBe 0
counts.numKept shouldBe 0
val (numDiscard1, numDiscard2, numKept) = syncFastq(refMock, aMock, bMock, aOutMock, bOutMock)
numDiscard1 shouldBe 3
numDiscard2 shouldBe 0
numKept shouldBe 0
}
@Test(dataProvider = "mockReaderProvider")
def testSeqAShorter(refMock: FastqReader, aMock: FastqReader, bMock: FastqReader) = {
@Test(dataProvider = "mockProvider")
def testSeqAShorter(refMock: FastqReader, aMock: FastqReader, bMock: FastqReader,
aOutMock: AsyncFastqWriter, bOutMock: AsyncFastqWriter) = {
when(refMock.iterator) thenReturn recordsOver("1", "2", "3")
when(aMock.iterator) thenReturn recordsOver("2", "3")
when(bMock.iterator) thenReturn recordsOver("1", "2", "3")
val obs = inOrd(aOutMock, bOutMock)
val exp = recordsOver("1", "2", "3").asScala.toSeq
val (numDiscard1, numDiscard2, numKept) = syncFastq(refMock, aMock, bMock, aOutMock, bOutMock)
// exp(0) is discarded by syncFastq
obs.verify(aOutMock).write(exp(1))
obs.verify(bOutMock).write(exp(1))
val (sync, counts) = syncFastq(refMock, aMock, bMock)
sync.length shouldBe 2
sync(0) shouldBe (new FastqRecord("2", "A", "", "H"), new FastqRecord("2", "A", "", "H"))
sync(1) shouldBe (new FastqRecord("3", "A", "", "H"), new FastqRecord("3", "A", "", "H"))
counts.numDiscard1 shouldBe 0
counts.numDiscard2 shouldBe 1
counts.numKept shouldBe 2
obs.verify(aOutMock).write(exp(2))
obs.verify(bOutMock).write(exp(2))
numDiscard1 shouldBe 0
numDiscard2 shouldBe 1
numKept shouldBe 2
}
@Test(dataProvider = "mockReaderProvider")
def testSeqBShorter(refMock: FastqReader, aMock: FastqReader, bMock: FastqReader) = {
@Test(dataProvider = "mockProvider")
def testSeqBShorter(refMock: FastqReader, aMock: FastqReader, bMock: FastqReader,
aOutMock: AsyncFastqWriter, bOutMock: AsyncFastqWriter) = {
when(refMock.iterator) thenReturn recordsOver("1", "2", "3")
when(aMock.iterator) thenReturn recordsOver("2", "3")
when(bMock.iterator) thenReturn recordsOver("1", "2", "3")
when(aMock.iterator) thenReturn recordsOver("1", "2", "3")
when(bMock.iterator) thenReturn recordsOver("1", "3")
val obs = inOrd(aOutMock, bOutMock)
val exp = recordsOver("1", "2", "3").asScala.toSeq
val (numDiscard1, numDiscard2, numKept) = syncFastq(refMock, aMock, bMock, aOutMock, bOutMock)
val (sync, counts) = syncFastq(refMock, aMock, bMock)
sync.length shouldBe 2
sync(0) shouldBe (new FastqRecord("2", "A", "", "H"), new FastqRecord("2", "A", "", "H"))
sync(1) shouldBe (new FastqRecord("3", "A", "", "H"), new FastqRecord("3", "A", "", "H"))
counts.numDiscard1 shouldBe 0
counts.numDiscard2 shouldBe 1
counts.numKept shouldBe 2
// exp(1) is discarded by syncFastq
obs.verify(aOutMock).write(exp(0))
obs.verify(bOutMock).write(exp(0))
obs.verify(aOutMock).write(exp(2))
obs.verify(bOutMock).write(exp(2))
numDiscard1 shouldBe 1
numDiscard2 shouldBe 0
numKept shouldBe 2
}
@Test(dataProvider = "mockReaderProvider")
def testSeqABShorter(refMock: FastqReader, aMock: FastqReader, bMock: FastqReader) = {
@Test(dataProvider = "mockProvider")
def testSeqABShorter(refMock: FastqReader, aMock: FastqReader, bMock: FastqReader,
aOutMock: AsyncFastqWriter, bOutMock: AsyncFastqWriter) = {
when(refMock.iterator) thenReturn recordsOver("1", "2", "3")
when(aMock.iterator) thenReturn recordsOver("2", "3")
when(bMock.iterator) thenReturn recordsOver("1", "2")
val obs = inOrd(aOutMock, bOutMock)
val exp = recordsOver("1", "2", "3").asScala.toSeq
val (numDiscard1, numDiscard2, numKept) = syncFastq(refMock, aMock, bMock, aOutMock, bOutMock)
// exp(0) and exp(2) are discarded by syncFastq
obs.verify(aOutMock).write(exp(1))
obs.verify(bOutMock).write(exp(1))
val (sync, counts) = syncFastq(refMock, aMock, bMock)
sync.length shouldBe 1
sync(0) shouldBe (new FastqRecord("2", "A", "", "H"), new FastqRecord("2", "A", "", "H"))
counts.numDiscard1 shouldBe 1
counts.numDiscard2 shouldBe 1
counts.numKept shouldBe 1
numDiscard1 shouldBe 1
numDiscard2 shouldBe 1
numKept shouldBe 1
}
@Test(dataProvider = "mockReaderProvider")
def testSeqABShorterPairMarkSlash(refMock: FastqReader, aMock: FastqReader, bMock: FastqReader) = {
@Test(dataProvider = "mockProvider")
def testSeqABShorterPairMarkSlash(refMock: FastqReader, aMock: FastqReader, bMock: FastqReader,
aOutMock: AsyncFastqWriter, bOutMock: AsyncFastqWriter) = {
when(refMock.iterator) thenReturn recordsOver("1/1", "2/1", "3/1")
when(aMock.iterator) thenReturn recordsOver("2/1", "3/1")
when(bMock.iterator) thenReturn recordsOver("1/2", "2/2")
val obs = inOrd(aOutMock, bOutMock)
val (numDiscard1, numDiscard2, numKept) = syncFastq(refMock, aMock, bMock, aOutMock, bOutMock)
obs.verify(aOutMock).write(new FastqRecord("2/1", "A", "", "H"))
obs.verify(bOutMock).write(new FastqRecord("2/2", "A", "", "H"))
val (sync, counts) = syncFastq(refMock, aMock, bMock)
sync.length shouldBe 1
sync(0) shouldBe (new FastqRecord("2/1", "A", "", "H"), new FastqRecord("2/2", "A", "", "H"))
counts.numDiscard1 shouldBe 1
counts.numDiscard2 shouldBe 1
counts.numKept shouldBe 1
numDiscard1 shouldBe 1
numDiscard2 shouldBe 1
numKept shouldBe 1
}
@Test(dataProvider = "mockReaderProvider")
def testSeqABShorterPairMarkUnderscore(refMock: FastqReader, aMock: FastqReader, bMock: FastqReader) = {
@Test(dataProvider = "mockProvider")
def testSeqABShorterPairMarkUnderscore(refMock: FastqReader, aMock: FastqReader, bMock: FastqReader,
aOutMock: AsyncFastqWriter, bOutMock: AsyncFastqWriter) = {
when(refMock.iterator) thenReturn recordsOver("1_1", "2_1", "3_1")
when(aMock.iterator) thenReturn recordsOver("2_1", "3_1")
when(bMock.iterator) thenReturn recordsOver("1_2", "2_2")
val obs = inOrd(aOutMock, bOutMock)
val (numDiscard1, numDiscard2, numKept) = syncFastq(refMock, aMock, bMock, aOutMock, bOutMock)
val (sync, counts) = syncFastq(refMock, aMock, bMock)
sync.length shouldBe 1
sync(0) shouldBe (new FastqRecord("2_1", "A", "", "H"), new FastqRecord("2_2", "A", "", "H"))
counts.numDiscard1 shouldBe 1
counts.numDiscard2 shouldBe 1
counts.numKept shouldBe 1
obs.verify(aOutMock).write(new FastqRecord("2_1", "A", "", "H"))
obs.verify(bOutMock).write(new FastqRecord("2_2", "A", "", "H"))
numDiscard1 shouldBe 1
numDiscard2 shouldBe 1
numKept shouldBe 1
}
@Test(dataProvider = "mockReaderProvider")
def testSeqABShorterWithDesc(refMock: FastqReader, aMock: FastqReader, bMock: FastqReader) = {
@Test(dataProvider = "mockProvider")
def testSeqABShorterWithDescription(refMock: FastqReader, aMock: FastqReader, bMock: FastqReader,
aOutMock: AsyncFastqWriter, bOutMock: AsyncFastqWriter) = {
when(refMock.iterator) thenReturn recordsOver("1 desc1b", "2 desc2b", "3 desc3b")
when(aMock.iterator) thenReturn recordsOver("2 desc2a", "3 desc3a")
when(bMock.iterator) thenReturn recordsOver("1 desc1b", "2 desc2b")
val obs = inOrd(aOutMock, bOutMock)
val (numDiscard1, numDiscard2, numKept) = syncFastq(refMock, aMock, bMock, aOutMock, bOutMock)
val (sync, counts) = syncFastq(refMock, aMock, bMock)
sync.length shouldBe 1
sync(0) shouldBe (new FastqRecord("2 desc2a", "A", "", "H"), new FastqRecord("2 desc2b", "A", "", "H"))
counts.numDiscard1 shouldBe 1
counts.numDiscard2 shouldBe 1
counts.numKept shouldBe 1
obs.verify(aOutMock).write(new FastqRecord("2 desc2a", "A", "", "H"))
obs.verify(bOutMock).write(new FastqRecord("2 desc2b", "A", "", "H"))
numDiscard1 shouldBe 1
numDiscard2 shouldBe 1
numKept shouldBe 1
}
@Test(dataProvider = "mockReaderProvider")
def testComplex(refMock: FastqReader, aMock: FastqReader, bMock: FastqReader) = {
@Test(dataProvider = "mockProvider")
def testComplex(refMock: FastqReader, aMock: FastqReader, bMock: FastqReader,
aOutMock: AsyncFastqWriter, bOutMock: AsyncFastqWriter) = {
when(refMock.iterator) thenReturn recordsOver("1/2 yep", "2/2 yep", "3/2 yep", "4/2 yep", "5/2 yep")
when(aMock.iterator) thenReturn recordsOver("1/1 yep", "2/1 yep", "4/1 yep")
when(bMock.iterator) thenReturn recordsOver("1/2 yep", "3/2 yep", "4/2 yep")
val obs = inOrd(aOutMock, bOutMock)
val (sync, counts) = syncFastq(refMock, aMock, bMock)
sync.length shouldBe 2
sync(0) shouldBe (new FastqRecord("1/1 yep", "A", "", "H"), new FastqRecord("1/2 yep", "A", "", "H"))
sync(1) shouldBe (new FastqRecord("4/1 yep", "A", "", "H"), new FastqRecord("4/2 yep", "A", "", "H"))
counts.numDiscard1 shouldBe 1
counts.numDiscard2 shouldBe 1
counts.numKept shouldBe 2
}
val (numDiscard1, numDiscard2, numKept) = syncFastq(refMock, aMock, bMock, aOutMock, bOutMock)
@Test def testWriteSynced() = {
val aMock = mock[AsyncFastqWriter]
val bMock = mock[AsyncFastqWriter]
val sync = Stream(
(new FastqRecord("1", "A", "", "H"), new FastqRecord("1", "T", "", "E")),
(new FastqRecord("2", "A", "", "H"), new FastqRecord("2", "T", "", "E")))
val counts = SyncCounts(4, 3, 2)
val obs = inOrd(aMock, bMock)
val stdout = new java.io.ByteArrayOutputStream
Console.withOut(stdout) {
writeSyncedFastq(sync, counts, aMock, bMock)
}
stdout.toString should ===(List(
"Filtered 4 reads from first read file.",
"Filtered 3 reads from second read file.",
"Synced read files contain 2 reads.\n"
).mkString("\n"))
obs.verify(aMock).write(new FastqRecord("1", "A", "", "H"))
obs.verify(bMock).write(new FastqRecord("1", "T", "", "E"))