Commit fd22b885 authored by Peter van 't Hof's avatar Peter van 't Hof

Added thread scattering

parent b178746f
......@@ -11,6 +11,7 @@ import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.sys.process.{ Process, ProcessLogger }
import scala.util.matching.Regex
import htsjdk.samtools.util.Interval
/**
* Created by pjvan_thof on 1/10/15.
......@@ -20,7 +21,7 @@ class VcfStats {
}
object VcfStats extends ToolCommand {
case class Args(inputFile: File = null, outputDir: String = null) extends AbstractArgs
case class Args(inputFile: File = null, outputDir: String = null, intervals: Option[File] = None) extends AbstractArgs
class OptParser extends AbstractOptParser {
opt[File]('I', "inputFile") required () unbounded () valueName ("<file>") action { (x, c) =>
......@@ -29,6 +30,11 @@ object VcfStats extends ToolCommand {
opt[String]('o', "outputDir") required () unbounded () valueName ("<file>") action { (x, c) =>
c.copy(outputDir = x)
}
/*
opt[File]('i', "intervals") unbounded () valueName ("<file>") action { (x, c) =>
c.copy(intervals = Some(x))
}
*/
}
class SampleToSampleStats {
......@@ -62,7 +68,7 @@ object VcfStats extends ToolCommand {
val generalStats: mutable.Map[String, mutable.Map[Any, Int]] = mutable.Map()
val samplesStats: mutable.Map[String, SampleStats] = mutable.Map()
def +=(other: Stats): Unit = {
def +=(other: Stats): Stats = {
for ((key, value) <- other.samplesStats) {
if (this.samplesStats.contains(key)) this.samplesStats(key) += value
else this.samplesStats(key) = value
......@@ -72,6 +78,7 @@ object VcfStats extends ToolCommand {
if (thisField.isDefined) mergeStatsMap(thisField.get, fieldMap)
else this.generalStats += field -> fieldMap
}
this
}
}
......@@ -101,40 +108,77 @@ object VcfStats extends ToolCommand {
val argsParser = new OptParser
commandArgs = argsParser.parse(args, Args()) getOrElse sys.exit(1)
val reader = new VCFFileReader(commandArgs.inputFile, false)
val reader = new VCFFileReader(commandArgs.inputFile, true)
val header = reader.getFileHeader
val samples = header.getSampleNamesInOrder.toList
val intervals: List[Interval] = (
for (seq <- header.getSequenceDictionary.getSequences;
chunks = seq.getSequenceLength / 10000000;
i <- 1 until chunks) yield {
val size = seq.getSequenceLength / chunks
val begin = size * (i-1) + 1
val end = if (i >= chunks) seq.getSequenceLength else size * i
new Interval(seq.getSequenceName, begin, end)
}
).toList
val totalBases = intervals.foldRight(0L)(_.length() + _)
// Reading vcf records
logger.info("Start reading vcf records")
var counter = 0
val stats = new Stats
//init stats
for (sample1 <- samples) {
stats.samplesStats += sample1 -> new SampleStats
for (sample2 <- samples) {
stats.samplesStats(sample1).sampleToSample += sample2 -> new SampleToSampleStats
}
}
for (record <- reader) yield {
mergeNestedStatsMap(stats.generalStats, checkGeneral(record))
for (sample1 <- samples) yield {
val genotype = record.getGenotype(sample1)
mergeNestedStatsMap(stats.samplesStats(sample1).genotypeStats, checkGenotype(genotype))
def createStats: Stats = {
val stats = new Stats
//init stats
for (sample1 <- samples) {
stats.samplesStats += sample1 -> new SampleStats
for (sample2 <- samples) {
val genotype2 = record.getGenotype(sample2)
if (genotype.getAlleles == genotype2.getAlleles)
stats.samplesStats(sample1).sampleToSample(sample2).genotypeOverlap += 1
stats.samplesStats(sample1).sampleToSample(sample2).alleleOverlap += genotype.getAlleles.count(allele => genotype2.getAlleles.exists(_.basesMatch(allele)))
stats.samplesStats(sample1).sampleToSample += sample2 -> new SampleToSampleStats
}
}
stats
}
var variantCounter = 0L
var baseCounter = 0L
def status(count: Int, interval:Interval): Unit = {
variantCounter += count
baseCounter += interval.length()
val fraction = baseCounter.toFloat / totalBases * 100
logger.info(interval + " done, " + count + " rows processed")
logger.info("total: " + variantCounter + " rows processed, " + fraction + "% done")
}
counter += 1
if (counter % 100000 == 0) logger.info(counter + " variants done")
val statsChunks = for (interval <- intervals.par) yield {
val reader = new VCFFileReader(commandArgs.inputFile, true)
var chunkCounter = 0
val stats = createStats
logger.info("Starting on: " + interval)
for (record <- reader.query(interval.getSequence, interval.getStart, interval.getEnd)
if record.getStart <= interval.getEnd) {
mergeNestedStatsMap(stats.generalStats, checkGeneral(record))
for (sample1 <- samples) yield {
val genotype = record.getGenotype(sample1)
mergeNestedStatsMap(stats.samplesStats(sample1).genotypeStats, checkGenotype(genotype))
for (sample2 <- samples) {
val genotype2 = record.getGenotype(sample2)
if (genotype.getAlleles == genotype2.getAlleles)
stats.samplesStats(sample1).sampleToSample(sample2).genotypeOverlap += 1
stats.samplesStats(sample1).sampleToSample(sample2).alleleOverlap += genotype.getAlleles.count(allele => genotype2.getAlleles.exists(_.basesMatch(allele)))
}
}
chunkCounter += 1
}
status(chunkCounter, interval)
stats
}
logger.info(counter + " variants done")
val stats = statsChunks.toList.fold(createStats)(_ += _)
//logger.info(counter + " variants done")
logger.info("Done reading vcf records")
plotXy(writeField("QUAL", stats.generalStats.getOrElse("QUAL", mutable.Map())))
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment