From 26d14d83bc59256cc94c2c001c06eaada7b487b7 Mon Sep 17 00:00:00 2001
From: Wai Yi Leung <w.y.leung@lumc.nl>
Date: Mon, 11 Apr 2016 15:32:57 +0200
Subject: [PATCH] Refactored code to avoid collection.Breakout. (With help from
 Peter)

---
 .../nl/lumc/sasc/biopet/utils/BamUtils.scala  | 65 ++++++++++++-------
 1 file changed, 43 insertions(+), 22 deletions(-)

diff --git a/public/biopet-utils/src/main/scala/nl/lumc/sasc/biopet/utils/BamUtils.scala b/public/biopet-utils/src/main/scala/nl/lumc/sasc/biopet/utils/BamUtils.scala
index 0a3939c88..df8068c0c 100644
--- a/public/biopet-utils/src/main/scala/nl/lumc/sasc/biopet/utils/BamUtils.scala
+++ b/public/biopet-utils/src/main/scala/nl/lumc/sasc/biopet/utils/BamUtils.scala
@@ -21,6 +21,7 @@ import htsjdk.samtools.{ SamReader, SamReaderFactory }
 import nl.lumc.sasc.biopet.utils.intervals.{ BedRecord, BedRecordList }
 
 import scala.collection.JavaConversions._
+import scala.collection.mutable
 import scala.collection.parallel.immutable
 
 /**
@@ -65,33 +66,53 @@ object BamUtils {
     // create a bedList to devide the contig into multiple pieces
     val insertSizesOnAllFragments = BedRecordList.fromList(Seq(BedRecord(contig, start, end)))
       .scatter(binSize)
-      .allRecords.par.map({
+      .allRecords.par.flatMap({
         bedRecord =>
           // for each scatter, open the bamfile for this specific region-query
           val inputSam: SamReader = SamReaderFactory.makeDefault.open(inputBam)
           val samIterator = inputSam.query(bedRecord.chr, bedRecord.start, bedRecord.end, true)
 
-          val insertsizes: List[Int] = (for {
-            read <- samIterator.toStream.takeWhile(rec => {
-              // TODO: This value is now hard-coded. I'm not sure whether this is the best practice on selecting reads with a minimum required quality.
-              val minQ10 = rec.getMappingQuality >= 10
-              // with properPairFlag we exclude readpairs that span multiple contigs.
-              val paired = rec.getReadPairedFlag && rec.getProperPairFlag
-              val bothMapped = if (paired) ((rec.getReadUnmappedFlag == false) && (rec.getMateUnmappedFlag == false)) else false
-              paired && bothMapped && minQ10
-            }).take(samplingSize)
-          } yield {
-            read.getInferredInsertSize.asInstanceOf[Int].abs
-          })(collection.breakOut)
-          val lociInsertSize = insertsizes.foldLeft((0.0, 0))((t, r) => (t._1 + r, t._2 + 1))
-
-          samIterator.close()
-          inputSam.close()
-          if (lociInsertSize._2 == 0) None else Some((lociInsertSize._1 / lociInsertSize._2).toInt)
-      }).toList.flatten
-
-    val contigInsertSize = insertSizesOnAllFragments.foldLeft((0.0, 0))((t, r) => (t._1 + r, t._2 + 1))
-    if (contigInsertSize._2 == 0) None else Some((contigInsertSize._1 / contigInsertSize._2).toInt)
+          val counts: mutable.Map[Int, Int] = mutable.Map()
+
+          for (i <- 0 until samplingSize if samIterator.hasNext) {
+            val rec = samIterator.next()
+            val isPaired = rec.getReadPairedFlag
+            val minQ10 = rec.getMappingQuality >= 10
+            val pairOnSameContig = rec.getContig == rec.getMateReferenceName
+
+            if (isPaired && minQ10 && pairOnSameContig) {
+              val insertSize = rec.getInferredInsertSize.abs
+              counts(insertSize) = counts.getOrElse(insertSize, 0) + 1
+            }
+          }
+
+          counts.keys.size match {
+            case 1 => Some(counts.keys.head)
+            case 0 => None
+            case _ => {
+              Some(counts.foldLeft(0)((old, observation) => {
+                observation match {
+                  case (insertSize: Int, observations: Int) => {
+                    (old + (insertSize * observations)) / (observations + 1)
+                  }
+                  case _ => 0
+                }
+              }))
+            }
+          }
+      })
+
+    insertSizesOnAllFragments.size match {
+      case 1 => Some(insertSizesOnAllFragments.head)
+      case 0 => None
+      case _ => {
+        Some(
+          insertSizesOnAllFragments.foldLeft(0)((old, observation) => {
+            (old + observation) / 2
+          }))
+      }
+
+    }
   }
 
   /**
-- 
GitLab