Commit 9b261e56 authored by Hoogenboom, Jerry's avatar Hoogenboom, Jerry

FDSTools v1.1.0.dev2: Parallelize TSSV

* TSSV v1.1.0:
  * Added option -T/--num-threads (default: 1), which controls the
    number of worker threads TSSV may spawn to run the analysis.
    
parent 4a23b60f
......@@ -24,7 +24,7 @@ including tools for characterisation and filtering of PCR stutter artefacts and
other systemic noise, and for automatic detection of the alleles in a sample.
"""
__version_info__ = ('1', '1', '0', 'dev1')
__version_info__ = ('1', '1', '0', 'dev2')
__version__ = '.'.join(__version_info__)
usage = __doc__.split("\n\n\n")
......
......@@ -31,8 +31,14 @@ generation.
from __future__ import absolute_import # Needed to import tssv package.
import sys
import math
import itertools as it
from multiprocessing.queues import SimpleQueue
from multiprocessing import Process
from threading import Thread
# TSSV is only imported when actually running this tool.
#from tssv.align_pair import align_pair
#from tssv.tssv_lite import process_file, make_sequence_tables, \
# make_statistics_table, prepare_output_dir
......@@ -40,7 +46,7 @@ from ..lib import pos_int_arg, add_input_output_args, get_input_output_files,\
add_sequence_format_args, reverse_complement, PAT_SEQ_RAW,\
get_column_ids, ensure_sequence_format
__version__ = "1.0.2"
__version__ = "1.1.0"
# Default values for parameters are specified below.
......@@ -78,20 +84,208 @@ def seq_pass_filt(sequence, reads, threshold, explen=None):
#seq_pass_filt
def worker_work(tssv_library, indel_score, seq):
"""Find markers in sequence."""
seqs = (seq, reverse_complement(seq))
seqs_up = map(str.upper, seqs)
results = []
for marker in tssv_library:
pair = tssv_library[marker][0]
thresholds = tssv_library[marker][1]
algn = (
align_pair(seqs_up[0], seqs_up[1], pair, indel_score),
align_pair(seqs_up[1], seqs_up[0], pair, indel_score))
matches = 0
if algn[0][0][0] <= thresholds[0]:
# Left marker was found in forward sequence
cutout = seqs[0][max(0, algn[0][0][1]-len(pair[0])):algn[0][0][1]]
if cutout.lower() != cutout:
matches += 1
if algn[0][1][0] <= thresholds[1]:
# Right marker was found in forward sequence.
cutout = seqs[0][algn[0][1][1] : algn[0][1][1]+len(pair[1])]
if cutout.lower() != cutout:
matches += 2
if algn[1][0][0] <= thresholds[0]:
# Left marker was found in reverse sequence
cutout = seqs[1][max(0, algn[1][0][1]-len(pair[0])):algn[1][0][1]]
if cutout.lower() != cutout:
matches += 4
if algn[1][1][0] <= thresholds[1]:
# Right marker was found in reverse sequence.
cutout = seqs[1][algn[1][1][1] : algn[1][1][1]+len(pair[1])]
if cutout.lower() != cutout:
matches += 8
results.append((marker, matches,
# Matched pair in forward sequence.
seqs[0][algn[0][0][1] : algn[0][1][1]] if
(matches & 3) == 3 and algn[0][0][1] < algn[0][1][1]
else None,
# Matched pair in reverse sequence.
seqs[1][algn[1][0][1] : algn[1][1][1]] if
(matches & 12) == 12 and algn[1][0][1] < algn[1][1][1]
else None))
return results
#worker_work
def worker(tssv_library, indel_score, task_queue, done_queue):
"""
Read sequences from task_queue, write findings to done_queue.
"""
for task in iter(task_queue.get, None):
done_queue.put(
tuple(worker_work(tssv_library, indel_score, seq) for seq in task))
#worker
def feeder(input, tssv_library, indel_score, workers, done_queue):
"""
Start worker processes, feed them sequences from input and have them
write their results to done_queue
"""
task_queue = SimpleQueue()
processes = []
for i in range(workers):
process = Process(target=worker,
args=(tssv_library, indel_score, task_queue, done_queue))
process.daemon = True
process.start()
processes.append(process)
while 1:
# Sending chunks of 10000 reads to the workers.
task = tuple(r[1] for r in it.islice(input, 10000))
if not task:
break
task_queue.put(task)
for i in range(workers):
task_queue.put(None)
for process in processes:
process.join()
done_queue.put(None)
#feeder
def genreads(infile):
"""
Generate tuples of (header, sequence) from FastA stream or
tuples of (header, sequence, quality) from FastQ stream.
"""
firstchar = infile.read(1)
if not firstchar:
return
if firstchar not in ">@":
raise ValueError("Input file is not a FastQ or FastA file")
state = 0
for line in infile:
if state == 1: # Put most common state on top.
if firstchar == ">" and line.startswith(">"):
yield (header, seq)
header = line[1:].strip()
seq = ""
elif firstchar == "@" and line.startswith("+"):
qual = ""
state = 2
else:
seq += line.strip()
elif state == 2:
if line.startswith("@") and len(qual) >= len(seq):
yield (header, seq, qual)
header = line[1:].strip()
seq = ""
state = 1
else:
qual += line.strip()
elif state == 0:
header = line.strip()
seq = ""
state = 1
yield (header, seq) if state == 1 else (header, seq, qual)
#genreads
def process_file_parallel(infile, tssv_library, indel_score, workers):
# Prepare data storage.
sequences = {marker: {} for marker in tssv_library}
counters = {marker: {key: 0 for key in
("fPaired", "rPaired", "fLeft", "rLeft", "fRight", "rRight")}
for marker in tssv_library}
total_reads = 0
unrecognised = 0
# Create queues.
done_queue = SimpleQueue()
# Start worker processes.
thread = Thread(target=feeder, args=(genreads(infile), tssv_library,
indel_score, workers, done_queue))
thread.daemon = True
thread.start()
for results in it.chain.from_iterable(iter(done_queue.get, None)):
total_reads += 1
recognised = 0
for marker, matches, seq1, seq2 in results:
recognised |= matches
counters[marker]["fLeft"] += matches
counters[marker]["fRight"] += (matches >> 1)
counters[marker]["rLeft"] += (matches >> 2)
counters[marker]["rRight"] += (matches >> 3)
# Search in the forward strand.
if seq1 is not None:
counters[marker]["fPaired"] += 1
if seq1 not in sequences[marker]:
sequences[marker][seq1] = [1, 0]
else:
sequences[marker][seq1][0] += 1
# Search in the reverse strand.
if seq2 is not None:
counters[marker]["rPaired"] += 1
if seq2 not in sequences[marker]:
sequences[marker][seq2] = [0, 1]
else:
sequences[marker][seq2][1] += 1
if not recognised:
unrecognised += 1
thread.join()
# Count number of unique sequences per marker.
for marker in tssv_library:
counters[marker]["unique_seqs"] = len(sequences[marker])
# Return counters and sequences.
return total_reads, unrecognised, counters, sequences
#process_file_parallel
def run_tssv_lite(infile, outfile, reportfile, is_fastq, library, seqformat,
threshold, minimum, aggregate_filtered,
missing_marker_action, dirname, indel_score):
missing_marker_action, dirname, indel_score, workers):
file_format = "fastq" if is_fastq else "fasta"
tssv_library = convert_library(library, threshold)
# Open output directory if we have one.
if dirname:
outfiles = prepare_output_dir(dirname, library["flanks"], file_format)
if workers > 1:
# TODO: Implement FastA/FastQ writing in multiprocess mode.
workers = 1
sys.stderr.write(
"Falling back to single-threaded mode because the -D/--dir "
"option was used.\n")
else:
outfiles = None
total_reads, unrecognised, counters, sequences = process_file(
infile, file_format, tssv_library, outfiles, indel_score)
if workers > 1:
total_reads, unrecognised, counters, sequences = \
process_file_parallel(infile, tssv_library, indel_score, workers)
else:
total_reads, unrecognised, counters, sequences = process_file(
infile, file_format, tssv_library, outfiles, indel_score)
# Filter out sequences with low read counts and invalid bases now.
if aggregate_filtered:
......@@ -178,6 +372,9 @@ def add_arguments(parser):
"unrecognised reads (unknown.fa), recognised reads "
"(Marker/paired.fa), and reads that lack one of the flanks of a "
"marker (Marker/noend.fa and Marker/nostart.fa)")
parser.add_argument("-T", "--num-threads",
type=pos_int_arg, default=1,
help="number of worker threads to use (default: %(default)s)")
filtergroup = parser.add_argument_group("filtering options")
filtergroup.add_argument("-m", "--mismatches", type=float,
default=_DEF_MISMATCHES,
......@@ -207,9 +404,10 @@ def add_arguments(parser):
def run(args):
# Import TSSV now.
global process_file, make_sequence_tables, make_statistics_table
global prepare_output_dir
global align_pair, process_file, make_sequence_tables
global make_statistics_table, prepare_output_dir
try:
from tssv.align_pair import align_pair
from tssv.tssv_lite import process_file, make_sequence_tables, \
make_statistics_table, prepare_output_dir
except ImportError:
......@@ -226,7 +424,7 @@ def run(args):
run_tssv_lite(infile, files[1], args.report, args.is_fastq, args.library,
args.sequence_format, args.mismatches, args.minimum,
args.aggregate_filtered, args.missing_marker_action,
args.dir, args.indel_score)
args.dir, args.indel_score, args.num_threads)
if infile != sys.stdin:
infile.close()
#run
\ No newline at end of file
......@@ -50,8 +50,6 @@ To-do:
account as well.
* Perhaps there should be a version of BGEstimate that makes a profile for each
genotype instead of each allele. This allows for the detection of hybrids.
* Add tool to summarise various statistics about the entire analysis pipeline:
(TODO: Write this list)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment