"""Implementation of the `Sample` container."""
# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#
#
# %% Imports
import sys
if sys.version_info >= (3, 11):
from typing import Self
else: # pragma: no cover
from typing import TypeVar
import copy
import logging
import pickle
import shutil
import subprocess
from pathlib import Path
from typing import Any, Optional, Union
import CGNS.MAP as CGM
import CGNS.PAT.cgnsutils as CGU
import numpy as np
from pydantic import BaseModel, ConfigDict, PrivateAttr
from pydantic import Field as PydanticField
from plaid.constants import (
AUTHORIZED_FEATURE_INFOS,
AUTHORIZED_FEATURE_TYPES,
CGNS_FIELD_LOCATIONS,
)
from plaid.containers.feature_identifier import FeatureIdentifier
from plaid.containers.features import SampleFeatures
from plaid.containers.utils import get_feature_type_and_details_from
from plaid.types import (
Feature,
Scalar,
)
from plaid.utils import cgns_helper as CGH
from plaid.utils.base import delegate_methods, safe_len
from plaid.utils.deprecation import deprecated
logger = logging.getLogger(__name__)
[docs]
CGNS_WORKER = Path(__file__).parent.parent / "utils" / "cgns_worker.py"
[docs]
FEATURES_METHODS = [
"set_default_base",
"set_default_zone_base",
"get_base_assignment",
"get_zone_assignment",
"set_default_time",
"get_all_time_values",
"get_all_mesh_times",
"get_mesh",
"get_base_names",
"get_zone_names",
"get_nodal_tags",
"has_globals",
"get_global",
"add_global",
"del_global",
"get_global_names",
"get_nodes",
"get_elements",
"get_field_names",
"get_field",
"show_tree",
"set_nodes",
"del_field",
"add_field",
"init_base",
"init_zone",
"init_tree",
"add_tree",
"del_tree",
]
@delegate_methods("features", FEATURES_METHODS)
[docs]
class Sample(BaseModel):
"""Represents a single sample. It contains data and information related to a single observation or measurement within a dataset.
By default, the sample is empty but:
- You can provide a path to a folder containing the sample data, and it will be loaded during initialization.
- You can provide `SampleFeatures` and `SampleFeatures` instances to initialize the sample with existing data.
The default `SampleFeatures` instance is initialized with `data=None` (i.e., no mesh data).
"""
# Pydantic configuration
[docs]
model_config = ConfigDict(
arbitrary_types_allowed=True, revalidate_instances="always", extra="forbid"
)
# Attributes
[docs]
path: Optional[Union[str, Path]] = PydanticField(
None,
description="Path to the folder containing the sample data. If provided, the sample will be loaded from this path during initialization. Defaults to None.",
)
[docs]
features: Optional[SampleFeatures] = PydanticField(
default_factory=lambda _: SampleFeatures(data=None),
description="An instance of SampleFeatures containing mesh data. Defaults to an empty `SampleFeatures` object.",
)
# Private attributes
_extra_data: Optional[dict] = PrivateAttr(default=None)
[docs]
def model_post_init(self, _context: Any) -> None:
"""Post-initialization processing for the Sample model."""
# Load if path is provided
if self.path is not None:
path = Path(self.path)
self.load(path)
[docs]
def copy(self) -> Self: # pyright: ignore[reportIncompatibleMethodOverride]
"""Create a deep copy of the current `Sample` instance.
Usage of `model_copy(deep=True)` from Pydantic to ensure all internal data is deeply copied.
Returns:
A new `Sample` instance with all internal data (scalars, fields, meshes, etc.)
deeply copied to ensure full isolation from the original.
Note:
This operation may be memory-intensive for large samples.
"""
return self.model_copy(deep=True)
[docs]
def get_scalar(self, name: str) -> Optional[Scalar]:
"""Retrieve a scalar value associated with the given name.
Args:
name (str): The name of the scalar value to retrieve.
Returns:
Scalar or None: The scalar value associated with the given name, or None if the name is not found.
"""
return self.features.get_global(name)
[docs]
def add_scalar(self, name: str, value: Scalar) -> None:
"""Add a scalar value to a dictionary.
Args:
name (str): The name of the scalar value.
value (Scalar): The scalar value to add or update in the dictionary.
"""
self.features.add_global(name, value)
[docs]
def del_scalar(self, name: str) -> Scalar:
"""Delete a scalar value from the dictionary.
Args:
name (str): The name of the scalar value to be deleted.
Raises:
KeyError: Raised when there is no scalar / there is no scalar with the provided name.
Returns:
Scalar: The value of the deleted scalar.
"""
return self.features.del_global(name)
[docs]
def get_scalar_names(self) -> list[str]:
"""Get a set of scalar names available in the object.
Returns:
list[str]: A set containing the names of the available scalars.
"""
return self.features.get_global_names()
# -------------------------------------------------------------------------#
[docs]
def del_all_fields(
self,
) -> Self:
"""Delete alls field from sample, while keeping geometrical info.
Returns:
Sample: The sample with deleted fields
"""
all_features_identifiers = self.get_all_features_identifiers()
# Delete all fields in the sample
for feat_id in all_features_identifiers:
if feat_id["type"] == "field":
self.del_field(
name=feat_id["name"],
location=feat_id["location"],
zone_name=feat_id["zone_name"],
base_name=feat_id["base_name"],
time=feat_id["time"],
)
return self
# -------------------------------------------------------------------------#
[docs]
def get_all_features_identifiers(
self,
) -> list[FeatureIdentifier]:
"""Get all features identifiers from the sample.
Returns:
list[FeatureIdentifier]: A list of dictionaries containing the identifiers of all features in the sample.
"""
all_features_identifiers = []
for sn in self.get_scalar_names():
all_features_identifiers.append({"type": "scalar", "name": sn})
for t in self.features.get_all_time_values():
for bn in self.features.get_base_names(time=t):
for zn in self.features.get_zone_names(base_name=bn, time=t):
if (
self.features.get_nodes(base_name=bn, zone_name=zn, time=t)
is not None
):
all_features_identifiers.append(
{
"type": "nodes",
"base_name": bn,
"zone_name": zn,
"time": t,
}
)
for loc in CGNS_FIELD_LOCATIONS:
for fn in self.features.get_field_names(
location=loc, zone_name=zn, base_name=bn, time=t
):
all_features_identifiers.append(
{
"type": "field",
"name": fn,
"base_name": bn,
"zone_name": zn,
"location": loc,
"time": t,
}
)
return all_features_identifiers
[docs]
def get_all_features_identifiers_by_type(
self, feature_type: str
) -> list[FeatureIdentifier]:
"""Get all features identifiers of a given type from the sample.
Args:
feature_type (str): Type of features to return
Returns:
list[FeatureIdentifier]: A list of dictionaries containing the identifiers of a given type of all features in the sample.
"""
assert feature_type in AUTHORIZED_FEATURE_TYPES, "feature_type not known"
all_features_identifiers = self.get_all_features_identifiers()
return [
feat_id
for feat_id in all_features_identifiers
if feat_id["type"] == feature_type
]
[docs]
def get_feature_by_path(self, path: str, time: Optional[int] = None) -> Feature:
"""Retrieve a feature value from the sample's CGNS mesh using a CGNS-style path.
Args:
path (str): CGNS node path relative to the mesh root (for example
"BaseName/ZoneName/GridCoordinates/CoordinateX" or
"BaseName/ZoneName/Solution/FieldName").
time (Optional[int], optional): Time selection for the mesh. If an integer,
it is interpreted via the SampleFeatures time-assignment logic
(see SampleFeatures.get_time_assignment). If None, the default time
assignment is used. Defaults to None.
Returns:
Feature: The value stored at the given CGNS path. This may be a numpy array, a scalar, or None if the node has no value.
Note:
- This is a thin wrapper around CGNS.PAT.cgnsutils.getValueByPath and Sample.get_mesh(time). Callers should handle a returned None when the path or value does not exist.
- For field-like features, prefer using Sample.get_field which applies additional validation and selection logic.
"""
time = self.features.get_time_assignment(time)
return CGU.getValueByPath(self.get_mesh(time), path)
[docs]
def get_feature_from_string_identifier(
self, feature_string_identifier: str
) -> Feature:
"""Retrieve a specific feature from its encoded string identifier.
The `feature_string_identifier` must follow the format:
"<feature_type>::<detail1>/<detail2>/.../<detailN>"
Supported feature types:
- "scalar": expects 1 detail → `scalars.get(name)`
- "field": up to 5 details → `get_field(name, base_name, zone_name, location, time)`
- "nodes": up to 3 details → `get_nodes(base_name, zone_name, time)`
Args:
feature_string_identifier (str): Structured identifier of the feature.
Returns:
Feature: The retrieved feature object.
Raises:
AssertionError: If `feature_type` is unknown.
Warnings:
- If "time" is present in a field/nodes identifier, it is cast to float.
- `name` is required for scalar and field features.
- The order of the details must be respected. One cannot specify a detail in the feature_string_identifier string without specified the previous ones.
"""
splitted_identifier = feature_string_identifier.split("::")
feature_type = splitted_identifier[0]
feature_details = [detail for detail in splitted_identifier[1].split("/")]
assert feature_type in AUTHORIZED_FEATURE_TYPES, "feature_type not known"
arg_names = AUTHORIZED_FEATURE_INFOS[feature_type]
assert len(arg_names) >= len(feature_details), "Too much details provided"
if feature_type == "scalar":
val = self.get_scalar(feature_details[0])
if val is None:
raise KeyError(
f"Unknown scalar {feature_details[0]}"
) # pragma: no cover
return val
elif feature_type == "field":
kwargs = {arg_names[i]: detail for i, detail in enumerate(feature_details)}
for k in kwargs:
if kwargs[k] == "":
kwargs[k] = None
if "time" in kwargs:
kwargs["time"] = float(kwargs["time"])
return self.get_field(**kwargs)
elif feature_type == "nodes":
kwargs = {arg_names[i]: detail for i, detail in enumerate(feature_details)}
for k in kwargs:
if kwargs[k] == "":
kwargs[k] = None
if "time" in kwargs:
kwargs["time"] = float(kwargs["time"])
return self.get_nodes(**kwargs).flatten()
[docs]
def get_feature_from_identifier(
self, feature_identifier: FeatureIdentifier
) -> Feature:
"""Retrieve a feature object based on a structured identifier dictionary.
The `feature_identifier` must include a `"type"` key specifying the feature kind:
- `"scalar"` → calls `scalars.get(name)`
- `"field"` → calls `get_field(name, base_name, zone_name, location, time)`
- `"nodes"` → calls `get_nodes(base_name, zone_name, time)`
Required keys:
- `"type"`: one of `"scalar"`, `"field"`, or `"nodes"`
- `"name"`: required for all types except `"nodes"`
Optional keys depending on type:
- `"base_name"`, `"zone_name"`, `"location"`, `"time"`: used in `"field"` and `"nodes"`
Any omitted optional keys will rely on the default values mechanics of the class.
Args:
feature_identifier ( dict[str:Union[str, float]]):
A dictionary encoding the feature type and its relevant parameters.
Returns:
Feature: The corresponding feature instance retrieved via the appropriate accessor.
"""
feature_type, feature_details = get_feature_type_and_details_from(
feature_identifier
)
if feature_type == "scalar":
return self.get_scalar(**feature_details)
elif feature_type == "field":
return self.get_field(**feature_details)
elif feature_type == "nodes":
return self.get_nodes(**feature_details).flatten()
[docs]
def get_features_from_identifiers(
self, feature_identifiers: list[FeatureIdentifier]
) -> list[Feature]:
"""Retrieve features based on a list of structured identifier dictionaries.
Elements of `feature_identifiers` must include a `"type"` key specifying the feature kind:
- `"scalar"` → calls `scalars.get(name)`
- `"field"` → calls `get_field(name, base_name, zone_name, location, time)`
- `"nodes"` → calls `get_nodes(base_name, zone_name, time)`
Required keys:
- `"type"`: one of `"scalar"`, `"field"`, or `"nodes"`
- `"name"`: required for all types except `"nodes"`
Optional keys depending on type:
- `"base_name"`, `"zone_name"`, `"location"`, `"time"`: used in `"field"` and `"nodes"`
Any omitted optional keys will rely on the default values mechanics of the class.
Args:
feature_identifiers (list[FeatureIdentifier]):
A dictionary encoding the feature type and its relevant parameters.
Returns:
list[Feature]: List of corresponding feature instance retrieved via the appropriate accessor.
"""
all_features_info = [
get_feature_type_and_details_from(feat_id)
for feat_id in feature_identifiers
]
features = []
for feature_type, feature_details in all_features_info:
if feature_type == "scalar":
features.append(self.get_scalar(**feature_details))
elif feature_type == "field":
features.append(self.get_field(**feature_details))
elif feature_type == "nodes":
features.append(self.get_nodes(**feature_details).flatten())
return features
[docs]
def add_feature(
self,
feature_identifier: FeatureIdentifier,
feature: Feature,
) -> Self:
"""Add a feature to current sample.
This method applies updates to scalars, fields, or nodes
using feature identifiers, and corresponding feature data.
Args:
feature_identifier (dict): A feature identifier.
feature (Feature): A feature corresponding to the identifiers.
Returns:
Self: The updated sample
Raises:
AssertionError: If types are inconsistent or identifiers contain unexpected keys.
"""
feature_type, feature_details = get_feature_type_and_details_from(
feature_identifier
)
if feature_type == "scalar":
if safe_len(feature) == 1:
feature = feature[0]
self.add_scalar(**feature_details, value=feature)
elif feature_type == "field":
self.add_field(**feature_details, field=feature, warning_overwrite=False)
elif feature_type == "nodes":
physical_dim_arg = {
k: v for k, v in feature_details.items() if k in ["base_name", "time"]
}
phys_dim = self.features.get_physical_dim(**physical_dim_arg)
self.set_nodes(**feature_details, nodes=feature.reshape((-1, phys_dim)))
return self
[docs]
def del_feature(
self,
feature_identifier: FeatureIdentifier,
) -> Self:
"""Remove a feature from current sample.
This method applies updates to scalars, time series, fields, or nodes using feature identifiers.
Args:
feature_identifier (dict): A feature identifier.
Returns:
Self: The updated sample
Raises:
AssertionError: If types are inconsistent or identifiers contain unexpected keys.
"""
feature_type, feature_details = get_feature_type_and_details_from(
feature_identifier
)
if feature_type == "scalar":
self.del_scalar(**feature_details)
elif feature_type == "field":
self.del_field(**feature_details)
elif feature_type == "nodes":
raise NotImplementedError("Deleting node features is not implemented.")
return self
[docs]
def update_features_from_identifier(
self,
feature_identifiers: dict[
int, Union[FeatureIdentifier, list[FeatureIdentifier]]
],
features: Union[Feature, list[Feature]],
in_place: bool = False,
) -> Self:
"""Update one or several features of the sample by their identifier(s).
This method applies updates to scalars, fields, or nodes
using feature identifiers, and corresponding feature data. When `in_place=False`, a deep copy of the sample is created
before applying updates, ensuring full isolation from the original.
Args:
feature_identifiers (FeatureIdentifier or list of FeatureIdentifier): One or more feature identifiers.
features (dict of Feature or list of Feature): One or more features corresponding
to the identifiers.
in_place (bool, optional): If True, modifies the current sample in place.
If False, returns a deep copy with updated features.
Returns:
Self: The updated sample (either the current instance or a new copy).
Raises:
AssertionError: If types are inconsistent or identifiers contain unexpected keys.
"""
if not isinstance(feature_identifiers, list):
feature_identifiers = [feature_identifiers]
features = [features]
assert len(feature_identifiers) == len(features)
for i_id, feat_id in enumerate(feature_identifiers):
feature_identifiers[i_id] = FeatureIdentifier(feat_id)
sample = self if in_place else self.copy()
for feat_id, feat in zip(feature_identifiers, features):
sample.add_feature(feat_id, feat)
return sample
@deprecated(
"`Dataset.from_features_identifier(...)` is deprecated, use instead `Dataset.extract_sample_from_identifier(...)`",
version="0.1.8",
removal="0.2",
)
[docs]
def from_features_identifier(
self,
feature_identifiers: Union[FeatureIdentifier, list[FeatureIdentifier]],
) -> Self:
"""DEPRECATED: Use :meth:`Dataset.extract_sample_from_identifier` instead."""
return self.extract_sample_from_identifier(
feature_identifiers
) # pragma: no cover
[docs]
def merge_features(self, sample: Self, in_place: bool = False) -> Self:
"""Merge features from another sample into the current sample.
This method applies updates to scalars, fields, or nodes
using features from another sample. When `in_place=False`, a deep copy of the sample is created
before applying updates, ensuring full isolation from the original.
Args:
sample (Sample): The sample from which features will be merged.
in_place (bool, optional): If True, modifies the current sample in place.
If False, returns a deep copy with updated features.
Returns:
Self: The updated sample (either the current instance or a new copy).
"""
merged_dataset = self if in_place else self.copy()
all_features_identifiers = sample.get_all_features_identifiers()
all_features = sample.get_features_from_identifiers(all_features_identifiers)
feature_types = set([feat_id["type"] for feat_id in all_features_identifiers])
# if field or node features are to extract, copy the source sample and delete all fields
if "field" in feature_types or "nodes" in feature_types:
source_sample = sample.copy()
source_sample.del_all_fields()
# DELETE LATER IF CONFIRMED THIS IS NOT NEEDED (WITH GLOBAL, THERE IS ALWAYS A TREE)
# for feat_id in all_features_identifiers:
# # if trying to add a field or nodes, must check if the corresponding tree exists, and add it if not
# if feat_id["type"] in ["field", "nodes"]:
# # get time of current feature
# time = sample.features.get_time_assignment(time=feat_id.get("time"))
# # if the constructed sample does not have a tree, add the one from the source sample, with no field
# if not merged_dataset.features.get_mesh(time):
# merged_dataset.features.add_tree(source_sample.get_mesh(time))
return merged_dataset.update_features_from_identifier(
feature_identifiers=all_features_identifiers,
features=all_features,
in_place=in_place,
)
# -------------------------------------------------------------------------#
@deprecated(
"`Dataset.save(...)` is deprecated, use instead `Dataset.save_to_dir(...)`",
version="0.1.8",
removal="0.2",
)
[docs]
def save(
self, path: Union[str, Path], overwrite: bool = False, memory_safe: bool = False
) -> None:
"""DEPRECATED: use :meth:`Dataset.save_to_dir` instead."""
self.save_to_dir(path, overwrite=overwrite, memory_safe=memory_safe)
# -------------------------------------------------------------------------#
[docs]
def save_to_dir(
self, path: Union[str, Path], overwrite: bool = False, memory_safe: bool = False
) -> None:
"""Save the Sample in directory `path`.
Args:
path (Union[str,Path]): relative or absolute directory path.
overwrite (bool): target directory overwritten if True.
memory_safe (bool): use pyCGNS save in a subprocess (requires an additional pickle of the sample) if True.
"""
path = Path(path)
if path.is_dir():
if overwrite:
shutil.rmtree(path)
logger.warning(f"Existing {path} directory has been reset.")
elif any(path.iterdir()):
raise ValueError(
f"directory {path} already exists and is not empty. Set `overwrite` to True if needed."
)
path.mkdir(exist_ok=True)
mesh_dir = path / "meshes"
if self.features.data:
mesh_dir.mkdir()
for i, time in enumerate(self.features.data.keys()):
outfname = mesh_dir / f"mesh_{i:09d}.cgns"
if memory_safe:
tmpfile = mesh_dir / f"mesh_{i:09d}.pkl"
with open(tmpfile, "wb") as f:
pickle.dump(self.features.data[time], f)
cmd = [sys.executable, str(CGNS_WORKER), tmpfile, str(outfname)]
subprocess.run(cmd)
logger.debug(f"save -> {outfname}")
else:
status = CGM.save(str(outfname), self.features.data[time])
logger.debug(f"save -> {status=}")
@classmethod
[docs]
def load_from_dir(cls, path: Union[str, Path]) -> Self:
"""Load the Sample from directory `path`.
This is a class method, you don't need to instantiate a `Sample` first.
Args:
path (Union[str,Path]): Relative or absolute directory path.
Returns:
Sample
Example:
.. code-block:: python
from plaid import Sample
sample = Sample.load_from_dir(dir_path)
print(sample)
>>> Sample(2 scalars, 1 timestamp, 5 fields)
Note:
It calls :meth:`Sample.load` method during execution.
"""
path = Path(path)
instance = cls()
instance.load(path)
return instance
[docs]
def load(self, path: Union[str, Path]) -> None:
"""Load the Sample from directory `path`.
Args:
path (Union[str,Path]): Relative or absolute directory path.
Raises:
FileNotFoundError: Triggered if the provided directory does not exist.
FileExistsError: Triggered if the provided path is a file instead of a directory.
Example:
.. code-block:: python
from plaid import Sample
sample = Sample()
sample.load(path)
print(sample)
>>> Sample(3 scalars, 1 timestamp, 3 fields)
"""
path = Path(path)
if not path.exists():
raise FileNotFoundError(f'Directory "{path}" does not exist. Abort')
if not path.is_dir():
raise FileExistsError(f'"{path}" is not a directory. Abort')
meshes_dir = path / "meshes"
if meshes_dir.is_dir():
meshes_names = list(meshes_dir.glob("*"))
nb_meshes = len(meshes_names)
for i in range(nb_meshes):
tree, _, _ = CGM.load(str(meshes_dir / f"mesh_{i:09d}.cgns"))
time = CGH.get_time_values(tree)
(self.features.data[time],) = (tree,)
old_scalars_file = path / "scalars.csv"
if old_scalars_file.is_file():
self._load_old_scalars(old_scalars_file)
old_time_series_files = list(path.glob("time_series_*.csv"))
if len(old_time_series_files) > 0:
self._load_old_time_series(old_time_series_files)
@deprecated(
reason="This Sample was written with plaid<=0.1.9, save it with plaid>=0.1.10 to have all features embedded in the CGNS tree",
version="0.1.10",
removal="0.2.0",
)
def _load_old_scalars(self, scalars_file: Path):
names = np.loadtxt(scalars_file, dtype=str, max_rows=1, delimiter=",").reshape(
(-1,)
)
scalars = np.loadtxt(
scalars_file, dtype=float, skiprows=1, delimiter=","
).reshape((-1,))
for name, value in zip(names, scalars):
self.add_scalar(name, value)
@deprecated(
reason="This Sample was written with plaid<=0.1.9, save it with plaid>=0.1.10 to have all features embedded in the CGNS tree",
version="0.1.10",
removal="0.2.0",
)
def _load_old_time_series(self, time_series_files: list[Path]):
for ts_fname in time_series_files:
names = np.loadtxt(ts_fname, dtype=str, max_rows=1, delimiter=",").reshape(
(-1,)
)
assert names[0] == "t"
times_and_val = np.loadtxt(ts_fname, dtype=float, skiprows=1, delimiter=",")
for i in range(times_and_val.shape[0]):
self.add_global(
name=names[1],
global_array=times_and_val[i, 1],
time=times_and_val[i, 0],
)
# # -------------------------------------------------------------------------#
def __str__(self) -> str:
"""Return a string representation of the sample.
Returns:
str: A string representation of the overview of sample content.
"""
# TODO rewrite using self.get_all_features_identifiers()
str_repr = "Sample("
# scalars
nb_scalars = len(self.get_scalar_names())
str_repr += f"{nb_scalars} scalar{'' if nb_scalars == 1 else 's'}, "
# fields
times = self.features.get_all_time_values()
nb_timestamps = len(times)
str_repr += f"{nb_timestamps} timestamp{'' if nb_timestamps == 1 else 's'}, "
field_names = set()
for time in times:
## Need to include all possible location within the count
base_names = self.features.get_base_names(time=time)
for bn in base_names:
zone_names = self.features.get_zone_names(base_name=bn)
for zn in zone_names:
for location in CGNS_FIELD_LOCATIONS:
field_names = field_names.union(
self.features.get_field_names(
location=location, zone_name=zn, base_name=bn, time=time
)
)
nb_fields = len(field_names)
str_repr += f"{nb_fields} field{'' if nb_fields == 1 else 's'}, "
if str_repr[-2:] == ", ":
str_repr = str_repr[:-2]
str_repr = str_repr + ")"
return str_repr
__repr__ = __str__
[docs]
def summarize(self) -> str:
"""Provide detailed summary of the Sample content, showing feature names and mesh information.
This provides more detailed information than the __repr__ method,
including the name of each feature.
Returns:
str: A detailed string representation of the sample content.
Example:
.. code-block:: bash
Sample Summary:
==================================================
Scalars (8):
- Pr: 0.9729006564945664
- Q: 0.2671142611487964
- Tr: 0.9983394202616822
- angle_in: 45.5066666666667
- angle_out: 61.89519547386746
- eth_is: 0.21238326882538008
- mach_out: 0.81003
- power: 0.0019118127462776008
Meshes (1 timestamps):
Time: 0.0
Base: Base_2_2
Nodes (36421)
Tags (6): Intrado (122), Extrado (122), Inflow (121), Outflow (121), Periodic_1 (120), Periodic_2 (238)
Fields (7): ro, sdf, rou, nut, mach, roe, rov
Elements (36000)
QUAD_4 (36000)
Base: Base_1_2
Nodes (244)
Fields (1): M_iso
Elements (242)
BAR_2 (242)
"""
summary = "Sample Summary:\n"
summary += "=" * 50 + "\n"
# Scalars with names
scalar_names = self.get_scalar_names()
if scalar_names:
summary += f"Scalars ({len(scalar_names)}):\n"
for name in scalar_names:
value = self.get_scalar(name)
summary += f" - {name}: {value}\n"
summary += "\n"
# Mesh information
times = self.features.get_all_time_values()
summary += f"Meshes ({len(times)} timestamps):\n"
if times:
for time in times:
summary += f" Time: {time}\n"
base_names = self.features.get_base_names(time=time)
for base_name in base_names:
summary += f" Base: {base_name}\n"
zone_names = self.features.get_zone_names(
base_name=base_name, time=time
)
for zone_name in zone_names:
summary += f" Zone: {zone_name}\n"
# Nodes, nodal tags and fields at verticies
nodes = self.get_nodes(
zone_name=zone_name, base_name=base_name, time=time
)
if nodes is not None:
nb_nodes = nodes.shape[0]
nodal_tags = self.features.get_nodal_tags(
zone_name=zone_name, base_name=base_name, time=time
)
summary += f" Nodes ({nb_nodes})\n"
if len(nodal_tags) > 0:
summary += f" Tags ({len(nodal_tags)}): {', '.join([f'{k} ({len(v)})' for k, v in nodal_tags.items()])}\n"
for location in CGNS_FIELD_LOCATIONS:
field_names = self.get_field_names(
location=location,
zone_name=zone_name,
base_name=base_name,
time=time,
)
if field_names:
summary += f" Location: {location}\n Fields ({len(field_names)}): {', '.join(field_names)}\n"
# Elements and fields at elements
elements = self.features.get_elements(
zone_name=zone_name, base_name=base_name, time=time
)
summary += f" Elements ({sum([v.shape[0] for v in elements.values()])})\n"
if len(elements) > 0:
summary += f" {', '.join([f'{k} ({v.shape[0]})' for k, v in elements.items()])}\n"
return summary
[docs]
def check_completeness(self) -> str:
"""Check the completeness of features in this sample.
Returns:
str: A report on feature completeness.
Example:
.. code-block:: bash
Sample Completeness Check:
==============================
Has scalars: True
Has meshes: True
Total unique fields: 8
Field names: M_iso, mach, nut, ro, roe, rou, rov, sdf
"""
report = "Sample Completeness Check:\n"
report += "=" * 30 + "\n"
# Check if sample has basic features
has_scalars = len(self.get_scalar_names()) > 0
has_meshes = len(self.features.get_all_time_values()) > 0
report += f"Has scalars: {has_scalars}\n"
report += f"Has meshes: {has_meshes}\n"
if has_meshes:
times = self.features.get_all_time_values()
total_fields = set()
for time in times:
base_names = self.features.get_base_names(time=time)
for base_name in base_names:
zone_names = self.features.get_zone_names(
base_name=base_name, time=time
)
for zone_name in zone_names:
for location in CGNS_FIELD_LOCATIONS:
field_names = self.get_field_names(
location=location,
zone_name=zone_name,
base_name=base_name,
time=time,
)
total_fields.update(field_names)
report += f"Total unique fields: {len(total_fields)}\n"
if total_fields:
report += f"Field names: {', '.join(sorted(total_fields))}\n"
return report