Source code for modelarchive.databases.ncbi

"""Functions to retrieve data for NCBI databases in batch."""

import json
from pathlib import Path
import re
from xml.etree import ElementTree as ET

import requests

from modelarchive import _utils


def _get_ncbi_base_url():
    return "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/"


def _get_ncbi_docsums(params):
    """Fetch and parse document summaries from the NCBI esummary endpoint.

    Queries the NCBI esummary endpoint with the given parameters and parses
    the returned XML into a list of dicts, one per DocSum element. Item
    values are cast to their declared type (``String``, ``Integer``,
    ``Date``); items without text content are stored as ``None``.

    Args:
        params: Query parameters forwarded to the esummary endpoint, e.g.
            ``{"db": "protein", "query_key": ..., "WebEnv": ...}``.

    Returns:
        list[dict]: One dict per DocSum, mapping NCBI item names to their
        typed values.

    Raises:
        RuntimeError: If an Item element carries an unrecognised type
            attribute.
    """
    # params = parameters to pass to esummary
    response = requests.get(
        _get_ncbi_base_url() + "esummary.fcgi",
        params=params,
        timeout=360,
    )
    response.encoding = "utf-8"
    root = ET.fromstring(response.text)
    docsums = root.findall(".//DocSum")
    ncbi_dicts = []
    for docsum in docsums:
        ncbi_dict = {}
        for cn in docsum:
            if cn.tag == "Item":
                cn_name = cn.get("Name")
                cn_type = cn.get("Type")
                if cn.text:
                    d = cn.text
                    if cn_type == "String":
                        ncbi_dict[cn_name] = d
                    elif cn_type == "Integer":
                        ncbi_dict[cn_name] = int(d)
                    elif cn_type == "Date":
                        # kept as string
                        ncbi_dict[cn_name] = d
                    else:
                        raise RuntimeError(
                            f"Unknown type {cn_type} for {cn_name}"
                        )
                else:
                    ncbi_dict[cn_name] = None
        ncbi_dicts.append(ncbi_dict)
    return ncbi_dicts


def _parse_fasta(text):
    """Fetch sequencs & headers from a NCBI response."""
    records = []
    header = None
    seq_parts = []

    for line in text.splitlines():
        line = line.strip()

        if not line:
            continue

        if line.startswith(">"):
            # save last record
            if header is not None:
                records.append((header, "".join(seq_parts)))

            header = line[1:].strip()
            seq_parts = []
        else:
            seq_parts.append(line)

    # last record
    if header is not None:
        records.append((header, "".join(seq_parts)))

    return records


def _get_ncbi_data(ncbi_acs):
    """Fetch sequence and metadata for a set of NCBI protein accessions.

    Uploads accessions in batches to the NCBI epost endpoint, then
    retrieves FASTA sequences via efetch and document summaries via
    esummary. Batches that return incomplete results are retried
    automatically until all accessions are resolved.

    Args:
        ncbi_acs (list[str]): NCBI protein accessions to fetch. May
            include or omit version suffixes (e.g. ``"AAB12345"`` or
            ``"AAB12345.1"``).

    Returns:
        dict[str, dict]: Mapping from accession to a dict with keys
        ``"seq_name"`` (FASTA header), ``"seq_str"`` (sequence string),
        and ``"info"`` (parsed esummary dict).

    Raises:
        RuntimeError: If a returned accession cannot be matched to any
            requested accession, if sequence and info keys diverge after
            a batch, or if the final set of resolved accessions does not
            match the requested set.
    """
    # No reducing variables for now
    # pylint: disable=too-many-locals
    max_num_ids = 2000  # recommended to do max. 5000 at once
    ncbi_seq = {}
    ncbi_info = {}
    sel_ids = list(ncbi_acs)
    while len(sel_ids) > 0:
        # SOURCE: https://www.ncbi.nlm.nih.gov/books/NBK25501/
        db = "protein"
        # Upload the list of IDs using the epost endpoint
        sel_ids = sel_ids[:max_num_ids]
        params = {"db": db, "id": ",".join(sel_ids)}
        response = requests.post(
            _get_ncbi_base_url() + "epost.fcgi",
            data=params,
            timeout=360,
        )
        xml_string = response.text
        root = ET.fromstring(xml_string)
        query_key = root.find(".//QueryKey").text
        web_env = root.find(".//WebEnv").text
        # Fetch all sequences using the efetch endpoint via epost info
        params = {
            "db": db,
            "query_key": query_key,
            "WebEnv": web_env,
            "rettype": "fasta",
            "retmode": "text",
        }
        response = requests.get(
            _get_ncbi_base_url() + "efetch.fcgi",
            params=params,
            timeout=360,
        )
        ss = _parse_fasta(response.text)
        for s in ss:
            ncbi_ac = s[0].split()[0]
            if ncbi_ac not in sel_ids:
                # maybe its the form x|xxxxx...|
                if "|" in ncbi_ac:
                    ncbi_ac = ncbi_ac.split("|")[1]
                if ncbi_ac not in sel_ids:
                    # assume that we store them without version
                    ncbi_ac = ncbi_ac.rsplit(".", 1)[0]
                    if ncbi_ac not in sel_ids:
                        raise RuntimeError(
                            f"NCBI AC '{ncbi_ac}' not in selected IDs"
                        )
            ncbi_seq[ncbi_ac] = s
        # Fetch all infos using the esummary endpoint via epost info
        params = {"db": db, "query_key": query_key, "WebEnv": web_env}
        for ncbi_dict in _get_ncbi_docsums(params):
            ncbi_ac = ncbi_dict["AccessionVersion"]
            if ncbi_ac not in sel_ids:
                # assume that we store them without version
                ncbi_ac = ncbi_ac.rsplit(".", 1)[0]
                if ncbi_ac not in sel_ids:
                    raise RuntimeError(
                        f"NCBI AC '{ncbi_ac}' not in selected IDs"
                    )
            ncbi_info[ncbi_ac] = ncbi_dict
        if ncbi_info.keys() != ncbi_seq.keys():
            raise RuntimeError("NCBI info & sequence keys mismatching")
        # what's left?
        # (random ones get dropped; reason unknown; so we try until all done)
        sel_ids = [ncbi_ac for ncbi_ac in ncbi_acs if ncbi_ac not in ncbi_info]
    if sorted(ncbi_acs) != sorted(ncbi_info):
        raise RuntimeError("NCBI ACs not matching NCBI Info")

    # combine nicely for further use
    ncbi_data = {
        ncbi_ac: {
            "seq_name": ncbi_seq[ncbi_ac][0],
            "seq_str": ncbi_seq[ncbi_ac][1],
            "info": ncbi_info[ncbi_ac],
        }
        for ncbi_ac in ncbi_acs
    }
    return ncbi_data


def _get_ncbi_data_cached(ncbi_acs, ncbi_metadata_file):
    """Fetch NCBI protein data from a local cache file or from the web.

    If ``ncbi_metadata_file`` is given and the file exists, data is loaded
    from it. If the file does not yet exist, data is fetched from NCBI and
    written to the file for future use. If ``ncbi_metadata_file`` is
    ``None``, data is always fetched from NCBI.

    Args:
        ncbi_acs (list[str]): NCBI protein accessions to fetch.
        ncbi_metadata_file (str | None): Path to a JSON cache file, or
            ``None`` to bypass caching.

    Returns:
        dict[str, dict]: NCBI data as returned by :func:`_get_ncbi_data`.
    """
    if ncbi_metadata_file:
        # Convert to Path if string
        if isinstance(ncbi_metadata_file, str):
            ncbi_metadata_file = Path(ncbi_metadata_file)
        if ncbi_metadata_file.exists():
            with open(ncbi_metadata_file, encoding="ascii") as jfh:
                return json.load(jfh)
        ncbi_data = _get_ncbi_data(ncbi_acs)
        with open(ncbi_metadata_file, "w", encoding="ascii") as jfh:
            json.dump(ncbi_data, jfh)
    return _get_ncbi_data(ncbi_acs)


def _get_ncbi_tax_info(tax_ids):
    """Fetch taxonomy document summaries from NCBI for a set of taxon IDs.

    Uploads taxon IDs in batches to the NCBI epost endpoint and retrieves
    document summaries via esummary. Batches that return incomplete results
    are retried automatically. If a batch makes no progress, a warning is
    emitted and the partially populated dict is returned.

    Args:
        tax_ids (list[str]): NCBI taxon IDs as strings.

    Returns:
        dict[str, dict]: Mapping from taxon ID string to its parsed
        esummary dict. May be incomplete if NCBI repeatedly fails to
        return certain IDs.

    Raises:
        RuntimeError: If a returned taxon ID is not found in the requested
            set, or if the final set of resolved IDs does not match the
            requested set.
    """
    max_num_ids = 2000  # recommended to do max. 5000 at once
    ncbi_info = {}
    sel_ids = list(tax_ids)
    while len(sel_ids) > 0:
        # keep track of TODOs to abort if no progress made
        last_num_todo = len(sel_ids)
        # SOURCE: https://www.ncbi.nlm.nih.gov/books/NBK25501/
        db = "taxonomy"
        # Upload the list of IDs using the epost endpoint
        sel_ids = sel_ids[:max_num_ids]
        params = {"db": db, "id": ",".join(sel_ids)}
        response = requests.post(
            _get_ncbi_base_url() + "epost.fcgi",
            data=params,
            timeout=360,
        )
        xml_string = response.text
        root = ET.fromstring(xml_string)
        query_key = root.find(".//QueryKey").text
        web_env = root.find(".//WebEnv").text
        # Fetch all infos using the esummary endpoint via epost info
        params = {"db": db, "query_key": query_key, "WebEnv": web_env}
        for ncbi_dict in _get_ncbi_docsums(params):
            tax_id = str(ncbi_dict["TaxId"])
            if tax_id not in sel_ids:
                raise RuntimeError(f"Taxon ID '{tax_id}' not in selection")
            ncbi_info[tax_id] = ncbi_dict
        # what's left?
        # (random ones get dropped; reason unknown; so we try until all done)
        sel_ids = [tax_id for tax_id in tax_ids if tax_id not in ncbi_info]
        if last_num_todo == len(sel_ids):
            _utils.warn_msg(f"ABORTING...failed to get {sel_ids}")
            return ncbi_info
    if sorted(tax_ids) != sorted(ncbi_info):
        raise RuntimeError("Taxon IDs not matching NCBI Info")
    return ncbi_info


def _check_ncbi_data(ncbi_data):
    """Run sanity checks on fetched NCBI protein data.

    Iterates over all entries in ``ncbi_data`` and checks that accession
    versions are consistent and that title strings contain only characters
    permitted in mmCIF text fields. Entries with non-live status or that
    have been replaced are collected and reported.

    Args:
        ncbi_data (dict[str, dict]): NCBI data as returned by
            :func:`_get_ncbi_data` after species names have been added.

    Returns:
        tuple[list[str], list[tuple[str, str]]]

    Raises:
        RuntimeError: If the accession stored under a key does not match
            the ``AccessionVersion`` field in the associated info dict, or
            if the entry title contains characters not permitted in mmCIF
            text fields.
    """
    non_live = []
    outdated = []  # (AC, replaced-by)
    for ncbi_ac, ncbi_item in ncbi_data.items():
        ncbi_info = ncbi_item["info"]
        if ncbi_info["Status"] != "live":
            non_live.append(ncbi_ac)
        if ncbi_info["ReplacedBy"]:
            outdated.append((ncbi_ac, ncbi_info["ReplacedBy"]))
        if ncbi_info["AccessionVersion"] != ncbi_ac:
            ncbi_info_ac = ncbi_info["AccessionVersion"].rsplit(".", 1)[0]
            if ncbi_info_ac != ncbi_ac:
                raise RuntimeError("NCBI AC is not AC")
        mmcif_regex = "[][ \t_(),.;:\"&<>/\\\\{}'`~!@#$%?+=*A-Za-z0-9|^-]*"
        description = ncbi_info["Title"]
        if not re.fullmatch(mmcif_regex, description):
            raise RuntimeError(
                f"Illegal characters found in title of "
                f"{ncbi_ac}: {description}"
            )
    if len(non_live) > 0:
        msg = f"{len(non_live)} NCBI entries not live"
        if len(non_live) < 20:
            msg += f": {non_live}"
        _utils.warn_msg(msg)
    if len(outdated) > 0:
        ncbi_acs = [v[0] for v in outdated]
        ncbi_rep = [v[1] for v in outdated]
        msg = f"{len(ncbi_acs)} outdated NCBI entries"
        if len(outdated) < 20:
            msg += f": {ncbi_acs} replaced by {ncbi_rep}"
        _utils.warn_msg(msg)
    return non_live, outdated


[docs] def get_and_check_ncbi_species_names(ncbi_tax_ids): """Fetch and validate scientific species names for NCBI taxon IDs. Retrieves taxonomy summaries for all given taxon IDs and checks that all entries carry the expected ``Status`` value. Unexpected status values will be printed out. Args: ncbi_tax_ids (list[str]): NCBI taxon IDs as strings. Returns: dict[str, str]: Mapping from taxon ID string to scientific species name. Raises: RuntimeError: If a returned taxon ID is not in the requested set, or if the final set of resolved taxon IDs does not match the requested set. """ ncbi_tax_infos = _get_ncbi_tax_info(ncbi_tax_ids) # some checks (expect to have all active tax. ids) for item, exp_vals in [("Status", ["active"])]: observed_values = sorted( set( ncbi_tax_info[item] for ncbi_tax_info in ncbi_tax_infos.values() ) ) if observed_values != exp_vals: _utils.warn_msg( f"UNEXPECTED observed '{item}' values: {observed_values}" ) # extract/keep scientific names return { ncbi_tax_id: ncbi_tax_info["ScientificName"] for ncbi_tax_id, ncbi_tax_info in ncbi_tax_infos.items() }
[docs] def get_and_check_ncbi_data(ncbi_acs, ncbi_metadata_file=None): """Fetch NCBI sequence data, enrich with species names, and run checks. Deduplicates the given accessions, fetches sequence and metadata (from cache or NCBI), adds scientific species names from the taxonomy database, and runs sanity checks. Warnings about outdated entries will be printed out. Args: ncbi_acs (list[str]): NCBI protein accessions to process. Duplicates are silently removed. ncbi_metadata_file (str | None): Path to a JSON cache file for NCBI protein data. If ``None``, data is always fetched from NCBI. Defaults to ``None``. Returns: dict[str, dict]: Mapping from accession to a dict with keys ``"seq_name"``, ``"seq_str"``, and ``"info"``. The ``"info"`` sub-dict contains all summary key-value pairs plus ``"SpeciesName"``. Raises: RuntimeError: If a returned accession or taxon ID cannot be matched to the requested set, if sequence and info keys diverge after a batch, if the final resolved set is incomplete, or if an entry fails the mmCIF character or accession consistency checks. """ # fetch NCBI data for each unique_acs = sorted(set(ncbi_acs)) ncbi_data = _get_ncbi_data_cached(unique_acs, ncbi_metadata_file) # fetch all needed tax. infos (excluding overridden ones) tax_ids = sorted( set(str(ncbi_item["info"]["TaxId"]) for ncbi_item in ncbi_data.values()) ) # extract/keep scientific names ncbi_species_names = get_and_check_ncbi_species_names(tax_ids) # apply to data for ncbi_item in ncbi_data.values(): tax_id = str(ncbi_item["info"]["TaxId"]) ncbi_item["info"]["SpeciesName"] = ncbi_species_names[tax_id] # do some checks at the end _check_ncbi_data(ncbi_data) return ncbi_data