Skip to content

Conversion tutorial

End‑to‑end workflows for creating, saving, and loading PLAID datasets with the three storage backends: hf_datasets, cgns, and zarr.

Storage concepts

  • sample_constructor is a simple function that takes a single identifier (of any type) and returns a PLAID Sample. The identifier can be an integer, a file path, a string, a tuple — anything that makes sense for your data.
  • ids is a dictionary mapping split names to sliceable sequences of identifiers — anything with __getitem__ and __len__ (list, tuple, numpy array, …). PLAID handles iteration, generator creation, and parallel sharding internally.
  • save_to_disk writes a dataset locally; push_to_hub uploads it to Hugging Face Hub.
  • init_from_disk / download_from_hub / init_streaming_from_hub load datasets back into PLAID.
  • Backend converters turn raw backend samples into PLAID Sample objects.

Choosing a backend

Extensible backend interface: integrate the storage system that best fits your workflow with minimal changes to your PLAID code.

Capability CGNS Hugging Face Datasets Zarr
Feature-wise streaming
Human-readable
Zero-copy instantiation
Extremely large simulations
Parallel writing and reading
Recommended usage: CGNS — sample visualization; Hugging Face Datasets — low to medium scale; Zarr — large-scale frontier

How to create data and save to disk/push to hub

import time
from pathlib import Path

import numpy as np

from plaid import Sample, ProblemDefinition, Infos
from plaid.storage import save_to_disk, push_to_hub

# plyfile and Muscat not included in plaid run dependencies
from plyfile import PlyData
from Muscat.Bridges.CGNSBridge import MeshToCGNS
from Muscat.MeshTools.MeshCreationTools import CreateMeshOf
import Muscat.MeshContainers.ElementsDescription as ED


N_PROC = 6 # number of parallel processes used for saving, uploading and data loading (set to 1 for sequential execution)

# raw data downloaded from https://zenodo.org/records/13993629
# folder where the raw data has been downloaded; the script below reads
# train.txt, test.txt and the sub-folders of data/ from it:
BASE_RAW_DATA_FOLDER = "/path/to/raw" # TO UPDATE
# folder where the data converted to plaid will be saved locally
BASE_GENERATED_DATA_FOLDER = "/path/to/generated" # TO UPDATE
# Hugging Face repo_id the dataset is uploaded to, then downloaded/streamed from
BASE_REPO_ID = "channel/ShapeNetCar" # TO UPDATE
# folder where the data downloaded from the hub will be saved locally
BASE_DOWNLOADED_DATA_FOLDER = "/path/to/downloaded" # TO UPDATE

#---------------------------------------------------------------
# define some functions to handle ShapeNetCar data

# train.txt and test.txt: only the first line is read, and it is parsed as a
# comma-separated list of integer sample ids
with open(f"{BASE_RAW_DATA_FOLDER}/train.txt") as f:
    line = f.readline().strip()
    train_ids = [int(x) for x in line.split(",")]

with open(f"{BASE_RAW_DATA_FOLDER}/test.txt") as f:
    line = f.readline().strip()
    test_ids = [int(x) for x in line.split(",")]

# set view of the train ids: O(1) membership tests in the loop below
train_id_set = set(train_ids)

base_dir = Path(f"{BASE_RAW_DATA_FOLDER}/data/")

# Path.iterdir() yields entries in arbitrary order. The identifiers handed to
# PLAID are integer positions in `tri_folders`, so the list is sorted to make a
# given position designate the same folder on every run and in every process
# that re-imports this module.
tri_folders = sorted(p for p in base_dir.iterdir() if p.is_dir())

curated_train_ids = []
curated_test_ids = []

# folder names are parsed as integer sample ids; a folder whose id is not
# listed in train.txt is assigned to the test split (test_ids is not consulted)
for count, folder in enumerate(tri_folders):
    id_ = int(folder.name)
    if id_ in train_id_set:
        curated_train_ids.append(count)
    else:
        curated_test_ids.append(count)

# we can reduce the number of samples in each split for faster execution
curated_train_ids = curated_train_ids[:10]
curated_test_ids = curated_test_ids[:10]

#---------------------------------------------------------------
# infos and problem definition can be defined to correctly populate the dataset's metadata (they are not mandatory)

# dataset-level metadata, passed as `infos` to save_to_disk below
infos = Infos(
    owner="NeuralOperator (https://zenodo.org/records/13993629)",
    license="cc-by-4.0",
    data_description="No changes to data content from original dataset",
    data_production={
        "type": "simulation",
        # NOTE(review): the "physics" and "script" texts below describe a phase-field
        # fracture dataset (subset 'res-SENS', time steps), whereas this script
        # converts ShapeNetCar surface-pressure data. They look copy-pasted from
        # another conversion script; confirm and update before publishing.
        "physics": "phase-field fracture models for brittle fracture",
        "script": "Subset 'res-SENS' of the initial dataset, 1/5th time steps, converted to PLAID format for standardized access; no changes to data content.",
    },
)


# CGNS paths declared as inputs of the learning problem:
# triangle connectivity and the three node-coordinate arrays
input_features = [
    "Base_2_3/Zone/Elements_TRI_3/ElementConnectivity",
    "Base_2_3/Zone/GridCoordinates/CoordinateX",
    "Base_2_3/Zone/GridCoordinates/CoordinateY",
    "Base_2_3/Zone/GridCoordinates/CoordinateZ",
]

# CGNS path declared as output of the learning problem: the nodal pressure field
output_features = ["Base_2_3/Zone/VertexFields/pressure"]

# problem definition tying the features to the "train" and "test" splits
pb_def = ProblemDefinition(
    train_split={"train": "all"},
    test_split={"test": "all"},
    input_features=input_features,
    output_features=output_features,
)

#---------------------------------------------------------------
# Define a simple function that takes a single identifier and returns a Sample.
# PLAID handles iteration, generator creation, and parallel sharding internally.
# When num_proc > 1, PLAID automatically shards the ids across workers.

def sample_constructor(i):
    """Build the PLAID Sample for the raw folder stored at position ``i`` of ``tri_folders``.

    The folder's ``tri_mesh.ply`` provides the triangle connectivity and the
    x/y/z vertex coordinates, and ``press.npy`` provides the pressure values.
    Both are assembled into a triangle mesh carrying a ``pressure`` node field,
    which is converted to a CGNS tree and attached to a new Sample.

    Args:
        i: position of the sample folder in the module-level ``tri_folders`` list.

    Returns:
        The Sample holding the CGNS tree of the mesh and its pressure field.
    """
    sample_dir = tri_folders[i]

    ply = PlyData.read(sample_dir / "tri_mesh.ply")
    connectivity = np.ascontiguousarray(np.stack(ply['face'].data['vertex_indices']))

    vertices = ply['vertex'].data
    coords = np.stack((vertices['x'], vertices['y'], vertices['z']))
    # transpose so that each row holds the (x, y, z) of one vertex
    points = np.ascontiguousarray(coords.T)

    mesh = CreateMeshOf(points, connectivity, elemName=ED.Triangle_3)

    pressure = np.load(sample_dir / "press.npy")
    # the pressure array and the mesh may differ in length: the leading
    # |difference| pressure entries are skipped
    n_skipped = np.abs(pressure.shape[0] - mesh.nodes.shape[0])
    mesh.nodeFields["pressure"] = pressure[n_skipped:]

    sample = Sample()
    sample.add_tree(MeshToCGNS(mesh, exportOriginalIDs=False))
    return sample

# split name -> identifiers handed one by one to sample_constructor
ids = {"train": curated_train_ids,
       "test": curated_test_ids}

local_folder = f"{BASE_GENERATED_DATA_FOLDER}/hf_dataset"

# save_to_disk and push_to_hub are asked for N_PROC processes/workers. The guard
# keeps them from running again if this module gets imported instead of executed
# as a script (for instance by a worker process re-importing the main module).
if __name__ == "__main__":
    # DISK
    start = time.time()
    save_to_disk(output_folder=local_folder,
                sample_constructor=sample_constructor,
                ids=ids,
                backend="hf_datasets",
                infos=infos,
                pb_defs={"regression_1": pb_def},
                num_proc=N_PROC,
                overwrite=True,
                verbose=True)
    print(f"duration generate with num_proc={N_PROC} is {time.time()-start} s")

    # HUB
    start = time.time()
    push_to_hub(repo_id=BASE_REPO_ID,
                local_dir=local_folder,
                num_workers=N_PROC,
                viewer=True,
                illustration_urls=["https://i.ibb.co/3mGHsHMk/Shape-Net-Car-samples.png"])
    print(f"duration push to hub N_PROC={N_PROC} is {time.time()-start} s")

How to read data from disk/hub

import time

# pytorch not included in plaid dependencies
import torch
from torch.utils.data import DataLoader

from plaid.utils.cgns_helper import show_cgns_tree
from plaid.storage import init_from_disk, download_from_hub, init_streaming_from_hub
from plaid.storage import load_problem_definitions_from_disk


# split used by the reading examples below
split = "train"

print(
    "----------------------------------------------------",
    "-- Download datasets -------------------------------",
    "----------------------------------------------------",
    sep="\n",
)

# local destination of the copy fetched from the hub
download_folder = f"{BASE_DOWNLOADED_DATA_FOLDER}/downloaded_hf_dataset"

# depending on the backend, a subset of the samples and/or features can be
# requested; both selectors are left to None here to keep everything
split_ids_ = None
features_ = None

download_from_hub(
    BASE_REPO_ID,
    download_folder,
    split_ids=split_ids_,
    features=features_,
    overwrite=True,
)

# take the first stored problem definition and gather its input and output
# feature paths into a single list
pb_defs = load_problem_definitions_from_disk(download_folder)
pb_def = next(iter(pb_defs.values()))
features = pb_def.input_features + pb_def.output_features


print(
    "-------------------------------------------------------",
    "-- Dataset local read and plaid sample instantiation --",
    "-------------------------------------------------------",
    sep="\n",
)

datasetdict, converterdict = init_from_disk(download_folder)

# pick the dataset/converter pair of the selected split
dataset, converter = datasetdict[split], converterdict[split]

# generic way to instantiate all the samples, timed
start = time.time()
n_samples = len(dataset)
for i in range(n_samples):
    plaid_sample = converter.to_plaid(dataset, i)
print(f"duration {time.time()-start}")

# Optional: extract only selected indices inside specific variable features
# (currently supported for hf_datasets and zarr backends).
field_path = "Base_2_3/Zone/VertexFields/pressure"
selected_idx = [0, 10, 20, 30]
plaid_sample_sub = converter.to_plaid(
    dataset, 0, features=[field_path], indexers={field_path: selected_idx}
)

# raw backend record for the first sample (format is backend-specific, no PLAID instantiation)
sample = dataset[0]
# alternative way to instantiate a plaid sample (much slower for hf_datasets)
plaid_sample = converter.sample_to_plaid(dataset[0])

# save a plaid sample in a CGNS that can be opened in paraview
plaid_sample.save_to_dir(f"{BASE_DOWNLOADED_DATA_FOLDER}/sample_0_hf", overwrite=True)

# generic way to access all features for all time steps: inputs first, then
# outputs (values are returned but not stored here)
for t in plaid_sample.get_all_time_values():
    for path in (*pb_def.input_features, *pb_def.output_features):
        _ = plaid_sample.get_feature_by_path(path=path, time=t)

# generic ways to return the data as a dict containing all constant and
# variable features: from (dataset, index) or from a raw backend record
sample_dict = converter.to_dict(dataset, 0)
sample_dict = converter.sample_to_dict(dataset[0])

# alternative: same kind of dict, built from an already instantiated plaid sample
sample_dict = converter.plaid_to_dict(plaid_sample)


print(
    "----------------------------------------------------",
    "-- Torch dataloader + send to GPU ------------------",
    "----------------------------------------------------",
    sep="\n",
)


# lightweight dataset for efficient torch DataLoader iterations
class IndexDataset(torch.utils.data.Dataset):
    """Dataset of length ``n`` whose item at position ``idx`` is ``idx`` itself.

    It carries no data: a DataLoader built on it only produces batches of
    indices, which the consumer then resolves into actual samples.
    """

    def __init__(self, n: int) -> None:
        """Store the number of indices exposed by the dataset."""
        self.n = n

    def __len__(self) -> int:
        """Return the number of indices exposed by the dataset."""
        return self.n

    def __getitem__(self, idx):
        """Return ``idx`` unchanged."""
        return idx

datasetdict, converterdict = init_from_disk(download_folder)
dataset = datasetdict[split]
converter = converterdict[split]

# target device: fall back to the CPU when no GPU is visible, so the example
# still runs end to end
device = "cuda" if torch.cuda.is_available() else "cpu"

# define a torch dataloader directly from the IndexDataset class: workers only
# produce batches of indices, samples are rebuilt in the loop below
loader = DataLoader(
    IndexDataset(len(dataset)),
    batch_size=10,
    shuffle=False,
    num_workers=N_PROC,
    # pinned host memory is only useful for host-to-GPU copies
    pin_memory=(device == "cuda"),
    # DataLoader rejects persistent_workers=True when num_workers is 0
    persistent_workers=N_PROC > 0,
)
start = time.time()
for batch in loader:
    # the default collate function turns the integer indices into a tensor;
    # convert them back to plain Python ints, the index type used for
    # converter.to_plaid elsewhere in this tutorial
    for idx in batch.tolist():
        # efficient plaid sample reconstruction
        plaid_sample = converter.to_plaid(dataset, idx)
        # generic way of retrieving features and sending them to the device
        for time_ in plaid_sample.get_all_time_values():
            torch_sample = {}
            for path in features:
                value = plaid_sample.get_feature_by_path(path=path, time=time_)
                if value is None:
                    continue
                # read-only arrays are copied before being wrapped by torch.as_tensor
                if not value.flags.writeable:
                    value = value.copy()
                torch_sample[path] = torch.as_tensor(value).to(device, non_blocking=True)
print(f"duration {time.time()-start}")

How to change a dataset backend (read then write in another backend)

from plaid.storage import init_from_disk, save_to_disk, load_infos_from_disk, load_problem_definitions_from_disk

# folder of the existing dataset to convert
FOLDER = "tests/containers/dataset_cgns"

# load the dataset/converter dictionaries together with the stored metadata
ds, conv = init_from_disk(FOLDER)
infos = load_infos_from_disk(FOLDER)
pb_defs = load_problem_definitions_from_disk(FOLDER)

# Each identifier is a (split name, position inside that split) pair.
# A comprehension is used so that the iteration variable stays local and does
# not overwrite the module-level `split` selected earlier in this file.
ids = {
    split_name: [(split_name, i) for i in range(len(ds[split_name]))]
    for split_name in ds.keys()
}

def sample_constructor(id_):
    """Rebuild the PLAID sample designated by a ``(split, index)`` identifier.

    Args:
        id_: pair whose first item is the split name and whose second item is
            the position of the sample inside that split.

    Returns:
        The sample produced by the split's converter from the loaded dataset.
    """
    split_name = id_[0]
    position = id_[1]
    return conv[split_name].to_plaid(ds[split_name], position)

if __name__ == "__main__":
    # write every split again, this time with the hf_datasets backend
    save_to_disk(
        output_folder="tests/containers/dataset_hf",
        backend="hf_datasets",
        sample_constructor=sample_constructor,
        ids=ids,
        infos=infos,
        pb_defs=pb_defs,
        num_proc=1,
        overwrite=True,
        verbose=True,
    )

Indexed extraction with indexers

converter.to_dict(...) and converter.to_plaid(...) accept an optional indexers argument:

# the same feature path selects the feature to load and keys its indexer
mach_path = "Base/Zone/VertexFields/mach"
sample = converter.to_plaid(
    dataset, idx=0, features=[mach_path], indexers={mach_path: [1, 5, 9]}
)

Indexer behavior

  • indexers is a mapping feature_path -> indexer (list/array of indices or slice).
  • Indexing is applied on the last axis of each indexed feature.
  • This enables a “read less + one gathered output copy” behavior:
    • zarr: partial chunk reads + gathered output
    • hf_datasets: Arrow/NumPy best-effort gather + gathered output
  • The cgns backend does not use this mechanism; indexed extraction is currently supported for the hf_datasets and zarr backends.
print(
    "----------------------------------------------------",
    "-- Streaming test ----------------------------------",
    "----------------------------------------------------",
    sep="\n",
)

datasetdict, converterdict = init_streaming_from_hub(BASE_REPO_ID)
dataset, converter = datasetdict[split], converterdict[split]

# dataset here is an IterableDataset: pull its first raw record and convert it to plaid
raw_sample = next(iter(dataset))
plaid_sample = converter.sample_to_plaid(raw_sample)

# utility to print a summary of the CGNS tree from the plaid sample
first_tree = plaid_sample.get_tree(0.)
show_cgns_tree(first_tree)