Source code for braincell.io.neuromorpho.client

# Copyright 2026 BrainX Ecosystem Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

"""Stateful HTTP client for the NeuroMorpho.Org REST API.

The client is the Tier-2 entry point for the NeuroMorpho.Org integration.
For one-shot use cases, prefer the Tier-1 helpers in
:mod:`braincell.io.neuromorpho.entry`.
"""



import json
from pathlib import Path
from typing import Any, Iterator, Mapping

from .cache import NeuroMorphoCache
from .errors import NeuroMorphoError, NeuroMorphoHTTPError
from .http import request_with_retry
from .models import (
    NeuroMorphoCacheStatus,
    NeuroMorphoDetail,
    NeuroMorphoDownloadItem,
    NeuroMorphoDownloadRecord,
    NeuroMorphoFilePlan,
    NeuroMorphoMeasurement,
    NeuroMorphoNeuron,
    NeuroMorphoSearchPage,
    NeuroMorphoUrls,
)
from .query import NeuroMorphoQuery
from .urls import (
    API_BASE,
    DownloadMode,
    _validate_mode,
    build_measurement_url,
    build_original_file_url,
    build_standard_swc_url,
    coerce_https,
    plan_neuron_files,
)

__all__ = [
    "DEFAULT_TIMEOUT",
    "NeuroMorphoClient",
]

#: Default per-request timeout in seconds.
DEFAULT_TIMEOUT = 30.0


[docs] class NeuroMorphoClient: """Stateful client for the NeuroMorpho.Org REST API. The client wraps a ``requests.Session`` (lazily imported on first construction so the package can be imported without ``requests`` installed) and adds retry/backoff for transient failures, an optional :class:`NeuroMorphoCache`, and typed return values. Parameters ---------- session : object or None Session-like object exposing ``get(url, params=..., timeout=..., stream=...)``. Pass a configured ``requests.Session`` to control proxies, headers, or HTTPS verification. ``None`` constructs a fresh ``requests.Session`` (importing ``requests`` lazily). timeout : float Per-request timeout in seconds. Defaults to :data:`DEFAULT_TIMEOUT`. cache_dir : str or Path or None Root directory for cached downloads. When provided, the client creates a :class:`NeuroMorphoCache` accessible via :attr:`cache`. retries : int Number of attempts for JSON API calls. Must be ``>= 1``. Streaming file downloads are not retried. backoff_base : float Base delay (seconds) for exponential backoff between retries. Attributes ---------- session : object timeout : float cache : NeuroMorphoCache or None Set when ``cache_dir`` was provided. retries : int backoff_base : float Examples -------- .. code-block:: python >>> client = NeuroMorphoClient(cache_dir="~/data/neuromorpho") >>> page = client.search("species:mouse", size=5) # doctest: +SKIP >>> for neuron in client.iter_search("species:mouse", limit=20): ... client.download(neuron) # doctest: +SKIP """ def __init__( self, session: Any = None, *, timeout: float = DEFAULT_TIMEOUT, cache_dir: str | Path | None = None, retries: int = 3, backoff_base: float = 0.5, ) -> None: if session is None: try: import requests except Exception as exc: # pragma: no cover - depends on env raise ImportError( "The 'requests' library is required to use NeuroMorphoClient. " "Install it with 'pip install requests'." ) from exc session = requests.Session() if retries < 1: raise ValueError(f"retries must be >= 1, got {retries!r}") self.session = session self.timeout = float(timeout) self.retries = int(retries) self.backoff_base = float(backoff_base) self._cache: NeuroMorphoCache | None = ( NeuroMorphoCache(cache_dir) if cache_dir is not None else None ) # ------------------------------------------------------------------ # Cache # ------------------------------------------------------------------ @property def cache(self) -> NeuroMorphoCache | None: """Return the attached :class:`NeuroMorphoCache`, if configured.""" return self._cache @property def cache_dir(self) -> Path | None: """Convenience accessor for the cache root directory.""" return self._cache.root if self._cache is not None else None # ------------------------------------------------------------------ # Internal HTTP helpers # ------------------------------------------------------------------ def _get_json(self, url: str, *, params: dict[str, Any] | None = None) -> tuple[Any, str]: response = request_with_retry( self.session, url, timeout=self.timeout, attempts=self.retries, backoff_base=self.backoff_base, params=params, ) return response.json(), str(getattr(response, "url", url)) # ------------------------------------------------------------------ # Search # ------------------------------------------------------------------
[docs] def search( self, query: str | NeuroMorphoQuery, *, fq: list[str] | None = None, size: int = 20, page: int = 0, sort: str = "neuron_id,asc", ) -> NeuroMorphoSearchPage: """Run a single search request against ``/api/neuron/select``. Parameters ---------- query : str or NeuroMorphoQuery Either a raw Solr ``q`` string or a typed :class:`NeuroMorphoQuery`. When a query object is passed, its ``raw_fq`` is merged with the explicit ``fq`` argument. fq : list of str or None Extra Solr filter strings to append to ``fq``. size : int Page size requested from the API. page : int Zero-based page index. sort : str Sort string forwarded to the API. Returns ------- NeuroMorphoSearchPage """ params: dict[str, Any] = {"size": size, "page": page, "sort": sort} merged_fq: list[str] = [] if isinstance(query, NeuroMorphoQuery): params["q"] = query.to_q() merged_fq.extend(query.to_fq()) else: params["q"] = str(query) if fq: merged_fq.extend(fq) if merged_fq: params["fq"] = merged_fq payload, url = self._get_json(f"{API_BASE}/neuron/select/", params=params) items = tuple( NeuroMorphoNeuron.from_payload(item) for item in payload.get("_embedded", {}).get("neuronResources", []) ) page_info = payload.get("page", {}) return NeuroMorphoSearchPage( items=items, page=int(page_info.get("number", page)), size=int(page_info.get("size", size)), total_pages=int(page_info.get("totalPages", 0)), total_elements=int(page_info.get("totalElements", len(items))), query_url=url, )
# ------------------------------------------------------------------ # Single neuron # ------------------------------------------------------------------
[docs] def get_neuron(self, neuron_id: int) -> NeuroMorphoNeuron: """Fetch the neuron record for *neuron_id*. Parameters ---------- neuron_id : int Returns ------- NeuroMorphoNeuron Raises ------ NeuroMorphoNotFoundError If the upstream returns 404 for the id. """ payload, _ = self._get_json(f"{API_BASE}/neuron/id/{int(neuron_id)}") return NeuroMorphoNeuron.from_payload(payload)
[docs] def get_measurement( self, neuron: NeuroMorphoNeuron | int, ) -> NeuroMorphoMeasurement: """Fetch the morphometry record for a neuron. Parameters ---------- neuron : NeuroMorphoNeuron or int Either a previously-fetched neuron or its id. Returns ------- NeuroMorphoMeasurement """ if isinstance(neuron, NeuroMorphoNeuron): url = build_measurement_url(neuron) neuron_id = neuron.neuron_id else: neuron_id = int(neuron) url = f"{API_BASE}/morphometry/id/{neuron_id}" payload, _ = self._get_json(url) if not isinstance(payload, Mapping): raise NeuroMorphoError( f"Measurement endpoint {url} did not return a JSON object." ) if "neuron_id" not in payload: payload = dict(payload) payload["neuron_id"] = neuron_id try: return NeuroMorphoMeasurement.from_payload(payload) except ValueError as exc: raise NeuroMorphoError( f"Malformed measurement payload from {url}: {exc}" ) from exc
[docs] def get_urls(self, neuron: NeuroMorphoNeuron) -> NeuroMorphoUrls: """Return the resolved URLs for *neuron*. Parameters ---------- neuron : NeuroMorphoNeuron Returns ------- NeuroMorphoUrls """ thumbnail = neuron.png_url if thumbnail is not None: thumbnail = coerce_https(thumbnail) return NeuroMorphoUrls( standard_swc=build_standard_swc_url(neuron), original_file=build_original_file_url(neuron), measurement=build_measurement_url(neuron), thumbnail=thumbnail, )
[docs] def get_cache_status( self, neuron: NeuroMorphoNeuron | int, ) -> NeuroMorphoCacheStatus: """Return on-disk cache status for *neuron*. Parameters ---------- neuron : NeuroMorphoNeuron or int Returns ------- NeuroMorphoCacheStatus ``configured=False`` when this client has no cache attached. """ if self._cache is None: return NeuroMorphoCacheStatus( configured=False, folder=None, exists=False, metadata_exists=False, standard_exists=False, original_exists=False, neuron_id=( neuron.neuron_id if isinstance(neuron, NeuroMorphoNeuron) else int(neuron) ), ) neuron_id = ( neuron.neuron_id if isinstance(neuron, NeuroMorphoNeuron) else int(neuron) ) status = self._cache.status(neuron_id) if isinstance(neuron, NeuroMorphoNeuron): # Refine standard/original presence using the actual neuron name. folder = self._cache.layout.neuron_dir(neuron_id) standard = self._cache.layout.standard_swc_path(neuron_id, neuron.neuron_name) standard_exists = standard.exists() original_exists = False if neuron.original_format: suffix = Path(neuron.original_format).suffix if suffix: original_exists = self._cache.layout.original_file_path( neuron_id, neuron.neuron_name, suffix ).exists() return NeuroMorphoCacheStatus( configured=True, folder=folder, exists=folder.exists(), metadata_exists=self._cache.layout.metadata_path(neuron_id).exists(), standard_exists=standard_exists, original_exists=original_exists, neuron_id=neuron_id, ) return status
[docs] def describe( self, neuron: NeuroMorphoNeuron | int, *, include_measurement: bool = True, ) -> NeuroMorphoDetail: """Return an aggregate :class:`NeuroMorphoDetail` for a neuron. Combines :meth:`get_neuron` (when an id is passed), :meth:`get_measurement`, :meth:`get_urls`, and :meth:`get_cache_status` in one call. Parameters ---------- neuron : NeuroMorphoNeuron or int include_measurement : bool If ``False``, the ``measurement`` field of the result is ``None`` and no measurement request is issued. Returns ------- NeuroMorphoDetail """ resolved = ( neuron if isinstance(neuron, NeuroMorphoNeuron) else self.get_neuron(neuron) ) measurement = self.get_measurement(resolved) if include_measurement else None urls = self.get_urls(resolved) cache_status = self.get_cache_status(resolved) return NeuroMorphoDetail( neuron=resolved, measurement=measurement, urls=urls, cache_status=cache_status, )
# ------------------------------------------------------------------ # Files # ------------------------------------------------------------------
[docs] def file_plan( self, neuron: NeuroMorphoNeuron, *, mode: DownloadMode = "both", ) -> tuple[NeuroMorphoFilePlan, ...]: """Build the typed download plan for *neuron*. Parameters ---------- neuron : NeuroMorphoNeuron mode : {"standard", "original", "both"} Returns ------- tuple of NeuroMorphoFilePlan """ return plan_neuron_files(neuron, mode=mode)
[docs] def download( self, neuron: NeuroMorphoNeuron | int, output_dir: str | Path | None = None, *, mode: DownloadMode = "both", overwrite: bool = False, dry_run: bool = False, ) -> NeuroMorphoDownloadRecord: """Download files for *neuron* into the cache layout. When ``output_dir`` is ``None`` the client falls back to :attr:`cache_dir`. Either ``output_dir`` or a configured cache is required. Parameters ---------- neuron : NeuroMorphoNeuron or int output_dir : str or Path or None Cache root. ``None`` uses the client's configured cache. mode : {"standard", "original", "both"} overwrite : bool Re-download files that are already on disk. dry_run : bool If ``True``, do not touch the network or filesystem. Returns a populated :class:`NeuroMorphoDownloadRecord` whose items have ``downloaded_now=False`` and ``reason="dry_run"``. Returns ------- NeuroMorphoDownloadRecord Raises ------ ValueError If neither ``output_dir`` nor a client cache is provided. """ _validate_mode(mode) if output_dir is None: if self._cache is None: raise ValueError( "download() requires either an output_dir argument or a " "client constructed with cache_dir=..." ) cache_root = self._cache.root else: cache_root = Path(output_dir).expanduser() resolved = ( neuron if isinstance(neuron, NeuroMorphoNeuron) else self.get_neuron(neuron) ) # Fetch measurement first so we fail fast on a network error before # touching the filesystem. measurement: NeuroMorphoMeasurement | None if dry_run: measurement = None else: measurement = self.get_measurement(resolved) folder = cache_root / str(resolved.neuron_id) if not dry_run: folder.mkdir(parents=True, exist_ok=True) plans = plan_neuron_files(resolved, mode=mode) items: list[NeuroMorphoDownloadItem] = [] for plan in plans: target = folder / plan.filename if plan.skip: items.append( NeuroMorphoDownloadItem( kind=plan.kind, url=plan.url, filename=plan.filename, path=target, downloaded_now=False, reason=plan.reason, ) ) continue if dry_run: items.append( NeuroMorphoDownloadItem( kind=plan.kind, url=plan.url, filename=plan.filename, path=target, downloaded_now=False, reason="dry_run", ) ) continue downloaded_now = self._download_file(plan.url, target, overwrite=overwrite) items.append( NeuroMorphoDownloadItem( kind=plan.kind, url=plan.url, filename=target.name, path=target, downloaded_now=downloaded_now, ) ) metadata_path = folder / "metadata.json" if not dry_run: metadata = self._build_metadata(resolved, items, measurement, mode) metadata_path.write_text( json.dumps(metadata, indent=2, sort_keys=True), encoding="utf-8", ) return NeuroMorphoDownloadRecord( folder=folder, metadata_path=metadata_path, download_items=tuple(items), measurement=measurement, download_mode=mode, dry_run=dry_run, )
# ------------------------------------------------------------------ # Internals # ------------------------------------------------------------------ def _download_file(self, url: str, path: Path, *, overwrite: bool) -> bool: path.parent.mkdir(parents=True, exist_ok=True) if path.exists() and not overwrite: return False try: response = self.session.get( url, stream=True, timeout=max(self.timeout, 60.0), ) except Exception as exc: # noqa: BLE001 raise NeuroMorphoHTTPError( f"GET {url} failed: {exc}", status=0, url=url ) from exc with response: status = int(getattr(response, "status_code", 0) or 0) if status >= 400: raise NeuroMorphoHTTPError( f"GET {url} returned HTTP {status}", status=status, url=url ) tmp_path = path.with_suffix(path.suffix + ".tmp") try: with tmp_path.open("wb") as file_obj: for chunk in response.iter_content(chunk_size=1 << 15): if chunk: file_obj.write(chunk) tmp_path.replace(path) except BaseException: tmp_path.unlink(missing_ok=True) raise return True @staticmethod def _build_metadata( neuron: NeuroMorphoNeuron, items: list[NeuroMorphoDownloadItem], measurement: NeuroMorphoMeasurement | None, mode: DownloadMode, ) -> dict[str, Any]: thumbnail = coerce_https(neuron.png_url) if neuron.png_url else None measurement_url = build_measurement_url(neuron) return { "neuron_id": neuron.neuron_id, "neuron_name": neuron.neuron_name, "archive": neuron.archive, "species": neuron.species, "brain_region": neuron.brain_region, "cell_type": neuron.cell_type, "original_format": neuron.original_format, "thumbnail_url": thumbnail, "standard_swc_url": build_standard_swc_url(neuron), "original_file_url": build_original_file_url(neuron), "measurement_url": measurement_url, "links": neuron.payload.get("_links", {}), "neuron": neuron.payload, "measurement": dict(measurement.raw) if measurement is not None else None, "download_mode": mode, "download_items": [ { "kind": item.kind, "url": item.url, "filename": item.filename, "path": item.path.name, "downloaded_now": item.downloaded_now, "reason": item.reason, } for item in items ], }