Skip to content

plaid.storage.hf_datasets.reader

plaid.storage.hf_datasets.reader

Reader for hf dataset storage.

  • If the environment variable HF_ENDPOINT is set, uses a private Hugging Face mirror.

    • Streaming is disabled.
    • The dataset is downloaded locally via snapshot_download and loaded from disk.
  • If HF_ENDPOINT is not set, attempts to load from the public Hugging Face hub.

    • If the dataset is already cached locally, loads from disk.
    • Otherwise, loads from the hub, optionally using streaming mode.

plaid.storage.hf_datasets.reader.init_datasetdict_from_disk

init_datasetdict_from_disk(path)

Initializes a DatasetDict from local disk files.

Parameters:

  • path (Union[str, Path]) –

    Path to the directory containing the dataset files.

Returns:

  • HFDatasetDict ( HFDatasetDict ) –

    The loaded dataset dictionary.

Source code in plaid/storage/hf_datasets/reader.py
def init_datasetdict_from_disk(path: Union[str, Path]) -> HFDatasetDict:
    """Initializes a DatasetDict from local disk files.

    Args:
        path (Union[str, Path]): Path to the directory containing the dataset files.

    Returns:
        HFDatasetDict: The loaded dataset dictionary.
    """
    dataset = load_from_disk(dataset_path=str(Path(path) / "data"))
    if not isinstance(dataset, datasets.DatasetDict):  # pragma: no cover
        raise TypeError(
            "Expected DatasetDict when loading hf_datasets backend from disk"
        )
    return dataset

plaid.storage.hf_datasets.reader.download_datasetdict_from_hub

download_datasetdict_from_hub(
    repo_id,
    local_dir,
    split_ids=None,
    features=None,
    overwrite=False,
)

Downloads a dataset from Hugging Face Hub to local directory.

Parameters:

  • repo_id (str) –

    The repository ID on Hugging Face Hub.

  • local_dir (Union[str, Path]) –

    Local directory to download to.

  • split_ids (Optional[dict[str, Iterable[int]]], default: None ) –

    Unused parameter for split selection.

  • features (Optional[list[str]], default: None ) –

    Unused parameter for feature selection.

  • overwrite (bool, default: False ) –

    Whether to overwrite existing directory.

Returns:

  • str ( str ) –

    Path to the downloaded dataset.

Source code in plaid/storage/hf_datasets/reader.py
def download_datasetdict_from_hub(
    repo_id: str,
    local_dir: Union[str, Path],
    split_ids: Optional[dict[str, Iterable[int]]] = None,  # noqa: ARG001
    features: Optional[list[str]] = None,  # noqa: ARG001
    overwrite: bool = False,
) -> str:  # pragma: no cover (not tested in unit tests)
    """Downloads a dataset from Hugging Face Hub to local directory.

    Args:
        repo_id (str): The repository ID on Hugging Face Hub.
        local_dir (Union[str, Path]): Local directory to download to.
        split_ids (Optional[dict[str, Iterable[int]]]): Unused parameter for split selection.
        features (Optional[list[str]]): Unused parameter for feature selection.
        overwrite (bool): Whether to overwrite existing directory.

    Returns:
        str: Path to the downloaded dataset.
    """
    output_folder = Path(local_dir)

    if output_folder.is_dir():
        if overwrite:
            shutil.rmtree(output_folder)
            logger.warning(f"Existing {output_folder} directory has been reset.")
        elif any(output_folder.iterdir()):
            raise ValueError(
                f"directory {output_folder} already exists and is not empty. Set `overwrite` to True if needed."
            )

    with tempfile.TemporaryDirectory() as tmp_dir:
        snapshot_download(
            repo_id=repo_id,
            repo_type="dataset",
            allow_patterns=["data/*"],
            local_dir=tmp_dir,
        )
        infos = load_infos_from_hub(repo_id=repo_id)
        split_names = list(infos.num_samples.keys())
        base = Path(tmp_dir) / "data"
        data_files = {sn: str(base / f"{sn}*.parquet") for sn in split_names}
        datasetdict = load_dataset("parquet", data_files=data_files, cache_dir=tmp_dir)
        datasetdict.save_to_disk(str(Path(output_folder) / "data"))

    return str(output_folder)

plaid.storage.hf_datasets.reader.init_datasetdict_streaming_from_hub

init_datasetdict_streaming_from_hub(
    repo_id, split_ids=None, features=None
)

Initializes a streaming DatasetDict from Hugging Face Hub.

Parameters:

  • repo_id (str) –

    The repository ID on Hugging Face Hub.

  • split_ids (Optional[dict[str, Iterable[int]]], default: None ) –

    Unused parameter for split selection.

  • features (Optional[list[str]], default: None ) –

    Optional list of features to load.

Returns:

  • Any

    datasets.IterableDatasetDict: The streaming dataset dictionary.

Source code in plaid/storage/hf_datasets/reader.py
def init_datasetdict_streaming_from_hub(
    repo_id: str,
    split_ids: Optional[dict[str, Iterable[int]]] = None,  # noqa: ARG001
    features: Optional[list[str]] = None,
) -> Any:  # pragma: no cover
    """Initializes a streaming DatasetDict from Hugging Face Hub.

    Args:
        repo_id (str): The repository ID on Hugging Face Hub.
        split_ids (Optional[dict[str, Iterable[int]]]): Unused parameter for split selection.
        features (Optional[list[str]]): Optional list of features to load.

    Returns:
        datasets.IterableDatasetDict: The streaming dataset dictionary.
    """
    hf_endpoint = os.getenv("HF_ENDPOINT", "").strip()
    if hf_endpoint:
        raise RuntimeError("Streaming mode not compatible with private mirror.")

    return load_dataset(repo_id, streaming=True, columns=features)