PLAID storage reader module.
This module provides high-level functions for loading PLAID datasets from local disk or
Hugging Face Hub. It supports multiple storage backends including CGNS, HF Datasets,
and Zarr, providing a unified interface for data access and conversion.
Key features:
- Unified interface for loading datasets across different backends
- Local disk and streaming Hub access
- Automatic backend detection and converter creation
- Sample conversion between storage formats and PLAID objects
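A minimal usage sketch, assuming the `plaid` package is installed and a dataset was previously saved to a local directory. The helper name `load_first_sample` and the default split name `"train"` are illustrative assumptions; only `init_from_disk` and `Converter.to_plaid` come from this module.

```python
from pathlib import Path
from typing import Any, Union


def load_first_sample(local_dir: Union[str, Path], split: str = "train") -> Any:
    """Load one split from disk and return its first sample as a PLAID Sample.

    Hypothetical helper for illustration; the split name is an assumption.
    """
    # Imported lazily so this sketch can be defined without plaid installed.
    from plaid.storage.reader import init_from_disk

    datasetdict, converterdict = init_from_disk(local_dir, splits=[split])
    return converterdict[split].to_plaid(datasetdict[split], 0)
```

The same converter works for any index of the split, so the two dictionaries are typically created once and reused.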
plaid.storage.reader.Converter
Converter(
backend,
flat_cst,
cgns_types,
variable_features,
constant_features,
num_samples,
)
Converter class for transforming samples between storage and PLAID formats.
This class provides methods to convert samples between backend-specific storage formats
and PLAID Sample objects. It handles the schema transformations and metadata required
for proper data conversion.
Initialize a Converter.
Parameters:
- backend (str) – The storage backend ('hf_datasets', 'zarr', or 'cgns').
- flat_cst (Any) – Flattened constants for the dataset.
- cgns_types (Any) – CGNS type information.
- variable_features (Any) – Set of variable feature names.
- constant_features (Any) – Set of constant feature names.
- num_samples (Any) – Mapping providing the number of samples for each split.
Source code in plaid/storage/reader.py
def __init__(
    self,
    backend: str,
    flat_cst: Any,
    cgns_types: Any,
    variable_features: Any,
    constant_features: Any,
    num_samples: Any,
) -> None:
    """Initialize a :class:`Converter`.

    Args:
        backend: The storage backend ('hf_datasets', 'zarr', or 'cgns').
        flat_cst: Flattened constants for the dataset.
        cgns_types: CGNS type information.
        variable_features: Set of variable feature names.
        constant_features: Set of constant feature names.
        num_samples: Mapping providing the number of samples for each split.
    """
    self.backend = backend
    self.backend_spec = get_backend(backend)
    self.flat_cst = flat_cst
    self.cgns_types = cgns_types
    self.variable_features = set(variable_features)
    self.constant_features = set(constant_features)
    self.num_samples = num_samples
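The constructor stores a backend specification whose conversion hooks may be `None`, and the methods below check a hook before calling it. The pattern can be sketched as follows. `BackendSpec`, `_BACKENDS`, and `var_sample_dict` are hypothetical stand-ins written for this illustration; they are not the objects returned by the real `get_backend`.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass(frozen=True)
class BackendSpec:
    """Hypothetical stand-in for the object returned by get_backend()."""

    name: str
    # None means the backend has no dictionary representation of a sample.
    to_var_sample_dict: Optional[Callable[..., dict[str, Any]]] = None


# Illustrative registry; the real backends are defined elsewhere in plaid.
_BACKENDS = {
    "zarr": BackendSpec("zarr", to_var_sample_dict=lambda dataset, idx: dataset[idx]),
    "cgns": BackendSpec("cgns"),
}


def var_sample_dict(backend: str, dataset: Any, idx: int) -> dict[str, Any]:
    """Dispatch to the backend hook, or fail clearly when it is absent."""
    spec = _BACKENDS[backend]
    if spec.to_var_sample_dict is None:
        raise ValueError(f"to_dict not available for {backend} backend")
    return spec.to_var_sample_dict(dataset, idx)
```

Keeping optional hooks on the specification lets one class serve every backend while still raising a clear error for unsupported conversions.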
plaid.storage.reader.Converter.to_dict
to_dict(dataset, idx, features=None, indexers=None)
Convert a dataset sample to dictionary format.
Parameters:
- dataset (Any) – The dataset object containing the sample.
- idx (int) – Index of the sample to convert.
- features (Optional[list[str]], default: None) – Optional list of feature names to include from the variable fields. If None, all variable features available for the backend are included.
- indexers (Optional[dict[str, Any]], default: None) – Optional mapping feature_path -> indexer used to extract only selected indices inside variable features. Every key must be a known variable feature and, when features is given, one of the requested variable features. Indexing semantics are backend-dependent.
Returns:
- dict (dict[float, dict[str, Any]]) – Sample data in dictionary format.
Raises:
- ValueError – If called with CGNS backend.
- KeyError – If indexers contains unknown variable features, or features that are not among the requested variable features.
Source code in plaid/storage/reader.py
def to_dict(
    self,
    dataset: Any,
    idx: int,
    features: Optional[list[str]] = None,
    indexers: Optional[dict[str, Any]] = None,
) -> dict[float, dict[str, Any]]:
    """Convert a dataset sample to dictionary format.

    Args:
        dataset: The dataset object containing the sample.
        idx: Index of the sample to convert.
        features: Optional list of feature names to include from the variable fields.
            If None, all variable features available for the backend are included.
        indexers: Optional mapping ``feature_path -> indexer`` used to extract only
            selected indices inside variable features. Every key must be a known
            variable feature and, when ``features`` is given, one of the requested
            variable features. Indexing semantics are backend-dependent.

    Returns:
        dict: Sample data in dictionary format.

    Raises:
        ValueError: If called with CGNS backend.
        KeyError: If ``indexers`` contains unknown or non-requested variable features.
    """
    if self.backend_spec.to_var_sample_dict is None:  # pragma: no cover
        raise ValueError(
            f"Converter.to_dict not available for {self.backend} backend"
        )
    if features:
        features = update_features_for_CGNS_compatibility(
            features,
            self.constant_features,
            self.variable_features,
        )
        req_var_feat = [f for f in features if f in self.variable_features]
    else:
        req_var_feat = None
    if indexers is not None:
        unknown = set(indexers.keys()) - self.variable_features
        if unknown:
            raise KeyError(
                f"Indexers contain unknown variable features: {sorted(unknown)}"
            )
        if req_var_feat is not None:
            not_requested = set(indexers.keys()) - set(req_var_feat)
            if not_requested:
                raise KeyError(
                    "Indexers contain features not present in requested variable "
                    f"features: {sorted(not_requested)}"
                )
    var_sample_dict = self.backend_spec.to_var_sample_dict(
        dataset, idx, features=req_var_feat, indexers=indexers
    )
    return to_sample_dict(var_sample_dict, self.flat_cst, self.cgns_types, features)
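The two `KeyError` checks that `to_dict` applies to `indexers` can be isolated in a small standalone function. This is a sketch that mirrors the listing above; `validate_indexers` is a hypothetical name and is not part of the module.

```python
from typing import Any, Iterable, Optional


def validate_indexers(
    indexers: Optional[dict[str, Any]],
    variable_features: set[str],
    requested: Optional[Iterable[str]] = None,
) -> None:
    """Mirror the two KeyError checks performed on `indexers` in to_dict."""
    if indexers is None:
        return
    # Every indexer key must name a variable feature known to the converter.
    unknown = set(indexers) - variable_features
    if unknown:
        raise KeyError(f"Indexers contain unknown variable features: {sorted(unknown)}")
    # When a feature subset was requested, indexers must stay within it.
    if requested is not None:
        not_requested = set(indexers) - set(requested)
        if not_requested:
            raise KeyError(
                "Indexers contain features not present in requested variable "
                f"features: {sorted(not_requested)}"
            )
```

For example, with variable features `{"a", "b"}` and `requested=["a"]`, an indexer for `"a"` passes, while an indexer for `"b"` raises `KeyError` because `"b"` was not requested.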
plaid.storage.reader.Converter.to_plaid
to_plaid(dataset, idx, features=None, indexers=None)
Convert a dataset sample to PLAID Sample object.
Parameters:
- dataset (Any) – The dataset object containing the sample.
- idx (int) – Index of the sample to convert.
- features (Optional[list[str]], default: None) – Optional list of feature names to include from the variable fields. If None, all variable features available for the backend are included. For non-CGNS backends the list is adjusted using self.constant_features and self.variable_features to satisfy the CGNS conventions. The CGNS backend ignores this argument.
- indexers (Optional[dict[str, Any]], default: None) – Optional mapping feature_path -> indexer used to extract only selected indices inside variable features.
Returns:
- Sample – A PLAID Sample object.
Source code in plaid/storage/reader.py
def to_plaid(
    self,
    dataset: Any,
    idx: int,
    features: Optional[list[str]] = None,
    indexers: Optional[dict[str, Any]] = None,
) -> Sample:
    """Convert a dataset sample to PLAID Sample object.

    Args:
        dataset: The dataset object containing the sample.
        idx: Index of the sample to convert.
        features: Optional list of feature names to include from the variable fields.
            If None, all variable features available for the backend are included.
            For non-CGNS backends the list is adjusted using self.constant_features
            and self.variable_features to satisfy the CGNS conventions.
        indexers: Optional mapping ``feature_path -> indexer`` used to extract only
            selected indices inside variable features.

    Returns:
        Sample: A PLAID Sample object.
    """
    # Note: we deliberately do NOT call
    # ``update_features_for_CGNS_compatibility`` here. ``to_dict`` runs it
    # once for non-CGNS backends, and the CGNS branch ignores ``features``
    # entirely. Calling the helper twice used to break feature filtering
    # because its missing-key check only validates the *input* list while
    # the helper itself appends auxiliary paths (parent FlowSolution,
    # ``GridLocation``, ``Base_times``, ``ZoneType``, ...) that may not
    # be declared in ``constant_features`` / ``variable_features``. On
    # the second call those additions look "missing" and the helper
    # raises ``KeyError("Missing features in dataset/converter: ...")``.
    if self.backend != "cgns":
        sample_dict = self.to_dict(dataset, idx, features, indexers=indexers)
        return to_plaid_sample(sample_dict, self.cgns_types)
    else:
        return dataset[idx]
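The failure mode described in the comment above can be reproduced with a toy helper. `expand_features` is a deliberately simplified stand-in, not the real `update_features_for_CGNS_compatibility`: it validates its input against the declared features and then appends one auxiliary path per feature. The feature paths are illustrative assumptions.

```python
def expand_features(features: list[str], declared: set[str]) -> list[str]:
    """Toy stand-in for a helper that validates, then appends auxiliary paths."""
    # Only the *input* list is validated against the declared features.
    missing = [f for f in features if f not in declared]
    if missing:
        raise KeyError(f"Missing features in dataset/converter: {missing}")
    expanded = list(features)
    for feature in features:
        parent = feature.rsplit("/", 1)[0]
        aux = f"{parent}/GridLocation"  # auxiliary path, possibly undeclared
        if aux not in expanded:
            expanded.append(aux)
    return expanded


declared = {"Base/Zone/FlowSolution/pressure"}
once = expand_features(["Base/Zone/FlowSolution/pressure"], declared)

# Feeding the expanded list back in fails: the appended path is undeclared.
try:
    expand_features(once, declared)
    second_call_failed = False
except KeyError:
    second_call_failed = True
```

Because the output of the first call is not a valid input for a second call, the expansion has to happen in exactly one place, which in this module is `to_dict`.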
plaid.storage.reader.Converter.sample_to_dict
sample_to_dict(sample)
Convert a PLAID Sample to dictionary format.
Parameters:
- sample (Any) – The PLAID Sample object to convert.
Returns:
- dict (dict[float, dict[str, Any]]) – Sample data in dictionary format.
Raises:
- ValueError – If called with CGNS backend.
Source code in plaid/storage/reader.py
def sample_to_dict(self, sample: Any) -> dict[float, dict[str, Any]]:
    """Convert a PLAID Sample to dictionary format.

    Args:
        sample: The PLAID Sample object to convert.

    Returns:
        dict: Sample data in dictionary format.

    Raises:
        ValueError: If called with CGNS backend.
    """
    if self.backend_spec.sample_to_var_sample_dict is None:  # pragma: no cover
        raise ValueError(
            f"Converter.sample_to_var_sample_dict not available for {self.backend} backend"
        )
    var_sample_dict = self.backend_spec.sample_to_var_sample_dict(sample)
    return to_sample_dict(var_sample_dict, self.flat_cst, self.cgns_types)
plaid.storage.reader.Converter.sample_to_plaid
sample_to_plaid(sample)
Convert a sample to PLAID format (identity for the CGNS backend, whose samples are returned unchanged).
Parameters:
- sample (Any) – The sample object to convert.
Returns:
- Sample – A PLAID Sample object.
Source code in plaid/storage/reader.py
def sample_to_plaid(self, sample: Any) -> Sample:
    """Convert a sample to PLAID format (identity for the CGNS backend).

    Args:
        sample: The sample object to convert.

    Returns:
        Sample: A PLAID Sample object.
    """
    if self.backend != "cgns":
        sample_dict = self.sample_to_dict(sample)
        return to_plaid_sample(sample_dict, self.cgns_types)
    else:
        return sample
plaid.storage.reader.Converter.plaid_to_dict
plaid_to_dict(plaid_sample)
Convert a PLAID Sample to dictionary format for storage.
Parameters:
- plaid_sample (Sample) – The PLAID Sample object to convert.
Returns:
- dict (dict[str, Any]) – Sample data in dictionary format suitable for storage.
Source code in plaid/storage/reader.py
def plaid_to_dict(self, plaid_sample: Sample) -> dict[str, Any]:
    """Convert a PLAID Sample to dictionary format for storage.

    Args:
        plaid_sample: The PLAID Sample object to convert.

    Returns:
        dict: Sample data in dictionary format suitable for storage.
    """
    return plaid_to_sample_dict(
        plaid_sample,
        self.variable_features,
        self.constant_features,
    )
plaid.storage.reader.Converter.__repr__
__repr__()
String representation of the Converter.
Returns:
- str (str) – String representation including the backend.
Source code in plaid/storage/reader.py
def __repr__(self) -> str:
    """String representation of the Converter.

    Returns:
        str: String representation including the backend.
    """
    return f"Converter(backend={self.backend})"
plaid.storage.reader.init_from_disk
init_from_disk(local_dir, splits=None)
Initialize dataset and converters from local disk.
This function loads a previously saved PLAID dataset from local disk, automatically
detecting the backend and creating appropriate converters for sample transformation.
Parameters:
- local_dir (Union[Path, str]) – Path to the local directory containing the saved dataset.
- splits (Optional[list[str]], default: None) – Optional list of split names to load converters for. If None, converters are created for all splits present in the dataset.
Returns:
- tuple (tuple[dict[str, Any], dict[str, Converter]]) – A tuple containing (datasetdict, converterdict) where datasetdict maps split names to dataset objects and converterdict maps split names to Converter objects.
Source code in plaid/storage/reader.py
def init_from_disk(
    local_dir: Union[Path, str], splits: Optional[list[str]] = None
) -> tuple[dict[str, Any], dict[str, Converter]]:
    """Initialize dataset and converters from local disk.

    This function loads a previously saved PLAID dataset from local disk, automatically
    detecting the backend and creating appropriate converters for sample transformation.

    Args:
        local_dir: Path to the local directory containing the saved dataset.
        splits: Optional list of split names to load converters for. If None, converters
            are created for all splits present in the dataset.

    Returns:
        tuple: A tuple containing (datasetdict, converterdict) where datasetdict maps
            split names to dataset objects and converterdict maps split names to Converter objects.
    """
    infos = load_infos_from_disk(local_dir)
    backend = infos.storage_backend
    num_samples = infos.num_samples
    datasetdict = get_backend(backend).init_from_disk(path=local_dir)
    if splits is None:
        splits = list(datasetdict.keys())
    if backend == "cgns":
        # CGNS samples are self-contained: no derived metadata is written or
        # consumed for this backend.
        flat_cst = {str(s): {} for s in splits}
        variable_schema = {}
        constant_schema = {str(s): {} for s in splits}
        cgns_types = {}
    else:
        flat_cst, variable_schema, constant_schema, cgns_types = (
            load_metadata_from_disk(local_dir)
        )
    converterdict = {}
    for split in splits:
        converterdict[split] = Converter(
            backend,
            flat_cst[str(split)],
            cgns_types,
            list(variable_schema.keys()),
            list(constant_schema[str(split)].keys()),
            num_samples[str(split)],
        )
    return datasetdict, converterdict
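How the per-split `Converter` arguments are assembled, including the empty placeholders used for the CGNS backend, can be sketched with plain dictionaries. `converter_args` is a hypothetical function written for this illustration, and the metadata contents in the usage note are made up.

```python
from typing import Any, Optional


def converter_args(
    backend: str,
    splits: list[str],
    num_samples: dict[str, int],
    metadata: Optional[tuple[dict, dict, dict, dict]] = None,
) -> dict[str, tuple[Any, ...]]:
    """Build, per split, the positional arguments passed to Converter."""
    if backend == "cgns":
        # CGNS samples are self-contained, so placeholders replace stored metadata.
        flat_cst: dict = {s: {} for s in splits}
        variable_schema: dict = {}
        constant_schema: dict = {s: {} for s in splits}
        cgns_types: dict = {}
    elif metadata is None:
        raise ValueError(f"metadata is required for the {backend} backend")
    else:
        flat_cst, variable_schema, constant_schema, cgns_types = metadata
    return {
        s: (
            backend,
            flat_cst[s],            # constants are stored per split
            cgns_types,             # shared by all splits
            list(variable_schema),  # variable features are shared by all splits
            list(constant_schema[s]),
            num_samples[s],
        )
        for s in splits
    }
```

Calling `converter_args("cgns", ["train"], {"train": 5})` yields empty constants and feature lists for the `"train"` split, which matches the CGNS branch of the listing above.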
plaid.storage.reader.download_from_hub
download_from_hub(
repo_id,
local_dir,
split_ids=None,
features=None,
overwrite=False,
)
Download a PLAID dataset from Hugging Face Hub to local disk.
This function downloads a dataset from Hugging Face Hub, including data, metadata,
infos, and problem definitions, saving everything to local disk.
Parameters:
- repo_id (str) – Hugging Face repository ID (e.g., 'username/dataset-name').
- local_dir (Union[str, Path]) – Local directory path where the dataset will be downloaded.
- split_ids (Optional[dict[str, Iterable[int]]], default: None) – Optional dictionary mapping split names to iterables of sample IDs to download.
- features (Optional[list[str]], default: None) – Optional list of features to download.
- overwrite (bool, default: False) – If True, overwrites existing local directory.
Source code in plaid/storage/reader.py
def download_from_hub(
    repo_id: str,
    local_dir: Union[str, Path],
    split_ids: Optional[dict[str, Iterable[int]]] = None,
    features: Optional[list[str]] = None,
    overwrite: bool = False,
):  # pragma: no cover
    """Download a PLAID dataset from Hugging Face Hub to local disk.

    This function downloads a dataset from Hugging Face Hub, including data, metadata,
    infos, and problem definitions, saving everything to local disk.

    Args:
        repo_id: Hugging Face repository ID (e.g., 'username/dataset-name').
        local_dir: Local directory path where the dataset will be downloaded.
        split_ids: Optional dictionary mapping split names to iterables of sample IDs to download.
        features: Optional list of features to download.
        overwrite: If True, overwrites existing local directory.
    """
    infos = load_infos_from_hub(repo_id)
    pb_defs = load_problem_definitions_from_hub(repo_id)
    backend = infos.storage_backend
    backend_spec = get_backend(backend)
    backend_spec.download_from_hub(repo_id, local_dir, split_ids, features, overwrite)
    if backend != "cgns":
        flat_cst, variable_schema, constant_schema, cgns_types = load_metadata_from_hub(
            repo_id
        )
        save_metadata_to_disk(
            local_dir, flat_cst, variable_schema, constant_schema, cgns_types
        )
    save_infos_to_disk(local_dir, infos)
    if pb_defs is not None:
        save_problem_definitions_to_disk(local_dir, pb_defs)
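A partial download can be requested through `split_ids`. The sketch below assumes the `plaid` package is installed and network access is available; the helpers `first_n_ids` and `fetch_subset` and the split names `"train"` and `"test"` are hypothetical and chosen for illustration only.

```python
from pathlib import Path
from typing import Union


def first_n_ids(n_per_split: dict[str, int]) -> dict[str, range]:
    """Hypothetical helper: select the first n sample IDs of each split."""
    return {split: range(n) for split, n in n_per_split.items()}


def fetch_subset(repo_id: str, local_dir: Union[str, Path]) -> None:
    """Download a small subset of a Hub dataset (needs plaid and network access)."""
    # Imported lazily so this sketch can be defined without plaid installed.
    from plaid.storage.reader import download_from_hub

    download_from_hub(
        repo_id,
        local_dir,
        split_ids=first_n_ids({"train": 10, "test": 2}),  # assumed split names
        overwrite=False,
    )
```

After the download completes, the directory can be opened with `init_from_disk(local_dir)`, since infos and, for non-CGNS backends, metadata are saved alongside the data.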
plaid.storage.reader.init_streaming_from_hub
init_streaming_from_hub(
repo_id, split_ids=None, features=None
)
Initialize streaming datasets from Hugging Face Hub.
This function creates streaming dataset objects from a Hugging Face Hub repository,
along with converters for sample transformation.
Parameters:
- repo_id (str) – Hugging Face repository ID (e.g., 'username/dataset-name').
- split_ids (Optional[dict[str, Iterable[int]]], default: None) – Optional dictionary mapping split names to iterables of sample IDs to stream.
- features (Optional[list[str]], default: None) – Optional list of features to stream.
Returns:
- tuple (tuple[dict[str, Any], dict[str, Converter]]) – A tuple containing (datasetdict, converterdict) where datasetdict maps split names to streaming dataset objects and converterdict maps split names to Converter objects.
Source code in plaid/storage/reader.py
def init_streaming_from_hub(
    repo_id: str,
    split_ids: Optional[dict[str, Iterable[int]]] = None,
    features: Optional[list[str]] = None,
) -> tuple[dict[str, Any], dict[str, "Converter"]]:  # pragma: no cover
    """Initialize streaming datasets from Hugging Face Hub.

    This function creates streaming dataset objects from a Hugging Face Hub repository,
    along with converters for sample transformation.

    Args:
        repo_id: Hugging Face repository ID (e.g., 'username/dataset-name').
        split_ids: Optional dictionary mapping split names to iterables of sample IDs to stream.
        features: Optional list of features to stream.

    Returns:
        tuple: A tuple containing (datasetdict, converterdict) where datasetdict maps
            split names to streaming dataset objects and converterdict maps split names to Converter objects.
    """
    flat_cst, variable_schema, constant_schema, cgns_types = load_metadata_from_hub(
        repo_id
    )
    infos = load_infos_from_hub(repo_id)
    backend = infos.storage_backend
    num_samples = infos.num_samples
    backend_spec = get_backend(backend)
    datasetdict = backend_spec.init_datasetdict_streaming_from_hub(
        repo_id, split_ids, features
    )
    converterdict = {}
    for split in datasetdict.keys():
        converterdict[split] = Converter(
            backend,
            flat_cst[str(split)],
            cgns_types,
            list(variable_schema.keys()),
            list(constant_schema[str(split)].keys()),
            num_samples[str(split)],
        )
    return datasetdict, converterdict