"""Functions to retrieve data for NCBI databases in batch."""
import json
from pathlib import Path
import re
from xml.etree import ElementTree as ET
import requests
from modelarchive import _utils
def _get_ncbi_base_url():
return "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/"
def _get_ncbi_docsums(params):
"""Fetch and parse document summaries from the NCBI esummary endpoint.
Queries the NCBI esummary endpoint with the given parameters and parses
the returned XML into a list of dicts, one per DocSum element. Item
values are cast to their declared type (``String``, ``Integer``,
``Date``); items without text content are stored as ``None``.
Args:
params: Query parameters forwarded to the esummary endpoint, e.g.
``{"db": "protein", "query_key": ..., "WebEnv": ...}``.
Returns:
list[dict]: One dict per DocSum, mapping NCBI item names to their
typed values.
Raises:
RuntimeError: If an Item element carries an unrecognised type
attribute.
"""
# params = parameters to pass to esummary
response = requests.get(
_get_ncbi_base_url() + "esummary.fcgi",
params=params,
timeout=360,
)
response.encoding = "utf-8"
root = ET.fromstring(response.text)
docsums = root.findall(".//DocSum")
ncbi_dicts = []
for docsum in docsums:
ncbi_dict = {}
for cn in docsum:
if cn.tag == "Item":
cn_name = cn.get("Name")
cn_type = cn.get("Type")
if cn.text:
d = cn.text
if cn_type == "String":
ncbi_dict[cn_name] = d
elif cn_type == "Integer":
ncbi_dict[cn_name] = int(d)
elif cn_type == "Date":
# kept as string
ncbi_dict[cn_name] = d
else:
raise RuntimeError(
f"Unknown type {cn_type} for {cn_name}"
)
else:
ncbi_dict[cn_name] = None
ncbi_dicts.append(ncbi_dict)
return ncbi_dicts
def _parse_fasta(text):
"""Fetch sequencs & headers from a NCBI response."""
records = []
header = None
seq_parts = []
for line in text.splitlines():
line = line.strip()
if not line:
continue
if line.startswith(">"):
# save last record
if header is not None:
records.append((header, "".join(seq_parts)))
header = line[1:].strip()
seq_parts = []
else:
seq_parts.append(line)
# last record
if header is not None:
records.append((header, "".join(seq_parts)))
return records
def _get_ncbi_data(ncbi_acs):
"""Fetch sequence and metadata for a set of NCBI protein accessions.
Uploads accessions in batches to the NCBI epost endpoint, then
retrieves FASTA sequences via efetch and document summaries via
esummary. Batches that return incomplete results are retried
automatically until all accessions are resolved.
Args:
ncbi_acs (list[str]): NCBI protein accessions to fetch. May
include or omit version suffixes (e.g. ``"AAB12345"`` or
``"AAB12345.1"``).
Returns:
dict[str, dict]: Mapping from accession to a dict with keys
``"seq_name"`` (FASTA header), ``"seq_str"`` (sequence string),
and ``"info"`` (parsed esummary dict).
Raises:
RuntimeError: If a returned accession cannot be matched to any
requested accession, if sequence and info keys diverge after
a batch, or if the final set of resolved accessions does not
match the requested set.
"""
# No reducing variables for now
# pylint: disable=too-many-locals
max_num_ids = 2000 # recommended to do max. 5000 at once
ncbi_seq = {}
ncbi_info = {}
sel_ids = list(ncbi_acs)
while len(sel_ids) > 0:
# SOURCE: https://www.ncbi.nlm.nih.gov/books/NBK25501/
db = "protein"
# Upload the list of IDs using the epost endpoint
sel_ids = sel_ids[:max_num_ids]
params = {"db": db, "id": ",".join(sel_ids)}
response = requests.post(
_get_ncbi_base_url() + "epost.fcgi",
data=params,
timeout=360,
)
xml_string = response.text
root = ET.fromstring(xml_string)
query_key = root.find(".//QueryKey").text
web_env = root.find(".//WebEnv").text
# Fetch all sequences using the efetch endpoint via epost info
params = {
"db": db,
"query_key": query_key,
"WebEnv": web_env,
"rettype": "fasta",
"retmode": "text",
}
response = requests.get(
_get_ncbi_base_url() + "efetch.fcgi",
params=params,
timeout=360,
)
ss = _parse_fasta(response.text)
for s in ss:
ncbi_ac = s[0].split()[0]
if ncbi_ac not in sel_ids:
# maybe its the form x|xxxxx...|
if "|" in ncbi_ac:
ncbi_ac = ncbi_ac.split("|")[1]
if ncbi_ac not in sel_ids:
# assume that we store them without version
ncbi_ac = ncbi_ac.rsplit(".", 1)[0]
if ncbi_ac not in sel_ids:
raise RuntimeError(
f"NCBI AC '{ncbi_ac}' not in selected IDs"
)
ncbi_seq[ncbi_ac] = s
# Fetch all infos using the esummary endpoint via epost info
params = {"db": db, "query_key": query_key, "WebEnv": web_env}
for ncbi_dict in _get_ncbi_docsums(params):
ncbi_ac = ncbi_dict["AccessionVersion"]
if ncbi_ac not in sel_ids:
# assume that we store them without version
ncbi_ac = ncbi_ac.rsplit(".", 1)[0]
if ncbi_ac not in sel_ids:
raise RuntimeError(
f"NCBI AC '{ncbi_ac}' not in selected IDs"
)
ncbi_info[ncbi_ac] = ncbi_dict
if ncbi_info.keys() != ncbi_seq.keys():
raise RuntimeError("NCBI info & sequence keys mismatching")
# what's left?
# (random ones get dropped; reason unknown; so we try until all done)
sel_ids = [ncbi_ac for ncbi_ac in ncbi_acs if ncbi_ac not in ncbi_info]
if sorted(ncbi_acs) != sorted(ncbi_info):
raise RuntimeError("NCBI ACs not matching NCBI Info")
# combine nicely for further use
ncbi_data = {
ncbi_ac: {
"seq_name": ncbi_seq[ncbi_ac][0],
"seq_str": ncbi_seq[ncbi_ac][1],
"info": ncbi_info[ncbi_ac],
}
for ncbi_ac in ncbi_acs
}
return ncbi_data
def _get_ncbi_data_cached(ncbi_acs, ncbi_metadata_file):
"""Fetch NCBI protein data from a local cache file or from the web.
If ``ncbi_metadata_file`` is given and the file exists, data is loaded
from it. If the file does not yet exist, data is fetched from NCBI and
written to the file for future use. If ``ncbi_metadata_file`` is
``None``, data is always fetched from NCBI.
Args:
ncbi_acs (list[str]): NCBI protein accessions to fetch.
ncbi_metadata_file (str | None): Path to a JSON cache file, or
``None`` to bypass caching.
Returns:
dict[str, dict]: NCBI data as returned by :func:`_get_ncbi_data`.
"""
if ncbi_metadata_file:
# Convert to Path if string
if isinstance(ncbi_metadata_file, str):
ncbi_metadata_file = Path(ncbi_metadata_file)
if ncbi_metadata_file.exists():
with open(ncbi_metadata_file, encoding="ascii") as jfh:
return json.load(jfh)
ncbi_data = _get_ncbi_data(ncbi_acs)
with open(ncbi_metadata_file, "w", encoding="ascii") as jfh:
json.dump(ncbi_data, jfh)
return _get_ncbi_data(ncbi_acs)
def _get_ncbi_tax_info(tax_ids):
"""Fetch taxonomy document summaries from NCBI for a set of taxon IDs.
Uploads taxon IDs in batches to the NCBI epost endpoint and retrieves
document summaries via esummary. Batches that return incomplete results
are retried automatically. If a batch makes no progress, a warning is
emitted and the partially populated dict is returned.
Args:
tax_ids (list[str]): NCBI taxon IDs as strings.
Returns:
dict[str, dict]: Mapping from taxon ID string to its parsed
esummary dict. May be incomplete if NCBI repeatedly fails to
return certain IDs.
Raises:
RuntimeError: If a returned taxon ID is not found in the requested
set, or if the final set of resolved IDs does not match the
requested set.
"""
max_num_ids = 2000 # recommended to do max. 5000 at once
ncbi_info = {}
sel_ids = list(tax_ids)
while len(sel_ids) > 0:
# keep track of TODOs to abort if no progress made
last_num_todo = len(sel_ids)
# SOURCE: https://www.ncbi.nlm.nih.gov/books/NBK25501/
db = "taxonomy"
# Upload the list of IDs using the epost endpoint
sel_ids = sel_ids[:max_num_ids]
params = {"db": db, "id": ",".join(sel_ids)}
response = requests.post(
_get_ncbi_base_url() + "epost.fcgi",
data=params,
timeout=360,
)
xml_string = response.text
root = ET.fromstring(xml_string)
query_key = root.find(".//QueryKey").text
web_env = root.find(".//WebEnv").text
# Fetch all infos using the esummary endpoint via epost info
params = {"db": db, "query_key": query_key, "WebEnv": web_env}
for ncbi_dict in _get_ncbi_docsums(params):
tax_id = str(ncbi_dict["TaxId"])
if tax_id not in sel_ids:
raise RuntimeError(f"Taxon ID '{tax_id}' not in selection")
ncbi_info[tax_id] = ncbi_dict
# what's left?
# (random ones get dropped; reason unknown; so we try until all done)
sel_ids = [tax_id for tax_id in tax_ids if tax_id not in ncbi_info]
if last_num_todo == len(sel_ids):
_utils.warn_msg(f"ABORTING...failed to get {sel_ids}")
return ncbi_info
if sorted(tax_ids) != sorted(ncbi_info):
raise RuntimeError("Taxon IDs not matching NCBI Info")
return ncbi_info
def _check_ncbi_data(ncbi_data):
"""Run sanity checks on fetched NCBI protein data.
Iterates over all entries in ``ncbi_data`` and checks that accession
versions are consistent and that title strings contain only characters
permitted in mmCIF text fields. Entries with non-live status or that
have been replaced are collected and reported.
Args:
ncbi_data (dict[str, dict]): NCBI data as returned by
:func:`_get_ncbi_data` after species names have been added.
Returns:
tuple[list[str], list[tuple[str, str]]]
Raises:
RuntimeError: If the accession stored under a key does not match
the ``AccessionVersion`` field in the associated info dict, or
if the entry title contains characters not permitted in mmCIF
text fields.
"""
non_live = []
outdated = [] # (AC, replaced-by)
for ncbi_ac, ncbi_item in ncbi_data.items():
ncbi_info = ncbi_item["info"]
if ncbi_info["Status"] != "live":
non_live.append(ncbi_ac)
if ncbi_info["ReplacedBy"]:
outdated.append((ncbi_ac, ncbi_info["ReplacedBy"]))
if ncbi_info["AccessionVersion"] != ncbi_ac:
ncbi_info_ac = ncbi_info["AccessionVersion"].rsplit(".", 1)[0]
if ncbi_info_ac != ncbi_ac:
raise RuntimeError("NCBI AC is not AC")
mmcif_regex = "[][ \t_(),.;:\"&<>/\\\\{}'`~!@#$%?+=*A-Za-z0-9|^-]*"
description = ncbi_info["Title"]
if not re.fullmatch(mmcif_regex, description):
raise RuntimeError(
f"Illegal characters found in title of "
f"{ncbi_ac}: {description}"
)
if len(non_live) > 0:
msg = f"{len(non_live)} NCBI entries not live"
if len(non_live) < 20:
msg += f": {non_live}"
_utils.warn_msg(msg)
if len(outdated) > 0:
ncbi_acs = [v[0] for v in outdated]
ncbi_rep = [v[1] for v in outdated]
msg = f"{len(ncbi_acs)} outdated NCBI entries"
if len(outdated) < 20:
msg += f": {ncbi_acs} replaced by {ncbi_rep}"
_utils.warn_msg(msg)
return non_live, outdated
[docs]
def get_and_check_ncbi_species_names(ncbi_tax_ids):
"""Fetch and validate scientific species names for NCBI taxon IDs.
Retrieves taxonomy summaries for all given taxon IDs and checks that
all entries carry the expected ``Status`` value. Unexpected status
values will be printed out.
Args:
ncbi_tax_ids (list[str]): NCBI taxon IDs as strings.
Returns:
dict[str, str]: Mapping from taxon ID string to scientific species
name.
Raises:
RuntimeError: If a returned taxon ID is not in the requested set,
or if the final set of resolved taxon IDs does not match the
requested set.
"""
ncbi_tax_infos = _get_ncbi_tax_info(ncbi_tax_ids)
# some checks (expect to have all active tax. ids)
for item, exp_vals in [("Status", ["active"])]:
observed_values = sorted(
set(
ncbi_tax_info[item] for ncbi_tax_info in ncbi_tax_infos.values()
)
)
if observed_values != exp_vals:
_utils.warn_msg(
f"UNEXPECTED observed '{item}' values: {observed_values}"
)
# extract/keep scientific names
return {
ncbi_tax_id: ncbi_tax_info["ScientificName"]
for ncbi_tax_id, ncbi_tax_info in ncbi_tax_infos.items()
}
[docs]
def get_and_check_ncbi_data(ncbi_acs, ncbi_metadata_file=None):
"""Fetch NCBI sequence data, enrich with species names, and run checks.
Deduplicates the given accessions, fetches sequence and metadata
(from cache or NCBI), adds scientific species names from the taxonomy
database, and runs sanity checks. Warnings about outdated entries will be
printed out.
Args:
ncbi_acs (list[str]): NCBI protein accessions to process.
Duplicates are silently removed.
ncbi_metadata_file (str | None): Path to a JSON cache file for
NCBI protein data. If ``None``, data is always fetched from
NCBI. Defaults to ``None``.
Returns:
dict[str, dict]: Mapping from accession to a dict with keys
``"seq_name"``, ``"seq_str"``, and ``"info"``. The ``"info"``
sub-dict contains all summary key-value pairs plus
``"SpeciesName"``.
Raises:
RuntimeError: If a returned accession or taxon ID cannot be
matched to the requested set, if sequence and info keys
diverge after a batch, if the final resolved set is
incomplete, or if an entry fails the mmCIF character or
accession consistency checks.
"""
# fetch NCBI data for each
unique_acs = sorted(set(ncbi_acs))
ncbi_data = _get_ncbi_data_cached(unique_acs, ncbi_metadata_file)
# fetch all needed tax. infos (excluding overridden ones)
tax_ids = sorted(
set(str(ncbi_item["info"]["TaxId"]) for ncbi_item in ncbi_data.values())
)
# extract/keep scientific names
ncbi_species_names = get_and_check_ncbi_species_names(tax_ids)
# apply to data
for ncbi_item in ncbi_data.values():
tax_id = str(ncbi_item["info"]["TaxId"])
ncbi_item["info"]["SpeciesName"] = ncbi_species_names[tax_id]
# do some checks at the end
_check_ncbi_data(ncbi_data)
return ncbi_data