Commit b4d19655 authored by Peter van 't Hof's avatar Peter van 't Hof

Adding a filter step for empty regions

parent 578e9e69
......@@ -80,9 +80,14 @@ object VcfStats extends ToolCommand {
case _ => BedRecordList.fromReference(cmdArgs.referenceFile)
}).combineOverlap
.scatter(cmdArgs.binSize, maxContigsInSingleJob = Some(cmdArgs.maxContigsInSingleJob))
val contigs = regions.flatMap(_.map(_.chr))
val nonEmptyRegions = BedRecordList
.fromList(
sc.parallelize(regions, regions.size).flatMap(filterEmptyBins(_, cmdArgs)).collect())
.scatter(cmdArgs.binSize, maxContigsInSingleJob = Some(cmdArgs.maxContigsInSingleJob))
val contigs = nonEmptyRegions.flatMap(_.map(_.chr))
val bigContigs = regions.filter(_.size == 1).flatten.groupBy(_.chr).map {
val bigContigs = nonEmptyRegions.filter(_.size == 1).flatten.groupBy(_.chr).map {
case (_, r) =>
val rdds = r
.grouped(10)
......@@ -95,23 +100,26 @@ object VcfStats extends ToolCommand {
.toList
val steps = Math.log10(rdds.size).ceil.toInt
sc.union((1 until steps)
.foldLeft(rdds) {
case (a, _) =>
if (a.size > 10)
a.grouped(10)
.map { g =>
sc.union(g)
.reduceByKey(_ += _)
.repartition(1)
}
.toList
else a
})
.reduceByKey(_ += _)
.repartition(1)
val lastRdds = (1 until steps)
.foldLeft(rdds) {
case (a, _) =>
if (a.size > 10)
a.grouped(10)
.map { g =>
sc.union(g)
.reduceByKey(_ += _)
.repartition(1)
}
.toList
else a
}
if (lastRdds.size > 1) {
sc.union(lastRdds)
.reduceByKey(_ += _)
.repartition(1)
} else lastRdds.head
}
val smallContigs = regions.filter(_.size > 1).map { r =>
val smallContigs = nonEmptyRegions.filter(_.size > 1).map { r =>
sc.parallelize(r.map(List(_)), r.size)
.flatMap(readBins(_, samples, cmdArgs, adInfoTags, adGenotypeTags))
.reduceByKey(_ += _)
......@@ -224,6 +232,21 @@ object VcfStats extends ToolCommand {
stats.flatten
}
def filterEmptyBins(bedRecords: List[BedRecord], cmdArgs: Args): List[BedRecord] = {
val reader = new VCFFileReader(cmdArgs.inputFile, true)
val stats = for (bedRecord <- bedRecords) yield {
val samInterval = bedRecord.toSamInterval
val query = reader.query(samInterval.getContig, samInterval.getStart, samInterval.getEnd)
if (query.nonEmpty) Some(bedRecord)
else None
}
reader.close()
stats.flatten
}
val defaultGenotypeFields =
List("DP", "GQ", "AD", "AD-ref", "AD-alt", "AD-used", "AD-not_used", "general")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment