Commit 4b96fb53 authored by wyleung's avatar wyleung
Browse files

Seqstat with map reduce

parent 1701c381
......@@ -8,8 +8,9 @@ package nl.lumc.sasc.biopet.tools
import java.io.File
import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.language.postfixOps
import htsjdk.samtools.fastq.{ BasicFastqWriter, FastqReader, FastqRecord }
import htsjdk.samtools.fastq.{ FastqReader, FastqRecord }
import scalaz._, Scalaz._
import argonaut._, Argonaut._
......@@ -31,6 +32,30 @@ object Seqstat extends ToolCommand {
var phred_encoding: String = "sanger"
var phred_correction: Int = 33
case class ReadStats(length: Int, n_bases: Int, has_N: Int, avg_qual: Map[Int, Int], qualstat: Map[Int, Int],
total_reads: Int, length_max: Int, length_min: Int,
bases_total: Long)
/**
* Count N and quals per read
* - numBases
*
* @param record FastqRecord
*/
def analyseFastQRecord(record: FastqRecord): ReadStats = {
var (qualstat, avg_qual, numBases, n_bases, readlength) = (Map(0 -> 0), Map(0 -> 0), 0, 0, 0)
qualstat = histogramReadQual(record.getBaseQualityString)
avg_qual = Map(qualstat.view.map { case (k, v) => k * v }.toList.foldLeft(0)(_ + _) / qualstat.values.sum -> 1)
n_bases = record.getReadString.count(_ == 'N')
readlength = record.getReadString.length
val has_N = if (n_bases > 0) 1 else 0
ReadStats(readlength, n_bases, has_N, avg_qual, qualstat, 1, readlength, readlength, readlength)
}
/**
* Reads a FastQ file and counts:
* - N bases/read
......@@ -42,50 +67,49 @@ object Seqstat extends ToolCommand {
*
* @param fqf FastqReader of the source FastQ file
*/
def startStatistics(fqf: FastqReader): (Int, Int, Int, Int, Map[Int, Int], Map[Int, Int], Int, Int) = {
var (numRecords, numRecordsWithN, numBases, numBasesWithN) = (0, 0, 0, 0)
def startStatistics(fqf: FastqReader): (ReadStats, Map[Int, Int], Map[Int, Int]) = {
var baseQuals: Map[Int, Int] = Map(0 -> 0)
var readQuals: Map[Int, Int] = Map(0 -> 0)
var readlength_min: Int = Int.MaxValue
var readlength_max: Int = Int.MinValue
val it = fqf.iterator.asScala
while (it.hasNext) {
val record = it.next()
numRecords += 1
numBases += record.getReadString.length
val recordNbases: Int = record.getReadString.count(_ == 'N')
if (recordNbases > 0) {
numRecordsWithN += 1
numBasesWithN += recordNbases
}
// compute the histogram of qualities for this read
val qualStat = histogramReadQual(record.getBaseQualityString)
// merge the matrixes
baseQuals = baseQuals |+| qualStat
// compute the per read avg quality
val AvgReadQual = qualStat.view.map { case (k, v) => k * v }.toList.foldLeft(0)(_ + _) / qualStat.values.sum
readQuals = readQuals |+| Map(AvgReadQual -> 1)
readlength_max = if (record.length() > readlength_max) record.length() else readlength_max;
readlength_min = if (record.length() < readlength_min) record.length() else readlength_min;
}
// set the Phred encoding
detectPhredEncoding(baseQuals)
val result: ReadStats = fqf.iterator
.asScala.toList
.map(record => analyseFastQRecord(record)).reduceLeft(statReducer)
// val it = fqf.iterator.asScala
// while (it.hasNext) {
// val record = it.next()
//
// numRecords += 1
// numBases += record.getReadString.length
//
// val recordNbases: Int = record.getReadString.count(_ == 'N')
// if (recordNbases > 0) {
// numRecordsWithN += 1
// numBasesWithN += recordNbases
// }
//
// // compute the histogram of qualities for this read
// val qualStat = histogramReadQual(record.getBaseQualityString)
// // merge the matrixes
// baseQuals = baseQuals |+| qualStat
//
// // compute the per read avg quality
// val AvgReadQual = qualStat.view.map { case (k, v) => k * v }.toList.foldLeft(0)(_ + _) / qualStat.values.sum
// readQuals = readQuals |+| Map(AvgReadQual -> 1)
//
// readlength_max = if (record.length() > readlength_max) record.length() else readlength_max;
// readlength_min = if (record.length() < readlength_min) record.length() else readlength_min;
//
// }
// detect and set the Phred encoding
detectPhredEncoding(result.qualstat)
// transform the keys
baseQuals = transformPhredEncoding(baseQuals)
readQuals = transformPhredEncoding(readQuals)
baseQuals = transformPhredEncoding(result.qualstat)
readQuals = transformPhredEncoding(result.avg_qual)
(numRecords, numRecordsWithN, numBases, numBasesWithN, baseQuals, readQuals, readlength_max, readlength_min)
(result, baseQuals, readQuals)
}
/**
......@@ -140,6 +164,28 @@ object Seqstat extends ToolCommand {
}
}
/**
* http://stackoverflow.com/questions/17408880/reduce-fold-or-scan-left-right
*
*/
def statReducer(res: ReadStats,
nxt: ReadStats) = {
// reduce the following:
// ReadStats(length: Int, n_bases: Int, has_N: Int, avg_qual: Map[Int, Int], qualstat: Map[Int, Int],
// total_reads: Int, length_max: Int, length_min: Int)
ReadStats(
res.length + nxt.length,
res.n_bases + nxt.n_bases,
res.has_N + nxt.has_N,
res.avg_qual |+| nxt.avg_qual,
res.qualstat |+| nxt.qualstat,
res.total_reads + nxt.total_reads,
List(res.length_max, nxt.length_max).max,
List(res.length_min, nxt.length_min).min,
res.bases_total + nxt.bases_total
)
}
def transformPhredEncoding(qualMap: Map[Int, Int]): (Map[Int, Int]) = {
qualMap map { case (k, v) => (k - phred_correction, v) }
}
......@@ -147,33 +193,18 @@ object Seqstat extends ToolCommand {
def main(args: Array[String]): Unit = {
val commandArgs: Args = parseArgs(args)
val (numRecords, numRecordsWithN,
numBases, numBasesWithN,
baseHistogram, readHistogram,
readlength_max, readlength_min) = startStatistics(
val (readstats, baseHistogram, readHistogram) = startStatistics(
new FastqReader(commandArgs.fastq)
)
// println("Detected encoding: " + phred_encoding)
// println("Apply correction: " + phred_correction)
// println("Number of records:" + numRecords)
// println("Number of N records:" + numRecordsWithN)
// println("Number of bases:" + numBases)
// println("Number of N bases:" + numBasesWithN)
// println(baseHistogram)
// println(readHistogram)
val reportValues = List(1, 10, 20, 30, 40, 50, 60)
val baseHistMap = reportValues map (t => t -> baseHistogram.filterKeys(k => k >= t).values.sum) toMap
val readHistMap = reportValues map (t => t -> readHistogram.filterKeys(k => k >= t).values.sum) toMap
// println(baseHistMap)
// println(readHistMap)
val stats = ("bases" := (
("num_n" := numBasesWithN) ->:
("num_total" := numBases) ->:
("num_n" := readstats.n_bases) ->:
("num_total" := readstats.bases_total) ->:
("num_qual_gte" := (
("1" := baseHistMap(1)) ->:
("10" := baseHistMap(10)) ->:
......@@ -185,10 +216,10 @@ object Seqstat extends ToolCommand {
jEmptyObject
)) ->: jEmptyObject)) ->:
("reads" := (
("num_with_n" := numRecordsWithN) ->:
("num_total" := numRecords) ->:
("len_min" := readlength_min) ->:
("len_max" := readlength_max) ->:
("num_with_n" := readstats.has_N) ->:
("num_total" := readstats.total_reads) ->:
("len_min" := readstats.length_min) ->:
("len_max" := readstats.length_max) ->:
("num_mean_qual_gte" := (
("1" := readHistMap(1)) ->:
("10" := readHistMap(10)) ->:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment