Commit 945050cd authored by pjvan_thof

WIP: Spark implementation

parent 03c4d7d7
......@@ -29,6 +29,11 @@
<artifactId>BiopetTools</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>nl.lumc.sasc</groupId>
<artifactId>BiopetUtils</artifactId>
......
package nl.lumc.sasc.biopet.tools
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
/**
* Created by pjvan_thof on 14-7-17.
*/
object SparkTest {

  /**
   * Runs a small map/reduce job on a local Spark context and prints the result.
   *
   * The context is always stopped, also when the job fails or the thread is interrupted.
   *
   * @param args command line arguments, not used
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Blocks for one second per element and passes the value through unchanged.
    def sleep(i: Int): Int = {
      Thread.sleep(1000)
      i
    }

    try {
      val total = sc.parallelize(1 until 1000).map(sleep).reduce(_ + _)
      println(total)
      // NOTE(review): presumably keeps the driver alive so the finished job can be inspected - confirm
      Thread.sleep(1000000)
    } finally {
      // Release the context even if the job above throws.
      sc.stop()
    }
  }
}
......@@ -191,6 +191,18 @@ case class Stats(generalStats: mutable.Map[String, mutable.Map[String, mutable.M
object Stats {
/**
 * Creates a [[Stats]] instance that already holds an entry for every sample
 * and, inside each of those, an entry for every sample-to-sample combination.
 *
 * @param samples names of the samples to initialise
 * @return the pre-populated stats object
 */
def emptyStats(samples: List[String]): Stats = {
  val result = new Stats
  samples.foreach { rowSample =>
    // Register the sample first, then fill its comparison entries
    val sampleStats = new SampleStats
    result.samplesStats += rowSample -> sampleStats
    samples.foreach { columnSample =>
      sampleStats.sampleToSample += columnSample -> new SampleToSampleStats
    }
  }
  result
}
/** Merge m2 into m1 */
def mergeStatsMap(m1: mutable.Map[Any, Int], m2: mutable.Map[Any, Int]): Unit = {
for (key <- m2.keySet)
......
......@@ -31,6 +31,9 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
/**
* This tool will generate statistics from a vcf file
*
......@@ -50,7 +53,9 @@ object VcfStats extends ToolCommand {
binSize: Int = 10000000,
writeBinStats: Boolean = false,
generalWiggle: List[String] = Nil,
genotypeWiggle: List[String] = Nil)
genotypeWiggle: List[String] = Nil,
localThreads: Int = 1,
sparkMaster: Option[String] = None)
extends AbstractArgs
private val generalWiggleOptions = List(
......@@ -144,6 +149,13 @@ object VcfStats extends ToolCommand {
if (genotypeWiggleOptions.contains(x)) success else failure(s"""Non-existent field $x""")
} text s"""Create a wiggle track with bin size <binSize> for any of the following genotype fields:
|${genotypeWiggleOptions.mkString(", ")}""".stripMargin
// Thread count for the local Spark master; rejected at parse time when not positive,
// because the value is interpolated into the "local[N]" master string
opt[Int]('t', "localThreads") unbounded () action { (x, c) =>
  c.copy(localThreads = x)
} validate { x =>
  if (x > 0) success else failure("localThreads must be a positive number")
} text "Number of local threads to use"
// Explicit Spark master; when given it replaces the local master
opt[String]("sparkMaster") unbounded () action { (x, c) =>
  c.copy(sparkMaster = Some(x))
} text "Spark master to use"
}
protected var cmdArgs: Args = _
......@@ -174,6 +186,15 @@ object VcfStats extends ToolCommand {
val argsParser = new OptParser
cmdArgs = argsParser.parse(args, Args()) getOrElse (throw new IllegalArgumentException)
// Set up the Spark context: the app name is this object's simple class name, and the
// master is the sparkMaster argument when given, otherwise local[<localThreads>]
logger.info("Init spark context")
val conf = new SparkConf()
.setAppName(this.getClass.getSimpleName)
.setMaster(cmdArgs.sparkMaster.getOrElse(s"local[${cmdArgs.localThreads}]"))
val sparkContext = new SparkContext(conf)
logger.info("Spark context is up")
val reader = new VCFFileReader(cmdArgs.inputFile, true)
val header = reader.getFileHeader
val samples = header.getSampleNamesInOrder.toList
......@@ -218,18 +239,6 @@ object VcfStats extends ToolCommand {
// Reading vcf records
logger.info("Start reading vcf records")
/** Builds a [[Stats]] holding an entry for every sample and every sample-to-sample pair. */
def createStats: Stats = {
  val initial = new Stats
  samples.foreach { first =>
    initial.samplesStats += first -> new SampleStats
    // Fill the comparison entries of the sample that was just registered
    samples.foreach { second =>
      initial.samplesStats(first).sampleToSample += second -> new SampleToSampleStats
    }
  }
  initial
}
var variantCounter = 0L
var baseCounter = 0L
......@@ -250,7 +259,7 @@ object VcfStats extends ToolCommand {
val binStats = for (interval <- intervals.par) yield {
val reader = new VCFFileReader(cmdArgs.inputFile, true)
var chunkCounter = 0
val stats = createStats
val stats = Stats.emptyStats(samples)
logger.info("Starting on: " + interval)
val samInterval = interval.toSamInterval
......@@ -301,11 +310,11 @@ object VcfStats extends ToolCommand {
status(chunkCounter, samInterval)
stats
}
binStats.toList.fold(createStats)(_ += _)
binStats.toList.fold(Stats.emptyStats(samples))(_ += _)
}
chunkStats.toList.fold(createStats)(_ += _)
chunkStats.toList.fold(Stats.emptyStats(samples))(_ += _)
}
val stats = statsFutures.foldLeft(createStats) {
val stats = statsFutures.foldLeft(Stats.emptyStats(samples)) {
case (a, b) => a += Await.result(b, Duration.Inf)
}
......@@ -344,6 +353,7 @@ object VcfStats extends ToolCommand {
cmdArgs.outputDir + "/sample_compare/allele_overlap",
samples)
sparkContext.stop()
logger.info("Done")
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment