Commit cf082184 authored by Peter van 't Hof, committed by GitHub

Merge pull request #197 from biopet/fix-BIOPET-780

Improve vcfstats on collecting data
parents 103f5dbd b4d19655
@@ -23,13 +23,14 @@ import scala.collection.mutable
  * @param sampleToSample Stores sample to sample compare stats
  */
 case class SampleStats(genotypeStats: mutable.Map[String, mutable.Map[Any, Int]] = mutable.Map(),
-                       sampleToSample: mutable.Map[String, SampleToSampleStats] = mutable.Map()) {
+                       sampleToSample: Array[SampleToSampleStats] = Array()) {
   /** Add an other class */
   def +=(other: SampleStats): Unit = {
-    for ((key, value) <- other.sampleToSample) {
-      if (this.sampleToSample.contains(key)) this.sampleToSample(key) += value
-      else this.sampleToSample(key) = value
+    require(other.sampleToSample.size == this.sampleToSample.size)
+    val zipped = this.sampleToSample.zip(other.sampleToSample).zipWithIndex
+    for (((s1, s2), i) <- zipped) {
+      s1 += s2
     }
     for ((field, fieldMap) <- other.genotypeStats) {
       val thisField = this.genotypeStats.get(field)
...
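The merge operator for sample-to-sample counters now works positionally: both sides must cover the same samples, and entries are combined index by index instead of by name. A minimal, self-contained sketch of that merge semantics, using simplified stand-in classes rather than the project's own SampleStats/SampleToSampleStats:

// Stand-in classes only; field names mirror the real ones but the classes are simplified.
object ArrayMergeSketch {
  case class PairStats(var genotypeOverlap: Int = 0, var alleleOverlap: Int = 0) {
    def +=(other: PairStats): Unit = {
      genotypeOverlap += other.genotypeOverlap
      alleleOverlap += other.alleleOverlap
    }
  }

  case class PerSampleStats(sampleToSample: Array[PairStats]) {
    def +=(other: PerSampleStats): Unit = {
      // Both arrays must describe the same sample list, in the same order.
      require(other.sampleToSample.length == sampleToSample.length)
      sampleToSample.zip(other.sampleToSample).foreach { case (mine, theirs) => mine += theirs }
    }
  }

  def main(args: Array[String]): Unit = {
    val a = PerSampleStats(Array.fill(2)(PairStats()))
    val b = PerSampleStats(Array(PairStats(genotypeOverlap = 3), PairStats(alleleOverlap = 5)))
    a += b
    println(a.sampleToSample.toList) // List(PairStats(3,0), PairStats(0,5))
  }
}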
@@ -31,7 +31,7 @@ import scala.sys.process.{Process, ProcessLogger}
  * @param samplesStats Stores all sample/genotype specific stats
  */
 case class Stats(generalStats: mutable.Map[String, mutable.Map[Any, Int]] = mutable.Map(),
-                 samplesStats: mutable.Map[String, SampleStats] = mutable.Map()) {
+                 samplesStats: mutable.Map[Int, SampleStats] = mutable.Map()) {
   /** Add an other class */
   def +=(other: Stats): Stats = {
@@ -91,18 +91,18 @@ case class Stats(generalStats: mutable.Map[String, mutable.Map[Any, Int]] = muta
     file.getParentFile.mkdirs()
     val writer = new PrintWriter(file)
     writer.println(samples.mkString(field + "\t", "\t", ""))
-    val keySet = (for (sample <- samples)
+    val keySet = (for ((sample, sampleIndex) <- samples.zipWithIndex)
       yield
         this
-          .samplesStats(sample)
+          .samplesStats(sampleIndex)
           .genotypeStats
           .getOrElse(field, Map[Any, Int]())
           .keySet).fold(Set[Any]())(_ ++ _)
     for (key <- keySet.toList.sortWith(sortAnyAny)) {
-      val values = for (sample <- samples)
+      val values = for ((sample, sampleIndex) <- samples.zipWithIndex)
         yield
           this
-            .samplesStats(sample)
+            .samplesStats(sampleIndex)
             .genotypeStats
             .getOrElse(field, Map[Any, Int]())
             .getOrElse(key, 0)
@@ -113,22 +113,22 @@ case class Stats(generalStats: mutable.Map[String, mutable.Map[Any, Int]] = muta
   /** Function to write 1 specific genotype field */
   def getGenotypeField(samples: List[String], field: String): Map[String, Map[String, Any]] = {
-    val keySet = (for (sample <- samples)
+    val keySet = (for ((sample, sampleIndex) <- samples.zipWithIndex)
       yield
         this
-          .samplesStats(sample)
+          .samplesStats(sampleIndex)
           .genotypeStats
           .getOrElse(field, Map[Any, Int]())
           .keySet).fold(Set[Any]())(_ ++ _)
-    (for (sample <- samples)
+    (for ((sample, sampleIndex) <- samples.zipWithIndex)
       yield
         sample -> {
           keySet
             .map(
               key =>
                 key.toString -> this
-                  .samplesStats(sample)
+                  .samplesStats(sampleIndex)
                   .genotypeStats
                   .getOrElse(field, Map[Any, Int]())
                   .get(key))
@@ -142,19 +142,21 @@ case class Stats(generalStats: mutable.Map[String, mutable.Map[Any, Int]] = muta
                    genotypeFields: List[String] = Nil,
                    infoFields: List[String] = Nil,
                    sampleDistributions: List[String] = Nil): Map[String, Any] = {
+    val sampleIndex = samples.zipWithIndex
     Map(
       "genotype" -> genotypeFields.map(f => f -> getGenotypeField(samples, f)).toMap,
       "info" -> infoFields.map(f => f -> getField(f)).toMap,
       "sample_distributions" -> sampleDistributions
         .map(f => f -> getField("SampleDistribution-" + f))
-        .toMap
-    ) ++ Map(
+        .toMap,
       "sample_compare" -> Map(
         "samples" -> samples,
-        "genotype_overlap" -> samples.map(sample1 =>
-          samples.map(sample2 => samplesStats(sample1).sampleToSample(sample2).genotypeOverlap)),
-        "allele_overlap" -> samples.map(sample1 =>
-          samples.map(sample2 => samplesStats(sample1).sampleToSample(sample2).alleleOverlap))
+        "genotype_overlap" -> sampleIndex.map(sample1 =>
+          sampleIndex.map(sample2 =>
+            samplesStats(sample1._2).sampleToSample(sample2._2).genotypeOverlap)),
+        "allele_overlap" -> sampleIndex.map(sample1 =>
+          sampleIndex.map(sample2 =>
+            samplesStats(sample1._2).sampleToSample(sample2._2).alleleOverlap))
       )
     )
   }
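The summary map now builds the N x N genotype/allele overlap matrices by sample index rather than by sample name. A hypothetical, standalone illustration of the zipWithIndex lookup pattern used above, with plain integers standing in for the real SampleToSampleStats values and made-up sample names:

// Assumed example data: three sample names and a pre-filled overlap lookup by index.
val samples = List("sampleA", "sampleB", "sampleC")
val overlap = Array.tabulate(samples.size, samples.size)((i, j) => if (i == j) 10 else 1)

// Pair each name with its index once, then use only the index (._2) for lookups,
// as the new toSummaryMap code does with samplesStats/sampleToSample.
val sampleIndex = samples.zipWithIndex
val matrix = sampleIndex.map(sample1 => sampleIndex.map(sample2 => overlap(sample1._2)(sample2._2)))

println(matrix) // List(List(10, 1, 1), List(1, 10, 1), List(1, 1, 10))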
@@ -218,14 +220,14 @@ case class Stats(generalStats: mutable.Map[String, mutable.Map[Any, Int]] = muta
     absWriter.println(samples.mkString("\t", "\t", ""))
     relWriter.println(samples.mkString("\t", "\t", ""))
-    for (sample1 <- samples) {
-      val values = for (sample2 <- samples)
-        yield function(this.samplesStats(sample1).sampleToSample(sample2))
-      absWriter.println(values.mkString(sample1 + "\t", "\t", ""))
-      val total = function(this.samplesStats(sample1).sampleToSample(sample1))
-      relWriter.println(values.map(_.toFloat / total).mkString(sample1 + "\t", "\t", ""))
+    for (sample1 <- samples.zipWithIndex) {
+      val values = for (sample2 <- samples.zipWithIndex)
+        yield function(this.samplesStats(sample1._2).sampleToSample(sample2._2))
+      absWriter.println(values.mkString(sample1._1 + "\t", "\t", ""))
+      val total = function(this.samplesStats(sample1._2).sampleToSample(sample1._2))
+      relWriter.println(values.map(_.toFloat / total).mkString(sample1._1 + "\t", "\t", ""))
     }
     absWriter.close()
     relWriter.close()
@@ -239,12 +241,11 @@ object Stats {
   def emptyStats(samples: List[String]): Stats = {
     val stats = new Stats
+    val sampleIndex = samples.zipWithIndex
     //init stats
-    for (sample1 <- samples) {
-      stats.samplesStats += sample1 -> new SampleStats
-      for (sample2 <- samples) {
-        stats.samplesStats(sample1).sampleToSample += sample2 -> new SampleToSampleStats
-      }
+    for (sample1 <- sampleIndex) {
+      stats.samplesStats += sample1._2 -> SampleStats(
+        sampleToSample = Array.fill(samples.size)(new SampleToSampleStats))
     }
     stats
   }
...
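emptyStats now pre-allocates one SampleToSampleStats slot per sample with Array.fill instead of inserting nested map entries. A minimal sketch of that initialization shape, again with simplified stand-in classes rather than the project's own Stats types:

// Simplified stand-ins (not the project classes).
case class PairStats(var genotypeOverlap: Int = 0, var alleleOverlap: Int = 0)
case class PerSampleStats(sampleToSample: Array[PairStats])

// One entry per sample index, each holding a full row of pairwise counters.
def emptyStatsSketch(samples: List[String]): Map[Int, PerSampleStats] =
  samples.zipWithIndex.map {
    case (_, idx) => idx -> PerSampleStats(Array.fill(samples.size)(PairStats()))
  }.toMap

val init = emptyStatsSketch(List("s1", "s2", "s3"))
println(init(0).sampleToSample.length) // 3: a row of counters against every sample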
@@ -64,6 +64,7 @@ object VcfStats extends ToolCommand
       .asInstanceOf[URLClassLoader]
       .getURLs
       .map(_.getFile)
+      .filter(_.endsWith(".jar"))
     val conf = cmdArgs.sparkConfigValues.foldLeft(
       new SparkConf()
         .setExecutorEnv(sys.env.toArray)
@@ -79,19 +80,57 @@ object VcfStats extends ToolCommand
       case _ => BedRecordList.fromReference(cmdArgs.referenceFile)
     }).combineOverlap
       .scatter(cmdArgs.binSize, maxContigsInSingleJob = Some(cmdArgs.maxContigsInSingleJob))
-    val contigs = regions.flatMap(_.map(_.chr))
-    val contigsRdd = sc
-      .parallelize(regions, regions.size)
-      .flatMap(readBins(_, samples, cmdArgs, adInfoTags, adGenotypeTags))
-      .foldByKey(Stats.emptyStats(samples))(_ += _)
-      .repartition(contigs.size)
-      .cache()
+    val nonEmptyRegions = BedRecordList
+      .fromList(
+        sc.parallelize(regions, regions.size).flatMap(filterEmptyBins(_, cmdArgs)).collect())
+      .scatter(cmdArgs.binSize, maxContigsInSingleJob = Some(cmdArgs.maxContigsInSingleJob))
+    val contigs = nonEmptyRegions.flatMap(_.map(_.chr))
+
+    val bigContigs = nonEmptyRegions.filter(_.size == 1).flatten.groupBy(_.chr).map {
+      case (_, r) =>
+        val rdds = r
+          .grouped(10)
+          .map(
+            g =>
+              sc.parallelize(g.map(List(_)), g.size)
+                .flatMap(readBins(_, samples, cmdArgs, adInfoTags, adGenotypeTags))
+                .reduceByKey(_ += _)
+                .repartition(1))
+          .toList
+        val steps = Math.log10(rdds.size).ceil.toInt
+        val lastRdds = (1 until steps)
+          .foldLeft(rdds) {
+            case (a, _) =>
+              if (a.size > 10)
+                a.grouped(10)
+                  .map { g =>
+                    sc.union(g)
+                      .reduceByKey(_ += _)
+                      .repartition(1)
+                  }
+                  .toList
+              else a
+          }
+        if (lastRdds.size > 1) {
+          sc.union(lastRdds)
+            .reduceByKey(_ += _)
+            .repartition(1)
+        } else lastRdds.head
+    }
+
+    val smallContigs = nonEmptyRegions.filter(_.size > 1).map { r =>
+      sc.parallelize(r.map(List(_)), r.size)
+        .flatMap(readBins(_, samples, cmdArgs, adInfoTags, adGenotypeTags))
+        .reduceByKey(_ += _)
+        .cache()
+    }
+
+    val contigsRdd = sc.union(smallContigs ++ bigContigs).cache
     val totalRdd = contigsRdd
       .map("total" -> _._2)
-      .foldByKey(Stats.emptyStats(samples))(_ += _)
+      .reduceByKey(_ += _)
+      .repartition(1)
       .cache()
     val totalJson = totalRdd.map {
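For contigs split into many single-bin jobs, the new code merges the per-bin RDDs in rounds of at most ten at a time rather than in one wide reduction. A minimal sketch of that merge pattern, assuming a local SparkContext, a made-up app name, and simple (String, Int) pairs in place of the real Stats objects:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object TreeMergeSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("tree-merge-sketch"))

    // Stand-ins for the per-bin stats RDDs: each holds one ("total", count) pair.
    val rdds: List[RDD[(String, Int)]] =
      (1 to 25).map(i => sc.parallelize(Seq("total" -> i), 1)).toList

    // Merge at most 10 RDDs per round until few enough remain, then union once more.
    val steps = math.log10(rdds.size).ceil.toInt
    val lastRdds = (1 until steps).foldLeft(rdds) {
      case (a, _) =>
        if (a.size > 10)
          a.grouped(10).map(g => sc.union(g).reduceByKey(_ + _).repartition(1)).toList
        else a
    }
    val merged =
      if (lastRdds.size > 1) sc.union(lastRdds).reduceByKey(_ + _).repartition(1)
      else lastRdds.head

    println(merged.collect().toList) // List((total,325))
    sc.stop()
  }
}

Merging in rounds of ten keeps each union narrow, which appears to be the intent of the log10-based step count in the change above.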
@@ -138,42 +177,43 @@ object VcfStats extends ToolCommand
                       cmdArgs: Args,
                       adInfoTags: List[String],
                       adGenotypeTags: List[String],
-                      reader: VCFFileReader): Stats = {
-    val stats = Stats.emptyStats(samples)
+                      reader: VCFFileReader): Option[Stats] = {
     logger.info("Starting on: " + bedRecord)
     val samInterval = bedRecord.toSamInterval
-    val query =
-      reader.query(samInterval.getContig, samInterval.getStart, samInterval.getEnd)
-    if (!query.hasNext) {
+    val query = reader.query(samInterval.getContig, samInterval.getStart, samInterval.getEnd)
+    if (query.nonEmpty) {
+      val stats = Stats.emptyStats(samples)
       Stats.mergeNestedStatsMap(stats.generalStats, fillGeneral(adInfoTags))
-      for (sample <- samples) yield {
-        Stats.mergeNestedStatsMap(stats.samplesStats(sample).genotypeStats,
+      for (sample <- samples.zipWithIndex) yield {
+        Stats.mergeNestedStatsMap(stats.samplesStats(sample._2).genotypeStats,
                                   fillGenotype(adGenotypeTags))
       }
-    }
-    for (record <- query if record.getStart <= samInterval.getEnd) {
-      Stats.mergeNestedStatsMap(stats.generalStats, checkGeneral(record, adInfoTags))
-      for (sample1 <- samples) yield {
-        val genotype = record.getGenotype(sample1)
-        Stats.mergeNestedStatsMap(stats.samplesStats(sample1).genotypeStats,
-                                  checkGenotype(record, genotype, adGenotypeTags))
-        for (sample2 <- samples) {
-          val genotype2 = record.getGenotype(sample2)
-          if (genotype.getAlleles == genotype2.getAlleles)
-            stats.samplesStats(sample1).sampleToSample(sample2).genotypeOverlap += 1
-          stats.samplesStats(sample1).sampleToSample(sample2).alleleOverlap += VcfUtils
-            .alleleOverlap(genotype.getAlleles.toList, genotype2.getAlleles.toList)
+      for (record <- query if record.getStart <= samInterval.getEnd) {
+        Stats.mergeNestedStatsMap(stats.generalStats, checkGeneral(record, adInfoTags))
+        for (sample1 <- samples.zipWithIndex) yield {
+          val genotype = record.getGenotype(sample1._2)
+          Stats.mergeNestedStatsMap(stats.samplesStats(sample1._2).genotypeStats,
+                                    checkGenotype(record, genotype, adGenotypeTags))
+          for (sample2 <- samples.zipWithIndex) {
+            val genotype2 = record.getGenotype(sample2._2)
+            if (genotype.getAlleles == genotype2.getAlleles)
+              stats.samplesStats(sample1._2).sampleToSample(sample2._2).genotypeOverlap += 1
+            stats.samplesStats(sample1._2).sampleToSample(sample2._2).alleleOverlap += VcfUtils
+              .alleleOverlap(genotype.getAlleles.toList, genotype2.getAlleles.toList)
+          }
         }
       }
-    }
-    logger.info("Completed on: " + bedRecord)
-    stats
+      logger.info("Completed on: " + bedRecord)
+      Some(stats)
+    } else None
   }

   def readBins(bedRecords: List[BedRecord],
@@ -184,11 +224,27 @@ object VcfStats extends ToolCommand
     val reader = new VCFFileReader(cmdArgs.inputFile, true)
     val stats = for (bedRecord <- bedRecords) yield {
-      bedRecord.chr -> readBin(bedRecord, samples, cmdArgs, adInfoTags, adGenotypeTags, reader)
+      readBin(bedRecord, samples, cmdArgs, adInfoTags, adGenotypeTags, reader).map(
+        bedRecord.chr -> _)
+    }
+    reader.close()
+    stats.flatten
+  }
+
+  def filterEmptyBins(bedRecords: List[BedRecord], cmdArgs: Args): List[BedRecord] = {
+    val reader = new VCFFileReader(cmdArgs.inputFile, true)
+    val stats = for (bedRecord <- bedRecords) yield {
+      val samInterval = bedRecord.toSamInterval
+      val query = reader.query(samInterval.getContig, samInterval.getStart, samInterval.getEnd)
+      if (query.nonEmpty) Some(bedRecord)
+      else None
     }
     reader.close()
-    stats
+    stats.flatten
   }

   val defaultGenotypeFields =
...
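readBin now returns Option[Stats] and filterEmptyBins keeps only bins that actually contain records, so downstream code simply flattens the Options away. A hypothetical, minimal illustration of that pattern, using plain values and a made-up Bin class instead of VCF records and BedRecord bins:

// Hypothetical stand-in for a bin: just a contig name and whether it holds any records.
case class Bin(chr: String, hasRecords: Boolean)

// Mirrors the new readBin contract: Some(result) for non-empty bins, None otherwise.
def readBinSketch(bin: Bin): Option[Int] =
  if (bin.hasRecords) Some(1) else None

val bins = List(Bin("chr1", hasRecords = true), Bin("chr2", hasRecords = false), Bin("chr3", hasRecords = true))

// As in the new readBins: pair each result with its contig, then drop the Nones.
val stats = bins.map(b => readBinSketch(b).map(b.chr -> _))
println(stats.flatten) // List((chr1,1), (chr3,1))

// As in filterEmptyBins: keep only the bins that would produce data at all.
val nonEmpty = bins.filter(_.hasRecords)
println(nonEmpty.map(_.chr)) // List(chr1, chr3)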
@@ -71,16 +71,11 @@ class VcfStatsTest extends TestNGSuite with Matchers
   @Test
   def testSampleStats(): Unit = {
-    val s1 = SampleStats()
-    val s2 = SampleStats()
-    s1.sampleToSample += "s1" -> SampleToSampleStats()
-    s1.sampleToSample += "s2" -> SampleToSampleStats()
-    s2.sampleToSample += "s1" -> SampleToSampleStats()
-    s2.sampleToSample += "s2" -> SampleToSampleStats()
-    s1.sampleToSample("s1").alleleOverlap = 1
-    s2.sampleToSample("s2").alleleOverlap = 2
+    val s1 = SampleStats(sampleToSample = Array.fill(2)(SampleToSampleStats()))
+    val s2 = SampleStats(sampleToSample = Array.fill(2)(SampleToSampleStats()))
+    s1.sampleToSample(0).alleleOverlap = 1
+    s2.sampleToSample(1).alleleOverlap = 2
     s1.genotypeStats += "1" -> mutable.Map(1 -> 1)
     s2.genotypeStats += "2" -> mutable.Map(2 -> 2)
@@ -92,7 +87,7 @@ class VcfStatsTest extends TestNGSuite with Matchers
     s1.genotypeStats shouldBe mutable.Map("1" -> mutable.Map(1 -> 1), "2" -> mutable.Map(2 -> 2))
     ss1.alleleOverlap = 1
     ss2.alleleOverlap = 2
-    s1.sampleToSample shouldBe mutable.Map("s1" -> ss1, "s2" -> ss2)
+    s1.sampleToSample shouldBe Array(ss1, ss2)
     s1 += s2
     s1.genotypeStats shouldBe mutable.Map("1" -> mutable.Map(1 -> 1), "2" -> mutable.Map(2 -> 4))
...