describe.py 21.2 KB
Newer Older
1
2
3
4
5
6
"""
Generate a HGVS description of the variant(s) leading from one sequence to an
other.
"""


Vermaat's avatar
Vermaat committed
7
from __future__ import (absolute_import, division, print_function,
8
    unicode_literals)
9
10
11

import math

12
from Bio.Seq import reverse_complement
13

14
15
16
from .variant import (ISeq, AISeq, ISeqList, AISeqList, DNAVar, ProteinVar,
    Allele, ProteinAllele, FS)
from . import extractor, util
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86


def roll(s, first, last):
    """
    Determine the variability of a variant by looking at cyclic
    permutations. Not all cyclic permutations are tested at each time, it
    is sufficient to check ``aW'' if ``Wa'' matches (with ``a'' a letter,
    ``W'' a word) when rolling to the left for example.

        >>> roll('abbabbabbabb', 4, 6)
        (3, 6)
        >>> roll('abbabbabbabb', 5, 5)
        (0, 1)
        >>> roll('abcccccde', 4, 4)
        (1, 3)

    @arg s: A reference sequence.
    @type s: any sequence type
    @arg first: First position of the pattern in the reference sequence.
    @type first: int
    @arg last: Last position of the pattern in the reference sequence.
    @type last: int

    @return: tuple:
        - left  ; Amount of positions that the pattern can be shifted to
                  the left.
        - right ; Amount of positions that the pattern can be shifted to
                  the right.
    @rtype: tuple(int, int)
    """
    pattern = s[first - 1:last]   # Extract the pattern
    pattern_length = len(pattern)

    # Keep rolling to the left as long as a cyclic permutation matches.
    minimum = first - 2
    j = pattern_length - 1
    while minimum > -1 and s[minimum] == pattern[j % pattern_length]:
        j -= 1
        minimum -= 1

    # Keep rolling to the right as long as a cyclic permutation matches.
    maximum = last
    j = 0
    while maximum < len(s) and s[maximum] == pattern[j % pattern_length]:
        j += 1
        maximum += 1

    return first - minimum - 2, maximum - last


def palinsnoop(s):
    """
    Check a sequence for a reverse-complement-palindromic prefix (and
    suffix). If one is detected, return the length of this prefix. If the
    string equals its reverse complement, return -1.

        >>> palinsnoop('TACGCTA')
        2
        >>> palinsnoop('TACGTA')
        -1
        >>> palinsnoop('TACGCTT')
        0

    @arg s: A nucleotide sequence.
    @type s: unicode

    @return: The number of elements that are palindromic or -1 if the string
             is a 'palindrome'.
    @rtype: int
    """
87
    s_revcomp = reverse_complement(str(s)) # FIXME str inserted.
88
89
90
91
92
93
94
95
96
97

    for i in range(int(math.ceil(len(s) / 2.0))):
        if s[i] != s_revcomp[i]:
            # The first i elements are 'palindromic'.
            return i

    # Perfect 'palindrome'.
    return -1


98
def var_to_dna_var(s1, s2, var, seq_list=[], weight_position=1):
99
    """
100
    Convert a variant from the extractor module to a DNAVar.
101
102
103
104
105

    :arg unicode s1: Reference sequence.
    :arg unicode s2: Sample sequence.
    :arg str var: Variant from the extractor module.
    :arg str seq_list: Container for an inserted sequence.
jkvis's avatar
jkvis committed
106
    :arg int weight_position: Weight of a position.
107
108
109
    """
    # Unknown.
    if s1 == '?' or s2 == '?':
110
        return [DNAVar(type='unknown', weight_position=weight_position)]
111
112
113
114
115
116
117
118
119
120
121
122

    # Insertion / Duplication.
    if var.reference_start == var.reference_end:
        ins_length = var.sample_end - var.sample_start
        shift5, shift3 = roll(s2, var.sample_start + 1, var.sample_end)
        shift = shift5 + shift3

        var.reference_start += shift3
        var.reference_end += shift3
        var.sample_start += shift3
        var.sample_end += shift3

jkvis's avatar
jkvis committed
123

124
        if (var.sample_start - ins_length >= 0 and
jkvis's avatar
jkvis committed
125
126
                '$' not in s1[var.reference_start - ins_length:var.reference_start] and
                '$' not in s2[var.sample_start:var.sample_end] and
127
128
129
130
                s1[var.reference_start - ins_length:var.reference_start] ==
                s2[var.sample_start:var.sample_end]):
            # NOTE: We may want to omit the inserted / deleted sequence and
            # use the ranges instead.
131
            return DNAVar(start=var.reference_start - ins_length + 1,
132
133
134
135
136
137
138
                end=var.reference_end, type='dup', shift=shift,
                sample_start=var.sample_start + 1, sample_end=var.sample_end,
                inserted=ISeqList([ISeq(sequence=s2[
                var.sample_start:var.sample_end],
                    weight_position=weight_position)]),
                weight_position=weight_position)

139
        return DNAVar(start=var.reference_start,
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
            end=var.reference_start + 1,
            inserted=seq_list or
            ISeqList([ISeq(sequence=s2[var.sample_start:var.sample_end],
                weight_position=weight_position)]),
            type='ins', shift=shift, sample_start=var.sample_start + 1,
            sample_end=var.sample_end, weight_position=weight_position)

    # Deletion.
    if var.sample_start == var.sample_end:
        shift5, shift3 = roll(s1, var.reference_start + 1, var.reference_end)
        shift = shift5 + shift3

        var.reference_start += shift3
        var.reference_end += shift3

155
        return DNAVar(start=var.reference_start + 1,
156
157
158
159
160
161
162
163
164
165
            end=var.reference_end, type='del', shift=shift,
            sample_start=var.sample_start, sample_end=var.sample_end + 1,
            deleted=ISeqList([ISeq(sequence=s1[
                var.reference_start:var.reference_end],
                weight_position=weight_position)]),
            weight_position=weight_position)

    # Substitution.
    if (var.reference_start + 1 == var.reference_end and
            var.sample_start + 1 == var.sample_end):
166
        return DNAVar(start=var.reference_start + 1,
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
            end=var.reference_end, sample_start=var.sample_start + 1,
            sample_end=var.sample_end, type='subst',
            deleted=ISeqList([ISeq(sequence=s1[var.reference_start],
                weight_position=weight_position)]),
            inserted=ISeqList([ISeq(sequence=s2[var.sample_start],
                weight_position=weight_position)]),
            weight_position=weight_position)

    # Inversion.
    if var.type & extractor.REVERSE_COMPLEMENT:
        trim = palinsnoop(s1[var.reference_start:var.reference_end])

        if trim > 0: # Partial palindrome.
            var.reference_end -= trim
            var.sample_end -= trim

183
        return DNAVar(start=var.reference_start + 1,
184
185
186
187
188
189
            end=var.reference_end, type='inv',
            sample_start=var.sample_start + 1, sample_end=var.sample_end,
            deleted=ISeqList([ISeq(sequence=s1[
                var.reference_start:var.reference_end],
                weight_position=weight_position)]),
            inserted=ISeqList([ISeq(sequence=s2[
190
                var.sample_start:var.sample_end],
191
192
193
194
                weight_position=weight_position)]),
            weight_position=weight_position)

    # InDel.
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
    return DNAVar(start=var.reference_start + 1,
        end=var.reference_end, deleted=ISeqList([ISeq(sequence=s1[
                var.reference_start:var.reference_end],
                weight_position=weight_position)]),
        inserted=seq_list or
        ISeqList([ISeq(sequence=s2[var.sample_start:var.sample_end],
            weight_position=weight_position)]),
        type='delins', sample_start=var.sample_start + 1,
        sample_end=var.sample_end, weight_position=weight_position)


def var_to_protein_var(s1, s2, var, seq_list=[], weight_position=1):
    """
    Convert a variant from the extractor module to a ProteinVar.

    :arg unicode s1: Reference sequence.
    :arg unicode s2: Sample sequence.
    :arg str var: Variant from the extractor module.
    :arg str seq_list: Container for an inserted sequence.
    :arg str weight_position: Weight of a position.
    """
    # Unknown.
    if s1 == '?' or s2 == '?':
        return [ProteinVar(type='unknown', weight_position=weight_position)]

    # Insertion / Duplication.
    if var.reference_start == var.reference_end:
        ins_length = var.sample_end - var.sample_start
        shift5, shift3 = roll(s2, var.sample_start + 1, var.sample_end)
        shift = shift5 + shift3

        var.reference_start += shift3
        var.reference_end += shift3
        var.sample_start += shift3
        var.sample_end += shift3

        if (var.sample_start - ins_length >= 0 and
                s1[var.reference_start - ins_length:var.reference_start] ==
                s2[var.sample_start:var.sample_end]):
            # NOTE: We may want to omit the inserted / deleted sequence and
            # use the ranges instead.
            return ProteinVar(s1=s1, s2=s2,
                start=var.reference_start - ins_length + 1,
                end=var.reference_end, type='dup', shift=shift,
                sample_start=var.sample_start + 1, sample_end=var.sample_end,
240
                inserted=AISeqList([AISeq(sequence=s2[
241
242
243
244
245
246
247
                var.sample_start:var.sample_end],
                    weight_position=weight_position)]),
                weight_position=weight_position)

        return ProteinVar(s1=s1, s2=s2, start=var.reference_start,
            end=var.reference_start + 1,
            inserted=seq_list or
248
            AISeqList([AISeq(sequence=s2[var.sample_start:var.sample_end],
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
                weight_position=weight_position)]),
            type='ins', shift=shift, sample_start=var.sample_start + 1,
            sample_end=var.sample_end, weight_position=weight_position)

    # Deletion.
    if var.sample_start == var.sample_end:
        shift5, shift3 = roll(s1, var.reference_start + 1, var.reference_end)
        shift = shift5 + shift3

        var.reference_start += shift3
        var.reference_end += shift3

        return ProteinVar(s1=s1, s2=s2, start=var.reference_start + 1,
            end=var.reference_end, type='del', shift=shift,
            sample_start=var.sample_start, sample_end=var.sample_end + 1,
264
            deleted=AISeqList([AISeq(sequence=s1[
265
266
267
268
269
270
271
272
273
274
                var.reference_start:var.reference_end],
                weight_position=weight_position)]),
            weight_position=weight_position)

    # Substitution.
    if (var.reference_start + 1 == var.reference_end and
            var.sample_start + 1 == var.sample_end):
        return ProteinVar(s1=s1, s2=s2, start=var.reference_start + 1,
            end=var.reference_end, sample_start=var.sample_start + 1,
            sample_end=var.sample_end, type='subst',
275
            deleted=AISeqList([AISeq(sequence=s1[var.reference_start],
276
                weight_position=weight_position)]),
277
            inserted=AISeqList([AISeq(sequence=s2[var.sample_start],
278
279
280
281
282
                weight_position=weight_position)]),
            weight_position=weight_position)

    # InDel.
    return ProteinVar(s1=s1, s2=s2, start=var.reference_start + 1,
283
        end=var.reference_end, deleted=AISeqList([AISeq(sequence=s1[
284
285
286
                var.reference_start:var.reference_end],
                weight_position=weight_position)]),
        inserted=seq_list or
287
        AISeqList([AISeq(sequence=s2[var.sample_start:var.sample_end],
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
            weight_position=weight_position)]),
        type='delins', sample_start=var.sample_start + 1,
        sample_end=var.sample_end, weight_position=weight_position)


def describe_dna(s1, s2):
    """
    Give an allele description of the change from {s1} to {s2}.

    :arg unicode s1: Sequence 1.
    :arg unicode s2: Sequence 2.

    :returns list(RawVar): A list of RawVar objects, representing the allele.
    """
    description = Allele()
    in_transposition = 0

Vermaat's avatar
Vermaat committed
305
306
307
    s1_swig = util.swig_str(s1)
    s2_swig = util.swig_str(s2)
    extracted = extractor.extract(s1_swig[0], s1_swig[1],
308
                                  s2_swig[0], s2_swig[1], extractor.TYPE_DNA)
Vermaat's avatar
Vermaat committed
309

310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
    for variant in extracted.variants:
        if variant.type & extractor.TRANSPOSITION_OPEN:
            if not in_transposition:
                seq_list = ISeqList()
            in_transposition += 1

        if in_transposition:
            if variant.type & extractor.IDENTITY:
                seq_list.append(ISeq(start=variant.transposition_start + 1,
                    end=variant.transposition_end, reverse=False,
                    weight_position=extracted.weight_position))
            elif variant.type & extractor.REVERSE_COMPLEMENT:
                seq_list.append(ISeq(start=variant.transposition_start + 1,
                    end=variant.transposition_end, reverse=True,
                    weight_position=extracted.weight_position))
            else:
                seq_list.append(ISeq(
                    sequence=s2[variant.sample_start:variant.sample_end],
                    weight_position=extracted.weight_position))
        elif not (variant.type & extractor.IDENTITY):
330
            description.append(var_to_dna_var(s1, s2, variant,
331
332
333
334
335
336
                weight_position=extracted.weight_position))

        if variant.type & extractor.TRANSPOSITION_CLOSE:
            in_transposition -= 1

            if not in_transposition:
337
                description.append(var_to_dna_var(s1, s2, variant, seq_list,
338
339
340
341
342
343
344
                    weight_position=extracted.weight_position))

    if not description:
        return Allele([DNAVar()])
    return description


jkvis's avatar
jkvis committed
345
346
347

def mask_string(string, units):
    MASK = '$'
348
    repeats = []
jkvis's avatar
jkvis committed
349
350
    for unit in units:
        found = string.find(unit)
351
352
353
354
355
356
357
358
359
        while found != -1:
            last = len(repeats) - 1
            if last > -1 and repeats[last]['start'] + repeats[last]['count'] * len(unit) == found:
                repeats[last]['count'] += 1
            else:
                repeats.append({'start': found, 'count': 1, 'unit': unit})
            string = string[:found] + MASK * len(unit) + string[found + len(unit):]
            found = string.find(unit, found + len(unit))
    return string, sorted(repeats, key=lambda repeat: repeat['start'])
jkvis's avatar
jkvis committed
360
361

def describe_repeats(reference, sample, units):
362
    MASK = '$'
jkvis's avatar
jkvis committed
363
    masked_ref, _ = mask_string(reference, units)
364
    masked_alt, repeats = mask_string(sample, units)
jkvis's avatar
jkvis committed
365

366
367
368
369
    reference_start = max(0, masked_ref.find(MASK))
    reference_end = min(len(masked_ref), masked_ref.rfind(MASK)) + 1
    sample_start = max(0, masked_alt.find(MASK))
    sample_end = min(len(masked_alt), masked_alt.rfind(MASK)) + 1
jkvis's avatar
jkvis committed
370

371
372
373
374
375
376
377
378
    description = Allele()
    prefix = describe_dna(reference[:reference_start], sample[:sample_start])
    for variant in prefix:
        if variant.type != 'none':
            description.append(variant)

    ref_swig = util.swig_str(masked_ref[reference_start:reference_end])
    alt_swig = util.swig_str(masked_alt[sample_start:sample_end])
jkvis's avatar
jkvis committed
379
380
381
    extracted = extractor.extract(ref_swig[0], ref_swig[1],
                                  alt_swig[0], alt_swig[1], extractor.TYPE_DNA)

jkvis's avatar
jkvis committed
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
    variant_list = []
    for variant in extracted.variants:
        replaced = []
        i = variant.sample_start
        start = i
        while i < variant.sample_end:
            while i < variant.sample_end and masked_alt[i + sample_start] != MASK:
                i += 1
            if i < variant.sample_end:
                split = extractor.Variant()
                split.reference_start = variant.reference_start
                split.reference_end = variant.reference_end
                split.sample_start = start
                split.sample_end = i
                split.type = variant.type
                split.transposition_start = variant.transposition_start
                split.transposition_end = variant.transposition_end
                replaced.append(split)
                while i < variant.sample_end and masked_alt[i + sample_start] == MASK:
                    i += 1
                start = i

        if len(replaced) > 0:
            split = extractor.Variant()
            split.reference_start = variant.reference_start
            split.reference_end = variant.reference_end
            split.sample_start = start
            split.sample_end = variant.sample_end
            split.type = variant.type
            split.transposition_start = variant.transposition_start
            split.transposition_end = variant.transposition_end
            replaced.append(split)
            variant_list += replaced
        else:
            variant_list.append(variant)

jkvis's avatar
jkvis committed
418
419
420
421
    in_transposition = 0
    index = 0
    repeat = 0
    seq_list = ISeqList()
jkvis's avatar
jkvis committed
422
    for variant in variant_list:
423
        while variant.sample_start > index and repeat < len(repeats):
jkvis's avatar
jkvis committed
424
            seq_list.append(DNAVar(type='repeat',inserted=repeats[repeat]['unit'],count=repeats[repeat]['count']))
jkvis's avatar
jkvis committed
425
            index = repeats[repeat]['start'] + repeats[repeat]['count'] * len(repeats[repeat]['unit'])
jkvis's avatar
jkvis committed
426
427
428
429
430
431
432
            repeat += 1

        if variant.type & extractor.TRANSPOSITION_OPEN:
            in_transposition += 1

        if in_transposition:
            if variant.type & extractor.IDENTITY:
jkvis's avatar
jkvis committed
433
434
                seq_list.append(ISeq(start=variant.transposition_start + 1 + reference_start,
                    end=variant.transposition_end + reference_start, reverse=False,
jkvis's avatar
jkvis committed
435
436
                    weight_position=extracted.weight_position))
            elif variant.type & extractor.REVERSE_COMPLEMENT:
jkvis's avatar
jkvis committed
437
438
                seq_list.append(ISeq(start=variant.transposition_start + 1 + reference_start,
                    end=variant.transposition_end + reference_start, reverse=True,
jkvis's avatar
jkvis committed
439
                    weight_position=extracted.weight_position))
jkvis's avatar
jkvis committed
440
            else: #bases insertion
jkvis's avatar
jkvis committed
441
                seq_list.append(ISeq(
442
                    sequence=sample[variant.sample_start + sample_start:variant.sample_end + sample_start],
jkvis's avatar
jkvis committed
443
444
                    weight_position=extracted.weight_position))
        elif variant.type & extractor.IDENTITY:
jkvis's avatar
jkvis committed
445
446
            seq_list.append(ISeq(start=variant.reference_start + 1 + reference_start,
                end=variant.reference_end + reference_start, reverse=False,weight_position=extracted.weight_position))
jkvis's avatar
jkvis committed
447
        elif variant.type & extractor.REVERSE_COMPLEMENT:
jkvis's avatar
jkvis committed
448
449
450
            seq_list.append(ISeq(start=variant.reference_start + 1 + reference_start,
                end=variant.reference_end + reference_start, reverse=True,weight_position=extracted.weight_position))
        else: #bases insertion
451
            seq_list.append(ISeq(sequence=sample[variant.sample_start + sample_start:variant.sample_end + sample_start],
jkvis's avatar
jkvis committed
452
453
454
455
456
457
458
                weight_position=extracted.weight_position))

        if variant.type & extractor.TRANSPOSITION_CLOSE:
            in_transposition -= 1

        index = variant.sample_end

459
460
461
462
    while repeat < len(repeats):
        seq_list.append(DNAVar(type='repeat',inserted=repeats[repeat]['unit'],count=repeats[repeat]['count']))
        repeat += 1

463
464
465
466
467
468
469
470
471
472
473
    if len(variant_list) > 0 or len(repeats) > 0:
        description.append(DNAVar(start=reference_start + 1,end=reference_end,sample_start=sample_start,sample_end=sample_end,type='delins',inserted=seq_list))

        suffix = describe_dna(reference[reference_end:], sample[sample_end:])
        for variant in suffix:
            if variant.type != 'none':
                variant.start += reference_end
                variant.end += reference_end
                description.append(variant)
    else:
        description = prefix
474

475
    return description
jkvis's avatar
jkvis committed
476
477


478
def print_var(variant):
479
    print('({:3}, {:3}), ({:3}, {:3}), {:08b}, {}, {}'.format(variant.reference_start,
480
        variant.reference_end, variant.sample_start, variant.sample_end,
481
        variant.type, variant.type, variant.sample_end - variant.sample_start))
482
483
484
485
486
487
488
489
490
491
492
493
494


def get_frames(flags):
    result = []

    for fs in FS:
        if flags & FS[fs]:
            result.append(fs)

    return result


def describe_protein(s1, s2, codon_table=1):
495
496
    """
    """
497
    codons = util.codon_table_string(codon_table)
498
499
500
501
502
503

    description = ProteinAllele()

    s1_swig = util.swig_str(s1)
    s2_swig = util.swig_str(s2)
    codons_swig = util.swig_str(codons)
504

505
506
    extracted = extractor.extract(s1_swig[0], s1_swig[1],
        s2_swig[0], s2_swig[1], extractor.TYPE_PROTEIN, codons_swig[0])
507
508
    variants = extracted.variants

509
510
511
    #for variant in variants:
    #    print_var(variant)
    #print()
512
513
514

    index = 0
    while index < len(variants):
515
        if variants[index].type != extractor.IDENTITY:
516
517
518
            variant = variants[index]
            index += 1
            seq_list = AISeqList()
519
520

            # NOTE: This is for filling.
521
            last_end = variants[index].reference_start
522

523
524
            while (index < len(variants) and
                    variants[index].type & extractor.FRAME_SHIFT):
525

526
                if last_end < variants[index].sample_start:
527
528
                    seq_list.append(AISeq(
                        s2[last_end:variants[index].sample_start]))
529

530
531
                last_end = variants[index].sample_end

532
533
534
535
536
537
538
539
540
541
542
                seq_list.append(AISeq(
                    s2[variants[index].sample_start:
                        variants[index].sample_end],
                    start=variants[index].reference_start + 1,
                    end=variants[index].reference_end,
                    sample_start=variants[index].sample_start + 1,
                    sample_end=variants[index].sample_end,
                    frames=get_frames(variants[index].type)))

                # NOTE: Perhaps use trans_open, trans_close to ...
                index += 1
543

544
            if last_end < variant.sample_end:
545
546
                seq_list.append(AISeq(s2[last_end:variant.sample_end]))

547
            var = var_to_protein_var(s1, s2, variant, seq_list,
548
549
                weight_position=extracted.weight_position)
            description.append(var)
550
551
        else:
            index += 1
552
553

    if not description:
554
555
        return ProteinAllele([ProteinVar()])
    return description