Skip to content
Snippets Groups Projects
Commit b80ac8ec authored by Vermaat's avatar Vermaat
Browse files

Merge pull request #94 from mutalyzer/redis-links

Cache transcript protein links in Redis
parents 4e4798c5 473c732c
No related branches found
No related tags found
No related merge requests found
......@@ -126,7 +126,7 @@ REDIS_URI
Redis connection URI (can be any `redis-py
<https://github.com/andymccurdy/redis-py>`_ connection URI). Set to `None`
to silently use a mock Redis. Redis is only used for non-essential
features.
features such as caching of external resources.
`Default value:` `None`
......@@ -235,17 +235,11 @@ DEFAULT_ASSEMBLY
`Default value:` ``hg19``
PROTEIN_LINK_EXPIRATION
Expiration time for cached transcript<->protein links from the NCBI (in
seconds).
`Default value:` `60 * 60 * 24 * 30` (30 days)
NEGATIVE_PROTEIN_LINK_EXPIRATION
Expiration time for cached negative transcript<->protein links from the NCBI
NEGATIVE_LINK_CACHE_EXPIRATION
Cache expiration time for negative transcript<->protein links from the NCBI
(in seconds).
`Default value:` `60 * 60 * 24 * 5` (5 days)
`Default value:` `60 * 60 * 24 * 30` (30 days)
USE_RELOADER
Enable the `Werkzeug reloader
......
......@@ -85,7 +85,8 @@ Mutalyzer uses Redis for non-critical fast storage such as statistics::
$ sudo apt-get install redis-server
.. note:: Redis is a soft dependency, meaning that Mutalyzer will run without
it (but may lack some non-essential features).
it (but may lack some non-essential features such as caching of external
resources).
.. _install-virtualenv:
......
"""Copy transcript protein links to redis
Revision ID: 225a8b4c3902
Revises: 3492d2ee8884
Create Date: 2015-10-15 14:11:22.961417
"""
from __future__ import unicode_literals
# revision identifiers, used by Alembic.
revision = '225a8b4c3902'
down_revision = u'3492d2ee8884'
from datetime import datetime, timedelta
import redis
from alembic import op
from sqlalchemy import and_, or_, sql
import sqlalchemy as sa
from mutalyzer.config import settings
def upgrade():
if settings.REDIS_URI is None:
return
connection = op.get_bind()
redis_client = redis.StrictRedis.from_url(settings.REDIS_URI,
decode_responses=True,
encoding='utf-8')
transcript_protein_links = sql.table(
'transcript_protein_links',
sql.column('transcript_accession', sa.String(30)),
sql.column('protein_accession', sa.String(30)),
sql.column('added', sa.DateTime)
)
negative_link_datetime = datetime.now() - \
timedelta(seconds=settings.NEGATIVE_LINK_CACHE_EXPIRATION)
result = connection.execute(transcript_protein_links.select().where(
or_(and_(transcript_protein_links.c.transcript_accession.isnot(None),
transcript_protein_links.c.protein_accession.isnot(None)),
transcript_protein_links.c.added >= negative_link_datetime)
).with_only_columns([transcript_protein_links.c.transcript_accession,
transcript_protein_links.c.protein_accession]))
while True:
chunk = result.fetchmany(1000)
if not chunk:
break
pipe = redis_client.pipeline(transaction=False)
for row in chunk:
transcript_accession, protein_accession = row
if transcript_accession is not None:
key = 'ncbi:transcript-to-protein:%s' % transcript_accession
if protein_accession is not None:
pipe.set(key, protein_accession)
else:
pipe.setex(key, settings.NEGATIVE_LINK_CACHE_EXPIRATION,
'')
if protein_accession is not None:
key = 'ncbi:protein-to-transcript:%s' % protein_accession
if transcript_accession is not None:
pipe.set(key, transcript_accession)
else:
pipe.setex(key, settings.NEGATIVE_LINK_CACHE_EXPIRATION,
'')
pipe.execute()
def downgrade():
### commands auto generated by Alembic - please adjust! ###
pass
### end Alembic commands ###
......@@ -65,12 +65,9 @@ LRG_PREFIX_URL = 'ftp://ftp.ebi.ac.uk/pub/databases/lrgex/SCHEMA_1_7_ARCHIVE/'
# Allow for this fraction of errors in batch jobs.
BATCH_JOBS_ERROR_THRESHOLD = 0.05
# Expiration time for transcript<->protein links from the NCBI (in seconds).
PROTEIN_LINK_EXPIRATION = 60 * 60 * 24 * 30
# Expiration time for negative transcript<->protein links from the NCBI (in
# seconds).
NEGATIVE_PROTEIN_LINK_EXPIRATION = 60 * 60 * 24 * 5
# Cache expiration time for negative transcript<->protein links from the NCBI
# (in seconds).
NEGATIVE_LINK_CACHE_EXPIRATION = 60 * 60 * 24 * 30
# URL to the SOAP webservice WSDL document. Used to build the WSDL document
# and for linking to it from the documentation page on the website.
......
......@@ -9,14 +9,8 @@ Queries on database models.
from __future__ import unicode_literals
from datetime import datetime, timedelta
from sqlalchemy import and_, or_
import sqlalchemy.exc
from mutalyzer.config import settings
from mutalyzer.db import session
from mutalyzer.db.models import BatchQueueItem, TranscriptProteinLink
from mutalyzer.db.models import BatchQueueItem
def pop_batch_queue_item(batch_job):
......@@ -54,85 +48,3 @@ def pop_batch_queue_item(batch_job):
session.commit()
return item, flags
def get_transcript_protein_link(accession, reverse=False):
"""
Get a cached link between a transcript and a protein that is not expired
according to the configuration settings `PROTEIN_LINK_EXPIRATION` and
`NEGATIVE_PROTEIN_LINK_EXPIRATION`.
:arg str accession: Accession number (without version number) to lookup
link for.
:arg bool reverse: If `True`, `accession` is assumed to be a protein
accession number, otherwise `accession` is assumed to be a transcript
accession number.
Note that the link may be negative, i.e., the knowledge that no link
exists can also be cached. In that case, the `protein_accession` field of
the resulting `TranscriptProteinLink` object is `None`.
Returns `None` if no link (positive or negative) is found.
"""
link_datetime = datetime.now() - \
timedelta(seconds=settings.PROTEIN_LINK_EXPIRATION)
negative_link_datetime = datetime.now() - \
timedelta(seconds=settings.NEGATIVE_PROTEIN_LINK_EXPIRATION)
# Query column must have `accession`, other column has the value we're
# probably interested in.
query_column = TranscriptProteinLink.transcript_accession
other_column = TranscriptProteinLink.protein_accession
if reverse:
# Lookup by protein accession instead of transcript accession.
query_column, other_column = other_column, query_column
return TranscriptProteinLink.query.filter(
query_column == accession,
or_(and_(other_column.isnot(None),
TranscriptProteinLink.added >= link_datetime),
and_(other_column.is_(None),
TranscriptProteinLink.added >= negative_link_datetime))
).first()
def update_transcript_protein_link(transcript_accession=None,
protein_accession=None):
"""
Update cached link between a transcript and a protein, or create it if it
doesn't exist yet.
:arg str transcript_accession: Transcript accession number (without
version number).
:arg str protein_accession: Protein accession number (without version
number).
At least one of `transcript_accession` or `protein_accession` must be not
`None`.
"""
if transcript_accession is None and protein_accession is None:
raise ValueError('Link must have a transcript or protein')
# Filter clauses to find links for either of the given accession numbers.
clauses = []
if transcript_accession is not None:
clauses.append(TranscriptProteinLink.transcript_accession ==
transcript_accession)
if protein_accession is not None:
clauses.append(TranscriptProteinLink.protein_accession ==
protein_accession)
# Delete any related existing links.
TranscriptProteinLink.query.filter(or_(*clauses)).delete()
session.commit()
# There is a race condition here between deleting old links and adding the
# new one. It's extremely unlikely to go wrong, and we can safely ignore
# it anyway.
link = TranscriptProteinLink(transcript_accession, protein_accession)
try:
session.add(link)
session.commit()
except sqlalchemy.exc.IntegrityError:
session.rollback()
from Bio import Entrez
"""
Communication with the NCBI.
"""
from .config import settings
from .db import queries
import functools
def transcript_to_protein(transcript_accession):
"""
Try to find the protein linked to a transcript.
from Bio import Entrez
First look in our database. If a link cannot be found, try to retrieve it
from the NCBI. Add the result to our database.
from .config import settings
from .redisclient import client as redis
:arg str transcript_accession: Accession number of the transcript for
which we want to find the protein (without version number).
:returns: Accession number of a protein (without version number) or `None`
if no link can be found.
def _get_link(source_accession, source_db, target_db, match_link_name):
"""
Retrieve a linked accession number from the NCBI.
:arg str source_accession: Accession number for which we want to find a
link (without version number).
:arg str source_db: NCBI source database.
:arg str target_db: NCBI target database.
:arg function match_link_name: For each link found, this function is
called with the link name (`str`) and it should return `True` iff the
link is to be used.
:returns: Linked accession number (without version number) or `None` if no
link can be found.
:rtype: str
"""
Entrez.email = settings.EMAIL
link = queries.get_transcript_protein_link(transcript_accession)
if link is not None:
return link.protein_accession
handle = Entrez.esearch(db='nucleotide', term=transcript_accession)
handle = Entrez.esearch(db=source_db, term=source_accession)
try:
result = Entrez.read(handle)
except Entrez.Parser.ValidationError:
# Todo: Log this error.
return None
finally:
handle.close()
transcript_gi = unicode(result['IdList'][0])
try:
source_gi = unicode(result['IdList'][0])
except IndexError:
return None
handle = Entrez.elink(dbfrom='nucleotide', db='protein', id=transcript_gi)
handle = Entrez.elink(dbfrom=source_db, db=target_db, id=source_gi)
try:
result = Entrez.read(handle)
except Entrez.Parser.ValidationError:
# Todo: Log this error.
return None
finally:
handle.close()
if not result[0]['LinkSetDb']:
# We also cache negative results.
queries.update_transcript_protein_link(
transcript_accession=transcript_accession)
return None
protein_gi = unicode(result[0]['LinkSetDb'][0]['Link'][0]['Id'])
for link in result[0]['LinkSetDb']:
if match_link_name(unicode(link['LinkName'])):
target_gi = unicode(link['Link'][0]['Id'])
break
else:
return None
handle = Entrez.efetch(
db='protein', id=protein_gi, rettype='acc', retmode='text')
protein_accession = unicode(handle.read()).split('.')[0]
db=target_db, id=target_gi, rettype='acc', retmode='text')
target_accession = unicode(handle.read()).split('.')[0]
handle.close()
return target_accession
queries.update_transcript_protein_link(
transcript_accession=transcript_accession,
protein_accession=protein_accession)
return protein_accession
def cache_link(source, target):
"""
Decorator to add caching to link retrieval.
def protein_to_transcript(protein_accession):
:arg str source: Source database (used to construct cache key).
:arg str target: Target database (used to construct cache key).
"""
Try to find the transcript linked to a protein.
forward_key = 'ncbi:%s-to-%s:%%s' % (source, target)
reverse_key = 'ncbi:%s-to-%s:%%s' % (target, source)
First look in our database. If a link cannot be found, try to retrieve it
from the NCBI. Add the result to our database.
def cache_source_to_target(f):
@functools.wraps(f)
def cached_f(accession):
result = redis.get(forward_key % accession)
if result is not None:
# The empty string is a cached negative result, which we return as
# `None`.
return result or None
:arg str protein_accession: Accession number of the protein for which we
want to find the transcript (without version number).
result = f(accession)
:returns: Accession number of a transcript (without version number) or
`None` if no link can be found.
:rtype: str
if result is None:
redis.setex(forward_key % accession,
settings.NEGATIVE_LINK_CACHE_EXPIRATION, '')
return None
# We store the resulting link in both directions.
redis.set(forward_key % accession, result)
redis.set(reverse_key % result, accession)
return result
return cached_f
return cache_source_to_target
@cache_link('transcript', 'protein')
def transcript_to_protein(transcript_accession):
"""
Entrez.email = settings.EMAIL
Try to find the protein linked to a transcript.
link = queries.get_transcript_protein_link(protein_accession, reverse=True)
if link is not None:
return link.transcript_accession
Links are retrieved from the NCBI using their Entrez API and cached in
Redis. Negative results (accession or link could not be found) are also
cached, but expire after `NEGATIVE_LINK_CACHE_EXPIRATION` seconds.
handle = Entrez.esearch(db='protein', term=protein_accession)
try:
result = Entrez.read(handle)
except Entrez.Parser.ValidationError:
# Todo: Log this error.
return None
finally:
handle.close()
:arg str transcript_accession: Accession number of the transcript for
which we want to find the protein (without version number).
if not result['IdList']:
return None
protein_gi = unicode(result['IdList'][0])
:returns: Accession number of a protein (without version number) or `None`
if no link can be found.
:rtype: str
"""
return _get_link(
transcript_accession, 'nucleotide', 'protein',
lambda link: link in ('nuccore_protein', 'nuccore_protein_cds'))
handle = Entrez.elink(dbfrom='protein', db='nucleotide', id=protein_gi)
try:
result = Entrez.read(handle)
except Entrez.Parser.ValidationError:
# Todo: Log this error.
return None
finally:
handle.close()
if not result[0]['LinkSetDb']:
# We also cache negative results.
queries.update_transcript_protein_link(
protein_accession=protein_accession)
return None
@cache_link('protein', 'transcript')
def protein_to_transcript(protein_accession):
"""
Try to find the transcript linked to a protein.
transcript_gi = ''
for link in result[0]['LinkSetDb']:
if unicode(link['LinkName']) == 'protein_nuccore_mrna':
transcript_gi = unicode(link['Link'][0]['Id'])
break
Links are retrieved from the NCBI using their Entrez API and cached in
Redis. Negative results (accession or link could not be found) are also
cached, but expire after `NEGATIVE_LINK_CACHE_EXPIRATION` seconds.
handle = Entrez.efetch(
db='nucleotide', id=transcript_gi, rettype='acc', retmode='text')
transcript_accession = unicode(handle.read()).split('.')[0]
handle.close()
:arg str protein_accession: Accession number of the protein for which we
want to find the transcript (without version number).
queries.update_transcript_protein_link(
transcript_accession=transcript_accession,
protein_accession=protein_accession)
return transcript_accession
:returns: Accession number of a transcript (without version number) or
`None` if no link can be found.
:rtype: str
"""
return _get_link(
protein_accession, 'protein', 'nucleotide',
lambda link: link == 'protein_nuccore_mrna')
......@@ -21,7 +21,7 @@ from __future__ import unicode_literals
import time
from mutalyzer.redisclient import client
from .redisclient import client as redis
# Label, bucket definition, expiration time in seconds.
......@@ -34,7 +34,7 @@ def increment_counter(counter):
"""
Increment the specified counter.
"""
pipe = client.pipeline(transaction=False)
pipe = redis.pipeline(transaction=False)
pipe.incr('counter:%s:total' % counter)
for label, bucket, expire in INTERVALS:
......@@ -56,9 +56,9 @@ def get_totals():
Known counters are just those that have been incremented at least once.
"""
counters = client.keys('counter:*:total')
counters = redis.keys('counter:*:total')
pipe = client.pipeline(transaction=False)
pipe = redis.pipeline(transaction=False)
for counter in counters:
pipe.get(counter)
......
......@@ -15,7 +15,7 @@ from mutalyzer.config import settings as _settings
from mutalyzer.output import Output
from mutalyzer.redisclient import client as redis
from mutalyzer.db.models import (Assembly, Chromosome, Reference,
TranscriptMapping, TranscriptProteinLink)
TranscriptMapping)
from mutalyzer import db as _db
......@@ -96,15 +96,28 @@ def references(request, settings, db, available_references):
entry['filename'])
shutil.copy(path, settings.CACHE_DIR)
reference = Reference(
accession, entry['checksum'], geninfo_identifier=geninfo_id)
db.session.add(reference)
for transcript, protein in entry.get('links', []):
db.session.add(TranscriptProteinLink(transcript, protein))
references.append(reference)
references.append(Reference(
accession, entry['checksum'], geninfo_identifier=geninfo_id))
for transcript_accession, protein_accession in entry.get('links', []):
if transcript_accession is not None:
key = 'ncbi:transcript-to-protein:%s' % transcript_accession
if protein_accession is not None:
redis.set(key, protein_accession)
else:
redis.setex(key,
settings.NEGATIVE_LINK_CACHE_EXPIRATION,
'')
if protein_accession is not None:
key = 'ncbi:protein-to-transcript:%s' % protein_accession
if transcript_accession is not None:
redis.set(key, transcript_accession)
else:
redis.setex(key,
settings.NEGATIVE_LINK_CACHE_EXPIRATION,
'')
db.session.add_all(references)
db.session.commit()
return references
......
"""
Tests for the mutalyzer.db.queries module.
"""
from __future__ import unicode_literals
import pytest
from mutalyzer.db import queries
pytestmark = [
pytest.mark.usefixtures('references'),
pytest.mark.parametrize('references', [['MARK1']], indirect=True)
]
def test_get_transcript_protein_link():
"""
Query a transcript-protein link by transcript.
"""
link = queries.get_transcript_protein_link('NM_018650')
assert link.transcript_accession == 'NM_018650'
assert link.protein_accession == 'NP_061120'
def test_get_transcript_protein_link_negative():
"""
Query a negative transcript-protein link by transcript.
"""
link = queries.get_transcript_protein_link('XM_005273133')
assert link.transcript_accession == 'XM_005273133'
assert link.protein_accession is None
def test_get_transcript_protein_link_missing():
"""
Query a missing transcript-protein link by transcript.
"""
link = queries.get_transcript_protein_link('NM_123456')
assert link is None
def test_get_transcript_protein_link_reverse():
"""
Query a transcript-protein link by protein.
"""
link = queries.get_transcript_protein_link('NP_061120', reverse=True)
assert link.transcript_accession == 'NM_018650'
assert link.protein_accession == 'NP_061120'
def test_get_transcript_protein_link_reverse_missing():
"""
Query a missing transcript-protein link by protein.
"""
link = queries.get_transcript_protein_link('NP_123456')
assert link is None
......@@ -5,6 +5,8 @@ Test database migrations.
from __future__ import unicode_literals
from datetime import datetime
import alembic.autogenerate
import alembic.command
import alembic.config
......@@ -91,7 +93,8 @@ def add_database_content(connection):
transcript_protein_links = sql.table(
'transcript_protein_links',
sql.column('transcript_accession', sa.String(30)),
sql.column('protein_accession', sa.String(30)))
sql.column('protein_accession', sa.String(30)),
sql.column('added', sa.DateTime))
# Add some common data.
connection.execute(
......@@ -170,8 +173,10 @@ def add_database_content(connection):
connection.execute(
transcript_protein_links.insert(),
transcript_accession='NM_052818',
protein_accession='NP_438169')
protein_accession='NP_438169',
added=datetime.now())
connection.execute(
transcript_protein_links.insert(),
transcript_accession='NM_001079691',
protein_accession=None)
protein_accession=None,
added=datetime.now())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment