From 473c732c7d53b0a9c68e7d0538bbff10d0823971 Mon Sep 17 00:00:00 2001
From: Martijn Vermaat <martijn@vermaat.name>
Date: Fri, 16 Oct 2015 14:08:00 +0200
Subject: [PATCH] Cache transcript protein links in Redis

Caching of transcript protein links received from the NCBI Entrez
service is a typical use case for Redis. This commit implements the
cache in Redis and removes all use of our original database table.

An Alembic migration copies all existing links from the database to
Redis. The original `TranscriptProteinLink` database table is not
dropped. This will be done in a future migration to ensure running
processes don't error and to provide a rollback scenario.

We also remove the expiration of links (originally defaulting to 30
days), since we don't expect them to ever change. Negative links
(caching a 'not found' result from Entrez) *are* still expiring,
but with a longer default of 30 days (was 5 days).

The configuration setting for the latter was renamed, yielding the
following changes in the default configuration settings.

Removed default settings:

    # Expiration time for transcript<->protein links from the NCBI (in seconds).
    PROTEIN_LINK_EXPIRATION = 60 * 60 * 24 * 30

    # Expiration time for negative transcript<->protein links from the NCBI (in
    # seconds).
    NEGATIVE_PROTEIN_LINK_EXPIRATION = 60 * 60 * 24 * 5

Added default setting:

    # Cache expiration time for negative transcript<->protein links from the NCBI
    # (in seconds).
    NEGATIVE_LINK_CACHE_EXPIRATION = 60 * 60 * 24 * 30
---
 doc/config.rst                                |  14 +-
 doc/install.rst                               |   3 +-
 ..._copy_transcript_protein_links_to_redis.py |  84 ++++++++
 mutalyzer/config/default_settings.py          |   9 +-
 mutalyzer/db/queries.py                       |  90 +--------
 mutalyzer/ncbi.py                             | 180 ++++++++++--------
 tests/fixtures.py                             |  33 +++-
 tests/test_db_queries.py                      |  59 ------
 tests/test_migrations.py                      |  11 +-
 9 files changed, 223 insertions(+), 260 deletions(-)
 create mode 100644 migrations/versions/225a8b4c3902_copy_transcript_protein_links_to_redis.py
 delete mode 100644 tests/test_db_queries.py

diff --git a/doc/config.rst b/doc/config.rst
index 8d4192f0..f956fee5 100644
--- a/doc/config.rst
+++ b/doc/config.rst
@@ -126,7 +126,7 @@ REDIS_URI
   Redis connection URI (can be any `redis-py
   <https://github.com/andymccurdy/redis-py>`_ connection URI). Set to `None`
   to silently use a mock Redis. Redis is only used for non-essential
-  features.
+  features such as caching of external resources.
 
   `Default value:` `None`
 
@@ -235,17 +235,11 @@ DEFAULT_ASSEMBLY
 
   `Default value:` ``hg19``
 
-PROTEIN_LINK_EXPIRATION
-  Expiration time for cached transcript<->protein links from the NCBI (in
-  seconds).
-
-  `Default value:` `60 * 60 * 24 * 30` (30 days)
-
-NEGATIVE_PROTEIN_LINK_EXPIRATION
-  Expiration time for cached negative transcript<->protein links from the NCBI
+NEGATIVE_LINK_CACHE_EXPIRATION
+  Cache expiration time for negative transcript<->protein links from the NCBI
   (in seconds).
 
-  `Default value:` `60 * 60 * 24 * 5` (5 days)
+  `Default value:` `60 * 60 * 24 * 30` (30 days)
 
 USE_RELOADER
   Enable the `Werkzeug reloader
diff --git a/doc/install.rst b/doc/install.rst
index 1a9f1118..c070d0fa 100644
--- a/doc/install.rst
+++ b/doc/install.rst
@@ -85,7 +85,8 @@ Mutalyzer uses Redis for non-critical fast storage such as statistics::
     $ sudo apt-get install redis-server
 
 .. note:: Redis is a soft dependency, meaning that Mutalyzer will run without
-    it (but may lack some non-essential features).
+    it (but may lack some non-essential features such as caching of external
+    resources).
 
 
 .. _install-virtualenv:
diff --git a/migrations/versions/225a8b4c3902_copy_transcript_protein_links_to_redis.py b/migrations/versions/225a8b4c3902_copy_transcript_protein_links_to_redis.py
new file mode 100644
index 00000000..32484f94
--- /dev/null
+++ b/migrations/versions/225a8b4c3902_copy_transcript_protein_links_to_redis.py
@@ -0,0 +1,100 @@
+"""Copy transcript protein links to redis
+
+Revision ID: 225a8b4c3902
+Revises: 3492d2ee8884
+Create Date: 2015-10-15 14:11:22.961417
+
+"""
+
+from __future__ import unicode_literals
+
+# revision identifiers, used by Alembic.
+revision = '225a8b4c3902'
+down_revision = u'3492d2ee8884'
+
+from datetime import datetime, timedelta
+
+import redis
+from alembic import op
+from sqlalchemy import and_, or_, sql
+import sqlalchemy as sa
+
+from mutalyzer.config import settings
+
+
+def upgrade():
+    """
+    Copy cached transcript<->protein links from the database to Redis.
+
+    All positive links are copied, as well as negative links that were added
+    less than `NEGATIVE_LINK_CACHE_EXPIRATION` seconds ago. Nothing is copied
+    if no Redis server is configured (`REDIS_URI` is `None`). The database
+    table itself is only read.
+    """
+    if settings.REDIS_URI is None:
+        return
+
+    connection = op.get_bind()
+
+    redis_client = redis.StrictRedis.from_url(settings.REDIS_URI,
+                                              decode_responses=True,
+                                              encoding='utf-8')
+
+    transcript_protein_links = sql.table(
+        'transcript_protein_links',
+        sql.column('transcript_accession', sa.String(30)),
+        sql.column('protein_accession', sa.String(30)),
+        sql.column('added', sa.DateTime)
+    )
+
+    # Rows added before this moment are too old to serve as a negative link.
+    negative_link_datetime = datetime.now() - \
+        timedelta(seconds=settings.NEGATIVE_LINK_CACHE_EXPIRATION)
+
+    # Positive links (both accessions present) are selected regardless of
+    # their age, other rows only if they were added recently enough.
+    result = connection.execute(transcript_protein_links.select().where(
+        or_(and_(transcript_protein_links.c.transcript_accession.isnot(None),
+                 transcript_protein_links.c.protein_accession.isnot(None)),
+            transcript_protein_links.c.added >= negative_link_datetime)
+    ).with_only_columns([transcript_protein_links.c.transcript_accession,
+                         transcript_protein_links.c.protein_accession]))
+
+    # Copy in chunks of 1000 rows, using one Redis pipeline per chunk.
+    while True:
+        chunk = result.fetchmany(1000)
+        if not chunk:
+            break
+
+        pipe = redis_client.pipeline(transaction=False)
+
+        for row in chunk:
+            transcript_accession, protein_accession = row
+
+            # A positive link is stored in both directions and does not
+            # expire. A negative link is stored as the empty string, only for
+            # the accession that is present, and expires. The full expiration
+            # time is used for it, the age of the row is not subtracted.
+            if transcript_accession is not None:
+                key = 'ncbi:transcript-to-protein:%s' % transcript_accession
+                if protein_accession is not None:
+                    pipe.set(key, protein_accession)
+                else:
+                    pipe.setex(key, settings.NEGATIVE_LINK_CACHE_EXPIRATION,
+                               '')
+
+            if protein_accession is not None:
+                key = 'ncbi:protein-to-transcript:%s' % protein_accession
+                if transcript_accession is not None:
+                    pipe.set(key, transcript_accession)
+                else:
+                    pipe.setex(key, settings.NEGATIVE_LINK_CACHE_EXPIRATION,
+                               '')
+
+        pipe.execute()
+
+
+def downgrade():
+    # Nothing to do here: `upgrade` only reads from the database and the
+    # keys it has written to Redis are left in place.
+    pass
diff --git a/mutalyzer/config/default_settings.py b/mutalyzer/config/default_settings.py
index be79de5c..06ba09ea 100644
--- a/mutalyzer/config/default_settings.py
+++ b/mutalyzer/config/default_settings.py
@@ -65,12 +65,9 @@ LRG_PREFIX_URL = 'ftp://ftp.ebi.ac.uk/pub/databases/lrgex/SCHEMA_1_7_ARCHIVE/'
 # Allow for this fraction of errors in batch jobs.
 BATCH_JOBS_ERROR_THRESHOLD = 0.05
 
-# Expiration time for transcript<->protein links from the NCBI (in seconds).
-PROTEIN_LINK_EXPIRATION = 60 * 60 * 24 * 30
-
-# Expiration time for negative transcript<->protein links from the NCBI (in
-# seconds).
-NEGATIVE_PROTEIN_LINK_EXPIRATION = 60 * 60 * 24 * 5
+# Cache expiration time for negative transcript<->protein links from the NCBI
+# (in seconds).
+NEGATIVE_LINK_CACHE_EXPIRATION = 60 * 60 * 24 * 30
 
 # URL to the SOAP webservice WSDL document. Used to build the WSDL document
 # and for linking to it from the documentation page on the website.
diff --git a/mutalyzer/db/queries.py b/mutalyzer/db/queries.py
index 7f28cf5e..1ff1e60c 100644
--- a/mutalyzer/db/queries.py
+++ b/mutalyzer/db/queries.py
@@ -9,14 +9,8 @@ Queries on database models.
 
 from __future__ import unicode_literals
 
-from datetime import datetime, timedelta
-
-from sqlalchemy import and_, or_
-import sqlalchemy.exc
-
-from mutalyzer.config import settings
 from mutalyzer.db import session
-from mutalyzer.db.models import BatchQueueItem, TranscriptProteinLink
+from mutalyzer.db.models import BatchQueueItem
 
 
 def pop_batch_queue_item(batch_job):
@@ -54,85 +48,3 @@ def pop_batch_queue_item(batch_job):
     session.commit()
 
     return item, flags
-
-
-def get_transcript_protein_link(accession, reverse=False):
-    """
-    Get a cached link between a transcript and a protein that is not expired
-    according to the configuration settings `PROTEIN_LINK_EXPIRATION` and
-    `NEGATIVE_PROTEIN_LINK_EXPIRATION`.
-
-    :arg str accession: Accession number (without version number) to lookup
-      link for.
-    :arg bool reverse: If `True`, `accession` is assumed to be a protein
-      accession number, otherwise `accession` is assumed to be a transcript
-      accession number.
-
-    Note that the link may be negative, i.e., the knowledge that no link
-    exists can also be cached. In that case, the `protein_accession` field of
-    the resulting `TranscriptProteinLink` object is `None`.
-
-    Returns `None` if no link (positive or negative) is found.
-    """
-    link_datetime = datetime.now() - \
-        timedelta(seconds=settings.PROTEIN_LINK_EXPIRATION)
-    negative_link_datetime = datetime.now() - \
-        timedelta(seconds=settings.NEGATIVE_PROTEIN_LINK_EXPIRATION)
-
-    # Query column must have `accession`, other column has the value we're
-    # probably interested in.
-    query_column = TranscriptProteinLink.transcript_accession
-    other_column = TranscriptProteinLink.protein_accession
-
-    if reverse:
-        # Lookup by protein accession instead of transcript accession.
-        query_column, other_column = other_column, query_column
-
-    return TranscriptProteinLink.query.filter(
-        query_column == accession,
-        or_(and_(other_column.isnot(None),
-                 TranscriptProteinLink.added >= link_datetime),
-            and_(other_column.is_(None),
-                 TranscriptProteinLink.added >= negative_link_datetime))
-    ).first()
-
-
-def update_transcript_protein_link(transcript_accession=None,
-                                   protein_accession=None):
-    """
-    Update cached link between a transcript and a protein, or create it if it
-    doesn't exist yet.
-
-    :arg str transcript_accession: Transcript accession number (without
-      version number).
-    :arg str protein_accession: Protein accession number (without version
-      number).
-
-    At least one of `transcript_accession` or `protein_accession` must be not
-    `None`.
-    """
-    if transcript_accession is None and protein_accession is None:
-        raise ValueError('Link must have a transcript or protein')
-
-    # Filter clauses to find links for either of the given accession numbers.
-    clauses = []
-    if transcript_accession is not None:
-        clauses.append(TranscriptProteinLink.transcript_accession ==
-                       transcript_accession)
-    if protein_accession is not None:
-        clauses.append(TranscriptProteinLink.protein_accession ==
-                       protein_accession)
-
-    # Delete any related existing links.
-    TranscriptProteinLink.query.filter(or_(*clauses)).delete()
-    session.commit()
-
-    # There is a race condition here between deleting old links and adding the
-    # new one. It's extremely unlikely to go wrong, and we can safely ignore
-    # it anyway.
-    link = TranscriptProteinLink(transcript_accession, protein_accession)
-    try:
-        session.add(link)
-        session.commit()
-    except sqlalchemy.exc.IntegrityError:
-        session.rollback()
diff --git a/mutalyzer/ncbi.py b/mutalyzer/ncbi.py
index dc02da6d..f131088a 100644
--- a/mutalyzer/ncbi.py
+++ b/mutalyzer/ncbi.py
@@ -1,128 +1,151 @@
-from Bio import Entrez
+"""
+Communication with the NCBI.
+"""
 
-from .config import settings
-from .db import queries
 
+import functools
 
-def transcript_to_protein(transcript_accession):
-    """
-    Try to find the protein linked to a transcript.
+from Bio import Entrez
 
-    First look in our database. If a link cannot be found, try to retrieve it
-    from the NCBI. Add the result to our database.
+from .config import settings
+from .redisclient import client as redis
 
-    :arg str transcript_accession: Accession number of the transcript for
-      which we want to find the protein (without version number).
 
-    :returns: Accession number of a protein (without version number) or `None`
-      if no link can be found.
+def _get_link(source_accession, source_db, target_db, match_link_name):
+    """
+    Retrieve a linked accession number from the NCBI.
+
+    :arg str source_accession: Accession number for which we want to find a
+      link (without version number).
+    :arg str source_db: NCBI source database.
+    :arg str target_db: NCBI target database.
+    :arg function match_link_name: For each link found, this function is
+      called with the link name (`str`) and it should return `True` iff the
+      link is to be used.
+
+    :returns: Linked accession number (without version number) or `None` if no
+      link can be found.
     :rtype: str
     """
     Entrez.email = settings.EMAIL
-
-    link = queries.get_transcript_protein_link(transcript_accession)
-    if link is not None:
-        return link.protein_accession
-
-    handle = Entrez.esearch(db='nucleotide', term=transcript_accession)
+    handle = Entrez.esearch(db=source_db, term=source_accession)
     try:
         result = Entrez.read(handle)
     except Entrez.Parser.ValidationError:
-        # Todo: Log this error.
         return None
     finally:
         handle.close()
 
-    transcript_gi = unicode(result['IdList'][0])
+    try:
+        source_gi = unicode(result['IdList'][0])
+    except IndexError:
+        # The search did not return any identifier for this accession.
+        return None
 
-    handle = Entrez.elink(dbfrom='nucleotide', db='protein', id=transcript_gi)
+    handle = Entrez.elink(dbfrom=source_db, db=target_db, id=source_gi)
     try:
         result = Entrez.read(handle)
     except Entrez.Parser.ValidationError:
-        # Todo: Log this error.
         return None
     finally:
         handle.close()
 
     if not result[0]['LinkSetDb']:
-        # We also cache negative results.
-        queries.update_transcript_protein_link(
-            transcript_accession=transcript_accession)
         return None
 
-    protein_gi = unicode(result[0]['LinkSetDb'][0]['Link'][0]['Id'])
+    for link in result[0]['LinkSetDb']:
+        if match_link_name(unicode(link['LinkName'])):
+            target_gi = unicode(link['Link'][0]['Id'])
+            break
+    else:
+        # None of the link sets has a matching name.
+        return None
 
     handle = Entrez.efetch(
-        db='protein', id=protein_gi, rettype='acc', retmode='text')
-    protein_accession = unicode(handle.read()).split('.')[0]
-    handle.close()
+        db=target_db, id=target_gi, rettype='acc', retmode='text')
+    try:
+        target_accession = unicode(handle.read()).split('.')[0]
+    finally:
+        # Release the handle, also when reading the response fails.
+        handle.close()
+    return target_accession
 
-    queries.update_transcript_protein_link(
-        transcript_accession=transcript_accession,
-        protein_accession=protein_accession)
-    return protein_accession
 
+def cache_link(source, target):
+    """
+    Decorator to add caching to link retrieval.
 
-def protein_to_transcript(protein_accession):
+    :arg str source: Source database (used to construct cache key).
+    :arg str target: Target database (used to construct cache key).
     """
-    Try to find the transcript linked to a protein.
+    forward_key = 'ncbi:%s-to-%s:%%s' % (source, target)
+    reverse_key = 'ncbi:%s-to-%s:%%s' % (target, source)
 
-    First look in our database. If a link cannot be found, try to retrieve it
-    from the NCBI. Add the result to our database.
+    def cache_source_to_target(f):
+        @functools.wraps(f)
+        def cached_f(accession):
+            result = redis.get(forward_key % accession)
+            if result is not None:
+                # The empty string is a cached negative result, which we
+                # return as `None`.
+                return result or None
 
-    :arg str protein_accession: Accession number of the protein for which we
-      want to find the transcript (without version number).
+            result = f(accession)
 
-    :returns: Accession number of a transcript (without version number) or
-      `None` if no link can be found.
-    :rtype: str
+            if not result:
+                # An empty result counts as negative too: the empty string
+                # is the negative marker and must not be stored permanently.
+                redis.setex(forward_key % accession,
+                            settings.NEGATIVE_LINK_CACHE_EXPIRATION, '')
+                return None
+
+            # We store the resulting link in both directions.
+            redis.set(forward_key % accession, result)
+            redis.set(reverse_key % result, accession)
+            return result
+
+        return cached_f
+
+    return cache_source_to_target
+
+
+@cache_link('transcript', 'protein')
+def transcript_to_protein(transcript_accession):
     """
-    Entrez.email = settings.EMAIL
+    Try to find the protein linked to a transcript.
 
-    link = queries.get_transcript_protein_link(protein_accession, reverse=True)
-    if link is not None:
-        return link.transcript_accession
+    Links are retrieved from the NCBI using their Entrez API and cached in
+    Redis. Negative results (accession or link could not be found) are also
+    cached, but expire after `NEGATIVE_LINK_CACHE_EXPIRATION` seconds.
 
-    handle = Entrez.esearch(db='protein', term=protein_accession)
-    try:
-        result = Entrez.read(handle)
-    except Entrez.Parser.ValidationError:
-        # Todo: Log this error.
-        return None
-    finally:
-        handle.close()
+    :arg str transcript_accession: Accession number of the transcript for
+      which we want to find the protein (without version number).
 
-    if not result['IdList']:
-        return None
-    protein_gi = unicode(result['IdList'][0])
+    :returns: Accession number of a protein (without version number) or `None`
+      if no link can be found.
+    :rtype: str
+    """
+    return _get_link(
+        transcript_accession, 'nucleotide', 'protein',
+        lambda link: link in ('nuccore_protein', 'nuccore_protein_cds'))
 
-    handle = Entrez.elink(dbfrom='protein', db='nucleotide', id=protein_gi)
-    try:
-        result = Entrez.read(handle)
-    except Entrez.Parser.ValidationError:
-        # Todo: Log this error.
-        return None
-    finally:
-        handle.close()
 
-    if not result[0]['LinkSetDb']:
-        # We also cache negative results.
-        queries.update_transcript_protein_link(
-            protein_accession=protein_accession)
-        return None
+@cache_link('protein', 'transcript')
+def protein_to_transcript(protein_accession):
+    """
+    Try to find the transcript linked to a protein.
 
-    transcript_gi = ''
-    for link in result[0]['LinkSetDb']:
-        if unicode(link['LinkName']) == 'protein_nuccore_mrna':
-            transcript_gi = unicode(link['Link'][0]['Id'])
-            break
+    Links are retrieved from the NCBI using their Entrez API and cached in
+    Redis. Negative results (accession or link could not be found) are also
+    cached, but expire after `NEGATIVE_LINK_CACHE_EXPIRATION` seconds.
 
-    handle = Entrez.efetch(
-        db='nucleotide', id=transcript_gi, rettype='acc', retmode='text')
-    transcript_accession = unicode(handle.read()).split('.')[0]
-    handle.close()
+    :arg str protein_accession: Accession number of the protein for which we
+      want to find the transcript (without version number).
 
-    queries.update_transcript_protein_link(
-        transcript_accession=transcript_accession,
-        protein_accession=protein_accession)
-    return transcript_accession
+    :returns: Accession number of a transcript (without version number) or
+      `None` if no link can be found.
+    :rtype: str
+    """
+    return _get_link(
+        protein_accession, 'protein', 'nucleotide',
+        lambda link: link == 'protein_nuccore_mrna')
diff --git a/tests/fixtures.py b/tests/fixtures.py
index 25d29812..a3f5e363 100644
--- a/tests/fixtures.py
+++ b/tests/fixtures.py
@@ -15,7 +15,7 @@ from mutalyzer.config import settings as _settings
 from mutalyzer.output import Output
 from mutalyzer.redisclient import client as redis
 from mutalyzer.db.models import (Assembly, Chromosome, Reference,
-                                 TranscriptMapping, TranscriptProteinLink)
+                                 TranscriptMapping)
 from mutalyzer import db as _db
 
 
@@ -96,15 +96,28 @@ def references(request, settings, db, available_references):
                             entry['filename'])
         shutil.copy(path, settings.CACHE_DIR)
 
-        reference = Reference(
-            accession, entry['checksum'], geninfo_identifier=geninfo_id)
-        db.session.add(reference)
-
-        for transcript, protein in entry.get('links', []):
-            db.session.add(TranscriptProteinLink(transcript, protein))
-
-        references.append(reference)
-
+        references.append(Reference(
+            accession, entry['checksum'], geninfo_identifier=geninfo_id))
+
+        for transcript_accession, protein_accession in entry.get('links', []):
+            if transcript_accession is not None:
+                key = 'ncbi:transcript-to-protein:%s' % transcript_accession
+                if protein_accession is not None:
+                    redis.set(key, protein_accession)
+                else:
+                    redis.setex(key,
+                                settings.NEGATIVE_LINK_CACHE_EXPIRATION,
+                                '')
+            if protein_accession is not None:
+                key = 'ncbi:protein-to-transcript:%s' % protein_accession
+                if transcript_accession is not None:
+                    redis.set(key, transcript_accession)
+                else:
+                    redis.setex(key,
+                                settings.NEGATIVE_LINK_CACHE_EXPIRATION,
+                                '')
+
+    db.session.add_all(references)
     db.session.commit()
 
     return references
diff --git a/tests/test_db_queries.py b/tests/test_db_queries.py
deleted file mode 100644
index 6eeb4492..00000000
--- a/tests/test_db_queries.py
+++ /dev/null
@@ -1,59 +0,0 @@
-"""
-Tests for the mutalyzer.db.queries module.
-"""
-
-
-from __future__ import unicode_literals
-
-import pytest
-
-from mutalyzer.db import queries
-
-
-pytestmark = [
-    pytest.mark.usefixtures('references'),
-    pytest.mark.parametrize('references', [['MARK1']], indirect=True)
-]
-
-
-def test_get_transcript_protein_link():
-    """
-    Query a transcript-protein link by transcript.
-    """
-    link = queries.get_transcript_protein_link('NM_018650')
-    assert link.transcript_accession == 'NM_018650'
-    assert link.protein_accession == 'NP_061120'
-
-
-def test_get_transcript_protein_link_negative():
-    """
-    Query a negative transcript-protein link by transcript.
-    """
-    link = queries.get_transcript_protein_link('XM_005273133')
-    assert link.transcript_accession == 'XM_005273133'
-    assert link.protein_accession is None
-
-
-def test_get_transcript_protein_link_missing():
-    """
-    Query a missing transcript-protein link by transcript.
-    """
-    link = queries.get_transcript_protein_link('NM_123456')
-    assert link is None
-
-
-def test_get_transcript_protein_link_reverse():
-    """
-    Query a transcript-protein link by protein.
-    """
-    link = queries.get_transcript_protein_link('NP_061120', reverse=True)
-    assert link.transcript_accession == 'NM_018650'
-    assert link.protein_accession == 'NP_061120'
-
-
-def test_get_transcript_protein_link_reverse_missing():
-    """
-    Query a missing transcript-protein link by protein.
-    """
-    link = queries.get_transcript_protein_link('NP_123456')
-    assert link is None
diff --git a/tests/test_migrations.py b/tests/test_migrations.py
index c2167ce2..1954226f 100644
--- a/tests/test_migrations.py
+++ b/tests/test_migrations.py
@@ -5,6 +5,8 @@ Test database migrations.
 
 from __future__ import unicode_literals
 
+from datetime import datetime
+
 import alembic.autogenerate
 import alembic.command
 import alembic.config
@@ -91,7 +93,8 @@ def add_database_content(connection):
     transcript_protein_links = sql.table(
         'transcript_protein_links',
         sql.column('transcript_accession', sa.String(30)),
-        sql.column('protein_accession', sa.String(30)))
+        sql.column('protein_accession', sa.String(30)),
+        sql.column('added', sa.DateTime))
 
     # Add some common data.
     connection.execute(
@@ -170,8 +173,10 @@ def add_database_content(connection):
     connection.execute(
         transcript_protein_links.insert(),
         transcript_accession='NM_052818',
-        protein_accession='NP_438169')
+        protein_accession='NP_438169',
+        added=datetime.now())
     connection.execute(
         transcript_protein_links.insert(),
         transcript_accession='NM_001079691',
-        protein_accession=None)
+        protein_accession=None,
+        added=datetime.now())
-- 
GitLab