Commit 2d051084 authored by Sander Bollen's avatar Sander Bollen

original scripts

parent eef48e04
import click
from cyvcf2 import VCF, Writer
import json
import enum
class FilterClass(enum.Enum):
NON_CANONICAL = {
"ID": "NON_CANONICAL",
"Description": "Non-canonical chromosome"
}
INDEX_UNCALLED = {
"ID": "INDEX_UNCALLED",
"Description": "Index uncalled or homozygous reference"
}
TOO_HIGH_GONL_AF = {
"ID": "TOO_HIGH_GONL_AF",
"Description": "Too high GoNL allele frequency"
}
TOO_HIGH_GNOMAD_AF = {
"ID": "TOO_HIGH_GNOMAD_AF",
"Description": "Too high GnomAD allele frequency"
}
LOW_GQ = {
"ID": "LOW_GQ",
"Description": "Too low GQ on index"
}
DELETED_ALLELE = {
"ID": "DELETED_ALLELE",
"Description": "The only ALT allele is a deleted allele."
}
class FilterParams(object):
def __init__(self, params_path):
self.params_path = params_path
with open(self.params_path) as handle:
self.params_dict = json.load(handle)
self.__gonl_vcf = None
self.__onekg_vcf = None
self.__gnomad_vcf = None
@property
def min_gq(self):
return self.params_dict['gq_pass']
@property
def max_gonl_af(self):
return self.params_dict['low_gonl_af']
@property
def max_gnomad_af(self):
return self.params_dict['low_gnomad_af']
@property
def index_called(self):
return self.params_dict['index_called']
@property
def canonical_chromosomes(self):
return self.params_dict['canonical_chromosomes']
@property
def gonl_vcf(self):
if self.__gonl_vcf is None:
self.__gonl_vcf = VCF(self.params_dict['gonl_vcf'], gts012=True)
return self.__gonl_vcf
@property
def gonl_af(self):
return self.params_dict["gonl_af"]
@property
def gnomad_vcf(self):
if self.__gnomad_vcf is None:
self.__gnomad_vcf = VCF(self.params_dict['gnomad_vcf'], gts012=True)
return self.__gnomad_vcf
@property
def gnomad_af(self):
return self.params_dict['gnomad_af']
class Filterer(object):
"""
Iterator returning tuples of:
(cyvcf2_record, filter)
if the filter item is None, record is unfiltered
We assume gts012 to be set to True in the cyvcf2 readers
"""
def __init__(self, vcf_it,filter_params, index):
self.vcf_it = vcf_it
self.filters = filter_params
self.index = index
self.canonical_chroms = ["M", "X", "Y"] + list(map(str, range(0, 23)))
def __next__(self):
record = next(self.vcf_it)
filters = []
# filter out deleted alleles regardless
if set(record.ALT) == {"*"}:
return record, [FilterClass.DELETED_ALLELE]
if self.filters.canonical_chromosomes:
chrom = str(record.CHROM)
if "chr" in chrom:
chrom = chrom.split("chr")[-1]
if chrom not in self.canonical_chroms:
filters.append(FilterClass.NON_CANONICAL)
if self.filters.index_called:
gt = record.gt_types[self.index]
if gt == 0 or gt == 3:
filters.append(FilterClass.INDEX_UNCALLED)
if self.filters.min_gq is not None:
gq = record.gt_quals[self.index]
if gq < self.filters.min_gq:
filters.append(FilterClass.LOW_GQ)
if self.filters.max_gonl_af is not None:
gonl_af = self.get_af(record, self.filters.gonl_vcf,
self.filters.gonl_af)
if gonl_af > self.filters.max_gonl_af:
filters.append(FilterClass.TOO_HIGH_GONL_AF)
if self.filters.max_gnomad_af is not None:
gnomad_af = self.get_af(record, self.filters.gnomad_vcf,
self.filters.gnomad_af)
if gnomad_af > self.filters.max_gnomad_af:
filters.append(FilterClass.TOO_HIGH_GNOMAD_AF)
return record, filters
def get_af(self, record, ref_it, af_field="AF"):
"""Get allele frequency of record in reference iterator"""
loc = "{0}:{1}-{2}".format(record.CHROM, record.start, record.end)
ref_records = filter(lambda x: x.REF == record.REF,
filter(lambda x: x.ALT == record.ALT,
filter(lambda x: x.POS == record.POS,
ref_it(loc))))
# if there are more than one identical variants in this vcf
# we will be most conservative, and take the minimum
afs = []
for rr in ref_records:
field_val = rr.INFO.get(af_field)
if isinstance(field_val, tuple):
field_val = min(field_val)
afs.append(field_val)
if len(afs) == 0:
return 0
return min(afs)
def next(self):
return self.__next__()
def __iter__(self):
return self
@click.command()
@click.option("-i", "--input", type=click.Path(exists=True),
help="Path to input VCF file", required=True)
@click.option("-o", "--output", type=click.Path(writable=True),
help="Path to output (filtered) VCF file", required=True)
@click.option("-t", "--trash", type=click.Path(writable=True),
help="Path to trash VCF file", required=True)
@click.option("-p", "--params-file", type=click.Path(exists=True),
help="Path to filter params json", required=True)
@click.option('--index-sample', type=click.STRING,
help="Name of index sample", required=True)
def cli(input, output, trash, params_file, index_sample):
vcf = VCF(input, gts012=True)
idx = vcf.samples.index(index_sample)
for filter_item in list(FilterClass):
vcf.add_filter_to_header(filter_item.value)
out = Writer(output, vcf)
tr = Writer(trash, vcf)
filter_params = FilterParams(params_file)
filter_it = Filterer(vcf, filter_params, idx)
for record, fi in filter_it:
if fi is None or len(fi) == 0:
out.write_record(record)
else:
record.FILTER = [x.name for x in fi]
tr.write_record(record)
out.close()
tr.close()
if __name__ == "__main__":
cli()
import click
import cyvcf2
import numpy as np
from collections import namedtuple
from itertools import chain
from typing import List, Optional, Tuple
Region = namedtuple("Region", ["chr", "start", "end"])
def coverage_for_gvcf_record(record: cyvcf2.Variant) -> List[int]:
"""Get coverage for gvcf record per base"""
start = record.start
end = record.end
try:
dp = record.format("DP")[0][0]
except TypeError:
dp = 0
return [dp]*(end-start)
def gq_for_gvcf_record(record: cyvcf2.Variant) -> List[int]:
start = record.start
end = record.end
gq = record.gt_quals[0]
return [gq]*(end-start)
class CovStats(object):
def __init__(self, records):
self.records = records
self.__coverages = None
self.__gq_qualities = None
@property
def coverages(self) -> List[int]:
if self.__coverages is None:
self.__coverages = list(
chain.from_iterable(
(coverage_for_gvcf_record(x) for x in self.records)
)
)
return self.__coverages
@property
def gq_qualities(self) -> List[int]:
if self.__gq_qualities is None:
self.__gq_qualities = list(
chain.from_iterable(
(gq_for_gvcf_record(x) for x in self.records)
)
)
return self.__gq_qualities
@property
def median_cov(self) -> float:
return np.median(self.coverages)
@property
def mean_cov(self) -> float:
return np.mean(self.coverages)
@property
def median_gq(self) -> float:
return np.median(self.gq_qualities)
@property
def mean_gq(self) -> float:
return np.mean(self.gq_qualities)
def percent_atleast_dp(self, atleast) -> Optional[float]:
passing = [x for x in self.coverages if x >= atleast]
if len(self.coverages) > 0:
return (len(passing)/len(self.coverages))*100
return None
def percent_atleast_gq(self, atleast) -> Optional[float]:
passing = [x for x in self.gq_qualities if x >= atleast]
if len(self.gq_qualities) > 0:
return (len(passing)/len(self.gq_qualities))*100
return None
@property
def stats(self) -> dict:
s = {
"median_dp": self.median_cov,
"mean_dp": self.mean_cov,
"median_gq": self.median_gq,
"mean_gq": self.mean_gq
}
for perc in (10, 20, 30, 50, 100):
key = "perc_at_least_{0}_dp".format(perc)
s[key] = self.percent_atleast_dp(perc)
for perc in (10, 20, 30, 50, 90):
key = "perc_at_least_{0}_gq".format(perc)
s[key] = self.percent_atleast_gq(perc)
return s
class RefRecord(object):
def __init__(self, line: str):
self.line = line # type: str
self.gene = None # type: Optional[str]
self.transcript = None # type: Optional[str]
self.contig = None # type: Optional[str]
self.start = None # type: Optional[int]
self.end = None # type: Optional[int]
self.cds_start = None # type: Optional[int]
self.cds_end = None # type: Optional[int]
self.exon_starts = [] # type: List[int]
self.exon_ends = [] # type: List[int]
self.forward = True
self.parse()
def parse(self):
contents = self.line.strip().split("\t")
if len(contents) < 11:
raise ValueError("refFlat line must have at least 11 fields")
self.gene = contents[0]
self.transcript = contents[1]
self.contig = contents[2]
if "-" in contents[3].strip():
self.forward = False
self.start = int(contents[4])
self.end = int(contents[5])
self.cds_start = int(contents[6])
self.cds_end = int(contents[7])
self.exon_starts = [int(x) for x in contents[9].split(",")[:-1]]
self.exon_ends = [int(x) for x in contents[10].split(",")[:-1]]
@property
def exons(self) -> List[Region]:
regs = []
for s, e in zip(self.exon_starts, self.exon_ends):
regs.append(Region(self.contig, s, e))
return regs
@property
def cds_exons(self) -> List[Tuple[int, Region]]:
regs = []
for i, (s, e) in enumerate(zip(self.exon_starts, self.exon_ends)):
if s < self.cds_start:
s = self.cds_start
if e > self.cds_end:
e = self.cds_end
reg = Region(self.contig, s, e)
if reg.end <= reg.start: # utr exons
continue
regs.append((i, reg))
return regs
def region_coverages(reader: cyvcf2.VCF, regions: List[Region]) -> Optional[dict]:
records = []
for region in regions:
reg_str = "{0}:{1}-{2}".format(region.chr, region.start, region.end)
it = reader(reg_str)
records += list(it)
if len(records) == 0:
return CovStats([]).stats
return CovStats(records).stats
@click.command()
@click.option("-I", "--input-gvcf",
type=click.Path(exists=True, readable=True),
required=True,
help="Path to input VCF file")
@click.option("-R", "--refflat-file",
type=click.Path(exists=True, readable=True),
required=True,
help="Path to refFlat file")
@click.option("--per-exon/--per-transcript",
default=True,
help="Collect metrics per exon or per transcript")
def main(input_gvcf, refflat_file, per_exon):
reader = cyvcf2.VCF(input_gvcf)
header = None
with open(refflat_file) as handle:
for line in handle:
r = RefRecord(line)
if not per_exon:
regions = [x[1] for x in r.cds_exons]
cov = region_coverages(reader, regions)
cov['transcript'] = r.transcript
cov['gene'] = r.gene
if header is None:
header = "\t".join(sorted(cov.keys()))
print(header)
vals = [str(cov[k]) for k in sorted(cov.keys())]
print("\t".join(vals))
else:
for i, reg in enumerate(r.exons):
cov = region_coverages(reader, [reg])
cov['transcript'] = r.transcript
cov['gene'] = r.gene
if r.forward:
cov['exon'] = i+1
else:
cov['exon'] = len(r.exons) - i
if header is None:
header = "\t".join(sorted(cov.keys()))
print(header)
vals = [str(cov[k]) for k in sorted(cov.keys())]
print("\t".join(vals))
if __name__ == "__main__":
main()
import click
import cyvcf2
import numpy
import json
from tqdm import tqdm
from collections import Counter
def gen_chrom_counter(vcf_reader):
"""Generate chromosome counter from VCF reader"""
return Counter({n: 0 for n in vcf_reader.seqnames})
class Sample(object):
def __init__(self, name, idx):
self.name = name
self.idx = idx
self.transversions = 0
self.transitions = 0
self.hom_ref = 0
self.het = 0
self.hom_alt = 0
self.deletions = 0
self.insertions = 0
self.snps = 0
self.__gq_counter = Counter({i: 0 for i in range(100)})
@property
def ti_tv(self):
if self.transversions > 0:
return float(self.transitions)/self.transversions
return numpy.nan
def add_variant(self, var):
"""assuming gts012=True"""
typ = var.gt_types[self.idx]
if typ == 3:
return None
gq = var.gt_quals[self.idx]
self.__gq_counter[gq] += 1
if typ == 0:
self.hom_ref += 1
return None
if typ == 1:
self.het += 1
if typ == 2:
self.hom_alt += 1
if var.is_snp and var.is_transition: # this only works in python 2 for now. See: https://github.com/brentp/cyvcf2/pull/70
self.transitions += 1
self.snps += 1
elif var.is_snp:
self.transversions += 1
self.snps += 1
elif var.is_indel and var.is_deletion:
self.deletions += 1
elif var.is_indel:
self.insertions += 1
@property
def gq_distr(self):
return [self.__gq_counter[x] for x in range(100)]
@property
def total_variants(self):
return self.hom_alt + self.het
@property
def as_dict(self):
return {
"name": self.name,
"total_variants": self.total_variants,
"variant_types": {
"snps": self.snps,
"deletions": self.deletions,
"insertions": self.insertions,
},
"genotypes": {
"hom_ref": self.hom_ref,
"het": self.het,
"hom_alt": self.hom_alt
},
"transitions": self.transitions,
"transversions": self.transversions,
"ti_tv_ratio": self.ti_tv,
"gq_distribution": self.gq_distr
}
class Stats(object):
def __init__(self, vcf_path):
self.path = vcf_path
self.vcf = cyvcf2.VCF(vcf_path, gts012=True)
self.samples = [Sample(x, i) for i, x in enumerate(self.vcf.samples)]
self.chrom_counter = gen_chrom_counter(self.vcf)
self.__calculated = False
def calculate(self):
for record in tqdm(self.vcf, unit="variants", unit_scale=True):
for s in self.samples:
s.add_variant(record)
self.chrom_counter[record.CHROM] += 1
self.__calculated = True
@property
def total_variants(self):
return sum(self.chrom_counter.values())
@property
def as_dict(self):
if not self.__calculated:
self.calculate()
return {
"vcf_path": self.path,
"total_variants": self.total_variants,
"samples": [s.as_dict for s in self.samples],
"per_chromosome_variants": {k: v for k, v in self.chrom_counter.items()}
}
@property
def as_json(self):
return json.dumps(self.as_dict, sort_keys=True, indent=4)
@click.command()
@click.option("-i",
"--input",
type=click.Path(exists=True, dir_okay=False, readable=True),
required=True,
help="Input VCF file")
def main(input):
stats = Stats(input)
print(stats.as_json)
if __name__ == "__main__":
main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment