plaid.viewer.services.plaid_dataset_service

Dataset discovery and sample introspection for the PLAID viewer.

This service owns all PLAID-facing logic used by the viewer:

  • Discover datasets under a configured root directory.
  • Load a split-wise (dataset_dict, converter_dict) pair through plaid.storage.init_from_disk and cache it for subsequent calls.
  • Materialize plaid.Sample instances via converter.to_plaid(dataset, index), regardless of the underlying backend (hf_datasets, cgns, zarr, ...).
  • Summarize sample contents (bases, zones, fields, times, scalars).
  • Report basic validation status via Sample.check_completeness.

plaid.viewer.services.plaid_dataset_service.PlaidDatasetService

PlaidDatasetService(config)

High-level access to PLAID datasets stored under a root directory.

A dataset is a subdirectory of config.datasets_root that contains a data/ directory readable by plaid.storage.init_from_disk. That function returns a dataset_dict and a converter_dict, both keyed by split name; the viewer iterates over the splits and addresses samples by integer index in range(len(dataset_dict[split])).
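The discovery rule can be sketched without PLAID itself. The helper name find_dataset_dirs is hypothetical and not part of the service; it assumes only the layout described here (a subdirectory that contains a data/ directory) and does not check that the data is actually readable.

```python
from pathlib import Path
import tempfile


def find_dataset_dirs(root: Path) -> list[str]:
    """Return names of subdirectories of ``root`` that contain ``data/``."""
    return sorted(
        entry.name
        for entry in root.iterdir()
        if entry.is_dir() and (entry / "data").is_dir()
    )


# Build a throwaway tree: one dataset-like directory, one plain directory.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "my_dataset" / "data").mkdir(parents=True)
    (root / "notes").mkdir()
    found = find_dataset_dirs(root)

print(found)
```

Only my_dataset qualifies, because notes has no data/ subdirectory.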

Source code in plaid/viewer/services/plaid_dataset_service.py
def __init__(self, config: ViewerConfig) -> None:
    self._config = config
    # Datasets root is kept on the service (not on the frozen config)
    # so it can be changed at runtime through ``set_datasets_root``.
    # ``None`` means no root has been selected yet: discovery methods
    # return empty lists and the UI is expected to prompt the user.
    self._datasets_root: Path | None = (
        Path(config.datasets_root) if config.datasets_root is not None else None
    )
    # Sandbox for interactive root selection. Defaults to the user's
    # home directory when no explicit ``browse_roots`` is configured.
    # The configured ``datasets_root`` is always implicitly allowed so
    # ``list_subdirs`` can start from there.
    browse_roots: list[Path] = [Path(p).expanduser() for p in config.browse_roots]
    if not browse_roots:
        browse_roots = [Path.home()]
    if self._datasets_root is not None:
        # Make sure the startup root is always reachable even if
        # ``browse_roots`` is more restrictive.
        browse_roots.append(self._datasets_root)
    self._browse_roots: tuple[Path, ...] = tuple(
        dict.fromkeys(p.resolve() for p in browse_roots)
    )
    # Cache of (dataset_dict, converter_dict) keyed by dataset_id to
    # avoid re-parsing large arrow/zarr datasets on every call.
    self._store_cache: dict[str, tuple[dict, dict]] = {}
    # Registered Hugging Face Hub repositories that should be exposed
    # as datasets through :func:`plaid.storage.init_streaming_from_hub`.
    # The ``dataset_id`` used throughout the viewer is the raw
    # ``repo_id`` string (e.g. ``"PLAID-lib/VKI-LS59"``), which never
    # collides with a local directory name (it always contains a
    # forward slash).
    self._hub_repos: list[str] = []
    # Per-(dataset_id, split) streaming cursors. Streaming datasets
    # are ``datasets.IterableDataset`` instances without ``__len__``
    # so we cannot index them. We maintain a forward-only cursor
    # instead: ``_cursors[(dataset_id, split)] = (iterator, position,
    # cached_sample)``. ``Next`` consumes the iterator and advances
    # ``position``; ``Reset`` discards the iterator so a fresh one is
    # built on the next access.
    self._cursors: dict[tuple[str, str], _StreamCursor] = {}
    # User-selected feature filter per dataset. ``None`` means "no
    # filter" (load every feature, current default behaviour). An
    # empty list means "all features unselected".
    self._features: dict[str, list[str] | None] = {}
    # Memoised ``(constant_feature_keys, variable_feature_keys)`` per
    # dataset, retrieved through ``load_metadata_from_disk`` or
    # ``load_metadata_from_hub``. Used to (a) populate the UI
    # checkbox list through :meth:`list_available_features` and (b)
    # expand user-selected feature paths with
    # :func:`plaid.utils.cgns_helper.update_features_for_CGNS_compatibility`
    # before handing them to ``init_streaming_from_hub`` (which, unlike
    # :meth:`Converter.to_plaid`, does not expand features by itself).
    self._feature_metadata: dict[str, tuple[list[str], list[str]]] = {}
    # Memoised per-split feature catalogue for a dataset. Unlike
    # ``_feature_metadata`` (which aggregates constants across
    # splits so the UI can offer a union of fields), this mapping
    # preserves the split boundary so :meth:`load_sample` can
    # filter the user's selection down to what a specific split
    # actually carries. ``PlaidSampleConverter.to_plaid`` otherwise
    # raises ``KeyError('Missing features in dataset/converter:
    # ...')`` whenever the request names a path that the split in
    # hand does not know about.
    self._split_feature_metadata: dict[str, dict[str, set[str]]] = {}
    # In-memory LRU of decoded :class:`plaid.Sample` objects keyed by
    # ``(dataset_id, split_key, sample_id, features_tuple)``. The
    # viewer calls :meth:`load_sample` several times per user
    # interaction (one for the VTK artifact, one for globals, one
    # for non-visual bases, one per playback frame, ...). Without
    # this cache each call performs a full ``Converter.to_plaid``
    # decode of the same sample - cheap on the memory-mapped
    # ``hf_datasets`` backend but very expensive on ``zarr``, where
    # every decode reopens hundreds of small chunk files. The cache
    # collapses repeated calls into a single decode for the
    # currently active sample (and the previous one for back-and-
    # forth navigation). Streaming datasets are intentionally
    # bypassed: their ``sample_id`` is the constant
    # :data:`STREAM_CURSOR_ID` sentinel so caching would conflate
    # different cursor positions.
    self._sample_cache: OrderedDict[
        tuple[str, str, str, tuple[str, ...] | None], Any
    ] = OrderedDict()
    self._sample_cache_capacity: int = 2
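
The two-entry sample cache described in the comments above follows the usual OrderedDict LRU pattern. The class below is a minimal sketch of that pattern, not the service's implementation: TinyLRU, get_or_load and the loader callables are hypothetical names, and the strings stand in for decoded samples.

```python
from collections import OrderedDict
from typing import Any, Callable, Hashable


class TinyLRU:
    """Hypothetical stand-in for a small LRU sample cache (not PLAID API)."""

    def __init__(self, capacity: int = 2) -> None:
        self._capacity = capacity
        self._items: OrderedDict[Hashable, Any] = OrderedDict()

    def get_or_load(self, key: Hashable, loader: Callable[[], Any]) -> Any:
        if key in self._items:
            # Cache hit: mark the entry as most recently used.
            self._items.move_to_end(key)
            return self._items[key]
        value = loader()
        self._items[key] = value
        # Evict least recently used entries once capacity is exceeded.
        while len(self._items) > self._capacity:
            self._items.popitem(last=False)
        return value


decodes: list[str] = []


def make_loader(sample_id: str) -> Callable[[], str]:
    def load() -> str:
        decodes.append(sample_id)  # record every real "decode"
        return f"sample-{sample_id}"

    return load


cache = TinyLRU(capacity=2)
# Keys mimic (dataset_id, split_key, sample_id, features_tuple).
for sample_id in ["0", "0", "1", "0", "2", "1"]:
    cache.get_or_load(("ds", "train", sample_id, None), make_loader(sample_id))

print(decodes)
```

Six requests trigger four decodes: repeated access to the active sample and stepping back to the previous one are served from the cache, while a third distinct sample evicts the least recently used entry.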

plaid.viewer.services.plaid_dataset_service.PlaidDatasetService.datasets_root property

datasets_root

Return the currently active datasets root, or None.

plaid.viewer.services.plaid_dataset_service.PlaidDatasetService.browse_roots property

browse_roots

Return the sandbox directories for interactive path selection.

plaid.viewer.services.plaid_dataset_service.PlaidDatasetService.hub_repos property

hub_repos

Return the list of registered Hugging Face Hub repositories.

plaid.viewer.services.plaid_dataset_service.PlaidDatasetService.set_datasets_root

set_datasets_root(path)

Change the active datasets root at runtime.

The new path (when not None) must exist, be a directory, and be located under one of browse_roots. All per-dataset caches are invalidated so the next discovery call reflects the new root.
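The helper that enforces the sandbox, _ensure_within_browse_roots, is not shown on this page. The sketch below is only a guess at the kind of containment check such a helper could perform; is_within_roots is a hypothetical name, and it returns a boolean where the real method presumably raises ValueError.

```python
from pathlib import Path
import tempfile


def is_within_roots(candidate: Path, roots: tuple[Path, ...]) -> bool:
    """Hypothetical check: is ``candidate`` equal to or below one of ``roots``?"""
    resolved = candidate.resolve()
    return any(resolved == root or root in resolved.parents for root in roots)


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp).resolve()
    (root / "datasets").mkdir()
    roots = (root,)
    inside = is_within_roots(root / "datasets", roots)
    # ``..`` segments are normalised by resolve() before the comparison,
    # so climbing above the root is detected.
    escaped = is_within_roots(root / "datasets" / ".." / "..", roots)

print(inside, escaped)
```

Resolving before comparing matters: a purely textual prefix test would accept paths that climb out of the sandbox through .. segments or symlinks.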

Parameters:

  • path (Path | str | None) –

    The new datasets root. None clears the current root.

Returns:

  • Path | None

    The resolved new datasets root, or None if cleared.

Raises:

  • ValueError

    If the path does not exist, is not a directory, or escapes browse_roots.

Source code in plaid/viewer/services/plaid_dataset_service.py
def set_datasets_root(self, path: Path | str | None) -> Path | None:
    """Change the active datasets root at runtime.

    The new path (when not ``None``) must exist, be a directory, and be
    located under one of ``browse_roots``. All per-dataset caches are
    invalidated so the next discovery call reflects the new root.

    Args:
        path: The new datasets root. ``None`` clears the current root.

    Returns:
        The resolved new datasets root, or ``None`` if cleared.

    Raises:
        ValueError: If the path does not exist, is not a directory, or
            escapes ``browse_roots``.
    """
    # Deferred import so the service module stays importable without
    # write access to the user config directory (e.g. in read-only
    # CI sandboxes that don't touch ``set_datasets_root`` anyway).
    from plaid.viewer.preferences import (  # noqa: PLC0415
        set_last_datasets_root,
    )

    if path is None:
        self._datasets_root = None
        self._store_cache.clear()
        self._sample_cache.clear()
        set_last_datasets_root(None)
        return None
    resolved = Path(path).expanduser().resolve()
    if not resolved.is_dir():
        raise ValueError(f"Not a directory: {resolved}")
    self._ensure_within_browse_roots(resolved)
    self._datasets_root = resolved
    self._store_cache.clear()
    self._sample_cache.clear()
    # Persist the new root so the next launch of the viewer picks it
    # up automatically when ``--datasets-root`` is not provided.
    set_last_datasets_root(resolved)
    return resolved

plaid.viewer.services.plaid_dataset_service.PlaidDatasetService.list_subdirs

list_subdirs(path=None)

Return immediate subdirectories of path for the file browser.

Each entry is tagged with is_plaid_candidate (True when it looks like a PLAID dataset, i.e. contains a data/ subdirectory) so the UI can highlight it. The returned path is always an absolute resolved path inside browse_roots.

Parameters:

  • path (Path | str | None, default: None ) –

    Directory to list. When None the first browse root is used (typically $HOME).

Returns:

  • dict[str, object]

    A dict {"path": str, "parent": str | None, "entries": [{"name": str, "path": str, "is_plaid_candidate": bool}, ...]}.

Raises:

  • ValueError

    If path is not a directory or escapes the sandbox.

Source code in plaid/viewer/services/plaid_dataset_service.py
def list_subdirs(self, path: Path | str | None = None) -> dict[str, object]:
    """Return immediate subdirectories of ``path`` for the file browser.

    Each entry is tagged with ``is_plaid_candidate`` (``True`` when it
    looks like a PLAID dataset, i.e. contains a ``data/`` subdirectory)
    so the UI can highlight it. The returned ``path`` is always an
    absolute resolved path inside ``browse_roots``.

    Args:
        path: Directory to list. When ``None`` the first browse root is
            used (typically ``$HOME``).

    Returns:
        A dict ``{"path": str, "parent": str | None,
        "entries": [{"name": str, "path": str,
        "is_plaid_candidate": bool}, ...]}``.

    Raises:
        ValueError: If ``path`` is not a directory or escapes the
            sandbox.
    """
    if path is None:
        target = self._browse_roots[0]
    else:
        target = Path(path).expanduser().resolve()
    if not target.is_dir():
        raise ValueError(f"Not a directory: {target}")
    self._ensure_within_browse_roots(target)
    entries: list[dict[str, object]] = []
    for entry in sorted(target.iterdir()):
        if not entry.is_dir():
            continue
        if entry.name.startswith("."):
            continue
        entries.append(
            {
                "name": entry.name,
                "path": str(entry),
                "is_plaid_candidate": (entry / "data").is_dir(),
            }
        )
    # Rank PLAID candidates first, then alphabetical (stable).
    entries.sort(key=lambda e: (not e["is_plaid_candidate"], e["name"].lower()))
    parent: str | None = None
    if any(
        target != root and root in target.parents for root in self._browse_roots
    ):
        parent = str(target.parent)
    elif (
        target.parent != target
        and any(  # pragma: no cover - alternate browse-root ancestry guard
            target.parent == root or root in target.parent.parents
            for root in self._browse_roots
        )
    ):
        parent = str(target.parent)
    return {
        "path": str(target),
        "parent": parent,
        "entries": entries,
    }
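
The ordering applied at the end of list_subdirs can be tried in isolation. The entries below are made-up values in the documented shape, not output from a real directory; the sort key is the one used in the listing above.

```python
# Illustrative entries in the documented shape (paths are invented).
entries = [
    {"name": "zeta", "path": "/data/zeta", "is_plaid_candidate": True},
    {"name": "Alpha", "path": "/data/Alpha", "is_plaid_candidate": False},
    {"name": "beta", "path": "/data/beta", "is_plaid_candidate": True},
]

# ``not True`` sorts before ``not False``, so PLAID candidates come first;
# ties are broken by case-insensitive name.
entries.sort(key=lambda e: (not e["is_plaid_candidate"], e["name"].lower()))
names = [e["name"] for e in entries]

print(names)
```

The two candidates (beta, zeta) are listed before Alpha even though Alpha sorts first alphabetically.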

plaid.viewer.services.plaid_dataset_service.PlaidDatasetService.list_datasets

list_datasets()

Return a summary of every dataset available to the viewer.

Local datasets (subdirectories of datasets_root) and registered Hugging Face Hub repositories (added via add_hub_dataset) are both included, in that order.

Source code in plaid/viewer/services/plaid_dataset_service.py
def list_datasets(self) -> list[DatasetInfo]:
    """Return a summary of every dataset available to the viewer.

    Local datasets (subdirectories of ``datasets_root``) and registered
    Hugging Face Hub repositories (added via :meth:`add_hub_dataset`)
    are both included, in that order.
    """
    infos: list[DatasetInfo] = []
    root = self._datasets_root
    if root is not None:
        for entry in _safe_list_dir(root):
            if not entry.is_dir():
                continue
            if not (entry / "data").is_dir():
                continue
            infos.append(
                DatasetInfo(
                    dataset_id=entry.name,
                    is_streaming=False,
                    path=str(entry),
                    has_infos=(entry / "infos.yaml").exists()
                    or (entry / "infos.json").exists(),
                    has_problem_definitions=(
                        entry / "problem_definitions"
                    ).is_dir(),
                )
            )
    for repo_id in self._hub_repos:
        infos.append(
            DatasetInfo(
                dataset_id=repo_id,
                is_streaming=True,
                path=f"hf://{repo_id}",
                has_infos=False,
                has_problem_definitions=False,
            )
        )

    return infos

plaid.viewer.services.plaid_dataset_service.PlaidDatasetService.add_hub_dataset

add_hub_dataset(repo_id)

Register a Hugging Face Hub dataset to stream from.

The dataset is exposed through plaid.storage.init_streaming_from_hub and appears in list_datasets with dataset_id == repo_id.

Parameters:

  • repo_id (str) –

    Hugging Face repository identifier, e.g. "PLAID-lib/VKI-LS59". Must contain a / separator.

Returns:

  • str

    The normalised repo_id.

Raises:

  • ValueError

    If repo_id is empty or does not look like a namespace/name pair.

Source code in plaid/viewer/services/plaid_dataset_service.py
def add_hub_dataset(self, repo_id: str) -> str:
    """Register a Hugging Face Hub dataset to stream from.

    The dataset is exposed through :func:`plaid.storage.init_streaming_from_hub`
    and appears in :meth:`list_datasets` with ``dataset_id == repo_id``.

    Args:
        repo_id: Hugging Face repository identifier, e.g.
            ``"PLAID-lib/VKI-LS59"``. Must contain a ``/`` separator.

    Returns:
        The normalised ``repo_id``.

    Raises:
        ValueError: If ``repo_id`` is empty or does not look like a
            ``namespace/name`` pair.
    """
    normalised = (repo_id or "").strip()
    if not normalised:
        raise ValueError("repo_id must be a non-empty string.")
    if "/" not in normalised:
        raise ValueError(
            f"repo_id {normalised!r} must be of the form 'namespace/name'."
        )
    if normalised in self._hub_repos:
        return normalised
    self._hub_repos.append(normalised)
    return normalised

plaid.viewer.services.plaid_dataset_service.PlaidDatasetService.remove_hub_dataset

remove_hub_dataset(repo_id)

Unregister a previously added Hugging Face Hub dataset.

Source code in plaid/viewer/services/plaid_dataset_service.py
def remove_hub_dataset(self, repo_id: str) -> None:
    """Unregister a previously added Hugging Face Hub dataset."""
    if repo_id in self._hub_repos:
        self._hub_repos.remove(repo_id)
        self._store_cache.pop(repo_id, None)
        self._features.pop(repo_id, None)
        self._feature_metadata.pop(repo_id, None)
        # Drop any streaming cursors owned by the removed dataset.
        self._cursors = {
            key: cur for key, cur in self._cursors.items() if key[0] != repo_id
        }
        self._invalidate_sample_cache(repo_id)

plaid.viewer.services.plaid_dataset_service.PlaidDatasetService.list_available_features

list_available_features(dataset_id)

Return the feature paths offered to the user for filtering.

The viewer only exposes paths that are CGNS fields (i.e. what plaid.containers.utils.get_feature_details_from_path classifies as type == "field"). Globals, coordinates, element connectivities, boundary conditions, etc. are hidden because they are not what users mean when they ask to "filter the displayed features" in a 3D viewer.

Paths ending in _times (time-series bookkeeping duplicates of a field, e.g. Base_.../FlowSolution/Pressure_times) are also filtered out: they are artefacts of the temporal storage layout, not distinct physical quantities the user would want to toggle.

Source code in plaid/viewer/services/plaid_dataset_service.py
def list_available_features(self, dataset_id: str) -> list[str]:
    """Return the feature paths offered to the user for filtering.

    The viewer only exposes paths that are CGNS *fields* (i.e. what
    :func:`plaid.containers.utils.get_feature_details_from_path`
    classifies as ``type == "field"``). Globals, coordinates,
    element connectivities, boundary conditions, etc. are hidden
    because they are not what the user means when they want to
    "filter the displayed features" in a 3D viewer.

    Paths ending in ``_times`` (time-series bookkeeping duplicates
    of a field, e.g. ``Base_.../FlowSolution/Pressure_times``) are
    also filtered out: they are artefacts of the temporal storage
    layout, not distinct physical quantities the user would want to
    toggle.
    """
    # Deferred import - the helper lives in PLAID's containers module.
    from plaid.containers.utils import (  # noqa: PLC0415
        get_feature_details_from_path,
    )

    constant_keys, variable_keys = self._load_feature_metadata(dataset_id)
    candidates = set(constant_keys) | set(variable_keys)
    fields: list[str] = []
    for path in candidates:
        if path.endswith("_times"):
            continue
        try:
            details = get_feature_details_from_path(path)
        except Exception:  # noqa: BLE001 - malformed path, skip
            continue
        # Only expose "genuine" field paths - i.e. those that carry
        # a ``name`` entry in ``details``. Some variants returned by
        # :func:`get_feature_details_from_path` are typed as
        # ``"field"`` but describe a container (e.g. a
        # ``FlowSolution_t`` node) rather than a specific data array,
        # and therefore have no ``name``. Filtering on ``name``
        # removes those from the UI while keeping every real scalar
        # / vector field the user can actually plot.
        # ``GridLocation`` nodes are CGNS metadata (they describe
        # *where* a field lives, e.g. ``Vertex`` vs ``CellCenter``)
        # rather than a plottable field, so they must not appear in
        # the viewer's feature selection.
        name = details.get("name")
        if details.get("type") == "field" and name and name != "GridLocation":
            fields.append(path)
    return sorted(fields)

plaid.viewer.services.plaid_dataset_service.PlaidDatasetService.get_features

get_features(dataset_id)

Return the active feature filter for dataset_id.

None means "no filter": every feature is loaded (default behaviour). An explicit empty list means "no feature selected".

Source code in plaid/viewer/services/plaid_dataset_service.py
def get_features(self, dataset_id: str) -> list[str] | None:
    """Return the active feature filter for ``dataset_id``.

    ``None`` means "no filter": every feature is loaded (default
    behaviour). An explicit empty list means "no feature selected".
    """
    return self._features.get(dataset_id)

plaid.viewer.services.plaid_dataset_service.PlaidDatasetService.list_available_bases

list_available_bases(dataset_id)

Return the unique CGNS base prefixes (e.g. Base_2_2).

Derived from the constant/variable feature catalogues, so the list is available before any sample has been loaded, which lets the trame UI populate the "Base" toggle as soon as a dataset is selected. The synthetic Global base (and its Global_times companion) is excluded; it is exposed separately by list_globals_paths and surfaced as its own "Globals" toggle in the side drawer.
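As a worked example of that derivation, the function below applies the same head-of-path rule to a handful of invented feature paths. bases_from_paths is a hypothetical standalone helper, and the paths are illustrative rather than taken from a real dataset.

```python
def bases_from_paths(paths: set[str]) -> list[str]:
    """Collect ``Base_*`` prefixes, skipping ``*_times`` bookkeeping bases."""
    bases: set[str] = set()
    for path in paths:
        if not path:
            continue
        head = path.split("/", 1)[0]  # first path segment
        if head.startswith("Base_") and not head.endswith("_times"):
            bases.add(head)
    return sorted(bases)


# Invented paths for illustration only.
paths = {
    "Base_2_2/Zone/GridCoordinates/CoordinateX",
    "Base_2_2/Zone/VertexFields/pressure",
    "Base_2_2_times/Zone/VertexFields/pressure",
    "Base_1_2/Zone/GridCoordinates/CoordinateX",
    "Global/angle_in",
    "Global_times/angle_in",
}
bases = bases_from_paths(paths)

print(bases)
```

Global and Global_times never pass the Base_ prefix test, and Base_2_2_times is dropped as a bookkeeping duplicate of Base_2_2.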

Source code in plaid/viewer/services/plaid_dataset_service.py
def list_available_bases(self, dataset_id: str) -> list[str]:
    """Return the unique CGNS base prefixes (e.g. ``Base_2_2``).

    Derived from the constant/variable feature catalogues, so the
    list is available *before* any sample has been loaded - which
    lets the trame UI populate the "Base" toggle as soon as a
    dataset is selected. The synthetic ``Globals`` base is
    excluded; it is exposed separately by
    :meth:`list_globals_paths` and surfaced as its own toggle in
    the side drawer.
    """
    constant_keys, variable_keys = self._load_feature_metadata(dataset_id)
    bases: set[str] = set()
    for path in set(constant_keys) | set(variable_keys):
        if not path:
            continue
        head = path.split("/", 1)[0]
        # Skip the synthetic PLAID ``Global`` / ``Global_times`` base
        # (sample-level scalars / tensors); they are surfaced through
        # the dedicated "Globals" toggle in the side drawer.
        if head in {"Global", "Global_times"}:
            continue
        # Skip ``Base_X_Y_times`` bookkeeping bases - they are
        # PLAID time-series duplicates of their companion
        # ``Base_X_Y`` and are not separately renderable.
        if head.startswith("Base_") and not head.endswith("_times"):
            bases.add(head)
    return sorted(bases)

plaid.viewer.services.plaid_dataset_service.PlaidDatasetService.list_base_paths

list_base_paths(dataset_id, base)

Return every PLAID feature path declared under base.

Used by the trame "Base" toggle to translate a base pick into a concrete feature list passed to set_features. The returned paths span both constant and variable schemas, so Converter.to_plaid can rebuild the mesh of that base (and any field declared at the dataset level) without pulling in unrelated bases.

Base_X_Y and Base_X_Y_times paths are both returned when present so the time-series bookkeeping companion of the chosen base is loaded along with it.
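The prefix match can be illustrated on a few invented paths. paths_under_base is a hypothetical standalone helper that mirrors the rule described here; note how the trailing slash in each prefix keeps a similarly named base such as Base_2_20 from matching.

```python
def paths_under_base(paths: set[str], base: str) -> list[str]:
    """Return paths under ``base`` or its ``<base>_times`` companion."""
    prefix = f"{base}/"
    prefix_times = f"{base}_times/"
    return sorted(
        p for p in paths if p.startswith(prefix) or p.startswith(prefix_times)
    )


# Invented paths for illustration only.
paths = {
    "Base_2_2/Zone/VertexFields/pressure",
    "Base_2_2_times/Zone/VertexFields/pressure",
    "Base_2_20/Zone/VertexFields/pressure",
    "Global/angle_in",
}
selected = paths_under_base(paths, "Base_2_2")

print(selected)
```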

Source code in plaid/viewer/services/plaid_dataset_service.py
def list_base_paths(self, dataset_id: str, base: str) -> list[str]:
    """Return every PLAID feature path declared under ``base``.

    Used by the trame "Base" toggle to translate a base pick into
    a concrete feature list passed to :meth:`set_features`. The
    returned paths span both constant and variable schemas, so
    :meth:`Converter.to_plaid` can rebuild the *mesh* of that base
    (and any field declared at the dataset level) without pulling
    in unrelated bases.

    ``Base_X_Y`` and ``Base_X_Y_times`` paths are both returned
    when present so the time-series bookkeeping companion of the
    chosen base is loaded along with it.
    """
    constant_keys, variable_keys = self._load_feature_metadata(dataset_id)
    prefix = f"{base}/"
    prefix_times = f"{base}_times/"
    return sorted(
        p
        for p in set(constant_keys) | set(variable_keys)
        if p.startswith(prefix) or p.startswith(prefix_times)
    )

plaid.viewer.services.plaid_dataset_service.PlaidDatasetService.list_globals_paths

list_globals_paths(dataset_id)

Return every PLAID feature path that lives under Global/.

Used by the trame UI to translate the "Globals" toggle into a concrete feature list passed to set_features. PLAID identifies sample-level scalars / tensors with a singular Global base (and a companion Global_times base for time series), so exactly those two prefixes are accepted.

Source code in plaid/viewer/services/plaid_dataset_service.py
def list_globals_paths(self, dataset_id: str) -> list[str]:
    """Return every PLAID feature path that lives under ``Global/``.

    Used by the trame UI to translate the "Globals" toggle into a
    concrete feature list passed to :meth:`set_features`. PLAID
    identifies sample-level scalars / tensors with a singular
    ``Global`` base (and a companion ``Global_times`` base for time
    series), so we accept exactly those two prefixes.
    """
    constant_keys, variable_keys = self._load_feature_metadata(dataset_id)
    return sorted(
        p
        for p in set(constant_keys) | set(variable_keys)
        if p.startswith("Global/") or p.startswith("Global_times/")
    )

plaid.viewer.services.plaid_dataset_service.PlaidDatasetService.set_features

set_features(dataset_id, features)

Set (or clear) the active feature filter for dataset_id.

Only the user-visible field paths (those returned by list_available_features) are stored. Geometric supports (coordinates, element connectivities, boundary conditions, GridLocation metadata, _times bookkeeping paths, ...) required to render the selected fields are handled transparently by Converter.to_plaid, which runs plaid.utils.cgns_helper.update_features_for_CGNS_compatibility internally against its own per-split constant_features / variable_features catalogues. The selection is therefore never pre-expanded here: doing so would use the dataset-wide (union) catalogue and, on splits whose data does not contain the selected fields, would hand PLAID a list of coordinates without the fields that justify them, triggering the Missing features in dataset/converter error in the CGNS expander.

For disk-backed datasets the filter is applied on every call to Converter.to_plaid during load_sample. For streaming (Hugging Face Hub) datasets it is injected into plaid.storage.init_streaming_from_hub before any sample is consumed; the cached (datasetdict, converterdict) pair and any open streaming cursors are therefore invalidated so that the next _open call rebuilds them with the new feature list.
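The normalisation and validation step can be sketched on its own. normalise_features is a hypothetical standalone function modelled on the behaviour documented for set_features; the known set stands in for the dataset's feature catalogue and contains invented paths.

```python
from typing import Iterable, Optional


def normalise_features(
    features: Optional[Iterable[object]], known: set[str]
) -> Optional[list[str]]:
    """Deduplicate and sort ``features``; ``None`` means "no filter"."""
    if features is None:
        return None
    normalised = sorted(dict.fromkeys(str(f) for f in features))
    unknown = [f for f in normalised if f not in known]
    if unknown:
        raise ValueError(f"Unknown features: {unknown}")
    return normalised


# Invented catalogue for illustration only.
known = {"Base_2_2/Zone/VertexFields/mach", "Base_2_2/Zone/VertexFields/pressure"}

no_filter = normalise_features(None, known)
nothing_selected = normalise_features([], known)
deduplicated = normalise_features(
    [
        "Base_2_2/Zone/VertexFields/pressure",
        "Base_2_2/Zone/VertexFields/mach",
        "Base_2_2/Zone/VertexFields/pressure",
    ],
    known,
)
```

None and an empty list stay distinct: the first disables filtering, the second records that every feature has been deselected. Callers should therefore test with `is None` rather than truthiness.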

Parameters:

  • dataset_id (str) –

    Target dataset identifier.

  • features (list[str] | None) –

    Field paths to keep (subset of list_available_features), or None to clear the filter and load every feature.

Returns:

  • list[str] | None

    The normalised, deduplicated feature list (None when no filter is active).

Raises:

  • ValueError

    If features contains paths not declared in the dataset metadata.

Source code in plaid/viewer/services/plaid_dataset_service.py
def set_features(
    self, dataset_id: str, features: list[str] | None
) -> list[str] | None:
    """Set (or clear) the active feature filter for ``dataset_id``.

    Only the *user-visible* field paths (those returned by
    :meth:`list_available_features`) are stored. Geometric supports
    (coordinates, element connectivities, boundary conditions,
    ``GridLocation`` metadata, ``_times`` bookkeeping paths, ...)
    required to render the selected fields are handled transparently
    by :meth:`Converter.to_plaid`, which runs
    :func:`~plaid.utils.cgns_helper.update_features_for_CGNS_compatibility`
    internally against its *own* per-split
    ``constant_features`` / ``variable_features`` catalogues. We
    therefore never pre-expand the selection here - doing so would
    use the dataset-wide (union) catalogue and, on splits whose
    data does not contain the selected fields, would hand PLAID a
    list of coordinates *without the fields that justify them* and
    trigger ``Missing features in dataset/converter`` in the CGNS
    expander.

    For disk-backed datasets the filter is applied on every call to
    :meth:`Converter.to_plaid` during :meth:`load_sample`. For
    streaming (Hugging Face Hub) datasets it is injected into
    :func:`plaid.storage.init_streaming_from_hub` *before* any
    sample is consumed; we therefore invalidate the cached
    ``(datasetdict, converterdict)`` and any open streaming cursors
    so the next :meth:`_open` call rebuilds them with the new
    feature list.

    Args:
        dataset_id: Target dataset identifier.
        features: Field paths to keep (subset of
            :meth:`list_available_features`), or ``None`` to clear
            the filter and load every feature.

    Returns:
        The normalised, deduplicated feature list (``None`` when no
        filter is active).

    Raises:
        ValueError: If ``features`` contains paths not declared in
            the dataset metadata.
    """
    if features is None:
        normalised: list[str] | None = None
    else:
        normalised = sorted(dict.fromkeys(str(f) for f in features))
        all_keys = set(self._load_feature_metadata(dataset_id)[0]) | set(
            self._load_feature_metadata(dataset_id)[1]
        )
        unknown = [f for f in normalised if f not in all_keys]
        if unknown:
            raise ValueError(
                f"Unknown features for dataset {dataset_id!r}: {unknown}"
            )
    self._features[dataset_id] = normalised
    # Invalidate store cache so streaming datasets rebuild their
    # IterableDataset with the new feature list. For disk datasets
    # this is not strictly required (features are applied on each
    # ``to_plaid`` call) but keeping a single invalidation policy is
    # simpler and does not hurt performance measurably.
    self._store_cache.pop(dataset_id, None)
    self._cursors = {
        key: cur for key, cur in self._cursors.items() if key[0] != dataset_id
    }
    self._invalidate_sample_cache(dataset_id)
    return normalised

plaid.viewer.services.plaid_dataset_service.PlaidDatasetService.is_streaming

is_streaming(dataset_id)

Return True when dataset_id is a Hugging Face Hub stream.

Streaming datasets have no __len__ on their splits and must be navigated forward-only through advance_stream_cursor / reset_stream_cursor rather than indexed.
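The length-based distinction can be demonstrated with plain Python objects. In this sketch a list stands in for an indexable split and a generator for a streamed one; looks_streaming is a hypothetical helper, and no Hugging Face objects are involved.

```python
from typing import Any, Iterator, Mapping


def looks_streaming(dataset_dict: Mapping[str, Any]) -> bool:
    """True when at least one split cannot report its length."""
    return not all(hasattr(ds, "__len__") for ds in dataset_dict.values())


def sample_stream() -> Iterator[dict]:
    # A generator can be iterated but has no ``__len__``.
    for i in range(3):
        yield {"id": i}


indexed = {"train": [{"id": 0}, {"id": 1}], "test": [{"id": 2}]}
streamed = {"train": sample_stream()}

indexed_is_streaming = looks_streaming(indexed)
streamed_is_streaming = looks_streaming(streamed)
```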

Source code in plaid/viewer/services/plaid_dataset_service.py
def is_streaming(self, dataset_id: str) -> bool:
    """Return ``True`` when ``dataset_id`` is a Hugging Face Hub stream.

    Streaming datasets have no ``__len__`` on their splits and must be
    navigated forward-only through :meth:`advance_stream_cursor` /
    :meth:`reset_stream_cursor` rather than indexed.
    """
    if not self._is_hub_dataset(dataset_id):
        return False
    try:
        datasetdict, _ = self._open(dataset_id)
    except Exception:  # noqa: BLE001
        return True
    return not all(hasattr(ds, "__len__") for ds in datasetdict.values())

plaid.viewer.services.plaid_dataset_service.PlaidDatasetService.get_dataset

get_dataset(dataset_id)

Return detailed information about a single dataset.

Source code in plaid/viewer/services/plaid_dataset_service.py
def get_dataset(self, dataset_id: str) -> DatasetDetail:
    """Return detailed information about a single dataset."""
    if self._is_hub_dataset(dataset_id):
        splits = self._splits_with_counts(dataset_id)
        return DatasetDetail(
            dataset_id=dataset_id,
            is_streaming=True,
            path=f"hf://{dataset_id}",
            has_infos=False,
            has_problem_definitions=False,
            splits=splits,
            infos=None,
            problem_definitions=[],
        )
    base = self._dataset_dir(dataset_id)
    splits = self._splits_with_counts(dataset_id)
    pb_defs_dir = base / "problem_definitions"
    pb_defs = (
        [
            p.stem
            for p in _safe_list_dir(pb_defs_dir)
            if p.suffix in {".yaml", ".yml"}
        ]
        if pb_defs_dir.is_dir()
        else []
    )
    return DatasetDetail(
        dataset_id=dataset_id,
        is_streaming=False,
        path=str(base),
        has_infos=(base / "infos.yaml").exists() or (base / "infos.json").exists(),
        has_problem_definitions=bool(pb_defs),
        splits=splits,
        infos=self._load_infos(base),
        problem_definitions=pb_defs,
    )

plaid.viewer.services.plaid_dataset_service.PlaidDatasetService.list_samples

list_samples(dataset_id)

Return every sample reference available in a dataset.

For disk-backed datasets, sample ids are the zero-based integer indices used with converter.to_plaid(dataset, index). For streaming datasets (Hugging Face Hub), each split contributes a single reference whose sample_id is the :data:STREAM_CURSOR_ID sentinel; the actual sample is obtained by advancing the per-split cursor with :meth:advance_stream_cursor.

Source code in plaid/viewer/services/plaid_dataset_service.py
def list_samples(self, dataset_id: str) -> list[SampleRefDTO]:
    """Return every sample reference available in a dataset.

    For disk-backed datasets, sample ids are the zero-based integer
    indices used with ``converter.to_plaid(dataset, index)``. For
    streaming datasets (Hugging Face Hub), each split contributes a
    single reference whose ``sample_id`` is the
    :data:`STREAM_CURSOR_ID` sentinel; the actual sample is obtained
    by advancing the per-split cursor with
    :meth:`advance_stream_cursor`.
    """
    datasetdict, _ = self._open(dataset_id)
    streaming = self.is_streaming(dataset_id)

    refs: list[SampleRef] = []
    for split, ds in datasetdict.items():
        split_key = None if split == "__default__" else split
        if streaming:
            refs.append(
                SampleRef(
                    dataset_id=dataset_id,
                    split=split_key,
                    sample_id=STREAM_CURSOR_ID,
                )
            )
            continue
        for index in range(len(ds)):
            refs.append(
                SampleRef(
                    dataset_id=dataset_id,
                    split=split_key,
                    sample_id=str(index),
                )
            )
    return [SampleRefDTO.from_ref(ref) for ref in refs]
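The two enumeration modes can be compared side by side with stand-in types. In this sketch `Ref` is a hypothetical replacement for `SampleRef`, and the value given to `STREAM_CURSOR_ID` is a placeholder, since the real sentinel value is not shown on this page.

```python
from __future__ import annotations

from dataclasses import dataclass

# Placeholder value (assumption); the real sentinel is defined elsewhere.
STREAM_CURSOR_ID = "__cursor__"


@dataclass(frozen=True)
class Ref:
    """Hypothetical stand-in for SampleRef."""

    dataset_id: str
    split: str | None
    sample_id: str


def enumerate_refs(
    dataset_id: str, splits: dict[str, object], streaming: bool
) -> list[Ref]:
    """Build one ref per sample, or one cursor ref per streaming split."""
    refs: list[Ref] = []
    for split, ds in splits.items():
        # The "__default__" split name is exposed to callers as ``None``.
        split_key = None if split == "__default__" else split
        if streaming:
            refs.append(Ref(dataset_id, split_key, STREAM_CURSOR_ID))
            continue
        refs.extend(Ref(dataset_id, split_key, str(i)) for i in range(len(ds)))
    return refs


disk_refs = enumerate_refs("demo", {"train": ["a", "b"], "__default__": ["c"]}, False)
stream_refs = enumerate_refs("demo", {"train": object()}, True)

print([(r.split, r.sample_id) for r in disk_refs])
# [('train', '0'), ('train', '1'), (None, '0')]
print(len(stream_refs))  # 1
```

Sample ids are strings in both modes, so a caller has to compare against the sentinel before attempting `int(sample_id)`.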

plaid.viewer.services.plaid_dataset_service.PlaidDatasetService.stream_cursor_position

stream_cursor_position(dataset_id, split)

Return the current forward position of a streaming cursor.

Returns -1 before the first call to :meth:advance_stream_cursor.

Source code in plaid/viewer/services/plaid_dataset_service.py
def stream_cursor_position(self, dataset_id: str, split: str | None) -> int:
    """Return the current forward position of a streaming cursor.

    Returns ``-1`` before the first call to :meth:`advance_stream_cursor`.
    """
    cursor = self._cursors.get(self._cursor_key(dataset_id, split))
    return cursor.position if cursor is not None else -1

plaid.viewer.services.plaid_dataset_service.PlaidDatasetService.advance_stream_cursor

advance_stream_cursor(dataset_id, split)

Consume the next record from the stream and return its ref.

The returned :class:SampleRef always carries the :data:STREAM_CURSOR_ID sentinel in its sample_id; the underlying record is cached on the service so a subsequent :meth:load_sample call returns the freshly fetched sample.

Raises:

  • StopIteration

    If the underlying stream is exhausted.

Source code in plaid/viewer/services/plaid_dataset_service.py
def advance_stream_cursor(self, dataset_id: str, split: str | None) -> SampleRef:
    """Consume the next record from the stream and return its ref.

    The returned :class:`SampleRef` always carries the
    :data:`STREAM_CURSOR_ID` sentinel in its ``sample_id``; the
    underlying record is cached on the service so a subsequent
    :meth:`load_sample` call returns the freshly fetched sample.

    Raises:
        StopIteration: If the underlying stream is exhausted.
    """
    key = self._cursor_key(dataset_id, split)
    cursor = self._cursors.get(key)
    if cursor is None or cursor.iterator is None:
        cursor = self._build_cursor(dataset_id, split)
        self._cursors[key] = cursor
    try:
        record = next(cursor.iterator)
    except StopIteration:
        cursor.exhausted = True
        raise
    cursor.current_record = record
    cursor.position += 1
    return SampleRef(
        dataset_id=dataset_id,
        split=split,
        sample_id=STREAM_CURSOR_ID,
    )

plaid.viewer.services.plaid_dataset_service.PlaidDatasetService.reset_stream_cursor

reset_stream_cursor(dataset_id, split)

Rebuild a fresh iterator for (dataset_id, split).

The cached record is discarded and the position reset to -1 so the next :meth:advance_stream_cursor call yields the first sample again.

Source code in plaid/viewer/services/plaid_dataset_service.py
def reset_stream_cursor(self, dataset_id: str, split: str | None) -> None:
    """Rebuild a fresh iterator for ``(dataset_id, split)``.

    The cached record is discarded and the position reset to ``-1``
    so the next :meth:`advance_stream_cursor` call yields the first
    sample again.
    """
    key = self._cursor_key(dataset_id, split)
    self._cursors[key] = self._build_cursor(dataset_id, split)
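The cursor contract described by `stream_cursor_position`, `advance_stream_cursor`, and `reset_stream_cursor` can be summarized in a small standalone model. This is a sketch for a single `(dataset_id, split)` pair; `Cursor` and `CursorBook` are hypothetical names, and `open_stream` stands in for whatever the private `_build_cursor` does to obtain a fresh iterator.

```python
from __future__ import annotations

from collections.abc import Callable, Iterable, Iterator
from dataclasses import dataclass


@dataclass
class Cursor:
    """Hypothetical forward-only cursor state."""

    iterator: Iterator[object]
    position: int = -1
    current_record: object | None = None
    exhausted: bool = False


class CursorBook:
    """Tracks one cursor; the service keeps one per (dataset_id, split)."""

    def __init__(self, open_stream: Callable[[], Iterable[object]]) -> None:
        self._open_stream = open_stream
        self._cursor: Cursor | None = None

    def position(self) -> int:
        # -1 means "nothing consumed yet".
        return self._cursor.position if self._cursor is not None else -1

    def advance(self) -> object:
        if self._cursor is None:
            self._cursor = Cursor(iter(self._open_stream()))
        try:
            record = next(self._cursor.iterator)
        except StopIteration:
            # Remember exhaustion, then let the caller handle it.
            self._cursor.exhausted = True
            raise
        self._cursor.current_record = record
        self._cursor.position += 1
        return record

    def reset(self) -> None:
        # A fresh cursor drops the cached record and rewinds to -1.
        self._cursor = Cursor(iter(self._open_stream()))


book = CursorBook(lambda: ["first", "second"])
book.advance()
book.advance()
print(book.position())  # 1
book.reset()
print(book.position())  # -1
```

Because `StopIteration` is re-raised, a caller driving the cursor from UI code would typically wrap `advance` in a `try`/`except` and disable its "next" control when the stream ends.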

plaid.viewer.services.plaid_dataset_service.PlaidDatasetService.load_sample

load_sample(ref)

Return a PLAID :class:plaid.Sample for the given reference.

Uses converter.to_plaid(dataset, index) to rebuild the sample from whatever backend store (hf_datasets, cgns, zarr) is in use.

For random-access (non-streaming) samples, results are memoised in a small LRU keyed on (dataset_id, split_key, sample_id, features_tuple) so that repeated calls within the same UI interaction (summary, globals, non-visual bases, paraview artifact build, playback frames, ...) only incur a single Converter.to_plaid decode. The cache is invalidated whenever the active feature filter, datasets root, or hub registration changes. Streaming samples bypass the cache because their sample_id is the constant :data:STREAM_CURSOR_ID sentinel.

Source code in plaid/viewer/services/plaid_dataset_service.py
def load_sample(self, ref: SampleRef):
    """Return a PLAID :class:`plaid.Sample` for the given reference.

    Uses ``converter.to_plaid(dataset, index)`` to rebuild the sample
    from whatever backend store (hf_datasets, cgns, zarr) is in use.

    For random-access (non-streaming) samples, results are memoised
    in a small LRU keyed on
    ``(dataset_id, split_key, sample_id, features_tuple)`` so that
    repeated calls within the same UI interaction (summary, globals,
    non-visual bases, paraview artifact build, playback frames, ...)
    only incur a single ``Converter.to_plaid`` decode. The cache is
    invalidated whenever the active feature filter, datasets root,
    or hub registration changes. Streaming samples bypass the cache
    because their ``sample_id`` is the constant
    :data:`STREAM_CURSOR_ID` sentinel.
    """
    datasetdict, converterdict = self._open(ref.dataset_id)
    split_key = ref.split if ref.split is not None else "__default__"
    if split_key not in datasetdict:
        # Fallback: some converters return a single unnamed split.
        if len(datasetdict) == 1:
            split_key = next(iter(datasetdict))
        else:
            raise KeyError(
                f"Split {ref.split!r} not found in dataset {ref.dataset_id!r}; "
                f"available splits: {sorted(datasetdict.keys())}"
            )
    dataset = datasetdict[split_key]
    converter = converterdict[split_key]
    # Streaming datasets expose a forward-only cursor rather than
    # random access. The viewer drives the cursor explicitly via
    # ``advance_stream_cursor`` and then calls ``load_sample`` with
    # ``sample_id == STREAM_CURSOR_ID`` to materialise the PLAID
    # sample from the most recently consumed raw record.
    if ref.sample_id == STREAM_CURSOR_ID:
        cursor = self._cursors.get(self._cursor_key(ref.dataset_id, ref.split))
        if cursor is None or cursor.current_record is None:
            # Auto-advance once so a fresh selection behaves like
            # "show me the first sample".
            self.advance_stream_cursor(ref.dataset_id, ref.split)
            cursor = self._cursors[self._cursor_key(ref.dataset_id, ref.split)]
        # Streaming converters use ``sample_to_plaid`` (single record)
        # rather than ``to_plaid(dataset, index)`` (random access).
        return converter.sample_to_plaid(cursor.current_record)

    try:
        index = int(ref.sample_id)
    except ValueError as exc:
        raise ValueError(
            f"Invalid sample id {ref.sample_id!r}; expected an integer index."
        ) from exc
    features = self._features.get(ref.dataset_id)
    cache_key = (
        ref.dataset_id,
        split_key,
        ref.sample_id,
        tuple(features) if features is not None else None,
    )
    cached = self._sample_cache.get(cache_key)
    if cached is not None:
        self._sample_cache.move_to_end(cache_key)
        return cached
    sample = self._decode_sample(ref, converter, dataset, index, features)
    self._sample_cache[cache_key] = sample
    if len(self._sample_cache) > self._sample_cache_capacity:
        self._sample_cache.popitem(last=False)
    return sample
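The memoisation pattern above (lookup, `move_to_end` on a hit, `popitem(last=False)` on overflow) is a standard `OrderedDict`-based LRU. The sketch below isolates it; `SmallLRU` and `decode` are hypothetical names, and the string values stand in for decoded samples.

```python
from __future__ import annotations

from collections import OrderedDict
from collections.abc import Callable, Hashable


class SmallLRU:
    """Minimal least-recently-used cache built on OrderedDict."""

    def __init__(self, capacity: int) -> None:
        self._capacity = capacity
        self._items: OrderedDict[Hashable, object] = OrderedDict()

    def get_or_compute(self, key: Hashable, compute: Callable[[], object]) -> object:
        cached = self._items.get(key)
        if cached is not None:
            # A hit becomes the most recently used entry.
            self._items.move_to_end(key)
            return cached
        value = compute()
        self._items[key] = value
        if len(self._items) > self._capacity:
            # Evict the least recently used entry (front of the dict).
            self._items.popitem(last=False)
        return value


decoded: list[int] = []


def decode(index: int) -> str:
    """Stand-in for an expensive decode; records each real call."""
    decoded.append(index)
    return f"sample-{index}"


cache = SmallLRU(capacity=2)
# Keys mirror (dataset_id, split_key, sample_id, features_tuple).
key0 = ("demo", "train", "0", None)
key1 = ("demo", "train", "1", None)
key2 = ("demo", "train", "2", ("pressure",))

cache.get_or_compute(key0, lambda: decode(0))
cache.get_or_compute(key0, lambda: decode(0))  # hit, no second decode
cache.get_or_compute(key1, lambda: decode(1))
cache.get_or_compute(key2, lambda: decode(2))  # evicts key0
cache.get_or_compute(key0, lambda: decode(0))  # decoded again

print(decoded)  # [0, 1, 2, 0]
```

Including the feature tuple in the key means a different feature selection never returns a sample decoded under another selection.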

plaid.viewer.services.plaid_dataset_service.PlaidDatasetService.get_sample_summary

get_sample_summary(ref)

Return a minimal summary of the PLAID sample.

Source code in plaid/viewer/services/plaid_dataset_service.py
def get_sample_summary(self, ref: SampleRef) -> SampleSummary:
    """Return a minimal summary of the PLAID sample."""
    sample = self.load_sample(ref)
    times = self._time_keys(sample)
    bases, zones_by_base, fields_by_base = self._describe_tree(sample, times)
    globals_dict = {
        name: str(sample.get_scalar(name)) for name in sample.get_scalar_names()
    }
    return SampleSummary(
        ref=SampleRefDTO.from_ref(ref),
        n_times=len(times),
        time_values=list(times),
        bases=bases,
        zones_by_base=zones_by_base,
        fields_by_base=fields_by_base,
        globals=globals_dict,
    )

plaid.viewer.services.plaid_dataset_service.PlaidDatasetService.list_time_values

list_time_values(ref)

Return the sorted list of time values available for a sample.

Thin wrapper around :meth:plaid.Sample.get_all_time_values that always returns a list[float] (it may be empty for static samples).

Source code in plaid/viewer/services/plaid_dataset_service.py
def list_time_values(self, ref: SampleRef) -> list[float]:
    """Return the sorted list of time values available for a sample.

    Thin wrapper around :meth:`plaid.Sample.get_all_time_values`
    that always returns a ``list[float]`` (it may be empty for static
    samples).
    """
    sample = self.load_sample(ref)
    try:
        times = sample.get_all_time_values()
    except Exception:  # noqa: BLE001 - defensive, PLAID shouldn't raise
        return []
    return sorted(float(t) for t in times)

plaid.viewer.services.plaid_dataset_service.PlaidDatasetService.describe_globals

describe_globals(ref, *, time=None)

Return PLAID global scalars/tensors reported by the sample.

Uses :meth:plaid.Sample.get_global_names to enumerate globals and :meth:plaid.Sample.get_global to fetch each value, so only the "real" globals exposed by PLAID's API are reported. The CGNS bookkeeping arrays IterationValues and TimeValues (which describe time steps, not physical scalars) are filtered out.

Parameters:

  • ref (SampleRef) –

    The sample to inspect.

  • time (float | None, default: None ) –

    Optional time value; when None the sample's first available time (or the static value) is used.

Returns:

  • list[dict[str, object]]

    A list of {"name": str, "shape": list[int], "dtype": str, "preview": str | None} descriptors, one per global.

Source code in plaid/viewer/services/plaid_dataset_service.py
def describe_globals(
    self, ref: SampleRef, *, time: float | None = None
) -> list[dict[str, object]]:
    """Return PLAID global scalars/tensors reported by the sample.

    Uses :meth:`plaid.Sample.get_global_names` to enumerate globals
    and :meth:`plaid.Sample.get_global` to fetch each value, so only
    the "real" globals exposed by PLAID's API are reported. The CGNS
    bookkeeping arrays ``IterationValues`` and ``TimeValues`` (which
    describe time steps, not physical scalars) are filtered out.

    Args:
        ref: The sample to inspect.
        time: Optional time value; when ``None`` the sample's first
            available time (or the static value) is used.

    Returns:
        A list of ``{"name": str, "shape": list[int], "dtype": str,
        "preview": str | None}`` descriptors, one per global.
    """
    sample = self.load_sample(ref)
    return self._describe_globals_for_sample(sample, time=time)

plaid.viewer.services.plaid_dataset_service.PlaidDatasetService.describe_globals_all_times

describe_globals_all_times(ref)

Return globals descriptors for every time step of a sample.

Performs a single :meth:load_sample call (which goes through the in-memory LRU) and then iterates over every time value exposed by :meth:plaid.Sample.get_all_time_values, building a {time: [globals_entries]} snapshot. The trame playback loop can then refresh the globals panel by indexing into this dict instead of triggering a fresh service call (and risking a per-frame decode on backends that bypass the cache).

Parameters:

  • ref (SampleRef) –

    The sample to inspect.

Returns:

  • tuple[list[dict[str, object]], dict[float, list[dict[str, object]]]]

    A pair (static, by_time) where static is the time-less globals listing (used as a fallback when the sample has no time axis or the requested time is missing) and by_time maps each available float time value to its globals descriptors.

Source code in plaid/viewer/services/plaid_dataset_service.py
def describe_globals_all_times(
    self, ref: SampleRef
) -> tuple[list[dict[str, object]], dict[float, list[dict[str, object]]]]:
    """Return globals descriptors for every time step of a sample.

    Performs a single :meth:`load_sample` call (which goes through
    the in-memory LRU) and then iterates over every time value
    exposed by :meth:`plaid.Sample.get_all_time_values`,
    building a ``{time: [globals_entries]}`` snapshot. The trame
    playback loop can then refresh the globals panel by indexing
    into this dict instead of triggering a fresh service call (and
    risking a per-frame decode on backends that bypass the cache).

    Args:
        ref: The sample to inspect.

    Returns:
        A pair ``(static, by_time)`` where ``static`` is the
        time-less globals listing (used as a fallback when the
        sample has no time axis or the requested time is missing)
        and ``by_time`` maps each available ``float`` time value
        to its globals descriptors.
    """
    sample = self.load_sample(ref)
    static = self._describe_globals_for_sample(sample, time=None)
    try:
        times = sample.get_all_time_values()
    except Exception:  # noqa: BLE001 - defensive, PLAID shouldn't raise
        times = []
    by_time: dict[float, list[dict[str, object]]] = {}
    for raw_time in times:
        t = float(raw_time)
        by_time[t] = self._describe_globals_for_sample(sample, time=t)
    return static, by_time
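A caller holding the `(static, by_time)` pair can resolve the globals for a playback frame without another service call. The helper below is a sketch of that lookup; `globals_for_time` is a hypothetical name, and the descriptor values are made-up sample data in the documented shape.

```python
from __future__ import annotations

Descriptor = dict[str, object]


def globals_for_time(
    static: list[Descriptor],
    by_time: dict[float, list[Descriptor]],
    time: float | None,
) -> list[Descriptor]:
    """Pick the per-time globals, falling back to the static listing."""
    if time is None:
        return static
    # Missing times (or an empty snapshot) fall back to the static listing.
    return by_time.get(float(time), static)


# Illustrative data only.
static = [{"name": "Re", "shape": [], "dtype": "float64", "preview": "1000.0"}]
by_time = {
    0.0: [{"name": "lift", "shape": [], "dtype": "float64", "preview": "0.12"}],
    0.5: [{"name": "lift", "shape": [], "dtype": "float64", "preview": "0.15"}],
}

print(globals_for_time(static, by_time, 0.5)[0]["preview"])  # 0.15
print(globals_for_time(static, by_time, 2.0)[0]["name"])     # Re
```

The lookup relies on exact float equality, so the time passed in should be one of the keys returned with the snapshot rather than a recomputed value.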

plaid.viewer.services.plaid_dataset_service.PlaidDatasetService.describe_non_visual_bases

describe_non_visual_bases(ref)

Return data arrays of CGNS bases that carry no zones.

Some datasets store auxiliary tensors (constants, global reference values, look-up tables, ...) inside a CGNS base that has no Zone_t children, so VTK cannot render them as geometry. This method returns, for each zone-less base, a list of descriptors {"name": str, "shape": list[int], "dtype": str, "preview": str | None} suitable for display in the viewer.

Parameters:

  • ref (SampleRef) –

    The sample to inspect.

Returns:

  • dict[str, list[dict[str, object]]]

    A mapping from base name to a list of data-array descriptors. Bases that do contain zones are omitted.

Source code in plaid/viewer/services/plaid_dataset_service.py
def describe_non_visual_bases(
    self, ref: SampleRef
) -> dict[str, list[dict[str, object]]]:
    """Return data arrays of CGNS bases that carry no zones.

    Some datasets store auxiliary tensors (constants, global reference
    values, look-up tables, ...) inside a CGNS base that has no
    ``Zone_t`` children, so VTK cannot render them as geometry. This
    method returns, for each zone-less base, a list of descriptors
    ``{"name": str, "shape": list[int], "dtype": str,
    "preview": str | None}`` suitable for display in the viewer.

    Args:
        ref: The sample to inspect.

    Returns:
        A mapping from base name to a list of data-array descriptors.
        Bases that do contain zones are omitted.
    """
    sample = self.load_sample(ref)
    times = self._time_keys(sample)
    if not times:
        return {}
    try:
        from CGNS.PAT import cgnskeywords as CK  # noqa: PLC0415
        from CGNS.PAT import cgnsutils as CU  # noqa: PLC0415
    except ImportError:  # pragma: no cover - defensive
        return {}
    tree = sample.data[times[0]]
    summary: dict[str, list[dict[str, object]]] = {}
    for base_node in (
        CU.hasChildType(tree, CK.CGNSBase_ts) or []
    ):  # pragma: no cover - CGNS tree introspection
        if CU.hasChildType(base_node, CK.Zone_ts):
            continue
        summary[base_node[0]] = _collect_data_arrays(base_node)
    return summary
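The zone-less base filter can be illustrated without pyCGNS, using the CGNS/Python convention that a node is a four-item list `[name, value, children, label]`. This is a simplified sketch: `zone_less_bases` is a hypothetical helper, it only looks at direct `DataArray_t` children, and it returns names rather than the full descriptors that the private `_collect_data_arrays` produces.

```python
from __future__ import annotations


def zone_less_bases(tree: list) -> dict[str, list[str]]:
    """Map each base without Zone_t children to its DataArray_t names."""
    result: dict[str, list[str]] = {}
    for node in tree[2]:
        name, _value, children, label = node
        if label != "CGNSBase_t":
            continue
        if any(child[3] == "Zone_t" for child in children):
            # Bases with zones are renderable geometry, so they are skipped.
            continue
        result[name] = [child[0] for child in children if child[3] == "DataArray_t"]
    return result


# Hand-built tree for illustration; values are plain Python objects here.
tree = [
    "CGNSTree",
    None,
    [
        ["CGNSLibraryVersion", 4.0, [], "CGNSLibraryVersion_t"],
        ["Base_2_3", [2, 3], [["Zone", None, [], "Zone_t"]], "CGNSBase_t"],
        [
            "Global",
            None,
            [
                ["ref_pressure", 101325.0, [], "DataArray_t"],
                ["note", None, [], "Descriptor_t"],
            ],
            "CGNSBase_t",
        ],
    ],
    "CGNSTree_t",
]

print(zone_less_bases(tree))  # {'Global': ['ref_pressure']}
```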

plaid.viewer.services.plaid_dataset_service.PlaidDatasetService.get_sample_validation

get_sample_validation(ref)

Check basic sample completeness using PLAID's built-in validator.

Source code in plaid/viewer/services/plaid_dataset_service.py
def get_sample_validation(self, ref: SampleRef) -> ValidationResult:
    """Check basic sample completeness using PLAID's built-in validator."""
    warnings: list[str] = []
    errors: list[str] = []
    try:
        sample = self.load_sample(ref)
    except Exception as exc:  # noqa: BLE001 - surface error to API caller
        # Always log the full traceback: ``str(exc)`` can be empty
        # for some C-extension exceptions (VTK / CGNS / HDF5), in
        # which case the API response would otherwise reduce to a
        # bare "Failed to load sample:" with no diagnostic value.
        logger.exception("Failed to load sample %s", ref.encode())
        message = str(exc) or exc.__class__.__name__
        return ValidationResult(
            ref=SampleRefDTO.from_ref(ref),
            ok=False,
            errors=[f"Failed to load sample ({type(exc).__name__}): {message}"],
        )
    try:
        report = sample.check_completeness()
    except Exception as exc:  # noqa: BLE001
        logger.exception("Completeness check failed for sample %s", ref.encode())
        message = str(exc) or exc.__class__.__name__
        return ValidationResult(
            ref=SampleRefDTO.from_ref(ref),
            ok=False,
            errors=[f"Completeness check failed ({type(exc).__name__}): {message}"],
        )
    ok = isinstance(report, str) and "error" not in report.lower()
    if report and not ok:
        errors.append(report)
    elif report:
        warnings.append(report)
    return ValidationResult(
        ref=SampleRefDTO.from_ref(ref),
        ok=ok,
        warnings=warnings,
        errors=errors,
    )
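The way the completeness report is turned into `ok`, `warnings`, and `errors` can be checked in isolation. The function below copies that classification logic into a hypothetical helper, `classify_report`; the report strings are invented examples, since the exact output of `Sample.check_completeness` is not shown on this page.

```python
from __future__ import annotations


def classify_report(report: object) -> tuple[bool, list[object], list[object]]:
    """Split a completeness report into (ok, warnings, errors)."""
    warnings: list[object] = []
    errors: list[object] = []
    # Only a string that does not mention "error" counts as a pass.
    ok = isinstance(report, str) and "error" not in report.lower()
    if report and not ok:
        errors.append(report)
    elif report:
        warnings.append(report)
    return ok, warnings, errors


print(classify_report(""))                     # (True, [], [])
print(classify_report("field 'p' is empty"))   # (True, ["field 'p' is empty"], [])
print(classify_report("ERROR: no mesh found")) # (False, [], ['ERROR: no mesh found'])
print(classify_report(None))                   # (False, [], [])
```

Two consequences follow from this logic: any non-empty report without the word "error" is surfaced as a warning while still passing, and a non-string report such as `None` fails validation without producing a message.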