"""Module for implementing collections of features within a Sample."""
import logging
from copy import deepcopy
from typing import Optional
import CGNS.PAT.cgnskeywords as CGK
import CGNS.PAT.cgnslib as CGL
import CGNS.PAT.cgnsutils as CGU
import numpy as np
from CGNS.PAT.cgnsutils import __CHILDREN__, __NAME__
from plaid.constants import (
CGNS_ELEMENT_NAMES,
CGNS_FIELD_LOCATIONS,
)
from plaid.containers.managers.default_manager import DefaultManager
from plaid.containers.utils import (
_check_names,
_read_index,
get_feature_details_from_path,
)
from plaid.types import Array, CGNSNode, CGNSTree, Field
from plaid.utils import cgns_helper as CGH
from plaid.utils.deprecation import deprecated
logger = logging.getLogger(__name__)
[docs]
class SampleFeatures:
"""A container for meshes within a Sample.
Args:
data (dict[float, CGNSTree], optional): A dictionary mapping time steps to CGNSTrees. Defaults to None.
"""
def __init__(
self,
data: Optional[dict[float, CGNSTree]] = None,
):
[docs]
self.data: dict[float, CGNSTree] = data if data is not None else {}
[docs]
self.defaults = DefaultManager(self)
# -------------------------------------------------------------------------#
# Default time/base/zone management interface
# -------------------------------------------------------------------------#
[docs]
def set_default_time(self, time: float) -> None:
"""Set the default active time. Calls the DefaultManager to set the default time.
Args:
time (float): The time to set as the default active time.
"""
self.defaults.set_default_time(time)
[docs]
def set_default_base(self, base_name: str, time: Optional[float] = None) -> None:
"""Set the default active base. Calls the DefaultManager to set the default base.
Args:
base_name (str): The base name to set as the default active base.
time (float, optional): The time at which to set the default base. Defaults to None.
"""
self.defaults.set_default_base(base_name, time=time)
[docs]
def set_default_zone_base(
self, zone_name: str, base_name: str, time: Optional[float] = None
) -> None:
"""Set the default active zone within a base. Calls the DefaultManager to set the default zone and base.
Args:
zone_name (str): The zone name to set as the default active zone.
base_name (str): The base name in which the zone is located.
time (float, optional): The time at which to set the default zone and base. Defaults to None.
"""
self.defaults.set_default_zone_base(zone_name, base_name, time=time)
[docs]
def resolve_time(self, time: Optional[float] = None) -> float:
"""Get the resolved time assignment. Calls the DefaultManager to resolve the time.
Args:
time (float, optional): The time to resolve. Defaults to None.
Returns:
float: The resolved time.
"""
return self.defaults.resolve_time(time)
[docs]
def resolve_base(
self, base_name: Optional[str] = None, time: Optional[float] = None
) -> Optional[str]:
"""Get the resolved base assignment. Calls the DefaultManager to resolve the base.
Args:
base_name (str, optional): The base name to resolve. Defaults to None.
time (float, optional): The time at which to resolve the base. Defaults to None.
Returns:
Optional[str]: The resolved base name.
"""
return self.defaults.resolve_base(base_name=base_name, time=time)
[docs]
def resolve_zone(
self,
zone_name: Optional[str] = None,
base_name: Optional[str] = None,
time: Optional[float] = None,
) -> Optional[str]:
"""Get the resolved zone assignment. Calls the DefaultManager to resolve the zone.
Args:
zone_name (str, optional): The zone name to resolve. Defaults to None.
base_name (str, optional): The base name in which the zone is located. Defaults to None.
time (float, optional): The time at which to resolve the zone. Defaults to None.
Returns:
Optional[str]: The resolved zone name.
"""
return self.defaults.resolve_zone(
zone_name=zone_name, base_name=base_name, time=time
)
# -------------------------------------------------------------------------#
[docs]
def get_all_time_values(self) -> list[float]:
"""Retrieve all time steps corresponding to the meshes, if available.
Returns:
list[float]: A list of all available time steps.
"""
return list(self.data.keys())
@deprecated(
"`get_all_mesh_times()` is deprecated, use instead `get_all_time_values()`",
version="0.1.11",
removal="0.2.0",
)
[docs]
def get_all_mesh_times(self) -> list[float]:
"""DEPRECATED: Use :meth:`get_all_time_values` instead."""
return self.get_all_time_values() # pragma: no cover
[docs]
def init_tree(self, time: Optional[float] = None) -> CGNSTree:
"""Initialize a CGNS tree structure at a specified time step or create a new one if it doesn't exist.
Args:
time (float, optional): The time step for which to initialize the CGNS tree structure. If a specific time is not provided, the method will display the tree structure for the default time step.
Returns:
CGNSTree (list): The initialized or existing CGNS tree structure for the specified time step.
"""
time = self.resolve_time(time)
if not self.data:
self.data = {time: CGL.newCGNSTree()}
elif time not in self.data:
self.data[time] = CGL.newCGNSTree()
return self.data[time]
[docs]
def get_tree(
self, time: Optional[float] = None, only_mesh: bool = False
) -> Optional[CGNSTree]:
"""Retrieve the CGNS tree structure for a specified time step, if available.
Args:
time (float, optional): The time step for which to retrieve the CGNS tree structure. If a specific time is not provided, the method will display the tree structure for the default time step.
only_mesh (bool): If True, features of type global and fields are removed from the sample
Returns:
CGNSTree: The CGNS tree structure for the specified time step if available; otherwise, returns None.
"""
if not self.data:
return None
time = self.resolve_time(time)
tree = self.data[time]
if only_mesh:
flat_tree, cgns_types = CGH.flatten_cgns_tree(tree)
updated_flat_tree = {
path: value
for path, value in flat_tree.items()
if get_feature_details_from_path(path)["type"]
not in ["global", "field"]
}
tree = CGH.unflatten_cgns_tree(updated_flat_tree, cgns_types)
return tree
[docs]
def set_trees(self, meshes: dict[float, CGNSTree]) -> None:
"""Set all meshes with their corresponding time step.
Args:
meshes (dict[float,CGNSTree]): Collection of time step with its corresponding CGNSTree.
Raises:
KeyError: If there is already a CGNS tree set.
"""
if not self.data:
self.data = meshes
else:
raise KeyError(
"meshes is already set, you cannot overwrite it, delete it first or extend it with `Sample.add_tree`"
)
[docs]
def add_tree(
self, tree: CGNSTree, time: Optional[float] = None, in_place: bool = True
) -> CGNSTree:
"""Merge a CGNS tree into the tree stored at a given time.
If there is no tree at ``time``, ``tree`` is stored directly. Otherwise, Base
nodes from ``tree`` are appended only when their name does not already exist in
the destination tree. Bases with duplicate names are ignored and a warning is
emitted.
Args:
tree (CGNSTree): CGNS tree to add.
time (float, optional): Time step at which the tree is added. If omitted,
the default time resolution is used.
in_place (bool, optional): Controls ownership of the input tree. When
``True`` (default), the provided object may be stored/used directly.
When ``False``, the input tree is deep-copied before insertion.
Raises:
ValueError: If ``tree`` is an empty list.
Returns:
CGNSTree: The resulting tree for the resolved ``time``.
"""
if tree == []:
raise ValueError("CGNS Tree should not be an empty list")
if not in_place:
tree = deepcopy(tree)
def _iter_node_names(node: CGNSNode) -> list[str]:
names = []
if isinstance(node, list) and len(node) > 0:
if isinstance(node[0], str):
names.append(node[0])
if len(node) > 2 and isinstance(node[2], list):
for child in node[2]:
names.extend(_iter_node_names(child))
return names
all_names = _iter_node_names(tree)
if all_names and all_names[0] == "CGNSTree":
all_names = all_names[1:]
_check_names(all_names)
time = self.resolve_time(time)
if not self.data:
self.data = {time: tree}
elif time not in self.data:
self.data[time] = tree
else:
# TODO: gérer le cas où il y a des bases de mêmes noms... + merge
# récursif des nœuds
local_bases = self.get_base_names(time=time)
base_nodes = CGU.getNodesFromTypeSet(tree, "CGNSBase_t")
for _, node in base_nodes:
if node[__NAME__] not in local_bases: # pragma: no cover
self.data[time][__CHILDREN__].append(node)
else:
logger.warning(
f"base <{node[__NAME__]}> already exists in self._tree --> ignored"
)
base_names = self.get_base_names(time=time)
for base_name in base_names:
base_node = self.get_base(base_name, time=time)
if CGU.getValueByPath(base_node, "Time/TimeValues") is None:
baseIterativeData_node = CGL.newBaseIterativeData(base_node, "Time", 1)
TimeValues_node = CGU.newNode(
"TimeValues", None, [], CGK.DataArray_ts, baseIterativeData_node
)
CGU.setValue(TimeValues_node, np.array([time]))
return self.data[time]
[docs]
def del_tree(self, time: float) -> CGNSTree:
"""Delete the CGNS tree for a specific time.
Args:
time (float): The time step for which to delete the CGNS tree structure.
Raises:
KeyError: There is no CGNS tree in this Sample / There is no CGNS tree for the provided time.
Returns:
CGNSTree: The deleted CGNS tree.
"""
if not self.data:
raise KeyError("There is no CGNS tree in this sample.")
if time not in self.data:
raise KeyError(f"There is no CGNS tree for time {time}.")
return self.data.pop(time)
# -------------------------------------------------------------------------#
[docs]
def get_topological_dim(
self, base_name: Optional[str] = None, time: Optional[float] = None
) -> int:
"""Get the topological dimension of a base node at a specific time.
Args:
base_name (str, optional): The name of the base node for which to retrieve the topological dimension. Defaults to None.
time (float, optional): The time at which to retrieve the topological dimension. Defaults to None.
Raises:
ValueError: If there is no base node with the specified `base_name` at the given `time` in this sample.
Returns:
int: The topological dimension of the specified base node at the given time.
"""
# get_base will look for default time and base_name
base_node = self.get_base(base_name, time)
if base_node is None: # pragma: no cover
raise ValueError(
f"there is no base called {base_name} at the time {time} in this sample"
)
return base_node[1][0]
[docs]
def get_physical_dim(
self, base_name: Optional[str] = None, time: Optional[float] = None
) -> int:
"""Get the physical dimension of a base node at a specific time.
Args:
base_name (str, optional): The name of the base node for which to retrieve the topological dimension. Defaults to None.
time (float, optional): The time at which to retrieve the topological dimension. Defaults to None.
Raises:
ValueError: If there is no base node with the specified `base_name` at the given `time` in this sample.
Returns:
int: The topological dimension of the specified base node at the given time.
"""
base_node = self.get_base(base_name, time)
if base_node is None: # pragma: no cover
raise ValueError(
f"there is no base called {base_name} at the time {time} in this sample"
)
return base_node[1][1]
[docs]
def init_base(
self,
topological_dim: int,
physical_dim: int,
base_name: Optional[str] = None,
time: Optional[float] = None,
) -> CGNSNode:
"""Create a Base node named `base_name` if it doesn't already exists.
Args:
topological_dim (int): Cell dimension, see [CGNS standard](https://pycgns.github.io/PAT/lib.html#CGNS.PAT.cgnslib.newCGNSBase).
physical_dim (int): Ambient space dimension, see [CGNS standard](https://pycgns.github.io/PAT/lib.html#CGNS.PAT.cgnslib.newCGNSBase).
base_name (str): If not specified, uses `mesh_base_name` specified in Sample initialization. Defaults to None.
time (float, optional): The time at which to initialize the base. If a specific time is not provided, the method will display the tree structure for the default time step.
Returns:
CGNSNode: The created Base node.
"""
_check_names([base_name])
time = self.resolve_time(time)
if base_name is None:
base_name = "Base_" + str(topological_dim) + "_" + str(physical_dim)
self.init_tree(time)
if not (self.has_base(base_name, time)):
base_node = CGL.newCGNSBase(
self.data[time], base_name, topological_dim, physical_dim
)
base_names = self.get_base_names(time=time)
for base_name in base_names:
base_node = self.get_base(base_name, time=time)
if CGU.getValueByPath(base_node, "Time/TimeValues") is None:
base_iterative_data_node = CGL.newBaseIterativeData(
base_node, "Time", 1
)
time_values_node = CGU.newNode(
"TimeValues", None, [], CGK.DataArray_ts, base_iterative_data_node
)
CGU.setValue(time_values_node, np.array([time]))
return base_node
[docs]
def del_base(self, base_name: str, time: float) -> CGNSTree:
"""Delete a CGNS base node for a specific time.
Args:
base_name (str): The name of the base node to be deleted.
time (float): The time step for which to delete the CGNS base node.
Raises:
KeyError: There is no CGNS tree in this sample / There is no CGNS tree for the provided time.
KeyError: If there is no base node with the given base name or time.
Returns:
CGNSTree: The tree at the provided time (without the deleted node)
"""
if not self.data:
raise KeyError("There is no CGNS tree in this sample.")
if time not in self.data:
raise KeyError(f"There is no CGNS tree for time {time}.")
base_node = self.get_base(base_name, time)
mesh_tree = self.data[time]
if base_node is None:
raise KeyError(
f"There is no base node with name {base_name} for time {time}."
)
return CGU.nodeDelete(mesh_tree, base_node)
[docs]
def get_base_names(
self,
full_path: bool = False,
unique: bool = False,
time: Optional[float] = None,
) -> list[str]:
"""Return Base names.
Args:
full_path (bool, optional): If True, returns full paths instead of only Base names. Defaults to False.
unique (bool, optional): If True, returns unique names instead of potentially duplicated names. Defaults to False.
time (float, optional): The time at which to check for the Base. If a specific time is not provided, the method will display the tree structure for the default time step.
Returns:
list[str]:
"""
time = self.resolve_time(time)
if self.data and time in self.data and self.data[time] is not None:
return CGH.get_base_names(
self.data[time], full_path=full_path, unique=unique
)
else:
return []
[docs]
def has_base(self, base_name: str, time: Optional[float] = None) -> bool:
"""Check if a CGNS tree contains a Base with a given name at a specified time.
Args:
base_name (str): The name of the Base to check for in the CGNS tree.
time (float, optional): The time at which to check for the Base. If a specific time is not provided, the method will display the tree structure for the default time step.
Returns:
bool: `True` if the CGNS tree has a Base called `base_name`, else return `False`.
"""
# get_base_names will look for the default time
return base_name in self.get_base_names(time=time)
[docs]
def has_globals(self, time: Optional[float] = None) -> bool:
"""Check if a CGNS tree contains globals a given name at a specified time.
Args:
time (float, optional): The time at which to check for the Base. If a specific time is not provided, the method will display the tree structure for the default time step.
Returns:
bool: `True` if the CGNS tree has a Base called `Globals`, else return `False`.
"""
return "Global" in self.get_base_names(time=time)
[docs]
def get_base(
self, base_name: Optional[str] = None, time: Optional[float] = None
) -> CGNSNode:
"""Return Base node named `base_name`.
If `base_name` is not specified, checks that there is **at most** one base, else raises an error.
Args:
base_name (str, optional): The name of the Base node to retrieve. Defaults to None. Defaults to None.
time (float, optional): Time at which you want to retrieve the Base node. If a specific time is not provided, the method will display the tree structure for the default time step.
Returns:
CGNSNode or None: The Base node with the specified name or None if it is not found.
"""
time = self.resolve_time(time)
if time not in self.data or self.data[time] is None:
logger.warning(f"No mesh exists in the sample at {time=}")
return None
if base_name != "Global":
base_name = self.resolve_base(base_name, time)
return CGU.getNodeByPath(self.data[time], f"/CGNSTree/{base_name}")
# -------------------------------------------------------------------------#
[docs]
def init_zone(
self,
zone_shape: Array,
zone_type: str = CGK.Unstructured_s,
zone_name: Optional[str] = None,
base_name: Optional[str] = None,
time: Optional[float] = None,
) -> CGNSNode:
"""Initialize a new zone within a CGNS base.
Args:
zone_shape (Array): An array specifying the shape or dimensions of the zone.
zone_type (str, optional): The type of the zone. Defaults to CGK.Unstructured_s.
zone_name (str, optional): The name of the zone to initialize. If not provided, uses `mesh_zone_name` specified in Sample initialization. Defaults to None.
base_name (str, optional): The name of the base to which the zone will be added. If not provided, the zone will be added to the currently active base. Defaults to None.
time (float, optional): The time at which to initialize the zone. If a specific time is not provided, the method will display the tree structure for the default time step.
Raises:
KeyError: If the specified base does not exist. You can create a base using `Sample.init_base(base_name)`.
Returns:
CGLNode: The newly initialized zone node within the CGNS tree.
"""
_check_names([zone_name])
# init_tree will look for default time
self.init_tree(time)
# get_base will look for default base_name and time
base_node = self.get_base(base_name, time)
if base_node is None:
raise KeyError(
f"there is no base <{base_name}>, you should first create one with `Sample.init_base({base_name=})`"
)
zone_name = self.resolve_zone(zone_name, base_name, time)
if zone_name is None:
zone_name = "Zone"
zone_node = CGL.newZone(base_node, zone_name, zone_shape, zone_type)
return zone_node
[docs]
def del_zone(self, zone_name: str, base_name: str, time: float) -> CGNSTree:
"""Delete a zone within a CGNS base.
Args:
zone_name (str): The name of the zone to be deleted.
base_name (str, optional): The name of the base from which the zone will be deleted. If not provided, the zone will be deleted from the currently active base. Defaults to None.
time (float, optional): The time step for which to delete the zone. Defaults to None.
Raises:
KeyError: There is no CGNS tree in this sample / There is no CGNS tree for the provided time.
KeyError: If there is no base node with the given base name or time.
Returns:
CGNSTree: The tree at the provided time (without the deleted node)
"""
if self.data is None: # pragma: no cover
raise KeyError("There is no CGNS tree in this sample.")
if time not in self.data:
raise KeyError(f"There is no CGNS tree for time {time}.")
zone_node = self.get_zone(zone_name=zone_name, base_name=base_name, time=time)
mesh_tree = self.data[time]
if zone_node is None:
raise KeyError(
f"There is no zone node with name {zone_name} or base node with name {base_name}."
)
return CGU.nodeDelete(mesh_tree, zone_node)
[docs]
def get_zone_names(
self,
base_name: Optional[str] = None,
full_path: bool = False,
unique: bool = False,
time: Optional[float] = None,
) -> list[str]:
"""Return list of Zone names in Base named `base_name` with specific time.
Args:
base_name (str, optional): Name of Base where to search Zones. If not specified, checks if there is at most one Base. Defaults to None.
full_path (bool, optional): If True, returns full paths instead of only Zone names. Defaults to False.
unique (bool, optional): If True, returns unique names instead of potentially duplicated names. Defaults to False.
time (float, optional): The time at which to check for the Zone. If a specific time is not provided, the method will display the tree structure for the default time step.
Returns:
list[str]: List of Zone names in Base named `base_name`, empty if there is none or if the Base doesn't exist.
"""
zone_paths = []
# get_base will look for default base_name and time
base_node = self.get_base(base_name, time)
if base_node is not None:
z_paths = CGU.getPathsByTypeSet(base_node, "CGNSZone_t")
for pth in z_paths:
s_pth = pth.split("/")
assert len(s_pth) == 2
assert s_pth[0] == base_name or base_name is None
if full_path:
zone_paths.append(pth)
else:
zone_paths.append(s_pth[1])
if unique:
return list(set(zone_paths))
else:
return zone_paths
[docs]
def has_zone(
self,
zone_name: str,
base_name: Optional[str] = None,
time: Optional[float] = None,
) -> bool:
"""Check if the CGNS tree contains a Zone with the specified name within a specific Base and time.
Args:
zone_name (str): The name of the Zone to check for.
base_name (str, optional): The name of the Base where the Zone should be located. If not provided, the function checks all bases. Defaults to None.
time (float, optional): The time at which to check for the Zone. If a specific time is not provided, the method will display the tree structure for the default time step.
Returns:
bool: `True` if the CGNS tree has a Zone called `zone_name` in a Base called `base_name`, else return `False`.
"""
# get_zone_names will look for default base_name and time
return zone_name in self.get_zone_names(base_name, time=time)
[docs]
def get_zone(
self,
zone_name: Optional[str] = None,
base_name: Optional[str] = None,
time: Optional[float] = None,
) -> CGNSNode:
"""Retrieve a CGNS Zone node by its name within a specific Base and time.
Args:
zone_name (str, optional): The name of the Zone node to retrieve. If not specified, checks that there is **at most** one zone in the base, else raises an error. Defaults to None.
base_name (str, optional): The Base in which to seek to zone retrieve. If not specified, checks that there is **at most** one base, else raises an error. Defaults to None.
time (float, optional): Time at which you want to retrieve the Zone node.
Returns:
CGNSNode: Returns a CGNS Zone node if found; otherwise, returns None.
"""
# get_base will look for default base_name and time
base_node = self.get_base(base_name, time)
if base_node is None:
# logger.warning(f"No base with name {base_name} in this tree")
return None
# _zone_attribution will look for default base_name
zone_name = self.resolve_zone(zone_name, base_name, time)
if zone_name is None:
# logger.warning(f"No zone with name {zone_name} in this base ({base_name})")
return None
return CGU.getNodeByPath(base_node, zone_name)
[docs]
def get_zone_type(
self,
zone_name: Optional[str] = None,
base_name: Optional[str] = None,
time: Optional[float] = None,
) -> str:
"""Get the type of a specific zone at a specified time.
Args:
zone_name (str, optional): The name of the zone whose type you want to retrieve. Default is None.
base_name (str, optional): The name of the base in which the zone is located. Default is None.
time (float, optional): The timestamp for which you want to retrieve the zone type. Default is 0.0.
Raises:
KeyError: Raised when the specified zone or base does not exist. You should first create the base/zone using `Sample.init_zone(zone_name, base_name)`.
Returns:
str: The type of the specified zone as a string.
"""
# get_zone will look for default base_name, zone_name and time
zone_node = self.get_zone(zone_name=zone_name, base_name=base_name, time=time)
if zone_node is None:
raise KeyError(
f"there is no base/zone <{base_name}/{zone_name}>, you should first create one with `Sample.init_zone({zone_name=},{base_name=})`"
)
return CGU.getValueByPath(zone_node, "ZoneType").tobytes().decode()
# -------------------------------------------------------------------------#
# -------------------------------------------------------------------------#
[docs]
def get_global(
self,
name: str,
time: Optional[float] = None,
) -> Optional[Array]:
"""Retrieve a global array by name at a specified time.
Args:
name (str): The name of the global array to retrieve.
time (float, optional): The time step for which to retrieve the global array. If not provided, uses the default time.
Returns:
Optional[Array]: The global array if found, otherwise None. Returns a scalar if the array has size 1.
"""
time = self.resolve_time(time)
if not self.has_globals(time):
return None
global_ = CGU.getValueByPath(self.data[time], "Global/" + name)
if global_ is None:
return None
if getattr(global_, "size", None) == 1:
return global_[0]
return global_
[docs]
def add_global(
self,
name: str,
global_array: Array,
time: Optional[float] = None,
) -> None:
"""Add or update a global array at a specified time.
Args:
name (str): The name of the global array to add or update.
global_array (Array): The array to store.
time (float, optional): The time step for which to add the global array. If not provided, uses the default time.
Note:
If the "Global" base does not exist, it will be created.
If an array with the same name exists, its value will be updated.
"""
_check_names(name)
base_names = self.get_base_names(time=time)
if "Global" in base_names:
base_node = self.get_base("Global", time=time)
else:
base_node = self.init_base(1, 1, "Global", time)
if isinstance(global_array, str): # pragma: no cover
global_array = np.frombuffer(
global_array.encode("ascii"), dtype="S1", count=len(global_array)
)
if CGU.getValueByPath(base_node, name) is None:
CGL.newDataArray(base_node, name, value=global_array)
else:
global_node = CGU.getNodeByPath(base_node, f"{name}")
CGU.setValue(global_node, np.asfortranarray(global_array))
[docs]
def del_global(
self,
name: str,
time: Optional[float] = None,
) -> Array:
"""Delete a global array by name at a specified time.
Args:
name (str): The name of the global array to delete.
time (float, optional): The time step for which to delete the global array. If not provided, uses the default time.
Raises:
KeyError: If the global array does not exist at the specified time.
Returns:
Array: The value of the deleted global array.
"""
val = self.get_global(name, time)
if val is None:
raise KeyError(
f"There is no global with name {name} at the specified time."
)
base_node = self.get_base("Global", time=time)
CGU.nodeDelete(base_node, name)
return val
[docs]
def get_global_names(self, time: Optional[float] = None) -> list[str]:
"""Return a list of all global array names at the specified time(s).
Args:
time (float, optional): The time step for which to retrieve global names. If not provided, returns names for all available times.
Returns:
list[str]: List of global array names (excluding "Time" arrays).
"""
if time is None:
all_times = self.get_all_time_values()
else:
all_times = [time]
global_names = []
for time in all_times:
base_names = self.get_base_names(time=time)
if "Global" in base_names:
base_node = self.get_base("Global", time=time)
if base_node is not None:
global_paths = CGU.getAllNodesByTypeSet(base_node, ["DataArray_t"])
for path in global_paths:
if "Time" not in path:
global_names.append(CGU.getNodeByPath(base_node, path)[0])
return global_names
# -------------------------------------------------------------------------#
[docs]
def get_nodes(
self,
zone_name: Optional[str] = None,
base_name: Optional[str] = None,
time: Optional[float] = None,
) -> Optional[Array]:
"""Get grid node coordinates from a specified base, zone, and time.
Args:
zone_name (str, optional): The name of the zone to search for. Defaults to None.
base_name (str, optional): The name of the base to search for. Defaults to None.
time (float, optional): The time value to consider when searching for the zone. If a specific time is not provided, the method will display the tree structure for the default time step.
Raises:
TypeError: Raised if multiple <GridCoordinates> nodes are found. Only one is expected.
Returns:
Optional[Array]: A NumPy array containing the grid node coordinates.
If no matching zone or grid coordinates are found, None is returned.
Seealso:
This function can also be called using `get_points()` or `get_vertices()`.
"""
# get_zone will look for default base_name, zone_name and time
search_node = self.get_zone(zone_name=zone_name, base_name=base_name, time=time)
if search_node is None:
return None
grid_paths = CGU.getAllNodesByTypeSet(search_node, ["GridCoordinates_t"])
if len(grid_paths) == 1:
grid_node = CGU.getNodeByPath(search_node, grid_paths[0])
array_x = CGU.getValueByPath(grid_node, "GridCoordinates/CoordinateX")
array_y = CGU.getValueByPath(grid_node, "GridCoordinates/CoordinateY")
array_z = CGU.getValueByPath(grid_node, "GridCoordinates/CoordinateZ")
if array_z is None:
array = np.concatenate(
(array_x.reshape((-1, 1)), array_y.reshape((-1, 1))), axis=1
)
else:
array = np.concatenate(
(
array_x.reshape((-1, 1)),
array_y.reshape((-1, 1)),
array_z.reshape((-1, 1)),
),
axis=1,
)
return array
elif len(grid_paths) > 1: # pragma: no cover
raise TypeError(
f"Found {len(grid_paths)} <GridCoordinates> nodes, should find only one"
)
[docs]
get_vertices = get_nodes
[docs]
def set_nodes(
self,
nodes: Array,
zone_name: Optional[str] = None,
base_name: Optional[str] = None,
time: Optional[float] = None,
) -> None:
"""Set the coordinates of nodes for a specified base and zone at a given time.
Args:
nodes (Array): A numpy array containing the new node coordinates.
zone_name (str, optional): The name of the zone where the nodes should be updated. Defaults to None.
base_name (str, optional): The name of the base where the nodes should be updated. Defaults to None.
time (float, optional): The time at which the node coordinates should be updated. If a specific time is not provided, the method will display the tree structure for the default time step.
Raises:
KeyError: Raised if the specified base or zone do not exist. You should first
create the base and zone using the `Sample.init_zone(zone_name,base_name)` method.
Seealso:
This function can also be called using `set_points()` or `set_vertices()`
"""
# get_zone will look for default base_name, zone_name and time
zone_node = self.get_zone(zone_name=zone_name, base_name=base_name, time=time)
if zone_node is None:
raise KeyError(
f"there is no base/zone <{base_name}/{zone_name}>, you should first create one with `Sample.init_zone({zone_name=},{base_name=})`"
)
# Check if GridCoordinates_t node exists
gc_nodes = [
child for child in zone_node[2] if child[0] in CGK.GridCoordinates_ts
]
if gc_nodes:
grid_coords_node = gc_nodes[0]
coord_type = [CGK.CoordinateX_s, CGK.CoordinateY_s, CGK.CoordinateZ_s]
for i_dim in range(nodes.shape[-1]):
name = coord_type[i_dim]
# Remove existing coordinate if present
if gc_nodes:
grid_coords_node[2] = [
child for child in grid_coords_node[2] if child[0] != name
]
# Create new coordinate
CGL.newCoordinates(zone_node, name, np.asfortranarray(nodes[..., i_dim]))
[docs]
set_vertices = set_nodes
# -------------------------------------------------------------------------#
[docs]
def get_elements(
self,
zone_name: Optional[str] = None,
base_name: Optional[str] = None,
time: Optional[float] = None,
) -> dict[str, Array]:
"""Retrieve element connectivity data for a specified zone, base, and time.
Args:
zone_name (str, optional): The name of the zone for which element connectivity data is requested. Defaults to None, indicating the default zone.
base_name (str, optional): The name of the base for which element connectivity data is requested. Defaults to None, indicating the default base.
time (float, optional): The time at which element connectivity data is requested. If a specific time is not provided, the method will display the tree structure for the default time step.
Returns:
dict[str, Array]: A dictionary where keys are element type names and values are NumPy arrays representing the element connectivity data.
The NumPy arrays have shape (num_elements, num_nodes_per_element), and element indices are 0-based.
"""
# get_zone will look for default base_name, zone_name and time
zone_node = self.get_zone(zone_name=zone_name, base_name=base_name, time=time)
if zone_node is None:
return {}
elements = {}
elem_paths = CGU.getAllNodesByTypeSet(zone_node, ["Elements_t"])
for elem in elem_paths:
elem_node = CGU.getNodeByPath(zone_node, elem)
val = CGU.getValue(elem_node)
elem_type = CGNS_ELEMENT_NAMES[val[0]]
elem_size = int(elem_type.split("_")[-1])
# elem_range = CGU.getValueByPath(
# elem_node, "ElementRange"
# ) # TODO elem_range is unused
# -1 is to get back indexes starting at 0
elements[elem_type] = (
CGU.getValueByPath(elem_node, "ElementConnectivity").reshape(
(-1, elem_size)
)
- 1
)
return elements
# -------------------------------------------------------------------------#
[docs]
def get_field_names(
self,
location: str = None,
zone_name: Optional[str] = None,
base_name: Optional[str] = None,
time: Optional[float] = None,
) -> list[str]:
"""Get a set of field names associated with a specified zone, base, location, and/or time.
For each argument that is not specified, the method will search for fields in all available values for this argument.
Args:
location (str, optional): The desired grid location where to search for. Defaults to None.
Possible values : :py:const:`plaid.constants.CGNS_FIELD_LOCATIONS`
zone_name (str, optional): The name of the zone to search for. Defaults to None.
base_name (str, optional): The name of the base to search for. Defaults to None.
time (float, optional): The specific time at which to search for. Defaults to None.
Returns:
set[str]: A set containing the names of the fields that match the specified criteria.
"""
def get_field_names_one_time_base_zone_location(
location: str, zone_name: str, base_name: str, time: float
) -> list[str]:
# get_zone will look for default zone_name, base_name, time
search_node = self.get_zone(
zone_name=zone_name, base_name=base_name, time=time
)
if search_node is None: # pragma: no cover
return []
names = []
solution_paths = CGU.getPathsByTypeSet(search_node, [CGK.FlowSolution_t])
for f_path in solution_paths:
if (
CGU.getValueByPath(search_node, f_path + "/GridLocation")
.tobytes()
.decode()
!= location
):
continue
f_node = CGU.getNodeByPath(search_node, f_path)
for path in CGU.getPathByTypeFilter(f_node, CGK.DataArray_t):
field_name = path.split("/")[-1]
if not (field_name == "GridLocation"):
names.append(field_name)
return names
field_names = []
times = [time] if time is not None else self.get_all_time_values()
for _time in times:
base_names = (
[base_name]
if base_name is not None
else self.get_base_names(time=_time)
)
for _base_name in base_names:
zone_names = (
[zone_name]
if zone_name is not None
else self.get_zone_names(base_name=_base_name, time=_time)
)
for _zone_name in zone_names:
locations = (
[location] if location is not None else CGNS_FIELD_LOCATIONS
)
for _location in locations:
field_names += get_field_names_one_time_base_zone_location(
location=_location,
zone_name=_zone_name,
base_name=_base_name,
time=_time,
)
field_names = sorted(set(field_names))
return field_names
[docs]
def get_field(
self,
name: str,
location: str = "Vertex",
zone_name: Optional[str] = None,
base_name: Optional[str] = None,
time: Optional[float] = None,
) -> Field:
"""Retrieve a field with a specified name from a given zone, base, location, and time.
Args:
name (str): The name of the field to retrieve.
location (str, optional): The location at which to retrieve the field. Defaults to 'Vertex'.
Possible values : :py:const:`plaid.constants.CGNS_FIELD_LOCATIONS`
zone_name (str, optional): The name of the zone to search for. Defaults to None.
base_name (str, optional): The name of the base to search for. Defaults to None.
time (float, optional): The time value to consider when searching for the field. If a specific time is not provided, the method will display the tree structure for the default time step.
Returns:
Field: A set containing the names of the fields that match the specified criteria.
"""
# get_zone will look for default time
search_node = self.get_zone(zone_name=zone_name, base_name=base_name, time=time)
if search_node is None:
return None
full_field = []
solution_paths = CGU.getPathsByTypeSet(search_node, [CGK.FlowSolution_t])
for f_path in solution_paths:
grid_loc = CGU.getValueByPath(search_node, f_path + "/GridLocation")
if grid_loc.tobytes().decode() != location:
continue
field = CGU.getValueByPath(search_node, f_path + "/" + name)
if field is not None and field.size > 0:
full_field.append(field)
if not full_field:
return None
if len(full_field) == 1:
return full_field[0]
raise ValueError(
f"Multiple fields found with name {name} at location {location}."
) # pragma: no cover
[docs]
def add_field(
self,
name: str,
field: Field,
location: str = "Vertex",
zone_name: Optional[str] = None,
base_name: Optional[str] = None,
time: Optional[float] = None,
warning_overwrite: bool = True,
) -> None:
"""Add a field to a specified zone in the grid.
Args:
name (str): The name of the field to be added.
field (Field): The field data to be added. Integer arrays with dtype
``np.int32`` or ``np.int64`` are automatically converted to
``np.float64`` (with a warning) for CGNS compatibility.
location (str, optional): The grid location where the field will be stored. Defaults to 'Vertex'.
Possible values : :py:const:`plaid.constants.CGNS_FIELD_LOCATIONS`
zone_name (str, optional): The name of the zone where the field will be added. Defaults to None.
base_name (str, optional): The name of the base where the zone is located. Defaults to None.
time (float, optional): The time associated with the field. Defaults to 0.
warning_overwrite (bool, optional): Show warning if a preexisting field is being overwritten. Defaults to True.
Raises:
KeyError: Raised if the specified zone does not exist in the given base.
"""
assert isinstance(field, np.ndarray)
_check_names([name])
# init_tree will look for default time
self.init_tree(time)
# get_zone will look for default zone_name, base_name and time
zone_node = self.get_zone(zone_name=zone_name, base_name=base_name, time=time)
if zone_node is None:
raise KeyError(
f"there is no Zone with name {zone_name} in base {base_name}. Did you check topological and physical dimensions ?"
)
if field.dtype in (np.int32, np.int64):
logger.warning(
f"(add_field) provided field is of type {field.dtype} and has been converted to np.float64 for CGNS compatibility."
)
field = field.astype(np.float64)
# solution_paths = CGU.getPathsByTypeOrNameList(self._tree, '/.*/.*/FlowSolution_t')
solution_paths = CGU.getPathsByTypeSet(zone_node, "FlowSolution_t")
has_FlowSolution_with_location = False
if len(solution_paths) > 0:
for s_path in solution_paths:
val_location = (
CGU.getValueByPath(zone_node, f"{s_path}/GridLocation")
.tobytes()
.decode()
)
if val_location == location:
has_FlowSolution_with_location = True
if not (has_FlowSolution_with_location):
CGL.newFlowSolution(zone_node, f"{location}Fields", gridlocation=location)
solution_paths = CGU.getPathsByTypeSet(zone_node, "FlowSolution_t")
assert len(solution_paths) > 0
for s_path in solution_paths:
val_location = (
CGU.getValueByPath(zone_node, f"{s_path}/GridLocation")
.tobytes()
.decode()
)
if val_location != location:
continue
field_node = CGU.getNodeByPath(zone_node, f"{s_path}/{name}")
if field_node is None:
flow_solution_node = CGU.getNodeByPath(zone_node, s_path)
# CGL.newDataArray(flow_solution_node, name, np.asfortranarray(np.copy(field), dtype=np.float64))
CGL.newDataArray(flow_solution_node, name, np.asfortranarray(field))
# res = [name, np.asfortranarray(field, dtype=np.float32), [], 'DataArray_t']
# print(field.shape)
# flow_solution_node[2].append(res)
else:
if warning_overwrite:
logger.warning(
f"field node with name {name} already exists -> data will be replaced"
)
CGU.setValue(field_node, np.asfortranarray(field))
[docs]
def del_field(
self,
name: str,
location: str = "Vertex",
zone_name: Optional[str] = None,
base_name: Optional[str] = None,
time: Optional[float] = None,
) -> CGNSTree:
"""Delete a field with specified name in the mesh.
Args:
name (str): The name of the field to be deleted.
location (str, optional): The grid location where the field is stored. Defaults to 'Vertex'.
Possible values : :py:const:`plaid.constants.CGNS_FIELD_LOCATIONS`
zone_name (str, optional): The name of the zone from which the field will be deleted. Defaults to None.
base_name (str, optional): The name of the base where the zone is located. Defaults to None.
time (float, optional): The time associated with the field. Defaults to None.
Raises:
KeyError: Raised if the specified zone or field does not exist in the given base.
Returns:
CGNSTree: The tree at the provided time (without the deleted node)
"""
# get_zone will look for default zone_name, base_name, and time
zone_node = self.get_zone(zone_name=zone_name, base_name=base_name, time=time)
time = self.resolve_time(time)
mesh_tree = self.data[time]
if zone_node is None:
raise KeyError(
f"There is no Zone with name {zone_name} in base {base_name}."
)
solution_paths = CGU.getPathsByTypeSet(zone_node, [CGK.FlowSolution_t])
updated_tree = None
for s_path in solution_paths:
if (
CGU.getValueByPath(zone_node, f"{s_path}/GridLocation")
.tobytes()
.decode()
== location
):
field_node = CGU.getNodeByPath(zone_node, f"{s_path}/{name}")
if field_node is not None:
updated_tree = CGU.nodeDelete(mesh_tree, field_node)
# If the function reaches here, the field was not found
if updated_tree is None:
raise KeyError(f"There is no field with name {name} in the specified zone.")
return updated_tree
[docs]
def show_tree(self, time: Optional[float] = None) -> None:
"""Display the structure of the CGNS tree for a specified time.
Args:
time (float, optional): The time step for which you want to display the CGNS tree structure. Defaults to None. If a specific time is not provided, the method will display the tree structure for the default time step.
"""
time = self.resolve_time(time)
if self.data is not None:
CGH.show_cgns_tree(self.data.get(time))