plaid.storage.common.preprocessor
=================================

.. py:module:: plaid.storage.common.preprocessor

.. autoapi-nested-parse::

   Common preprocessing utilities.

   This module provides utilities for preprocessing PLAID samples into formats suitable for storage,
   including flattening CGNS trees, inferring data types, and handling parallel processing of sample shards.



Functions
---------

.. autoapisummary::

   plaid.storage.common.preprocessor.infer_dtype
   plaid.storage.common.preprocessor.build_sample_dict
   plaid.storage.common.preprocessor.process_shard
   plaid.storage.common.preprocessor.preprocess_splits
   plaid.storage.common.preprocessor.preprocess


Module Contents
---------------

.. py:function:: infer_dtype(value: Any) -> dict[str, int | str]

   Infer a canonical dtype schema from a value.


.. py:function:: build_sample_dict(sample: plaid.Sample) -> tuple[dict[str, Any], set[str], dict[str, str]]

   Flatten a PLAID Sample's CGNS trees into Hugging Face-compatible arrays and metadata.

   The function traverses every CGNS tree stored in ``sample.features.data`` (keyed by time),
   produces a flattened ``path -> primitive value`` mapping for each time, and then builds compact
   numpy arrays suitable for storage in a Hugging Face Dataset. Value blocks that are identical
   across times are stored once and referenced by start/end indices; companion ``"_times"`` arrays
   record, for each time, the slice indices into the concatenated arrays.

   :param sample: A PLAID Sample whose features contain one or more CGNS trees
       (``sample.features.data`` maps time -> CGNSTree).
   :type sample: Sample

   :returns:
       - sample_dict (dict[str, Any]): Mapping of flattened CGNS paths to either a numpy array
         (the concatenation of the per-time blocks) or None. Each path also has a companion
         ``"_times"`` entry containing a flattened numpy array of ``[time, start, end]`` triplets
         (``end == -1`` indicates that the block extends to the end of the array).
       - all_paths (list[str]): Sorted list of all considered variable feature paths
         (excluding Time-related nodes and CGNSLibraryVersion).
       - sample_cgns_types (dict[str, str]): Mapping from path to CGNS node type
         (metadata produced by ``flatten_cgns_tree``).

   :rtype: tuple

   .. note::

      - Byte-array encoded strings (dtype ``"|S1"``) are reassembled into a single string and stored
        as a single-element numpy array; a sha256 hash is used for deduplication.
      - Deduplication reduces storage when identical blocks recur across times.
      - Paths containing ``"/Time"`` or ``"CGNSLibraryVersion"`` are excluded from the variable features.


.. py:function:: process_shard(generator_fn: Callable[Ellipsis, Any], progress: Any, n_proc: int, shard_ids: Optional[list[plaid.types.IndexType]] = None) -> tuple[set[str], dict[str, str], dict[str, Any], dict[str, dict[str, Union[str, bool, int]]], int]

   Process a single shard of sample ids and collect per-shard metadata.

   This function drives a shard-level pass over the samples produced by ``generator_fn``.
   For each sample it:

   - flattens the sample into Hugging Face-friendly arrays (``build_sample_dict``),
   - collects the observed flattened paths,
   - aggregates CGNS type metadata,
   - infers Hugging Face feature types for each path,
   - detects per-path constants using a content hash,
   - updates progress (either a ``multiprocessing.Queue`` or a tqdm progress bar).

   :param generator_fn: Generator function accepting a list of shard id sequences and yielding
       Sample objects for those ids.
   :type generator_fn: Callable
   :param progress: Progress reporter; either a ``multiprocessing.Queue`` (parallel execution)
       or a tqdm progress bar object (sequential execution).
   :type progress: Any
   :param n_proc: Number of worker processes used by the caller (used to decide how to report progress).
   :type n_proc: int
   :param shard_ids: Sequence of sample ids (a single shard) to process.
   :type shard_ids: list[IndexType], optional

   :returns:
       - split_all_paths (set[str]): Set of all flattened feature paths observed in the shard.
       - shard_global_cgns_types (dict[str, str]): Mapping path -> CGNS node type observed in the shard.
       - shard_global_feature_types (dict[str, Union[Value, Sequence]]): Inferred feature types per path.
       - split_constant_leaves (dict[str, dict]): Per-path metadata for constant detection. Each entry
         is a dict with keys ``"hash"`` (str), ``"constant"`` (bool) and ``"count"`` (int).
       - n_samples_processed (int): Number of samples processed in this shard.

   :rtype: tuple
   :raises ValueError: If inconsistent feature types are detected for the same path within the shard.


.. py:function:: preprocess_splits(generators: dict[str, Callable[Ellipsis, Generator[plaid.Sample, None, None]]], gen_kwargs: Optional[dict[str, dict[str, Any]]] = None, num_proc: int = 1, verbose: bool = True) -> tuple[dict[str, set[str]], dict[str, dict[str, Any]], dict[str, set[str]], dict[str, str], dict[str, Any], dict[str, int]]

   Preprocess dataset splits: inspect samples to infer features, constants and CGNS metadata.

   This function iterates over the provided split generators (optionally in parallel), flattens
   each PLAID sample into Hugging Face-friendly arrays, detects constant CGNS leaves (features that
   are identical across all samples in a split), infers global Hugging Face feature types, and
   aggregates CGNS type metadata.

   The work is sharded per split, and each shard is processed by ``process_shard``. In parallel
   mode, progress is reported through a ``multiprocessing.Queue``; otherwise a tqdm progress bar
   is used.

   :param generators: Mapping from split name to a generator function. Each generator must accept
       a single argument (a sequence of shard ids) and yield PLAID samples.
   :type generators: dict[str, Callable]
   :param gen_kwargs: Per-split kwargs used to drive generator invocation
       (e.g. ``{"train": {"shards_ids": [...]}}``).
   :type gen_kwargs: dict[str, dict[str, list[IndexType]]], optional
   :param num_proc: Number of worker processes to use for shard-level parallelism. Defaults to 1.
   :type num_proc: int, optional
   :param verbose: If True, display progress bars. Defaults to True.
   :type verbose: bool, optional

   :returns:
       - split_all_paths (dict[str, set[str]]): For each split, the set of all observed flattened
         feature paths (including ``"_times"`` keys).
       - split_flat_cst (dict[str, dict[str, Any]]): For each split, a mapping of constant feature
         path -> value (the constant parts of the tree).
       - split_var_path (dict[str, set[str]]): For each split, the set of variable (non-constant)
         feature paths.
       - global_cgns_types (dict[str, str]): Aggregated mapping from flattened path -> CGNS node type.
       - global_feature_types (dict[str, Union[Value, Sequence]]): Aggregated inferred Hugging Face
         feature types for each variable path.
       - split_n_samples (dict[str, int]): For each split, the total number of samples processed.

   :rtype: tuple
   :raises ValueError: If inconsistent feature types or CGNS types are detected across shards/splits.


.. py:function:: preprocess(generators: dict[str, Callable[Ellipsis, Generator[plaid.Sample, None, None]]], gen_kwargs: Optional[dict[str, dict[str, Any]]] = None, num_proc: int = 1, verbose: bool = True)

   Preprocess split generators to extract schemas and metadata.

   :param generators: Dict mapping split names to generator functions.
   :param gen_kwargs: Optional per-split generator kwargs used for parallel processing.
   :param num_proc: Number of worker processes. Defaults to 1.
   :param verbose: Whether to display progress bars. Defaults to True.
   :returns: ``(split_flat_cst, variable_schema, constant_schema, split_n_samples, global_cgns_types)``
   :rtype: tuple
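The ``[time, start, end]`` layout described for ``build_sample_dict`` can be illustrated with a small sketch. It is not the module's implementation: ``encode_path_blocks`` and ``decode_block`` are hypothetical helpers, plain Python lists stand in for numpy arrays, times are assumed to be visited in sorted order, and blocks are compared through a sha256 hash of their ``repr()``.

```python
import hashlib
from typing import Any


def encode_path_blocks(blocks: dict[float, list[Any]]) -> tuple[list[Any], list[float]]:
    """Hypothetical helper: concatenate one path's per-time blocks, storing identical blocks once.

    Returns the concatenated values and a flat list of [time, start, end] triplets.
    """
    values: list[Any] = []
    spans: dict[str, tuple[int, int]] = {}  # block hash -> (start, end) in `values`
    triplets: list[tuple[float, int, int]] = []
    for time in sorted(blocks):  # assumption: times are visited in increasing order
        block = blocks[time]
        key = hashlib.sha256(repr(block).encode()).hexdigest()
        if key not in spans:
            # First occurrence of this block: append it to the concatenated values.
            start = len(values)
            values.extend(block)
            spans[key] = (start, len(values))
        start, end = spans[key]
        triplets.append((time, start, end))

    flat_times: list[float] = []
    for time, start, end in triplets:
        # end == -1 marks a block that runs to the end of the concatenated array.
        flat_times.extend([time, start, -1 if end == len(values) else end])
    return values, flat_times


def decode_block(values: list[Any], flat_times: list[float], time: float) -> list[Any]:
    """Hypothetical helper: recover the block stored for `time` from the flat triplets."""
    for i in range(0, len(flat_times), 3):
        t, start, end = flat_times[i : i + 3]
        if t == time:
            return values[int(start):] if end == -1 else values[int(start):int(end)]
    raise KeyError(time)


values, flat_times = encode_path_blocks({0.0: [1, 2], 1.0: [1, 2], 2.0: [3]})
print(values)      # [1, 2, 3]
print(flat_times)  # [0.0, 0, 2, 1.0, 0, 2, 2.0, 2, -1]
print(decode_block(values, flat_times, 1.0))  # [1, 2]
```

In this example the block at time 1.0 reuses the slice already stored for time 0.0, and the block at time 2.0 is the last one in the array, so its end index is written as -1.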
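The per-path constant detection listed for ``process_shard`` can be sketched as follows, assuming a path stays constant only while every sample yields the same content hash. The ``"hash"``, ``"constant"`` and ``"count"`` keys come from the ``split_constant_leaves`` description; the helper names, the example paths, and the sha256-over-``repr()`` hash are assumptions for illustration.

```python
import hashlib
from typing import Any, Union

# Per-path metadata with the "hash", "constant" and "count" keys described above.
LeafInfo = dict[str, Union[str, bool, int]]


def content_hash(value: Any) -> str:
    """Hypothetical content hash: sha256 of the value's repr()."""
    return hashlib.sha256(repr(value).encode()).hexdigest()


def update_constant_leaves(leaves: dict[str, LeafInfo], sample_dict: dict[str, Any]) -> None:
    """Hypothetical helper: fold one flattened sample into the per-path metadata."""
    for path, value in sample_dict.items():
        h = content_hash(value)
        info = leaves.get(path)
        if info is None:
            # First time this path is seen in the shard: assume constant until proven otherwise.
            leaves[path] = {"hash": h, "constant": True, "count": 1}
        else:
            # Any hash mismatch permanently marks the path as variable.
            info["constant"] = bool(info["constant"]) and info["hash"] == h
            info["count"] = int(info["count"]) + 1


# Usage: "Pressure" changes between the two samples, "GridCoordinates" does not.
leaves: dict[str, LeafInfo] = {}
update_constant_leaves(leaves, {"GridCoordinates": [0.0, 1.0], "Pressure": [5.0]})
update_constant_leaves(leaves, {"GridCoordinates": [0.0, 1.0], "Pressure": [6.0]})
print(leaves["GridCoordinates"]["constant"], leaves["Pressure"]["constant"])  # True False
```

Keeping only the first hash and a counter means each shard needs constant memory per path, whatever the number of samples it processes.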
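When ``preprocess_splits`` combines shard results, per-path metadata has to be merged and conflicting types rejected with a ``ValueError``. A minimal sketch, assuming that a split-level constant must be constant in every shard, carry the same hash everywhere, and appear in all ``n_samples`` samples of the split (hence the ``count`` check). All function names, hash strings, and paths below are hypothetical.

```python
from typing import Union

LeafInfo = dict[str, Union[str, bool, int]]


def merge_constant_leaves(shards: list[dict[str, LeafInfo]]) -> dict[str, LeafInfo]:
    """Hypothetical helper: combine per-shard constant metadata into split-level metadata."""
    merged: dict[str, LeafInfo] = {}
    for shard in shards:
        for path, info in shard.items():
            if path not in merged:
                merged[path] = dict(info)  # copy so the shard's metadata is left untouched
                continue
            current = merged[path]
            # Constant only if constant in both shards and with the same content hash.
            current["constant"] = (
                bool(current["constant"])
                and bool(info["constant"])
                and current["hash"] == info["hash"]
            )
            current["count"] = int(current["count"]) + int(info["count"])
    return merged


def split_constant_paths(merged: dict[str, LeafInfo], n_samples: int) -> set[str]:
    """Paths that are constant and present in every sample of the split."""
    return {
        path
        for path, info in merged.items()
        if info["constant"] and info["count"] == n_samples
    }


def merge_types(global_types: dict[str, str], shard_types: dict[str, str]) -> None:
    """Hypothetical helper: add a shard's path -> type mapping, rejecting conflicts."""
    for path, node_type in shard_types.items():
        known = global_types.setdefault(path, node_type)
        if known != node_type:
            raise ValueError(f"Inconsistent types for {path!r}: {known!r} vs {node_type!r}")


# Usage with two hand-written shard results for one split (3 samples in total).
shard_a = {
    "Base/Zone/GridCoordinates": {"hash": "h1", "constant": True, "count": 2},
    "Base/Zone/Pressure": {"hash": "h5", "constant": False, "count": 2},
}
shard_b = {
    "Base/Zone/GridCoordinates": {"hash": "h1", "constant": True, "count": 1},
    "Base/Zone/Pressure": {"hash": "h5", "constant": True, "count": 1},
}
merged = merge_constant_leaves([shard_a, shard_b])
print(split_constant_paths(merged, n_samples=3))  # {'Base/Zone/GridCoordinates'}
```

In this sketch a path missing from some samples ends up with ``count < n_samples`` and is therefore treated as variable, even if every occurrence has the same hash.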