BamStats.scala 12.1 KB
Newer Older
Peter van 't Hof's avatar
Peter van 't Hof committed
1
/**
2
3
4
5
6
7
8
9
10
11
12
13
14
  * Biopet is built on top of GATK Queue for building bioinformatic
  * pipelines. It is mainly intended to support LUMC SHARK cluster which is running
  * SGE. But other types of HPC that are supported by GATK Queue (such as PBS)
  * should also be able to execute Biopet tools and pipelines.
  *
  * Copyright 2014 Sequencing Analysis Support Core - Leiden University Medical Center
  *
  * Contact us at: sasc@lumc.nl
  *
  * A dual licensing mode is applied. The source code within this project is freely available for non-commercial use under an AGPL
  * license; For commercial users or users who do not want to follow the AGPL
  * license, please contact us to obtain a separate license.
  */
Peter van 't Hof's avatar
Peter van 't Hof committed
15
16
package nl.lumc.sasc.biopet.tools.bamstats

17
import java.io.{File, PrintWriter}
Peter van 't Hof's avatar
Peter van 't Hof committed
18

19
import htsjdk.samtools.{SAMSequenceDictionary, SamReaderFactory}
Peter van 't Hof's avatar
Peter van 't Hof committed
20
import nl.lumc.sasc.biopet.utils.BamUtils.SamDictCheck
21
22
import nl.lumc.sasc.biopet.utils.{ConfigUtils, FastaUtils, ToolCommand}
import nl.lumc.sasc.biopet.utils.intervals.{BedRecord, BedRecordList}
Peter van 't Hof's avatar
Peter van 't Hof committed
23
24

import scala.collection.JavaConversions._
Peter van 't Hof's avatar
Peter van 't Hof committed
25
import scala.collection.mutable.ArrayBuffer
Peter van 't Hof's avatar
Peter van 't Hof committed
26
import scala.concurrent.ExecutionContext.Implicits.global
Peter van 't Hof's avatar
Peter van 't Hof committed
27
import scala.concurrent.duration._
28
import scala.concurrent.{Await, Future}
Peter van 't Hof's avatar
Peter van 't Hof committed
29
import scala.io.Source
Peter van 't Hof's avatar
Peter van 't Hof committed
30
import scala.language.postfixOps
Peter van 't Hof's avatar
Peter van 't Hof committed
31
32

/**
33
34
35
36
  * This tool will collect stats from a bamfile
  *
  * Created by pjvanthof on 25/05/16.
  */
Peter van 't Hof's avatar
Peter van 't Hof committed
37
38
39
40
41
object BamStats extends ToolCommand {
  case class Args(outputDir: File = null,
                  bamFile: File = null,
                  referenceFasta: Option[File] = None,
                  binSize: Int = 10000,
Peter van 't Hof's avatar
Peter van 't Hof committed
42
                  threadBinSize: Int = 10000000,
43
44
                  tsvOutputs: Boolean = false)
      extends AbstractArgs
Peter van 't Hof's avatar
Peter van 't Hof committed
45
46

  class OptParser extends AbstractOptParser {
Peter van 't Hof's avatar
Peter van 't Hof committed
47
    opt[File]('R', "reference") valueName "<file>" action { (x, c) =>
Peter van 't Hof's avatar
Peter van 't Hof committed
48
      c.copy(referenceFasta = Some(x))
Peter van 't Hof's avatar
Peter van 't Hof committed
49
    } text "Fasta file of reference"
Peter van 't Hof's avatar
Peter van 't Hof committed
50
51
    opt[File]('o', "outputDir") required () valueName "<directory>" action { (x, c) =>
      c.copy(outputDir = x)
Peter van 't Hof's avatar
Peter van 't Hof committed
52
    } text "Output directory"
Peter van 't Hof's avatar
Peter van 't Hof committed
53
54
    opt[File]('b', "bam") required () valueName "<file>" action { (x, c) =>
      c.copy(bamFile = x)
Peter van 't Hof's avatar
Peter van 't Hof committed
55
    } text "Input bam file"
Peter van 't Hof's avatar
Peter van 't Hof committed
56
57
    opt[Int]("binSize") valueName "<int>" action { (x, c) =>
      c.copy(binSize = x)
Peter van 't Hof's avatar
Peter van 't Hof committed
58
    } text "Bin size of stats (beta)"
Peter van 't Hof's avatar
Peter van 't Hof committed
59
60
    opt[Int]("threadBinSize") valueName "<int>" action { (x, c) =>
      c.copy(threadBinSize = x)
Peter van 't Hof's avatar
Peter van 't Hof committed
61
    } text "Size of region per thread"
62
    opt[Unit]("tsvOutputs") action { (_, c) =>
Peter van 't Hof's avatar
Peter van 't Hof committed
63
64
      c.copy(tsvOutputs = true)
    } text "Also output tsv files, default there is only a json"
Peter van 't Hof's avatar
Peter van 't Hof committed
65
66
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
67
  /** This is the main entry to [[BamStats]], this will do the argument parsing. */
Peter van 't Hof's avatar
Peter van 't Hof committed
68
69
  def main(args: Array[String]): Unit = {
    val argsParser = new OptParser
70
71
    val cmdArgs
      : Args = argsParser.parse(args, Args()) getOrElse (throw new IllegalArgumentException)
Peter van 't Hof's avatar
Peter van 't Hof committed
72

73
74
    logger.info("Start")

Peter van 't Hof's avatar
Peter van 't Hof committed
75
    val sequenceDict = validateReferenceInBam(cmdArgs.bamFile, cmdArgs.referenceFasta)
Peter van 't Hof's avatar
Peter van 't Hof committed
76

77
78
79
80
81
82
    init(cmdArgs.outputDir,
         cmdArgs.bamFile,
         sequenceDict,
         cmdArgs.binSize,
         cmdArgs.threadBinSize,
         cmdArgs.tsvOutputs)
83
84

    logger.info("Done")
Peter van 't Hof's avatar
Peter van 't Hof committed
85
86
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
87
  /**
88
89
90
    * This will retrieve the [[SAMSequenceDictionary]] from the bam file.
    * When `referenceFasta is given he will validate this against the bam file.`
    */
91
  def validateReferenceInBam(bamFile: File, referenceFasta: Option[File]): SAMSequenceDictionary = {
Peter van 't Hof's avatar
Peter van 't Hof committed
92
93
94
    val samReader = SamReaderFactory.makeDefault().open(bamFile)
    val samHeader = samReader.getFileHeader
    samReader.close()
95
96
    referenceFasta
      .map { f =>
Peter van 't Hof's avatar
Peter van 't Hof committed
97
        samHeader.getSequenceDictionary.assertSameDictionary(FastaUtils.getCachedDict(f), false)
98
99
100
        FastaUtils.getCachedDict(f)
      }
      .getOrElse(samHeader.getSequenceDictionary)
Peter van 't Hof's avatar
Peter van 't Hof committed
101
102
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
103
  /**
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
    * This is the main running function of [[BamStats]]. This will start the threads and collect and write the results.
    *
    * @param outputDir All output files will be placed here
    * @param bamFile Input bam file
    * @param referenceDict Dict for scattering
    * @param binSize stats binsize
    * @param threadBinSize Thread binsize
    */
  def init(outputDir: File,
           bamFile: File,
           referenceDict: SAMSequenceDictionary,
           binSize: Int,
           threadBinSize: Int,
           tsvOutput: Boolean): Unit = {
    val contigsFutures = BedRecordList
      .fromDict(referenceDict)
      .allRecords
      .map { contig =>
        contig.chr -> processContig(contig, bamFile, binSize, threadBinSize, outputDir)
      }
      .toList
Peter van 't Hof's avatar
Peter van 't Hof committed
125

Peter van 't Hof's avatar
Peter van 't Hof committed
126
    val stats = waitOnFutures(processUnmappedReads(bamFile) :: contigsFutures.map(_._2))
127

Peter van 't Hof's avatar
Peter van 't Hof committed
128
129
    if (tsvOutput) {
      stats.flagstat.writeAsTsv(new File(outputDir, "flagstats.tsv"))
Peter van 't Hof's avatar
Peter van 't Hof committed
130

131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
      stats.insertSizeHistogram.writeFilesAndPlot(outputDir,
                                                  "insertsize",
                                                  "Insertsize",
                                                  "Reads",
                                                  "Insertsize distribution")
      stats.mappingQualityHistogram.writeFilesAndPlot(outputDir,
                                                      "mappingQuality",
                                                      "Mapping Quality",
                                                      "Reads",
                                                      "Mapping Quality distribution")
      stats.clippingHistogram.writeFilesAndPlot(outputDir,
                                                "clipping",
                                                "CLipped bases",
                                                "Reads",
                                                "Clipping distribution")
Peter van 't Hof's avatar
Peter van 't Hof committed
146

147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
      stats.leftClippingHistogram.writeFilesAndPlot(outputDir,
                                                    "left_clipping",
                                                    "CLipped bases",
                                                    "Reads",
                                                    "Left Clipping distribution")
      stats.rightClippingHistogram.writeFilesAndPlot(outputDir,
                                                     "right_clipping",
                                                     "CLipped bases",
                                                     "Reads",
                                                     "Right Clipping distribution")
      stats._3_ClippingHistogram.writeFilesAndPlot(outputDir,
                                                   "3prime_clipping",
                                                   "CLipped bases",
                                                   "Reads",
                                                   "3 Prime Clipping distribution")
      stats._5_ClippingHistogram.writeFilesAndPlot(outputDir,
                                                   "5prime_clipping",
                                                   "CLipped bases",
                                                   "Reads",
                                                   "5 Prime Clipping distribution")
Peter van 't Hof's avatar
Peter van 't Hof committed
167
    }
Peter van 't Hof's avatar
Peter van 't Hof committed
168

Peter van 't Hof's avatar
Peter van 't Hof committed
169
170
    val statsWriter = new PrintWriter(new File(outputDir, "bamstats.json"))
    val totalStats = stats.toSummaryMap
Peter van 't Hof's avatar
Peter van 't Hof committed
171
    val statsMap = Map(
Peter van 't Hof's avatar
Peter van 't Hof committed
172
      "total" -> totalStats,
173
174
175
      "contigs" -> contigsFutures
        .map(x => x._1 -> Await.result(x._2, Duration.Zero).toSummaryMap)
        .toMap
Peter van 't Hof's avatar
Peter van 't Hof committed
176
    )
Peter van 't Hof's avatar
Peter van 't Hof committed
177
    statsWriter.println(ConfigUtils.mapToJson(statsMap).nospaces)
Peter van 't Hof's avatar
Peter van 't Hof committed
178
    statsWriter.close()
Peter van 't Hof's avatar
Peter van 't Hof committed
179
180

    val summaryWriter = new PrintWriter(new File(outputDir, "bamstats.summary.json"))
Peter van 't Hof's avatar
Peter van 't Hof committed
181
    summaryWriter.println(ConfigUtils.mapToJson(totalStats).nospaces)
Peter van 't Hof's avatar
Peter van 't Hof committed
182
    summaryWriter.close()
Peter van 't Hof's avatar
Peter van 't Hof committed
183
184
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
185
  /**
186
187
188
189
190
191
192
193
194
195
196
197
198
    * This will start the subjobs for each contig and collect [[Stats]] on contig level
    *
    * @param region Region to check, mostly yhis is the complete contig
    * @param bamFile Input bam file
    * @param binSize stats binsize
    * @param threadBinSize Thread binsize
    * @return Output stats
    */
  def processContig(region: BedRecord,
                    bamFile: File,
                    binSize: Int,
                    threadBinSize: Int,
                    outputDir: File): Future[Stats] = Future {
Peter van 't Hof's avatar
Peter van 't Hof committed
199
    val scattersFutures = region
Peter van 't Hof's avatar
Peter van 't Hof committed
200
      .scatter(binSize)
201
202
      .grouped(
        (region.length.toDouble / binSize).ceil.toInt / (region.length.toDouble / threadBinSize).ceil.toInt)
Peter van 't Hof's avatar
Peter van 't Hof committed
203
      .map(scatters => processThread(scatters, bamFile))
Peter van 't Hof's avatar
Peter van 't Hof committed
204
      .toList
Peter van 't Hof's avatar
Peter van 't Hof committed
205
    waitOnFutures(scattersFutures, Some(region.chr))
Peter van 't Hof's avatar
Peter van 't Hof committed
206
  }
Peter van 't Hof's avatar
Peter van 't Hof committed
207

Peter van 't Hof's avatar
Peter van 't Hof committed
208
  /**
209
210
211
212
213
214
    * This method will wait when all futures are complete and collect a single [[Stats]] instance
    *
    * @param futures List of futures to monitor
    * @param msg Optional message for logging
    * @return Output stats
    */
Peter van 't Hof's avatar
Peter van 't Hof committed
215
  def waitOnFutures(futures: List[Future[Stats]], msg: Option[String] = None): Stats = {
Peter van 't Hof's avatar
Peter van 't Hof committed
216
    msg.foreach(m => logger.info(s"Start monitoring jobs for '$m', ${futures.size} jobs"))
Peter van 't Hof's avatar
Peter van 't Hof committed
217
    futures.foreach(_.onFailure { case t => throw new RuntimeException(t) })
Peter van 't Hof's avatar
Peter van 't Hof committed
218
    val stats = Await.result(Future.sequence(futures).map(_.fold(Stats())(_ += _)), Duration.Inf)
Peter van 't Hof's avatar
Peter van 't Hof committed
219
    msg.foreach(m => logger.info(s"All jobs for '$m' are done"))
Peter van 't Hof's avatar
Peter van 't Hof committed
220
221
222
    stats
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
223
  /**
224
225
226
227
228
229
    * This method will process 1 thread bin
    *
    * @param scatters bins to check, there should be no gaps withing the scatters
    * @param bamFile Input bamfile
    * @return Output stats
    */
Peter van 't Hof's avatar
Peter van 't Hof committed
230
  def processThread(scatters: List[BedRecord], bamFile: File): Future[Stats] = Future {
Peter van 't Hof's avatar
Peter van 't Hof committed
231
    val totalStats = Stats()
Peter van 't Hof's avatar
Peter van 't Hof committed
232
233
234
235
236
237
238
    val sortedScatters = scatters.sortBy(_.start)
    val samReader = SamReaderFactory.makeDefault().open(bamFile)
    val threadChr = sortedScatters.head.chr
    val threadStart = sortedScatters.head.start
    val threadEnd = sortedScatters.last.end
    val it = samReader.query(threadChr, threadStart, threadEnd, false).buffered
    for (samRecord <- it) {
239
240

      // Read based stats
Peter van 't Hof's avatar
Peter van 't Hof committed
241
      if (samRecord.getAlignmentStart > threadStart && samRecord.getAlignmentStart <= threadEnd) {
Peter van 't Hof's avatar
Peter van 't Hof committed
242
        totalStats.flagstat.loadRecord(samRecord)
Peter van 't Hof's avatar
Peter van 't Hof committed
243
        if (!samRecord.getReadUnmappedFlag) { // Mapped read
Peter van 't Hof's avatar
Peter van 't Hof committed
244
          totalStats.mappingQualityHistogram.add(samRecord.getMappingQuality)
245
        }
246
        if (samRecord.getReadPairedFlag && samRecord.getProperPairFlag && samRecord.getFirstOfPairFlag && !samRecord.getSecondOfPairFlag)
Peter van 't Hof's avatar
Peter van 't Hof committed
247
          totalStats.insertSizeHistogram.add(samRecord.getInferredInsertSize.abs)
248

Peter van 't Hof's avatar
Peter van 't Hof committed
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
        val leftClipping = samRecord.getAlignmentStart - samRecord.getUnclippedStart
        val rightClipping = samRecord.getUnclippedEnd - samRecord.getAlignmentEnd

        totalStats.clippingHistogram.add(leftClipping + rightClipping)
        totalStats.leftClippingHistogram.add(leftClipping)
        totalStats.rightClippingHistogram.add(rightClipping)

        if (samRecord.getReadNegativeStrandFlag) {
          totalStats._5_ClippingHistogram.add(leftClipping)
          totalStats._3_ClippingHistogram.add(rightClipping)
        } else {
          totalStats._5_ClippingHistogram.add(rightClipping)
          totalStats._3_ClippingHistogram.add(leftClipping)
        }

Peter van 't Hof's avatar
Peter van 't Hof committed
264
        //TODO: Bin Support
Peter van 't Hof's avatar
Peter van 't Hof committed
265
266
267
268
269
270
271
272
      }

      //TODO: bases counting
    }
    samReader.close()

    totalStats
  }
273

Peter van 't Hof's avatar
Peter van 't Hof committed
274
  /**
275
276
277
278
279
    * This method will only count the unmapped fragments
    *
    * @param bamFile Input bamfile
    * @return Output stats
    */
Peter van 't Hof's avatar
Peter van 't Hof committed
280
  def processUnmappedReads(bamFile: File): Future[Stats] = Future {
Peter van 't Hof's avatar
Peter van 't Hof committed
281
    val stats = Stats()
282
    val samReader = SamReaderFactory.makeDefault().open(bamFile)
Peter van 't Hof's avatar
Peter van 't Hof committed
283
284
285
    for (samRecord <- samReader.queryUnmapped()) {
      stats.flagstat.loadRecord(samRecord)
    }
286
287
288
    samReader.close()
    stats
  }
Peter van 't Hof's avatar
Peter van 't Hof committed
289

290
  def tsvToMap(tsvFile: File): Map[String, Array[Long]] = {
Peter van 't Hof's avatar
Peter van 't Hof committed
291
292
293
    val reader = Source.fromFile(tsvFile)
    val it = reader.getLines()
    val header = it.next().split("\t")
294
    val arrays = header.zipWithIndex.map(x => x._2 -> (x._1 -> ArrayBuffer[Long]()))
Peter van 't Hof's avatar
Peter van 't Hof committed
295
296
    for (line <- it) {
      val values = line.split("\t")
297
298
      require(values.size == header.size,
              s"Line does not have the number of field as header: $line")
Peter van 't Hof's avatar
Peter van 't Hof committed
299
      for (array <- arrays) {
300
        array._2._2.append(values(array._1).toLong)
Peter van 't Hof's avatar
Peter van 't Hof committed
301
302
303
304
305
306
      }
    }
    reader.close()
    arrays.map(x => x._2._1 -> x._2._2.toArray).toMap
  }

Peter van 't Hof's avatar
Peter van 't Hof committed
307
}