"""Common storage writer utilities.
This module provides common utilities for writing dataset metadata, problem definitions,
and other auxiliary files to disk or uploading them to Hugging Face Hub. It handles
serialization of infos, problem definitions, and dataset tree structures.
"""
# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.txt', which is part of this source code package.
#
#
import io
import json
import logging
from pathlib import Path
from typing import Any, Union
import numpy as np
import yaml
from huggingface_hub import HfApi
from plaid import ProblemDefinition
logger = logging.getLogger(__name__)
# ------------------------------------------------------
# Write to disk
# ------------------------------------------------------
[docs]
def save_infos_to_disk(
    path: Union[str, Path], infos: dict[str, dict[str, str]]
) -> None:
    """Save dataset infos as ``infos.yaml`` inside ``path``.

    The target directory is created (parents included) when it does not
    exist. An existing ``infos.yaml`` is overwritten.

    Args:
        path (Union[str, Path]): The directory path where the infos file will be saved.
        infos (dict[str, dict[str, str]]): Dictionary containing dataset infos.
    """
    infos_fname = Path(path) / "infos.yaml"
    infos_fname.parent.mkdir(parents=True, exist_ok=True)
    # Explicit encoding: without it the text encoding silently follows the
    # platform locale, which makes the written file machine-dependent.
    with open(infos_fname, "w", encoding="utf-8") as file:
        # sort_keys=False keeps the insertion order of ``infos`` in the file.
        yaml.dump(infos, file, default_flow_style=False, sort_keys=False)
[docs]
def save_problem_definitions_to_disk(
    path: Union[str, Path],
    pb_defs: Union[dict[str, ProblemDefinition], ProblemDefinition],
) -> None:
    """Write ProblemDefinitions under ``<path>/problem_definitions/``.

    A single ``ProblemDefinition`` is first wrapped into a one-entry mapping
    keyed by the value of its ``get_name()``. Every entry of the mapping is
    then written with ``save_to_file`` to ``<path>/problem_definitions/<name>``.

    Args:
        path (Union[str, Path]): The directory path for saving.
        pb_defs (Union[dict[str, ProblemDefinition], ProblemDefinition]): The
            problem definitions to save, either one instance or a mapping
            from name to instance.

    Raises:
        ValueError: If one of the names is ``None``. Names are checked while
            iterating, so entries that come before the offending one have
            already been written when the error is raised.
    """
    definitions = (
        {pb_defs.get_name(): pb_defs}
        if isinstance(pb_defs, ProblemDefinition)
        else pb_defs
    )

    output_dir = Path(path) / "problem_definitions"
    output_dir.mkdir(parents=True, exist_ok=True)

    for pb_name in definitions:
        if pb_name is None:
            raise ValueError(
                "At least one of the provided pb_defs has no initialized name."
            )
        definitions[pb_name].save_to_file(output_dir / pb_name)
[docs]
def _serialize_constant(value, spec):
    """Turn one constant into its raw bytes plus the metadata stored in the layout.

    Args:
        value: The constant to serialize (anything ``np.asarray`` accepts).
        spec (dict): Schema entry of the constant; ``spec["dtype"]`` must not
            be None, and ``spec["ndim"]`` is read for non-string constants.

    Returns:
        tuple[bytes, list[int], str]: ``(payload, shape, dtype_str)`` where
        ``payload`` holds the bytes to append to ``data.mmap``.

    Raises:
        AssertionError: If a one-element string constant does not hold a
            ``str``, or if a numeric array's ndim differs from ``spec["ndim"]``.
    """
    arr = np.asarray(value)

    if spec["dtype"] == "string":
        # ---- CASE 1: CGNS string scalar (1-element array holding a str) ----
        if arr.ndim == 1 and arr.size == 1:
            text = arr[0]
            # Explicit raise (rather than ``assert``) so the check survives -O.
            if not isinstance(text, str):
                raise AssertionError(
                    f"expected a str for a string constant, got {type(text).__name__}"
                )
            payload = text.encode("ascii", "strict")
            # The recorded shape is the number of encoded characters.
            return payload, [len(payload)], "|S1"
        # ---- CASE 2: CGNS char array ----
        else:  # pragma: no cover
            # One character per element, then one byte per element.
            chars = arr.astype("<U1")
            return chars.astype("|S1").tobytes(order="C"), list(chars.shape), "|S1"

    # ---- NUMERIC CASE ----
    if arr.ndim != spec["ndim"]:
        raise AssertionError(
            f"constant has ndim={arr.ndim}, schema expects ndim={spec['ndim']}"
        )
    # Force contiguous + little-endian storage. Note that
    # np.ascontiguousarray returns an array with ndim >= 1, so 0-d values are
    # recorded with shape [1].
    arr = np.ascontiguousarray(arr)
    arr = arr.astype(arr.dtype.newbyteorder("<"), copy=False)
    return arr.tobytes(order="C"), list(arr.shape), arr.dtype.str


def save_constants_to_disk(path, constant_schema, flat_cst):
    """Write constant features to disk under <path>/constants/.

    For each split in flat_cst this creates a directory:
        <path>/constants/<split>/
            - data.mmap : concatenated raw bytes of all constants for that split
            - layout.json : mapping constant_name ->
              {'offset': int, 'shape': [...], 'dtype': str} or None
            - constant_schema.yaml : the provided schema for that split

    Behavior:
        - Numeric constants are written as their C-order, little-endian bytes.
        - String constants support two cases:
            * CGNS string scalar: a 1-element array of Python str -> written
              as ASCII bytes, shape recorded as [len].
            * CGNS char array: any other array -> converted to one byte per
              element and written.
        - If a schema entry's dtype is None, the layout entry is set to None
          and no bytes are written.
        - Constants are laid out in the iteration order of
          ``constant_schema[split]``; offsets are byte offsets into data.mmap.

    Args:
        path (str | Path): Root dataset directory where "constants" will be created.
        constant_schema (dict): Mapping split -> {constant_name: {'dtype': str | None, 'ndim': int, ...}}.
        flat_cst (dict): Mapping split -> {constant_name: numpy array | None} containing values to save.

    Returns:
        None

    Raises:
        AssertionError: if a numeric array does not match the expected ndim,
            or a string scalar does not hold a ``str``.
        KeyError: if a split or a constant with a non-None dtype is missing
            from one of the two mappings.
        OSError / IOError: on file system write errors.
    """
    # Accept plain strings as documented; ``str / str`` would raise TypeError.
    root = Path(path)

    for split in flat_cst:
        layout = {}
        offset = 0
        cst_path = root / "constants" / split
        cst_path.mkdir(parents=True, exist_ok=True)

        with open(cst_path / "data.mmap", "wb") as data_file:
            for key, spec in constant_schema[split].items():
                if spec["dtype"] is None:
                    layout[key] = None
                    continue

                payload, shape, dtype_str = _serialize_constant(
                    flat_cst[split][key], spec
                )
                data_file.write(payload)
                layout[key] = {
                    "offset": offset,
                    "shape": shape,
                    "dtype": dtype_str,
                }
                offset += len(payload)

        # Context manager so the handle is flushed and closed deterministically.
        with open(cst_path / "layout.json", "w", encoding="utf-8") as layout_file:
            json.dump(layout, layout_file, indent=2)

        with open(
            cst_path / "constant_schema.yaml", "w", encoding="utf-8"
        ) as schema_file:
            yaml.dump(constant_schema[split], schema_file, sort_keys=False)
# ------------------------------------------------------
# Push to hub
# ------------------------------------------------------
[docs]
def push_infos_to_hub(
    repo_id: str, infos: dict[str, dict[str, str]]
) -> None:  # pragma: no cover (not tested in unit tests)
    """Upload dataset infos as ``infos.yaml`` to a Hugging Face dataset repository.

    The ``infos`` mapping is dumped to YAML, encoded as UTF-8 into an
    in-memory buffer, and sent with ``HfApi.upload_file`` to ``infos.yaml``
    at the root of ``repo_id``. Nothing is written to the local disk.

    Args:
        repo_id (str): Hugging Face dataset repository identifier (e.g. "user/repo").
        infos (dict[str, dict[str, str]]): Dataset infos mapping to serialize and upload.

    Raises:
        ValueError: If `infos` is empty.

    Notes:
        - Exceptions raised by ``HfApi.upload_file`` are not caught here and
          propagate to the caller.
        - Not covered by unit tests (pragma: no cover).
    """
    # Guard first: an empty mapping is rejected before any Hub client is built.
    if len(infos) == 0:
        raise ValueError("'infos' must not be empty")

    hub = HfApi()
    payload = io.BytesIO(yaml.dump(infos).encode("utf-8"))
    hub.upload_file(
        path_or_fileobj=payload,
        path_in_repo="infos.yaml",
        repo_id=repo_id,
        repo_type="dataset",
        commit_message="Upload infos.yaml",
    )
[docs]
def push_local_problem_definitions_to_hub(
    repo_id: str,
    path: Union[Path, str],
) -> None:  # pragma: no cover (not tested in unit tests)
    """Upload the local ``problem_definitions/`` folder to a Hugging Face dataset repository.

    The directory ``<path>/problem_definitions`` is handed as a whole to
    ``HfApi.upload_folder`` and stored under ``problem_definitions`` in the
    target repository.

    Expected local layout::

        <path>/
            problem_definitions/
                <name_1>
                <name_2>
                ...

    The files are assumed to have been serialized beforehand (e.g. via
    ``ProblemDefinition.save_to_file``). This function does not open,
    validate, or re-serialize them.

    Args:
        repo_id (str):
            Hugging Face dataset repository identifier
            (e.g. ``"username/dataset_name"``).
        path (Union[Path, str]):
            Root dataset directory containing the ``problem_definitions/`` folder.

    Notes:
        - The whole folder is sent through a single ``upload_folder`` call.
        - Existing files in ``problem_definitions/`` on the Hub may be overwritten.
        - Uses ``repo_type="dataset"``.
        - Exceptions raised by ``HfApi.upload_folder`` (for instance when the
          local folder is missing or the upload fails) are not caught here.
        - Not covered by unit tests (``pragma: no cover``).
    """
    local_folder = Path(path) / "problem_definitions"
    HfApi().upload_folder(
        folder_path=local_folder,
        repo_id=repo_id,
        repo_type="dataset",
        path_in_repo="problem_definitions",
        commit_message="Upload problem_definitions",
    )