#!/usr/bin/env python

#
# Copyright (C) 2016 Jerry Hoogenboom
#
# This file is part of FDSTools, data analysis tools for Next
# Generation Sequencing of forensic DNA markers.
#
# FDSTools is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at
# your option) any later version.
#
# FDSTools is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with FDSTools.  If not, see <http://www.gnu.org/licenses/>.
#

"""
Link raw reads in a FastA or FastQ file to markers and count the number
of reads for each unique sequence.

This tool is basically a wrapper around the 'tssvl' program, offering
direct support for using FDSTools library files and allele name
generation.
"""
from __future__ import absolute_import  # Needed to import tssv package.
import sys
import math
import itertools as it

from multiprocessing.queues import SimpleQueue
from multiprocessing import Process
from threading import Thread

# TSSV is only imported when actually running this tool.
#from tssv.align_pair import align_pair
#from tssv.tssv_lite import process_file, make_sequence_tables, \
#                           make_statistics_table, prepare_output_dir

from ..lib import pos_int_arg, add_input_output_args, get_input_output_files,\
                  add_sequence_format_args, reverse_complement, PAT_SEQ_RAW,\
                  get_column_ids, ensure_sequence_format

__version__ = "1.1.0"


# Default values for parameters are specified below.

# Default maximum number of mismatches per nucleotide in the flanking
# sequences to allow.
# This value can be overridden by the -m command line option.
_DEF_MISMATCHES = 0.08

59
60
61
62
63
# Default penalty multiplier for insertions and deletions in the
# flanking sequences.
# This value can be overridden by the -n command line option.
_DEF_INDEL_SCORE = 1

64
65
66
67
68
69
70
71
72
73
74
75
76
# Default minimum number of reads to consider.
# This value can be overridden by the -a command line option.
_DEF_MINIMUM = 1


def convert_library(library, threshold):
    """
    Convert an FDSTools library dict to the structure used by TSSV.

    The result maps each marker name to a tuple
    ((left_flank, revcomp_of_right_flank), (max_mm_left, max_mm_right)),
    where the maximum mismatch counts are computed as
    ceil(flank_length * threshold).
    """
    converted = {}
    for marker in library["flanks"]:
        flanks = library["flanks"][marker]
        # TSSV expects the right flank as its reverse complement.
        pair = (flanks[0], reverse_complement(flanks[1]))
        max_mismatches = (
            int(math.ceil(len(flanks[0]) * threshold)),
            int(math.ceil(len(flanks[1]) * threshold)))
        converted[marker] = (pair, max_mismatches)
    return converted
#convert_library


def seq_pass_filt(sequence, reads, threshold, explen=None):
    """Return False if the sequence does not meet the criteria."""
    if reads < threshold:
        # Too few reads for this sequence.
        return False
    if PAT_SEQ_RAW.match(sequence) is None:
        # The sequence contains characters other than plain bases.
        return False
    # Optionally require the sequence length to fall in a given range.
    return explen is None or explen[0] <= len(sequence) <= explen[1]
#seq_pass_filt


def worker_work(tssv_library, indel_score, seq):
    """Find markers in sequence.

    Returns a list with one tuple (marker, matches, seq1, seq2) per
    marker in tssv_library.  The matches value is a bitmask: 1=left
    flank found in the forward strand, 2=right flank in the forward
    strand, 4=left flank in the reverse strand, 8=right flank in the
    reverse strand.  seq1 and seq2 are the subsequences between the
    paired flanks in the forward and reverse strand respectively, or
    None if the pair was not found in that strand.
    """
    seqs = (seq, reverse_complement(seq))
    # NOTE: tuple() is required here; under Python 3, map() returns an
    # iterator, which cannot be indexed as seqs_up[0]/seqs_up[1] below.
    seqs_up = tuple(map(str.upper, seqs))
    results = []
    for marker in tssv_library:
        pair = tssv_library[marker][0]
        thresholds = tssv_library[marker][1]
        # Align flanks against the forward and the reverse strand.
        algn = (
            align_pair(seqs_up[0], seqs_up[1], pair, indel_score),
            align_pair(seqs_up[1], seqs_up[0], pair, indel_score))
        matches = 0
        if algn[0][0][0] <= thresholds[0]:
            # Left marker was found in forward sequence
            cutout = seqs[0][max(0, algn[0][0][1]-len(pair[0])):algn[0][0][1]]
            # Only count the hit if the matched region contains at least
            # one uppercase base (presumably to skip masked lowercase
            # sequence -- TODO confirm).
            if cutout.lower() != cutout:
                matches += 1
        if algn[0][1][0] <= thresholds[1]:
            # Right marker was found in forward sequence.
            cutout = seqs[0][algn[0][1][1] : algn[0][1][1]+len(pair[1])]
            if cutout.lower() != cutout:
                matches += 2
        if algn[1][0][0] <= thresholds[0]:
            # Left marker was found in reverse sequence
            cutout = seqs[1][max(0, algn[1][0][1]-len(pair[0])):algn[1][0][1]]
            if cutout.lower() != cutout:
                matches += 4
        if algn[1][1][0] <= thresholds[1]:
            # Right marker was found in reverse sequence.
            cutout = seqs[1][algn[1][1][1] : algn[1][1][1]+len(pair[1])]
            if cutout.lower() != cutout:
                matches += 8
        results.append((marker, matches,
            # Matched pair in forward sequence.
            seqs[0][algn[0][0][1] : algn[0][1][1]] if
                (matches & 3) == 3 and algn[0][0][1] < algn[0][1][1]
                else None,
            # Matched pair in reverse sequence.
            seqs[1][algn[1][0][1] : algn[1][1][1]] if
                (matches & 12) == 12 and algn[1][0][1] < algn[1][1][1]
                else None))
    return results
#worker_work


def worker(tssv_library, indel_score, task_queue, done_queue):
    """
    Read chunks of sequences from task_queue until a None sentinel
    arrives; analyse each sequence and post the results on done_queue.
    """
    while True:
        chunk = task_queue.get()
        if chunk is None:
            # Sentinel received: no more work.
            break
        done_queue.put(tuple(
            worker_work(tssv_library, indel_score, read) for read in chunk))
#worker


def feeder(input, tssv_library, indel_score, workers, done_queue):
    """
    Start worker processes, feed them sequences from input and have them
    write their results to done_queue
    """
    task_queue = SimpleQueue()
    subprocesses = []
    for _ in range(workers):
        proc = Process(target=worker,
            args=(tssv_library, indel_score, task_queue, done_queue))
        proc.daemon = True
        proc.start()
        subprocesses.append(proc)
    # Send chunks of 10000 reads to the workers.
    chunk = tuple(read[1] for read in it.islice(input, 10000))
    while chunk:
        task_queue.put(chunk)
        chunk = tuple(read[1] for read in it.islice(input, 10000))
    # One None sentinel per worker makes every worker stop.
    for _ in range(workers):
        task_queue.put(None)
    for proc in subprocesses:
        proc.join()
    # Signal the consumer that all workers have finished.
    done_queue.put(None)
#feeder


def genreads(infile):
    """
    Generate tuples of (header, sequence) from FastA stream or
    tuples of (header, sequence, quality) from FastQ stream.

    Raises ValueError if the first character is neither '>' nor '@'.
    Yields nothing for an empty stream or one containing only the
    record marker character.
    """
    # Peek at the first character to decide between FastA and FastQ.
    firstchar = infile.read(1)
    if not firstchar:
        return
    if firstchar not in ">@":
        raise ValueError("Input file is not a FastQ or FastA file")
    state = 0
    for line in infile:
        if state == 1:  # Reading sequence lines; most common state on top.
            if firstchar == ">" and line.startswith(">"):
                yield (header, seq)
                header = line[1:].strip()
                seq = ""
            elif firstchar == "@" and line.startswith("+"):
                qual = ""
                state = 2
            else:
                seq += line.strip()
        elif state == 2:  # Reading FastQ quality lines.
            # Only treat '@' as a new record once the quality string is
            # at least as long as the sequence ('@' is a valid quality
            # character).
            if line.startswith("@") and len(qual) >= len(seq):
                yield (header, seq, qual)
                header = line[1:].strip()
                seq = ""
                state = 1
            else:
                qual += line.strip()
        elif state == 0:  # First header line; its marker was consumed above.
            header = line.strip()
            seq = ""
            state = 1
    # Yield the final record, but only if one was actually started;
    # unconditionally yielding here raised NameError for an input
    # consisting of nothing but the record marker character.
    if state == 1:
        yield (header, seq)
    elif state == 2:
        yield (header, seq, qual)
#genreads


def process_file_parallel(infile, tssv_library, indel_score, workers):
    """
    Scan all reads in infile for marker flanks using worker processes.

    Returns (total_reads, unrecognised, counters, sequences), where
    counters holds per-marker hit statistics and sequences maps each
    marker to a dict of {sequence: [forward_count, reverse_count]}.
    """
    # Prepare data storage.
    sequences = {marker: {} for marker in tssv_library}
    counters = {marker: {key: 0 for key in
            ("fPaired", "rPaired", "fLeft", "rLeft", "fRight", "rRight")}
        for marker in tssv_library}
    total_reads = 0
    unrecognised = 0

    # Create the queue on which the workers report their results.
    done_queue = SimpleQueue()

    # Start the feeder thread; it spawns the worker processes and puts
    # a final None on done_queue when all of them have finished.
    thread = Thread(target=feeder, args=(genreads(infile), tssv_library,
        indel_score, workers, done_queue))
    thread.daemon = True
    thread.start()

    for results in it.chain.from_iterable(iter(done_queue.get, None)):
        total_reads += 1
        recognised = 0

        for marker, matches, seq1, seq2 in results:
            # matches is a bitmask: 1=fLeft, 2=fRight, 4=rLeft, 8=rRight.
            recognised |= matches
            # Mask each counter to a single bit so it grows by at most
            # one per read; adding the raw (shifted) bitmask inflated
            # the flank statistics.
            counters[marker]["fLeft"] += matches & 1
            counters[marker]["fRight"] += (matches >> 1) & 1
            counters[marker]["rLeft"] += (matches >> 2) & 1
            counters[marker]["rRight"] += (matches >> 3) & 1

            # Count the sequence found between both flanks in the
            # forward strand.
            if seq1 is not None:
                counters[marker]["fPaired"] += 1
                if seq1 not in sequences[marker]:
                    sequences[marker][seq1] = [1, 0]
                else:
                    sequences[marker][seq1][0] += 1

            # Count the sequence found between both flanks in the
            # reverse strand.
            if seq2 is not None:
                counters[marker]["rPaired"] += 1
                if seq2 not in sequences[marker]:
                    sequences[marker][seq2] = [0, 1]
                else:
                    sequences[marker][seq2][1] += 1

        if not recognised:
            unrecognised += 1
    thread.join()

    # Count number of unique sequences per marker.
    for marker in tssv_library:
        counters[marker]["unique_seqs"] = len(sequences[marker])

    # Return counters and sequences.
    return total_reads, unrecognised, counters, sequences
#process_file_parallel


def run_tssv_lite(infile, outfile, reportfile, is_fastq, library, seqformat,
                  threshold, minimum, aggregate_filtered,
                  missing_marker_action, dirname, indel_score, workers):
    """
    Detect markers in the reads in infile; write a per-sequence read
    count table to outfile and a statistics table to reportfile.

    threshold is the allowed fraction of mismatches per flank length;
    minimum is the minimum read count to report a sequence; seqformat
    is the output sequence format ('raw' means no conversion).  When
    dirname is given, verbose per-marker output files are written there
    (single-threaded mode only).  indel_score is the penalty multiplier
    for indels in the flanks; workers is the number of worker processes.
    """
    file_format = "fastq" if is_fastq else "fasta"
    tssv_library = convert_library(library, threshold)

    # Open output directory if we have one.
    if dirname:
        outfiles = prepare_output_dir(dirname, library["flanks"], file_format)
        if workers > 1:
            # TODO: Implement FastA/FastQ writing in multiprocess mode.
            workers = 1
            sys.stderr.write(
                "Falling back to single-threaded mode because the -D/--dir "
                "option was used.\n")
    else:
        outfiles = None

    # Detect markers in all reads (in parallel if requested).
    if workers > 1:
        total_reads, unrecognised, counters, sequences = \
            process_file_parallel(infile, tssv_library, indel_score, workers)
    else:
        total_reads, unrecognised, counters, sequences = process_file(
            infile, file_format, tssv_library, outfiles, indel_score)

    # Filter out sequences with low read counts and invalid bases now.
    if aggregate_filtered:
        # Sum the read counts of the filtered-out sequences per marker
        # so they can be reported as a single 'Other sequences' row.
        aggregates = {}
        for marker in sequences:
            for sequence in sequences[marker]:
                if not seq_pass_filt(sequence,
                        sum(sequences[marker][sequence]), minimum,
                        library.get("expected_length", {}).get(marker)):
                    if marker not in aggregates:
                        aggregates[marker] = [0, 0]
                    aggregates[marker][0] += sequences[marker][sequence][0]
                    aggregates[marker][1] += sequences[marker][sequence][1]
    sequences = {marker:
        {sequence: sequences[marker][sequence]
            for sequence in sequences[marker]
            if seq_pass_filt(sequence,
                sum(sequences[marker][sequence]), minimum,
                library.get("expected_length", {}).get(marker))}
        for marker in sequences}

    # Add aggregate rows if the user requested so.
    if aggregate_filtered:
        for marker in aggregates:
            sequences[marker]["Other sequences"] = aggregates[marker]

    # Check presence of all markers.
    if missing_marker_action != "exclude":
        for marker in library["flanks"]:
            if not sequences[marker]:
                if missing_marker_action == "include":
                    sequences[marker]["No data"] = [0, 0]
                else:
                    raise ValueError("Marker %s was not detected!" % marker)

    column_names, tables = make_sequence_tables(sequences, 0)

    # Convert sequences to the desired format.
    colid_sequence = get_column_ids(column_names, "sequence")
    if seqformat != "raw":
        for marker in tables:
            for line in tables[marker]:
                line[colid_sequence] = ensure_sequence_format(
                    line[colid_sequence], seqformat, library=library,
                    marker=marker)

    # Write sequence tables.
    column_names = "\t".join(column_names)
    for marker in sorted(tables):
        tables[marker] = "\n".join(
            "\t".join(map(str, line)) for line in tables[marker])
        if outfiles:
            # Per-marker sequence table in the verbose output directory.
            outfiles["markers"][marker]["sequences"].write(
                "\n".join((column_names, tables[marker])))
    tables = "\n".join(
        [column_names] + [tables[marker] for marker in sorted(tables)])
    if outfiles:
        outfiles["sequences"].write(tables)
    outfile.write(tables)
    outfile.write("\n")

    # Write statistics table.
    statistics = "\n".join((
        make_statistics_table(counters),
        "",  # Empty line.
        "total reads\t%i" % total_reads,
        "unrecognised reads\t%i" % unrecognised))
    if outfiles:
        outfiles["statistics"].write(statistics)
    reportfile.write(statistics)
    reportfile.write("\n")
#run_tssv_lite


def add_arguments(parser):
    """Register this tool's command line arguments on parser."""
    add_sequence_format_args(parser, "raw", False, True)
    add_input_output_args(parser, True, False, True)
    parser.add_argument("-q", "--is-fastq", action="store_true",
        help="if specified, treat the input as a FASTQ file instead of FASTA")
    parser.add_argument("-D", "--dir",
        help="output directory for verbose output; when given, a subdirectory "
             "will be created for each marker, each with a separate "
             "sequences.csv file and a number of FASTA/FASTQ files containing "
             "unrecognised reads (unknown.fa), recognised reads "
             "(Marker/paired.fa), and reads that lack one of the flanks of a "
             "marker (Marker/noend.fa and Marker/nostart.fa)")
    parser.add_argument("-T", "--num-threads",
        type=pos_int_arg, default=1,
        help="number of worker threads to use (default: %(default)s)")
    # Options below control which sequences make it into the output.
    filtergroup = parser.add_argument_group("filtering options")
    filtergroup.add_argument("-m", "--mismatches", type=float,
        default=_DEF_MISMATCHES,
        help="number of mismatches per nucleotide to allow in flanking "
             "sequences (default: %(default)s)")
    filtergroup.add_argument("-n", "--indel-score", metavar="N",
        type=pos_int_arg, default=_DEF_INDEL_SCORE,
        help="insertions and deletions in the flanking sequences are "
             "penalised this number of times more heavily than mismatches "
             "(default: %(default)s)")
    filtergroup.add_argument("-a", "--minimum", metavar="N", type=pos_int_arg,
        default=_DEF_MINIMUM,
        help="report only sequences with this minimum number of reads "
             "(default: %(default)s)")
    filtergroup.add_argument("-A", "--aggregate-filtered", action="store_true",
        help="if specified, sequences that have been filtered (as per the "
             "-a/--minimum option, the expected_allele_length section in the "
             "library file, as well as all sequences with ambiguous bases) "
             "will be aggregated per marker and reported as 'Other sequences'")
    filtergroup.add_argument("-M", "--missing-marker-action", metavar="ACTION",
        choices=("include", "exclude", "halt"),
        default="include",
        help="action to take when no sequences are linked to a marker: one of "
             "%(choices)s (default: %(default)s)")
#add_arguments


def run(args):
    """Entry point: import TSSV, open the input file and run the tool."""
    # TSSV is imported here rather than at module level so that the
    # rest of this module can be loaded even without TSSV installed.
    global align_pair, process_file, make_sequence_tables
    global make_statistics_table, prepare_output_dir
    try:
        from tssv.align_pair import align_pair
        from tssv.tssv_lite import process_file, make_sequence_tables, \
            make_statistics_table, prepare_output_dir
    except ImportError:
        raise ValueError(
            "This tool requires version 0.4.0 or later of the 'tssvl' program "
            "(TSSV-Lite) to be installed. Please download and install the "
            "latest version of TSSV from https://pypi.python.org/pypi/tssv.")

    files = get_input_output_files(args, True, False)
    if not files:
        raise ValueError("please specify an input file, or pipe in the output "
                         "of another program")

    # A '-' input file name means standard input.
    if files[0] == "-":
        infile = sys.stdin
    else:
        infile = open(files[0], "r")
    run_tssv_lite(infile, files[1], args.report, args.is_fastq, args.library,
                  args.sequence_format, args.mismatches, args.minimum,
                  args.aggregate_filtered, args.missing_marker_action,
                  args.dir, args.indel_score, args.num_threads)
    if infile is not sys.stdin:
        infile.close()
#run