BamStats.scala 9.08 KB
Newer Older
Peter van 't Hof's avatar
Peter van 't Hof committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/**
 * Biopet is built on top of GATK Queue for building bioinformatic
 * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
 * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
 * should also be able to execute Biopet tools and pipelines.
 *
 * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
 *
 * Contact us at: sasc@lumc.nl
 *
 * A dual licensing mode is applied. The source code within this project is freely available for non-commercial use under an AGPL
 * license; For commercial users or users who do not want to follow the AGPL
 * license, please contact us to obtain a separate license.
 */
Peter van 't Hof's avatar
Peter van 't Hof committed
15 16
package nl.lumc.sasc.biopet.tools.bamstats

Peter van 't Hof's avatar
Peter van 't Hof committed
17
import java.io.{ File, PrintWriter }
Peter van 't Hof's avatar
Peter van 't Hof committed
18

Peter van 't Hof's avatar
Peter van 't Hof committed
19
import htsjdk.samtools.{ SAMSequenceDictionary, SamReaderFactory }
Peter van 't Hof's avatar
Peter van 't Hof committed
20
import nl.lumc.sasc.biopet.utils.BamUtils.SamDictCheck
Peter van 't Hof's avatar
Peter van 't Hof committed
21 22
import nl.lumc.sasc.biopet.utils.{ ConfigUtils, FastaUtils, ToolCommand }
import nl.lumc.sasc.biopet.utils.intervals.{ BedRecord, BedRecordList }
Peter van 't Hof's avatar
Peter van 't Hof committed
23 24

import scala.collection.JavaConversions._
Peter van 't Hof's avatar
Peter van 't Hof committed
25
import scala.collection.mutable.ArrayBuffer
Peter van 't Hof's avatar
Peter van 't Hof committed
26
import scala.concurrent.ExecutionContext.Implicits.global
Peter van 't Hof's avatar
Peter van 't Hof committed
27
import scala.concurrent.duration._
Peter van 't Hof's avatar
Peter van 't Hof committed
28
import scala.concurrent.{ Await, Future }
Peter van 't Hof's avatar
Peter van 't Hof committed
29
import scala.io.Source
Peter van 't Hof's avatar
Peter van 't Hof committed
30
import scala.language.postfixOps
Peter van 't Hof's avatar
Peter van 't Hof committed
31 32

/**
Peter van 't Hof's avatar
Peter van 't Hof committed
33 34
 * This tool will collect stats from a bamfile
 *
Peter van 't Hof's avatar
Peter van 't Hof committed
35 36
 * Created by pjvanthof on 25/05/16.
 */
Peter van 't Hof's avatar
Peter van 't Hof committed
37 38 39 40 41 42 43 44
object BamStats extends ToolCommand {
  case class Args(outputDir: File = null,
                  bamFile: File = null,
                  referenceFasta: Option[File] = None,
                  binSize: Int = 10000,
                  threadBinSize: Int = 10000000) extends AbstractArgs

  class OptParser extends AbstractOptParser {
Peter van 't Hof's avatar
Peter van 't Hof committed
45
    opt[File]('R', "reference") valueName "<file>" action { (x, c) =>
Peter van 't Hof's avatar
Peter van 't Hof committed
46
      c.copy(referenceFasta = Some(x))
Peter van 't Hof's avatar
Peter van 't Hof committed
47
    } text "Fasta file of reference"
Peter van 't Hof's avatar
Peter van 't Hof committed
48 49
    opt[File]('o', "outputDir") required () valueName "<directory>" action { (x, c) =>
      c.copy(outputDir = x)
Peter van 't Hof's avatar
Peter van 't Hof committed
50
    } text "Output directory"
Peter van 't Hof's avatar
Peter van 't Hof committed
51 52
    opt[File]('b', "bam") required () valueName "<file>" action { (x, c) =>
      c.copy(bamFile = x)
Peter van 't Hof's avatar
Peter van 't Hof committed
53
    } text "Input bam file"
Peter van 't Hof's avatar
Peter van 't Hof committed
54 55
    opt[Int]("binSize") valueName "<int>" action { (x, c) =>
      c.copy(binSize = x)
Peter van 't Hof's avatar
Peter van 't Hof committed
56
    } text "Bin size of stats (beta)"
Peter van 't Hof's avatar
Peter van 't Hof committed
57 58
    opt[Int]("threadBinSize") valueName "<int>" action { (x, c) =>
      c.copy(threadBinSize = x)
Peter van 't Hof's avatar
Peter van 't Hof committed
59
    } text "Size of region per thread"
Peter van 't Hof's avatar
Peter van 't Hof committed
60 61
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
62
  /** This is the main entry to [[BamStats]], this will do the argument parsing. */
Peter van 't Hof's avatar
Peter van 't Hof committed
63 64 65 66
  def main(args: Array[String]): Unit = {
    val argsParser = new OptParser
    val cmdArgs: Args = argsParser.parse(args, Args()) getOrElse (throw new IllegalArgumentException)

67 68
    logger.info("Start")

Peter van 't Hof's avatar
Peter van 't Hof committed
69
    val sequenceDict = validateReferenceInBam(cmdArgs.bamFile, cmdArgs.referenceFasta)
Peter van 't Hof's avatar
Peter van 't Hof committed
70 71

    init(cmdArgs.outputDir, cmdArgs.bamFile, sequenceDict, cmdArgs.binSize, cmdArgs.threadBinSize)
72 73

    logger.info("Done")
Peter van 't Hof's avatar
Peter van 't Hof committed
74 75
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
76 77 78 79
  /**
   * This will retrieve the [[SAMSequenceDictionary]] from the bam file.
   * When `referenceFasta is given he will validate this against the bam file.`
   */
Peter van 't Hof's avatar
Peter van 't Hof committed
80
  def validateReferenceInBam(bamFile: File, referenceFasta: Option[File]) = {
Peter van 't Hof's avatar
Peter van 't Hof committed
81 82 83 84
    val samReader = SamReaderFactory.makeDefault().open(bamFile)
    val samHeader = samReader.getFileHeader
    samReader.close()
    referenceFasta.map { f =>
Peter van 't Hof's avatar
Peter van 't Hof committed
85 86
      samHeader.getSequenceDictionary.assertSameDictionary(FastaUtils.getCachedDict(f), false)
      FastaUtils.getCachedDict(f)
Peter van 't Hof's avatar
Peter van 't Hof committed
87 88 89
    }.getOrElse(samHeader.getSequenceDictionary)
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
90
  /**
Peter van 't Hof's avatar
Peter van 't Hof committed
91
   * This is the main running function of [[BamStats]]. This will start the threads and collect and write the results.
Peter van 't Hof's avatar
Peter van 't Hof committed
92 93 94 95 96 97 98
   *
   * @param outputDir All output files will be placed here
   * @param bamFile Input bam file
   * @param referenceDict Dict for scattering
   * @param binSize stats binsize
   * @param threadBinSize Thread binsize
   */
Peter van 't Hof's avatar
Peter van 't Hof committed
99 100
  def init(outputDir: File, bamFile: File, referenceDict: SAMSequenceDictionary, binSize: Int, threadBinSize: Int): Unit = {
    val contigsFutures = BedRecordList.fromDict(referenceDict).allRecords.map { contig =>
Peter van 't Hof's avatar
Peter van 't Hof committed
101 102
      contig.chr -> processContig(contig, bamFile, binSize, threadBinSize, outputDir)
    }.toList
Peter van 't Hof's avatar
Peter van 't Hof committed
103

Peter van 't Hof's avatar
Peter van 't Hof committed
104
    val stats = waitOnFutures(processUnmappedReads(bamFile) :: contigsFutures.map(_._2))
105

Peter van 't Hof's avatar
Peter van 't Hof committed
106 107
    val statsWriter = new PrintWriter(new File(outputDir, "bamstats.json"))
    val totalStats = stats.toSummaryMap
Peter van 't Hof's avatar
Peter van 't Hof committed
108
    val statsMap = Map(
Peter van 't Hof's avatar
Peter van 't Hof committed
109
      "total" -> totalStats,
Peter van 't Hof's avatar
Peter van 't Hof committed
110
      "contigs" -> contigsFutures.map(x => x._1 -> Await.result(x._2, Duration.Zero).toSummaryMap).toMap
Peter van 't Hof's avatar
Peter van 't Hof committed
111
    )
Peter van 't Hof's avatar
Peter van 't Hof committed
112
    statsWriter.println(ConfigUtils.mapToJson(statsMap).nospaces)
Peter van 't Hof's avatar
Peter van 't Hof committed
113
    statsWriter.close()
Peter van 't Hof's avatar
Peter van 't Hof committed
114 115

    val summaryWriter = new PrintWriter(new File(outputDir, "bamstats.summary.json"))
Peter van 't Hof's avatar
Peter van 't Hof committed
116
    summaryWriter.println(ConfigUtils.mapToJson(totalStats).nospaces)
Peter van 't Hof's avatar
Peter van 't Hof committed
117
    summaryWriter.close()
Peter van 't Hof's avatar
Peter van 't Hof committed
118 119
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
120
  /**
Peter van 't Hof's avatar
Peter van 't Hof committed
121 122 123 124 125 126 127 128
   * This will start the subjobs for each contig and collect [[Stats]] on contig level
   *
   * @param region Region to check, mostly yhis is the complete contig
   * @param bamFile Input bam file
   * @param binSize stats binsize
   * @param threadBinSize Thread binsize
   * @return Output stats
   */
Peter van 't Hof's avatar
Peter van 't Hof committed
129
  def processContig(region: BedRecord, bamFile: File, binSize: Int, threadBinSize: Int, outputDir: File): Future[Stats] = Future {
Peter van 't Hof's avatar
Peter van 't Hof committed
130
    val scattersFutures = region
Peter van 't Hof's avatar
Peter van 't Hof committed
131
      .scatter(binSize)
Peter van 't Hof's avatar
Peter van 't Hof committed
132
      .grouped((region.length.toDouble / binSize).ceil.toInt / (region.length.toDouble / threadBinSize).ceil.toInt)
Peter van 't Hof's avatar
Peter van 't Hof committed
133
      .map(scatters => processThread(scatters, bamFile))
Peter van 't Hof's avatar
Peter van 't Hof committed
134
      .toList
Peter van 't Hof's avatar
Peter van 't Hof committed
135
    waitOnFutures(scattersFutures, Some(region.chr))
Peter van 't Hof's avatar
Peter van 't Hof committed
136
  }
Peter van 't Hof's avatar
Peter van 't Hof committed
137

Peter van 't Hof's avatar
Peter van 't Hof committed
138
  /**
Peter van 't Hof's avatar
Peter van 't Hof committed
139
   * This method will wait when all futures are complete and collect a single [[Stats]] instance
Peter van 't Hof's avatar
Peter van 't Hof committed
140
   *
Peter van 't Hof's avatar
Peter van 't Hof committed
141 142 143 144
   * @param futures List of futures to monitor
   * @param msg Optional message for logging
   * @return Output stats
   */
Peter van 't Hof's avatar
Peter van 't Hof committed
145
  def waitOnFutures(futures: List[Future[Stats]], msg: Option[String] = None): Stats = {
Peter van 't Hof's avatar
Peter van 't Hof committed
146
    msg.foreach(m => logger.info(s"Start monitoring jobs for '$m', ${futures.size} jobs"))
Peter van 't Hof's avatar
Peter van 't Hof committed
147
    futures.foreach(_.onFailure { case t => throw new RuntimeException(t) })
Peter van 't Hof's avatar
Peter van 't Hof committed
148
    val stats = Await.result(Future.sequence(futures).map(_.fold(Stats())(_ += _)), Duration.Inf)
Peter van 't Hof's avatar
Peter van 't Hof committed
149
    msg.foreach(m => logger.info(s"All jobs for '$m' are done"))
Peter van 't Hof's avatar
Peter van 't Hof committed
150 151 152
    stats
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
153
  /**
Peter van 't Hof's avatar
Peter van 't Hof committed
154 155
   * This method will process 1 thread bin
   *
Peter van 't Hof's avatar
Peter van 't Hof committed
156
   * @param scatters bins to check, there should be no gaps withing the scatters
Peter van 't Hof's avatar
Peter van 't Hof committed
157 158 159
   * @param bamFile Input bamfile
   * @return Output stats
   */
Peter van 't Hof's avatar
Peter van 't Hof committed
160
  def processThread(scatters: List[BedRecord], bamFile: File): Future[Stats] = Future {
Peter van 't Hof's avatar
Peter van 't Hof committed
161
    val totalStats = Stats()
Peter van 't Hof's avatar
Peter van 't Hof committed
162 163 164 165 166 167 168
    val sortedScatters = scatters.sortBy(_.start)
    val samReader = SamReaderFactory.makeDefault().open(bamFile)
    val threadChr = sortedScatters.head.chr
    val threadStart = sortedScatters.head.start
    val threadEnd = sortedScatters.last.end
    val it = samReader.query(threadChr, threadStart, threadEnd, false).buffered
    for (samRecord <- it) {
169 170

      // Read based stats
Peter van 't Hof's avatar
Peter van 't Hof committed
171
      if (samRecord.getAlignmentStart > threadStart && samRecord.getAlignmentStart <= threadEnd) {
Peter van 't Hof's avatar
Peter van 't Hof committed
172
        totalStats.flagstat.loadRecord(samRecord)
Peter van 't Hof's avatar
Peter van 't Hof committed
173
        if (!samRecord.getReadUnmappedFlag) { // Mapped read
Peter van 't Hof's avatar
Peter van 't Hof committed
174
          totalStats.mappingQualityHistogram.add(samRecord.getMappingQuality)
175
        }
176
        if (samRecord.getReadPairedFlag && samRecord.getProperPairFlag && samRecord.getFirstOfPairFlag && !samRecord.getSecondOfPairFlag)
Peter van 't Hof's avatar
Peter van 't Hof committed
177
          totalStats.insertSizeHistogram.add(samRecord.getInferredInsertSize.abs)
178

Peter van 't Hof's avatar
Peter van 't Hof committed
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
        val leftClipping = samRecord.getAlignmentStart - samRecord.getUnclippedStart
        val rightClipping = samRecord.getUnclippedEnd - samRecord.getAlignmentEnd

        totalStats.clippingHistogram.add(leftClipping + rightClipping)
        totalStats.leftClippingHistogram.add(leftClipping)
        totalStats.rightClippingHistogram.add(rightClipping)

        if (samRecord.getReadNegativeStrandFlag) {
          totalStats._5_ClippingHistogram.add(leftClipping)
          totalStats._3_ClippingHistogram.add(rightClipping)
        } else {
          totalStats._5_ClippingHistogram.add(rightClipping)
          totalStats._3_ClippingHistogram.add(leftClipping)
        }

Peter van 't Hof's avatar
Peter van 't Hof committed
194
        //TODO: Bin Support
Peter van 't Hof's avatar
Peter van 't Hof committed
195 196 197 198 199 200 201 202
      }

      //TODO: bases counting
    }
    samReader.close()

    totalStats
  }
203

Peter van 't Hof's avatar
Peter van 't Hof committed
204
  /**
Peter van 't Hof's avatar
Peter van 't Hof committed
205
   * This method will only count the unmapped fragments
Peter van 't Hof's avatar
Peter van 't Hof committed
206 207
   *
   * @param bamFile Input bamfile
Peter van 't Hof's avatar
Peter van 't Hof committed
208 209
   * @return Output stats
   */
Peter van 't Hof's avatar
Peter van 't Hof committed
210
  def processUnmappedReads(bamFile: File): Future[Stats] = Future {
Peter van 't Hof's avatar
Peter van 't Hof committed
211
    val stats = Stats()
212
    val samReader = SamReaderFactory.makeDefault().open(bamFile)
Peter van 't Hof's avatar
Peter van 't Hof committed
213 214 215
    for (samRecord <- samReader.queryUnmapped()) {
      stats.flagstat.loadRecord(samRecord)
    }
216 217 218
    samReader.close()
    stats
  }
Peter van 't Hof's avatar
Peter van 't Hof committed
219

220
  def tsvToMap(tsvFile: File): Map[String, Array[Long]] = {
Peter van 't Hof's avatar
Peter van 't Hof committed
221 222 223
    val reader = Source.fromFile(tsvFile)
    val it = reader.getLines()
    val header = it.next().split("\t")
224
    val arrays = header.zipWithIndex.map(x => x._2 -> (x._1 -> ArrayBuffer[Long]()))
Peter van 't Hof's avatar
Peter van 't Hof committed
225 226 227 228
    for (line <- it) {
      val values = line.split("\t")
      require(values.size == header.size, s"Line does not have the number of field as header: $line")
      for (array <- arrays) {
229
        array._2._2.append(values(array._1).toLong)
Peter van 't Hof's avatar
Peter van 't Hof committed
230 231 232 233 234 235
      }
    }
    reader.close()
    arrays.map(x => x._2._1 -> x._2._2.toArray).toMap
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
236
}