Commit 9d5bc538 authored by Wai Yi Leung

Merge branch 'feature-fastq_sync' into 'develop'

Feature fastq sync

For issue #35 :).

See merge request !75
parents 9691531b a08964e0
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="EntryPointsManager">
<entry_points version="2.0" />
</component>
<component name="MavenProjectsManager">
<option name="originalFiles">
<list>
......
#!/usr/bin/env python
#
# Biopet is built on top of GATK Queue for building bioinformatic
# pipelines. It is mainly intended to support the LUMC SHARK cluster, which
# runs SGE. Other types of HPC that are supported by GATK Queue (such as PBS)
# should also be able to execute Biopet tools and pipelines.
#
# Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
#
# Contact us at: sasc@lumc.nl
#
# A dual licensing mode is applied. The source code within this project that is
# not part of GATK Queue is freely available for non-commercial use under an AGPL
# license; for commercial users or users who do not want to follow the AGPL
# license, please contact us to obtain a separate license.
#
"""
(Re-)sync two filtered paired end FASTQ files.
Given two filtered paired end read files and one of the original read files,
re-sync the filtered reads by filtering out anything that is only present in
one of the two files.
Usage:
{command} <orig.fq> <reads_1.fq> <reads_2.fq> \\
<reads_1.synced.fq> <reads_2.synced.fq>
The synced reads are written to disk as <reads_1.synced.fq> and
<reads_2.synced.fq>. Afterwards some counts are printed.
Both Illumina old-style and new-style paired-end header lines are supported.
The original read file is used to speed up processing: it contains all
possible reads from both edited reads (in all files in the same order) so it
can process all files line by line, not having to read a single file in
memory. Some ideas were taken from [1].
[1] https://gist.github.com/588841/
2011-11-03, Martijn Vermaat <m.vermaat.hg@lumc.nl>
"""
import sys
import re


def sync_paired_end_reads(original, reads_a, reads_b, synced_a, synced_b):
"""
Filter out reads from two paired end read files that are not present in
both of them. Do this in a reasonable amount of time by using a file
containing all of the reads for one of the paired ends.
All arguments are open file handles.
@arg original: File containing all original reads for one of the paired
ends.
@arg reads_a: First from paired end read files.
@arg reads_b: Second from paired end read files.
@arg synced_a: Filtered reads from first paired end read file.
@arg synced_b: Filtered reads from second paired end read file.
@return: Triple (filtered_a, filtered_b, kept) containing counts
of the number of reads filtered from both input files and
the total number of reads kept in the synced results.
@todo: Print warnings if obvious things are not right (a or b still has
lines after original is processed).
"""
    # This matches 1, 2, or 3 preceded by / _ or whitespace. Its rightmost
    # match in a header line is used to identify the read pair.
    sep = re.compile(r'[\s_/][123]')
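
    # A sketch of the matching (read names below are hypothetical): splitting
    # a header on this separator and dropping the part after its rightmost
    # match yields a key that is identical for both reads of a pair.
    #   "@MACHINE:1:FC:1:1:100:200#0/1"          -> "@MACHINE:1:FC:1:1:100:200#0"
    #   "@MACHINE:1:FC:1:1:100:200 1:N:0:ATCACG" -> "@MACHINE:1:FC:1:1:100:200"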

    def next_record(fh):
        return [fh.readline().strip() for i in range(4)]

    def head(record):
        return sep.split(record[0])[:-1]

    headers = (sep.split(x.strip())[:-1] for i, x in enumerate(original)
               if not (i % 4))

    filtered_a = filtered_b = kept = 0

    a, b = next_record(reads_a), next_record(reads_b)

    for header in headers:
        if header == head(a) and head(b) != header:
            a = next_record(reads_a)
            filtered_a += 1
        if header == head(b) and head(a) != header:
            b = next_record(reads_b)
            filtered_b += 1
        if header == head(a) == head(b):
            print >>synced_a, '\n'.join(a)
            print >>synced_b, '\n'.join(b)
            a, b = next_record(reads_a), next_record(reads_b)
            kept += 1

    return filtered_a, filtered_b, kept


if __name__ == '__main__':
    if len(sys.argv) < 6:
        sys.stderr.write(__doc__.split('\n\n\n')[0].strip().format(
            command=sys.argv[0]) + '\n')
        sys.exit(1)
    try:
        original = open(sys.argv[1], 'r')
        reads_a = open(sys.argv[2], 'r')
        reads_b = open(sys.argv[3], 'r')
        synced_a = open(sys.argv[4], 'w')
        synced_b = open(sys.argv[5], 'w')

        filtered_a, filtered_b, kept = \
            sync_paired_end_reads(original, reads_a, reads_b,
                                  synced_a, synced_b)

        print 'Filtered %i reads from first read file.' % filtered_a
        print 'Filtered %i reads from second read file.' % filtered_b
        print 'Synced read files contain %i reads.' % kept
    except IOError as (_, message):
        sys.stderr.write('Error: %s\n' % message)
        sys.exit(1)
/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support the LUMC SHARK cluster, which
 * runs SGE. Other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
 * A dual licensing mode is applied. The source code within this project that is
 * not part of GATK Queue is freely available for non-commercial use under an AGPL
 * license; for commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
package nl.lumc.sasc.biopet.scripts

import java.io.File

import org.broadinstitute.gatk.utils.commandline.{ Input, Output }
import argonaut._, Argonaut._
import scalaz._, Scalaz._

import nl.lumc.sasc.biopet.core.config.Configurable
import nl.lumc.sasc.biopet.extensions.PythonCommandLineFunction

import scala.io.Source

class FastqSync(val root: Configurable) extends PythonCommandLineFunction {
  setPythonScript("sync_paired_end_reads.py")

  @Input(doc = "Start fastq")
  var input_start_fastq: File = _

  @Input(doc = "R1 input")
  var input_R1: File = _

  @Input(doc = "R2 input")
  var input_R2: File = _

  @Output(doc = "R1 output")
  var output_R1: File = _

  @Output(doc = "R2 output")
  var output_R2: File = _

  // not annotated with @Output, so this file is not tracked by Queue
  var output_stats: File = _

  def cmdLine = {
    getPythonCommand +
      required(input_start_fastq) +
      required(input_R1) +
      required(input_R2) +
      required(output_R1) +
      required(output_R2) +
      " > " +
      required(output_stats)
  }
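
  // For illustration only (hypothetical paths), the rendered command
  // resembles:
  //   python sync_paired_end_reads.py orig.fq reads_1.fq reads_2.fq \
  //     reads_1.synced.fq reads_2.synced.fq > sync.stats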

  def getSummary: Json = {
    val R1_filteredR = """Filtered (\d*) reads from first read file.""".r
    val R2_filteredR = """Filtered (\d*) reads from second read file.""".r
    val readsLeftR = """Synced read files contain (\d*) reads.""".r

    var R1_filtered = 0
    var R2_filtered = 0
    var readsLeft = 0

    if (output_stats.exists) for (line <- Source.fromFile(output_stats).getLines) {
      line match {
        case R1_filteredR(m) => R1_filtered = m.toInt
        case R2_filteredR(m) => R2_filtered = m.toInt
        case readsLeftR(m)   => readsLeft = m.toInt
        case _               =>
      }
    }

    ("num_reads_discarded_R1" := R1_filtered) ->:
      ("num_reads_discarded_R2" := R2_filtered) ->:
      ("num_reads_kept" := readsLeft) ->:
      jEmptyObject
  }
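
  // The summary JSON then has this shape (counts are hypothetical):
  //   {"num_reads_discarded_R1": 2, "num_reads_discarded_R2": 3, "num_reads_kept": 100}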
}

object FastqSync {
  def apply(root: Configurable, input_start_fastq: File, input_R1: File, input_R2: File,
            output_R1: File, output_R2: File, output_stats: File): FastqSync = {
    val fastqSync = new FastqSync(root)
    fastqSync.input_start_fastq = input_start_fastq
    fastqSync.input_R1 = input_R1
    fastqSync.input_R2 = input_R2
    fastqSync.output_R1 = output_R1
    fastqSync.output_R2 = output_R2
    fastqSync.output_stats = output_stats
    fastqSync
  }

  def mergeSummaries(jsons: List[Json]): Json = {
    var R1_filtered = 0
    var R2_filtered = 0
    var readsLeft = 0

    for (json <- jsons) {
      R1_filtered += json.field("num_reads_discarded_R1").get.numberOrZero.toInt
      R2_filtered += json.field("num_reads_discarded_R2").get.numberOrZero.toInt
      readsLeft += json.field("num_reads_kept").get.numberOrZero.toInt
    }

    ("num_reads_discarded_R1" := R1_filtered) ->:
      ("num_reads_discarded_R2" := R2_filtered) ->:
      ("num_reads_kept" := readsLeft) ->:
      jEmptyObject
  }
}
/**
 * Copyright (c) 2014 Leiden University Medical Center - Sequencing Analysis Support Core <sasc@lumc.nl>
 * @author Wibowo Arindrarto <w.arindrarto@lumc.nl>
 *
 * This tool is a port of a Python implementation written by Martijn Vermaat [1]
 *
 * [1] https://github.com/martijnvermaat/bio-playground/blob/master/sync-paired-end-reads/sync_paired_end_reads.py
 */
package nl.lumc.sasc.biopet.tools

import java.io.File

import scala.io.Source
import scala.util.matching.Regex
import scala.annotation.tailrec
import scala.collection.JavaConverters._

import argonaut._, Argonaut._
import scalaz._, Scalaz._
import htsjdk.samtools.fastq.{ AsyncFastqWriter, BasicFastqWriter, FastqReader, FastqRecord }
import org.broadinstitute.gatk.utils.commandline.{ Input, Output }

import nl.lumc.sasc.biopet.core.BiopetJavaCommandLineFunction
import nl.lumc.sasc.biopet.core.ToolCommand
import nl.lumc.sasc.biopet.core.config.Configurable

/**
 * FastqSync function class for usage in Biopet pipelines
 *
 * @param root Configuration object for the pipeline
 */
class FastqSync(val root: Configurable) extends BiopetJavaCommandLineFunction {

  javaMainClass = getClass.getName

  @Input(doc = "Original FASTQ file (read 1 or 2)", shortName = "r", required = true)
  var refFastq: File = _

  @Input(doc = "Input read 1 FASTQ file", shortName = "i", required = true)
  var inputFastq1: File = _

  @Input(doc = "Input read 2 FASTQ file", shortName = "j", required = true)
  var inputFastq2: File = _

  @Output(doc = "Output read 1 FASTQ file", shortName = "o", required = true)
  var outputFastq1: File = _

  @Output(doc = "Output read 2 FASTQ file", shortName = "p", required = true)
  var outputFastq2: File = _

  @Output(doc = "Sync statistics", required = true)
  var outputStats: File = _

  // executed command line
  override def commandLine =
    super.commandLine +
      required("-r", refFastq) +
      required("-i", inputFastq1) +
      required("-j", inputFastq2) +
      required("-o", outputFastq1) +
      required("-p", outputFastq2) + " > " +
      required(outputStats)

  // summary statistics
  def summary: Json = {

    val regex = new Regex("""Filtered (\d*) reads from first read file.
                            |Filtered (\d*) reads from second read file.
                            |Synced read files contain (\d*) reads.""".stripMargin,
      "R1", "R2", "RL")

    val (countFilteredR1, countFilteredR2, countRLeft) =
      if (outputStats.exists) {
        val text = Source
          .fromFile(outputStats)
          .getLines()
          .mkString("\n")
        regex.findFirstMatchIn(text) match {
          case None         => (0, 0, 0)
          case Some(rmatch) => (rmatch.group("R1").toInt, rmatch.group("R2").toInt, rmatch.group("RL").toInt)
        }
      } else (0, 0, 0)

    ("num_reads_discarded_R1" := countFilteredR1) ->:
      ("num_reads_discarded_R2" := countFilteredR2) ->:
      ("num_reads_kept" := countRLeft) ->:
      jEmptyObject
  }
}

object FastqSync extends ToolCommand {

  /**
   * Implicit class to allow for lazy retrieval of a FastqRecord ID
   * without any read pair mark
   *
   * @param fq FastqRecord
   */
  private implicit class FastqPair(fq: FastqRecord) {
    lazy val fragId = fq.getReadHeader.split("[_/][12]\\s??|\\s")(0)
  }
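
  // For illustration only (hypothetical headers), both styles map to the
  // same fragment id:
  //   "@frag01/1"             -> fragId "@frag01"  (old-style pair mark)
  //   "@frag01 2:N:0:ATCACG"  -> fragId "@frag01"  (new-style description)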

  /**
   * Counts from syncing FastqRecords
   *
   * @param numDiscard1 Number of reads discarded from the initial read 1
   * @param numDiscard2 Number of reads discarded from the initial read 2
   * @param numKept Number of read pairs kept in the result
   */
  case class SyncCounts(numDiscard1: Int, numDiscard2: Int, numKept: Int)

  /**
   * Filters out FastqRecords that are not present in both input iterators,
   * using a reference sequence object
   *
   * @param pre FastqReader over the reference FASTQ file
   * @param seqA FastqReader over read 1
   * @param seqB FastqReader over read 2
   * @return Pair of the synced read pair stream and its SyncCounts
   */
  def syncFastq(pre: FastqReader, seqA: FastqReader, seqB: FastqReader): (Stream[(FastqRecord, FastqRecord)], SyncCounts) = {

    // counters for discarded A and B sequences + total kept
    // NOTE: we are reassigning values to these variables in the recursion below
    var (numDiscA, numDiscB, numKept) = (0, 0, 0)

    /**
     * Syncs read pairs recursively
     *
     * @param pre Reference stream, assumed to be a superset of both seqA and seqB
     * @param seqA Stream over read 1
     * @param seqB Stream over read 2
     * @param acc Stream accumulating pairs which are present in both read 1 and read 2
     * @return Stream of synced read pairs
     */
    @tailrec def syncIter(pre: Stream[FastqRecord],
                          seqA: Stream[FastqRecord], seqB: Stream[FastqRecord],
                          acc: Stream[(FastqRecord, FastqRecord)]): Stream[(FastqRecord, FastqRecord)] =
      (pre.headOption, seqA.headOption, seqB.headOption) match {

        // recursion base case: both read streams have been exhausted
        case (_, None, None) => acc

        // illegal state: reference stream exhausted but not seqA or seqB
        case (None, Some(_), _) | (None, _, Some(_)) =>
          throw new NoSuchElementException("Reference record stream shorter than expected")

        // keep the recursion going if A still has items (we want to count how many)
        case (_, _, None) =>
          numDiscA += 1
          syncIter(pre.tail, seqA.tail, Stream(), acc)

        // like above, but for B
        case (_, None, _) =>
          numDiscB += 1
          syncIter(pre.tail, Stream(), seqB.tail, acc)

        // where the magic happens!
        case (Some(r), Some(a), Some(b)) =>
          // value of the A stream in the next recursion
          val nextA =
            // hold A if its head is not equal to the reference
            if (a.fragId != r.fragId) {
              if (b.fragId == r.fragId) numDiscB += 1
              seqA
              // otherwise, advance to the next item
            } else seqA.tail
          // like A above
          val nextB =
            if (b.fragId != r.fragId) {
              if (a.fragId == r.fragId) numDiscA += 1
              seqB
            } else seqB.tail
          // value of the accumulator in the next recursion
          val nextAcc =
            // keep the accumulator unchanged if either read stream differs
            // from the reference stream at its head
            if (a.fragId != r.fragId || b.fragId != r.fragId) acc
            // otherwise, grow the accumulator
            else {
              numKept += 1
              acc ++ Stream((a, b))
            }
          syncIter(pre.tail, nextA, nextB, nextAcc)
      }

    (syncIter(pre.iterator.asScala.toStream, seqA.iterator.asScala.toStream, seqB.iterator.asScala.toStream,
      Stream.empty[(FastqRecord, FastqRecord)]),
      SyncCounts(numDiscA, numDiscB, numKept))
  }
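
  // A worked example of the recursion above (hypothetical fragment ids): given
  //   pre  = r1, r2, r3, r4
  //   seqA = r1, r3, r4   (r2 was filtered out of read 1)
  //   seqB = r1, r2, r4   (r3 was filtered out of read 2)
  // the synced stream is (r1, r1), (r4, r4) and the counts are
  // SyncCounts(numDiscard1 = 1, numDiscard2 = 1, numKept = 2).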

  def writeSyncedFastq(sync: Stream[(FastqRecord, FastqRecord)],
                       counts: SyncCounts,
                       outputFastq1: AsyncFastqWriter,
                       outputFastq2: AsyncFastqWriter): Unit = {
    sync.foreach {
      case (rec1, rec2) =>
        outputFastq1.write(rec1)
        outputFastq2.write(rec2)
    }
    println("Filtered %d reads from first read file.".format(counts.numDiscard1))
    println("Filtered %d reads from second read file.".format(counts.numDiscard2))
    println("Synced read files contain %d reads.".format(counts.numKept))
  }

  /** Function to merge this tool's summary with summaries from other objects */
  // TODO: refactor this into the object? At least make it work on the summary object
  def mergeSummaries(jsons: List[Json]): Json = {

    val (read1FilteredCount, read2FilteredCount, readsLeftCount) = jsons
      // extract the values we require from each JSON object into tuples
      .map { json =>
        (json.field("num_reads_discarded_R1").get.numberOrZero.toInt,
          json.field("num_reads_discarded_R2").get.numberOrZero.toInt,
          json.field("num_reads_kept").get.numberOrZero.toInt)
      }
      // reduce the tuples by element-wise addition
      .reduceLeft {
        (x: (Int, Int, Int), y: (Int, Int, Int)) =>
          (x._1 + y._1, x._2 + y._2, x._3 + y._3)
      }

    ("num_reads_discarded_R1" := read1FilteredCount) ->:
      ("num_reads_discarded_R2" := read2FilteredCount) ->:
      ("num_reads_kept" := readsLeftCount) ->:
      jEmptyObject
  }
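
  // Sketch of the merge (values are hypothetical): summing
  //   {"num_reads_discarded_R1": 1, "num_reads_discarded_R2": 2, "num_reads_kept": 10}
  //   {"num_reads_discarded_R1": 3, "num_reads_discarded_R2": 4, "num_reads_kept": 20}
  // yields {"num_reads_discarded_R1": 4, "num_reads_discarded_R2": 6, "num_reads_kept": 30}.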

  case class Args(refFastq: File = new File(""),
                  inputFastq1: File = new File(""),
                  inputFastq2: File = new File(""),
                  outputFastq1: File = new File(""),
                  outputFastq2: File = new File("")) extends AbstractArgs

  class OptParser extends AbstractOptParser {

    // TODO: make output format independent from input format?
    head(
      s"""
        |$commandName - Sync paired-end FASTQ files.
        |
        |This tool works with gzipped or non-gzipped FASTQ files. The output
        |file will be gzipped when the input is also gzipped.
      """.stripMargin)

    opt[File]('r', "ref") required () valueName "<fastq[.gz]>" action { (x, c) =>
      c.copy(refFastq = x)
    } validate {
      x => if (x.exists) success else failure("Reference FASTQ file not found")
    } text "Reference FASTQ file"

    opt[File]('i', "in1") required () valueName "<fastq[.gz]>" action { (x, c) =>
      c.copy(inputFastq1 = x)
    } validate {
      x => if (x.exists) success else failure("Input FASTQ file 1 not found")
    } text "Input FASTQ file 1"

    opt[File]('j', "in2") required () valueName "<fastq[.gz]>" action { (x, c) =>
      c.copy(inputFastq2 = x)
    } validate {
      x => if (x.exists) success else failure("Input FASTQ file 2 not found")
    } text "Input FASTQ file 2"

    opt[File]('o', "out1") required () valueName "<fastq[.gz]>" action { (x, c) =>
      c.copy(outputFastq1 = x)
    } text "Output FASTQ file 1"

    opt[File]('p', "out2") required () valueName "<fastq[.gz]>" action { (x, c) =>
      c.copy(outputFastq2 = x)
    } text "Output FASTQ file 2"
  }
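
  // Example invocation (jar and file names are hypothetical):
  //   java -cp Biopet.jar nl.lumc.sasc.biopet.tools.FastqSync \
  //     -r reads_orig.fq -i reads_1.fq -j reads_2.fq \
  //     -o reads_1.synced.fq -p reads_2.synced.fq > sync.stats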

  /**
   * Parses the command line arguments
   *
   * @param args Array of arguments
   * @return Parsed Args instance; exits with status 1 if parsing fails
   */
  def parseArgs(args: Array[String]): Args = new OptParser()
    .parse(args, Args())
    .getOrElse(sys.exit(1))

  def main(args: Array[String]): Unit = {

    val commandArgs: Args = parseArgs(args)

    val (synced, counts) = syncFastq(
      new FastqReader(commandArgs.refFastq),
      new FastqReader(commandArgs.inputFastq1),
      new FastqReader(commandArgs.inputFastq2))

    writeSyncedFastq(synced, counts,
      // using 3000 for the queue size to approximate the NFS buffer
      new AsyncFastqWriter(new BasicFastqWriter(commandArgs.outputFastq1), 3000),
      new AsyncFastqWriter(new BasicFastqWriter(commandArgs.outputFastq2), 3000)
    )
  }
}
/**
* Copyright (c) 2014 Leiden University Medical Center - Sequencing Analysis Support Core <sasc@lumc.nl>
* @author Wibowo Arindrarto <w.arindrarto@lumc.nl>
*/
package nl.lumc.sasc.biopet.tools
import java.io.File
import java.nio.file.Paths

import scala.collection.JavaConverters._

import htsjdk.samtools.fastq.{ AsyncFastqWriter, FastqReader, FastqRecord }
import org.mockito.Mockito.{ inOrder => inOrd, when }
import org.scalatest.Matchers
import org.scalatest.mock.MockitoSugar
import org.scalatest.testng.TestNGSuite
import org.testng.annotations.{ DataProvider, Test }

class FastqSyncTest extends TestNGSuite with MockitoSugar with Matchers {

  import FastqSync._

  private def resourceFile(p: String): File =
    new File(resourcePath(p))