VcfStats.scala 17.3 KB
Newer Older
Peter van 't Hof's avatar
Peter van 't Hof committed
1
package nl.lumc.sasc.biopet.tools.vcfstats
2

3
import java.io.File
4
import java.net.URLClassLoader
5

6
import htsjdk.variant.variantcontext.{Genotype, VariantContext}
7
import htsjdk.variant.vcf.VCFFileReader
8
import nl.lumc.sasc.biopet.utils.intervals.{BedRecord, BedRecordList}
9
import nl.lumc.sasc.biopet.utils.{FastaUtils, ToolCommand, VcfUtils}
10
import org.apache.spark.{SparkConf, SparkContext}
Peter van 't Hof's avatar
Peter van 't Hof committed
11

12 13
import scala.collection.JavaConversions._
import scala.collection.mutable
14 15 16
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
Peter van 't Hof's avatar
Peter van 't Hof committed
17

18
/**
19
  * Created by pjvanthof on 14/07/2017.
20
  */
21
object VcfStats extends ToolCommand {
22

23
  type Args = VcfStatsArgs
24

25
  def main(args: Array[String]): Unit = {
26

27
    logger.info("Started")
28 29
    val argsParser = new VcfStatsOptParser(commandName)
    val cmdArgs = argsParser.parse(args, VcfStatsArgs()) getOrElse (throw new IllegalArgumentException)
30

Peter van 't Hof's avatar
Peter van 't Hof committed
31
    val reader = new VCFFileReader(cmdArgs.inputFile, true)
32 33 34
    val header = reader.getFileHeader
    val samples = header.getSampleNamesInOrder.toList

Peter van 't Hof's avatar
Peter van 't Hof committed
35 36
    reader.close()

Peter van 't Hof's avatar
Peter van 't Hof committed
37
    val adInfoTags = {
38 39 40
      (for (infoTag <- cmdArgs.infoTags if !defaultInfoFields.contains(infoTag)) yield {
        require(header.getInfoHeaderLine(infoTag) != null,
                "Info tag '" + infoTag + "' not found in header of vcf file")
Peter van 't Hof's avatar
Peter van 't Hof committed
41
        infoTag
42 43 44
      }) ::: (for (line <- header.getInfoHeaderLines if cmdArgs.allInfoTags
                   if !defaultInfoFields.contains(line.getID)
                   if !cmdArgs.infoTags.contains(line.getID)) yield {
Peter van 't Hof's avatar
Peter van 't Hof committed
45 46 47
        line.getID
      }).toList ::: defaultInfoFields
    }
48

49 50 51 52
    val adGenotypeTags = (for (genotypeTag <- cmdArgs.genotypeTags
                               if !defaultGenotypeFields.contains(genotypeTag)) yield {
      require(header.getFormatHeaderLine(genotypeTag) != null,
              "Format tag '" + genotypeTag + "' not found in header of vcf file")
53
      genotypeTag
54 55 56 57
    }) ::: (for (line <- header.getFormatHeaderLines if cmdArgs.allGenotypeTags
                 if !defaultGenotypeFields.contains(line.getID)
                 if !cmdArgs.genotypeTags.contains(line.getID)
                 if line.getID != "PL") yield {
58
      line.getID
Peter van 't Hof's avatar
Peter van 't Hof committed
59
    }).toList ::: defaultGenotypeFields
60

61 62 63 64 65 66 67 68 69 70 71 72 73 74
    logger.info("Init spark context")

    val jars = ClassLoader.getSystemClassLoader
      .asInstanceOf[URLClassLoader]
      .getURLs
      .map(_.getFile)
    val conf = new SparkConf()
      .setAppName(commandName)
      .setMaster(cmdArgs.sparkMaster.getOrElse(s"local[${cmdArgs.localThreads}]"))
      .setJars(jars)
    val sc = new SparkContext(conf)
    logger.info("Spark context is up")

    val regions = (cmdArgs.intervals match {
Peter van 't Hof's avatar
Peter van 't Hof committed
75 76
      case Some(i) =>
        BedRecordList.fromFile(i).validateContigs(cmdArgs.referenceFile)
77
      case _ => BedRecordList.fromReference(cmdArgs.referenceFile)
78 79
    }).combineOverlap
      .scatter(cmdArgs.binSize, maxContigsInSingleJob = Some(cmdArgs.maxContigsInSingleJob))
Peter van 't Hof's avatar
Peter van 't Hof committed
80

81 82 83
    val regionStats = sc
      .parallelize(regions, regions.size)
      .map(readBin(_, samples, cmdArgs, adInfoTags, adGenotypeTags))
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
      .cache()

    val totalStats = Future(regionStats.aggregate(Stats.emptyStats(samples))(_ += _._1, _ += _))
      .map(
        _.writeAllOutput(cmdArgs.outputDir,
                         samples,
                         adGenotypeTags,
                         adInfoTags,
                         sampleDistributions))

    regionStats
      .flatMap(_._2)
      .aggregateByKey(Stats.emptyStats(samples))(_ += _, _ += _)
      .foreach {
        case (k, v) =>
          v.writeAllOutput(new File(cmdArgs.outputDir, "contigs" + File.separator + k),
                           samples,
                           adGenotypeTags,
                           adInfoTags,
                           sampleDistributions)
      }
    regionStats.unpersist()
Peter van 't Hof's avatar
Peter van 't Hof committed
106

107
    Await.result(totalStats, Duration.Inf)
108

109
    sc.stop
110
    logger.info("Done")
Peter van 't Hof's avatar
Peter van 't Hof committed
111 112
  }

113 114 115 116
  def readBin(bedRecords: List[BedRecord],
              samples: List[String],
              cmdArgs: Args,
              adInfoTags: List[String],
117
              adGenotypeTags: List[String]): (Stats, List[(String, Stats)]) = {
118
    val reader = new VCFFileReader(cmdArgs.inputFile, true)
119 120 121 122 123
    val totalStats = Stats.emptyStats(samples)
    val dict = FastaUtils.getDictFromFasta(cmdArgs.referenceFile)

    val nonCompleteContigs = for (bedRecord <- bedRecords) yield {
      val stats = Stats.emptyStats(samples)
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151

      logger.info("Starting on: " + bedRecord)

      val samInterval = bedRecord.toSamInterval

      val query =
        reader.query(samInterval.getContig, samInterval.getStart, samInterval.getEnd)
      if (!query.hasNext) {
        Stats.mergeNestedStatsMap(stats.generalStats, fillGeneral(adInfoTags))
        for (sample <- samples) yield {
          Stats.mergeNestedStatsMap(stats.samplesStats(sample).genotypeStats,
                                    fillGenotype(adGenotypeTags))
        }
      }

      for (record <- query if record.getStart <= samInterval.getEnd) {
        Stats.mergeNestedStatsMap(stats.generalStats, checkGeneral(record, adInfoTags))
        for (sample1 <- samples) yield {
          val genotype = record.getGenotype(sample1)
          Stats.mergeNestedStatsMap(stats.samplesStats(sample1).genotypeStats,
                                    checkGenotype(record, genotype, adGenotypeTags))
          for (sample2 <- samples) {
            val genotype2 = record.getGenotype(sample2)
            if (genotype.getAlleles == genotype2.getAlleles)
              stats.samplesStats(sample1).sampleToSample(sample2).genotypeOverlap += 1
            stats.samplesStats(sample1).sampleToSample(sample2).alleleOverlap += VcfUtils
              .alleleOverlap(genotype.getAlleles.toList, genotype2.getAlleles.toList)
          }
Peter van 't Hof's avatar
Peter van 't Hof committed
152
        }
Peter van 't Hof's avatar
Peter van 't Hof committed
153
      }
154 155 156 157 158 159 160 161 162 163 164 165
      totalStats += stats
      if (bedRecord.length == dict.getSequence(bedRecord.chr).getSequenceLength) {
        Future {
          stats.writeAllOutput(
            new File(cmdArgs.outputDir, "contigs" + File.separator + bedRecord.chr),
            samples,
            adGenotypeTags,
            adInfoTags,
            sampleDistributions)
          None
        }
      } else Future.successful(Some(bedRecord.chr -> stats))
Peter van 't Hof's avatar
Peter van 't Hof committed
166
    }
167 168
    reader.close()

169
    (totalStats, Await.result(Future.sequence(nonCompleteContigs), Duration.Inf).flatten)
Peter van 't Hof's avatar
Peter van 't Hof committed
170 171
  }

172 173 174 175
  val defaultGenotypeFields =
    List("DP", "GQ", "AD", "AD-ref", "AD-alt", "AD-used", "AD-not_used", "general")

  val defaultInfoFields = List("QUAL", "general", "AC", "AF", "AN", "DP")
Peter van 't Hof's avatar
Peter van 't Hof committed
176

177 178 179 180 181 182 183 184 185 186 187 188 189 190
  val sampleDistributions = List("Het",
                                 "HetNonRef",
                                 "Hom",
                                 "HomRef",
                                 "HomVar",
                                 "Mixed",
                                 "NoCall",
                                 "NonInformative",
                                 "Available",
                                 "Called",
                                 "Filtered",
                                 "Variant")

  /** Function to check sample/genotype specific stats */
191 192 193
  protected[tools] def checkGenotype(record: VariantContext,
                                     genotype: Genotype,
                                     additionalTags: List[String]): Map[String, Map[Any, Int]] = {
194 195 196 197 198 199 200 201
    val buffer = mutable.Map[String, Map[Any, Int]]()

    def addToBuffer(key: String, value: Any, found: Boolean): Unit = {
      val map = buffer.getOrElse(key, Map())
      if (found) buffer += key -> (map + (value -> (map.getOrElse(value, 0) + 1)))
      else buffer += key -> (map + (value -> map.getOrElse(value, 0)))
    }

202 203
    buffer += "DP" -> Map((if (genotype.hasDP) genotype.getDP else "not set") -> 1)
    buffer += "GQ" -> Map((if (genotype.hasGQ) genotype.getGQ else "not set") -> 1)
204

205 206
    val usedAlleles =
      (for (allele <- genotype.getAlleles) yield record.getAlleleIndex(allele)).toList
207

208 209 210 211 212 213 214 215 216 217 218 219 220
    addToBuffer("general", "Total", found = true)
    addToBuffer("general", "Het", genotype.isHet)
    addToBuffer("general", "HetNonRef", genotype.isHetNonRef)
    addToBuffer("general", "Hom", genotype.isHom)
    addToBuffer("general", "HomRef", genotype.isHomRef)
    addToBuffer("general", "HomVar", genotype.isHomVar)
    addToBuffer("general", "Mixed", genotype.isMixed)
    addToBuffer("general", "NoCall", genotype.isNoCall)
    addToBuffer("general", "NonInformative", genotype.isNonInformative)
    addToBuffer("general", "Available", genotype.isAvailable)
    addToBuffer("general", "Called", genotype.isCalled)
    addToBuffer("general", "Filtered", genotype.isFiltered)
    addToBuffer("general", "Variant", genotype.isHetNonRef || genotype.isHet || genotype.isHomVar)
221

222 223 224 225 226 227 228 229 230 231 232 233
    if (genotype.hasAD) {
      val ad = genotype.getAD
      for (i <- 0 until ad.size if ad(i) > 0) {
        addToBuffer("AD", ad(i), found = true)
        if (i == 0) addToBuffer("AD-ref", ad(i), found = true)
        if (i > 0) addToBuffer("AD-alt", ad(i), found = true)
        if (usedAlleles.contains(i)) addToBuffer("AD-used", ad(i), found = true)
        else addToBuffer("AD-not_used", ad(i), found = true)
      }
    }

    val skipTags = List("DP", "GQ", "AD", "AD-ref", "AD-alt", "AD-used", "AD-not_used", "general")
234 235

    for (tag <- additionalTags if !skipTags.contains(tag)) {
236 237 238
      val value = genotype.getAnyAttribute(tag)
      if (value == null) addToBuffer(tag, "notset", found = true)
      else addToBuffer(tag, value, found = true)
239 240
    }

241
    buffer.toMap
242 243
  }

244
  /** Function to check all general stats, all info expect sample/genotype specific stats */
245 246
  protected[tools] def checkGeneral(record: VariantContext,
                                    additionalTags: List[String]): Map[String, Map[Any, Int]] = {
Peter van 't Hof's avatar
Peter van 't Hof committed
247 248
    val buffer = mutable.Map[String, Map[Any, Int]]()

249
    def addToBuffer(key: String, value: Any, found: Boolean): Unit = {
Peter van 't Hof's avatar
Peter van 't Hof committed
250
      val map = buffer.getOrElse(key, Map())
251
      if (found) buffer += key -> (map + (value -> (map.getOrElse(value, 0) + 1)))
Peter van 't Hof's avatar
Peter van 't Hof committed
252
      else buffer += key -> (map + (value -> map.getOrElse(value, 0)))
Peter van 't Hof's avatar
Peter van 't Hof committed
253 254
    }

Peter van 't Hof's avatar
Peter van 't Hof committed
255
    addToBuffer("QUAL", Math.round(record.getPhredScaledQual), found = true)
Peter van 't Hof's avatar
Peter van 't Hof committed
256

257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293
    addToBuffer("SampleDistribution-Het",
                record.getGenotypes.count(genotype => genotype.isHet),
                found = true)
    addToBuffer("SampleDistribution-HetNonRef",
                record.getGenotypes.count(genotype => genotype.isHetNonRef),
                found = true)
    addToBuffer("SampleDistribution-Hom",
                record.getGenotypes.count(genotype => genotype.isHom),
                found = true)
    addToBuffer("SampleDistribution-HomRef",
                record.getGenotypes.count(genotype => genotype.isHomRef),
                found = true)
    addToBuffer("SampleDistribution-HomVar",
                record.getGenotypes.count(genotype => genotype.isHomVar),
                found = true)
    addToBuffer("SampleDistribution-Mixed",
                record.getGenotypes.count(genotype => genotype.isMixed),
                found = true)
    addToBuffer("SampleDistribution-NoCall",
                record.getGenotypes.count(genotype => genotype.isNoCall),
                found = true)
    addToBuffer("SampleDistribution-NonInformative",
                record.getGenotypes.count(genotype => genotype.isNonInformative),
                found = true)
    addToBuffer("SampleDistribution-Available",
                record.getGenotypes.count(genotype => genotype.isAvailable),
                found = true)
    addToBuffer("SampleDistribution-Called",
                record.getGenotypes.count(genotype => genotype.isCalled),
                found = true)
    addToBuffer("SampleDistribution-Filtered",
                record.getGenotypes.count(genotype => genotype.isFiltered),
                found = true)
    addToBuffer("SampleDistribution-Variant",
                record.getGenotypes.count(genotype =>
                  genotype.isHetNonRef || genotype.isHet || genotype.isHomVar),
                found = true)
Peter van 't Hof's avatar
Peter van 't Hof committed
294

Peter van 't Hof's avatar
Peter van 't Hof committed
295
    addToBuffer("general", "Total", found = true)
296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314
    addToBuffer("general", "Biallelic", record.isBiallelic)
    addToBuffer("general", "ComplexIndel", record.isComplexIndel)
    addToBuffer("general", "Filtered", record.isFiltered)
    addToBuffer("general", "FullyDecoded", record.isFullyDecoded)
    addToBuffer("general", "Indel", record.isIndel)
    addToBuffer("general", "Mixed", record.isMixed)
    addToBuffer("general", "MNP", record.isMNP)
    addToBuffer("general", "MonomorphicInSamples", record.isMonomorphicInSamples)
    addToBuffer("general", "NotFiltered", record.isNotFiltered)
    addToBuffer("general", "PointEvent", record.isPointEvent)
    addToBuffer("general", "PolymorphicInSamples", record.isPolymorphicInSamples)
    addToBuffer("general", "SimpleDeletion", record.isSimpleDeletion)
    addToBuffer("general", "SimpleInsertion", record.isSimpleInsertion)
    addToBuffer("general", "SNP", record.isSNP)
    addToBuffer("general", "StructuralIndel", record.isStructuralIndel)
    addToBuffer("general", "Symbolic", record.isSymbolic)
    addToBuffer("general", "SymbolicOrSV", record.isSymbolicOrSV)
    addToBuffer("general", "Variant", record.isVariant)

Peter van 't Hof's avatar
Peter van 't Hof committed
315 316 317
    val skipTags = List("QUAL", "general")

    for (tag <- additionalTags if !skipTags.contains(tag)) {
318
      val value = record.getAttribute(tag)
Peter van 't Hof's avatar
Peter van 't Hof committed
319 320
      if (value == null) addToBuffer(tag, "notset", found = true)
      else addToBuffer(tag, value, found = true)
321 322
    }

323
    buffer.toMap
Peter van 't Hof's avatar
Peter van 't Hof committed
324 325
  }

326
  protected[tools] def fillGeneral(additionalTags: List[String]): Map[String, Map[Any, Int]] = {
327 328 329 330 331 332 333 334
    val buffer = mutable.Map[String, Map[Any, Int]]()

    def addToBuffer(key: String, value: Any, found: Boolean): Unit = {
      val map = buffer.getOrElse(key, Map())
      if (found) buffer += key -> (map + (value -> (map.getOrElse(value, 0) + 1)))
      else buffer += key -> (map + (value -> map.getOrElse(value, 0)))
    }

335 336 337 338 339 340 341 342 343 344 345 346 347 348
    addToBuffer("QUAL", "not set", found = false)

    addToBuffer("SampleDistribution-Het", "not set", found = false)
    addToBuffer("SampleDistribution-HetNonRef", "not set", found = false)
    addToBuffer("SampleDistribution-Hom", "not set", found = false)
    addToBuffer("SampleDistribution-HomRef", "not set", found = false)
    addToBuffer("SampleDistribution-HomVar", "not set", found = false)
    addToBuffer("SampleDistribution-Mixed", "not set", found = false)
    addToBuffer("SampleDistribution-NoCall", "not set", found = false)
    addToBuffer("SampleDistribution-NonInformative", "not set", found = false)
    addToBuffer("SampleDistribution-Available", "not set", found = false)
    addToBuffer("SampleDistribution-Called", "not set", found = false)
    addToBuffer("SampleDistribution-Filtered", "not set", found = false)
    addToBuffer("SampleDistribution-Variant", "not set", found = false)
Peter van 't Hof's avatar
Peter van 't Hof committed
349 350

    addToBuffer("general", "Total", found = false)
351 352
    addToBuffer("general", "Biallelic", found = false)
    addToBuffer("general", "ComplexIndel", found = false)
Peter van 't Hof's avatar
Peter van 't Hof committed
353
    addToBuffer("general", "Filtered", found = false)
354 355 356 357 358 359 360 361 362 363 364 365 366 367
    addToBuffer("general", "FullyDecoded", found = false)
    addToBuffer("general", "Indel", found = false)
    addToBuffer("general", "Mixed", found = false)
    addToBuffer("general", "MNP", found = false)
    addToBuffer("general", "MonomorphicInSamples", found = false)
    addToBuffer("general", "NotFiltered", found = false)
    addToBuffer("general", "PointEvent", found = false)
    addToBuffer("general", "PolymorphicInSamples", found = false)
    addToBuffer("general", "SimpleDeletion", found = false)
    addToBuffer("general", "SimpleInsertion", found = false)
    addToBuffer("general", "SNP", found = false)
    addToBuffer("general", "StructuralIndel", found = false)
    addToBuffer("general", "Symbolic", found = false)
    addToBuffer("general", "SymbolicOrSV", found = false)
Peter van 't Hof's avatar
Peter van 't Hof committed
368
    addToBuffer("general", "Variant", found = false)
369

370
    val skipTags = List("QUAL", "general")
371 372

    for (tag <- additionalTags if !skipTags.contains(tag)) {
373
      addToBuffer(tag, "not set", found = false)
374 375
    }

376
    buffer.toMap
377 378
  }

379
  protected[tools] def fillGenotype(additionalTags: List[String]): Map[String, Map[Any, Int]] = {
Peter van 't Hof's avatar
Peter van 't Hof committed
380
    val buffer = mutable.Map[String, Map[Any, Int]]()
Peter van 't Hof's avatar
Peter van 't Hof committed
381

382
    def addToBuffer(key: String, value: Any, found: Boolean): Unit = {
Peter van 't Hof's avatar
Peter van 't Hof committed
383
      val map = buffer.getOrElse(key, Map())
384
      if (found) buffer += key -> (map + (value -> (map.getOrElse(value, 0) + 1)))
Peter van 't Hof's avatar
Peter van 't Hof committed
385
      else buffer += key -> (map + (value -> map.getOrElse(value, 0)))
Peter van 't Hof's avatar
Peter van 't Hof committed
386 387
    }

388 389
    addToBuffer("DP", "not set", found = false)
    addToBuffer("GQ", "not set", found = false)
Peter van 't Hof's avatar
Peter van 't Hof committed
390

391 392 393 394 395 396 397 398 399 400 401 402 403
    addToBuffer("general", "Total", found = false)
    addToBuffer("general", "Het", found = false)
    addToBuffer("general", "HetNonRef", found = false)
    addToBuffer("general", "Hom", found = false)
    addToBuffer("general", "HomRef", found = false)
    addToBuffer("general", "HomVar", found = false)
    addToBuffer("general", "Mixed", found = false)
    addToBuffer("general", "NoCall", found = false)
    addToBuffer("general", "NonInformative", found = false)
    addToBuffer("general", "Available", found = false)
    addToBuffer("general", "Called", found = false)
    addToBuffer("general", "Filtered", found = false)
    addToBuffer("general", "Variant", found = false)
Peter van 't Hof's avatar
Peter van 't Hof committed
404

Peter van 't Hof's avatar
Peter van 't Hof committed
405 406 407
    val skipTags = List("DP", "GQ", "AD", "AD-ref", "AD-alt", "AD-used", "AD-not_used", "general")

    for (tag <- additionalTags if !skipTags.contains(tag)) {
408
      addToBuffer(tag, 0, found = false)
409
    }
Peter van 't Hof's avatar
Peter van 't Hof committed
410

411
    buffer.toMap
412 413
  }
}