VcfStats.scala 17.3 KB
Newer Older
Peter van 't Hof's avatar
Peter van 't Hof committed
1
package nl.lumc.sasc.biopet.tools.vcfstats
2

3
import java.io.File
4
import java.net.URLClassLoader
5

6
import htsjdk.variant.variantcontext.{Genotype, VariantContext}
7
import htsjdk.variant.vcf.VCFFileReader
8
import nl.lumc.sasc.biopet.utils.intervals.{BedRecord, BedRecordList}
9
import nl.lumc.sasc.biopet.utils.{FastaUtils, ToolCommand, VcfUtils}
10
import org.apache.spark.{SparkConf, SparkContext}
Peter van 't Hof's avatar
Peter van 't Hof committed
11

12
13
import scala.collection.JavaConversions._
import scala.collection.mutable
14
15
16
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
pjvan_thof's avatar
pjvan_thof committed
17

18
/**
19
  * Created by pjvanthof on 14/07/2017.
20
  */
21
object VcfStats extends ToolCommand {
22

23
  type Args = VcfStatsArgs
24

25
  def main(args: Array[String]): Unit = {
26

27
    logger.info("Started")
28
29
    val argsParser = new VcfStatsOptParser(commandName)
    val cmdArgs = argsParser.parse(args, VcfStatsArgs()) getOrElse (throw new IllegalArgumentException)
30

Peter van 't Hof's avatar
Peter van 't Hof committed
31
    val reader = new VCFFileReader(cmdArgs.inputFile, true)
32
33
34
    val header = reader.getFileHeader
    val samples = header.getSampleNamesInOrder.toList

Peter van 't Hof's avatar
Peter van 't Hof committed
35
36
    reader.close()

Peter van 't Hof's avatar
Peter van 't Hof committed
37
    val adInfoTags = {
38
39
40
      (for (infoTag <- cmdArgs.infoTags if !defaultInfoFields.contains(infoTag)) yield {
        require(header.getInfoHeaderLine(infoTag) != null,
                "Info tag '" + infoTag + "' not found in header of vcf file")
Peter van 't Hof's avatar
Peter van 't Hof committed
41
        infoTag
42
43
44
      }) ::: (for (line <- header.getInfoHeaderLines if cmdArgs.allInfoTags
                   if !defaultInfoFields.contains(line.getID)
                   if !cmdArgs.infoTags.contains(line.getID)) yield {
Peter van 't Hof's avatar
Peter van 't Hof committed
45
46
47
        line.getID
      }).toList ::: defaultInfoFields
    }
48

49
50
51
52
    val adGenotypeTags = (for (genotypeTag <- cmdArgs.genotypeTags
                               if !defaultGenotypeFields.contains(genotypeTag)) yield {
      require(header.getFormatHeaderLine(genotypeTag) != null,
              "Format tag '" + genotypeTag + "' not found in header of vcf file")
53
      genotypeTag
54
55
56
57
    }) ::: (for (line <- header.getFormatHeaderLines if cmdArgs.allGenotypeTags
                 if !defaultGenotypeFields.contains(line.getID)
                 if !cmdArgs.genotypeTags.contains(line.getID)
                 if line.getID != "PL") yield {
58
      line.getID
Peter van 't Hof's avatar
Peter van 't Hof committed
59
    }).toList ::: defaultGenotypeFields
60

61
62
63
64
65
66
67
68
69
70
71
72
73
74
    logger.info("Init spark context")

    val jars = ClassLoader.getSystemClassLoader
      .asInstanceOf[URLClassLoader]
      .getURLs
      .map(_.getFile)
    val conf = new SparkConf()
      .setAppName(commandName)
      .setMaster(cmdArgs.sparkMaster.getOrElse(s"local[${cmdArgs.localThreads}]"))
      .setJars(jars)
    val sc = new SparkContext(conf)
    logger.info("Spark context is up")

    val regions = (cmdArgs.intervals match {
Peter van 't Hof's avatar
Peter van 't Hof committed
75
76
      case Some(i) =>
        BedRecordList.fromFile(i).validateContigs(cmdArgs.referenceFile)
77
      case _ => BedRecordList.fromReference(cmdArgs.referenceFile)
78
79
    }).combineOverlap
      .scatter(cmdArgs.binSize, maxContigsInSingleJob = Some(cmdArgs.maxContigsInSingleJob))
Peter van 't Hof's avatar
Peter van 't Hof committed
80

81
82
83
    val regionStats = sc
      .parallelize(regions, regions.size)
      .map(readBin(_, samples, cmdArgs, adInfoTags, adGenotypeTags))
84
85
86
87
88
89
90
91
      .cache()

    val totalStats = Future(regionStats.aggregate(Stats.emptyStats(samples))(_ += _._1, _ += _))
      .map(
        _.writeAllOutput(cmdArgs.outputDir,
                         samples,
                         adGenotypeTags,
                         adInfoTags,
pjvan_thof's avatar
pjvan_thof committed
92
93
                         sampleDistributions,
                         None))
94
95
96
97
98
99
100
101
102
103

    regionStats
      .flatMap(_._2)
      .aggregateByKey(Stats.emptyStats(samples))(_ += _, _ += _)
      .foreach {
        case (k, v) =>
          v.writeAllOutput(new File(cmdArgs.outputDir, "contigs" + File.separator + k),
                           samples,
                           adGenotypeTags,
                           adInfoTags,
pjvan_thof's avatar
pjvan_thof committed
104
105
                           sampleDistributions,
                           Some(k))
106
107
      }
    regionStats.unpersist()
Peter van 't Hof's avatar
Peter van 't Hof committed
108

109
    Await.result(totalStats, Duration.Inf)
110

111
    sc.stop
112
    logger.info("Done")
Peter van 't Hof's avatar
Peter van 't Hof committed
113
114
  }

115
116
117
118
  def readBin(bedRecords: List[BedRecord],
              samples: List[String],
              cmdArgs: Args,
              adInfoTags: List[String],
119
              adGenotypeTags: List[String]): (Stats, List[(String, Stats)]) = {
120
    val reader = new VCFFileReader(cmdArgs.inputFile, true)
121
122
123
124
125
    val totalStats = Stats.emptyStats(samples)
    val dict = FastaUtils.getDictFromFasta(cmdArgs.referenceFile)

    val nonCompleteContigs = for (bedRecord <- bedRecords) yield {
      val stats = Stats.emptyStats(samples)
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153

      logger.info("Starting on: " + bedRecord)

      val samInterval = bedRecord.toSamInterval

      val query =
        reader.query(samInterval.getContig, samInterval.getStart, samInterval.getEnd)
      if (!query.hasNext) {
        Stats.mergeNestedStatsMap(stats.generalStats, fillGeneral(adInfoTags))
        for (sample <- samples) yield {
          Stats.mergeNestedStatsMap(stats.samplesStats(sample).genotypeStats,
                                    fillGenotype(adGenotypeTags))
        }
      }

      for (record <- query if record.getStart <= samInterval.getEnd) {
        Stats.mergeNestedStatsMap(stats.generalStats, checkGeneral(record, adInfoTags))
        for (sample1 <- samples) yield {
          val genotype = record.getGenotype(sample1)
          Stats.mergeNestedStatsMap(stats.samplesStats(sample1).genotypeStats,
                                    checkGenotype(record, genotype, adGenotypeTags))
          for (sample2 <- samples) {
            val genotype2 = record.getGenotype(sample2)
            if (genotype.getAlleles == genotype2.getAlleles)
              stats.samplesStats(sample1).sampleToSample(sample2).genotypeOverlap += 1
            stats.samplesStats(sample1).sampleToSample(sample2).alleleOverlap += VcfUtils
              .alleleOverlap(genotype.getAlleles.toList, genotype2.getAlleles.toList)
          }
Peter van 't Hof's avatar
Peter van 't Hof committed
154
        }
Peter van 't Hof's avatar
Peter van 't Hof committed
155
      }
156
157
158
159
160
161
162
163
      totalStats += stats
      if (bedRecord.length == dict.getSequence(bedRecord.chr).getSequenceLength) {
        Future {
          stats.writeAllOutput(
            new File(cmdArgs.outputDir, "contigs" + File.separator + bedRecord.chr),
            samples,
            adGenotypeTags,
            adInfoTags,
pjvan_thof's avatar
pjvan_thof committed
164
165
            sampleDistributions,
            Some(bedRecord.chr))
166
167
168
          None
        }
      } else Future.successful(Some(bedRecord.chr -> stats))
Peter van 't Hof's avatar
Peter van 't Hof committed
169
    }
170
171
    reader.close()

172
    (totalStats, Await.result(Future.sequence(nonCompleteContigs), Duration.Inf).flatten)
Peter van 't Hof's avatar
Peter van 't Hof committed
173
174
  }

175
176
177
178
  val defaultGenotypeFields =
    List("DP", "GQ", "AD", "AD-ref", "AD-alt", "AD-used", "AD-not_used", "general")

  val defaultInfoFields = List("QUAL", "general", "AC", "AF", "AN", "DP")
Peter van 't Hof's avatar
Peter van 't Hof committed
179

180
181
182
183
184
185
186
187
188
189
190
191
192
193
  val sampleDistributions = List("Het",
                                 "HetNonRef",
                                 "Hom",
                                 "HomRef",
                                 "HomVar",
                                 "Mixed",
                                 "NoCall",
                                 "NonInformative",
                                 "Available",
                                 "Called",
                                 "Filtered",
                                 "Variant")

  /** Function to check sample/genotype specific stats */
194
195
196
  protected[tools] def checkGenotype(record: VariantContext,
                                     genotype: Genotype,
                                     additionalTags: List[String]): Map[String, Map[Any, Int]] = {
197
198
199
200
201
202
203
204
    val buffer = mutable.Map[String, Map[Any, Int]]()

    def addToBuffer(key: String, value: Any, found: Boolean): Unit = {
      val map = buffer.getOrElse(key, Map())
      if (found) buffer += key -> (map + (value -> (map.getOrElse(value, 0) + 1)))
      else buffer += key -> (map + (value -> map.getOrElse(value, 0)))
    }

205
206
    buffer += "DP" -> Map((if (genotype.hasDP) genotype.getDP else "not set") -> 1)
    buffer += "GQ" -> Map((if (genotype.hasGQ) genotype.getGQ else "not set") -> 1)
207

208
209
    val usedAlleles =
      (for (allele <- genotype.getAlleles) yield record.getAlleleIndex(allele)).toList
210

211
212
213
214
215
216
217
218
219
220
221
222
223
    addToBuffer("general", "Total", found = true)
    addToBuffer("general", "Het", genotype.isHet)
    addToBuffer("general", "HetNonRef", genotype.isHetNonRef)
    addToBuffer("general", "Hom", genotype.isHom)
    addToBuffer("general", "HomRef", genotype.isHomRef)
    addToBuffer("general", "HomVar", genotype.isHomVar)
    addToBuffer("general", "Mixed", genotype.isMixed)
    addToBuffer("general", "NoCall", genotype.isNoCall)
    addToBuffer("general", "NonInformative", genotype.isNonInformative)
    addToBuffer("general", "Available", genotype.isAvailable)
    addToBuffer("general", "Called", genotype.isCalled)
    addToBuffer("general", "Filtered", genotype.isFiltered)
    addToBuffer("general", "Variant", genotype.isHetNonRef || genotype.isHet || genotype.isHomVar)
224

225
226
227
228
229
230
231
232
233
234
235
236
    if (genotype.hasAD) {
      val ad = genotype.getAD
      for (i <- 0 until ad.size if ad(i) > 0) {
        addToBuffer("AD", ad(i), found = true)
        if (i == 0) addToBuffer("AD-ref", ad(i), found = true)
        if (i > 0) addToBuffer("AD-alt", ad(i), found = true)
        if (usedAlleles.contains(i)) addToBuffer("AD-used", ad(i), found = true)
        else addToBuffer("AD-not_used", ad(i), found = true)
      }
    }

    val skipTags = List("DP", "GQ", "AD", "AD-ref", "AD-alt", "AD-used", "AD-not_used", "general")
237
238

    for (tag <- additionalTags if !skipTags.contains(tag)) {
239
240
241
      val value = genotype.getAnyAttribute(tag)
      if (value == null) addToBuffer(tag, "notset", found = true)
      else addToBuffer(tag, value, found = true)
242
243
    }

244
    buffer.toMap
245
246
  }

247
  /** Function to check all general stats, all info expect sample/genotype specific stats */
248
249
  protected[tools] def checkGeneral(record: VariantContext,
                                    additionalTags: List[String]): Map[String, Map[Any, Int]] = {
Peter van 't Hof's avatar
Peter van 't Hof committed
250
251
    val buffer = mutable.Map[String, Map[Any, Int]]()

252
    def addToBuffer(key: String, value: Any, found: Boolean): Unit = {
Peter van 't Hof's avatar
Peter van 't Hof committed
253
      val map = buffer.getOrElse(key, Map())
254
      if (found) buffer += key -> (map + (value -> (map.getOrElse(value, 0) + 1)))
Peter van 't Hof's avatar
Peter van 't Hof committed
255
      else buffer += key -> (map + (value -> map.getOrElse(value, 0)))
Peter van 't Hof's avatar
Peter van 't Hof committed
256
257
    }

Peter van 't Hof's avatar
Peter van 't Hof committed
258
    addToBuffer("QUAL", Math.round(record.getPhredScaledQual), found = true)
Peter van 't Hof's avatar
Peter van 't Hof committed
259

260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
    addToBuffer("SampleDistribution-Het",
                record.getGenotypes.count(genotype => genotype.isHet),
                found = true)
    addToBuffer("SampleDistribution-HetNonRef",
                record.getGenotypes.count(genotype => genotype.isHetNonRef),
                found = true)
    addToBuffer("SampleDistribution-Hom",
                record.getGenotypes.count(genotype => genotype.isHom),
                found = true)
    addToBuffer("SampleDistribution-HomRef",
                record.getGenotypes.count(genotype => genotype.isHomRef),
                found = true)
    addToBuffer("SampleDistribution-HomVar",
                record.getGenotypes.count(genotype => genotype.isHomVar),
                found = true)
    addToBuffer("SampleDistribution-Mixed",
                record.getGenotypes.count(genotype => genotype.isMixed),
                found = true)
    addToBuffer("SampleDistribution-NoCall",
                record.getGenotypes.count(genotype => genotype.isNoCall),
                found = true)
    addToBuffer("SampleDistribution-NonInformative",
                record.getGenotypes.count(genotype => genotype.isNonInformative),
                found = true)
    addToBuffer("SampleDistribution-Available",
                record.getGenotypes.count(genotype => genotype.isAvailable),
                found = true)
    addToBuffer("SampleDistribution-Called",
                record.getGenotypes.count(genotype => genotype.isCalled),
                found = true)
    addToBuffer("SampleDistribution-Filtered",
                record.getGenotypes.count(genotype => genotype.isFiltered),
                found = true)
    addToBuffer("SampleDistribution-Variant",
                record.getGenotypes.count(genotype =>
                  genotype.isHetNonRef || genotype.isHet || genotype.isHomVar),
                found = true)
Peter van 't Hof's avatar
Peter van 't Hof committed
297

Peter van 't Hof's avatar
Peter van 't Hof committed
298
    addToBuffer("general", "Total", found = true)
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
    addToBuffer("general", "Biallelic", record.isBiallelic)
    addToBuffer("general", "ComplexIndel", record.isComplexIndel)
    addToBuffer("general", "Filtered", record.isFiltered)
    addToBuffer("general", "FullyDecoded", record.isFullyDecoded)
    addToBuffer("general", "Indel", record.isIndel)
    addToBuffer("general", "Mixed", record.isMixed)
    addToBuffer("general", "MNP", record.isMNP)
    addToBuffer("general", "MonomorphicInSamples", record.isMonomorphicInSamples)
    addToBuffer("general", "NotFiltered", record.isNotFiltered)
    addToBuffer("general", "PointEvent", record.isPointEvent)
    addToBuffer("general", "PolymorphicInSamples", record.isPolymorphicInSamples)
    addToBuffer("general", "SimpleDeletion", record.isSimpleDeletion)
    addToBuffer("general", "SimpleInsertion", record.isSimpleInsertion)
    addToBuffer("general", "SNP", record.isSNP)
    addToBuffer("general", "StructuralIndel", record.isStructuralIndel)
    addToBuffer("general", "Symbolic", record.isSymbolic)
    addToBuffer("general", "SymbolicOrSV", record.isSymbolicOrSV)
    addToBuffer("general", "Variant", record.isVariant)

Peter van 't Hof's avatar
Peter van 't Hof committed
318
319
320
    val skipTags = List("QUAL", "general")

    for (tag <- additionalTags if !skipTags.contains(tag)) {
321
      val value = record.getAttribute(tag)
Peter van 't Hof's avatar
Peter van 't Hof committed
322
323
      if (value == null) addToBuffer(tag, "notset", found = true)
      else addToBuffer(tag, value, found = true)
324
325
    }

326
    buffer.toMap
Peter van 't Hof's avatar
Peter van 't Hof committed
327
328
  }

329
  protected[tools] def fillGeneral(additionalTags: List[String]): Map[String, Map[Any, Int]] = {
330
331
332
333
334
335
336
337
    val buffer = mutable.Map[String, Map[Any, Int]]()

    def addToBuffer(key: String, value: Any, found: Boolean): Unit = {
      val map = buffer.getOrElse(key, Map())
      if (found) buffer += key -> (map + (value -> (map.getOrElse(value, 0) + 1)))
      else buffer += key -> (map + (value -> map.getOrElse(value, 0)))
    }

338
339
340
341
342
343
344
345
346
347
348
349
350
351
    addToBuffer("QUAL", "not set", found = false)

    addToBuffer("SampleDistribution-Het", "not set", found = false)
    addToBuffer("SampleDistribution-HetNonRef", "not set", found = false)
    addToBuffer("SampleDistribution-Hom", "not set", found = false)
    addToBuffer("SampleDistribution-HomRef", "not set", found = false)
    addToBuffer("SampleDistribution-HomVar", "not set", found = false)
    addToBuffer("SampleDistribution-Mixed", "not set", found = false)
    addToBuffer("SampleDistribution-NoCall", "not set", found = false)
    addToBuffer("SampleDistribution-NonInformative", "not set", found = false)
    addToBuffer("SampleDistribution-Available", "not set", found = false)
    addToBuffer("SampleDistribution-Called", "not set", found = false)
    addToBuffer("SampleDistribution-Filtered", "not set", found = false)
    addToBuffer("SampleDistribution-Variant", "not set", found = false)
Peter van 't Hof's avatar
Peter van 't Hof committed
352
353

    addToBuffer("general", "Total", found = false)
354
355
    addToBuffer("general", "Biallelic", found = false)
    addToBuffer("general", "ComplexIndel", found = false)
Peter van 't Hof's avatar
Peter van 't Hof committed
356
    addToBuffer("general", "Filtered", found = false)
357
358
359
360
361
362
363
364
365
366
367
368
369
370
    addToBuffer("general", "FullyDecoded", found = false)
    addToBuffer("general", "Indel", found = false)
    addToBuffer("general", "Mixed", found = false)
    addToBuffer("general", "MNP", found = false)
    addToBuffer("general", "MonomorphicInSamples", found = false)
    addToBuffer("general", "NotFiltered", found = false)
    addToBuffer("general", "PointEvent", found = false)
    addToBuffer("general", "PolymorphicInSamples", found = false)
    addToBuffer("general", "SimpleDeletion", found = false)
    addToBuffer("general", "SimpleInsertion", found = false)
    addToBuffer("general", "SNP", found = false)
    addToBuffer("general", "StructuralIndel", found = false)
    addToBuffer("general", "Symbolic", found = false)
    addToBuffer("general", "SymbolicOrSV", found = false)
Peter van 't Hof's avatar
Peter van 't Hof committed
371
    addToBuffer("general", "Variant", found = false)
372

373
    val skipTags = List("QUAL", "general")
374
375

    for (tag <- additionalTags if !skipTags.contains(tag)) {
376
      addToBuffer(tag, "not set", found = false)
377
378
    }

379
    buffer.toMap
380
381
  }

382
  protected[tools] def fillGenotype(additionalTags: List[String]): Map[String, Map[Any, Int]] = {
Peter van 't Hof's avatar
Peter van 't Hof committed
383
    val buffer = mutable.Map[String, Map[Any, Int]]()
Peter van 't Hof's avatar
Peter van 't Hof committed
384

385
    def addToBuffer(key: String, value: Any, found: Boolean): Unit = {
Peter van 't Hof's avatar
Peter van 't Hof committed
386
      val map = buffer.getOrElse(key, Map())
387
      if (found) buffer += key -> (map + (value -> (map.getOrElse(value, 0) + 1)))
Peter van 't Hof's avatar
Peter van 't Hof committed
388
      else buffer += key -> (map + (value -> map.getOrElse(value, 0)))
Peter van 't Hof's avatar
Peter van 't Hof committed
389
390
    }

391
392
    addToBuffer("DP", "not set", found = false)
    addToBuffer("GQ", "not set", found = false)
Peter van 't Hof's avatar
Peter van 't Hof committed
393

394
395
396
397
398
399
400
401
402
403
404
405
406
    addToBuffer("general", "Total", found = false)
    addToBuffer("general", "Het", found = false)
    addToBuffer("general", "HetNonRef", found = false)
    addToBuffer("general", "Hom", found = false)
    addToBuffer("general", "HomRef", found = false)
    addToBuffer("general", "HomVar", found = false)
    addToBuffer("general", "Mixed", found = false)
    addToBuffer("general", "NoCall", found = false)
    addToBuffer("general", "NonInformative", found = false)
    addToBuffer("general", "Available", found = false)
    addToBuffer("general", "Called", found = false)
    addToBuffer("general", "Filtered", found = false)
    addToBuffer("general", "Variant", found = false)
Peter van 't Hof's avatar
Peter van 't Hof committed
407

Peter van 't Hof's avatar
Peter van 't Hof committed
408
409
410
    val skipTags = List("DP", "GQ", "AD", "AD-ref", "AD-alt", "AD-used", "AD-not_used", "general")

    for (tag <- additionalTags if !skipTags.contains(tag)) {
411
      addToBuffer(tag, 0, found = false)
412
    }
Peter van 't Hof's avatar
Peter van 't Hof committed
413

414
    buffer.toMap
415
416
  }
}