Commit 73754420 authored by Peter van 't Hof's avatar Peter van 't Hof

Move contigs stats to spark logic

parent 6bad5603
......@@ -22,9 +22,8 @@ import scala.collection.mutable
* @param genotypeStats Stores all genotype relative stats
* @param sampleToSample Stores sample to sample compare stats
*/
case class SampleStats(
genotypeStats: mutable.Map[String, mutable.Map[Any, Int]] = mutable.Map(),
sampleToSample: mutable.Map[String, SampleToSampleStats] = mutable.Map()) {
case class SampleStats(genotypeStats: mutable.Map[String, mutable.Map[Any, Int]] = mutable.Map(),
sampleToSample: mutable.Map[String, SampleToSampleStats] = mutable.Map()) {
/** Add an other class */
def +=(other: SampleStats): Unit = {
......
......@@ -30,8 +30,7 @@ import scala.sys.process.{Process, ProcessLogger}
* @param generalStats Stores are general stats
* @param samplesStats Stores all sample/genotype specific stats
*/
case class Stats(generalStats: mutable.Map[String, mutable.Map[Any, Int]] =
mutable.Map(),
case class Stats(generalStats: mutable.Map[String, mutable.Map[Any, Int]] = mutable.Map(),
samplesStats: mutable.Map[String, SampleStats] = mutable.Map()) {
/** Add an other class */
......@@ -49,9 +48,7 @@ case class Stats(generalStats: mutable.Map[String, mutable.Map[Any, Int]] =
}
/** Function to write 1 specific general field */
def writeField(field: String,
outputDir: File,
prefix: String = ""): File = {
def writeField(field: String, outputDir: File, prefix: String = ""): File = {
val file = prefix match {
case "" => new File(outputDir, field + ".tsv")
case _ => new File(outputDir, prefix + "-" + field + ".tsv")
......@@ -115,8 +112,7 @@ case class Stats(generalStats: mutable.Map[String, mutable.Map[Any, Int]] =
}
/** Function to write 1 specific genotype field */
def getGenotypeField(samples: List[String],
field: String): Map[String, Map[String, Any]] = {
def getGenotypeField(samples: List[String], field: String): Map[String, Map[String, Any]] = {
val keySet = (for (sample <- samples)
yield
this
......@@ -143,9 +139,9 @@ case class Stats(generalStats: mutable.Map[String, mutable.Map[Any, Int]] =
/** This will generate stats for one contig */
def getStatsAsMap(samples: List[String],
genotypeFields: List[String] = Nil,
infoFields: List[String] = Nil,
sampleDistributions: List[String] = Nil): Map[String, Any] = {
genotypeFields: List[String] = Nil,
infoFields: List[String] = Nil,
sampleDistributions: List[String] = Nil): Map[String, Any] = {
Map(
"genotype" -> genotypeFields.map(f => f -> getGenotypeField(samples, f)).toMap,
"info" -> infoFields.map(f => f -> getField(f)).toMap,
......@@ -153,16 +149,28 @@ case class Stats(generalStats: mutable.Map[String, mutable.Map[Any, Int]] =
.map(f => f -> getField("SampleDistribution-" + f))
.toMap
) ++ Map(
"sample_compare" -> Map(
"samples" -> samples,
"genotype_overlap" -> samples.map(sample1 =>
samples.map(sample2 =>
samplesStats(sample1).sampleToSample(sample2).genotypeOverlap)),
"allele_overlap" -> samples.map(sample1 =>
samples.map(sample2 =>
samplesStats(sample1).sampleToSample(sample2).alleleOverlap))
)
)
"sample_compare" -> Map(
"samples" -> samples,
"genotype_overlap" -> samples.map(sample1 =>
samples.map(sample2 => samplesStats(sample1).sampleToSample(sample2).genotypeOverlap)),
"allele_overlap" -> samples.map(sample1 =>
samples.map(sample2 => samplesStats(sample1).sampleToSample(sample2).alleleOverlap))
)
)
}
def writeAllOutput(outputDir: File,
samples: List[String],
genotypeFields: List[String] = Nil,
infoFields: List[String] = Nil,
sampleDistributions: List[String] = Nil): Unit = {
outputDir.mkdirs()
this.writeToFile(new File(outputDir, "stats.json"),
samples,
genotypeFields,
infoFields,
sampleDistributions)
writeOverlap(outputDir, samples)
}
def writeToFile(outputFile: File,
......@@ -178,19 +186,15 @@ case class Stats(generalStats: mutable.Map[String, mutable.Map[Any, Int]] =
}
def writeOverlap(outputDir: File, samples: List[String]): Unit = {
this.writeOverlap(_.genotypeOverlap,
outputDir + "/sample_compare/genotype_overlap",
samples)
this.writeOverlap(_.alleleOverlap,
outputDir + "/sample_compare/allele_overlap",
samples)
this.writeOverlap(_.genotypeOverlap, outputDir + "/sample_compare/genotype_overlap", samples)
this.writeOverlap(_.alleleOverlap, outputDir + "/sample_compare/allele_overlap", samples)
}
/** Function to write sample to sample compare tsv's / heatmaps */
private def writeOverlap(function: SampleToSampleStats => Int,
prefix: String,
samples: List[String],
plots: Boolean = true): Unit = {
prefix: String,
samples: List[String],
plots: Boolean = true): Unit = {
val absFile = new File(prefix + ".abs.tsv")
val relFile = new File(prefix + ".rel.tsv")
......
......@@ -6,11 +6,14 @@ import java.net.URLClassLoader
import htsjdk.variant.variantcontext.{Genotype, VariantContext}
import htsjdk.variant.vcf.VCFFileReader
import nl.lumc.sasc.biopet.utils.intervals.{BedRecord, BedRecordList}
import nl.lumc.sasc.biopet.utils.{ToolCommand, VcfUtils}
import nl.lumc.sasc.biopet.utils.{FastaUtils, ToolCommand, VcfUtils}
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
/**
* Created by pjvanthof on 14/07/2017.
......@@ -74,17 +77,34 @@ object VcfStats extends ToolCommand {
case _ => BedRecordList.fromReference(cmdArgs.referenceFile)
}).combineOverlap
.scatter(cmdArgs.binSize, maxContigsInSingleJob = Some(cmdArgs.maxContigsInSingleJob))
val contigs = regions.flatMap(_.map(_.chr)).distinct
val regionStats = sc
.parallelize(regions, regions.size)
.map(readBin(_, samples, cmdArgs, adInfoTags, adGenotypeTags))
.cache()
val totalStats = Future(regionStats.aggregate(Stats.emptyStats(samples))(_ += _._1, _ += _))
.map(
_.writeAllOutput(cmdArgs.outputDir,
samples,
adGenotypeTags,
adInfoTags,
sampleDistributions))
regionStats
.flatMap(_._2)
.aggregateByKey(Stats.emptyStats(samples))(_ += _, _ += _)
.foreach {
case (k, v) =>
v.writeAllOutput(new File(cmdArgs.outputDir, "contigs" + File.separator + k),
samples,
adGenotypeTags,
adInfoTags,
sampleDistributions)
}
regionStats.unpersist()
val totalStats = regionStats.reduce(_ += _)
totalStats.writeToFile(new File(cmdArgs.outputDir, "stats.json"), samples, adGenotypeTags, adInfoTags, sampleDistributions)
totalStats.writeOverlap(cmdArgs.outputDir, samples)
Await.result(totalStats, Duration.Inf)
sc.stop
logger.info("Done")
......@@ -94,11 +114,14 @@ object VcfStats extends ToolCommand {
samples: List[String],
cmdArgs: Args,
adInfoTags: List[String],
adGenotypeTags: List[String]): Stats = {
adGenotypeTags: List[String]): (Stats, List[(String, Stats)]) = {
val reader = new VCFFileReader(cmdArgs.inputFile, true)
val stats = Stats.emptyStats(samples)
val totalStats = Stats.emptyStats(samples)
val dict = FastaUtils.getDictFromFasta(cmdArgs.referenceFile)
val nonCompleteContigs = for (bedRecord <- bedRecords) yield {
val stats = Stats.emptyStats(samples)
for (bedRecord <- bedRecords) {
logger.info("Starting on: " + bedRecord)
val samInterval = bedRecord.toSamInterval
......@@ -128,10 +151,22 @@ object VcfStats extends ToolCommand {
}
}
}
totalStats += stats
if (bedRecord.length == dict.getSequence(bedRecord.chr).getSequenceLength) {
Future {
stats.writeAllOutput(
new File(cmdArgs.outputDir, "contigs" + File.separator + bedRecord.chr),
samples,
adGenotypeTags,
adInfoTags,
sampleDistributions)
None
}
} else Future.successful(Some(bedRecord.chr -> stats))
}
reader.close()
stats
(totalStats, Await.result(Future.sequence(nonCompleteContigs), Duration.Inf).flatten)
}
val defaultGenotypeFields =
......@@ -153,10 +188,9 @@ object VcfStats extends ToolCommand {
"Variant")
/** Function to check sample/genotype specific stats */
protected[tools] def checkGenotype(
record: VariantContext,
genotype: Genotype,
additionalTags: List[String]): Map[String, Map[Any, Int]] = {
protected[tools] def checkGenotype(record: VariantContext,
genotype: Genotype,
additionalTags: List[String]): Map[String, Map[Any, Int]] = {
val buffer = mutable.Map[String, Map[Any, Int]]()
def addToBuffer(key: String, value: Any, found: Boolean): Unit = {
......@@ -208,9 +242,8 @@ object VcfStats extends ToolCommand {
}
/** Function to check all general stats, all info expect sample/genotype specific stats */
protected[tools] def checkGeneral(
record: VariantContext,
additionalTags: List[String]): Map[String, Map[Any, Int]] = {
protected[tools] def checkGeneral(record: VariantContext,
additionalTags: List[String]): Map[String, Map[Any, Int]] = {
val buffer = mutable.Map[String, Map[Any, Int]]()
def addToBuffer(key: String, value: Any, found: Boolean): Unit = {
......@@ -290,8 +323,7 @@ object VcfStats extends ToolCommand {
buffer.toMap
}
protected[tools] def fillGeneral(
additionalTags: List[String]): Map[String, Map[Any, Int]] = {
protected[tools] def fillGeneral(additionalTags: List[String]): Map[String, Map[Any, Int]] = {
val buffer = mutable.Map[String, Map[Any, Int]]()
def addToBuffer(key: String, value: Any, found: Boolean): Unit = {
......@@ -344,8 +376,7 @@ object VcfStats extends ToolCommand {
buffer.toMap
}
protected[tools] def fillGenotype(
additionalTags: List[String]): Map[String, Map[Any, Int]] = {
protected[tools] def fillGenotype(additionalTags: List[String]): Map[String, Map[Any, Int]] = {
val buffer = mutable.Map[String, Map[Any, Int]]()
def addToBuffer(key: String, value: Any, found: Boolean): Unit = {
......
......@@ -89,22 +89,16 @@ class VcfStatsTest extends TestNGSuite with Matchers {
val ss2 = SampleToSampleStats()
s1 += s2
s1.genotypeStats shouldBe mutable.Map(
"1" -> mutable.Map(1 -> 1),
"2" -> mutable.Map(2 -> 2))
s1.genotypeStats shouldBe mutable.Map("1" -> mutable.Map(1 -> 1), "2" -> mutable.Map(2 -> 2))
ss1.alleleOverlap = 1
ss2.alleleOverlap = 2
s1.sampleToSample shouldBe mutable.Map("s1" -> ss1, "s2" -> ss2)
s1 += s2
s1.genotypeStats shouldBe mutable.Map(
"1" -> mutable.Map(1 -> 1),
"2" -> mutable.Map(2 -> 4))
s1.genotypeStats shouldBe mutable.Map("1" -> mutable.Map(1 -> 1), "2" -> mutable.Map(2 -> 4))
s1 += s1
s1.genotypeStats shouldBe mutable.Map(
"1" -> mutable.Map(1 -> 2),
"2" -> mutable.Map(2 -> 8))
s1.genotypeStats shouldBe mutable.Map("1" -> mutable.Map(1 -> 2), "2" -> mutable.Map(2 -> 8))
}
@Test
......@@ -130,15 +124,16 @@ class VcfStatsTest extends TestNGSuite with Matchers {
@Test
def testMergeNestedStatsMap(): Unit = {
val m1: mutable.Map[String, mutable.Map[Any, Int]] = mutable.Map("nested" -> mutable.Map("a" -> 1))
val m1: mutable.Map[String, mutable.Map[Any, Int]] =
mutable.Map("nested" -> mutable.Map("a" -> 1))
val m2: Map[String, Map[Any, Int]] = Map("nested" -> Map("b" -> 2))
Stats.mergeNestedStatsMap(m1, m2)
m1 should equal(
mutable.Map("nested" -> mutable.Map("a" -> 1, "b" -> 2)))
m1 should equal(mutable.Map("nested" -> mutable.Map("a" -> 1, "b" -> 2)))
val m3: mutable.Map[String, mutable.Map[Any, Int]] = mutable.Map("nestedd" -> mutable.Map(1 -> 500))
val m3: mutable.Map[String, mutable.Map[Any, Int]] =
mutable.Map("nestedd" -> mutable.Map(1 -> 500))
val m4: Map[String, Map[Any, Int]] = Map("nestedd" -> Map(6 -> 125))
Stats.mergeNestedStatsMap(m3, m4)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment