From fd22b885bfae5be01da0935569c5579884bd5154 Mon Sep 17 00:00:00 2001
From: Peter van 't Hof <p.j.van_t_hof@lumc.nl>
Date: Wed, 4 Feb 2015 14:57:42 +0100
Subject: [PATCH] Added thread scattering

---
 .../nl/lumc/sasc/biopet/tools/VcfStats.scala  | 94 ++++++++++++++-----
 1 file changed, 69 insertions(+), 25 deletions(-)

diff --git a/public/biopet-framework/src/main/scala/nl/lumc/sasc/biopet/tools/VcfStats.scala b/public/biopet-framework/src/main/scala/nl/lumc/sasc/biopet/tools/VcfStats.scala
index 1602c7e76..9efa75bbc 100644
--- a/public/biopet-framework/src/main/scala/nl/lumc/sasc/biopet/tools/VcfStats.scala
+++ b/public/biopet-framework/src/main/scala/nl/lumc/sasc/biopet/tools/VcfStats.scala
@@ -11,6 +11,7 @@ import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.sys.process.{ Process, ProcessLogger }
 import scala.util.matching.Regex
+import htsjdk.samtools.util.Interval
 
 /**
  * Created by pjvan_thof on 1/10/15.
@@ -20,7 +21,7 @@ class VcfStats {
 }
 
 object VcfStats extends ToolCommand {
-  case class Args(inputFile: File = null, outputDir: String = null) extends AbstractArgs
+  case class Args(inputFile: File = null, outputDir: String = null, intervals: Option[File] = None) extends AbstractArgs
 
   class OptParser extends AbstractOptParser {
     opt[File]('I', "inputFile") required () unbounded () valueName ("<file>") action { (x, c) =>
@@ -29,6 +30,11 @@ object VcfStats extends ToolCommand {
     opt[String]('o', "outputDir") required () unbounded () valueName ("<file>") action { (x, c) =>
       c.copy(outputDir = x)
     }
+    /*
+    opt[File]('i', "intervals") unbounded () valueName ("<file>") action { (x, c) =>
+      c.copy(intervals = Some(x))
+    }
+    */
   }
 
   class SampleToSampleStats {
@@ -62,7 +68,7 @@ object VcfStats extends ToolCommand {
     val generalStats: mutable.Map[String, mutable.Map[Any, Int]] = mutable.Map()
     val samplesStats: mutable.Map[String, SampleStats] = mutable.Map()
 
-    def +=(other: Stats): Unit = {
+    def +=(other: Stats): Stats = {
       for ((key, value) <- other.samplesStats) {
         if (this.samplesStats.contains(key)) this.samplesStats(key) += value
         else this.samplesStats(key) = value
@@ -72,6 +78,7 @@ object VcfStats extends ToolCommand {
         if (thisField.isDefined) mergeStatsMap(thisField.get, fieldMap)
         else this.generalStats += field -> fieldMap
       }
+      this
     }
   }
 
@@ -101,40 +108,77 @@ object VcfStats extends ToolCommand {
     val argsParser = new OptParser
     commandArgs = argsParser.parse(args, Args()) getOrElse sys.exit(1)
 
-    val reader = new VCFFileReader(commandArgs.inputFile, false)
-
+    val reader = new VCFFileReader(commandArgs.inputFile, true)
     val header = reader.getFileHeader
     val samples = header.getSampleNamesInOrder.toList
 
+    val intervals: List[Interval] = (
+      for (seq <- header.getSequenceDictionary.getSequences;
+        chunks = seq.getSequenceLength / 10000000;
+        i <- 1 until chunks) yield {
+        val size = seq.getSequenceLength / chunks
+        val begin = size * (i-1) + 1
+        val end = if (i >= chunks) seq.getSequenceLength else size * i
+        new Interval(seq.getSequenceName, begin, end)
+      }
+      ).toList
+
+    val totalBases = intervals.foldRight(0L)(_.length() + _)
+
     // Reading vcf records
     logger.info("Start reading vcf records")
-    var counter = 0
-    val stats = new Stats
-    //init stats
-    for (sample1 <- samples) {
-      stats.samplesStats += sample1 -> new SampleStats
-      for (sample2 <- samples) {
-        stats.samplesStats(sample1).sampleToSample += sample2 -> new SampleToSampleStats
-      }
-    }
-    for (record <- reader) yield {
-      mergeNestedStatsMap(stats.generalStats, checkGeneral(record))
-      for (sample1 <- samples) yield {
-        val genotype = record.getGenotype(sample1)
-        mergeNestedStatsMap(stats.samplesStats(sample1).genotypeStats, checkGenotype(genotype))
+
+    def createStats: Stats = {
+      val stats = new Stats
+      //init stats
+      for (sample1 <- samples) {
+        stats.samplesStats += sample1 -> new SampleStats
         for (sample2 <- samples) {
-          val genotype2 = record.getGenotype(sample2)
-          if (genotype.getAlleles == genotype2.getAlleles)
-            stats.samplesStats(sample1).sampleToSample(sample2).genotypeOverlap += 1
-          stats.samplesStats(sample1).sampleToSample(sample2).alleleOverlap += genotype.getAlleles.count(allele => genotype2.getAlleles.exists(_.basesMatch(allele)))
+          stats.samplesStats(sample1).sampleToSample += sample2 -> new SampleToSampleStats
         }
       }
+      stats
+    }
+
+    var variantCounter = 0L
+    var baseCounter = 0L
+
+    def status(count: Int, interval:Interval): Unit = {
+      variantCounter += count
+      baseCounter += interval.length()
+      val fraction = baseCounter.toFloat / totalBases * 100
+      logger.info(interval + " done, " + count + " rows processed")
+      logger.info("total: " + variantCounter + " rows processed, " + fraction + "% done")
+    }
 
-      counter += 1
-      if (counter % 100000 == 0) logger.info(counter + " variants done")
+    val statsChunks = for (interval <- intervals.par) yield {
+      val reader = new VCFFileReader(commandArgs.inputFile, true)
+      var chunkCounter = 0
+      val stats = createStats
+      logger.info("Starting on: " + interval)
+
+      for (record <- reader.query(interval.getSequence, interval.getStart, interval.getEnd)
+           if record.getStart <= interval.getEnd) {
+        mergeNestedStatsMap(stats.generalStats, checkGeneral(record))
+        for (sample1 <- samples) yield {
+          val genotype = record.getGenotype(sample1)
+          mergeNestedStatsMap(stats.samplesStats(sample1).genotypeStats, checkGenotype(genotype))
+          for (sample2 <- samples) {
+            val genotype2 = record.getGenotype(sample2)
+            if (genotype.getAlleles == genotype2.getAlleles)
+              stats.samplesStats(sample1).sampleToSample(sample2).genotypeOverlap += 1
+            stats.samplesStats(sample1).sampleToSample(sample2).alleleOverlap += genotype.getAlleles.count(allele => genotype2.getAlleles.exists(_.basesMatch(allele)))
+          }
+        }
+        chunkCounter += 1
+      }
+      status(chunkCounter, interval)
+      stats
     }
 
-    logger.info(counter + " variants done")
+    val stats = statsChunks.toList.fold(createStats)(_ += _)
+
+    //logger.info(counter + " variants done")
     logger.info("Done reading vcf records")
 
     plotXy(writeField("QUAL", stats.generalStats.getOrElse("QUAL", mutable.Map())))
-- 
GitLab