Source code for modelarchive.databases.uniprotkb

"""Functions to retrieve data for UniProtKB entries/sequences."""

# Copyright (c) 2026, SIB - Swiss Institute of Bioinformatics and
#                     Biozentrum - University of Basel
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from datetime import datetime
import os
import requests

try:
    # pylint: disable=bare-except
    import ujson as json
except:
    import json

import parasail

# Name of UniProtKB as used for _ma_target_ref_db_details.db_name in a ModelCIF
# file. The constant itself is named after that data item, but 'pythonified'
# (no leading '_', no '.').
MA_TARGET_REF_DB_DETAILS_DB_NAME = "UNP"


def translate_upkb_date_string(date_string):
    """Replace the month abbreviation in a UniProtKB date with its number.

    UniProtKB writes dates with 3-letter, upper-case English month
    abbreviations (e.g. ``'MAY'``, ``'NOV'``). Parsing those with
    :func:`datetime.strptime` depends on the active locale, so the
    abbreviation is swapped for the zero-padded month number first.

    Args:
        date_string (str): A UniProtKB date string containing a 3-letter
            month abbreviation, e.g. ``'15-MAY-2023'``.

    Returns:
        str: ``date_string`` with the month abbreviation replaced by its
        numeric equivalent, e.g. ``'15-05-2023'``.

    Raises:
        RuntimeError: If none of the known month abbreviations occurs in
            ``date_string``.
    """
    month_abbreviations = (
        "JAN",
        "FEB",
        "MAR",
        "APR",
        "MAY",
        "JUN",
        "JUL",
        "AUG",
        "SEP",
        "OCT",
        "NOV",
        "DEC",
    )
    # Months are probed in calendar order; the first one found wins.
    for month_number, abbreviation in enumerate(month_abbreviations, start=1):
        if abbreviation not in date_string:
            continue
        return date_string.replace(abbreviation, f"{month_number:02}")

    raise RuntimeError(
        f"Unrecognised UniProtKB date string found: '{date_string}'."
    )
def _fetch_upkb_versions(unp_ac): """Get list of entry versions with distinct sequence versions. Queries the UniSave API and returns only one entry version per unique sequence version, in descending order. Args: unp_ac (str): UniProtKB accession code. Returns: list[int]: Entry version numbers, one per unique sequence version. Raises: RuntimeError: If the UniSave API request fails. """ url = f"https://rest.uniprot.org/unisave/{unp_ac}?format=json" rspns = requests.get(url, timeout=180) if not rspns.ok: raise RuntimeError( f"Failed to retrieve version list for UniProtKB entry '{unp_ac}'." ) data = rspns.json() up_versions = [] up_seq_versions = set() for item in data["results"]: if item["sequenceVersion"] not in up_seq_versions: up_seq_versions.add(item["sequenceVersion"]) up_versions.append(item["entryVersion"]) return up_versions def _get_next_upkb_version(cur_version, up_versions): """Return the next entry version to try, or 0 if none remain. Args: cur_version (int): The current entry version. up_versions (list[int] | None): Ordered list of candidate entry versions to try. If :obj:`None`, simply decrements by one. Returns: int: Next entry version to try, or ``0`` if no more candidates. """ if up_versions is None: return cur_version - 1 for up_version in up_versions: if up_version < cur_version: return up_version return 0
class UniProtKBEntry:
    """Represent a single UniProtKB entry and its metadata.

    Fetches and parses a UniProtKB entry in TXT flat-file format from the
    UniProtKB or UniSave REST API on construction, or restores from a
    previously serialised JSON object.

    Args:
        unp_ac (str): Accession code of the UniProtKB entry to fetch.
        entry_version (str | int | None): Entry version to fetch. If
            :obj:`None`, the latest version is retrieved.
        json_data (dict | None): Restore the object from a serialised JSON
            dict (as produced by :meth:`to_json`) instead of fetching from
            the API. Ignores ``unp_ac`` when provided.

    Attributes:
        unp_ac (str): UniProtKB accession code.
        entry_status (str | None): Entry status, e.g. ``'REVIEWED'``.
        entry_version (int | None): Entry version number.
        first_appearance (datetime | None): Date the entry was integrated
            into UniProtKB.
        last_change (datetime | None): Date of the last annotation change.
        last_seq_change (datetime | None): Date of the last sequence change.
        ncbi_taxid (str | None): NCBI taxonomy ID.
        organism_species (str): Organism species name.
        seq_version (int | None): Sequence version number.
        seqlen (int | None): Length of the canonical sequence.
        unp_crc64 (str | None): CRC64 checksum of the sequence.
        unp_details_full (str | None): Full recommended protein name.
        unp_id (str | None): UniProtKB entry name (mnemonic ID).
        unp_seq (str): Canonical amino-acid sequence.
    """

    # pylint: disable=too-many-instance-attributes, too-few-public-methods
    def __init__(self, unp_ac, entry_version=None, json_data=None):
        if json_data is None:
            self.unp_ac = unp_ac
            self.entry_status = None
            self.entry_version = (
                int(entry_version) if entry_version is not None else None
            )
            self.first_appearance = None
            self.last_change = None
            self.last_seq_change = None
            self.ncbi_taxid = None
            self.organism_species = ""
            self.seq_version = None
            self.seqlen = None
            self.unp_crc64 = None
            self.unp_details_full = None
            self.unp_id = None
            self.unp_seq = ""
            self._fetch()
            # Sanity check: length announced on the ID line must agree with
            # the sequence that was actually parsed.
            assert len(self.unp_seq) == self.seqlen
        else:
            self.entry_status = json_data["entry_status"]
            self.entry_version = int(json_data["entry_version"])
            self.first_appearance = self._date_from_iso(
                json_data["first_appearance"]
            )
            self.last_change = self._date_from_iso(json_data["last_change"])
            self.last_seq_change = self._date_from_iso(
                json_data["last_seq_change"]
            )
            self.ncbi_taxid = json_data["ncbi_taxid"]
            self.organism_species = json_data["organism_species"]
            self.seq_version = json_data["seq_version"]
            self.seqlen = json_data["seqlen"]
            self.unp_ac = json_data["ac"]
            self.unp_crc64 = json_data["crc64"]
            self.unp_details_full = json_data["details_full"]
            self.unp_id = json_data["id"]
            self.unp_seq = json_data["seq"]

    def __str__(self):
        """Return a human-readable string representation of the entry."""
        return (
            f"<{__name__}.{type(self).__name__} AC={self.unp_ac} "
            + f"version={self.entry_version}>"
        )

    @staticmethod
    def _date_from_iso(value):
        """Turn an ISO 8601 string into a datetime, passing None through."""
        return datetime.fromisoformat(value) if value is not None else None

    @staticmethod
    def _date_to_iso(value):
        """Turn a datetime into an ISO 8601 string, passing None through."""
        return value.isoformat() if value is not None else None

    def _parse_id_line(self, line):
        """Parse a UniProtKB TXT format's ID line.

        Sets ``unp_id``, ``entry_status`` (upper-cased, without the trailing
        ``;``) and ``seqlen``. Should support some older format versions,
        too: a 6-token line carrying an extra ``PRT;`` token is accepted.

        Raises:
            RuntimeError: If the line has an unexpected number of tokens.
        """
        sline = line.split()
        if len(sline) != 5:
            if len(sline) == 6 and sline[3] == "PRT;":
                # Old format: drop the molecule type token.
                sline.pop(3)
            else:
                raise RuntimeError(
                    "ID line not conforming to 'ID EntryName "
                    + f"Status; SequenceLength.', found: {line}"
                )
        self.unp_id = sline[1]
        self.entry_status = sline[2][:-1].upper()
        self.seqlen = int(sline[3])

    def _parse_dt_line(self, line):
        """Parse a UniProtKB TXT format's DT line.

        Depending on the kind of DT line, sets ``first_appearance``,
        ``last_seq_change``/``seq_version`` or ``last_change``/
        ``entry_version``. Should support some older format versions
        (``DT date (Rel. NN, ...)``), too.
        """
        sline = line.split()
        # Make the date parsable independent of the locale.
        sline[1] = translate_upkb_date_string(sline[1])
        if sline[2] == "(Rel.":  # old format
            if sline[4] == "Created)":
                self.first_appearance = datetime.strptime(sline[1], "%d-%m-%Y")
                self.entry_version = int(sline[3][:-1])
            elif sline[5] == "sequence":
                self.last_seq_change = datetime.strptime(sline[1], "%d-%m-%Y")
                self.seq_version = int(sline[3][:-1])
            elif sline[5] == "annotation":
                self.last_change = datetime.strptime(sline[1], "%d-%m-%Y")
            return
        # Current format: the date token carries a trailing comma.
        if sline[2] == "integrated":
            self.first_appearance = datetime.strptime(sline[1], "%d-%m-%Y,")
        elif sline[2] == "sequence":
            self.last_seq_change = datetime.strptime(sline[1], "%d-%m-%Y,")
            self.seq_version = int(sline[4][:-1])
        elif sline[2] == "entry":
            self.last_change = datetime.strptime(sline[1], "%d-%m-%Y,")
            self.entry_version = int(sline[4][:-1])

    def _parse_de_line(self, line):
        """Parse a UniProtKB TXT format's DE line(s).

        Only the first ``RecName: Full=...`` line is used; the name ends at
        the first ``;`` or at the start of an evidence tag (``{...}``).
        """
        sline = line.split()
        if sline[1] == "RecName:":
            if self.unp_details_full is None and sline[2].startswith("Full="):
                self.unp_details_full = sline[2][len("Full=") :]
                for token in sline[3:]:
                    if token.startswith("{"):
                        break
                    self.unp_details_full += f" {token}"
                    if self.unp_details_full.endswith(";"):
                        break
                if self.unp_details_full.endswith(";"):
                    self.unp_details_full = self.unp_details_full[:-1]

    def _parse_os_line(self, line):
        """Parse a UniProtKB TXT format's OS line(s).

        The organism name may be wrapped over several OS lines; the last one
        ends with a ``.`` which is not part of the name. Lines are appended
        to ``organism_species``, joined by a single blank.
        """
        # Strip the two-character line code and the blanks following it,
        # regardless of how many blanks there are.
        text = line[len("OS") :].strip()
        if text.endswith("."):
            self.organism_species += text[:-1]
        else:
            # Continuation follows: keep the complete text of this line.
            self.organism_species += text + " "

    def _parse_ox_line(self, line):
        """Parse a UniProtKB TXT format's OX line."""
        # Drop the trailing ';', then any evidence tag after the ID.
        taxid = line.split("=")[-1][:-1]
        self.ncbi_taxid = taxid.split()[0]

    def _parse_sq_line(self, line):
        """Parse a UniProtKB TXT format's SQ line."""
        sline = line.split()
        self.unp_crc64 = sline[6]

    def _parse_sequence(self, line):
        """Parse the sequence out of the UniProtKB TXT format."""
        self.unp_seq += "".join(line.split())

    def _parse_txt(self, lines):
        """Dispatch the lines of a TXT flat file to the line parsers.

        Lines are recognised by their two-character line code followed by
        whitespace; sequence data lines start with whitespace. Everything
        else is ignored.

        Args:
            lines (Iterable[str]): Lines of the entry, without line endings.
        """
        for line in lines:
            if line.startswith("ID "):
                self._parse_id_line(line)
            elif line.startswith("DT "):
                self._parse_dt_line(line)
            elif line.startswith("DE "):
                self._parse_de_line(line)
            elif line.startswith("OS "):
                self._parse_os_line(line)
            elif line.startswith("OX ") and line[2:].lstrip().startswith(
                "NCBI_TaxID="
            ):
                self._parse_ox_line(line)
            elif line.startswith("SQ "):
                self._parse_sq_line(line)
            elif line.startswith(" "):
                self._parse_sequence(line)

    def _fetch(self):
        """Retrieve information for a single UniProtKB entry.

        The latest version is requested from the UniProtKB endpoint, a
        specific ``entry_version`` from the UniSave endpoint.

        Raises:
            RuntimeError: If the HTTP request is not successful.
        """
        if self.entry_version is None:
            query_url = f"https://rest.uniprot.org/uniprotkb/{self.unp_ac}.txt"
        else:
            query_url = (
                f"https://rest.uniprot.org/unisave/{self.unp_ac}?format=txt&"
                + f"versions={self.entry_version}"
            )

        rspns = requests.get(query_url, timeout=180)
        if not rspns.ok:
            raise RuntimeError(
                f"UniProtKB entry with AC '{self.unp_ac}' not retrieved for "
                + f"URL '{query_url}'"
            )
        # Without an encoding, iter_lines() yields bytes even when asked to
        # decode, which would break the str-based line parsers.
        if rspns.encoding is None:
            rspns.encoding = "utf-8"
        self._parse_txt(rspns.iter_lines(decode_unicode=True))

    def to_json(self):
        """Serialise the entry to a JSON-compatible dict.

        The returned dict can be passed to :meth:`__init__` via the
        ``json_data`` parameter to restore the entry without an API call.
        Dates are stored as ISO 8601 strings.

        Returns:
            dict: JSON-serialisable representation of the entry.
        """
        return {
            "ac": self.unp_ac,
            "entry_version": self.entry_version,
            "organism_species": self.organism_species,
            "entry_status": self.entry_status,
            "first_appearance": self._date_to_iso(self.first_appearance),
            "last_change": self._date_to_iso(self.last_change),
            "last_seq_change": self._date_to_iso(self.last_seq_change),
            "ncbi_taxid": self.ncbi_taxid,
            "seq_version": self.seq_version,
            "seqlen": self.seqlen,
            "details_full": self.unp_details_full,
            "id": self.unp_id,
            "crc64": self.unp_crc64,
            "seq": self.unp_seq,
        }
class UniProtKBEntryCache:
    """Cached retrieval of UniProtKB entries.

    To avoid calling the UniProtKB API for the same accession code multiple
    times, use this cache. The cache is keyed by accession code and entry
    version (:obj:`None` standing for "latest"). Be aware that when no entry
    version is specified, the latest version is fetched, which may change at
    UniProtKB a couple of times a year. The cache has no size limit and is
    never swept.

    Args:
        json_cache_file (str | None): Path to a JSON file used to persist the
            cache across runs. If :obj:`None`, the cache is kept in memory
            only and is lost when the process exits.
    """

    # pylint: disable=too-few-public-methods
    def __init__(self, json_cache_file=None):
        self._cache = {}
        self._cache_file = json_cache_file
        # Only load an existing, non-empty cache file.
        if (
            self._cache_file is not None
            and os.path.exists(self._cache_file)
            and os.stat(self._cache_file).st_size != 0
        ):
            with open(self._cache_file, encoding="utf8") as jfh:
                self._cache = self._from_json(json.load(jfh))

    def get(self, unp_ac, entry_version=None):
        """Return a :class:`UniProtKBEntry` from the cache.

        Fetches from the UniProtKB API on cache miss and persists the updated
        cache to disk if a cache file was configured.

        Args:
            unp_ac (str): UniProtKB accession code.
            entry_version (str | int | None): Entry version to retrieve. If
                :obj:`None`, the latest version is fetched.

        Returns:
            UniProtKBEntry: The requested entry.
        """
        # Normalise the key: versions read back from the JSON cache file are
        # ints, so '12' and 12 must address the same cache slot.
        if entry_version is not None:
            entry_version = int(entry_version)
        try:
            return self._cache[unp_ac][entry_version]
        except KeyError:
            pass

        unp = UniProtKBEntry(unp_ac, entry_version=entry_version)
        self._cache.setdefault(unp_ac, {})[entry_version] = unp
        if self._cache_file is not None:
            with open(self._cache_file, "w", encoding="utf8") as jfh:
                json.dump(self.to_json(), jfh)
        return unp

    def to_json(self):
        """Serialise the cache contents to a JSON-compatible dict.

        Returns:
            dict: Nested dict mapping accession codes and entry versions to
            their serialised :class:`UniProtKBEntry` representations.
        """
        return {
            acc: {
                version: entry.to_json() for version, entry in versions.items()
            }
            for acc, versions in self._cache.items()
        }

    def _from_json(self, data):
        """Initialise the cache from a JSON dict."""
        cache = {}
        for acc, versions in data.items():
            cache[acc] = {}
            for version, entry in versions.items():
                # A None key ("latest") is written to JSON as 'null'.
                version = int(version) if version != "null" else None
                cache[acc][version] = UniProtKBEntry(None, json_data=entry)
        return cache

    def match_sequence(self, unp_ac, sequence, start=None, end=None):
        """Match a sequence against a UniProtKB entry, walking through
        various versions.

        Aligns ``sequence`` against the requested range of the canonical
        sequence of the UNP entry using a Needleman-Wunsch global alignment
        (`parasail <https://github.com/jeffdaily/parasail>`_ NW, BLOSUM62).
        If the alignment at the current entry version does not produce an
        exact match in the requested range (no gaps in the UNP sequence,
        boundaries match ``start`` and ``end``), older entry versions are
        tried, one per distinct sequence version, until a perfect match is
        found.

        If ``start`` and ``end`` are :obj:`None`, the full length of
        ``sequence`` is used as the range (``start=1``,
        ``end=len(sequence)``).

        Args:
            unp_ac (str): UniProtKB accession code.
            sequence (str): Target sequence of the model.
            start (int | None): Start residue of the alignment, 1-based.
                Defaults to ``1``.
            end (int | None): End residue of the alignment, 1-based
                inclusive. Defaults to ``len(sequence)``.

        Returns:
            tuple[UniProtKBEntry, tuple[int, int], tuple[int, int]]: A
            3-tuple of the matching :class:`UniProtKBEntry`, the aligned
            range in the UNP sequence as ``(start, end)``, and the aligned
            range in ``sequence`` as ``(start, end)``. All positions are
            1-based inclusive.

        Raises:
            RuntimeError: If no exact match can be found across all
                available entry versions.
        """
        if start is None:
            start = 1
        if end is None:
            end = len(sequence)

        def _aln(unp_seq, trg, start, end):
            # 'start' is 0-based here, 'end' is exclusive (Python slicing).
            unp_region = unp_seq[start:end]
            alignment = parasail.nw_trace_scan_sat(
                trg, unp_region, 5, 2, parasail.blosum62
            )
            # traceback.query is the aligned target, traceback.ref the
            # aligned UNP region; terminal gaps shift the aligned ranges.
            db_aln_start = (
                start
                + len(unp_region)
                - len(alignment.traceback.query.lstrip("-"))
            )
            db_aln_end = db_aln_start + len(
                alignment.traceback.query.rstrip("-")
            )
            seq_aln_start = len(trg) - len(alignment.traceback.ref.lstrip("-"))
            seq_aln_end = seq_aln_start + len(
                alignment.traceback.query.replace("-", "")
            )
            return (
                db_aln_start + 1,
                db_aln_end,
                seq_aln_start + 1,
                seq_aln_end,
                alignment.traceback.ref,
            )

        def _is_exact(db_aln_start, db_aln_end, aln_unp, start, end):
            return (
                db_aln_start == start
                and db_aln_end == end
                and aln_unp.find("-") == -1
            )

        # Start with the latest entry, then walk back through older versions.
        entry = self.get(unp_ac)
        up_versions = None
        version = None
        while True:
            db_aln_start, db_aln_end, seq_aln_start, seq_aln_end, aln_unp = (
                _aln(entry.unp_seq, sequence, start - 1, end)
            )
            if _is_exact(db_aln_start, db_aln_end, aln_unp, start, end):
                return (
                    entry,
                    (db_aln_start, db_aln_end),
                    (seq_aln_start, seq_aln_end),
                )
            if up_versions is None:
                # The version list is only needed once the latest entry
                # failed to match.
                up_versions = _fetch_upkb_versions(unp_ac)
                version = entry.entry_version
            version = _get_next_upkb_version(version, up_versions)
            if version == 0:
                break
            entry = self.get(unp_ac, entry_version=version)

        raise RuntimeError(
            f"Could not find a proper alignment in region {start}-{end} "
            + f"for {unp_ac}."
        )