plaid.storage.hf_datasets.writer

HF Datasets writer module.

This module provides functionality for writing and managing datasets in Hugging Face Datasets format for the PLAID library. It includes utilities for generating datasets from sample generators, saving to disk, uploading to Hugging Face Hub, and configuring dataset cards with metadata.

Key features:

  • Dataset generation from generators with parallel processing
  • Disk saving with automatic sharding
  • Hub uploading with optimized sharding
  • Dataset card configuration and updating

plaid.storage.hf_datasets.writer.save_datasetdict_to_disk

save_datasetdict_to_disk(path, hf_datasetdict, **kwargs)

Save a Hugging Face DatasetDict to disk.

This function serializes the provided DatasetDict and writes it to the specified directory, preserving its features, splits, and data for later loading.

Parameters:

  • path (Union[str, Path]) –

    Directory path where the DatasetDict will be saved.

  • hf_datasetdict (DatasetDict) –

    The Hugging Face DatasetDict to save.

  • **kwargs (Any, default: {} ) –

    Keyword arguments forwarded to DatasetDict.save_to_disk.

Returns:

  • None

    None

Source code in plaid/storage/hf_datasets/writer.py
def save_datasetdict_to_disk(
    path: Union[str, Path], hf_datasetdict: Any, **kwargs: Any
) -> None:
    """Save a Hugging Face DatasetDict to disk.

    This function serializes the provided DatasetDict and writes it to the specified
    directory, preserving its features, splits, and data for later loading.

    Args:
        path (Union[str, Path]): Directory path where the DatasetDict will be saved.
        hf_datasetdict (datasets.DatasetDict): The Hugging Face DatasetDict to save.
        **kwargs:
            Keyword arguments forwarded to
            [`DatasetDict.save_to_disk`](https://huggingface.co/docs/datasets/main/en/package_reference/main_classes#datasets.DatasetDict.save_to_disk).

    Returns:
        None
    """
    num_shards = _compute_num_shards(hf_datasetdict)
    # pop (rather than get + del) so an explicit num_proc=None is not forwarded twice
    num_proc = kwargs.pop("num_proc", None)
    if num_proc is not None:  # pragma: no cover
        min_num_shards = min(num_shards.values())
        if min_num_shards < num_proc:
            logger.warning(
                f"num_proc changed from {num_proc} to 1 to safely adapt for num_shards={num_shards}"
            )
            num_proc = 1

    hf_datasetdict.save_to_disk(
        str(Path(path) / "data"), num_shards=num_shards, num_proc=num_proc, **kwargs
    )
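
The `num_proc` safeguard in the source above can be read in isolation. The following is a minimal sketch, not part of `plaid`: the helper name `resolve_num_proc` is hypothetical, and it only mirrors the fallback rule shown in the source (drop to a single process when the smallest split has fewer shards than requested workers).

```python
from typing import Optional


def resolve_num_proc(num_shards: dict[str, int], num_proc: Optional[int]) -> Optional[int]:
    """Hypothetical helper mirroring the fallback rule in the source above."""
    if num_proc is None:
        # Nothing requested: leave the decision to the underlying library.
        return None
    # Fall back to one process when the smallest split has fewer shards than workers.
    if min(num_shards.values()) < num_proc:
        return 1
    return num_proc


shards_per_split = {"train": 8, "test": 2}
print(resolve_num_proc(shards_per_split, 4))  # the "test" split has only 2 shards
print(resolve_num_proc(shards_per_split, 2))
```

Note that the fallback is all-or-nothing: the worker count goes to 1 rather than to the smallest shard count.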

plaid.storage.hf_datasets.writer.generate_datasetdict_to_disk

generate_datasetdict_to_disk(
    output_folder,
    generators,
    variable_schema,
    gen_kwargs=None,
    num_proc=1,
    verbose=False,
)

Generates and saves a DatasetDict to disk from sample generators.

Parameters:

  • output_folder (Union[str, Path]) –

    Base directory to save the dataset.

  • generators (dict[str, Callable[..., Generator[Sample, None, None]]]) –

    Dictionary of split names to generator functions.

  • variable_schema (dict[str, dict]) –

    Schema describing variables.

  • gen_kwargs (Optional[dict[str, dict[str, IndexArrayType]]], default: None ) –

    Optional generator arguments for parallel processing.

  • num_proc (int, default: 1 ) –

    Number of processes for generation.

  • verbose (bool, default: False ) –

    Whether to enable verbose output.

Source code in plaid/storage/hf_datasets/writer.py
def generate_datasetdict_to_disk(
    output_folder: Union[str, Path],
    generators: dict[str, Callable[..., Generator[Sample, None, None]]],
    variable_schema: dict[str, dict],
    gen_kwargs: Optional[dict[str, dict[str, Any]]] = None,
    num_proc: int = 1,
    verbose: bool = False,  # noqa: ARG001
) -> None:
    """Generates and saves a DatasetDict to disk from sample generators.

    Args:
        output_folder (Union[str, Path]): Base directory to save the dataset.
        generators (dict[str, Callable[..., Generator[Sample, None, None]]]):
            Dictionary of split names to generator functions.
        variable_schema (dict[str, dict]): Schema describing variables.
        gen_kwargs (Optional[dict[str, dict[str, IndexArrayType]]]): Optional
            generator arguments for parallel processing.
        num_proc (int): Number of processes for generation.
        verbose (bool): Whether to enable verbose output.
    """
    with tempfile.TemporaryDirectory() as tmpdirname:
        hf_datasetdict = generator_to_datasetdict(
            generators,
            variable_schema,
            cache_dir=tmpdirname,
            gen_kwargs=gen_kwargs,
            processes_number=num_proc,
        )
        save_datasetdict_to_disk(output_folder, hf_datasetdict, num_proc=num_proc)
        del hf_datasetdict
        gc.collect()
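
The shapes expected for `generators` and `gen_kwargs` can be illustrated without `plaid` installed. This is a sketch under stated assumptions: plain dictionaries stand in for `Sample` objects, the keyword name `ids` is hypothetical, and calling each generator with `gen_kwargs[split]` is an assumption about how the arguments are forwarded, not something the source above confirms.

```python
from typing import Any, Callable, Generator


def make_split_generator(offset: int) -> Callable[..., Generator[dict[str, Any], None, None]]:
    """Build a generator function for one split (dicts stand in for plaid Samples)."""

    def generator(ids: list[int]) -> Generator[dict[str, Any], None, None]:
        for i in ids:
            # A real generator would build and yield a plaid Sample here.
            yield {"id": i, "value": float(i + offset)}

    return generator


# One generator function per split name.
generators = {"train": make_split_generator(0), "test": make_split_generator(100)}

# One dict of keyword arguments per split; index lists like these are the kind
# of argument that can be divided among worker processes.
gen_kwargs = {"train": {"ids": [0, 1, 2, 3]}, "test": {"ids": [0, 1]}}

for split_name, gen in generators.items():
    samples = list(gen(**gen_kwargs[split_name]))
    print(split_name, len(samples))
```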

plaid.storage.hf_datasets.writer.push_datasetdict_to_hub

push_datasetdict_to_hub(repo_id, hf_datasetdict, **kwargs)

Push a Hugging Face DatasetDict to the Hugging Face Hub.

This is a thin wrapper around datasets.DatasetDict.push_to_hub, allowing you to upload a dataset dictionary (with one or more splits such as "train", "validation", "test") to the Hugging Face Hub.

Note

The function automatically sets num_shards for each split. For each split, the number of shards is the smaller of two values: the number of samples in that split, and the number of shards needed to target approximately 500 MB per shard. This keeps shards efficiently sized while preventing excessive fragmentation. Empty splits raise an assertion error.
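
The sharding rule in this note can be sketched as follows. This is an illustration only: `_compute_num_shards` is not shown on this page, so the function name `estimate_num_shards`, the input format, and the exact byte value used for "500 MB" are assumptions.

```python
import math

# Assumed target shard size (~500 MB); the exact constant used by plaid is not shown.
TARGET_SHARD_BYTES = 500 * 1024**2


def estimate_num_shards(split_sizes: dict[str, tuple[int, int]]) -> dict[str, int]:
    """Map each split to a shard count, given (num_samples, total_bytes) per split."""
    num_shards = {}
    for split_name, (num_samples, total_bytes) in split_sizes.items():
        # Mirror the documented behavior: empty splits are rejected.
        assert num_samples > 0, f"split {split_name!r} is empty"
        # Shards needed so that each one is at most about the target size.
        by_size = max(1, math.ceil(total_bytes / TARGET_SHARD_BYTES))
        # Never create more shards than there are samples.
        num_shards[split_name] = min(num_samples, by_size)
    return num_shards


sizes = {"train": (1000, 2 * 1024**3), "test": (3, 10 * 1024**3)}
print(estimate_num_shards(sizes))
```

In the second split the sample count is the binding limit: three very large samples cannot be spread over more than three shards.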

Parameters:

  • repo_id (str) –

    The repository ID on the Hugging Face Hub (e.g. "username/dataset_name").

  • hf_datasetdict (DatasetDict) –

    The Hugging Face dataset dictionary to push.

  • **kwargs (Any, default: {} ) –

    Keyword arguments forwarded to DatasetDict.push_to_hub.

Returns:

  • None

    None

Source code in plaid/storage/hf_datasets/writer.py
def push_datasetdict_to_hub(
    repo_id: str, hf_datasetdict: Any, **kwargs: Any
) -> None:  # pragma: no cover
    """Push a Hugging Face `DatasetDict` to the Hugging Face Hub.

    This is a thin wrapper around `datasets.DatasetDict.push_to_hub`, allowing
    you to upload a dataset dictionary (with one or more splits such as
    `"train"`, `"validation"`, `"test"`) to the Hugging Face Hub.

    Note:
        The function automatically sets `num_shards` for each split. For each split, the
        number of shards is the smaller of two values: the number of samples in that split,
        and the number of shards needed to target approximately 500 MB per shard. This keeps
        shards efficiently sized while preventing excessive fragmentation. Empty splits raise
        an assertion error.

    Args:
        repo_id (str):
            The repository ID on the Hugging Face Hub
            (e.g. `"username/dataset_name"`).
        hf_datasetdict (datasets.DatasetDict):
            The Hugging Face dataset dictionary to push.
        **kwargs:
            Keyword arguments forwarded to
            [`DatasetDict.push_to_hub`](https://huggingface.co/docs/datasets/main/en/package_reference/main_classes#datasets.DatasetDict.push_to_hub).

    Returns:
        None
    """
    num_shards = _compute_num_shards(hf_datasetdict)
    # pop (rather than get + del) so an explicit num_proc=None is not forwarded twice
    num_proc = kwargs.pop("num_proc", None)
    if num_proc is not None:  # pragma: no cover
        min_num_shards = min(num_shards.values())
        if min_num_shards < num_proc:
            logger.warning(
                f"num_proc changed from {num_proc} to 1 to safely adapt for num_shards={num_shards}"
            )
            num_proc = 1

    hf_datasetdict.push_to_hub(
        repo_id, num_shards=num_shards, num_proc=num_proc, **kwargs
    )

plaid.storage.hf_datasets.writer.push_local_datasetdict_to_hub

push_local_datasetdict_to_hub(
    repo_id, local_dir, num_workers=1
)

Pushes a local DatasetDict to Hugging Face Hub.

Parameters:

  • repo_id (str) –

    The repository ID on Hugging Face Hub.

  • local_dir (Union[str, Path]) –

    Local directory containing the dataset.

  • num_workers (int, default: 1 ) –

    Number of workers for uploading.

Source code in plaid/storage/hf_datasets/writer.py
def push_local_datasetdict_to_hub(
    repo_id: str, local_dir: Union[str, Path], num_workers: int = 1
) -> None:  # pragma: no cover
    """Pushes a local DatasetDict to Hugging Face Hub.

    Args:
        repo_id (str): The repository ID on Hugging Face Hub.
        local_dir (Union[str, Path]): Local directory containing the dataset.
        num_workers (int): Number of workers for uploading.
    """
    datasetdict = init_datasetdict_from_disk(local_dir)
    push_datasetdict_to_hub(repo_id, datasetdict, num_proc=num_workers)

plaid.storage.hf_datasets.writer.configure_dataset_card

configure_dataset_card(
    repo_id,
    infos,
    local_dir=None,
    viewer=False,
    pretty_name=None,
    dataset_long_description=None,
    illustration_urls=None,
    arxiv_paper_urls=None,
)

Configures and updates a dataset card on Hugging Face Hub for HF datasets backend.

This function downloads the existing README.md (dataset card) from the specified Hugging Face repository, modifies it by adding metadata such as license, viewer settings, task categories, tags, and optional descriptions/illustrations. It then pushes the updated card back to the repository.

Parameters:

  • repo_id (str) –

    The Hugging Face repository ID where the dataset card is located and will be updated.

  • infos (Infos) –

    Dataset metadata, including legal information like license.

  • local_dir (Optional[Union[str, Path]], default: None ) –

    Unused parameter for local directory path.

  • viewer (bool, default: False ) –

    Whether to enable the dataset viewer. Defaults to False, which sets 'viewer: false' in the card.

  • pretty_name (Optional[str], default: None ) –

    A human-readable name for the dataset to display in the card.

  • dataset_long_description (Optional[str], default: None ) –

    A detailed description of the dataset to include in the card.

  • illustration_urls (Optional[list[str]], default: None ) –

    List of URLs to images that illustrate the dataset, displayed in the card.

  • arxiv_paper_urls (Optional[list[str]], default: None ) –

    List of arXiv URLs for papers related to the dataset, included as sources.

Returns:

  • None ( None ) –

    This function does not return a value; it updates the dataset card directly on Hugging Face Hub.

Source code in plaid/storage/hf_datasets/writer.py
def configure_dataset_card(
    repo_id: str,
    infos: Infos,
    local_dir: Optional[Union[str, Path]] = None,  # noqa: ARG001
    viewer: bool = False,
    pretty_name: Optional[str] = None,
    dataset_long_description: Optional[str] = None,
    illustration_urls: Optional[list[str]] = None,
    arxiv_paper_urls: Optional[list[str]] = None,
) -> None:  # pragma: no cover
    """Configures and updates a dataset card on Hugging Face Hub for HF datasets backend.

    This function downloads the existing README.md (dataset card) from the specified
    Hugging Face repository, modifies it by adding metadata such as license, viewer
    settings, task categories, tags, and optional descriptions/illustrations. It then
    pushes the updated card back to the repository.

    Args:
        repo_id (str): The Hugging Face repository ID where the dataset card is located
            and will be updated.
        infos (Infos): Dataset metadata,
            including legal information like license.
        local_dir (Optional[Union[str, Path]]): Unused parameter for local directory path.
        viewer (bool): Whether to enable the dataset viewer. Defaults to False, which
            sets 'viewer: false' in the card.
        pretty_name (Optional[str]): A human-readable name for the dataset to
            display in the card.
        dataset_long_description (Optional[str]): A detailed description of the
            dataset to include in the card.
        illustration_urls (Optional[list[str]]): List of URLs to images that
            illustrate the dataset, displayed in the card.
        arxiv_paper_urls (Optional[list[str]]): List of arXiv URLs for papers
            related to the dataset, included as sources.

    Returns:
        None: This function does not return a value; it updates the dataset card
            directly on Hugging Face Hub.
    """
    infos_dict = infos.model_dump(exclude_none=True)
    readme_path = hf_hub_download(
        repo_id=repo_id, filename="README.md", repo_type="dataset"
    )

    with open(readme_path, "r", encoding="utf-8") as f:
        dataset_card_str = f.read()

    lines = dataset_card_str.splitlines()
    lines = [s for s in lines if not s.startswith("license")]

    indices = [i for i, line in enumerate(lines) if line.strip() == "---"]

    assert len(indices) >= 2, (
        "Cannot find two instances of '---', you should try to update a correct dataset_card."
    )
    lines = lines[: indices[1] + 1]

    count = 1
    lines.insert(count, f"license: {infos.license}")
    count += 1
    if viewer is False:
        lines.insert(count, "viewer: false")
        count += 1
    lines.insert(count, "task_categories:")
    count += 1
    lines.insert(count, "- graph-ml")
    count += 1
    if pretty_name:
        lines.insert(count, f"pretty_name: {pretty_name}")
        count += 1
    lines.insert(count, "tags:")
    count += 1
    lines.insert(count, "- physics learning")
    count += 1
    lines.insert(count, "- geometry learning")
    count += 1

    str__ = "\n".join(lines) + "\n"

    if illustration_urls:
        str__ += "<p align='center'>\n"
        for url in illustration_urls:
            str__ += f"<img src='{url}' alt='{url}' width='1000'/>\n"
        str__ += "</p>\n\n"

    str__ += (
        f"```yaml\n{yaml.dump(infos_dict, sort_keys=False, allow_unicode=True)}\n```"
    )

    str__ += """
This dataset was generated with [`plaid`](https://plaid-lib.readthedocs.io/). Refer to its documentation for additional details on how to extract data from `plaid_sample` objects.

The simplest way to use this dataset is to first download it:
```python
from plaid.storage import download_from_hub

repo_id = "channel/dataset"
local_folder = "downloaded_dataset"

download_from_hub(repo_id, local_folder)
```

Then, to iterate over the dataset and instantiate samples:
```python
from plaid.storage import init_from_disk

local_folder = "downloaded_dataset"
split_name = "train"

datasetdict, converterdict = init_from_disk(local_folder)

dataset = datasetdict[split_name]
converter = converterdict[split_name]

for i in range(len(dataset)):
    plaid_sample = converter.to_plaid(dataset, i)
```

It is possible to stream the data directly:
```python
from plaid.storage import init_streaming_from_hub

repo_id = "channel/dataset"
split_name = "train"

datasetdict, converterdict = init_streaming_from_hub(repo_id)

dataset = datasetdict[split_name]
converter = converterdict[split_name]

for sample_raw in dataset:
    plaid_sample = converter.sample_to_plaid(sample_raw)
```

Sample features can then be retrieved as follows:
```python
from plaid.storage import load_problem_definitions_from_disk
local_folder = "downloaded_dataset"
pb_defs = load_problem_definitions_from_disk(local_folder)

# or
from plaid.storage import load_problem_definitions_from_hub
repo_id = "channel/dataset"
pb_defs = load_problem_definitions_from_hub(repo_id)


pb_def = next(iter(pb_defs.values()))

plaid_sample = ... # use a method from above to instantiate a plaid sample

for t in plaid_sample.get_all_time_values():
    for path in pb_def.input_features:
        feature = plaid_sample.get_feature_by_path(path=path, time=t)
        ...
    for path in pb_def.output_features:
        feature = plaid_sample.get_feature_by_path(path=path, time=t)
        ...
```

For those familiar with HF's `datasets` library, raw data can be retrieved without using the `plaid` library:
```python
from datasets import load_dataset

repo_id = "channel/dataset"

datasetdict = load_dataset(repo_id)

for split_name, dataset in datasetdict.items():
    for raw_sample in dataset:
        for feat_name in dataset.column_names:
            feature = raw_sample[feat_name]
```
Notice that raw data refers to the variable features only, with a specific encoding for time variable features.
"""

    if dataset_long_description:
        str__ += f"""
### Dataset Description
{dataset_long_description}
"""

    if arxiv_paper_urls:
        str__ += """
### Dataset Sources

- **Papers:**
"""
        for url in arxiv_paper_urls:
            str__ += f"   - [arxiv]({url})\n"

    dataset_card = DatasetCard(str__)
    dataset_card.push_to_hub(repo_id)
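
The front-matter handling in `configure_dataset_card` can be tried offline, without downloading or pushing anything. The sketch below is a hypothetical standalone function (`rebuild_front_matter` is not part of `plaid`) that reproduces only the YAML header step from the source above: drop existing `license` lines, keep the text up to the second `---`, and insert the new metadata right after the first line.

```python
from typing import Optional


def rebuild_front_matter(
    card_text: str,
    license_name: str,
    viewer: bool = False,
    pretty_name: Optional[str] = None,
) -> str:
    """Return the card's YAML header with refreshed metadata (the body is dropped)."""
    lines = [s for s in card_text.splitlines() if not s.startswith("license")]
    fences = [i for i, line in enumerate(lines) if line.strip() == "---"]
    if len(fences) < 2:
        raise ValueError("expected a YAML header delimited by two '---' lines")
    # Keep everything up to and including the closing '---'.
    lines = lines[: fences[1] + 1]

    new_entries = [f"license: {license_name}"]
    if not viewer:
        new_entries.append("viewer: false")
    new_entries += ["task_categories:", "- graph-ml"]
    if pretty_name:
        new_entries.append(f"pretty_name: {pretty_name}")
    new_entries += ["tags:", "- physics learning", "- geometry learning"]

    # Insert right after the first line, as the source does with index 1.
    lines[1:1] = new_entries
    return "\n".join(lines) + "\n"


card = "---\nlicense: mit\ndataset_info:\n  features: []\n---\n# Old body\n"
print(rebuild_front_matter(card, "cc-by-4.0", pretty_name="Demo"))
```

As in the source, anything after the closing `---` is discarded, so the body of an existing card is regenerated rather than preserved.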