Commit 4d76382d authored by pjvan_thof's avatar pjvan_thof
Browse files

Only open file once per method call

parent ba3725c3
......@@ -7,7 +7,7 @@ import htsjdk.variant.vcf.VCFFileReader
import nl.lumc.sasc.biopet.tools.vcfstats.VcfStats._
import nl.lumc.sasc.biopet.utils.intervals.{BedRecord, BedRecordList}
import nl.lumc.sasc.biopet.utils.{ConfigUtils, ToolCommand, VcfUtils}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.JavaConversions._
......@@ -73,22 +73,13 @@ object VcfStatsSpark extends ToolCommand {
case _ => BedRecordList.fromReference(cmdArgs.referenceFile)
}).combineOverlap
.scatter(cmdArgs.binSize)
.flatten
val contigs = regions.map(_.chr).distinct
val contigs = regions.flatMap(_.map(_.chr)).distinct
val regionStats = sc.parallelize(regions, regions.size).map { record =>
record.chr -> (readBin(record, samples, cmdArgs, adInfoTags, adGenotypeTags), record)
}
val chrStats = regionStats.combineByKey(
createCombiner = (x: (Stats, BedRecord)) => x._1,
mergeValue = (x: Stats, b: (Stats, BedRecord)) => x += b._1,
mergeCombiners = (x: Stats, y: Stats) => x += y,
partitioner = new HashPartitioner(contigs.size),
mapSideCombine = true
)
val regionStats = sc
.parallelize(regions, regions.size)
.map(readBin(_, samples, cmdArgs, adInfoTags, adGenotypeTags))
val totalStats = chrStats.aggregate(Stats.emptyStats(samples))(_ += _._2, _ += _)
val totalStats = regionStats.reduce(_ += _)
val allWriter = new PrintWriter(new File(cmdArgs.outputDir, "stats.json"))
val json = ConfigUtils.mapToJson(
......@@ -111,7 +102,7 @@ object VcfStatsSpark extends ToolCommand {
logger.info("Done")
}
def readBin(bedRecord: BedRecord,
def readBin(bedRecords: List[BedRecord],
samples: List[String],
cmdArgs: Args,
adInfoTags: List[String],
......@@ -119,36 +110,39 @@ object VcfStatsSpark extends ToolCommand {
val reader = new VCFFileReader(cmdArgs.inputFile, true)
var chunkCounter = 0
val stats = Stats.emptyStats(samples)
logger.info("Starting on: " + bedRecord)
val samInterval = bedRecord.toSamInterval
for (bedRecord <- bedRecords) {
logger.info("Starting on: " + bedRecord)
val samInterval = bedRecord.toSamInterval
val query =
reader.query(samInterval.getContig, samInterval.getStart, samInterval.getEnd)
if (!query.hasNext) {
Stats.mergeNestedStatsMap(stats.generalStats, fillGeneral(adInfoTags))
for (sample <- samples) yield {
Stats.mergeNestedStatsMap(stats.samplesStats(sample).genotypeStats,
fillGenotype(adGenotypeTags))
val query =
reader.query(samInterval.getContig, samInterval.getStart, samInterval.getEnd)
if (!query.hasNext) {
Stats.mergeNestedStatsMap(stats.generalStats, fillGeneral(adInfoTags))
for (sample <- samples) yield {
Stats.mergeNestedStatsMap(stats.samplesStats(sample).genotypeStats,
fillGenotype(adGenotypeTags))
}
chunkCounter += 1
}
chunkCounter += 1
}
for (record <- query if record.getStart <= samInterval.getEnd) {
Stats.mergeNestedStatsMap(stats.generalStats, checkGeneral(record, adInfoTags))
for (sample1 <- samples) yield {
val genotype = record.getGenotype(sample1)
Stats.mergeNestedStatsMap(stats.samplesStats(sample1).genotypeStats,
checkGenotype(record, genotype, adGenotypeTags))
for (sample2 <- samples) {
val genotype2 = record.getGenotype(sample2)
if (genotype.getAlleles == genotype2.getAlleles)
stats.samplesStats(sample1).sampleToSample(sample2).genotypeOverlap += 1
stats.samplesStats(sample1).sampleToSample(sample2).alleleOverlap += VcfUtils
.alleleOverlap(genotype.getAlleles.toList, genotype2.getAlleles.toList)
for (record <- query if record.getStart <= samInterval.getEnd) {
Stats.mergeNestedStatsMap(stats.generalStats, checkGeneral(record, adInfoTags))
for (sample1 <- samples) yield {
val genotype = record.getGenotype(sample1)
Stats.mergeNestedStatsMap(stats.samplesStats(sample1).genotypeStats,
checkGenotype(record, genotype, adGenotypeTags))
for (sample2 <- samples) {
val genotype2 = record.getGenotype(sample2)
if (genotype.getAlleles == genotype2.getAlleles)
stats.samplesStats(sample1).sampleToSample(sample2).genotypeOverlap += 1
stats.samplesStats(sample1).sampleToSample(sample2).alleleOverlap += VcfUtils
.alleleOverlap(genotype.getAlleles.toList, genotype2.getAlleles.toList)
}
}
chunkCounter += 1
}
chunkCounter += 1
}
reader.close()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment