describe.py 21.7 KB
Newer Older
1
2
3
4
5
6
"""
Generate a HGVS description of the variant(s) leading from one sequence to an
other.
"""


Vermaat's avatar
Vermaat committed
7
from __future__ import (absolute_import, division, print_function,
8
    unicode_literals)
9
10
11

import math

12
from Bio.Seq import reverse_complement
13

14
15
16
from .variant import (ISeq, AISeq, ISeqList, AISeqList, DNAVar, ProteinVar,
    Allele, ProteinAllele, FS)
from . import extractor, util
17

jkvis's avatar
jkvis committed
18
19
from crossmapper import Crossmap

20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88

def roll(s, first, last):
    """
    Determine the variability of a variant by looking at cyclic
    permutations. Not all cyclic permutations are tested at each time, it
    is sufficient to check ``aW'' if ``Wa'' matches (with ``a'' a letter,
    ``W'' a word) when rolling to the left for example.

        >>> roll('abbabbabbabb', 4, 6)
        (3, 6)
        >>> roll('abbabbabbabb', 5, 5)
        (0, 1)
        >>> roll('abcccccde', 4, 4)
        (1, 3)

    @arg s: A reference sequence.
    @type s: any sequence type
    @arg first: First position of the pattern in the reference sequence.
    @type first: int
    @arg last: Last position of the pattern in the reference sequence.
    @type last: int

    @return: tuple:
        - left  ; Amount of positions that the pattern can be shifted to
                  the left.
        - right ; Amount of positions that the pattern can be shifted to
                  the right.
    @rtype: tuple(int, int)
    """
    pattern = s[first - 1:last]   # Extract the pattern
    pattern_length = len(pattern)

    # Keep rolling to the left as long as a cyclic permutation matches.
    minimum = first - 2
    j = pattern_length - 1
    while minimum > -1 and s[minimum] == pattern[j % pattern_length]:
        j -= 1
        minimum -= 1

    # Keep rolling to the right as long as a cyclic permutation matches.
    maximum = last
    j = 0
    while maximum < len(s) and s[maximum] == pattern[j % pattern_length]:
        j += 1
        maximum += 1

    return first - minimum - 2, maximum - last


def palinsnoop(s):
    """
    Check a sequence for a reverse-complement-palindromic prefix (and
    suffix). If one is detected, return the length of this prefix. If the
    string equals its reverse complement, return -1.

        >>> palinsnoop('TACGCTA')
        2
        >>> palinsnoop('TACGTA')
        -1
        >>> palinsnoop('TACGCTT')
        0

    @arg s: A nucleotide sequence.
    @type s: unicode

    @return: The number of elements that are palindromic or -1 if the string
             is a 'palindrome'.
    @rtype: int
    """
89
    s_revcomp = reverse_complement(str(s)) # FIXME str inserted.
90
91
92
93
94
95
96
97
98
99

    for i in range(int(math.ceil(len(s) / 2.0))):
        if s[i] != s_revcomp[i]:
            # The first i elements are 'palindromic'.
            return i

    # Perfect 'palindrome'.
    return -1


100
def var_to_dna_var(s1, s2, var, seq_list=[], weight_position=1):
101
    """
102
    Convert a variant from the extractor module to a DNAVar.
103
104
105
106
107

    :arg unicode s1: Reference sequence.
    :arg unicode s2: Sample sequence.
    :arg str var: Variant from the extractor module.
    :arg str seq_list: Container for an inserted sequence.
jkvis's avatar
jkvis committed
108
    :arg int weight_position: Weight of a position.
109
110
111
    """
    # Unknown.
    if s1 == '?' or s2 == '?':
112
        return [DNAVar(type='unknown', weight_position=weight_position)]
113
114
115
116
117
118
119
120
121
122
123
124

    # Insertion / Duplication.
    if var.reference_start == var.reference_end:
        ins_length = var.sample_end - var.sample_start
        shift5, shift3 = roll(s2, var.sample_start + 1, var.sample_end)
        shift = shift5 + shift3

        var.reference_start += shift3
        var.reference_end += shift3
        var.sample_start += shift3
        var.sample_end += shift3

jkvis's avatar
jkvis committed
125

126
        if (var.sample_start - ins_length >= 0 and
jkvis's avatar
jkvis committed
127
128
                '$' not in s1[var.reference_start - ins_length:var.reference_start] and
                '$' not in s2[var.sample_start:var.sample_end] and
129
130
131
132
                s1[var.reference_start - ins_length:var.reference_start] ==
                s2[var.sample_start:var.sample_end]):
            # NOTE: We may want to omit the inserted / deleted sequence and
            # use the ranges instead.
133
            return DNAVar(start=var.reference_start - ins_length + 1,
134
135
136
137
138
139
140
                end=var.reference_end, type='dup', shift=shift,
                sample_start=var.sample_start + 1, sample_end=var.sample_end,
                inserted=ISeqList([ISeq(sequence=s2[
                var.sample_start:var.sample_end],
                    weight_position=weight_position)]),
                weight_position=weight_position)

141
        return DNAVar(start=var.reference_start,
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
            end=var.reference_start + 1,
            inserted=seq_list or
            ISeqList([ISeq(sequence=s2[var.sample_start:var.sample_end],
                weight_position=weight_position)]),
            type='ins', shift=shift, sample_start=var.sample_start + 1,
            sample_end=var.sample_end, weight_position=weight_position)

    # Deletion.
    if var.sample_start == var.sample_end:
        shift5, shift3 = roll(s1, var.reference_start + 1, var.reference_end)
        shift = shift5 + shift3

        var.reference_start += shift3
        var.reference_end += shift3

157
        return DNAVar(start=var.reference_start + 1,
158
159
160
161
162
163
164
165
166
167
            end=var.reference_end, type='del', shift=shift,
            sample_start=var.sample_start, sample_end=var.sample_end + 1,
            deleted=ISeqList([ISeq(sequence=s1[
                var.reference_start:var.reference_end],
                weight_position=weight_position)]),
            weight_position=weight_position)

    # Substitution.
    if (var.reference_start + 1 == var.reference_end and
            var.sample_start + 1 == var.sample_end):
168
        return DNAVar(start=var.reference_start + 1,
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
            end=var.reference_end, sample_start=var.sample_start + 1,
            sample_end=var.sample_end, type='subst',
            deleted=ISeqList([ISeq(sequence=s1[var.reference_start],
                weight_position=weight_position)]),
            inserted=ISeqList([ISeq(sequence=s2[var.sample_start],
                weight_position=weight_position)]),
            weight_position=weight_position)

    # Inversion.
    if var.type & extractor.REVERSE_COMPLEMENT:
        trim = palinsnoop(s1[var.reference_start:var.reference_end])

        if trim > 0: # Partial palindrome.
            var.reference_end -= trim
            var.sample_end -= trim

185
        return DNAVar(start=var.reference_start + 1,
186
187
188
189
190
191
            end=var.reference_end, type='inv',
            sample_start=var.sample_start + 1, sample_end=var.sample_end,
            deleted=ISeqList([ISeq(sequence=s1[
                var.reference_start:var.reference_end],
                weight_position=weight_position)]),
            inserted=ISeqList([ISeq(sequence=s2[
192
                var.sample_start:var.sample_end],
193
194
195
196
                weight_position=weight_position)]),
            weight_position=weight_position)

    # InDel.
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
    return DNAVar(start=var.reference_start + 1,
        end=var.reference_end, deleted=ISeqList([ISeq(sequence=s1[
                var.reference_start:var.reference_end],
                weight_position=weight_position)]),
        inserted=seq_list or
        ISeqList([ISeq(sequence=s2[var.sample_start:var.sample_end],
            weight_position=weight_position)]),
        type='delins', sample_start=var.sample_start + 1,
        sample_end=var.sample_end, weight_position=weight_position)


def var_to_protein_var(s1, s2, var, seq_list=[], weight_position=1):
    """
    Convert a variant from the extractor module to a ProteinVar.

    :arg unicode s1: Reference sequence.
    :arg unicode s2: Sample sequence.
    :arg str var: Variant from the extractor module.
    :arg str seq_list: Container for an inserted sequence.
    :arg str weight_position: Weight of a position.
    """
    # Unknown.
    if s1 == '?' or s2 == '?':
        return [ProteinVar(type='unknown', weight_position=weight_position)]

    # Insertion / Duplication.
    if var.reference_start == var.reference_end:
        ins_length = var.sample_end - var.sample_start
        shift5, shift3 = roll(s2, var.sample_start + 1, var.sample_end)
        shift = shift5 + shift3

        var.reference_start += shift3
        var.reference_end += shift3
        var.sample_start += shift3
        var.sample_end += shift3

        if (var.sample_start - ins_length >= 0 and
                s1[var.reference_start - ins_length:var.reference_start] ==
                s2[var.sample_start:var.sample_end]):
            # NOTE: We may want to omit the inserted / deleted sequence and
            # use the ranges instead.
            return ProteinVar(s1=s1, s2=s2,
                start=var.reference_start - ins_length + 1,
                end=var.reference_end, type='dup', shift=shift,
                sample_start=var.sample_start + 1, sample_end=var.sample_end,
242
                inserted=AISeqList([AISeq(sequence=s2[
243
244
245
246
247
248
249
                var.sample_start:var.sample_end],
                    weight_position=weight_position)]),
                weight_position=weight_position)

        return ProteinVar(s1=s1, s2=s2, start=var.reference_start,
            end=var.reference_start + 1,
            inserted=seq_list or
250
            AISeqList([AISeq(sequence=s2[var.sample_start:var.sample_end],
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
                weight_position=weight_position)]),
            type='ins', shift=shift, sample_start=var.sample_start + 1,
            sample_end=var.sample_end, weight_position=weight_position)

    # Deletion.
    if var.sample_start == var.sample_end:
        shift5, shift3 = roll(s1, var.reference_start + 1, var.reference_end)
        shift = shift5 + shift3

        var.reference_start += shift3
        var.reference_end += shift3

        return ProteinVar(s1=s1, s2=s2, start=var.reference_start + 1,
            end=var.reference_end, type='del', shift=shift,
            sample_start=var.sample_start, sample_end=var.sample_end + 1,
266
            deleted=AISeqList([AISeq(sequence=s1[
267
268
269
270
271
272
273
274
275
276
                var.reference_start:var.reference_end],
                weight_position=weight_position)]),
            weight_position=weight_position)

    # Substitution.
    if (var.reference_start + 1 == var.reference_end and
            var.sample_start + 1 == var.sample_end):
        return ProteinVar(s1=s1, s2=s2, start=var.reference_start + 1,
            end=var.reference_end, sample_start=var.sample_start + 1,
            sample_end=var.sample_end, type='subst',
277
            deleted=AISeqList([AISeq(sequence=s1[var.reference_start],
278
                weight_position=weight_position)]),
279
            inserted=AISeqList([AISeq(sequence=s2[var.sample_start],
280
281
282
283
284
                weight_position=weight_position)]),
            weight_position=weight_position)

    # InDel.
    return ProteinVar(s1=s1, s2=s2, start=var.reference_start + 1,
285
        end=var.reference_end, deleted=AISeqList([AISeq(sequence=s1[
286
287
288
                var.reference_start:var.reference_end],
                weight_position=weight_position)]),
        inserted=seq_list or
289
        AISeqList([AISeq(sequence=s2[var.sample_start:var.sample_end],
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
            weight_position=weight_position)]),
        type='delins', sample_start=var.sample_start + 1,
        sample_end=var.sample_end, weight_position=weight_position)


def describe_dna(s1, s2):
    """
    Give an allele description of the change from {s1} to {s2}.

    :arg unicode s1: Sequence 1.
    :arg unicode s2: Sequence 2.

    :returns list(RawVar): A list of RawVar objects, representing the allele.
    """
    description = Allele()
    in_transposition = 0

Vermaat's avatar
Vermaat committed
307
308
309
    s1_swig = util.swig_str(s1)
    s2_swig = util.swig_str(s2)
    extracted = extractor.extract(s1_swig[0], s1_swig[1],
310
                                  s2_swig[0], s2_swig[1], extractor.TYPE_DNA)
Vermaat's avatar
Vermaat committed
311

312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
    for variant in extracted.variants:
        if variant.type & extractor.TRANSPOSITION_OPEN:
            if not in_transposition:
                seq_list = ISeqList()
            in_transposition += 1

        if in_transposition:
            if variant.type & extractor.IDENTITY:
                seq_list.append(ISeq(start=variant.transposition_start + 1,
                    end=variant.transposition_end, reverse=False,
                    weight_position=extracted.weight_position))
            elif variant.type & extractor.REVERSE_COMPLEMENT:
                seq_list.append(ISeq(start=variant.transposition_start + 1,
                    end=variant.transposition_end, reverse=True,
                    weight_position=extracted.weight_position))
            else:
                seq_list.append(ISeq(
                    sequence=s2[variant.sample_start:variant.sample_end],
                    weight_position=extracted.weight_position))
        elif not (variant.type & extractor.IDENTITY):
332
            description.append(var_to_dna_var(s1, s2, variant,
333
334
335
336
337
338
                weight_position=extracted.weight_position))

        if variant.type & extractor.TRANSPOSITION_CLOSE:
            in_transposition -= 1

            if not in_transposition:
339
                description.append(var_to_dna_var(s1, s2, variant, seq_list,
340
341
342
343
344
345
346
                    weight_position=extracted.weight_position))

    if not description:
        return Allele([DNAVar()])
    return description


jkvis's avatar
jkvis committed
347
348
349

def mask_string(string, units):
    MASK = '$'
350
    repeats = []
jkvis's avatar
jkvis committed
351
352
    for unit in units:
        found = string.find(unit)
353
354
355
356
357
358
359
360
361
        while found != -1:
            last = len(repeats) - 1
            if last > -1 and repeats[last]['start'] + repeats[last]['count'] * len(unit) == found:
                repeats[last]['count'] += 1
            else:
                repeats.append({'start': found, 'count': 1, 'unit': unit})
            string = string[:found] + MASK * len(unit) + string[found + len(unit):]
            found = string.find(unit, found + len(unit))
    return string, sorted(repeats, key=lambda repeat: repeat['start'])
jkvis's avatar
jkvis committed
362
363

def describe_repeats(reference, sample, units):
364
    MASK = '$'
jkvis's avatar
jkvis committed
365
    masked_ref, _ = mask_string(reference, units)
366
    masked_alt, repeats = mask_string(sample, units)
jkvis's avatar
jkvis committed
367

368
369
370
371
    reference_start = max(0, masked_ref.find(MASK))
    reference_end = min(len(masked_ref), masked_ref.rfind(MASK)) + 1
    sample_start = max(0, masked_alt.find(MASK))
    sample_end = min(len(masked_alt), masked_alt.rfind(MASK)) + 1
jkvis's avatar
jkvis committed
372

373
374
375
376
377
378
379
380
    description = Allele()
    prefix = describe_dna(reference[:reference_start], sample[:sample_start])
    for variant in prefix:
        if variant.type != 'none':
            description.append(variant)

    ref_swig = util.swig_str(masked_ref[reference_start:reference_end])
    alt_swig = util.swig_str(masked_alt[sample_start:sample_end])
jkvis's avatar
jkvis committed
381
382
383
    extracted = extractor.extract(ref_swig[0], ref_swig[1],
                                  alt_swig[0], alt_swig[1], extractor.TYPE_DNA)

jkvis's avatar
jkvis committed
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
    variant_list = []
    for variant in extracted.variants:
        replaced = []
        i = variant.sample_start
        start = i
        while i < variant.sample_end:
            while i < variant.sample_end and masked_alt[i + sample_start] != MASK:
                i += 1
            if i < variant.sample_end:
                split = extractor.Variant()
                split.reference_start = variant.reference_start
                split.reference_end = variant.reference_end
                split.sample_start = start
                split.sample_end = i
                split.type = variant.type
                split.transposition_start = variant.transposition_start
                split.transposition_end = variant.transposition_end
                replaced.append(split)
                while i < variant.sample_end and masked_alt[i + sample_start] == MASK:
                    i += 1
                start = i

        if len(replaced) > 0:
            split = extractor.Variant()
            split.reference_start = variant.reference_start
            split.reference_end = variant.reference_end
            split.sample_start = start
            split.sample_end = variant.sample_end
            split.type = variant.type
            split.transposition_start = variant.transposition_start
            split.transposition_end = variant.transposition_end
            replaced.append(split)
            variant_list += replaced
        else:
            variant_list.append(variant)

jkvis's avatar
jkvis committed
420
421
422
423
    in_transposition = 0
    index = 0
    repeat = 0
    seq_list = ISeqList()
jkvis's avatar
jkvis committed
424
    for variant in variant_list:
425
        while variant.sample_start > index and repeat < len(repeats):
jkvis's avatar
jkvis committed
426
            seq_list.append(DNAVar(type='repeat',inserted=repeats[repeat]['unit'],count=repeats[repeat]['count']))
jkvis's avatar
jkvis committed
427
            index = repeats[repeat]['start'] + repeats[repeat]['count'] * len(repeats[repeat]['unit'])
jkvis's avatar
jkvis committed
428
429
430
431
432
433
434
            repeat += 1

        if variant.type & extractor.TRANSPOSITION_OPEN:
            in_transposition += 1

        if in_transposition:
            if variant.type & extractor.IDENTITY:
jkvis's avatar
jkvis committed
435
436
                seq_list.append(ISeq(start=variant.transposition_start + 1 + reference_start,
                    end=variant.transposition_end + reference_start, reverse=False,
jkvis's avatar
jkvis committed
437
438
                    weight_position=extracted.weight_position))
            elif variant.type & extractor.REVERSE_COMPLEMENT:
jkvis's avatar
jkvis committed
439
440
                seq_list.append(ISeq(start=variant.transposition_start + 1 + reference_start,
                    end=variant.transposition_end + reference_start, reverse=True,
jkvis's avatar
jkvis committed
441
                    weight_position=extracted.weight_position))
jkvis's avatar
jkvis committed
442
            else: #bases insertion
jkvis's avatar
jkvis committed
443
                seq_list.append(ISeq(
444
                    sequence=sample[variant.sample_start + sample_start:variant.sample_end + sample_start],
jkvis's avatar
jkvis committed
445
446
                    weight_position=extracted.weight_position))
        elif variant.type & extractor.IDENTITY:
jkvis's avatar
jkvis committed
447
448
            seq_list.append(ISeq(start=variant.reference_start + 1 + reference_start,
                end=variant.reference_end + reference_start, reverse=False,weight_position=extracted.weight_position))
jkvis's avatar
jkvis committed
449
        elif variant.type & extractor.REVERSE_COMPLEMENT:
jkvis's avatar
jkvis committed
450
451
452
            seq_list.append(ISeq(start=variant.reference_start + 1 + reference_start,
                end=variant.reference_end + reference_start, reverse=True,weight_position=extracted.weight_position))
        else: #bases insertion
453
            seq_list.append(ISeq(sequence=sample[variant.sample_start + sample_start:variant.sample_end + sample_start],
jkvis's avatar
jkvis committed
454
455
456
457
458
459
460
                weight_position=extracted.weight_position))

        if variant.type & extractor.TRANSPOSITION_CLOSE:
            in_transposition -= 1

        index = variant.sample_end

461
462
463
464
    while repeat < len(repeats):
        seq_list.append(DNAVar(type='repeat',inserted=repeats[repeat]['unit'],count=repeats[repeat]['count']))
        repeat += 1

465
466
467
468
469
470
471
472
473
474
475
    if len(variant_list) > 0 or len(repeats) > 0:
        description.append(DNAVar(start=reference_start + 1,end=reference_end,sample_start=sample_start,sample_end=sample_end,type='delins',inserted=seq_list))

        suffix = describe_dna(reference[reference_end:], sample[sample_end:])
        for variant in suffix:
            if variant.type != 'none':
                variant.start += reference_end
                variant.end += reference_end
                description.append(variant)
    else:
        description = prefix
476

jkvis's avatar
jkvis committed
477
478
479
    cm = Crossmap([reference_start + 1, reference_end], [], 1)
    for variant in description:
        for inserted in variant.inserted:
jkvis's avatar
jkvis committed
480
481
482
483
            inserted.start = cm.tuple2string(cm.g2x(int(inserted.start)))
            inserted.end = cm.tuple2string(cm.g2x(int(inserted.end)))
        variant.start = cm.tuple2string(cm.g2x(int(variant.start)))
        variant.end = cm.tuple2string(cm.g2x(int(variant.end)))
jkvis's avatar
jkvis committed
484
485

    return description, reference_start, reference_end
jkvis's avatar
jkvis committed
486
487


488
def print_var(variant):
489
    print('({:3}, {:3}), ({:3}, {:3}), {:08b}, {}, {}'.format(variant.reference_start,
490
        variant.reference_end, variant.sample_start, variant.sample_end,
491
        variant.type, variant.type, variant.sample_end - variant.sample_start))
492
493
494
495
496
497
498
499
500
501
502
503
504


def get_frames(flags):
    result = []

    for fs in FS:
        if flags & FS[fs]:
            result.append(fs)

    return result


def describe_protein(s1, s2, codon_table=1):
505
506
    """
    """
507
    codons = util.codon_table_string(codon_table)
508
509
510
511
512
513

    description = ProteinAllele()

    s1_swig = util.swig_str(s1)
    s2_swig = util.swig_str(s2)
    codons_swig = util.swig_str(codons)
514

515
516
    extracted = extractor.extract(s1_swig[0], s1_swig[1],
        s2_swig[0], s2_swig[1], extractor.TYPE_PROTEIN, codons_swig[0])
517
518
    variants = extracted.variants

519
520
521
    #for variant in variants:
    #    print_var(variant)
    #print()
522
523
524

    index = 0
    while index < len(variants):
525
        if variants[index].type != extractor.IDENTITY:
526
527
528
            variant = variants[index]
            index += 1
            seq_list = AISeqList()
529
530

            # NOTE: This is for filling.
531
            last_end = variants[index].reference_start
532

533
534
            while (index < len(variants) and
                    variants[index].type & extractor.FRAME_SHIFT):
535

536
                if last_end < variants[index].sample_start:
537
538
                    seq_list.append(AISeq(
                        s2[last_end:variants[index].sample_start]))
539

540
541
                last_end = variants[index].sample_end

542
543
544
545
546
547
548
549
550
551
552
                seq_list.append(AISeq(
                    s2[variants[index].sample_start:
                        variants[index].sample_end],
                    start=variants[index].reference_start + 1,
                    end=variants[index].reference_end,
                    sample_start=variants[index].sample_start + 1,
                    sample_end=variants[index].sample_end,
                    frames=get_frames(variants[index].type)))

                # NOTE: Perhaps use trans_open, trans_close to ...
                index += 1
553

554
            if last_end < variant.sample_end:
555
556
                seq_list.append(AISeq(s2[last_end:variant.sample_end]))

557
            var = var_to_protein_var(s1, s2, variant, seq_list,
558
559
                weight_position=extracted.weight_position)
            description.append(var)
560
561
        else:
            index += 1
562
563

    if not description:
564
565
        return ProteinAllele([ProteinVar()])
    return description