gcoverage.py 6 KB
Newer Older
Sander Bollen's avatar
Sander Bollen committed
1 2 3 4 5 6 7 8 9
"""
vtools.gcoverage
~~~~~~~~~~~~~~~~

:copyright: (c) 2018 Sander Bollen
:copyright: (c) 2018 Leiden University Medical Center
:license: MIT
"""

Sander Bollen's avatar
Sander Bollen committed
10 11 12 13 14 15 16 17
import cyvcf2
import numpy as np

from collections import namedtuple
from itertools import chain

from typing import List, Optional, Tuple

18 19
from .optimized import amount_atleast

Sander Bollen's avatar
Sander Bollen committed
20 21 22 23

Region = namedtuple("Region", ["chr", "start", "end"])


24 25 26 27 28 29 30 31
def coverage_for_gvcf_record(record: cyvcf2.Variant, maxlen: int = 15000) -> List[int]:
    """
    Get coverage for gvcf record per base

    Some records may be huge, especially those around centromeres.
    Therefore, there is a maxlen argument. Maximally `maxlen` values
    are returned.
    """
Sander Bollen's avatar
Sander Bollen committed
32 33
    start = record.start
    end = record.end
34
    size = end - start
Sander Bollen's avatar
Sander Bollen committed
35 36 37 38
    try:
        dp = record.format("DP")[0][0]
    except TypeError:
        dp = 0
39 40 41 42
    if size < maxlen:
        return [dp]*(end-start)
    else:
        return [dp]*maxlen
Sander Bollen's avatar
Sander Bollen committed
43 44


45 46 47 48 49 50
def gq_for_gvcf_record(record: cyvcf2.Variant, maxlen: int = 15000) -> List[int]:
    """
    Some records may be huge, especially those around centromeres.
    Therefore, there is a maxlen argument. Maximally `maxlen` values
    are returned.
    """
Sander Bollen's avatar
Sander Bollen committed
51 52
    start = record.start
    end = record.end
53
    size = end - start
Sander Bollen's avatar
Sander Bollen committed
54
    gq = record.gt_quals[0]
55 56 57 58
    if size < maxlen:
        return [gq]*(end-start)
    else:
        return [gq]*maxlen
Sander Bollen's avatar
Sander Bollen committed
59 60


61 62 63 64 65 66 67
def qualmean(quals: np.ndarray) -> float:
    """
    Credit:
    https://gigabaseorgigabyte.wordpress.com/2017/06/26/averaging-basecall-quality-scores-the-right-way/
    https://git.lumc.nl/klinische-genetica/capture-lumc/vtools/issues/3
    """
    return -10*np.log10(np.mean(np.power(10, quals/-10)))
Mark Santcroos's avatar
Mark Santcroos committed
68 69


Sander Bollen's avatar
Sander Bollen committed
70 71 72 73 74 75 76
class CovStats(object):
    def __init__(self, records):
        self.records = records
        self.__coverages = None
        self.__gq_qualities = None

    @property
77
    def coverages(self) -> np.ndarray:
Sander Bollen's avatar
Sander Bollen committed
78
        if self.__coverages is None:
Sander Bollen's avatar
Sander Bollen committed
79
            self.__coverages = np.fromiter(
Sander Bollen's avatar
Sander Bollen committed
80 81
                chain.from_iterable(
                    (coverage_for_gvcf_record(x) for x in self.records)
Sander Bollen's avatar
Sander Bollen committed
82 83
                ),
                dtype=int
Sander Bollen's avatar
Sander Bollen committed
84 85 86 87
            )
        return self.__coverages

    @property
88
    def gq_qualities(self) -> np.ndarray:
Sander Bollen's avatar
Sander Bollen committed
89
        if self.__gq_qualities is None:
Sander Bollen's avatar
Sander Bollen committed
90
            self.__gq_qualities = np.fromiter(
Sander Bollen's avatar
Sander Bollen committed
91 92
                chain.from_iterable(
                    (gq_for_gvcf_record(x) for x in self.records)
Sander Bollen's avatar
Sander Bollen committed
93 94
                ),
                dtype=int
Sander Bollen's avatar
Sander Bollen committed
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
            )
        return self.__gq_qualities

    @property
    def median_cov(self) -> float:
        return np.median(self.coverages)

    @property
    def mean_cov(self) -> float:
        return np.mean(self.coverages)

    @property
    def median_gq(self) -> float:
        return np.median(self.gq_qualities)

    @property
    def mean_gq(self) -> float:
Mark Santcroos's avatar
Mark Santcroos committed
112
        return qualmean(self.gq_qualities)
Sander Bollen's avatar
Sander Bollen committed
113 114

    def percent_atleast_dp(self, atleast) -> Optional[float]:
115 116 117 118
        if len(self.coverages) == 0:
            return None
        k = amount_atleast(self.coverages, atleast)
        return (k/len(self.coverages))*100
Sander Bollen's avatar
Sander Bollen committed
119 120

    def percent_atleast_gq(self, atleast) -> Optional[float]:
121 122 123 124
        if len(self.gq_qualities) == 0:
            return None
        k = amount_atleast(self.gq_qualities, atleast)
        return (k/len(self.gq_qualities))*100
Sander Bollen's avatar
Sander Bollen committed
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208

    @property
    def stats(self) -> dict:
        s = {
            "median_dp": self.median_cov,
            "mean_dp": self.mean_cov,
            "median_gq": self.median_gq,
            "mean_gq": self.mean_gq
        }
        for perc in (10, 20, 30, 50, 100):
            key = "perc_at_least_{0}_dp".format(perc)
            s[key] = self.percent_atleast_dp(perc)

        for perc in (10, 20, 30, 50, 90):
            key = "perc_at_least_{0}_gq".format(perc)
            s[key] = self.percent_atleast_gq(perc)
        return s


class RefRecord(object):
    def __init__(self, line: str):
        self.line = line  # type: str
        self.gene = None  # type: Optional[str]
        self.transcript = None  # type: Optional[str]
        self.contig = None  # type: Optional[str]
        self.start = None  # type: Optional[int]
        self.end = None  # type: Optional[int]
        self.cds_start = None  # type: Optional[int]
        self.cds_end = None  # type: Optional[int]
        self.exon_starts = []  # type: List[int]
        self.exon_ends = []  # type: List[int]
        self.forward = True

        self.parse()

    def parse(self):
        contents = self.line.strip().split("\t")
        if len(contents) < 11:
            raise ValueError("refFlat line must have at least 11 fields")
        self.gene = contents[0]
        self.transcript = contents[1]
        self.contig = contents[2]
        if "-" in contents[3].strip():
            self.forward = False
        self.start = int(contents[4])
        self.end = int(contents[5])
        self.cds_start = int(contents[6])
        self.cds_end = int(contents[7])
        self.exon_starts = [int(x) for x in contents[9].split(",")[:-1]]
        self.exon_ends = [int(x) for x in contents[10].split(",")[:-1]]

    @property
    def exons(self) -> List[Region]:
        regs = []
        for s, e in zip(self.exon_starts, self.exon_ends):
            regs.append(Region(self.contig, s, e))
        return regs

    @property
    def cds_exons(self) -> List[Tuple[int, Region]]:
        regs = []
        for i, (s, e) in enumerate(zip(self.exon_starts, self.exon_ends)):
            if s < self.cds_start:
                s = self.cds_start
            if e > self.cds_end:
                e = self.cds_end
            reg = Region(self.contig, s, e)
            if reg.end <= reg.start:   # utr exons
                continue
            regs.append((i, reg))
        return regs


def region_coverages(reader: cyvcf2.VCF, regions: List[Region]) -> Optional[dict]:
    records = []
    for region in regions:
        reg_str = "{0}:{1}-{2}".format(region.chr, region.start, region.end)
        it = reader(reg_str)
        records += list(it)

    if len(records) == 0:
        return CovStats([]).stats

    return CovStats(records).stats