Commit e34aac8c authored by Peter van 't Hof's avatar Peter van 't Hof
Browse files

Added scala docs

parent 5b3d2898
...@@ -13,6 +13,8 @@ import scala.concurrent.ExecutionContext.Implicits.global ...@@ -13,6 +13,8 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future import scala.concurrent.Future
/** /**
* This tool will collect stats from a bamfile
*
* Created by pjvanthof on 25/05/16. * Created by pjvanthof on 25/05/16.
*/ */
object BamStats extends ToolCommand { object BamStats extends ToolCommand {
...@@ -40,6 +42,7 @@ object BamStats extends ToolCommand { ...@@ -40,6 +42,7 @@ object BamStats extends ToolCommand {
} }
} }
/** This is the main entry to [[BamStats]], this will do the argument parsing. */
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {
val argsParser = new OptParser val argsParser = new OptParser
val cmdArgs: Args = argsParser.parse(args, Args()) getOrElse (throw new IllegalArgumentException) val cmdArgs: Args = argsParser.parse(args, Args()) getOrElse (throw new IllegalArgumentException)
...@@ -53,6 +56,10 @@ object BamStats extends ToolCommand { ...@@ -53,6 +56,10 @@ object BamStats extends ToolCommand {
logger.info("Done") logger.info("Done")
} }
/**
* This will retrieve the [[SAMSequenceDictionary]] from the bam file.
* When `referenceFasta is given he will validate this against the bam file.`
*/
def validateReferenceinBam(bamFile: File, referenceFasta: Option[File]) = { def validateReferenceinBam(bamFile: File, referenceFasta: Option[File]) = {
val samReader = SamReaderFactory.makeDefault().open(bamFile) val samReader = SamReaderFactory.makeDefault().open(bamFile)
val samHeader = samReader.getFileHeader val samHeader = samReader.getFileHeader
...@@ -66,6 +73,15 @@ object BamStats extends ToolCommand { ...@@ -66,6 +73,15 @@ object BamStats extends ToolCommand {
}.getOrElse(samHeader.getSequenceDictionary) }.getOrElse(samHeader.getSequenceDictionary)
} }
/**
* This is the main running function of [[BamStats]]. This will start the thereads and collect and write the results.
*
* @param outputDir All output files will be placed here
* @param bamFile Input bam file
* @param referenceDict Dict for scattering
* @param binSize stats binsize
* @param threadBinSize Thread binsize
*/
def init(outputDir: File, bamFile: File, referenceDict: SAMSequenceDictionary, binSize: Int, threadBinSize: Int): Unit = { def init(outputDir: File, bamFile: File, referenceDict: SAMSequenceDictionary, binSize: Int, threadBinSize: Int): Unit = {
val contigsFutures = BedRecordList.fromDict(referenceDict).allRecords.map { contig => val contigsFutures = BedRecordList.fromDict(referenceDict).allRecords.map { contig =>
Future { processContig(contig, bamFile, binSize, threadBinSize) } Future { processContig(contig, bamFile, binSize, threadBinSize) }
...@@ -86,23 +102,31 @@ object BamStats extends ToolCommand { ...@@ -86,23 +102,31 @@ object BamStats extends ToolCommand {
stats._3_ClippingHistogram.writeToTsv(new File(outputDir, "3_prime_clipping.tsv")) stats._3_ClippingHistogram.writeToTsv(new File(outputDir, "3_prime_clipping.tsv"))
} }
/**
* This will start the subjobs for each contig and collect [[Stats]] on contig level
*
* @param region Region to check, mostly yhis is the complete contig
* @param bamFile Input bam file
* @param binSize stats binsize
* @param threadBinSize Thread binsize
* @return Output stats
*/
def processContig(region: BedRecord, bamFile: File, binSize: Int, threadBinSize: Int): Stats = { def processContig(region: BedRecord, bamFile: File, binSize: Int, threadBinSize: Int): Stats = {
logger.info(s"Contig '${region.chr}' starting") val scattersFutures = region
var stats = Stats()
val scatters = region
.scatter(binSize) .scatter(binSize)
.grouped((region.length.toDouble / threadBinSize).ceil.toInt)
val scattersPerThread = (region.length.toDouble / threadBinSize).ceil.toInt .map( scatters => Future { processThread(scatters, bamFile) })
waitOnFutures(scattersFutures.toList, Some(region.chr))
val scattersFutures = scatters.grouped(scattersPerThread).map { scatters =>
Future { processThread(scatters, bamFile) }
}
waitOnFutures(scattersFutures.toList)
} }
/**
* This method will wait when all futures are complete and collect a single [[Stats]] instance
* @param futures List of futures to monitor
* @param msg Optional message for logging
* @return Output stats
*/
def waitOnFutures(futures: List[Future[Stats]], msg: Option[String] = None): Stats = { def waitOnFutures(futures: List[Future[Stats]], msg: Option[String] = None): Stats = {
msg.foreach(m => logger.info(s"Start monitoring jobs for '$m', ${futures.size} jobs"))
futures.foreach(_.onFailure { case t => throw new RuntimeException(t) }) futures.foreach(_.onFailure { case t => throw new RuntimeException(t) })
var stats = Stats() var stats = Stats()
var running = futures var running = futures
...@@ -110,11 +134,20 @@ object BamStats extends ToolCommand { ...@@ -110,11 +134,20 @@ object BamStats extends ToolCommand {
val done = running.filter(_.value.isDefined) val done = running.filter(_.value.isDefined)
done.foreach(stats += _.value.get.get) done.foreach(stats += _.value.get.get)
running = running.filterNot(done.contains(_)) running = running.filterNot(done.contains(_))
Thread.sleep(1000) if (running.nonEmpty && done.nonEmpty) msg.foreach(m => logger.info(s"Jobs for '$m', ${running.size}/${futures.size} jobs"))
if (running.nonEmpty) Thread.sleep(1000)
} }
msg.foreach(m => logger.info(s"All jobs for '$m' are done"))
stats stats
} }
/**
* This method will process 1 thread bin
*
* @param scatters bins to check
* @param bamFile Input bamfile
* @return Output stats
*/
def processThread(scatters: List[BedRecord], bamFile: File): Stats = { def processThread(scatters: List[BedRecord], bamFile: File): Stats = {
var totalStats = Stats() var totalStats = Stats()
val sortedScatters = scatters.sortBy(_.start) val sortedScatters = scatters.sortBy(_.start)
...@@ -161,6 +194,11 @@ object BamStats extends ToolCommand { ...@@ -161,6 +194,11 @@ object BamStats extends ToolCommand {
totalStats totalStats
} }
/**
* This method will only count the unmapped fragments
* @param bamFile Input bamfile
* @return Output stats
*/
def processUnmappedReads(bamFile: File): Stats = { def processUnmappedReads(bamFile: File): Stats = {
var stats = Stats() var stats = Stats()
val samReader = SamReaderFactory.makeDefault().open(bamFile) val samReader = SamReaderFactory.makeDefault().open(bamFile)
......
...@@ -16,6 +16,7 @@ case class Stats() { ...@@ -16,6 +16,7 @@ case class Stats() {
val _5_ClippingHistogram = Histogram() val _5_ClippingHistogram = Histogram()
val _3_ClippingHistogram = Histogram() val _3_ClippingHistogram = Histogram()
/** This will add an other [[Stats]] inside `this` */
def +=(other: Stats): Stats = { def +=(other: Stats): Stats = {
this.totalReads += other.totalReads this.totalReads += other.totalReads
this.unmapped += other.unmapped this.unmapped += other.unmapped
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment