Commit 353abfe4 authored by Peter van 't Hof's avatar Peter van 't Hof
Browse files

improved futures collecting

parent 7ab46aa3
......@@ -14,7 +14,6 @@ import scala.concurrent.blocking
import scala.util.{Failure, Success}
import scala.concurrent.duration.Duration
import scala.collection.JavaConversions._
import scala.collection.immutable.Queue
/**
* Created by pjvanthof on 25/05/16.
......@@ -71,28 +70,18 @@ object BamStats extends ToolCommand {
}
def init(outputDir: File, bamFile: File, referenceDict: SAMSequenceDictionary, binSize: Int, threadBinSize: Int): Unit = {
var stats = Stats()
val contigsFutures = BedRecordList.fromDict(referenceDict).allRecords.map { contig =>
val f = Future {
val s = processContig(contig, bamFile, binSize, threadBinSize)
blocking { stats += s }
}
f.onFailure { case t => throw new RuntimeException(t) }
contig -> f
Future { processContig(contig, bamFile, binSize, threadBinSize) }
}
val unmappedFuture = Future { processUnmappedReads(bamFile) }
unmappedFuture.onFailure { case t => throw new RuntimeException(t) }
// Waiting on all contigs to complete
contigsFutures.foreach { x => Await.ready(x._2, Duration.Inf) }
Await.ready(unmappedFuture, Duration.Inf)
stats += unmappedFuture.value.get.get
val stats = waitOnFutures(unmappedFuture :: contigsFutures.toList)
logger.info(s"total: ${stats.totalReads}, unmapped: ${stats.unmapped}, secondary: ${stats.secondary}")
stats.insertSizeHistogram.keys.toList.sorted.foreach(x => println(s"$x\t${stats.insertSizeHistogram(x)}"))
stats.mappingQualityHistogram.writeToTsv(new File(outputDir, "mapping_quality.tsv"))
stats.insertSizeHistogram.writeToTsv(new File(outputDir, "insert_size.tsv"))
}
def processContig(region: BedRecord, bamFile: File, binSize: Int, threadBinSize: Int): Stats = {
......@@ -105,19 +94,22 @@ object BamStats extends ToolCommand {
val scattersPerThread = (region.length.toDouble / threadBinSize).ceil.toInt
val scattersFutures = scatters.grouped(scattersPerThread).map { scatters =>
val f = Future {
val s = processThread(scatters, bamFile)
blocking { stats += s }
}
f.onFailure { case t => throw new RuntimeException(t) }
f
Future { processThread(scatters, bamFile) }
}
// Waiting on all contigs to complete
scattersFutures.foreach { x => Await.ready(x, Duration.Inf) }
logger.info(s"Contig '${region.chr}' done")
waitOnFutures(scattersFutures.toList)
}
def waitOnFutures(futures: List[Future[Stats]], msg: Option[String] = None): Stats = {
futures.foreach(_.onFailure { case t => throw new RuntimeException(t) })
var stats = Stats()
var running = futures
while (running.nonEmpty) {
val done = running.filter(_.value.isDefined)
done.foreach(stats += _.value.get.get)
running = running.filterNot(done.contains(_))
Thread.sleep(1000)
}
stats
}
......@@ -137,12 +129,13 @@ object BamStats extends ToolCommand {
if (samRecord.isSecondaryOrSupplementary) totalStats.secondary += 1
if (samRecord.getReadUnmappedFlag) totalStats.unmapped += 1
else { // Mapped read
totalStats.mappingQualityHistogram += samRecord.getMappingQuality -> (totalStats.mappingQualityHistogram.getOrElse(samRecord.getMappingQuality, 0L) + 1)
totalStats.mappingQualityHistogram.add(samRecord.getMappingQuality)
}
if (samRecord.getProperPairFlag && samRecord.getFirstOfPairFlag && !samRecord.getSecondOfPairFlag)
totalStats.insertSizeHistogram += samRecord.getInferredInsertSize.abs -> (totalStats.insertSizeHistogram.getOrElse(samRecord.getInferredInsertSize.abs, 0L) + 1)
totalStats.insertSizeHistogram.add(samRecord.getInferredInsertSize.abs)
//TODO: Read counting
//TODO: Clipping stats
//TODO: Bin Support
}
//TODO: bases counting
......
package nl.lumc.sasc.biopet.tools.bamstats
import java.io.{File, PrintWriter}
import scala.collection.generic.Sorted
import scala.collection.mutable
import scala.math.ScalaNumber
/**
* Created by pjvanthof on 05/07/16.
*/
case class Histogram() {
protected[Histogram] val histrogram: mutable.Map[Int, Long] = mutable.Map()
def +=(other: Histogram): Histogram = {
other.histrogram.foreach(x => this.histrogram += x._1 -> (this.histrogram.getOrElse(x._1, 0L) + x._2))
this
}
def add(value: Int): Unit = {
histrogram += value -> (histrogram.getOrElse(value, 0L) + 1)
}
def writeToTsv(file: File): Unit = {
val writer = new PrintWriter(file)
writer.println("value\tcount")
histrogram.keys.toList.sorted.foreach(x => writer.println(s"$x\t${histrogram(x)}"))
writer.close()
}
}
package nl.lumc.sasc.biopet.tools.bamstats
import scala.collection.mutable
import scala.concurrent.blocking
/**
* Created by pjvanthof on 05/07/16.
......@@ -10,14 +12,14 @@ case class Stats() {
var totalReads = 0L
var unmapped = 0L
var secondary = 0L
var mappingQualityHistogram: mutable.Map[Int, Long] = mutable.Map()
var insertSizeHistogram: mutable.Map[Int, Long] = mutable.Map()
val mappingQualityHistogram = Histogram()
val insertSizeHistogram = Histogram()
def +(other: Stats): Stats = {
this.totalReads += other.totalReads
this.unmapped += other.unmapped
other.mappingQualityHistogram.foreach(x => this.mappingQualityHistogram += x._1 -> (this.mappingQualityHistogram.getOrElse(x._1, 0L) + x._2))
other.insertSizeHistogram.foreach(x => this.insertSizeHistogram += x._1 -> (this.insertSizeHistogram.getOrElse(x._1, 0L) + x._2))
this.mappingQualityHistogram += other.mappingQualityHistogram
this.insertSizeHistogram += other.insertSizeHistogram
this
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment