Commit 8acb0970 authored by Vermaat

Correctly handle batch job input and output encodings

parent 2a4dc3c1
@@ -18,21 +18,80 @@ Module for parsing CSV files and spreadsheets.
from __future__ import unicode_literals
import codecs
import re
import magic # open(), MAGIC_MIME, MAGIC_NONE
import csv # Sniffer(), reader(), Error
import xlrd # open_workbook()
import zipfile # ZipFile()
import xml.dom.minidom # parseString()
import os # remove()
import tempfile
import cchardet as chardet
from mutalyzer.config import settings
# Amount of bytes to be read for determining the file type.
# Amount of bytes to be read from a file at a time (this is also the amount
# read for determining the file type).
BUFFER_SIZE = 32768
class _UniversalNewlinesByteStreamIter(object):
"""
The codecs module doesn't provide universal newline support. This class is
used as a stream wrapper that provides this functionality.
The wrapped stream must yield byte strings. We decode it using the given
encoding, normalise newlines, and yield UTF-8 encoded data (read method)
or lines (as iterator).
Adaptation from an old Cython version:
https://github.com/cython/cython/blob/076fac3/Cython/Utils.py
"""
normalise_newlines = re.compile('\r\n?|\n').sub
def __init__(self, stream, encoding='utf-8', buffer_size=0x1000):
# let's assume the reader's .read method doesn't get rebound while we
# hold a reference to it
self.stream = codecs.getreader(encoding)(stream)
self._read = self.stream.read
self.buffer_size = buffer_size
def _read_normalised(self, count=None):
count = count or self.buffer_size
data = self._read(count)
if '\r' not in data:
return data
if data.endswith('\r'):
# the chunk may end in the middle of a '\r\n' pair; read one more
# byte so the pair is normalised as a single '\n'
data += self._read(1)
return self.normalise_newlines('\n', data)
def _readlines(self):
buffer = []
data = self._read_normalised()
while data:
buffer.append(data)
lines = ''.join(buffer).splitlines(True)
for line in lines[:-1]:
yield line
buffer = [lines[-1]]
data = self._read_normalised()
if buffer[0]:
yield buffer[0]
def seek(self, pos):
if pos == 0:
self.stream.seek(0)
else:
raise NotImplementedError
def read(self, count=-1):
return self._read_normalised(count).encode('utf-8')
def __iter__(self):
return (line.encode('utf-8') for line in self._readlines())
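# Illustrative sketch (not part of the commit): feeding the wrapper a byte
# stream with mixed newline styles. The class and behaviour are as defined
# above; the sample data is made up and `import io` is assumed.
#
#   raw = io.BytesIO(b'one\rtwo\r\nthree\n')
#   wrapped = _UniversalNewlinesByteStreamIter(raw, encoding='utf-8')
#   list(wrapped)  # [b'one\n', b'two\n', b'three\n']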
class File() :
"""
Parse CSV files and spreadsheets.
@@ -44,7 +103,6 @@ class File() :
- __init__(config, output) ; Initialise the class.
Private methods:
- __tempFileWrapper(func, handle) ; Call func() with a filename.
- __parseCsvFile(handle) ; Parse a CSV file.
- __parseXlsFile(handle) ; Parse an Excel file.
- __parseOdsFile(handle) ; Parse an OpenDocument Spreadsheet file.
@@ -70,56 +128,48 @@ class File() :
self.__output = output #: The Output object
#__init__
def __tempFileWrapper(self, func, handle) :
def __parseCsvFile(self, handle) :
"""
Make a temporary file, put the content of a stream in it and pass
the filename to a general function. Return whatever this function
returns.
Parse a CSV file. Does not reset the file handle to start.
@arg func: general function that needs a file name as argument
@type func: function
@arg handle: A stream
@type handle: stream
@return: unknown; the output of func().
@rtype: ?
"""
write_handle, filename = tempfile.mkstemp(text=True)
# Dump the content of the stream pointed to by handle into the file.
handle.seek(0)
os.write(write_handle, handle.read())
os.close(write_handle)
# Open the file with func().
ret = func(filename)
# Apparently Apache will remove this file even when it is opened by
# the function *func
os.remove(filename)
return ret
#__tempFileWrapper
def __parseCsvFile(self, handle_) :
"""
Parse a CSV file.
The stream is not rewound after use.
@arg handle: A handle to a stream
@type handle: stream
@arg handle: CSV file. Must be a seekable binary file object.
@type handle: file object
@return: list of lists
@rtype: list
"""
# We wrap the file in a temporary file just to have universal newlines
# which is not always possible to have on incoming files (think web
# and rpc frontends). This transparently solves the problem of Unix
# versus Windows versus Mac style newlines.
handle = tempfile.TemporaryFile('rU+w')
for chunk in handle_:
handle.write(chunk)
buf = handle.read(BUFFER_SIZE)
result = chardet.detect(buf)
handle.seek(0)
if result['confidence'] > 0.5:
encoding = result['encoding']
else:
encoding = 'utf-8'
# Python 2.7 makes it extraordinarily hard to do this correctly. We
# have a binary file object containing lines of text in a certain
# encoding with unknown style of line-endings.
#
# We want to correctly decode the file contents, accept any style of
# line-endings, parse the lines with the `csv` module, and return
# unicode strings.
#
# 1. `codecs.getreader` does not have a universal newlines mode.
# 2. `io.TextIOWrapper` cannot be wrapped around our file object,
# since it is required to be an `io.BufferedIOBase`, which it
# usually will not be.
# 3. The `csv` module cannot read unicode.
#
# Ugh.
#
# So, we use a stream wrapper that consumes byte strings, decodes to
# unicode, normalises newlines, and produces the result UTF-8 encoded.
# That's what we feed the `csv` module. We decode what it gives back
# to unicode strings. What a mess.
handle = _UniversalNewlinesByteStreamIter(handle, encoding=encoding,
buffer_size=BUFFER_SIZE)
buf = handle.read(BUFFER_SIZE)
# Default dialect
@@ -147,41 +197,38 @@ class File() :
ret = []
for i in reader:
ret.append(i)
ret.append([c.decode('utf-8') for c in i])
handle.close()
return ret
#__parseCsvFile
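# Sketch of the detection step above (illustrative, not in the commit):
# cchardet.detect returns an encoding guess plus a confidence score, and
# guesses at confidence <= 0.5 fall back to UTF-8.
#
#   import cchardet as chardet
#   chardet.detect(u'na\xefve\n'.encode('windows-1252'))
#   # e.g. {'encoding': 'WINDOWS-1252', 'confidence': 0.7}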
def __parseXlsFile(self, handle) :
"""
Parse an Excel file.
The stream is not rewound after use.
Parse an Excel file. Does not reset the file handle to start.
@arg handle: A handle to a stream
@type handle: stream
@arg handle: Excel file. Must be a binary file object.
@type handle: file object
@return: A list of lists
@rtype: list
"""
workBook = self.__tempFileWrapper(xlrd.open_workbook, handle)
try:
workBook = xlrd.open_workbook(file_contents=handle.read())
except xlrd.XLRDError:
return None
sheet = workBook.sheet_by_index(0)
ret = []
for i in range(sheet.nrows) :
row = []
for j in sheet.row_values(i) :
if isinstance(j, unicode):
row.append(j)
else:
row.append(j.decode('utf-8'))
row.append(j)
#for
ret.append(row)
#for
del sheet, workBook
return ret
#__parseXlsFile
@@ -196,8 +243,8 @@ class File() :
@return: A list of lists
@rtype: list
"""
# Todo: Use a library for this.
#zipFile = self.__tempFileWrapper(zipfile.ZipFile, handle)
zipFile = zipfile.ZipFile(handle)
doc = xml.dom.minidom.parseString(zipFile.read("content.xml"))
zipFile.close()
@@ -211,7 +258,8 @@ class File() :
row.append(c[0].lastChild.data)
#if
#for
ret.append(row)
if row:
ret.append(row)
#for
return ret
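# Background (assumption, not part of the commit): .ods and .sxc documents
# are ZIP archives whose sheet data lives in content.xml, which is why
# zipfile plus minidom are enough here. Roughly:
#
#   zf = zipfile.ZipFile('batch_input.ods')
#   dom = xml.dom.minidom.parseString(zf.read('content.xml'))
#   cells = dom.getElementsByTagName('table:table-cell')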
@@ -342,8 +390,9 @@ class File() :
Get the mime type of a stream by inspecting a fixed number of bytes.
The stream is rewound after use.
@arg handle: A handle to a stream
@type handle: stream
@arg handle: Stream to be inspected. Must be a seekable binary file
object.
@type handle: file object
@return: The mime type of a file and a textual description.
@rtype: unicode, unicode
@@ -358,7 +407,6 @@ class File() :
MagicInstance = magic.open(magic.MAGIC_NONE)
MagicInstance.load()
description = MagicInstance.buffer(buf).decode('utf-8')
del MagicInstance
handle.seek(0)
return mimeType, description
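# For reference: the import comment above suggests this is the libmagic
# binding shipped with file(1), where a MAGIC_MIME instance yields the
# mime type and a MAGIC_NONE instance the textual description. Sketch:
#
#   m = magic.open(magic.MAGIC_MIME)
#   m.load()
#   m.buffer(buf)  # e.g. 'text/plain; charset=us-ascii'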
@@ -367,22 +415,28 @@ class File() :
def parseFileRaw(self, handle) :
"""
Check which format a stream has and parse it with the appropriate
parser if the stream is recognised.
parser if the stream is recognised. Does not reset the file handle to
start.
@arg handle: A handle to a stream
@type handle: stream
@arg handle: Input file to be parsed. Must be a seekable binary file
object.
@type handle: file object
@return: A list of lists, None if an error occurred
@rtype: list
"""
mimeType = self.getMimeType(handle)
if mimeType[0] == "text/plain" :
if mimeType[0] == "text/plain":
return self.__parseCsvFile(handle)
if mimeType[0] == "application/vnd.ms-office" :
if mimeType[0] in ('application/vnd.ms-excel',
'application/vnd.ms-office',
'application/msword',
'application/zip'):
return self.__parseXlsFile(handle)
if mimeType == ("application/octet-stream",
"OpenDocument Spreadsheet") :
if (mimeType[0] == 'application/vnd.oasis.opendocument.spreadsheet' or
mimeType[1] in ('OpenDocument Spreadsheet',
'OpenOffice.org 1.x Calc spreadsheet')):
return self.__parseOdsFile(handle)
return None
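# Note (assumption): libmagic reports Excel workbooks under several generic
# mime types (ms-office, msword, even zip), so all of those are handed to
# xlrd, which returns None via XLRDError when the bytes are not actually a
# workbook; see test_invalid_zip_file below.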
@@ -391,10 +445,12 @@ class File() :
def parseBatchFile(self, handle) :
"""
Check which format a stream has and parse it with the appropriate
parser if the stream is recognised.
parser if the stream is recognised. Does not reset the file handle to
start.
@arg handle: A handle to a stream
@type handle: stream
@arg handle: Batch job input file. Must be a seekable binary file
object.
@type handle: file object
@return: A sanitised list of lists (without a header or empty lines)
(or None if an error occurred) and the number of columns.
......
@@ -17,6 +17,7 @@ Module used to add and manage the Batch Jobs.
from __future__ import unicode_literals
import io
import os # os.path.exists
import smtplib # smtplib.SMTP
from email.mime.text import MIMEText # MIMEText
@@ -449,11 +450,11 @@ Mutalyzer batch scheduler""" % url)
'Affected Proteins',
'Restriction Sites Created',
'Restriction Sites Deleted']
handle = open(filename, 'a')
handle = io.open(filename, mode='a', encoding='utf-8')
handle.write("%s\n" % "\t".join(header))
#if
else :
handle = open(filename, 'a')
handle = io.open(filename, mode='a', encoding='utf-8')
if flags and 'C' in flags:
separator = '\t'
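# The switch from open() to io.open() matters on Python 2: io.open returns
# a text-mode file object that accepts unicode and encodes on write. A
# minimal sketch (file name made up):
#
#   with io.open('/tmp/batch-output.txt', mode='a', encoding='utf-8') as f:
#       f.write(u'AB026906.1:c.274G>T\tOK\n')  # stored as UTF-8 bytes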
@@ -508,11 +509,11 @@ Mutalyzer batch scheduler""" % url)
# header above it. The header is read from the config file as
# a list. We need a tab delimited string.
header = ['Input', 'Status']
handle = open(filename, 'a')
handle = io.open(filename, mode='a', encoding='utf-8')
handle.write("%s\n" % "\t".join(header))
#if
else :
handle = open(filename, 'a')
handle = io.open(filename, mode='a', encoding='utf-8')
if flags and 'C' in flags:
separator = '\t'
@@ -620,11 +621,11 @@ Mutalyzer batch scheduler""" % url)
'Errors',
'Chromosomal Variant',
'Coding Variant(s)']
handle = open(filename, 'a')
handle = io.open(filename, mode='a', encoding='utf-8')
handle.write("%s\n" % "\t".join(header))
#if
else :
handle = open(filename, 'a')
handle = io.open(filename, mode='a', encoding='utf-8')
if flags and 'C' in flags:
separator = '\t'
@@ -682,11 +683,11 @@ Mutalyzer batch scheduler""" % url)
header = ['Input Variant',
'HGVS description(s)',
'Errors and warnings']
handle = open(filename, 'a')
handle = io.open(filename, mode='a', encoding='utf-8')
handle.write("%s\n" % "\t".join(header))
#if
else :
handle = open(filename, 'a')
handle = io.open(filename, mode='a', encoding='utf-8')
if flags and 'C' in flags:
separator = '\t'
......
@@ -16,9 +16,9 @@ from spyne.service import ServiceBase
from spyne.model.primitive import Integer, Boolean, DateTime, Unicode
from spyne.model.complex import Array
from spyne.model.fault import Fault
import io
import os
import socket
from io import BytesIO
from operator import attrgetter
from sqlalchemy.orm.exc import NoResultFound
@@ -91,6 +91,12 @@ class MutalyzerService(ServiceBase):
'The process argument must be one of %s.'
% ', '.join(batch_types))
# The Python type for `data` should be a sequence of `str` objects,
# but it seems we sometimes just get one `str` object. Perhaps only in
# the unit tests, but let's fix that anyway.
if isinstance(data, str):
data = [data]
# Note that the max file size check below might be bogus, since Spyne
# first checks the total request size, which by default has a maximum
# of 2 megabytes.
@@ -104,7 +110,7 @@ class MutalyzerService(ServiceBase):
'Only files up to %d megabytes are accepted.'
% (settings.MAX_FILE_SIZE // 1048576))
batch_file = BytesIO()
batch_file = io.BytesIO()
for d in data:
batch_file.write(d)
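# io.BytesIO yields exactly the seekable binary file object that
# File.parseBatchFile expects. Minimal sketch with made-up data:
#
#   buf = io.BytesIO()
#   buf.write(b'AB026906.1:c.274G>T\n')
#   buf.seek(0)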
......
@@ -773,7 +773,9 @@ def batch_jobs_submit():
"""
job_type = request.form.get('job_type')
email = request.form.get('email')
file = request.files.get('file')
# Note that this is always a seekable binary file object.
batch_file = request.files.get('file')
assemblies = Assembly.query \
.order_by(Assembly.taxonomy_common_name.asc(),
@@ -809,7 +811,7 @@ def batch_jobs_submit():
scheduler = Scheduler.Scheduler()
file_instance = File.File(output)
job, columns = file_instance.parseBatchFile(file)
job, columns = file_instance.parseBatchFile(batch_file)
if job is None:
errors.append('Could not parse input file, please check your '
......
@@ -21,3 +21,4 @@ mock==1.0.1
alembic==0.6.3
Sphinx==1.2.1
sphinx-rtd-theme==0.1.5
cchardet==0.3.5
File added (5 binary test inputs referenced by the new tests below: data/batch_input.ods, data/batch_input.sxc, data/batch_input.xls, data/batch_input.xlsx, data/image.zip)
@@ -30,12 +30,10 @@ class TestScheduler(MutalyzerTest):
"""
fixtures = (database, )
@staticmethod
def _batch_job(variants, expected, job_type, argument=None):
def _batch_job(self, batch_file, expected, job_type, argument=None):
file_instance = File.File(output.Output('test'))
scheduler = Scheduler.Scheduler()
batch_file = io.BytesIO(('\n'.join(variants) + '\n').encode('utf-8'))
job, columns = file_instance.parseBatchFile(batch_file)
result_id = scheduler.addJob('test@test.test', job, columns,
job_type, argument=argument)
@@ -43,7 +41,7 @@ class TestScheduler(MutalyzerTest):
batch_job = BatchJob.query.filter_by(result_id=result_id).one()
left = batch_job.batch_queue_items.count()
assert left == len(variants)
assert left == len(expected)
scheduler.process()
@@ -56,6 +54,10 @@ class TestScheduler(MutalyzerTest):
next(result) # Header.
assert expected == [line.strip().split('\t') for line in result]
def _batch_job_plain_text(self, variants, expected, job_type, argument=None):
batch_file = io.BytesIO(('\n'.join(variants) + '\n').encode('utf-8'))
self._batch_job(batch_file, expected, job_type, argument=argument)
def test_syntax_checker(self):
"""
Simple syntax checker batch job.
@@ -66,7 +68,7 @@ class TestScheduler(MutalyzerTest):
'OK'],
['AL449423.14(CDKN2A_v002):c.5_400del',
'OK']]
self._batch_job(variants, expected, 'syntax-checker')
self._batch_job_plain_text(variants, expected, 'syntax-checker')
@fix(cache('AB026906.1', 'NM_000059.3'))
def test_name_checker(self):
@@ -112,7 +114,7 @@ class TestScheduler(MutalyzerTest):
'NM_000059.3(BRCA2_i001):p.(Asp224Tyr)',
'',
'BspHI,CviAII,FatI,Hpy188III,NlaIII']]
self._batch_job(variants, expected, 'name-checker')
self._batch_job_plain_text(variants, expected, 'name-checker')
def test_name_checker_altered(self):
"""
@@ -189,7 +191,7 @@ class TestScheduler(MutalyzerTest):
return bz2.BZ2File(path)
with patch.object(Entrez, 'efetch', mock_efetch):
self._batch_job(variants, expected, 'name-checker')
self._batch_job_plain_text(variants, expected, 'name-checker')
@fix(cache('NM_000059.3'))
def test_name_checker_skipped(self):
@@ -230,7 +232,7 @@ class TestScheduler(MutalyzerTest):
raise IOError()
with patch.object(Entrez, 'efetch', mock_efetch):
self._batch_job(variants, expected, 'name-checker')
self._batch_job_plain_text(variants, expected, 'name-checker')
@fix(hg19, hg19_transcript_mappings)
def test_position_converter(self):
@@ -244,4 +246,77 @@ class TestScheduler(MutalyzerTest):
'NM_003002.2:c.274G>T',
'NM_012459.2:c.-2203C>A',
'NR_028383.1:n.-2173C>A']]
self._batch_job(variants, expected, 'position-converter', 'hg19')
self._batch_job_plain_text(variants, expected, 'position-converter', 'hg19')
def test_ods_file(self):
"""
OpenDocument Spreadsheet input for batch job.
"""
path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
'data',
'batch_input.ods')
batch_file = open(path, 'rb')
expected = [['AB026906.1:c.274G>T',
'OK'],
['AL449423.14(CDKN2A_v002):c.5_400del',
'OK']]
self._batch_job(batch_file, expected, 'syntax-checker')
def test_sxc_file(self):
"""
OpenOffice.org 1.x Calc spreadsheet input for batch job.
"""
path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
'data',
'batch_input.sxc')
batch_file = open(path, 'rb')
expected = [['AB026906.1:c.274G>T',
'OK'],
['AL449423.14(CDKN2A_v002):c.5_400del',
'OK']]
self._batch_job(batch_file, expected, 'syntax-checker')
def test_xls_file(self):
"""
Microsoft Excel 97/2000/XP/2003 input for batch job.
"""
path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
'data',
'batch_input.xls')
batch_file = open(path, 'rb')
expected = [['AB026906.1:c.274G>T',
'OK'],
['AL449423.14(CDKN2A_v002):c.5_400del',
'OK']]
self._batch_job(batch_file, expected, 'syntax-checker')
def test_xlsx_file(self):
"""
Office Open XML Spreadsheet input for batch job.
"""
path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
'data',
'batch_input.xlsx')
batch_file = open(path, 'rb')
expected = [['AB026906.1:c.274G>T',
'OK'],
['AL449423.14(CDKN2A_v002):c.5_400del',
'OK']]
self._batch_job(batch_file, expected, 'syntax-checker')
def test_invalid_zip_file(self):
"""
Random zip file input for batch job (invalid).
"""
path = os.path.join(os.path.dirname(os.path.realpath(__file__)),
'data',
'image.zip')
batch_file = open(path, 'rb')
file_instance = File.File(output.Output('test'))
job, columns = file_instance.parseBatchFile(batch_file)
assert job is None