Commit 196fc34e authored by bow's avatar bow

Merge branch 'feature-bamstats' into 'develop'

Feature bamstats

Fixes #308 

- [x] Multi-sample base with Scala futures
- [x] Base stats like total number of reads
- [x] Mapping quality histogram
- [x] Insert size histogram
- [x] Clipping stats

- [x] Add unit testing

See merge request !435
parents 4ddb5a93 351508ec
......@@ -18,12 +18,13 @@ import java.io.File
import nl.lumc.sasc.biopet.core.ToolCommandFunction
import nl.lumc.sasc.biopet.core.summary.Summarizable
import nl.lumc.sasc.biopet.tools.flagstat
import nl.lumc.sasc.biopet.utils.ConfigUtils
import nl.lumc.sasc.biopet.utils.config.Configurable
import org.broadinstitute.gatk.utils.commandline.{ Input, Output }
class BiopetFlagstat(val root: Configurable) extends ToolCommandFunction with Summarizable {
def toolObject = nl.lumc.sasc.biopet.tools.BiopetFlagstat
def toolObject = flagstat.BiopetFlagstat
@Input(doc = "Input bam", shortName = "input", required = true)
var input: File = _
......
......@@ -22,10 +22,11 @@ object BiopetToolsExecutable extends BiopetExecutable {
def tools: List[MainCommand] = List(
nl.lumc.sasc.biopet.tools.AnnotateVcfWithBed,
nl.lumc.sasc.biopet.tools.bamstats.BamStats,
nl.lumc.sasc.biopet.tools.BaseCounter,
nl.lumc.sasc.biopet.tools.BastyGenerateFasta,
nl.lumc.sasc.biopet.tools.BedtoolsCoverageToCounts,
nl.lumc.sasc.biopet.tools.BiopetFlagstat,
nl.lumc.sasc.biopet.tools.flagstat.BiopetFlagstat,
nl.lumc.sasc.biopet.tools.CheckAllelesVcfInBam,
nl.lumc.sasc.biopet.tools.ExtractAlignedFastq,
nl.lumc.sasc.biopet.tools.FastqSplitter,
......
/**
* Biopet is built on top of GATK Queue for building bioinformatic
* pipelines. It is mainly intended to support LUMC SHARK cluster which is running
* SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
* should also be able to execute Biopet tools and pipelines.
*
* Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
*
* Contact us at: sasc@lumc.nl
*
* A dual licensing mode is applied. The source code within this project is freely available for non-commercial use under an AGPL
* license; For commercial users or users who do not want to follow the AGPL
* license, please contact us to obtain a separate license.
*/
package nl.lumc.sasc.biopet.tools
import java.io.{ File, PrintWriter }
import htsjdk.samtools.{ SAMRecord, SamReaderFactory }
import nl.lumc.sasc.biopet.utils.{ ToolCommand, ConfigUtils }
import scala.collection.JavaConversions._
import scala.collection.mutable
/**
 * Commandline tool that computes flagstat-style statistics of a BAM file:
 * per-flag totals plus a cross table counting how often each pair of flag
 * predicates holds on the same record.
 */
object BiopetFlagstat extends ToolCommand {

  /** Parsed commandline arguments. `inputFile` is required; the rest are optional. */
  case class Args(inputFile: File = null,
                  outputFile: Option[File] = None,
                  summaryFile: Option[File] = None,
                  region: Option[String] = None) extends AbstractArgs

  /** scopt-based commandline parser filling an [[Args]] instance. */
  class OptParser extends AbstractOptParser {
    opt[File]('I', "inputFile") required () valueName "<file>" action { (x, c) =>
      c.copy(inputFile = x)
    } text "input bam file"
    opt[File]('o', "outputFile") valueName "<file>" action { (x, c) =>
      c.copy(outputFile = Some(x))
    } text "output file"
    opt[File]('s', "summaryFile") valueName "<file>" action { (x, c) =>
      c.copy(summaryFile = Some(x))
    } text "summary output file"
    opt[String]('r', "region") valueName "<chr:start-stop>" action { (x, c) =>
      c.copy(region = Some(x))
    } text "Region to calculate the statistics on"
  }

  /**
   * @param args the command line arguments
   */
  def main(args: Array[String]): Unit = {
    val argsParser = new OptParser
    val commandArgs: Args = argsParser.parse(args, Args()) getOrElse (throw new IllegalArgumentException)

    val inputSam = SamReaderFactory.makeDefault.open(commandArgs.inputFile)
    // When a region is given, restrict the iterator to records overlapping it;
    // otherwise iterate over the whole file.
    val iterSam = if (commandArgs.region.isEmpty) inputSam.iterator else {
      val regionRegex = """(.*):(.*)-(.*)""".r
      commandArgs.region.get match {
        case regionRegex(chr, start, stop) => inputSam.query(chr, start.toInt, stop.toInt, false)
        case _                             => sys.error("Region wrong format")
      }
    }

    // All addFunction calls MUST happen before the first loadRecord:
    // addFunction resets the internal counter arrays (see FlagstatCollector.addFunction).
    val flagstatCollector = new FlagstatCollector
    flagstatCollector.loadDefaultFunctions()

    // Mapping-quality threshold counters: MAPQ>0, MAPQ>10, ..., MAPQ>60.
    val m = 10
    val max = 60
    for (t <- 0 to (max / m))
      flagstatCollector.addFunction("MAPQ>" + (t * m), record => record.getMappingQuality > (t * m))

    // The four orientation counters below only consider pairs mapped to the same
    // reference; they classify the relative strand/position of the two mates.
    flagstatCollector.addFunction("First normal, second read inverted (paired end orientation)", record => {
      if (record.getReadPairedFlag &&
        record.getReferenceIndex == record.getMateReferenceIndex && record.getReadNegativeStrandFlag != record.getMateNegativeStrandFlag &&
        ((record.getFirstOfPairFlag && !record.getReadNegativeStrandFlag && record.getAlignmentStart < record.getMateAlignmentStart) ||
          (record.getFirstOfPairFlag && record.getReadNegativeStrandFlag && record.getAlignmentStart > record.getMateAlignmentStart) ||
          (record.getSecondOfPairFlag && !record.getReadNegativeStrandFlag && record.getAlignmentStart < record.getMateAlignmentStart) ||
          (record.getSecondOfPairFlag && record.getReadNegativeStrandFlag && record.getAlignmentStart > record.getMateAlignmentStart))) true
      else false
    })
    flagstatCollector.addFunction("First normal, second read normal", record => {
      if (record.getReadPairedFlag &&
        record.getReferenceIndex == record.getMateReferenceIndex && record.getReadNegativeStrandFlag == record.getMateNegativeStrandFlag &&
        ((record.getFirstOfPairFlag && !record.getReadNegativeStrandFlag && record.getAlignmentStart < record.getMateAlignmentStart) ||
          (record.getFirstOfPairFlag && record.getReadNegativeStrandFlag && record.getAlignmentStart > record.getMateAlignmentStart) ||
          (record.getSecondOfPairFlag && record.getReadNegativeStrandFlag && record.getAlignmentStart < record.getMateAlignmentStart) ||
          (record.getSecondOfPairFlag && !record.getReadNegativeStrandFlag && record.getAlignmentStart > record.getMateAlignmentStart))) true
      else false
    })
    flagstatCollector.addFunction("First inverted, second read inverted", record => {
      if (record.getReadPairedFlag &&
        record.getReferenceIndex == record.getMateReferenceIndex && record.getReadNegativeStrandFlag == record.getMateNegativeStrandFlag &&
        ((record.getFirstOfPairFlag && record.getReadNegativeStrandFlag && record.getAlignmentStart < record.getMateAlignmentStart) ||
          (record.getFirstOfPairFlag && !record.getReadNegativeStrandFlag && record.getAlignmentStart > record.getMateAlignmentStart) ||
          (record.getSecondOfPairFlag && !record.getReadNegativeStrandFlag && record.getAlignmentStart < record.getMateAlignmentStart) ||
          (record.getSecondOfPairFlag && record.getReadNegativeStrandFlag && record.getAlignmentStart > record.getMateAlignmentStart))) true
      else false
    })
    flagstatCollector.addFunction("First inverted, second read normal", record => {
      if (record.getReadPairedFlag &&
        record.getReferenceIndex == record.getMateReferenceIndex && record.getReadNegativeStrandFlag != record.getMateNegativeStrandFlag &&
        ((record.getFirstOfPairFlag && record.getReadNegativeStrandFlag && record.getAlignmentStart < record.getMateAlignmentStart) ||
          (record.getFirstOfPairFlag && !record.getReadNegativeStrandFlag && record.getAlignmentStart > record.getMateAlignmentStart) ||
          (record.getSecondOfPairFlag && record.getReadNegativeStrandFlag && record.getAlignmentStart < record.getMateAlignmentStart) ||
          (record.getSecondOfPairFlag && !record.getReadNegativeStrandFlag && record.getAlignmentStart > record.getMateAlignmentStart))) true
      else false
    })
    flagstatCollector.addFunction("Mate in same strand", record => record.getReadPairedFlag && record.getReadNegativeStrandFlag && record.getMateNegativeStrandFlag &&
      record.getReferenceIndex == record.getMateReferenceIndex)
    flagstatCollector.addFunction("Mate on other chr", record => record.getReadPairedFlag && record.getReferenceIndex != record.getMateReferenceIndex)

    logger.info("Start reading file: " + commandArgs.inputFile)
    for (record <- iterSam) {
      // Progress log every 1 million records.
      // NOTE(review): "prosessed" is a typo ("processed") in this runtime log
      // message; left untouched here because this is a documentation-only pass.
      if (flagstatCollector.readsCount % 1e6 == 0 && flagstatCollector.readsCount > 0)
        logger.info("Reads prosessed: " + flagstatCollector.readsCount)
      flagstatCollector.loadRecord(record)
    }

    // Optional JSON summary output.
    commandArgs.summaryFile.foreach {
      case file =>
        val writer = new PrintWriter(file)
        writer.println(flagstatCollector.summary)
        writer.close()
    }

    // Human-readable report: to file when requested, to stdout otherwise.
    commandArgs.outputFile match {
      case Some(file) => {
        val writer = new PrintWriter(file)
        writer.println(flagstatCollector.report)
        writer.close()
      }
      case _ => println(flagstatCollector.report)
    }
  }

  /**
   * Accumulates per-predicate totals and a predicate x predicate cross-count
   * matrix over a stream of [[SAMRecord]]s. Predicates are registered with
   * [[addFunction]] and applied by [[loadRecord]].
   *
   * Not thread-safe: all state is plain mutable fields.
   */
  class FlagstatCollector {
    private var functionCount = 0
    var readsCount = 0
    // index -> human-readable predicate name; indices match `functions`/`totalCounts`.
    private val names: mutable.Map[Int, String] = mutable.Map()
    private var functions: Array[SAMRecord => Boolean] = Array()
    private var totalCounts: Array[Long] = Array()
    private var crossCounts = Array.ofDim[Long](1, 1)

    /** Registers the standard SAM-flag predicates (mapped, duplicate, pairing, ...). */
    def loadDefaultFunctions() {
      addFunction("All", record => true)
      addFunction("Mapped", record => !record.getReadUnmappedFlag)
      addFunction("Duplicates", record => record.getDuplicateReadFlag)
      addFunction("FirstOfPair", record => if (record.getReadPairedFlag) record.getFirstOfPairFlag else false)
      addFunction("SecondOfPair", record => if (record.getReadPairedFlag) record.getSecondOfPairFlag else false)
      addFunction("ReadNegativeStrand", record => record.getReadNegativeStrandFlag)
      addFunction("NotPrimaryAlignment", record => record.getNotPrimaryAlignmentFlag)
      addFunction("ReadPaired", record => record.getReadPairedFlag)
      addFunction("ProperPair", record => if (record.getReadPairedFlag) record.getProperPairFlag else false)
      addFunction("MateNegativeStrand", record => if (record.getReadPairedFlag) record.getMateNegativeStrandFlag else false)
      addFunction("MateUnmapped", record => if (record.getReadPairedFlag) record.getMateUnmappedFlag else false)
      addFunction("ReadFailsVendorQualityCheck", record => record.getReadFailsVendorQualityCheckFlag)
      addFunction("SupplementaryAlignment", record => record.getSupplementaryAlignmentFlag)
      addFunction("SecondaryOrSupplementary", record => record.isSecondaryOrSupplementary)
    }

    /**
     * Applies every registered predicate to `record`, incrementing the per-predicate
     * totals and every cell of the cross matrix whose two predicates both hold.
     */
    def loadRecord(record: SAMRecord) {
      readsCount += 1
      val values: Array[Boolean] = new Array(names.size)
      for (t <- 0 until names.size) {
        values(t) = functions(t)(record)
        if (values(t)) {
          totalCounts(t) += 1
        }
      }
      // O(n^2) cross table: cell (t, t2) counts records matching both predicates.
      for (t <- 0 until names.size) {
        for (t2 <- 0 until names.size) {
          if (values(t) && values(t2)) {
            crossCounts(t)(t2) += 1
          }
        }
      }
    }

    /**
     * Registers a new named predicate.
     *
     * WARNING: this reallocates `crossCounts` and `totalCounts`, discarding any
     * counts accumulated so far — call all addFunction's before the first loadRecord.
     */
    def addFunction(name: String, function: SAMRecord => Boolean) {
      functionCount += 1
      crossCounts = Array.ofDim[Long](functionCount, functionCount)
      totalCounts = new Array[Long](functionCount)
      // Grow the function array by one, copying the previously registered predicates.
      val temp = new Array[SAMRecord => Boolean](functionCount)
      for (t <- 0 until (temp.length - 1)) temp(t) = functions(t)
      functions = temp

      val index = functionCount - 1
      names += (index -> name)
      functions(index) = function
      totalCounts(index) = 0
    }

    /** Renders the totals table followed by the absolute and fractional cross tables. */
    def report: String = {
      val buffer = new StringBuilder
      buffer.append("Number\tTotal Flags\tFraction\tName\n")
      for (t <- 0 until names.size) {
        val precentage = (totalCounts(t).toFloat / readsCount) * 100
        buffer.append("#" + (t + 1) + "\t" + totalCounts(t) + "\t" + f"$precentage%.4f" + "%\t" + names(t) + "\n")
      }
      buffer.append("\n")
      buffer.append(crossReport() + "\n")
      buffer.append(crossReport(fraction = true) + "\n")
      buffer.toString()
    }

    /**
     * JSON summary: every predicate total plus "Singletons"
     * (records counted as both Mapped and MateUnmapped).
     * NOTE(review): if either predicate name were missing, the getOrElse(-1)
     * fallback would index the matrix at -1 and throw — assumes
     * loadDefaultFunctions() was called.
     */
    def summary: String = {
      val map = (for (t <- 0 until names.size) yield {
        names(t) -> totalCounts(t)
      }).toMap ++ Map("Singletons" -> crossCounts(names.find(_._2 == "Mapped").map(_._1).getOrElse(-1))(names.find(_._2 == "MateUnmapped").map(_._1).getOrElse(-1))
      )
      ConfigUtils.mapToJson(map).spaces4
    }

    /**
     * Renders the cross-count matrix as a tab-separated table.
     *
     * @param fraction when true each cell is rendered as a percentage of the row's total
     */
    def crossReport(fraction: Boolean = false): String = {
      val buffer = new StringBuilder

      for (t <- 0 until names.size) // Header for table
        buffer.append("\t#" + (t + 1))
      buffer.append("\n")

      for (t <- 0 until names.size) {
        buffer.append("#" + (t + 1) + "\t")
        for (t2 <- 0 until names.size) {
          val reads = crossCounts(t)(t2)
          if (fraction) {
            val precentage = (reads.toFloat / totalCounts(t).toFloat) * 100
            buffer.append(f"$precentage%.4f" + "%")
          } else buffer.append(reads)
          if (t2 == names.size - 1) buffer.append("\n")
          else buffer.append("\t")
        }
      }
      buffer.toString()
    }
  } // End of class
}
package nl.lumc.sasc.biopet.tools.bamstats
import java.io.File
import java.util.concurrent.TimeoutException
import htsjdk.samtools.reference.FastaSequenceFile
import htsjdk.samtools.{ SAMSequenceDictionary, SamReaderFactory }
import nl.lumc.sasc.biopet.utils.BamUtils.SamDictCheck
import nl.lumc.sasc.biopet.utils.ToolCommand
import nl.lumc.sasc.biopet.utils.intervals.{ BedRecord, BedRecordList }
import scala.collection.JavaConversions._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{ Await, Future }
import scala.language.postfixOps
/**
* This tool will collect stats from a bamfile
*
* Created by pjvanthof on 25/05/16.
*/
object BamStats extends ToolCommand {

  /** Parsed commandline arguments; `outputDir` and `bamFile` are required. */
  case class Args(outputDir: File = null,
                  bamFile: File = null,
                  referenceFasta: Option[File] = None,
                  binSize: Int = 10000,
                  threadBinSize: Int = 10000000) extends AbstractArgs

  /** scopt-based commandline parser filling an [[Args]] instance. */
  class OptParser extends AbstractOptParser {
    opt[File]('R', "reference") valueName "<file>" action { (x, c) =>
      c.copy(referenceFasta = Some(x))
    } text "Fasta file of reference"
    opt[File]('o', "outputDir") required () valueName "<directory>" action { (x, c) =>
      c.copy(outputDir = x)
    } text "Output directory"
    opt[File]('b', "bam") required () valueName "<file>" action { (x, c) =>
      c.copy(bamFile = x)
    } text "Input bam file"
    opt[Int]("binSize") valueName "<int>" action { (x, c) =>
      c.copy(binSize = x)
    } text "Bin size of stats (beta)"
    opt[Int]("threadBinSize") valueName "<int>" action { (x, c) =>
      c.copy(threadBinSize = x)
    } text "Size of region per thread"
  }

  /** This is the main entry to [[BamStats]], this will do the argument parsing. */
  def main(args: Array[String]): Unit = {
    val argsParser = new OptParser
    val cmdArgs: Args = argsParser.parse(args, Args()) getOrElse (throw new IllegalArgumentException)

    logger.info("Start")

    val sequenceDict = validateReferenceInBam(cmdArgs.bamFile, cmdArgs.referenceFasta)

    init(cmdArgs.outputDir, cmdArgs.bamFile, sequenceDict, cmdArgs.binSize, cmdArgs.threadBinSize)

    logger.info("Done")
  }

  /**
   * Retrieves the [[SAMSequenceDictionary]] from the bam file header.
   * When `referenceFasta` is given, the bam header dictionary is validated
   * against the reference dictionary and the reference dictionary is returned.
   */
  def validateReferenceInBam(bamFile: File, referenceFasta: Option[File]) = {
    val samReader = SamReaderFactory.makeDefault().open(bamFile)
    val samHeader = samReader.getFileHeader
    samReader.close()
    referenceFasta.map { f =>
      val referenceReader = new FastaSequenceFile(f, true)
      val referenceDict = referenceReader.getSequenceDictionary
      // Throws when the two dictionaries disagree (false = don't check contig order only).
      samHeader.getSequenceDictionary.assertSameDictionary(referenceDict, false)
      referenceReader.close()
      referenceDict
    }.getOrElse(samHeader.getSequenceDictionary)
  }

  /**
   * This is the main running function of [[BamStats]]. This will start the threads and collect and write the results.
   *
   * @param outputDir All output files will be placed here
   * @param bamFile Input bam file
   * @param referenceDict Dict for scattering
   * @param binSize stats binsize
   * @param threadBinSize Thread binsize
   */
  def init(outputDir: File, bamFile: File, referenceDict: SAMSequenceDictionary, binSize: Int, threadBinSize: Int): Unit = {
    // One future per contig; each fans out further into per-thread-bin futures.
    val contigsFutures = BedRecordList.fromDict(referenceDict).allRecords.map { contig =>
      processContig(contig, bamFile, binSize, threadBinSize, outputDir)
    }
    // Unmapped reads are handled separately: they cannot be reached by contig queries.
    val stats = waitOnFutures(processUnmappedReads(bamFile) :: contigsFutures.toList)

    stats.writeStatsToFiles(outputDir)
  }

  /**
   * This will start the subjobs for each contig and collect [[Stats]] on contig level
   *
   * @param region Region to check, mostly this is the complete contig
   * @param bamFile Input bam file
   * @param binSize stats binsize
   * @param threadBinSize Thread binsize
   * @return Output stats
   */
  def processContig(region: BedRecord, bamFile: File, binSize: Int, threadBinSize: Int, outputDir: File): Future[Stats] = Future {
    // Scatter the contig into stat bins, then group bins so each thread job
    // covers roughly `threadBinSize` bases.
    val scattersFutures = region
      .scatter(binSize)
      .grouped((region.length.toDouble / binSize).ceil.toInt / (region.length.toDouble / threadBinSize).ceil.toInt)
      .map(scatters => processThread(scatters, bamFile))
      .toList
    val stats = waitOnFutures(scattersFutures, Some(region.chr))
    // Per-contig stats are also written to <outputDir>/contigs/<chr>/.
    val contigDir = new File(outputDir, "contigs" + File.separator + region.chr)
    contigDir.mkdirs()
    stats.writeStatsToFiles(contigDir)
    stats
  }

  /**
   * This method will wait when all futures are complete and collect a single [[Stats]] instance
   *
   * @param futures List of futures to monitor
   * @param msg Optional message for logging
   * @return Output stats
   */
  def waitOnFutures(futures: List[Future[Stats]], msg: Option[String] = None): Stats = {
    msg.foreach(m => logger.info(s"Start monitoring jobs for '$m', ${futures.size} jobs"))
    // NOTE(review): throwing inside an onFailure callback raises on the execution
    // context's thread, not the caller's; the failure actually reaches the caller
    // through Await.result below, which rethrows the first failed future's cause.
    futures.foreach(_.onFailure { case t => throw new RuntimeException(t) })
    // Merge all partial Stats into one with `+=` (in-place fold into an empty Stats).
    val stats = Await.result(Future.sequence(futures).map(_.fold(Stats())(_ += _)), Duration.Inf)
    msg.foreach(m => logger.info(s"All jobs for '$m' are done"))
    stats
  }

  /**
   * This method will process 1 thread bin
   *
   * @param scatters bins to check, there should be no gaps within the scatters
   * @param bamFile Input bamfile
   * @return Output stats
   */
  def processThread(scatters: List[BedRecord], bamFile: File): Future[Stats] = Future {
    val totalStats = Stats()
    val sortedScatters = scatters.sortBy(_.start)
    val samReader = SamReaderFactory.makeDefault().open(bamFile)
    // Query once over the whole thread window instead of once per scatter bin.
    val threadChr = sortedScatters.head.chr
    val threadStart = sortedScatters.head.start
    val threadEnd = sortedScatters.last.end
    val it = samReader.query(threadChr, threadStart, threadEnd, false).buffered

    for (samRecord <- it) {

      // Read based stats
      // Only count reads whose alignment START falls inside this window, so a
      // read overlapping two thread windows is counted exactly once.
      if (samRecord.getAlignmentStart > threadStart && samRecord.getAlignmentStart <= threadEnd) {
        totalStats.flagstat.loadRecord(samRecord)
        if (!samRecord.getReadUnmappedFlag) { // Mapped read
          totalStats.mappingQualityHistogram.add(samRecord.getMappingQuality)
        }
        // Insert size is recorded once per pair (first-of-pair only).
        // NOTE(review): getProperPairFlag/getFirstOfPairFlag presumably require the
        // record to be paired (htsjdk throws otherwise) — confirm unpaired reads
        // cannot reach this line.
        if (samRecord.getProperPairFlag && samRecord.getFirstOfPairFlag && !samRecord.getSecondOfPairFlag)
          totalStats.insertSizeHistogram.add(samRecord.getInferredInsertSize.abs)

        // Clipping = difference between aligned and unclipped coordinates.
        val leftClipping = samRecord.getAlignmentStart - samRecord.getUnclippedStart
        val rightClipping = samRecord.getUnclippedEnd - samRecord.getAlignmentEnd

        totalStats.clippingHistogram.add(leftClipping + rightClipping)
        totalStats.leftClippingHistogram.add(leftClipping)
        totalStats.rightClippingHistogram.add(rightClipping)

        // 5'/3' are strand-dependent: on the reverse strand the right clip is the 3' end.
        if (samRecord.getReadNegativeStrandFlag) {
          totalStats._5_ClippingHistogram.add(leftClipping)
          totalStats._3_ClippingHistogram.add(rightClipping)
        } else {
          totalStats._5_ClippingHistogram.add(rightClipping)
          totalStats._3_ClippingHistogram.add(leftClipping)
        }

        //TODO: Bin Support
      }

      //TODO: bases counting
    }
    samReader.close()

    totalStats
  }

  /**
   * This method will only count the unmapped fragments
   *
   * @param bamFile Input bamfile
   * @return Output stats
   */
  def processUnmappedReads(bamFile: File): Future[Stats] = Future {
    val stats = Stats()
    val samReader = SamReaderFactory.makeDefault().open(bamFile)
    for (samRecord <- samReader.queryUnmapped()) {
      stats.flagstat.loadRecord(samRecord)
    }
    samReader.close()
    stats
  }
}
package nl.lumc.sasc.biopet.tools.bamstats
import java.io.{ File, PrintWriter }
import scala.collection.mutable
/**
* Created by pjvanthof on 05/07/16.
*/
/**
 * Generic counter: maps a value of type `T` to the number of times it was seen.
 * The implicit `Ordering` is only used to sort the keys when writing to file.
 *
 * Not thread-safe: backed by a plain mutable map.
 *
 * @param _counts initial counts to seed the counter with (defaults to empty)
 */
class Counts[T](_counts: Map[T, Long] = Map[T, Long]())(implicit ord: Ordering[T]) {
  // Internal mutable state, seeded with the supplied initial counts.
  protected[Counts] val counts: mutable.Map[T, Long] = mutable.Map() ++ _counts

  /** Returns histogram as an immutable map */
  def countsMap = counts.toMap

  /** Returns the count for `key`, if that key was ever seen */
  def get(key: T) = counts.get(key)

  /** Adds all counts of an other histogram to `this` (in place) and returns `this` */
  def +=(other: Counts[T]): Counts[T] = {
    other.counts.foreach(x => this.counts += x._1 -> (this.counts.getOrElse(x._1, 0L) + x._2))
    this
  }

  /** Increments the count of `value` by one */
  def add(value: T): Unit = {
    counts += value -> (counts.getOrElse(value, 0L) + 1)
  }

  /** Write histogram to a tsv/count file, keys sorted ascending */
  def writeToTsv(file: File): Unit = {
    val writer = new PrintWriter(file)
    // try/finally so the file handle is not leaked when a write throws.
    try {
      writer.println("value\tcount")
      counts.keys.toList.sorted.foreach(x => writer.println(s"$x\t${counts(x)}"))
    } finally writer.close()
  }

  /** Two counters are equal when their count maps are equal. */
  override def equals(other: Any): Boolean = {
    other match {
      // Wildcard type test: the type parameter is erased at runtime, so matching
      // on Counts[T] would be an unchecked (and misleading) pattern.
      case c: Counts[_] => this.counts == c.counts
      case _            => false
    }
  }

  // equals is overridden, so hashCode must be consistent with it; the original
  // version violated the equals/hashCode contract by omitting this.
  override def hashCode(): Int = counts.hashCode()
}
/**
 * A [[Counts]] over a numeric value type. The `Numeric` evidence (which is also
 * an `Ordering`) supplies the key ordering used when writing the histogram to file.
 */
class Histogram[T](_counts: Map[T, Long] = Map[T, Long]())(implicit ord: Numeric[T]) extends Counts[T](_counts)
package nl.lumc.sasc.biopet.tools.bamstats
import java.io.File
import nl.lumc.sasc.biopet.tools.flagstat.FlagstatCollector
/**
* Created by pjvanthof on 05/07/16.
*/
case class Stats(flagstat: FlagstatCollector = new FlagstatCollector(),
mappingQualityHistogram: Histogram[Int] = new Histogram[Int](),
insertSizeHistogram: Histogram[Int] = new Histogram[Int](),
clippingHistogram: Histogram[Int] = new Histogram[Int](),
leftClippingHistogram: Histogram[Int] = new Histogram[Int](),
rightClippingHistogram: Histogram[Int] = new Histogram[Int](),
_5_ClippingHistogram: Histogram[Int] = new Histogram[Int](),
_3_ClippingHistogram: Histogram[Int] = new Histogram[Int]()) {
flagstat.loadDefaultFunctions()
flagstat.loadQualityFunctions(1, 0)
flagstat.loadOrientationFunctions
/** This will add an other [[Stats]] inside `this` */
def +=(other: Stats): Stats = {
this.flagstat += other.flagstat
this.mappingQualityHistogram += other.mappingQualityHistogram
this.insertSizeHistogram += other.insertSizeHistogram
this.clippingHistogram += other.clippingHistogram
this.leftClippingHistogram += other.leftClippingHistogram
this.rightClippingHistogram += other.rightClippingHistogram
this</