describe.py 21 KB
Newer Older
1
2
3
4
5
6
"""
Generate a HGVS description of the variant(s) leading from one sequence to an
other.
"""


Vermaat's avatar
Vermaat committed
7
from __future__ import (absolute_import, division, print_function,
8
    unicode_literals)
9
10
11

import math

12
from Bio.Seq import reverse_complement
13

14
15
16
from .variant import (ISeq, AISeq, ISeqList, AISeqList, DNAVar, ProteinVar,
    Allele, ProteinAllele, FS)
from . import extractor, util
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86


def roll(s, first, last):
    """
    Determine the variability of a variant by looking at cyclic
    permutations. Not all cyclic permutations are tested at each time, it
    is sufficient to check ``aW'' if ``Wa'' matches (with ``a'' a letter,
    ``W'' a word) when rolling to the left for example.

        >>> roll('abbabbabbabb', 4, 6)
        (3, 6)
        >>> roll('abbabbabbabb', 5, 5)
        (0, 1)
        >>> roll('abcccccde', 4, 4)
        (1, 3)

    @arg s: A reference sequence.
    @type s: any sequence type
    @arg first: First position of the pattern in the reference sequence.
    @type first: int
    @arg last: Last position of the pattern in the reference sequence.
    @type last: int

    @return: tuple:
        - left  ; Amount of positions that the pattern can be shifted to
                  the left.
        - right ; Amount of positions that the pattern can be shifted to
                  the right.
    @rtype: tuple(int, int)
    """
    pattern = s[first - 1:last]   # Extract the pattern
    pattern_length = len(pattern)

    # Keep rolling to the left as long as a cyclic permutation matches.
    minimum = first - 2
    j = pattern_length - 1
    while minimum > -1 and s[minimum] == pattern[j % pattern_length]:
        j -= 1
        minimum -= 1

    # Keep rolling to the right as long as a cyclic permutation matches.
    maximum = last
    j = 0
    while maximum < len(s) and s[maximum] == pattern[j % pattern_length]:
        j += 1
        maximum += 1

    return first - minimum - 2, maximum - last


def palinsnoop(s):
    """
    Check a sequence for a reverse-complement-palindromic prefix (and
    suffix). If one is detected, return the length of this prefix. If the
    string equals its reverse complement, return -1.

        >>> palinsnoop('TACGCTA')
        2
        >>> palinsnoop('TACGTA')
        -1
        >>> palinsnoop('TACGCTT')
        0

    @arg s: A nucleotide sequence.
    @type s: unicode

    @return: The number of elements that are palindromic or -1 if the string
             is a 'palindrome'.
    @rtype: int
    """
87
    s_revcomp = reverse_complement(str(s)) # FIXME str inserted.
88
89
90
91
92
93
94
95
96
97

    for i in range(int(math.ceil(len(s) / 2.0))):
        if s[i] != s_revcomp[i]:
            # The first i elements are 'palindromic'.
            return i

    # Perfect 'palindrome'.
    return -1


98
def var_to_dna_var(s1, s2, var, seq_list=[], weight_position=1):
99
    """
100
    Convert a variant from the extractor module to a DNAVar.
101
102
103
104
105

    :arg unicode s1: Reference sequence.
    :arg unicode s2: Sample sequence.
    :arg str var: Variant from the extractor module.
    :arg str seq_list: Container for an inserted sequence.
jkvis's avatar
jkvis committed
106
    :arg int weight_position: Weight of a position.
107
108
109
    """
    # Unknown.
    if s1 == '?' or s2 == '?':
110
        return [DNAVar(type='unknown', weight_position=weight_position)]
111
112
113
114
115
116
117
118
119
120
121
122

    # Insertion / Duplication.
    if var.reference_start == var.reference_end:
        ins_length = var.sample_end - var.sample_start
        shift5, shift3 = roll(s2, var.sample_start + 1, var.sample_end)
        shift = shift5 + shift3

        var.reference_start += shift3
        var.reference_end += shift3
        var.sample_start += shift3
        var.sample_end += shift3

jkvis's avatar
jkvis committed
123

124
        if (var.sample_start - ins_length >= 0 and
jkvis's avatar
jkvis committed
125
126
                '$' not in s1[var.reference_start - ins_length:var.reference_start] and
                '$' not in s2[var.sample_start:var.sample_end] and
127
128
129
130
                s1[var.reference_start - ins_length:var.reference_start] ==
                s2[var.sample_start:var.sample_end]):
            # NOTE: We may want to omit the inserted / deleted sequence and
            # use the ranges instead.
131
            return DNAVar(start=var.reference_start - ins_length + 1,
132
133
134
135
136
137
138
                end=var.reference_end, type='dup', shift=shift,
                sample_start=var.sample_start + 1, sample_end=var.sample_end,
                inserted=ISeqList([ISeq(sequence=s2[
                var.sample_start:var.sample_end],
                    weight_position=weight_position)]),
                weight_position=weight_position)

139
        return DNAVar(start=var.reference_start,
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
            end=var.reference_start + 1,
            inserted=seq_list or
            ISeqList([ISeq(sequence=s2[var.sample_start:var.sample_end],
                weight_position=weight_position)]),
            type='ins', shift=shift, sample_start=var.sample_start + 1,
            sample_end=var.sample_end, weight_position=weight_position)

    # Deletion.
    if var.sample_start == var.sample_end:
        shift5, shift3 = roll(s1, var.reference_start + 1, var.reference_end)
        shift = shift5 + shift3

        var.reference_start += shift3
        var.reference_end += shift3

155
        return DNAVar(start=var.reference_start + 1,
156
157
158
159
160
161
162
163
164
165
            end=var.reference_end, type='del', shift=shift,
            sample_start=var.sample_start, sample_end=var.sample_end + 1,
            deleted=ISeqList([ISeq(sequence=s1[
                var.reference_start:var.reference_end],
                weight_position=weight_position)]),
            weight_position=weight_position)

    # Substitution.
    if (var.reference_start + 1 == var.reference_end and
            var.sample_start + 1 == var.sample_end):
166
        return DNAVar(start=var.reference_start + 1,
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
            end=var.reference_end, sample_start=var.sample_start + 1,
            sample_end=var.sample_end, type='subst',
            deleted=ISeqList([ISeq(sequence=s1[var.reference_start],
                weight_position=weight_position)]),
            inserted=ISeqList([ISeq(sequence=s2[var.sample_start],
                weight_position=weight_position)]),
            weight_position=weight_position)

    # Inversion.
    if var.type & extractor.REVERSE_COMPLEMENT:
        trim = palinsnoop(s1[var.reference_start:var.reference_end])

        if trim > 0: # Partial palindrome.
            var.reference_end -= trim
            var.sample_end -= trim

183
        return DNAVar(start=var.reference_start + 1,
184
185
186
187
188
189
            end=var.reference_end, type='inv',
            sample_start=var.sample_start + 1, sample_end=var.sample_end,
            deleted=ISeqList([ISeq(sequence=s1[
                var.reference_start:var.reference_end],
                weight_position=weight_position)]),
            inserted=ISeqList([ISeq(sequence=s2[
190
                var.sample_start:var.sample_end],
191
192
193
194
                weight_position=weight_position)]),
            weight_position=weight_position)

    # InDel.
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
    return DNAVar(start=var.reference_start + 1,
        end=var.reference_end, deleted=ISeqList([ISeq(sequence=s1[
                var.reference_start:var.reference_end],
                weight_position=weight_position)]),
        inserted=seq_list or
        ISeqList([ISeq(sequence=s2[var.sample_start:var.sample_end],
            weight_position=weight_position)]),
        type='delins', sample_start=var.sample_start + 1,
        sample_end=var.sample_end, weight_position=weight_position)


def var_to_protein_var(s1, s2, var, seq_list=[], weight_position=1):
    """
    Convert a variant from the extractor module to a ProteinVar.

    :arg unicode s1: Reference sequence.
    :arg unicode s2: Sample sequence.
    :arg str var: Variant from the extractor module.
    :arg str seq_list: Container for an inserted sequence.
    :arg str weight_position: Weight of a position.
    """
    # Unknown.
    if s1 == '?' or s2 == '?':
        return [ProteinVar(type='unknown', weight_position=weight_position)]

    # Insertion / Duplication.
    if var.reference_start == var.reference_end:
        ins_length = var.sample_end - var.sample_start
        shift5, shift3 = roll(s2, var.sample_start + 1, var.sample_end)
        shift = shift5 + shift3

        var.reference_start += shift3
        var.reference_end += shift3
        var.sample_start += shift3
        var.sample_end += shift3

        if (var.sample_start - ins_length >= 0 and
                s1[var.reference_start - ins_length:var.reference_start] ==
                s2[var.sample_start:var.sample_end]):
            # NOTE: We may want to omit the inserted / deleted sequence and
            # use the ranges instead.
            return ProteinVar(s1=s1, s2=s2,
                start=var.reference_start - ins_length + 1,
                end=var.reference_end, type='dup', shift=shift,
                sample_start=var.sample_start + 1, sample_end=var.sample_end,
240
                inserted=AISeqList([AISeq(sequence=s2[
241
242
243
244
245
246
247
                var.sample_start:var.sample_end],
                    weight_position=weight_position)]),
                weight_position=weight_position)

        return ProteinVar(s1=s1, s2=s2, start=var.reference_start,
            end=var.reference_start + 1,
            inserted=seq_list or
248
            AISeqList([AISeq(sequence=s2[var.sample_start:var.sample_end],
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
                weight_position=weight_position)]),
            type='ins', shift=shift, sample_start=var.sample_start + 1,
            sample_end=var.sample_end, weight_position=weight_position)

    # Deletion.
    if var.sample_start == var.sample_end:
        shift5, shift3 = roll(s1, var.reference_start + 1, var.reference_end)
        shift = shift5 + shift3

        var.reference_start += shift3
        var.reference_end += shift3

        return ProteinVar(s1=s1, s2=s2, start=var.reference_start + 1,
            end=var.reference_end, type='del', shift=shift,
            sample_start=var.sample_start, sample_end=var.sample_end + 1,
264
            deleted=AISeqList([AISeq(sequence=s1[
265
266
267
268
269
270
271
272
273
274
                var.reference_start:var.reference_end],
                weight_position=weight_position)]),
            weight_position=weight_position)

    # Substitution.
    if (var.reference_start + 1 == var.reference_end and
            var.sample_start + 1 == var.sample_end):
        return ProteinVar(s1=s1, s2=s2, start=var.reference_start + 1,
            end=var.reference_end, sample_start=var.sample_start + 1,
            sample_end=var.sample_end, type='subst',
275
            deleted=AISeqList([AISeq(sequence=s1[var.reference_start],
276
                weight_position=weight_position)]),
277
            inserted=AISeqList([AISeq(sequence=s2[var.sample_start],
278
279
280
281
282
                weight_position=weight_position)]),
            weight_position=weight_position)

    # InDel.
    return ProteinVar(s1=s1, s2=s2, start=var.reference_start + 1,
283
        end=var.reference_end, deleted=AISeqList([AISeq(sequence=s1[
284
285
286
                var.reference_start:var.reference_end],
                weight_position=weight_position)]),
        inserted=seq_list or
287
        AISeqList([AISeq(sequence=s2[var.sample_start:var.sample_end],
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
            weight_position=weight_position)]),
        type='delins', sample_start=var.sample_start + 1,
        sample_end=var.sample_end, weight_position=weight_position)


def describe_dna(s1, s2):
    """
    Give an allele description of the change from {s1} to {s2}.

    :arg unicode s1: Sequence 1.
    :arg unicode s2: Sequence 2.

    :returns list(RawVar): A list of RawVar objects, representing the allele.
    """
    description = Allele()
    in_transposition = 0

Vermaat's avatar
Vermaat committed
305
306
307
    s1_swig = util.swig_str(s1)
    s2_swig = util.swig_str(s2)
    extracted = extractor.extract(s1_swig[0], s1_swig[1],
308
                                  s2_swig[0], s2_swig[1], extractor.TYPE_DNA)
Vermaat's avatar
Vermaat committed
309

310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
    for variant in extracted.variants:
        if variant.type & extractor.TRANSPOSITION_OPEN:
            if not in_transposition:
                seq_list = ISeqList()
            in_transposition += 1

        if in_transposition:
            if variant.type & extractor.IDENTITY:
                seq_list.append(ISeq(start=variant.transposition_start + 1,
                    end=variant.transposition_end, reverse=False,
                    weight_position=extracted.weight_position))
            elif variant.type & extractor.REVERSE_COMPLEMENT:
                seq_list.append(ISeq(start=variant.transposition_start + 1,
                    end=variant.transposition_end, reverse=True,
                    weight_position=extracted.weight_position))
            else:
                seq_list.append(ISeq(
                    sequence=s2[variant.sample_start:variant.sample_end],
                    weight_position=extracted.weight_position))
        elif not (variant.type & extractor.IDENTITY):
330
            description.append(var_to_dna_var(s1, s2, variant,
331
332
333
334
335
336
                weight_position=extracted.weight_position))

        if variant.type & extractor.TRANSPOSITION_CLOSE:
            in_transposition -= 1

            if not in_transposition:
337
                description.append(var_to_dna_var(s1, s2, variant, seq_list,
338
339
340
341
342
343
344
                    weight_position=extracted.weight_position))

    if not description:
        return Allele([DNAVar()])
    return description


jkvis's avatar
jkvis committed
345
346
347

def mask_string(string, units):
    MASK = '$'
348
    repeats = []
jkvis's avatar
jkvis committed
349
350
    for unit in units:
        found = string.find(unit)
351
352
353
354
355
356
357
358
359
        while found != -1:
            last = len(repeats) - 1
            if last > -1 and repeats[last]['start'] + repeats[last]['count'] * len(unit) == found:
                repeats[last]['count'] += 1
            else:
                repeats.append({'start': found, 'count': 1, 'unit': unit})
            string = string[:found] + MASK * len(unit) + string[found + len(unit):]
            found = string.find(unit, found + len(unit))
    return string, sorted(repeats, key=lambda repeat: repeat['start'])
jkvis's avatar
jkvis committed
360
361

def describe_repeats(reference, sample, units):
362
    MASK = '$'
jkvis's avatar
jkvis committed
363
    masked_ref, _ = mask_string(reference, units)
364
    masked_alt, repeats = mask_string(sample, units)
jkvis's avatar
jkvis committed
365

366
367
368
369
    reference_start = max(0, masked_ref.find(MASK))
    reference_end = min(len(masked_ref), masked_ref.rfind(MASK)) + 1
    sample_start = max(0, masked_alt.find(MASK))
    sample_end = min(len(masked_alt), masked_alt.rfind(MASK)) + 1
jkvis's avatar
jkvis committed
370

371
372
373
374
375
376
377
378
    description = Allele()
    prefix = describe_dna(reference[:reference_start], sample[:sample_start])
    for variant in prefix:
        if variant.type != 'none':
            description.append(variant)

    ref_swig = util.swig_str(masked_ref[reference_start:reference_end])
    alt_swig = util.swig_str(masked_alt[sample_start:sample_end])
jkvis's avatar
jkvis committed
379
380
381
    extracted = extractor.extract(ref_swig[0], ref_swig[1],
                                  alt_swig[0], alt_swig[1], extractor.TYPE_DNA)

jkvis's avatar
jkvis committed
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
    variant_list = []
    for variant in extracted.variants:
        replaced = []
        i = variant.sample_start
        start = i
        while i < variant.sample_end:
            while i < variant.sample_end and masked_alt[i + sample_start] != MASK:
                i += 1
            if i < variant.sample_end:
                split = extractor.Variant()
                split.reference_start = variant.reference_start
                split.reference_end = variant.reference_end
                split.sample_start = start
                split.sample_end = i
                split.type = variant.type
                split.transposition_start = variant.transposition_start
                split.transposition_end = variant.transposition_end
                replaced.append(split)
                while i < variant.sample_end and masked_alt[i + sample_start] == MASK:
                    i += 1
                start = i

        if len(replaced) > 0:
            split = extractor.Variant()
            split.reference_start = variant.reference_start
            split.reference_end = variant.reference_end
            split.sample_start = start
            split.sample_end = variant.sample_end
            split.type = variant.type
            split.transposition_start = variant.transposition_start
            split.transposition_end = variant.transposition_end
            replaced.append(split)
            variant_list += replaced
        else:
            variant_list.append(variant)

jkvis's avatar
jkvis committed
418
419
420
421
    in_transposition = 0
    index = 0
    repeat = 0
    seq_list = ISeqList()
jkvis's avatar
jkvis committed
422
    for variant in variant_list:
423
        while variant.sample_start > index and repeat < len(repeats):
jkvis's avatar
jkvis committed
424
            seq_list.append(DNAVar(type='repeat',inserted=repeats[repeat]['unit'],count=repeats[repeat]['count']))
jkvis's avatar
jkvis committed
425
            index = repeats[repeat]['start'] + repeats[repeat]['count'] * len(repeats[repeat]['unit'])
jkvis's avatar
jkvis committed
426
427
428
429
430
431
432
            repeat += 1

        if variant.type & extractor.TRANSPOSITION_OPEN:
            in_transposition += 1

        if in_transposition:
            if variant.type & extractor.IDENTITY:
jkvis's avatar
jkvis committed
433
434
                seq_list.append(ISeq(start=variant.transposition_start + 1 + reference_start,
                    end=variant.transposition_end + reference_start, reverse=False,
jkvis's avatar
jkvis committed
435
436
                    weight_position=extracted.weight_position))
            elif variant.type & extractor.REVERSE_COMPLEMENT:
jkvis's avatar
jkvis committed
437
438
                seq_list.append(ISeq(start=variant.transposition_start + 1 + reference_start,
                    end=variant.transposition_end + reference_start, reverse=True,
jkvis's avatar
jkvis committed
439
                    weight_position=extracted.weight_position))
jkvis's avatar
jkvis committed
440
            else: #bases insertion
jkvis's avatar
jkvis committed
441
                seq_list.append(ISeq(
442
                    sequence=sample[variant.sample_start + sample_start:variant.sample_end + sample_start],
jkvis's avatar
jkvis committed
443
444
                    weight_position=extracted.weight_position))
        elif variant.type & extractor.IDENTITY:
jkvis's avatar
jkvis committed
445
446
            seq_list.append(ISeq(start=variant.reference_start + 1 + reference_start,
                end=variant.reference_end + reference_start, reverse=False,weight_position=extracted.weight_position))
jkvis's avatar
jkvis committed
447
        elif variant.type & extractor.REVERSE_COMPLEMENT:
jkvis's avatar
jkvis committed
448
449
450
            seq_list.append(ISeq(start=variant.reference_start + 1 + reference_start,
                end=variant.reference_end + reference_start, reverse=True,weight_position=extracted.weight_position))
        else: #bases insertion
451
            seq_list.append(ISeq(sequence=sample[variant.sample_start + sample_start:variant.sample_end + sample_start],
jkvis's avatar
jkvis committed
452
453
454
455
456
457
458
                weight_position=extracted.weight_position))

        if variant.type & extractor.TRANSPOSITION_CLOSE:
            in_transposition -= 1

        index = variant.sample_end

459
460
461
462
    while repeat < len(repeats):
        seq_list.append(DNAVar(type='repeat',inserted=repeats[repeat]['unit'],count=repeats[repeat]['count']))
        repeat += 1

jkvis's avatar
jkvis committed
463
    description.append(DNAVar(start=reference_start + 1,end=reference_end,sample_start=sample_start,sample_end=sample_end,type='delins',inserted=seq_list))
464
465
466
467
468
469

    suffix = describe_dna(reference[reference_end:], sample[sample_end:])
    for variant in suffix:
        if variant.type != 'none':
            description.append(variant)

470
    return description
jkvis's avatar
jkvis committed
471
472


473
def print_var(variant):
474
    print('({:3}, {:3}), ({:3}, {:3}), {:08b}, {}, {}'.format(variant.reference_start,
475
        variant.reference_end, variant.sample_start, variant.sample_end,
476
        variant.type, variant.type, variant.sample_end - variant.sample_start))
477
478
479
480
481
482
483
484
485
486
487
488
489


def get_frames(flags):
    result = []

    for fs in FS:
        if flags & FS[fs]:
            result.append(fs)

    return result


def describe_protein(s1, s2, codon_table=1):
490
491
    """
    """
492
    codons = util.codon_table_string(codon_table)
493
494
495
496
497
498

    description = ProteinAllele()

    s1_swig = util.swig_str(s1)
    s2_swig = util.swig_str(s2)
    codons_swig = util.swig_str(codons)
499

500
501
    extracted = extractor.extract(s1_swig[0], s1_swig[1],
        s2_swig[0], s2_swig[1], extractor.TYPE_PROTEIN, codons_swig[0])
502
503
    variants = extracted.variants

504
505
506
    #for variant in variants:
    #    print_var(variant)
    #print()
507
508
509

    index = 0
    while index < len(variants):
510
        if variants[index].type != extractor.IDENTITY:
511
512
513
            variant = variants[index]
            index += 1
            seq_list = AISeqList()
514
515

            # NOTE: This is for filling.
516
            last_end = variants[index].reference_start
517

518
519
            while (index < len(variants) and
                    variants[index].type & extractor.FRAME_SHIFT):
520

521
                if last_end < variants[index].sample_start:
522
523
                    seq_list.append(AISeq(
                        s2[last_end:variants[index].sample_start]))
524

525
526
                last_end = variants[index].sample_end

527
528
529
530
531
532
533
534
535
536
537
                seq_list.append(AISeq(
                    s2[variants[index].sample_start:
                        variants[index].sample_end],
                    start=variants[index].reference_start + 1,
                    end=variants[index].reference_end,
                    sample_start=variants[index].sample_start + 1,
                    sample_end=variants[index].sample_end,
                    frames=get_frames(variants[index].type)))

                # NOTE: Perhaps use trans_open, trans_close to ...
                index += 1
538

539
            if last_end < variant.sample_end:
540
541
                seq_list.append(AISeq(s2[last_end:variant.sample_end]))

542
            var = var_to_protein_var(s1, s2, variant, seq_list,
543
544
                weight_position=extracted.weight_position)
            description.append(var)
545
546
        else:
            index += 1
547
548

    if not description:
549
550
        return ProteinAllele([ProteinVar()])
    return description