diff --git a/biopet-tools/src/main/scala/nl/lumc/sasc/biopet/tools/bamstats/BamStats.scala b/biopet-tools/src/main/scala/nl/lumc/sasc/biopet/tools/bamstats/BamStats.scala index 385a6b5ce4822be725fed45b20a4cfefdfab10d7..eb21f9f4b02741281c10ec10647ef2d61007c84d 100644 --- a/biopet-tools/src/main/scala/nl/lumc/sasc/biopet/tools/bamstats/BamStats.scala +++ b/biopet-tools/src/main/scala/nl/lumc/sasc/biopet/tools/bamstats/BamStats.scala @@ -13,6 +13,8 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future /** + * This tool will collect stats from a bamfile + * * Created by pjvanthof on 25/05/16. */ object BamStats extends ToolCommand { @@ -40,6 +42,7 @@ object BamStats extends ToolCommand { } } + /** This is the main entry to [[BamStats]], this will do the argument parsing. */ def main(args: Array[String]): Unit = { val argsParser = new OptParser val cmdArgs: Args = argsParser.parse(args, Args()) getOrElse (throw new IllegalArgumentException) @@ -53,6 +56,10 @@ object BamStats extends ToolCommand { logger.info("Done") } + /** + * This will retrieve the [[SAMSequenceDictionary]] from the bam file. + * When `referenceFasta` is given, this will validate it against the bam file. + */ def validateReferenceinBam(bamFile: File, referenceFasta: Option[File]) = { val samReader = SamReaderFactory.makeDefault().open(bamFile) val samHeader = samReader.getFileHeader @@ -66,6 +73,15 @@ object BamStats extends ToolCommand { }.getOrElse(samHeader.getSequenceDictionary) } + /** + * This is the main running function of [[BamStats]]. This will start the threads and collect and write the results. 
+ * + * @param outputDir All output files will be placed here + * @param bamFile Input bam file + * @param referenceDict Dict for scattering + * @param binSize stats binsize + * @param threadBinSize Thread binsize + */ def init(outputDir: File, bamFile: File, referenceDict: SAMSequenceDictionary, binSize: Int, threadBinSize: Int): Unit = { val contigsFutures = BedRecordList.fromDict(referenceDict).allRecords.map { contig => Future { processContig(contig, bamFile, binSize, threadBinSize) } @@ -86,23 +102,31 @@ object BamStats extends ToolCommand { stats._3_ClippingHistogram.writeToTsv(new File(outputDir, "3_prime_clipping.tsv")) } + /** + * This will start the subjobs for each contig and collect [[Stats]] on contig level + * + * @param region Region to check, mostly this is the complete contig + * @param bamFile Input bam file + * @param binSize stats binsize + * @param threadBinSize Thread binsize + * @return Output stats + */ def processContig(region: BedRecord, bamFile: File, binSize: Int, threadBinSize: Int): Stats = { - logger.info(s"Contig '${region.chr}' starting") - var stats = Stats() - - val scatters = region + val scattersFutures = region .scatter(binSize) - - val scattersPerThread = (region.length.toDouble / threadBinSize).ceil.toInt - - val scattersFutures = scatters.grouped(scattersPerThread).map { scatters => - Future { processThread(scatters, bamFile) } - } - - waitOnFutures(scattersFutures.toList) + .grouped((region.length.toDouble / threadBinSize).ceil.toInt) + .map( scatters => Future { processThread(scatters, bamFile) }) + waitOnFutures(scattersFutures.toList, Some(region.chr)) } + /** + * This method will wait until all futures are complete and collect a single [[Stats]] instance + * @param futures List of futures to monitor + * @param msg Optional message for logging + * @return Output stats + */ def waitOnFutures(futures: List[Future[Stats]], msg: Option[String] = None): Stats = { + msg.foreach(m => logger.info(s"Start monitoring jobs for '$m', 
${futures.size} jobs")) futures.foreach(_.onFailure { case t => throw new RuntimeException(t) }) var stats = Stats() var running = futures @@ -110,11 +134,20 @@ object BamStats extends ToolCommand { val done = running.filter(_.value.isDefined) done.foreach(stats += _.value.get.get) running = running.filterNot(done.contains(_)) - Thread.sleep(1000) + if (running.nonEmpty && done.nonEmpty) msg.foreach(m => logger.info(s"Jobs for '$m', ${running.size}/${futures.size} jobs")) + if (running.nonEmpty) Thread.sleep(1000) } + msg.foreach(m => logger.info(s"All jobs for '$m' are done")) stats } + /** + * This method will process 1 thread bin + * + * @param scatters bins to check + * @param bamFile Input bamfile + * @return Output stats + */ def processThread(scatters: List[BedRecord], bamFile: File): Stats = { var totalStats = Stats() val sortedScatters = scatters.sortBy(_.start) @@ -161,6 +194,11 @@ object BamStats extends ToolCommand { totalStats } + /** + * This method will only count the unmapped fragments + * @param bamFile Input bamfile + * @return Output stats + */ def processUnmappedReads(bamFile: File): Stats = { var stats = Stats() val samReader = SamReaderFactory.makeDefault().open(bamFile) diff --git a/biopet-tools/src/main/scala/nl/lumc/sasc/biopet/tools/bamstats/Stats.scala b/biopet-tools/src/main/scala/nl/lumc/sasc/biopet/tools/bamstats/Stats.scala index cc3e72f898d3969cc1214fc680d891add5d00680..91db3220fc944cb726d1a5aefec95f73c086d922 100644 --- a/biopet-tools/src/main/scala/nl/lumc/sasc/biopet/tools/bamstats/Stats.scala +++ b/biopet-tools/src/main/scala/nl/lumc/sasc/biopet/tools/bamstats/Stats.scala @@ -16,6 +16,7 @@ case class Stats() { val _5_ClippingHistogram = Histogram() val _3_ClippingHistogram = Histogram() + /** This will add another [[Stats]] inside `this` */ def +=(other: Stats): Stats = { this.totalReads += other.totalReads this.unmapped += other.unmapped