plaid.storage.common.preprocessor
Common preprocessing utilities.
This module provides utilities for preprocessing PLAID samples into formats suitable for storage, including flattening CGNS trees, inferring data types, and handling parallel processing of sample shards.
Functions
- Infer canonical dtype schema from a value.
- build_sample_dict – Flatten a PLAID Sample's CGNS trees into Hugging Face–compatible arrays and metadata.
- process_shard – Process a single shard of sample ids and collect per-shard metadata.
- preprocess_splits – Pre-process dataset splits: inspect samples to infer features, constants and CGNS metadata.
- preprocess – Preprocess generators to extract schemas and metadata.
Module Contents
- build_sample_dict(sample: plaid.Sample) -> tuple[dict[str, Any], set[str], dict[str, str]]
Flatten a PLAID Sample’s CGNS trees into Hugging Face–compatible arrays and metadata.
The function traverses every CGNS tree stored in sample.features.data (keyed by time), produces a flattened mapping path -> primitive value for each time, and then builds compact numpy arrays suitable for storage in a Hugging Face Dataset. Repeated value blocks that are identical across times are deduplicated and referenced by start/end indices; companion “<path>_times” arrays describe, per time, the slice indices into the concatenated arrays.
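The deduplicated "<path>_times" layout can be illustrated with a simplified encoder/decoder pair. This is a hypothetical sketch under stated assumptions, not the module's implementation: `encode_time_blocks`, `decode_time_blocks` and `_block_digest` are illustrative names, each per-time block is assumed to be a 1-D numpy array, blocks are identified by a SHA-256 digest of dtype, shape and raw bytes, and a triplet [time, start, end] is read as the slice values[start:end] (or values[start:] when end == -1).

```python
import hashlib

import numpy as np


def _block_digest(arr: np.ndarray) -> str:
    # Hash dtype, shape and raw bytes so identical bytes of different dtypes differ.
    h = hashlib.sha256()
    h.update(str(arr.dtype).encode())
    h.update(str(arr.shape).encode())
    h.update(arr.tobytes())
    return h.hexdigest()


def encode_time_blocks(blocks: dict[float, np.ndarray]) -> tuple[np.ndarray, np.ndarray]:
    """Hypothetical sketch: concatenate 1-D per-time blocks, storing repeated blocks once."""
    seen: dict[str, tuple[int, int]] = {}  # digest -> (start, end) in the concatenated array
    parts: list[np.ndarray] = []
    triplets: list[tuple[float, int, int]] = []
    offset = 0
    for time in sorted(blocks):
        arr = np.asarray(blocks[time]).ravel()
        digest = _block_digest(arr)
        if digest not in seen:  # first occurrence: append the block to the values array
            seen[digest] = (offset, offset + arr.size)
            parts.append(arr)
            offset += arr.size
        start, end = seen[digest]
        triplets.append((time, start, end))
    values = np.concatenate(parts) if parts else np.array([])
    # Assumption: a block that ends at the end of the values array is marked with end == -1.
    times = np.array(
        [[t, s, -1 if e == offset else e] for t, s, e in triplets], dtype=np.float64
    ).ravel()
    return values, times


def decode_time_blocks(values: np.ndarray, times: np.ndarray) -> dict[float, np.ndarray]:
    # Each [time, start, end] triplet selects one slice of the concatenated values.
    out: dict[float, np.ndarray] = {}
    for time, start, end in times.reshape(-1, 3):
        stop = None if end == -1 else int(end)
        out[float(time)] = values[int(start):stop]
    return out
```

For blocks {0.0: [1, 2, 3], 1.0: [1, 2, 3], 2.0: [4, 5]}, the repeated block is stored once: values is [1, 2, 3, 4, 5] and times is [0, 0, 3, 1, 0, 3, 2, 3, -1].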
- Parameters:
sample (Sample) – A PLAID Sample whose features contain one or more CGNS trees (sample.features.data maps time -> CGNSTree).
- Returns:
sample_dict (dict[str, Any]): Mapping of flattened CGNS paths to either a numpy array (concatenation of per-time blocks) or None. For each path there is also an entry “<path>_times” containing a flattened numpy array of triplets [time, start, end] (end == -1 indicates the block extends to the end of the array).
all_paths (list[str]): Sorted list of all considered variable feature paths (excluding Time-related nodes and CGNSLibraryVersion).
sample_cgns_types (dict[str, str]): Mapping from path to CGNS node type (metadata produced by flatten_cgns_tree).
- Return type:
tuple[dict[str, Any], set[str], dict[str, str]]
Note
Byte-array encoded strings (dtype "|S1") are handled by reassembling the characters into a single string and storing it as a single-element numpy array; a SHA-256 hash is used for deduplication. Deduplication reduces storage when identical blocks recur across times.
Paths containing “/Time” or “CGNSLibraryVersion” are ignored for variable features.
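The "|S1" string handling in the note can be sketched as follows. The helper `reassemble_s1_string` is hypothetical (not part of this module), and decoding the bytes as ASCII is an assumption; the returned digest only illustrates the kind of key a SHA-256-based deduplication could use.

```python
import hashlib

import numpy as np


def reassemble_s1_string(chars: np.ndarray) -> tuple[np.ndarray, str]:
    """Hypothetical sketch: turn a dtype '|S1' character array into one stored string."""
    if chars.dtype != np.dtype("S1"):
        raise TypeError(f"expected dtype '|S1', got {chars.dtype}")
    # Each element is a single byte (b'Z', b'o', ...); join them in order.
    text = b"".join(chars.ravel().tolist()).decode("ascii")  # assumption: ASCII content
    digest = hashlib.sha256(text.encode("ascii")).hexdigest()
    # Store the whole string as a single-element array instead of one byte per element.
    return np.array([text]), digest
```

For example, np.array([b"Z", b"o", b"n", b"e"], dtype="|S1") becomes the one-element array ["Zone"].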
- process_shard(generator_fn: Callable[..., Any], progress: Any, n_proc: int, shard_ids: list[plaid.types.IndexType] | None = None) -> tuple[set[str], dict[str, str], dict[str, Any], dict[str, dict[str, str | bool | int]], int]
Process a single shard of sample ids and collect per-shard metadata.
This function drives a shard-level pass over samples produced by generator_fn. For each sample it:
  - flattens the sample into Hugging Face friendly arrays (build_sample_dict),
  - collects observed flattened paths,
  - aggregates CGNS type metadata,
  - infers Hugging Face feature types for each path,
  - detects per-path constants using a content hash,
  - updates progress (either a multiprocessing.Queue or a tqdm progress bar).
- Parameters:
generator_fn (Callable) – Generator function accepting a list of shard id sequences and yielding Sample objects for those ids.
progress (Any) – Progress reporter; either a multiprocessing.Queue (for parallel execution) or a tqdm progress bar object (for sequential execution).
n_proc (int) – Number of worker processes used by the caller (used to decide how to report progress).
shard_ids (list[IndexType], optional) – Sequence of sample ids (a single shard) to process. Defaults to None.
- Returns:
split_all_paths (set[str]): Set of all flattened feature paths observed in the shard.
shard_global_cgns_types (dict[str, str]): Mapping path -> CGNS node type observed in the shard.
shard_global_feature_types (dict[str, Union[Value, Sequence]]): Inferred feature types per path.
split_constant_leaves (dict[str, dict]): Per-path metadata for constant detection. Each entry is a dict with keys “hash” (str), “constant” (bool) and “count” (int).
n_samples_processed (int): Number of samples processed in this shard.
- Return type:
tuple[set[str], dict[str, str], dict[str, Any], dict[str, dict[str, str | bool | int]], int]
- Raises:
ValueError – If inconsistent feature types are detected for the same path within the shard.
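The per-path constant detection can be sketched as an update of the {"hash", "constant", "count"} entries documented under Returns. This is a hypothetical illustration, not the module's code: `content_hash`, `update_constant_leaves` and `constant_paths` are illustrative names, and treating a path as constant only when its count equals the number of samples is an assumption about how "count" is used.

```python
import hashlib
from typing import Any

import numpy as np


def content_hash(value: Any) -> str:
    # Hash dtype, shape and bytes of the value; None gets a fixed sentinel digest.
    h = hashlib.sha256()
    if value is None:
        h.update(b"<None>")
    else:
        arr = np.asarray(value)
        h.update(str(arr.dtype).encode())
        h.update(str(arr.shape).encode())
        h.update(arr.tobytes())
    return h.hexdigest()


def update_constant_leaves(leaves: dict[str, dict[str, Any]], sample_dict: dict[str, Any]) -> None:
    """Hypothetical sketch: fold one flattened sample into per-path constant metadata."""
    for path, value in sample_dict.items():
        digest = content_hash(value)
        entry = leaves.get(path)
        if entry is None:  # first time this path is seen in the shard
            leaves[path] = {"hash": digest, "constant": True, "count": 1}
            continue
        entry["count"] += 1
        if entry["hash"] != digest:
            entry["constant"] = False  # one differing value makes the path variable


def constant_paths(leaves: dict[str, dict[str, Any]], n_samples: int) -> set[str]:
    # Assumption: a path missing from some samples is not considered constant.
    return {p for p, e in leaves.items() if e["constant"] and e["count"] == n_samples}
```

Comparing digests instead of full arrays keeps the per-path state small, which matters when the same metadata must later be merged across shards.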
- preprocess_splits(generators: dict[str, Callable[..., Generator[plaid.Sample, None, None]]], gen_kwargs: dict[str, dict[str, Any]] | None = None, num_proc: int = 1, verbose: bool = True) -> tuple[dict[str, set[str]], dict[str, dict[str, Any]], dict[str, set[str]], dict[str, str], dict[str, Any], dict[str, int]]
Pre-process dataset splits: inspect samples to infer features, constants and CGNS metadata.
This function iterates over the provided split generators (optionally in parallel), flattens each PLAID sample into Hugging Face friendly arrays, detects constant CGNS leaves (features identical across all samples in a split), infers global Hugging Face feature types, and aggregates CGNS type metadata.
The work is sharded per split, and each shard is processed by process_shard. In parallel mode, progress is updated via a multiprocessing.Queue; otherwise, a tqdm progress bar is used.
- Parameters:
generators (dict[str, Callable]) – Mapping from split name to a generator function. Each generator must accept a single argument (a sequence of shard ids) and yield PLAID samples.
gen_kwargs (dict[str, dict[str, list[IndexType]]], optional) – Per-split kwargs used to drive generator invocation (e.g. {“train”: {“shards_ids”: […]}}). Defaults to None.
num_proc (int, optional) – Number of worker processes to use for shard-level parallelism. Defaults to 1.
verbose (bool, optional) – If True, displays progress bars. Defaults to True.
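One possible way to build gen_kwargs together with a compatible generator is sketched below. The "shards_ids" key follows the example above; reading its value as a list of shards (each a list of sample ids) is an interpretation of the parameter descriptions, and `make_gen_kwargs` and `toy_generator` are hypothetical names. The generator's single parameter is named shards_ids, so it works whether it is called positionally or with **gen_kwargs[split]. It yields plain dicts where a real generator would yield plaid.Sample objects.

```python
import numpy as np


def make_gen_kwargs(split_ids: dict[str, list[int]], num_proc: int) -> dict[str, dict[str, list[list[int]]]]:
    """Hypothetical helper: split each split's sample ids into at most num_proc shards."""
    gen_kwargs: dict[str, dict[str, list[list[int]]]] = {}
    for split, ids in split_ids.items():
        n_shards = max(1, min(num_proc, len(ids)))  # never more shards than samples
        chunks = np.array_split(np.asarray(ids, dtype=np.int64), n_shards)
        gen_kwargs[split] = {"shards_ids": [chunk.tolist() for chunk in chunks]}
    return gen_kwargs


def toy_generator(shards_ids: list[list[int]]):
    # Stand-in for a PLAID generator: a real one would yield plaid.Sample objects.
    for shard in shards_ids:
        for sample_id in shard:
            yield {"sample_id": sample_id}
```

With {"train": list(range(5))} and num_proc=2, make_gen_kwargs returns {"train": {"shards_ids": [[0, 1, 2], [3, 4]]}}, a dict of the shape the gen_kwargs parameter describes.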
- Returns:
split_all_paths (dict[str, set[str]]): For each split, the set of all observed flattened feature paths (including “_times” keys).
split_flat_cst (dict[str, dict[str, Any]]): For each split, a mapping of constant feature path -> value (constant parts of the tree).
split_var_path (dict[str, set[str]]): For each split, the set of variable (non-constant) feature paths.
global_cgns_types (dict[str, str]): Aggregated mapping from flattened path -> CGNS node type.
global_feature_types (dict[str, Union[Value, Sequence]]): Aggregated inferred Hugging Face feature types for each variable path.
split_n_samples (dict[str, int]): For each split, the total number of samples processed.
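- Return type:
tuple[dict[str, set[str]], dict[str, dict[str, Any]], dict[str, set[str]], dict[str, str], dict[str, Any], dict[str, int]]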
- Raises:
ValueError – If inconsistent feature types or CGNS types are detected across shards/splits.
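The aggregation across shards, including the ValueError on conflicting types, can be sketched as a merge of per-shard dictionaries. `merge_types` and `merge_constant_leaves` are hypothetical names; type values are compared with ==, and the constant-leaf entries use the {"hash", "constant", "count"} format documented for process_shard. The module may aggregate differently.

```python
from typing import Any


def merge_types(global_types: dict[str, Any], shard_types: dict[str, Any], kind: str) -> None:
    """Hypothetical sketch: fold shard-level types into the global mapping, rejecting conflicts."""
    for path, new_type in shard_types.items():
        old_type = global_types.setdefault(path, new_type)  # keeps the first type seen
        if old_type != new_type:
            raise ValueError(f"Inconsistent {kind} for {path!r}: {old_type!r} vs {new_type!r}")


def merge_constant_leaves(merged: dict[str, dict[str, Any]], shard_leaves: dict[str, dict[str, Any]]) -> None:
    """Combine per-shard {'hash', 'constant', 'count'} entries for one split."""
    for path, entry in shard_leaves.items():
        current = merged.get(path)
        if current is None:
            merged[path] = dict(entry)  # copy so the shard's dict is not mutated later
            continue
        current["count"] += entry["count"]
        # A leaf stays constant only if it was constant in both shards with the same hash.
        current["constant"] = (
            current["constant"] and entry["constant"] and current["hash"] == entry["hash"]
        )
```

Because each shard only reports a digest per path, merging stays cheap regardless of how large the underlying arrays are.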
- preprocess(generators: dict[str, Callable[..., Generator[plaid.Sample, None, None]]], gen_kwargs: dict[str, dict[str, Any]] | None = None, num_proc: int = 1, verbose: bool = True)
Preprocess generators to extract schemas and metadata.
- Parameters:
generators – Dict mapping split names to generator functions.
gen_kwargs – Optional per-split generator kwargs for parallel processing. Defaults to None.
num_proc – Number of worker processes. Defaults to 1.
verbose – Whether to show progress bars. Defaults to True.
- Returns:
(split_flat_cst, variable_schema, constant_schema, split_n_samples, global_cgns_types)