Commit 08448483 authored by pjvan_thof's avatar pjvan_thof

WIP

parent 88d51c42
......@@ -163,6 +163,12 @@
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
<filter>
<artifact>javax.servlet:servlet-api:2.5</artifact>
<excludes>
<exclude>*.*</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
......
......@@ -7,7 +7,7 @@ import htsjdk.variant.vcf.VCFFileReader
import nl.lumc.sasc.biopet.tools.vcfstats.VcfStats._
import nl.lumc.sasc.biopet.utils.{ConfigUtils, FastaUtils, ToolCommand, VcfUtils}
import nl.lumc.sasc.biopet.utils.intervals.{BedRecord, BedRecordList}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import scala.collection.JavaConversions._
import scala.concurrent.{Await, Future}
......@@ -25,7 +25,7 @@ object VcfStatsSpark extends ToolCommand {
class OptParser extends AbstractOptParser {
opt[File]('I', "inputFile") required () unbounded () maxOccurs 1 valueName "<file>" action {
(x, c) =>
c.copy(inputFile = x)
c.copy(inputFile = x.getAbsoluteFile)
} validate { x =>
if (x.exists) success else failure("Input VCF required")
} text "Input VCF file (required)"
......@@ -122,10 +122,17 @@ object VcfStatsSpark extends ToolCommand {
logger.info("Init spark context")
val jars = ClassLoader.getSystemClassLoader
.asInstanceOf[URLClassLoader]
.getURLs
.map(_.getFile) ++ List(
"/home/pjvan_thof/src/biopet/biopet-utils/target/BiopetUtils-0.10.0-SNAPSHOT.jar",
"/home/pjvan_thof/src/biopet/biopet-tools/target/BiopetTools-0.10.0-SNAPSHOT.jar"
)
val conf = new SparkConf()
.setAppName(this.getClass.getSimpleName)
.setMaster(cmdArgs.sparkMaster.getOrElse(s"local[${cmdArgs.localThreads}]"))
.setJars(ClassLoader.getSystemClassLoader.asInstanceOf[URLClassLoader].getURLs.map(_.getFile))
.setJars(jars)
val sc = new SparkContext(conf)
logger.info("Spark context is up")
......@@ -136,41 +143,29 @@ object VcfStatsSpark extends ToolCommand {
}).combineOverlap
.scatter(cmdArgs.binSize)
.flatten
val contigs = regions.map(_.chr).distinct
val regionStats = sc.parallelize(regions, regions.size).groupBy(_.chr).map {
case (contig, records) =>
contig -> records.map(readBin(_, samples, cmdArgs, adInfoTags, adGenotypeTags))
val regionStats = sc.parallelize(regions, regions.size).map { record =>
record.chr -> (readBin(record, samples, cmdArgs, adInfoTags, adGenotypeTags), record)
}
val chrStats =
regionStats.map { case (contig, stats) => contig -> stats.reduce(_ += _) }.cache()
val contigOverlap = chrStats.map {
case (contig, stats) =>
writeOverlap(stats,
_.genotypeOverlap,
cmdArgs.outputDir + s"/sample_compare/contigs/$contig/genotype_overlap",
samples,
cmdArgs.contigSampleOverlapPlots)
writeOverlap(stats,
_.alleleOverlap,
cmdArgs.outputDir + s"/sample_compare/contigs/$contig/allele_overlap",
samples,
cmdArgs.contigSampleOverlapPlots)
}
val f1 = (s:Stats, b:BedRecord) => s
val f3 = (s1:Stats, s: Stats) => s1 += s
val chrStats = regionStats.combineByKey(
createCombiner = (x: (Stats, BedRecord)) => x._1,
mergeValue = (x: Stats,b: (Stats, BedRecord)) => x += b._1,
mergeCombiners = (x: Stats,y: Stats) => x += y,
partitioner = new HashPartitioner(contigs.size),
mapSideCombine = true)
//val chrStats = regionStats.aggregateByKey(Stats.emptyStats(samples)) (_ += _._1, _ += _)
val totalStats = chrStats.values.reduce(_ += _) // Blocking
val totalStats = chrStats.aggregate(Stats.emptyStats(samples)) (_ += _._2, _ += _)
//Await.ready(contigOverlap, Duration.Inf)
val allWriter = new PrintWriter(new File(cmdArgs.outputDir, "stats.json"))
val json = ConfigUtils.mapToJson(
totalStats.getAllStats(
FastaUtils.getCachedDict(cmdArgs.referenceFile).getSequences.map(_.getSequenceName).toList,
samples,
adGenotypeTags,
adInfoTags,
sampleDistributions))
totalStats.getAllStats(contigs, samples, adGenotypeTags, adInfoTags, sampleDistributions))
allWriter.println(json.nospaces)
allWriter.close()
......@@ -185,6 +180,8 @@ object VcfStatsSpark extends ToolCommand {
cmdArgs.outputDir + "/sample_compare/allele_overlap",
samples)
Thread.sleep(1000000)
sc.stop
logger.info("Done")
}
......
......@@ -34,6 +34,17 @@
<groupId>com.typesafe.play</groupId>
<artifactId>play-ws_2.11</artifactId>
<version>2.5.15</version>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment