Commit 4a192e0a authored by rhpvorderman

Merge remote-tracking branch 'origin/develop' into fix-BIOPET-724

parents d9383457 ab4b7dc7
@@ -139,9 +139,6 @@ trait ReportBuilder extends ToolCommand {
/** default args that are passed to all pages within the report */
def pageArgs: Map[String, Any] = Map()
private var done = 0
private var total = 0
private var _sampleId: Option[Int] = None
protected[report] def sampleId: Option[Int] = _sampleId
private var _libId: Option[Int] = None
@@ -220,9 +217,6 @@ trait ReportBuilder extends ToolCommand {
x.copy(subPages = x.subPages ::: generalPages(sampleId, libId))
}
// total = ReportBuilder.countPages(rootPage)
done = 0
logger.info("Generate pages")
val jobsFutures = generatePage(
summary,
@@ -235,30 +229,10 @@ trait ReportBuilder extends ToolCommand {
"runId" -> cmdArgs.runId)
)
total = jobsFutures.size
logger.info(total + " pages to be generated")
def wait(futures: List[Future[Any]]): Unit = {
try {
Await.result(Future.sequence(futures), Duration.fromNanos(30000000000L))
} catch {
case _: TimeoutException =>
}
val notDone = futures.filter(!_.isCompleted)
done += futures.size - notDone.size
if (notDone.nonEmpty) {
logger.info(s"$done / $total pages are generated")
wait(notDone)
}
}
//jobsFutures.foreach(f => f.onFailure{ case e => throw new RuntimeException(e) })
wait(jobsFutures)
Await.result(Future.sequence(jobsFutures), Duration.Inf)
Await.result(baseFilesFuture, Duration.Inf)
Await.result(jobsFutures, Duration.Inf)
logger.info(s"Done, $done pages generated")
logger.info(s"Done")
}
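The hunk above drops the hand-rolled progress polling (the `done`/`total` counters and the recursive `wait` with a 30-second timeout) in favour of awaiting the combined future once. A minimal standalone sketch of the new shape, with hypothetical page names standing in for the real `ReportPage` futures:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Hypothetical stand-ins for the page-rendering futures.
val jobsFutures: Future[List[String]] =
  Future.sequence(List(Future("index"), Future("samples"), Future("files")))

// A single blocking await replaces the poll-and-log loop; a failure in
// any page future now propagates out of the await instead of being swallowed.
val pages: List[String] = Await.result(jobsFutures, Duration.Inf)
```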
/** This must be implemented, this will be the root page of the report */
@@ -281,7 +255,7 @@ trait ReportBuilder extends ToolCommand {
pageFuture: Future[ReportPage],
outputDir: File,
path: List[String] = Nil,
args: Map[String, Any] = Map()): List[Future[ReportPage]] = {
args: Map[String, Any] = Map()): Future[List[ReportPage]] = {
val pageOutputDir = new File(outputDir, path.mkString(File.separator))
def pageArgs(page: ReportPage) = {
@@ -325,7 +299,10 @@ trait ReportBuilder extends ToolCommand {
page
}
renderFuture :: Await.result(subPageJobs, Duration.Inf)
for {
f1 <- subPageJobs
f2 <- renderFuture
} yield f2 :: f1
}
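`generatePage` now returns a single `Future[List[ReportPage]]` instead of a `List[Future[ReportPage]]`, and the sub-page results are combined with a for-comprehension rather than a blocking `Await` inside the recursion. The composition pattern in isolation (hypothetical values):

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val subPageJobs: Future[List[String]] = Future(List("sub1", "sub2")) // hypothetical
val renderFuture: Future[String] = Future("page") // hypothetical

// Both futures are already running; the comprehension only sequences the
// extraction of their results into one combined future.
val combined: Future[List[String]] =
  for {
    subs <- subPageJobs
    page <- renderFuture
  } yield page :: subs
```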
def pipelineName: String
@@ -336,15 +313,15 @@ trait ReportBuilder extends ToolCommand {
.getFiles(runId, sample = sampleId.map(SampleId), library = libraryId.map(LibraryId))
.map(_.groupBy(_.pipelineId))
val modulePages = dbFiles.map(_.map {
case (pipelineId, files) =>
val moduleSections = files.groupBy(_.moduleId).map {
case (moduleId, files) =>
case (pipelineId, pFiles) =>
val moduleSections = pFiles.groupBy(_.moduleId).map {
case (moduleId, mFiles) =>
val moduleName: Future[String] = moduleId match {
case Some(id) => summary.getModuleName(pipelineId, id).map(_.getOrElse("Pipeline"))
case _ => Future.successful("Pipeline")
}
moduleName.map(_ -> ReportSection("/nl/lumc/sasc/biopet/core/report/files.ssp",
Map("files" -> files)))
Map("files" -> mFiles)))
}
val moduleSectionsSorted = moduleSections.find(_._1 == "Pipeline") ++ moduleSections
.filter(_._1 != "Pipeline")
......
@@ -61,7 +61,8 @@ object VcfFilter extends ToolCommand {
diffGenotype: List[(String, String)] = Nil,
filterHetVarToHomVar: List[(String, String)] = Nil,
iDset: Set[String] = Set(),
minGenomeQuality: Int = 0)
minGenomeQuality: Int = 0,
advancedGroups: List[List[String]] = Nil)
class OptParser extends AbstractOptParser[Args](commandName) {
opt[File]('I', "inputVcf") required () maxOccurs 1 valueName "<file>" action { (x, c) =>
@@ -153,6 +154,9 @@ object VcfFilter extends ToolCommand {
opt[Int]("minGenomeQuality") unbounded () action { (x, c) =>
c.copy(minGenomeQuality = x)
}
opt[String]("advancedGroups") unbounded () action { (x, c) =>
c.copy(advancedGroups = x.split(",").toList :: c.advancedGroups)
} text "All members of groups sprated with a ','"
}
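Because `advancedGroups` is declared `unbounded` and each occurrence prepends one group to the accumulator, repeating the flag defines multiple groups: a hypothetical invocation with `--advancedGroups Sample_101,Sample_102 --advancedGroups Sample_103,Sample_104` (illustrative sample names) yields two groups of two samples each.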
/** @param args the command line arguments */
@@ -205,6 +209,7 @@ object VcfFilter extends ToolCommand {
denovoTrio(record, cmdArgs.trioLossOfHet, onlyLossHet = true) &&
resToDom(record, cmdArgs.resToDom) &&
trioCompound(record, cmdArgs.trioCompound) &&
advancedGroupFilter(record, cmdArgs.advancedGroups) &&
(cmdArgs.iDset.isEmpty || inIdSet(record, cmdArgs.iDset))) {
writer.add(record)
counterLeft += 1
@@ -444,4 +449,24 @@ object VcfFilter extends ToolCommand {
def inIdSet(record: VariantContext, idSet: Set[String]): Boolean = {
record.getID.split(",").exists(idSet.contains)
}
/**
* Returns true when, for each group, either all or none of the members carry a variant;
* records containing a partially affected group are discarded.
*/
def advancedGroupFilter(record: VariantContext, groups: List[List[String]]): Boolean = {
val samples = record.getGenotypes
.map(a => a.getSampleName -> (a.isHomRef || a.isNoCall || VcfUtils.isCompoundNoCall(a)))
.toMap
val g: List[Option[Boolean]] = groups.map { group =>
val total = group.size
val count = group.count(samples(_))
if (count == 0) Some(false)
else if (total == count) Some(true)
else None
}
!g.contains(None)
}
}
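Note that the map built in `advancedGroupFilter` flags non-carriers (`isHomRef`, no-call, or compound no-call), and a group maps to `None` exactly when it is partially affected. Stripped of the htsjdk types, the decision reduces to the following sketch (hypothetical sample names, not the Biopet API):

```scala
// true  -> every group is either fully affected or fully unaffected
// false -> at least one group mixes carriers and non-carriers
def groupsConsistent(carriesVariant: Map[String, Boolean],
                     groups: List[List[String]]): Boolean =
  groups.forall { group =>
    val carriers = group.count(carriesVariant)
    carriers == 0 || carriers == group.size
  }

groupsConsistent(
  Map("Sample_101" -> true, "Sample_102" -> true, "Sample_103" -> false),
  List(List("Sample_101", "Sample_102"))) // true: both members carry the variant
```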
@@ -64,10 +64,11 @@ object VcfStats extends ToolCommand {
.asInstanceOf[URLClassLoader]
.getURLs
.map(_.getFile)
val conf = new SparkConf()
.setAppName(commandName)
.setMaster(cmdArgs.sparkMaster.getOrElse(s"local[${cmdArgs.localThreads}]"))
.setJars(jars)
val conf = cmdArgs.sparkExecutorMemory.toList.foldLeft(
new SparkConf()
.setAppName(commandName)
.setMaster(cmdArgs.sparkMaster.getOrElse(s"local[${cmdArgs.localThreads}]"))
.setJars(jars))(_.set("spark.executor.memory", _))
val sc = new SparkContext(conf)
logger.info("Spark context is up")
......
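Folding over the optional value means `spark.executor.memory` is set only when the user supplied it; with `None` the fold returns the base `SparkConf` unchanged. The same pattern in miniature, without the Spark dependency (hypothetical names):

```scala
// foldLeft over an Option's single-element (or empty) list applies the
// setter zero or one times.
val memory: Option[String] = Some("4g") // e.g. parsed from the command line
val base = Map("spark.app.name" -> "VcfStats")
val conf = memory.toList.foldLeft(base)((c, m) => c + ("spark.executor.memory" -> m))
// conf == Map("spark.app.name" -> "VcfStats", "spark.executor.memory" -> "4g")
```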
@@ -23,6 +23,7 @@ case class VcfStatsArgs(inputFile: File = null,
writeBinStats: Boolean = false,
localThreads: Int = 1,
sparkMaster: Option[String] = None,
sparkExecutorMemory: Option[String] = None,
contigSampleOverlapPlots: Boolean = false)
/**
@@ -79,4 +80,7 @@ class VcfStatsOptParser(cmdName: String) extends AbstractOptParser[VcfStatsArgs]
opt[String]("sparkMaster") unbounded () action { (x, c) =>
c.copy(sparkMaster = Some(x))
} text s"Spark master to use"
opt[String]("sparkExecutorMemory") unbounded () action { (x, c) =>
c.copy(sparkExecutorMemory = Some(x))
} text s"Spark executor memory to use"
}
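A run could then request executor memory with, for example, `--sparkExecutorMemory 4g` (hypothetical invocation); the value is handed verbatim to `spark.executor.memory`, so Spark's usual size notation applies.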
@@ -261,4 +261,17 @@ class VcfFilterTest extends TestNGSuite with MockitoSugar with Matchers {
inIdSet(record, Set("dummy")) shouldBe false
}
@Test def testAdvancedGroup(): Unit = {
val reader = new VCFFileReader(vepped, false)
val record = reader.iterator().next()
advancedGroupFilter(record, Nil) shouldBe true
advancedGroupFilter(record, List(List("Sample_101", "Sample_102"))) shouldBe true
advancedGroupFilter(record, List(List("Sample_102", "Sample_103"))) shouldBe false
advancedGroupFilter(
record,
List(List("Sample_102", "Sample_103"), List("Sample_101", "Sample_102"))) shouldBe false
advancedGroupFilter(record, List(List("Sample_102"), List("Sample_101", "Sample_102"))) shouldBe true
}
}
@@ -21,6 +21,8 @@ class GenotypeGvcfs(val parent: Configurable) extends QScript with BiopetQScript
@Input(required = true, shortName = "V")
var inputGvcfs: List[File] = Nil
val writeFinalGvcfFile: Boolean = config("writeFinalGvcfFile", default = true)
var namePrefix: String = config("name_prefix", default = "multisample")
val maxNumberOfFiles: Int = config("max_number_of_files", default = 10)
@@ -29,31 +31,36 @@ class GenotypeGvcfs(val parent: Configurable) extends QScript with BiopetQScript
def finalVcfFile = new File(outputDir, s"$namePrefix.vcf.gz")
/** Init for pipeline */
def init(): Unit = {}
def init(): Unit = {
inputGvcfs.foreach(inputFiles :+= InputFile(_))
}
/** Pipeline itself */
def biopetScript(): Unit = {
inputGvcfs.foreach(inputFiles :+= InputFile(_))
if (inputGvcfs.size > 1) {
val genotype = new gatk.GenotypeGVCFs(this)
genotype.variant = if (inputGvcfs.size > 1) {
val combineJob = new CombineJob(finalGvcfFile, outputDir, inputGvcfs)
combineJob.allJobs.foreach(add(_))
} else
combineJob.allJobs.filter(job => job.group.nonEmpty || !job.isIntermediate).foreach(add(_))
combineJob.job.variant
} else {
inputGvcfs.headOption.foreach { file =>
add(Ln(this, file, finalGvcfFile))
add(Ln(this, file + ".tbi", finalGvcfFile + ".tbi"))
}
Seq(finalGvcfFile)
}
val genotype = new gatk.GenotypeGVCFs(this)
genotype.variant = List(finalGvcfFile)
genotype.out = finalVcfFile
add(genotype)
}
class CombineJob(outputFile: File, outputDir: File, allInput: List[File], group: List[Int] = Nil) {
private class CombineJob(val outputFile: File,
val outputDir: File,
val allInput: List[File],
val group: List[Int] = Nil) {
val job: gatk.CombineGVCFs = new gatk.CombineGVCFs(qscript)
job.out = outputFile
job.isIntermediate = group.nonEmpty
job.isIntermediate = group.nonEmpty || !writeFinalGvcfFile
val subJobs: ListBuffer[CombineJob] = ListBuffer()
val groupedInput: List[List[File]] = makeEqualGroups(allInput)
if (groupedInput.size == 1) job.variant = groupedInput.head
......
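With `isIntermediate = group.nonEmpty || !writeFinalGvcfFile`, only the top-level combine result can be kept, and only when `writeFinalGvcfFile` is set; the partial combines in the tree stay intermediate. The tree itself comes from splitting the inputs into chunks of at most `maxNumberOfFiles`. A rough sketch of such a grouping step (assumed behaviour, not the actual `makeEqualGroups` implementation):

```scala
// Split `items` into the fewest roughly equal groups such that no
// group exceeds `maxFiles` entries (assumption about the semantics).
def makeEqualGroups[T](items: List[T], maxFiles: Int): List[List[T]] =
  if (items.isEmpty) Nil
  else {
    val numGroups = math.ceil(items.size.toDouble / maxFiles).toInt
    val groupSize = math.ceil(items.size.toDouble / numGroups).toInt
    items.grouped(groupSize).toList
  }

makeEqualGroups((1 to 25).toList, 10) // groups of sizes 9, 9 and 7
```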
@@ -42,10 +42,5 @@
<artifactId>BiopetExtensions</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>nl.lumc.sasc</groupId>
<artifactId>BiopetToolsExtensions</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>