Conversion tutorial
End‑to‑end workflows for creating, saving, and loading PLAID datasets with the three storage backends: hf_datasets, cgns, and zarr.
Storage concepts
- `sample_constructor` is a simple function that takes a single identifier (of any type) and returns a PLAID `Sample`. The identifier can be an integer, a file path, a string, a tuple, or anything else that makes sense for your data.
- `ids` is a dictionary mapping split names to sliceable sequences of identifiers, that is, anything with `__getitem__` and `__len__` (list, tuple, numpy array, …). PLAID handles iteration, generator creation, and parallel sharding internally.
- `save_to_disk` writes a dataset locally; `push_to_hub` uploads it to Hugging Face Hub.
- `init_from_disk` / `download_from_hub` / `init_streaming_from_hub` load datasets back into PLAID.
- Backend converters turn raw backend samples into PLAID `Sample` objects.
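To make this contract concrete, here is a minimal, PLAID-free sketch of what a `sample_constructor` and an `ids` dictionary look like, together with one plausible way identifiers could be split across workers. The names `toy_constructor` and `shard_ids` are hypothetical stand-ins introduced for illustration; PLAID's actual sharding strategy is internal and may differ.

```python
from typing import Any, Sequence


def toy_constructor(identifier: Any) -> dict:
    """Stand-in for a real sample_constructor (which would return a plaid Sample)."""
    return {"id": identifier}


def shard_ids(ids: Sequence, num_proc: int) -> list:
    """Split a sliceable sequence into num_proc contiguous, near-equal shards.

    Hypothetical helper: it relies only on __len__ and __getitem__ (slicing),
    the two operations the tutorial requires of an ids sequence.
    """
    n = len(ids)
    base, extra = divmod(n, num_proc)
    shards, start = [], 0
    for rank in range(num_proc):
        # The first `extra` shards receive one additional identifier.
        stop = start + base + (1 if rank < extra else 0)
        shards.append(ids[start:stop])
        start = stop
    return shards


# Identifiers can be of any type: ints for one split, file names for another.
ids = {"train": [0, 1, 2, 3, 4], "test": ("a.ply", "b.ply")}

train_shards = shard_ids(ids["train"], num_proc=2)
samples = [toy_constructor(i) for shard in train_shards for i in shard]
```

Because the constructor receives a single identifier at a time, it stays independent of how the identifiers are partitioned, which is what makes parallel execution straightforward.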
Choosing a backend
Extensible backend interface: integrate the storage system that best fits your workflow with minimal changes to your PLAID code.
| Capability | CGNS | Hugging Face Datasets | Zarr |
|---|---|---|---|
| Feature-wise streaming | |||
| Human-readable | |||
| Zero-copy instantiation | |||
| Extremely large simulations | |||
| Parallel writing and reading | |||
| Recommended usage | Sample visualization | Low to medium scale | Large-scale frontier |
How to create data and save to disk/push to hub
import time
from pathlib import Path
import numpy as np
from plaid import Sample, ProblemDefinition, Infos
from plaid.storage import save_to_disk, push_to_hub
# plyfile and Muscat not included in plaid run dependencies
from plyfile import PlyData
from Muscat.Bridges.CGNSBridge import MeshToCGNS
from Muscat.MeshTools.MeshCreationTools import CreateMeshOf
import Muscat.MeshContainers.ElementsDescription as ED
N_PROC = 6 # number of parallel processes (set to 1 for sequential execution)
# raw data downloaded from https://zenodo.org/records/13993629
# set the folder where the raw data has been downloaded:
BASE_RAW_DATA_FOLDER = "/path/to/raw" # TO UPDATE
# set the folder where the data converted to plaid will be saved locally
BASE_GENERATED_DATA_FOLDER = "/path/to/generated" # TO UPDATE
# set the Hugging Face's repo_id where the datasets will be uploaded
BASE_REPO_ID = "channel/ShapeNetCar" # TO UPDATE
# set the folder where the downloaded data will be saved locally
BASE_DOWNLOADED_DATA_FOLDER = "/path/to/downloaded" # TO UPDATE
#---------------------------------------------------------------
# read the train/test IDs and list the ShapeNetCar sample folders
with open(f"{BASE_RAW_DATA_FOLDER}/train.txt") as f:
    line = f.readline().strip()
    train_ids = [int(x) for x in line.split(",")]
with open(f"{BASE_RAW_DATA_FOLDER}/test.txt") as f:
    line = f.readline().strip()
    test_ids = [int(x) for x in line.split(",")]
base_dir = Path(f"{BASE_RAW_DATA_FOLDER}/data/")
# sorted so that indices into tri_folders are reproducible across runs and worker processes
tri_folders = sorted(p for p in base_dir.iterdir() if p.is_dir())
curated_train_ids = []
curated_test_ids = []
for count, folder in enumerate(tri_folders):
    id_ = int(folder.name)
    if id_ in train_ids:
        curated_train_ids.append(count)
    else:
        curated_test_ids.append(count)
# the number of samples in each split can be reduced for faster execution
curated_train_ids = curated_train_ids[:10]
curated_test_ids = curated_test_ids[:10]
#---------------------------------------------------------------
# infos and problem definition can be defined to correctly populate the dataset's metadata (they are not mandatory)
infos = Infos(
    owner="NeuralOperator (https://zenodo.org/records/13993629)",
    license="cc-by-4.0",
    data_description="No changes to data content from original dataset",
    data_production={
        "type": "simulation",
        "physics": "phase-field fracture models for brittle fracture",
        "script": "Subset 'res-SENS' of the initial dataset, 1/5th time steps, converted to PLAID format for standardized access; no changes to data content.",
    },
)
input_features = [
    "Base_2_3/Zone/Elements_TRI_3/ElementConnectivity",
    "Base_2_3/Zone/GridCoordinates/CoordinateX",
    "Base_2_3/Zone/GridCoordinates/CoordinateY",
    "Base_2_3/Zone/GridCoordinates/CoordinateZ",
]
output_features = [
    "Base_2_3/Zone/VertexFields/pressure",
]
pb_def = ProblemDefinition(
    input_features=input_features,
    output_features=output_features,
    train_split={"train": "all"},
    test_split={"test": "all"},
)
#---------------------------------------------------------------
# Define a simple function that takes a single identifier and returns a Sample.
# PLAID handles iteration, generator creation, and parallel sharding internally.
# When num_proc > 1, PLAID automatically shards the ids across workers.
def sample_constructor(i):
    folder = tri_folders[i]
    # read the triangle mesh (connectivity and node coordinates)
    plydata = PlyData.read(folder / "tri_mesh.ply")
    tris = np.ascontiguousarray(np.stack(plydata['face'].data['vertex_indices']))
    vertex_data = plydata['vertex'].data
    x = vertex_data['x']
    y = vertex_data['y']
    z = vertex_data['z']
    nodes = np.ascontiguousarray(np.stack((x, y, z)).T)
    mesh = CreateMeshOf(nodes, tris, elemName=ED.Triangle_3)
    # attach the pressure field, dropping leading entries so it matches the node count
    press = np.load(folder / "press.npy")
    offset = np.abs(press.shape[0]-mesh.nodes.shape[0])
    mesh.nodeFields["pressure"] = press[offset:]
    # convert to a CGNS tree and wrap it in a plaid Sample
    tree = MeshToCGNS(mesh, exportOriginalIDs=False)
    sample = Sample()
    sample.add_tree(tree)
    return sample

ids = {"train": curated_train_ids,
       "test": curated_test_ids}
local_folder = f"{BASE_GENERATED_DATA_FOLDER}/hf_dataset"
# DISK
start = time.time()
save_to_disk(output_folder=local_folder,
             sample_constructor=sample_constructor,
             ids=ids,
             backend="hf_datasets",
             infos=infos,
             pb_defs={"regression_1": pb_def},
             num_proc=N_PROC,
             overwrite=True,
             verbose=True)
print(f"duration generate with num_proc={N_PROC} is {time.time()-start} s")
# HUB
start = time.time()
push_to_hub(repo_id=BASE_REPO_ID,
            local_dir=local_folder,
            num_workers=N_PROC,
            viewer=True,
            illustration_urls=["https://i.ibb.co/3mGHsHMk/Shape-Net-Car-samples.png"])
print(f"duration push to hub N_PROC={N_PROC} is {time.time()-start} s")
# Note: for maximal compatibility, you may need to call `save_to_disk` and `push_to_hub` under an `if __name__ == "__main__":` guard.
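The guard mentioned in the note matters because, with process start methods such as spawn (the default on Windows and macOS), each worker process re-imports the main module, so unguarded top-level calls would run again in every worker. A minimal standard-library sketch of the pattern follows; `build_stub` and `convert_all` are hypothetical stand-ins for `sample_constructor` and `save_to_disk`, not PLAID functions.

```python
from concurrent.futures import ProcessPoolExecutor


def build_stub(identifier: int) -> int:
    """Stand-in for sample_constructor; defined at module level so workers can import it."""
    return identifier * identifier


def convert_all(ids, num_proc: int = 1) -> list:
    """Stand-in for save_to_disk: map the constructor over ids, optionally in parallel."""
    if num_proc <= 1:
        return [build_stub(i) for i in ids]
    with ProcessPoolExecutor(max_workers=num_proc) as pool:
        return list(pool.map(build_stub, ids))


if __name__ == "__main__":
    # Only the parent process reaches this branch; re-imported workers skip it.
    print(convert_all(range(5), num_proc=2))
```

Keeping definitions at module level and the calls that start work under the guard is the layout used in the backend-conversion example later in this tutorial.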
How to read data from disk/hub
import time
# pytorch not included in plaid dependencies
import torch
from torch.utils.data import DataLoader
from plaid.utils.cgns_helper import show_cgns_tree
from plaid.storage import init_from_disk, download_from_hub, init_streaming_from_hub
from plaid.storage import load_problem_definitions_from_disk
split = "train"
print("----------------------------------------------------")
print("-- Download datasets -------------------------------")
print("----------------------------------------------------")
# download dataset
download_folder = f"{BASE_DOWNLOADED_DATA_FOLDER}/downloaded_hf_dataset"
# depending on the backends, one can download a subset of the samples and features. We keep them all here
split_ids_ = None
features_ = None
download_from_hub(BASE_REPO_ID, download_folder, split_ids=split_ids_, features=features_, overwrite=True)
# Load problem definitions and define features as all the input and output features
pb_defs = load_problem_definitions_from_disk(download_folder)
pb_def = next(iter(pb_defs.values()))
features = pb_def.input_features + pb_def.output_features
print("-------------------------------------------------------")
print("-- Dataset local read and plaid sample instantiation --")
print("-------------------------------------------------------")
datasetdict, converterdict = init_from_disk(download_folder)
# specify one dataset/converter pair for one split
dataset = datasetdict[split]
converter = converterdict[split]
# generic way to instantiate all the samples
start = time.time()
for i in range(len(dataset)):
    plaid_sample = converter.to_plaid(dataset, i)
print(f"duration {time.time()-start}")
# Optional: extract only selected indices inside specific variable features
# (currently supported for hf_datasets and zarr backends).
field_path = "Base_2_3/Zone/VertexFields/pressure"
selected_idx = [0, 10, 20, 30]
plaid_sample_sub = converter.to_plaid(
    dataset,
    0,
    features=[field_path],
    indexers={field_path: selected_idx},
)
# raw backend record for the first sample (format is backend-specific, no PLAID instantiation)
sample = dataset[0]
# alternative way to instantiate a plaid sample (much slower for hf_datasets)
plaid_sample = converter.sample_to_plaid(dataset[0])
# save a plaid sample in a CGNS that can be opened in paraview
plaid_sample.save_to_dir(f"{BASE_DOWNLOADED_DATA_FOLDER}/sample_0_hf", overwrite=True)
# generic way to access all features for all time steps (values are returned but not stored here)
for t in plaid_sample.get_all_time_values():
    for path in pb_def.input_features:
        _ = plaid_sample.get_feature_by_path(path=path, time=t)
    for path in pb_def.output_features:
        _ = plaid_sample.get_feature_by_path(path=path, time=t)
# generic way to return the data as a dict containing all constant and variable features
sample_dict = converter.to_dict(dataset, 0)
sample_dict = converter.sample_to_dict(dataset[0])
# alternative way to return the data as a dict containing all constant and variable features from a plaid sample
sample_dict = converter.plaid_to_dict(plaid_sample)
print("----------------------------------------------------")
print("-- Torch dataloader + send to GPU ------------------")
print("----------------------------------------------------")
# define a simple class for efficient torch Dataloader iterations
class IndexDataset(torch.utils.data.Dataset):
    def __init__(self, n):
        self.n = n

    def __len__(self):
        return self.n

    def __getitem__(self, idx):
        # only the index is returned; the sample itself is built in the main loop
        return idx
datasetdict, converterdict = init_from_disk(download_folder)
dataset = datasetdict[split]
converter = converterdict[split]
# define a torch dataloader directly from this IndexDataset class
loader = DataLoader(
    IndexDataset(len(dataset)),
    batch_size=10,
    shuffle=False,
    num_workers=N_PROC,
    pin_memory=True,
    persistent_workers=True
)
start = time.time()
for batch in loader:
    for idx in batch:
        # efficient plaid sample reconstruction
        plaid_sample = converter.to_plaid(dataset, idx)
        # generic way of retrieving features and sending them to GPU
        for time_ in plaid_sample.get_all_time_values():
            torch_sample = {}
            for path in features:
                value = plaid_sample.get_feature_by_path(path=path, time=time_)
                if value is not None:
                    # read-only arrays are copied before being wrapped in a tensor
                    if not value.flags.writeable:
                        value = value.copy()
                    torch_sample[path] = torch.as_tensor(value).to("cuda", non_blocking=True)
print(f"duration {time.time()-start}")
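The writeable check in the loop above is worth isolating. Storage backends that avoid copies can hand back read-only NumPy views, and PyTorch typically warns when asked to wrap a non-writeable array, since tensors do not support that state. The NumPy-only sketch below shows the same guard on its own; `ensure_writeable` is a hypothetical helper name, and `np.frombuffer` merely imitates a read-only view from a backend.

```python
import numpy as np


def ensure_writeable(value: np.ndarray) -> np.ndarray:
    """Return value unchanged if writeable, otherwise a writeable copy."""
    if not value.flags.writeable:
        return value.copy()
    return value


# np.frombuffer over immutable bytes yields a read-only array,
# similar in spirit to a zero-copy view returned by a storage backend.
read_only = np.frombuffer(b"\x00\x01\x02\x03", dtype=np.uint8)
safe = ensure_writeable(read_only)
safe[0] = 42  # allowed: the copy owns its memory

already_ok = np.zeros(3)
same = ensure_writeable(already_ok)  # no copy is made here
```

The copy is paid only for read-only arrays, so features that are already writeable keep their cheap path.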
How to change a dataset backend (read then write in another backend)
from plaid.storage import init_from_disk, save_to_disk, load_infos_from_disk, load_problem_definitions_from_disk
FOLDER = "tests/containers/dataset_cgns"
ds, conv = init_from_disk(FOLDER)
infos = load_infos_from_disk(FOLDER)
pb_defs = load_problem_definitions_from_disk(FOLDER)
ids = {}
for split in ds.keys():
    # each identifier is a (split, index) pair so one constructor serves every split
    ids[split] = [(split, i) for i in range(len(ds[split]))]

def sample_constructor(id_):
    split, index = id_[0], id_[1]
    return conv[split].to_plaid(ds[split], index)

if __name__ == "__main__":
    save_to_disk(output_folder="tests/containers/dataset_hf",
                 sample_constructor=sample_constructor,
                 ids=ids,
                 backend="hf_datasets",
                 infos=infos,
                 pb_defs=pb_defs,
                 num_proc=1,
                 overwrite=True,
                 verbose=True)
Indexed extraction with indexers
`converter.to_dict(...)` and `converter.to_plaid(...)` accept an optional `indexers` argument:
sample = converter.to_plaid(
    dataset,
    idx=0,
    features=["Base/Zone/VertexFields/mach"],
    indexers={"Base/Zone/VertexFields/mach": [1, 5, 9]},
)
Indexer behavior
- `indexers` is a mapping `feature_path -> indexer` (list/array of indices or slice).
- Indexing is applied on the last axis of each indexed feature.
- This enables a “read less + one gathered output copy” behavior:
  - zarr: partial chunk reads + gathered output
  - hf_datasets: Arrow/NumPy best-effort gather + gathered output
- The cgns backend does not use this mechanism.
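As a rough mental model of the gathered output, the NumPy sketch below applies an indexer on the last axis of an in-memory array. `apply_indexer` is a hypothetical helper for illustration only; the real backends aim to avoid reading the unselected values in the first place, which this sketch does not attempt.

```python
import numpy as np


def apply_indexer(array: np.ndarray, indexer) -> np.ndarray:
    """Select entries along the last axis with a list/array of indices or a slice."""
    if isinstance(indexer, slice):
        # Basic slicing returns a view; copy so the result is one gathered array.
        return array[..., indexer].copy()
    # Integer-array indexing already produces a new gathered array.
    return array[..., np.asarray(indexer)]


# Toy 2-D array: 2 rows of 10 values each (0..9 and 10..19).
field = np.arange(20).reshape(2, 10)

picked = apply_indexer(field, [1, 5, 9])    # shape (2, 3)
sliced = apply_indexer(field, slice(0, 4))  # shape (2, 4)
```

Leading axes are left untouched, so the same indexer works for 1-D and multi-dimensional features alike.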
print("----------------------------------------------------")
print("-- Streaming test ----------------------------------")
print("----------------------------------------------------")
datasetdict, converterdict = init_streaming_from_hub(BASE_REPO_ID)
dataset = datasetdict[split]
converter = converterdict[split]
# dataset here is an IterableDataset, retrieving one sample and converting it to plaid
raw_sample = next(iter(dataset))
plaid_sample = converter.sample_to_plaid(raw_sample)
# utility to print a summary of the CGNS tree from the plaid sample
show_cgns_tree(plaid_sample.get_tree(0.))