BamStats.scala 8.11 KB
Newer Older
Peter van 't Hof's avatar
Peter van 't Hof committed
1
2
3
package nl.lumc.sasc.biopet.tools.bamstats

import java.io.File
Peter van 't Hof's avatar
Peter van 't Hof committed
4
import java.util.concurrent.TimeoutException
Peter van 't Hof's avatar
Peter van 't Hof committed
5
6

import htsjdk.samtools.reference.FastaSequenceFile
Peter van 't Hof's avatar
Peter van 't Hof committed
7
import htsjdk.samtools.{ SAMSequenceDictionary, SamReaderFactory }
Peter van 't Hof's avatar
Peter van 't Hof committed
8
import nl.lumc.sasc.biopet.utils.BamUtils.SamDictCheck
Peter van 't Hof's avatar
Peter van 't Hof committed
9
import nl.lumc.sasc.biopet.utils.ToolCommand
Peter van 't Hof's avatar
Peter van 't Hof committed
10
import nl.lumc.sasc.biopet.utils.intervals.{ BedRecord, BedRecordList }
Peter van 't Hof's avatar
Peter van 't Hof committed
11
12

import scala.collection.JavaConversions._
Peter van 't Hof's avatar
Peter van 't Hof committed
13
import scala.concurrent.ExecutionContext.Implicits.global
Peter van 't Hof's avatar
Peter van 't Hof committed
14
15
import scala.concurrent.duration._
import scala.concurrent.{ Await, Future }
Peter van 't Hof's avatar
Peter van 't Hof committed
16
import scala.language.postfixOps
Peter van 't Hof's avatar
Peter van 't Hof committed
17
18

/**
Peter van 't Hof's avatar
Peter van 't Hof committed
19
20
 * This tool will collect stats from a bamfile
 *
Peter van 't Hof's avatar
Peter van 't Hof committed
21
22
 * Created by pjvanthof on 25/05/16.
 */
Peter van 't Hof's avatar
Peter van 't Hof committed
23
24
25
26
27
28
29
30
object BamStats extends ToolCommand {
  case class Args(outputDir: File = null,
                  bamFile: File = null,
                  referenceFasta: Option[File] = None,
                  binSize: Int = 10000,
                  threadBinSize: Int = 10000000) extends AbstractArgs

  class OptParser extends AbstractOptParser {
Peter van 't Hof's avatar
Peter van 't Hof committed
31
    opt[File]('R', "reference") valueName "<file>" action { (x, c) =>
Peter van 't Hof's avatar
Peter van 't Hof committed
32
      c.copy(referenceFasta = Some(x))
Peter van 't Hof's avatar
Peter van 't Hof committed
33
    } text "Fasta file of reference"
Peter van 't Hof's avatar
Peter van 't Hof committed
34
35
    opt[File]('o', "outputDir") required () valueName "<directory>" action { (x, c) =>
      c.copy(outputDir = x)
Peter van 't Hof's avatar
Peter van 't Hof committed
36
    } text "Output directory"
Peter van 't Hof's avatar
Peter van 't Hof committed
37
38
    opt[File]('b', "bam") required () valueName "<file>" action { (x, c) =>
      c.copy(bamFile = x)
Peter van 't Hof's avatar
Peter van 't Hof committed
39
    } text "Input bam file"
Peter van 't Hof's avatar
Peter van 't Hof committed
40
41
    opt[Int]("binSize") valueName "<int>" action { (x, c) =>
      c.copy(binSize = x)
Peter van 't Hof's avatar
Peter van 't Hof committed
42
    } text "Bin size of stats (beta)"
Peter van 't Hof's avatar
Peter van 't Hof committed
43
44
    opt[Int]("threadBinSize") valueName "<int>" action { (x, c) =>
      c.copy(threadBinSize = x)
Peter van 't Hof's avatar
Peter van 't Hof committed
45
    } text "Size of region per thread"
Peter van 't Hof's avatar
Peter van 't Hof committed
46
47
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
48
  /** This is the main entry to [[BamStats]], this will do the argument parsing. */
Peter van 't Hof's avatar
Peter van 't Hof committed
49
50
51
52
  def main(args: Array[String]): Unit = {
    val argsParser = new OptParser
    val cmdArgs: Args = argsParser.parse(args, Args()) getOrElse (throw new IllegalArgumentException)

53
54
    logger.info("Start")

Peter van 't Hof's avatar
Peter van 't Hof committed
55
    val sequenceDict = validateReferenceInBam(cmdArgs.bamFile, cmdArgs.referenceFasta)
Peter van 't Hof's avatar
Peter van 't Hof committed
56
57

    init(cmdArgs.outputDir, cmdArgs.bamFile, sequenceDict, cmdArgs.binSize, cmdArgs.threadBinSize)
58
59

    logger.info("Done")
Peter van 't Hof's avatar
Peter van 't Hof committed
60
61
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
62
63
64
65
  /**
   * This will retrieve the [[SAMSequenceDictionary]] from the bam file.
   * When `referenceFasta is given he will validate this against the bam file.`
   */
Peter van 't Hof's avatar
Peter van 't Hof committed
66
  def validateReferenceInBam(bamFile: File, referenceFasta: Option[File]) = {
Peter van 't Hof's avatar
Peter van 't Hof committed
67
68
69
70
71
72
73
74
75
76
77
78
    val samReader = SamReaderFactory.makeDefault().open(bamFile)
    val samHeader = samReader.getFileHeader
    samReader.close()
    referenceFasta.map { f =>
      val referenceReader = new FastaSequenceFile(f, true)
      val referenceDict = referenceReader.getSequenceDictionary
      samHeader.getSequenceDictionary.assertSameDictionary(referenceDict, false)
      referenceReader.close()
      referenceDict
    }.getOrElse(samHeader.getSequenceDictionary)
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
79
  /**
Peter van 't Hof's avatar
Peter van 't Hof committed
80
81
82
83
84
85
86
87
   * This is the main running function of [[BamStats]]. This will start the thereads and collect and write the results.
   *
   * @param outputDir All output files will be placed here
   * @param bamFile Input bam file
   * @param referenceDict Dict for scattering
   * @param binSize stats binsize
   * @param threadBinSize Thread binsize
   */
Peter van 't Hof's avatar
Peter van 't Hof committed
88
89
  def init(outputDir: File, bamFile: File, referenceDict: SAMSequenceDictionary, binSize: Int, threadBinSize: Int): Unit = {
    val contigsFutures = BedRecordList.fromDict(referenceDict).allRecords.map { contig =>
Peter van 't Hof's avatar
Peter van 't Hof committed
90
      Future { processContig(contig, bamFile, binSize, threadBinSize, outputDir) }
Peter van 't Hof's avatar
Peter van 't Hof committed
91
92
    }

93
94
    val unmappedFuture = Future { processUnmappedReads(bamFile) }

Peter van 't Hof's avatar
Peter van 't Hof committed
95
    val stats = waitOnFutures(unmappedFuture :: contigsFutures.toList)
96
97

    logger.info(s"total: ${stats.totalReads},  unmapped: ${stats.unmapped}, secondary: ${stats.secondary}")
Peter van 't Hof's avatar
Peter van 't Hof committed
98

Peter van 't Hof's avatar
Peter van 't Hof committed
99
    stats.writeStatsToFiles(outputDir)
Peter van 't Hof's avatar
Peter van 't Hof committed
100
101
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
102
  /**
Peter van 't Hof's avatar
Peter van 't Hof committed
103
104
105
106
107
108
109
110
   * This will start the subjobs for each contig and collect [[Stats]] on contig level
   *
   * @param region Region to check, mostly yhis is the complete contig
   * @param bamFile Input bam file
   * @param binSize stats binsize
   * @param threadBinSize Thread binsize
   * @return Output stats
   */
Peter van 't Hof's avatar
Peter van 't Hof committed
111
  def processContig(region: BedRecord, bamFile: File, binSize: Int, threadBinSize: Int, outputDir: File): Stats = {
Peter van 't Hof's avatar
Peter van 't Hof committed
112
    val scattersFutures = region
Peter van 't Hof's avatar
Peter van 't Hof committed
113
      .scatter(binSize)
Peter van 't Hof's avatar
Peter van 't Hof committed
114
      .grouped((region.length.toDouble / threadBinSize).ceil.toInt)
Peter van 't Hof's avatar
Peter van 't Hof committed
115
      .map(scatters => Future { processThread(scatters, bamFile) })
Peter van 't Hof's avatar
Peter van 't Hof committed
116
117
118
119
120
    val stats = waitOnFutures(scattersFutures.toList, Some(region.chr))
    val contigDir = new File(outputDir, "contigs" +  File.separator + region.chr)
    contigDir.mkdirs()
    stats.writeStatsToFiles(contigDir)
    stats
Peter van 't Hof's avatar
Peter van 't Hof committed
121
  }
Peter van 't Hof's avatar
Peter van 't Hof committed
122

Peter van 't Hof's avatar
Peter van 't Hof committed
123
  /**
Peter van 't Hof's avatar
Peter van 't Hof committed
124
   * This method will wait when all futures are complete and collect a single [[Stats]] instance
Peter van 't Hof's avatar
Peter van 't Hof committed
125
   *
Peter van 't Hof's avatar
Peter van 't Hof committed
126
127
128
129
   * @param futures List of futures to monitor
   * @param msg Optional message for logging
   * @return Output stats
   */
Peter van 't Hof's avatar
Peter van 't Hof committed
130
  def waitOnFutures(futures: List[Future[Stats]], msg: Option[String] = None): Stats = {
Peter van 't Hof's avatar
Peter van 't Hof committed
131
    msg.foreach(m => logger.info(s"Start monitoring jobs for '$m', ${futures.size} jobs"))
Peter van 't Hof's avatar
Peter van 't Hof committed
132
133
134
135
136
137
138
    futures.foreach(_.onFailure { case t => throw new RuntimeException(t) })
    var stats = Stats()
    var running = futures
    while (running.nonEmpty) {
      val done = running.filter(_.value.isDefined)
      done.foreach(stats += _.value.get.get)
      running = running.filterNot(done.contains(_))
Peter van 't Hof's avatar
Peter van 't Hof committed
139
      if (running.nonEmpty && done.nonEmpty) msg.foreach(m => logger.info(s"Jobs for '$m', ${running.size}/${futures.size} jobs"))
Peter van 't Hof's avatar
Peter van 't Hof committed
140
141
142
143
144
      if (running.nonEmpty) try {
        Await.ready(running.head, 1 second)
      } catch {
        case e: TimeoutException =>
      }
Peter van 't Hof's avatar
Peter van 't Hof committed
145
    }
Peter van 't Hof's avatar
Peter van 't Hof committed
146
    msg.foreach(m => logger.info(s"All jobs for '$m' are done"))
Peter van 't Hof's avatar
Peter van 't Hof committed
147
148
149
    stats
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
150
  /**
Peter van 't Hof's avatar
Peter van 't Hof committed
151
152
   * This method will process 1 thread bin
   *
Peter van 't Hof's avatar
Peter van 't Hof committed
153
   * @param scatters bins to check, there should be no gaps withing the scatters
Peter van 't Hof's avatar
Peter van 't Hof committed
154
155
156
   * @param bamFile Input bamfile
   * @return Output stats
   */
Peter van 't Hof's avatar
Peter van 't Hof committed
157
  def processThread(scatters: List[BedRecord], bamFile: File): Stats = {
Peter van 't Hof's avatar
Peter van 't Hof committed
158
    val totalStats = Stats()
Peter van 't Hof's avatar
Peter van 't Hof committed
159
160
161
162
163
164
165
    val sortedScatters = scatters.sortBy(_.start)
    val samReader = SamReaderFactory.makeDefault().open(bamFile)
    val threadChr = sortedScatters.head.chr
    val threadStart = sortedScatters.head.start
    val threadEnd = sortedScatters.last.end
    val it = samReader.query(threadChr, threadStart, threadEnd, false).buffered
    for (samRecord <- it) {
166
167

      // Read based stats
Peter van 't Hof's avatar
Peter van 't Hof committed
168
169
      if (samRecord.getAlignmentStart > threadStart && samRecord.getAlignmentStart <= threadEnd) {
        totalStats.totalReads += 1
170
171
172
        if (samRecord.isSecondaryOrSupplementary) totalStats.secondary += 1
        if (samRecord.getReadUnmappedFlag) totalStats.unmapped += 1
        else { // Mapped read
Peter van 't Hof's avatar
Peter van 't Hof committed
173
          totalStats.mappingQualityHistogram.add(samRecord.getMappingQuality)
174
175
        }
        if (samRecord.getProperPairFlag && samRecord.getFirstOfPairFlag && !samRecord.getSecondOfPairFlag)
Peter van 't Hof's avatar
Peter van 't Hof committed
176
          totalStats.insertSizeHistogram.add(samRecord.getInferredInsertSize.abs)
177

Peter van 't Hof's avatar
Peter van 't Hof committed
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
        val leftClipping = samRecord.getAlignmentStart - samRecord.getUnclippedStart
        val rightClipping = samRecord.getUnclippedEnd - samRecord.getAlignmentEnd

        totalStats.clippingHistogram.add(leftClipping + rightClipping)
        totalStats.leftClippingHistogram.add(leftClipping)
        totalStats.rightClippingHistogram.add(rightClipping)

        if (samRecord.getReadNegativeStrandFlag) {
          totalStats._5_ClippingHistogram.add(leftClipping)
          totalStats._3_ClippingHistogram.add(rightClipping)
        } else {
          totalStats._5_ClippingHistogram.add(rightClipping)
          totalStats._3_ClippingHistogram.add(leftClipping)
        }

Peter van 't Hof's avatar
Peter van 't Hof committed
193
        //TODO: Bin Support
Peter van 't Hof's avatar
Peter van 't Hof committed
194
195
196
197
198
199
200
201
      }

      //TODO: bases counting
    }
    samReader.close()

    totalStats
  }
202

Peter van 't Hof's avatar
Peter van 't Hof committed
203
  /**
Peter van 't Hof's avatar
Peter van 't Hof committed
204
   * This method will only count the unmapped fragments
Peter van 't Hof's avatar
Peter van 't Hof committed
205
206
    *
    * @param bamFile Input bamfile
Peter van 't Hof's avatar
Peter van 't Hof committed
207
208
   * @return Output stats
   */
209
  def processUnmappedReads(bamFile: File): Stats = {
Peter van 't Hof's avatar
Peter van 't Hof committed
210
    val stats = Stats()
211
212
213
214
215
216
217
    val samReader = SamReaderFactory.makeDefault().open(bamFile)
    var size = samReader.queryUnmapped().size
    stats.totalReads += size
    stats.unmapped += size
    samReader.close()
    stats
  }
Peter van 't Hof's avatar
Peter van 't Hof committed
218
}