VcfStats.scala 17.9 KB
Newer Older
Peter van 't Hof's avatar
Peter van 't Hof committed
1
package nl.lumc.sasc.biopet.tools.vcfstats
2

3
import java.io.File
4
import java.net.URLClassLoader
5

6
import htsjdk.variant.variantcontext.{Genotype, VariantContext}
7
import htsjdk.variant.vcf.VCFFileReader
8
import nl.lumc.sasc.biopet.utils.intervals.{BedRecord, BedRecordList}
Peter van 't Hof's avatar
Peter van 't Hof committed
9
import nl.lumc.sasc.biopet.utils._
10
import org.apache.spark.{SparkConf, SparkContext}
Peter van 't Hof's avatar
Peter van 't Hof committed
11

12 13
import scala.collection.JavaConversions._
import scala.collection.mutable
14 15 16
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
Peter van 't Hof's avatar
Peter van 't Hof committed
17

18
/**
19
  * Created by pjvanthof on 14/07/2017.
20
  */
21
object VcfStats extends ToolCommand {
22

23
  type Args = VcfStatsArgs
24

25
  def main(args: Array[String]): Unit = {
26

27
    logger.info("Started")
28 29
    val argsParser = new VcfStatsOptParser(commandName)
    val cmdArgs = argsParser.parse(args, VcfStatsArgs()) getOrElse (throw new IllegalArgumentException)
30

Peter van 't Hof's avatar
Peter van 't Hof committed
31
    val reader = new VCFFileReader(cmdArgs.inputFile, true)
32 33 34
    val header = reader.getFileHeader
    val samples = header.getSampleNamesInOrder.toList

Peter van 't Hof's avatar
Peter van 't Hof committed
35 36
    reader.close()

Peter van 't Hof's avatar
Peter van 't Hof committed
37
    val adInfoTags = {
38 39 40
      (for (infoTag <- cmdArgs.infoTags if !defaultInfoFields.contains(infoTag)) yield {
        require(header.getInfoHeaderLine(infoTag) != null,
                "Info tag '" + infoTag + "' not found in header of vcf file")
Peter van 't Hof's avatar
Peter van 't Hof committed
41
        infoTag
42 43 44
      }) ::: (for (line <- header.getInfoHeaderLines if cmdArgs.allInfoTags
                   if !defaultInfoFields.contains(line.getID)
                   if !cmdArgs.infoTags.contains(line.getID)) yield {
Peter van 't Hof's avatar
Peter van 't Hof committed
45 46 47
        line.getID
      }).toList ::: defaultInfoFields
    }
48

49 50 51 52
    val adGenotypeTags = (for (genotypeTag <- cmdArgs.genotypeTags
                               if !defaultGenotypeFields.contains(genotypeTag)) yield {
      require(header.getFormatHeaderLine(genotypeTag) != null,
              "Format tag '" + genotypeTag + "' not found in header of vcf file")
53
      genotypeTag
54 55 56 57
    }) ::: (for (line <- header.getFormatHeaderLines if cmdArgs.allGenotypeTags
                 if !defaultGenotypeFields.contains(line.getID)
                 if !cmdArgs.genotypeTags.contains(line.getID)
                 if line.getID != "PL") yield {
58
      line.getID
Peter van 't Hof's avatar
Peter van 't Hof committed
59
    }).toList ::: defaultGenotypeFields
60

61 62 63 64 65 66 67 68 69 70 71 72 73 74
    logger.info("Init spark context")

    val jars = ClassLoader.getSystemClassLoader
      .asInstanceOf[URLClassLoader]
      .getURLs
      .map(_.getFile)
    val conf = new SparkConf()
      .setAppName(commandName)
      .setMaster(cmdArgs.sparkMaster.getOrElse(s"local[${cmdArgs.localThreads}]"))
      .setJars(jars)
    val sc = new SparkContext(conf)
    logger.info("Spark context is up")

    val regions = (cmdArgs.intervals match {
Peter van 't Hof's avatar
Peter van 't Hof committed
75 76
      case Some(i) =>
        BedRecordList.fromFile(i).validateContigs(cmdArgs.referenceFile)
77
      case _ => BedRecordList.fromReference(cmdArgs.referenceFile)
78 79
    }).combineOverlap
      .scatter(cmdArgs.binSize, maxContigsInSingleJob = Some(cmdArgs.maxContigsInSingleJob))
Peter van 't Hof's avatar
Peter van 't Hof committed
80

81 82 83
    val regionStats = sc
      .parallelize(regions, regions.size)
      .map(readBin(_, samples, cmdArgs, adInfoTags, adGenotypeTags))
84 85 86 87 88 89 90 91
      .cache()

    val totalStats = Future(regionStats.aggregate(Stats.emptyStats(samples))(_ += _._1, _ += _))
      .map(
        _.writeAllOutput(cmdArgs.outputDir,
                         samples,
                         adGenotypeTags,
                         adInfoTags,
Peter van 't Hof's avatar
Peter van 't Hof committed
92 93
                         sampleDistributions,
                         None))
94 95 96 97 98 99 100 101 102 103

    regionStats
      .flatMap(_._2)
      .aggregateByKey(Stats.emptyStats(samples))(_ += _, _ += _)
      .foreach {
        case (k, v) =>
          v.writeAllOutput(new File(cmdArgs.outputDir, "contigs" + File.separator + k),
                           samples,
                           adGenotypeTags,
                           adInfoTags,
Peter van 't Hof's avatar
Peter van 't Hof committed
104 105
                           sampleDistributions,
                           Some(k))
106 107
      }
    regionStats.unpersist()
Peter van 't Hof's avatar
Peter van 't Hof committed
108

109
    Await.result(totalStats, Duration.Inf)
110

Peter van 't Hof's avatar
Peter van 't Hof committed
111 112
    val completeStatsJson = regions
      .flatMap(_.map(_.chr))
113
      .foldLeft(ConfigUtils.fileToConfigMap(new File(cmdArgs.outputDir, "total.json"))) {
Peter van 't Hof's avatar
Peter van 't Hof committed
114 115 116
        case (map, contig) =>
          val contigMap = ConfigUtils.fileToConfigMap(
            new File(cmdArgs.outputDir,
117
                     "contigs" + File.separator + contig + File.separator + s"$contig.json"))
Peter van 't Hof's avatar
Peter van 't Hof committed
118 119 120 121 122 123
          ConfigUtils.mergeMaps(map, contigMap)
      }

    IoUtils.writeLinesToFile(new File(cmdArgs.outputDir, "stats.json"),
                             ConfigUtils.mapToJson(completeStatsJson).nospaces :: Nil)

124
    sc.stop
125
    logger.info("Done")
Peter van 't Hof's avatar
Peter van 't Hof committed
126 127
  }

128 129 130 131
  def readBin(bedRecords: List[BedRecord],
              samples: List[String],
              cmdArgs: Args,
              adInfoTags: List[String],
132
              adGenotypeTags: List[String]): (Stats, List[(String, Stats)]) = {
133
    val reader = new VCFFileReader(cmdArgs.inputFile, true)
134 135 136 137 138
    val totalStats = Stats.emptyStats(samples)
    val dict = FastaUtils.getDictFromFasta(cmdArgs.referenceFile)

    val nonCompleteContigs = for (bedRecord <- bedRecords) yield {
      val stats = Stats.emptyStats(samples)
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166

      logger.info("Starting on: " + bedRecord)

      val samInterval = bedRecord.toSamInterval

      val query =
        reader.query(samInterval.getContig, samInterval.getStart, samInterval.getEnd)
      if (!query.hasNext) {
        Stats.mergeNestedStatsMap(stats.generalStats, fillGeneral(adInfoTags))
        for (sample <- samples) yield {
          Stats.mergeNestedStatsMap(stats.samplesStats(sample).genotypeStats,
                                    fillGenotype(adGenotypeTags))
        }
      }

      for (record <- query if record.getStart <= samInterval.getEnd) {
        Stats.mergeNestedStatsMap(stats.generalStats, checkGeneral(record, adInfoTags))
        for (sample1 <- samples) yield {
          val genotype = record.getGenotype(sample1)
          Stats.mergeNestedStatsMap(stats.samplesStats(sample1).genotypeStats,
                                    checkGenotype(record, genotype, adGenotypeTags))
          for (sample2 <- samples) {
            val genotype2 = record.getGenotype(sample2)
            if (genotype.getAlleles == genotype2.getAlleles)
              stats.samplesStats(sample1).sampleToSample(sample2).genotypeOverlap += 1
            stats.samplesStats(sample1).sampleToSample(sample2).alleleOverlap += VcfUtils
              .alleleOverlap(genotype.getAlleles.toList, genotype2.getAlleles.toList)
          }
Peter van 't Hof's avatar
Peter van 't Hof committed
167
        }
Peter van 't Hof's avatar
Peter van 't Hof committed
168
      }
169 170 171 172 173 174 175 176
      totalStats += stats
      if (bedRecord.length == dict.getSequence(bedRecord.chr).getSequenceLength) {
        Future {
          stats.writeAllOutput(
            new File(cmdArgs.outputDir, "contigs" + File.separator + bedRecord.chr),
            samples,
            adGenotypeTags,
            adInfoTags,
Peter van 't Hof's avatar
Peter van 't Hof committed
177 178
            sampleDistributions,
            Some(bedRecord.chr))
179 180 181
          None
        }
      } else Future.successful(Some(bedRecord.chr -> stats))
Peter van 't Hof's avatar
Peter van 't Hof committed
182
    }
183 184
    reader.close()

185
    (totalStats, Await.result(Future.sequence(nonCompleteContigs), Duration.Inf).flatten)
Peter van 't Hof's avatar
Peter van 't Hof committed
186 187
  }

188 189 190 191
  val defaultGenotypeFields =
    List("DP", "GQ", "AD", "AD-ref", "AD-alt", "AD-used", "AD-not_used", "general")

  val defaultInfoFields = List("QUAL", "general", "AC", "AF", "AN", "DP")
Peter van 't Hof's avatar
Peter van 't Hof committed
192

193 194 195 196 197 198 199 200 201 202 203 204 205 206
  val sampleDistributions = List("Het",
                                 "HetNonRef",
                                 "Hom",
                                 "HomRef",
                                 "HomVar",
                                 "Mixed",
                                 "NoCall",
                                 "NonInformative",
                                 "Available",
                                 "Called",
                                 "Filtered",
                                 "Variant")

  /** Function to check sample/genotype specific stats */
207 208 209
  protected[tools] def checkGenotype(record: VariantContext,
                                     genotype: Genotype,
                                     additionalTags: List[String]): Map[String, Map[Any, Int]] = {
210 211 212 213 214 215 216 217
    val buffer = mutable.Map[String, Map[Any, Int]]()

    def addToBuffer(key: String, value: Any, found: Boolean): Unit = {
      val map = buffer.getOrElse(key, Map())
      if (found) buffer += key -> (map + (value -> (map.getOrElse(value, 0) + 1)))
      else buffer += key -> (map + (value -> map.getOrElse(value, 0)))
    }

218 219
    buffer += "DP" -> Map((if (genotype.hasDP) genotype.getDP else "not set") -> 1)
    buffer += "GQ" -> Map((if (genotype.hasGQ) genotype.getGQ else "not set") -> 1)
220

221 222
    val usedAlleles =
      (for (allele <- genotype.getAlleles) yield record.getAlleleIndex(allele)).toList
223

224 225 226 227 228 229 230 231 232 233 234 235 236
    addToBuffer("general", "Total", found = true)
    addToBuffer("general", "Het", genotype.isHet)
    addToBuffer("general", "HetNonRef", genotype.isHetNonRef)
    addToBuffer("general", "Hom", genotype.isHom)
    addToBuffer("general", "HomRef", genotype.isHomRef)
    addToBuffer("general", "HomVar", genotype.isHomVar)
    addToBuffer("general", "Mixed", genotype.isMixed)
    addToBuffer("general", "NoCall", genotype.isNoCall)
    addToBuffer("general", "NonInformative", genotype.isNonInformative)
    addToBuffer("general", "Available", genotype.isAvailable)
    addToBuffer("general", "Called", genotype.isCalled)
    addToBuffer("general", "Filtered", genotype.isFiltered)
    addToBuffer("general", "Variant", genotype.isHetNonRef || genotype.isHet || genotype.isHomVar)
237

238 239 240 241 242 243 244 245 246 247 248 249
    if (genotype.hasAD) {
      val ad = genotype.getAD
      for (i <- 0 until ad.size if ad(i) > 0) {
        addToBuffer("AD", ad(i), found = true)
        if (i == 0) addToBuffer("AD-ref", ad(i), found = true)
        if (i > 0) addToBuffer("AD-alt", ad(i), found = true)
        if (usedAlleles.contains(i)) addToBuffer("AD-used", ad(i), found = true)
        else addToBuffer("AD-not_used", ad(i), found = true)
      }
    }

    val skipTags = List("DP", "GQ", "AD", "AD-ref", "AD-alt", "AD-used", "AD-not_used", "general")
250 251

    for (tag <- additionalTags if !skipTags.contains(tag)) {
252 253 254
      val value = genotype.getAnyAttribute(tag)
      if (value == null) addToBuffer(tag, "notset", found = true)
      else addToBuffer(tag, value, found = true)
255 256
    }

257
    buffer.toMap
258 259
  }

260
  /** Function to check all general stats, all info expect sample/genotype specific stats */
261 262
  protected[tools] def checkGeneral(record: VariantContext,
                                    additionalTags: List[String]): Map[String, Map[Any, Int]] = {
Peter van 't Hof's avatar
Peter van 't Hof committed
263 264
    val buffer = mutable.Map[String, Map[Any, Int]]()

265
    def addToBuffer(key: String, value: Any, found: Boolean): Unit = {
Peter van 't Hof's avatar
Peter van 't Hof committed
266
      val map = buffer.getOrElse(key, Map())
267
      if (found) buffer += key -> (map + (value -> (map.getOrElse(value, 0) + 1)))
Peter van 't Hof's avatar
Peter van 't Hof committed
268
      else buffer += key -> (map + (value -> map.getOrElse(value, 0)))
Peter van 't Hof's avatar
Peter van 't Hof committed
269 270
    }

Peter van 't Hof's avatar
Peter van 't Hof committed
271
    addToBuffer("QUAL", Math.round(record.getPhredScaledQual), found = true)
Peter van 't Hof's avatar
Peter van 't Hof committed
272

273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309
    addToBuffer("SampleDistribution-Het",
                record.getGenotypes.count(genotype => genotype.isHet),
                found = true)
    addToBuffer("SampleDistribution-HetNonRef",
                record.getGenotypes.count(genotype => genotype.isHetNonRef),
                found = true)
    addToBuffer("SampleDistribution-Hom",
                record.getGenotypes.count(genotype => genotype.isHom),
                found = true)
    addToBuffer("SampleDistribution-HomRef",
                record.getGenotypes.count(genotype => genotype.isHomRef),
                found = true)
    addToBuffer("SampleDistribution-HomVar",
                record.getGenotypes.count(genotype => genotype.isHomVar),
                found = true)
    addToBuffer("SampleDistribution-Mixed",
                record.getGenotypes.count(genotype => genotype.isMixed),
                found = true)
    addToBuffer("SampleDistribution-NoCall",
                record.getGenotypes.count(genotype => genotype.isNoCall),
                found = true)
    addToBuffer("SampleDistribution-NonInformative",
                record.getGenotypes.count(genotype => genotype.isNonInformative),
                found = true)
    addToBuffer("SampleDistribution-Available",
                record.getGenotypes.count(genotype => genotype.isAvailable),
                found = true)
    addToBuffer("SampleDistribution-Called",
                record.getGenotypes.count(genotype => genotype.isCalled),
                found = true)
    addToBuffer("SampleDistribution-Filtered",
                record.getGenotypes.count(genotype => genotype.isFiltered),
                found = true)
    addToBuffer("SampleDistribution-Variant",
                record.getGenotypes.count(genotype =>
                  genotype.isHetNonRef || genotype.isHet || genotype.isHomVar),
                found = true)
Peter van 't Hof's avatar
Peter van 't Hof committed
310

Peter van 't Hof's avatar
Peter van 't Hof committed
311
    addToBuffer("general", "Total", found = true)
312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330
    addToBuffer("general", "Biallelic", record.isBiallelic)
    addToBuffer("general", "ComplexIndel", record.isComplexIndel)
    addToBuffer("general", "Filtered", record.isFiltered)
    addToBuffer("general", "FullyDecoded", record.isFullyDecoded)
    addToBuffer("general", "Indel", record.isIndel)
    addToBuffer("general", "Mixed", record.isMixed)
    addToBuffer("general", "MNP", record.isMNP)
    addToBuffer("general", "MonomorphicInSamples", record.isMonomorphicInSamples)
    addToBuffer("general", "NotFiltered", record.isNotFiltered)
    addToBuffer("general", "PointEvent", record.isPointEvent)
    addToBuffer("general", "PolymorphicInSamples", record.isPolymorphicInSamples)
    addToBuffer("general", "SimpleDeletion", record.isSimpleDeletion)
    addToBuffer("general", "SimpleInsertion", record.isSimpleInsertion)
    addToBuffer("general", "SNP", record.isSNP)
    addToBuffer("general", "StructuralIndel", record.isStructuralIndel)
    addToBuffer("general", "Symbolic", record.isSymbolic)
    addToBuffer("general", "SymbolicOrSV", record.isSymbolicOrSV)
    addToBuffer("general", "Variant", record.isVariant)

Peter van 't Hof's avatar
Peter van 't Hof committed
331 332 333
    val skipTags = List("QUAL", "general")

    for (tag <- additionalTags if !skipTags.contains(tag)) {
334
      val value = record.getAttribute(tag)
Peter van 't Hof's avatar
Peter van 't Hof committed
335 336
      if (value == null) addToBuffer(tag, "notset", found = true)
      else addToBuffer(tag, value, found = true)
337 338
    }

339
    buffer.toMap
Peter van 't Hof's avatar
Peter van 't Hof committed
340 341
  }

342
  protected[tools] def fillGeneral(additionalTags: List[String]): Map[String, Map[Any, Int]] = {
343 344 345 346 347 348 349 350
    val buffer = mutable.Map[String, Map[Any, Int]]()

    def addToBuffer(key: String, value: Any, found: Boolean): Unit = {
      val map = buffer.getOrElse(key, Map())
      if (found) buffer += key -> (map + (value -> (map.getOrElse(value, 0) + 1)))
      else buffer += key -> (map + (value -> map.getOrElse(value, 0)))
    }

351 352 353 354 355 356 357 358 359 360 361 362 363 364
    addToBuffer("QUAL", "not set", found = false)

    addToBuffer("SampleDistribution-Het", "not set", found = false)
    addToBuffer("SampleDistribution-HetNonRef", "not set", found = false)
    addToBuffer("SampleDistribution-Hom", "not set", found = false)
    addToBuffer("SampleDistribution-HomRef", "not set", found = false)
    addToBuffer("SampleDistribution-HomVar", "not set", found = false)
    addToBuffer("SampleDistribution-Mixed", "not set", found = false)
    addToBuffer("SampleDistribution-NoCall", "not set", found = false)
    addToBuffer("SampleDistribution-NonInformative", "not set", found = false)
    addToBuffer("SampleDistribution-Available", "not set", found = false)
    addToBuffer("SampleDistribution-Called", "not set", found = false)
    addToBuffer("SampleDistribution-Filtered", "not set", found = false)
    addToBuffer("SampleDistribution-Variant", "not set", found = false)
Peter van 't Hof's avatar
Peter van 't Hof committed
365 366

    addToBuffer("general", "Total", found = false)
367 368
    addToBuffer("general", "Biallelic", found = false)
    addToBuffer("general", "ComplexIndel", found = false)
Peter van 't Hof's avatar
Peter van 't Hof committed
369
    addToBuffer("general", "Filtered", found = false)
370 371 372 373 374 375 376 377 378 379 380 381 382 383
    addToBuffer("general", "FullyDecoded", found = false)
    addToBuffer("general", "Indel", found = false)
    addToBuffer("general", "Mixed", found = false)
    addToBuffer("general", "MNP", found = false)
    addToBuffer("general", "MonomorphicInSamples", found = false)
    addToBuffer("general", "NotFiltered", found = false)
    addToBuffer("general", "PointEvent", found = false)
    addToBuffer("general", "PolymorphicInSamples", found = false)
    addToBuffer("general", "SimpleDeletion", found = false)
    addToBuffer("general", "SimpleInsertion", found = false)
    addToBuffer("general", "SNP", found = false)
    addToBuffer("general", "StructuralIndel", found = false)
    addToBuffer("general", "Symbolic", found = false)
    addToBuffer("general", "SymbolicOrSV", found = false)
Peter van 't Hof's avatar
Peter van 't Hof committed
384
    addToBuffer("general", "Variant", found = false)
385

386
    val skipTags = List("QUAL", "general")
387 388

    for (tag <- additionalTags if !skipTags.contains(tag)) {
389
      addToBuffer(tag, "not set", found = false)
390 391
    }

392
    buffer.toMap
393 394
  }

395
  protected[tools] def fillGenotype(additionalTags: List[String]): Map[String, Map[Any, Int]] = {
Peter van 't Hof's avatar
Peter van 't Hof committed
396
    val buffer = mutable.Map[String, Map[Any, Int]]()
Peter van 't Hof's avatar
Peter van 't Hof committed
397

398
    def addToBuffer(key: String, value: Any, found: Boolean): Unit = {
Peter van 't Hof's avatar
Peter van 't Hof committed
399
      val map = buffer.getOrElse(key, Map())
400
      if (found) buffer += key -> (map + (value -> (map.getOrElse(value, 0) + 1)))
Peter van 't Hof's avatar
Peter van 't Hof committed
401
      else buffer += key -> (map + (value -> map.getOrElse(value, 0)))
Peter van 't Hof's avatar
Peter van 't Hof committed
402 403
    }

404 405
    addToBuffer("DP", "not set", found = false)
    addToBuffer("GQ", "not set", found = false)
Peter van 't Hof's avatar
Peter van 't Hof committed
406

407 408 409 410 411 412 413 414 415 416 417 418 419
    addToBuffer("general", "Total", found = false)
    addToBuffer("general", "Het", found = false)
    addToBuffer("general", "HetNonRef", found = false)
    addToBuffer("general", "Hom", found = false)
    addToBuffer("general", "HomRef", found = false)
    addToBuffer("general", "HomVar", found = false)
    addToBuffer("general", "Mixed", found = false)
    addToBuffer("general", "NoCall", found = false)
    addToBuffer("general", "NonInformative", found = false)
    addToBuffer("general", "Available", found = false)
    addToBuffer("general", "Called", found = false)
    addToBuffer("general", "Filtered", found = false)
    addToBuffer("general", "Variant", found = false)
Peter van 't Hof's avatar
Peter van 't Hof committed
420

Peter van 't Hof's avatar
Peter van 't Hof committed
421 422 423
    val skipTags = List("DP", "GQ", "AD", "AD-ref", "AD-alt", "AD-used", "AD-not_used", "general")

    for (tag <- additionalTags if !skipTags.contains(tag)) {
424
      addToBuffer(tag, 0, found = false)
425
    }
Peter van 't Hof's avatar
Peter van 't Hof committed
426

427
    buffer.toMap
428 429
  }
}