gcoverage.py 5.47 KB
Newer Older
Sander Bollen's avatar
Sander Bollen committed
1 2 3 4 5 6 7 8 9
"""
vtools.gcoverage
~~~~~~~~~~~~~~~~

:copyright: (c) 2018 Sander Bollen
:copyright: (c) 2018 Leiden University Medical Center
:license: MIT
"""

Sander Bollen's avatar
Sander Bollen committed
10 11 12 13 14 15
import cyvcf2
import numpy as np

from collections import namedtuple
from itertools import chain

Sander Bollen's avatar
Sander Bollen committed
16
from typing import List, Optional, Tuple, NamedTuple
Sander Bollen's avatar
Sander Bollen committed
17

18 19
from .optimized import amount_atleast

Sander Bollen's avatar
Sander Bollen committed
20 21 22 23

Region = namedtuple("Region", ["chr", "start", "end"])


24 25 26 27 28 29 30 31
def coverage_for_gvcf_record(record: cyvcf2.Variant, maxlen: int = 15000) -> List[int]:
    """
    Get coverage for gvcf record per base

    Some records may be huge, especially those around centromeres.
    Therefore, there is a maxlen argument. Maximally `maxlen` values
    are returned.
    """
Sander Bollen's avatar
Sander Bollen committed
32 33
    start = record.start
    end = record.end
34
    size = end - start
Sander Bollen's avatar
Sander Bollen committed
35 36 37 38
    try:
        dp = record.format("DP")[0][0]
    except TypeError:
        dp = 0
39 40 41 42
    if size < maxlen:
        return [dp]*(end-start)
    else:
        return [dp]*maxlen
Sander Bollen's avatar
Sander Bollen committed
43 44


45 46 47 48 49 50
def gq_for_gvcf_record(record: cyvcf2.Variant, maxlen: int = 15000) -> List[int]:
    """
    Some records may be huge, especially those around centromeres.
    Therefore, there is a maxlen argument. Maximally `maxlen` values
    are returned.
    """
Sander Bollen's avatar
Sander Bollen committed
51 52
    start = record.start
    end = record.end
53
    size = end - start
Sander Bollen's avatar
Sander Bollen committed
54
    gq = record.gt_quals[0]
55 56 57 58
    if size < maxlen:
        return [gq]*(end-start)
    else:
        return [gq]*maxlen
Sander Bollen's avatar
Sander Bollen committed
59 60 61 62 63 64 65 66 67 68 69


class CovStats(object):
    def __init__(self, records):
        self.records = records
        self.__coverages = None
        self.__gq_qualities = None

    @property
    def coverages(self) -> List[int]:
        if self.__coverages is None:
Sander Bollen's avatar
Sander Bollen committed
70
            self.__coverages = np.fromiter(
Sander Bollen's avatar
Sander Bollen committed
71 72
                chain.from_iterable(
                    (coverage_for_gvcf_record(x) for x in self.records)
Sander Bollen's avatar
Sander Bollen committed
73 74
                ),
                dtype=int
Sander Bollen's avatar
Sander Bollen committed
75 76 77 78 79 80
            )
        return self.__coverages

    @property
    def gq_qualities(self) -> List[int]:
        if self.__gq_qualities is None:
Sander Bollen's avatar
Sander Bollen committed
81
            self.__gq_qualities = np.fromiter(
Sander Bollen's avatar
Sander Bollen committed
82 83
                chain.from_iterable(
                    (gq_for_gvcf_record(x) for x in self.records)
Sander Bollen's avatar
Sander Bollen committed
84 85
                ),
                dtype=int
Sander Bollen's avatar
Sander Bollen committed
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
            )
        return self.__gq_qualities

    @property
    def median_cov(self) -> float:
        return np.median(self.coverages)

    @property
    def mean_cov(self) -> float:
        return np.mean(self.coverages)

    @property
    def median_gq(self) -> float:
        return np.median(self.gq_qualities)

    @property
    def mean_gq(self) -> float:
        return np.mean(self.gq_qualities)

    def percent_atleast_dp(self, atleast) -> Optional[float]:
106 107 108 109
        if len(self.coverages) == 0:
            return None
        k = amount_atleast(self.coverages, atleast)
        return (k/len(self.coverages))*100
Sander Bollen's avatar
Sander Bollen committed
110 111

    def percent_atleast_gq(self, atleast) -> Optional[float]:
112 113 114 115
        if len(self.gq_qualities) == 0:
            return None
        k = amount_atleast(self.gq_qualities, atleast)
        return (k/len(self.gq_qualities))*100
Sander Bollen's avatar
Sander Bollen committed
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134

    @property
    def stats(self) -> dict:
        s = {
            "median_dp": self.median_cov,
            "mean_dp": self.mean_cov,
            "median_gq": self.median_gq,
            "mean_gq": self.mean_gq
        }
        for perc in (10, 20, 30, 50, 100):
            key = "perc_at_least_{0}_dp".format(perc)
            s[key] = self.percent_atleast_dp(perc)

        for perc in (10, 20, 30, 50, 90):
            key = "perc_at_least_{0}_gq".format(perc)
            s[key] = self.percent_atleast_gq(perc)
        return s


Sander Bollen's avatar
Sander Bollen committed
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
class RefRecord(NamedTuple):
    gene: str
    transcript: str
    contig: str
    start: int
    end: int
    cds_start: int
    cds_end: int
    exon_starts: List[int]
    exon_ends: List[int]
    forward: bool

    @classmethod
    def from_line(cls, line):
        contents = line.strip().split("\t")
Sander Bollen's avatar
Sander Bollen committed
150 151
        if len(contents) < 11:
            raise ValueError("refFlat line must have at least 11 fields")
Sander Bollen's avatar
Sander Bollen committed
152 153 154
        gene = contents[0]
        transcript = contents[1]
        contig = contents[2]
Sander Bollen's avatar
Sander Bollen committed
155
        if "-" in contents[3].strip():
Sander Bollen's avatar
Sander Bollen committed
156 157 158 159 160 161 162 163 164 165 166
            forward = False
        else:
            forward = True
        start = int(contents[4])
        end = int(contents[5])
        cds_start = int(contents[6])
        cds_end = int(contents[7])
        exon_starts = [int(x) for x in contents[9].split(",")[:-1]]
        exon_ends = [int(x) for x in contents[10].split(",")[:-1]]
        return cls(gene, transcript, contig, start, end, cds_start,
                   cds_end, exon_starts, exon_ends, forward)
Sander Bollen's avatar
Sander Bollen committed
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200

    @property
    def exons(self) -> List[Region]:
        regs = []
        for s, e in zip(self.exon_starts, self.exon_ends):
            regs.append(Region(self.contig, s, e))
        return regs

    @property
    def cds_exons(self) -> List[Tuple[int, Region]]:
        regs = []
        for i, (s, e) in enumerate(zip(self.exon_starts, self.exon_ends)):
            if s < self.cds_start:
                s = self.cds_start
            if e > self.cds_end:
                e = self.cds_end
            reg = Region(self.contig, s, e)
            if reg.end <= reg.start:   # utr exons
                continue
            regs.append((i, reg))
        return regs


def region_coverages(reader: cyvcf2.VCF, regions: List[Region]) -> Optional[dict]:
    records = []
    for region in regions:
        reg_str = "{0}:{1}-{2}".format(region.chr, region.start, region.end)
        it = reader(reg_str)
        records += list(it)

    if len(records) == 0:
        return CovStats([]).stats

    return CovStats(records).stats