Custom PyTorch Dataset & DataLoader: Multiprocessing Optimization Guide
This article walks through diagnosing a severe GPU under-utilization bug in an 8-GPU A100 training job and explains why the default Dataset/DataLoader setup stalls. It then presents a step-by-step redesign: a map-style (MapDataset) or IterableDataset class, WebDataset tar shards, tuned DataLoader parameters, worker-level seeding, GPU-side prefetching, and distributed sampling. Together these raised GPU utilization from 5-12% to over 85% and cut epoch time from 40 h to 9 h.
During a 7B instruction-tuning experiment on an 8-GPU A100 cluster, the author observed only 5-12% GPU utilization, 100% CPU load, and 0.3 s per training step, which projected to a 40-hour epoch instead of the expected 8 h. After two days of investigation, the following root causes were identified:
Dataset __getitem__ read JPEGs from disk for each sample, overwhelming I/O.
num_workers had no effect because the loader was still running in single-process mode.
On Windows, num_workers=16 with the spawn start method took ten minutes to start.
Enabling pin_memory=True crashed on NUMA machines.
All preprocessing ran in the main process, so the GPU sat idle waiting for data.
Random seeds were not synchronized across workers, breaking reproducibility.
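The timing figures above can be cross-checked with a little arithmetic. The sketch below assumes the 40-hour projection came from multiplying the observed 0.3 s step time by the number of steps per epoch; the step count is not given in the article, so it is derived here rather than quoted.

```python
# Back-of-the-envelope check of the figures quoted above.
# Assumption: projected epoch time = steps_per_epoch * seconds_per_step.

SECONDS_PER_HOUR = 3600

observed_step_s = 0.3     # measured step time from the article
projected_epoch_h = 40    # projected epoch duration from the article
expected_epoch_h = 8      # epoch duration the author expected

# Derived, not stated in the article: steps needed to fill 40 h at 0.3 s each.
steps_per_epoch = round(projected_epoch_h * SECONDS_PER_HOUR / observed_step_s)

# Step time that would be needed to finish the same steps in 8 h.
target_step_s = expected_epoch_h * SECONDS_PER_HOUR / steps_per_epoch

# How much slower each step was than planned.
slowdown = observed_step_s / target_step_s

print(f"steps per epoch : {steps_per_epoch}")
print(f"target step time: {target_step_s * 1000:.0f} ms")
print(f"slowdown factor : {slowdown:.1f}x")
```

Under that assumption each step was running about five times slower than planned, which gives a concrete per-step budget to compare against when timing the data pipeline.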
To solve these issues the data-loading stack was rebuilt as four layers (storage, dataset, DataLoader, transform and collate) that feed the GPU:
┌───────────────────────────────────────┐
│ 1. Storage (SSD/NFS/S3) │
│ raw files / tar shards / parquet … │
└───────────────────────────────────────┘
↓
┌───────────────────────────────────────┐
│ 2. Dataset (MapDataset or Iterable) │
│ __getitem__ / __iter__ implementations│
└───────────────────────────────────────┘
↓
┌───────────────────────────────────────┐
│ 3. DataLoader (PyTorch / HF / WebDataset)│
│ num_workers, pin_memory, prefetch_factor, …│
└───────────────────────────────────────┘
↓
┌───────────────────────────────────────┐
│ 4. Transform & Collate (CPU) │
│ decode, augment, tokenize, padding │
└───────────────────────────────────────┘
↓
┌───────────────────────────────────────┐
│ 5. GPU (Model) │
│ forward / backward / optimizer step │
└───────────────────────────────────────┘
The core processing of a single batch is:
[Main Process]
for batch in dataloader:
↓
[Sampler] select index list
↓
[Dispatch] distribute indices to workers
↓
[Worker 0..N] parallel:
- dataset[index] → read image / text
- preprocess / augment
- collate
↓
[Pin Memory] move tensors to page‑locked memory
↓
[Async Copy] CPU → GPU transfer
↓
[GPU Queue] batch ready for compute
↓
[Model] forward + backward
Key parameter formula (empirical):
import os, sys
from torch.utils.data import DataLoader, IterableDataset
def build_dataloader(dataset, batch_size: int, is_train: bool = True):
    # custom_collate_fn and worker_init_fn are assumed to be defined elsewhere in the project
    num_workers = min(8, os.cpu_count() or 1)  # 4-8 is usually enough; cpu_count() can return None
    prefetch_factor = 4  # each worker prefetches 4 batches
    # IterableDataset subclasses Dataset, so check for it explicitly: it does not accept shuffle=True
    shuffle = is_train and not isinstance(dataset, IterableDataset)
    return DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=shuffle,
        num_workers=num_workers,
        pin_memory=True,
        prefetch_factor=prefetch_factor,
        persistent_workers=is_train,
        multiprocessing_context="fork" if sys.platform != "win32" else "spawn",
        drop_last=is_train,
        collate_fn=custom_collate_fn,
        worker_init_fn=worker_init_fn,
    )
Worker initialization ensures each worker gets a unique deterministic seed:
import random
import numpy as np
import torch
def worker_init_fn(worker_id):
    # Inside a worker, torch.initial_seed() is already base_seed + worker_id
    seed = torch.initial_seed() % (2**32)  # NumPy needs a seed below 2**32
    np.random.seed(seed)
    random.seed(seed)
    # torch.cuda.manual_seed_all(seed)  # usually not needed
MapDataset vs IterableDataset: A map-style dataset (MapDataset) provides random access and is suitable when the whole dataset fits in memory (<10 GB). An IterableDataset streams data without loading everything into RAM and works best with large image collections stored as WebDataset tar shards.
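One detail of the streaming approach deserves to be spelled out: with num_workers > 0, every worker process receives its own copy of an IterableDataset, so unless the stream is split, each sample is yielded once per worker. The sketch below shows a round-robin split in plain Python. `shard_for_worker` is a hypothetical helper name; in a real dataset the worker id and worker count would come from torch.utils.data.get_worker_info().

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")

def shard_for_worker(items: Iterable[T], worker_id: int, num_workers: int) -> Iterator[T]:
    """Yield every num_workers-th item, starting at offset worker_id."""
    if not 0 <= worker_id < num_workers:
        raise ValueError("worker_id must be in [0, num_workers)")
    return islice(items, worker_id, None, num_workers)

# Simulate three workers reading the same stream of ten sample ids.
num_workers = 3
shards = [list(shard_for_worker(range(10), w, num_workers)) for w in range(num_workers)]
for w, shard in enumerate(shards):
    print(f"worker {w}: {shard}")
```

Each id ends up in exactly one shard, so the union of what the workers yield is the original stream with no duplicates.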
# map_dataset.py
from PIL import Image
from torch.utils.data import Dataset
class ImageMapDataset(Dataset):
    """Random-access dataset, good for shuffling."""
    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform
    def __len__(self):
        return len(self.image_paths)
    def __getitem__(self, idx):
        # Decoding happens here, so it runs inside the DataLoader workers
        img = Image.open(self.image_paths[idx]).convert("RGB")
        if self.transform:
            img = self.transform(img)
        return img, self.labels[idx]
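Because a map-style dataset is indexed, the DataLoader can shuffle it through a sampler, and that shuffle order can be made repeatable by passing a seeded torch.Generator. The sketch below uses a hypothetical `RangeDataset` that returns its own index in place of real images, and keeps num_workers at its default of 0 so that it stays self-contained.

```python
import torch
from torch.utils.data import DataLoader, Dataset

class RangeDataset(Dataset):
    """Toy map-style dataset: sample i is just the integer i."""
    def __init__(self, n: int):
        self.n = n

    def __len__(self) -> int:
        return self.n

    def __getitem__(self, idx: int) -> int:
        return idx

def epoch_order(seed: int, n: int = 12, batch_size: int = 4) -> list:
    """Return the sample order of one shuffled epoch for a given seed."""
    g = torch.Generator()
    g.manual_seed(seed)
    loader = DataLoader(RangeDataset(n), batch_size=batch_size, shuffle=True, generator=g)
    # Each batch is a 1-D tensor of indices; flatten the epoch into a list of ints.
    return [int(i) for batch in loader for i in batch]

order_a = epoch_order(seed=0)
order_b = epoch_order(seed=0)
print(order_a)
print("same order with same seed:", order_a == order_b)
```

The same idea carries over to the real image dataset: seeding the generator fixes the order in which indices are drawn, while the per-worker seeding shown earlier covers randomness inside the transforms.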
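# iterable_dataset.py
import os
import torch
from PIL import Image
from torch.utils.data import IterableDataset
class ImageIterableDataset(IterableDataset):
    """Streaming dataset, memory-friendly."""
    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        self.transform = transform
    def _iter_paths(self):
        # Assumed helper (not shown in the original): walk data_dir in a stable order
        for root, _, files in sorted(os.walk(self.data_dir)):
            for name in sorted(files):
                if name.lower().endswith((".jpg", ".jpeg")):
                    yield os.path.join(root, name)
    def _get_label(self, path):
        # Assumed helper (not shown in the original): label = parent directory name
        return os.path.basename(os.path.dirname(path))
    def __iter__(self):
        worker_info = torch.utils.data.get_worker_info()
        for i, path in enumerate(self._iter_paths()):
            # Round-robin split so each worker yields a disjoint subset
            if worker_info and i % worker_info.num_workers != worker_info.id:
                continue
            img = Image.open(path).convert("RGB")
            if self.transform:
                img = self.transform(img)
            yield img, self._get_label(path)
WebDataset integration packs thousands of samples into a single tar file, reducing disk seeks dramatically and providing built-in multi-node sharding: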
# webdataset_usage.py
import io, json
import webdataset as wds
from PIL import Image
def build_webdataset(tar_pattern, batch_size, transform):
    def decode(sample):
        img = Image.open(io.BytesIO(sample["jpg"])).convert("RGB")
        img = transform(img)
        label = json.loads(sample["json"]).get("class")
        return img, label
    dataset = (
        wds.WebDataset(tar_pattern, shardshuffle=True, nodesplitter=wds.split_by_node)
        .shuffle(1000)
        .map(decode)
        .batched(batch_size, partial=False)
    )
    return dataset
# transform is assumed to be defined elsewhere (e.g. a torchvision pipeline)
loader = build_webdataset("/data/imagenet/train-{0000..1023}.tar", batch_size=256, transform=transform)
# Iterating the dataset directly runs in the main process; wrap it in
# wds.WebLoader(loader, batch_size=None, num_workers=...) to use worker processes
for batch in loader:
    images, labels = batch
    # ... training ...
Hugging Face datasets are used for large text corpora; the workflow caches the tokenized Arrow file to avoid recomputing it:
# hf_datasets_usage.py
from datasets import load_dataset
from torch.utils.data import DataLoader
# tokenizer and TextCollator are assumed to be defined elsewhere in the project
dataset = load_dataset("json", data_files="data.jsonl", split="train")
tokenized = dataset.map(
    lambda x: tokenizer(x["text"], truncation=True, max_length=2048),
    batched=True,
    num_proc=8,
    remove_columns=dataset.column_names,
    cache_file_name="./cache/tokenized.arrow",  # the ./cache directory must already exist
)
filtered = tokenized.filter(lambda x: len(x["input_ids"]) >= 10, num_proc=8)
def add_labels(example):
    example["labels"] = example["input_ids"].copy()
    return example
with_labels = filtered.map(add_labels, num_proc=8)
loader = DataLoader(
    with_labels.with_format("torch"),
    batch_size=32,
    collate_fn=TextCollator(tokenizer),
    num_workers=4,
    pin_memory=True,
)
GPU-side prefetching hides the CPU-to-GPU transfer latency using a dedicated CUDA stream:
# gpu_prefetch.py
import torch
class CudaPrefetcher:
    """Asynchronously move batches (dicts of tensors) to GPU."""
    def __init__(self, loader, device):
        self.loader = loader  # should use pin_memory=True so non_blocking copies can overlap
        self.device = device
        self.stream = torch.cuda.Stream(device=device)
    def __iter__(self):
        self.iter = iter(self.loader)
        self._preload()
        return self
    def _preload(self):
        try:
            self.next_batch = next(self.iter)
        except StopIteration:
            self.next_batch = None
            return  # loader exhausted: nothing left to copy
        with torch.cuda.stream(self.stream):
            for k, v in self.next_batch.items():
                if isinstance(v, torch.Tensor):
                    self.next_batch[k] = v.to(self.device, non_blocking=True)
    def __next__(self):
        # Make the compute stream wait until the prefetched copy has finished
        torch.cuda.current_stream(self.device).wait_stream(self.stream)
        batch = self.next_batch
        if batch is None:
            raise StopIteration
        for k, v in batch.items():
            if isinstance(v, torch.Tensor):
                v.record_stream(torch.cuda.current_stream(self.device))
        self._preload()
        return batch
prefetcher = CudaPrefetcher(loader, device="cuda")
for batch in prefetcher:
    optimizer.zero_grad()
    loss = model(**batch)  # assumes the model's forward returns a scalar loss
    loss.backward()
    optimizer.step()
Distributed training uses DistributedSampler so each rank sees a distinct shard and reshuffles every epoch:
# distributed_dataloader.py
from torch.utils.data import DataLoader, DistributedSampler
def build_distributed_loader(dataset, batch_size, rank, world_size):
    sampler = DistributedSampler(
        dataset,
        num_replicas=world_size,
        rank=rank,
        shuffle=True,
        seed=42,
        drop_last=True,
    )
    loader = DataLoader(
        dataset,
        batch_size=batch_size,
        sampler=sampler,  # the sampler shuffles, so shuffle= is not passed here
        num_workers=8,
        pin_memory=True,
        prefetch_factor=4,
        persistent_workers=True,
        collate_fn=custom_collate_fn,  # assumed to be defined elsewhere
    )
    return loader, sampler
loader, sampler = build_distributed_loader(dataset, batch_size, rank, world_size)
for epoch in range(num_epochs):
    sampler.set_epoch(epoch)  # without this, every epoch reuses the same order
    for batch in loader:
        # training step
        ...
Memory-mapped large arrays (e.g., contrastive-learning embeddings) can be read on demand without loading the whole file into RAM:
# memmap_dataset.py
import numpy as np
import torch
from torch.utils.data import Dataset
class MemmapArrayDataset(Dataset):
    """Lazily read rows of a large np.float32 array through a memory map."""
    def __init__(self, array_path, labels_path):
        self.data = np.load(array_path, mmap_mode="r")  # nothing is read yet
        self.labels = np.load(labels_path)
    def __len__(self):
        return len(self.labels)
    def __getitem__(self, idx):
        # Copy one row out of the read-only map; this triggers the actual read
        sample = np.array(self.data[idx])
        return torch.from_numpy(sample), int(self.labels[idx])
A diagnostic script measures per-stage latency to pinpoint I/O, transform, or transfer bottlenecks:
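# diagnose_io.py
import time, torch, numpy as np
def diagnose(loader, num_batches=50, device="cuda"):
    """Time the wait for each batch ("fetch": worker I/O + transform + collate) and the GPU copy."""
    times = {"fetch": [], "transfer": []}
    use_cuda = torch.cuda.is_available()
    it = iter(loader)
    for _ in range(num_batches):
        t0 = time.perf_counter()
        try:
            batch = next(it)  # blocks until a worker delivers a batch
        except StopIteration:
            break
        t1 = time.perf_counter()
        times["fetch"].append(t1 - t0)
        if use_cuda:
            # assumes the batch is a dict or a tuple/list of tensors
            values = batch.values() if isinstance(batch, dict) else batch
            for v in values:
                if isinstance(v, torch.Tensor):
                    v.to(device, non_blocking=True)
            torch.cuda.synchronize()  # wait for the copies before stopping the clock
            times["transfer"].append(time.perf_counter() - t1)
    print("Average stage times (ms):")
    for k, v in times.items():
        if v:
            print(f"  {k}: {np.mean(v)*1000:.1f} ms")
Monitoring commands such as nvidia-smi dmon -s pucm -d 1, iostat -x 1, and vmstat 1 help decide whether the bottleneck lies in the GPU, CPU, or disk I/O.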
Common pitfalls and fixes:
Too many workers (e.g., 64) duplicate large in-memory objects and cause OOM; keep num_workers between 4 and 8.
Windows requires multiprocessing_context="spawn" and the if __name__ == "__main__": guard.
On NUMA systems pin_memory=True may add cross-node latency: either disable it, or select the local GPU with torch.cuda.set_device(local_numa_gpu_id) and bind the process to that GPU's NUMA node (for example with numactl).
IterableDataset does not support shuffle=True; use a buffer-shuffle implementation, rely on WebDataset shard shuffling, or switch to a map-style dataset.
HF map caching: pass cache_file_name explicitly so that reruns reuse the same Arrow file instead of depending on the automatic fingerprint of the mapped function, which can change between runs.
For multi-node training, set nodesplitter=wds.split_by_node (WebDataset), or use DistributedSampler and call set_epoch at the start of each epoch.
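The buffer-shuffle mentioned for IterableDataset can be sketched in a few lines of plain Python. This is an illustrative implementation, not the one used by WebDataset or any other library: `buffered_shuffle` is a hypothetical name, and the buffer size and seed are arbitrary. It fills a fixed-size buffer, then emits one randomly chosen element for every new element it reads.

```python
import random
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")

def buffered_shuffle(stream: Iterable[T], buffer_size: int, seed: int = 0) -> Iterator[T]:
    """Approximately shuffle a stream while holding only buffer_size items in memory."""
    if buffer_size < 1:
        raise ValueError("buffer_size must be at least 1")
    rng = random.Random(seed)
    buffer = []
    for item in stream:
        if len(buffer) < buffer_size:
            buffer.append(item)          # still filling the buffer
            continue
        j = rng.randrange(buffer_size)   # pick a random slot to emit
        yield buffer[j]
        buffer[j] = item                 # refill that slot with the new item
    rng.shuffle(buffer)                  # drain the remainder in random order
    yield from buffer

shuffled = list(buffered_shuffle(range(20), buffer_size=5, seed=42))
print(shuffled)
```

A larger buffer gives a closer approximation of a full shuffle at the cost of memory. When several workers stream in parallel, each would typically be given a different seed.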
Evaluation metrics after deployment (target thresholds): GPU SM utilization ≥ 80 %, GPU memory ≥ 60 %, DataLoader batch fetch ≤ 50 ms, worker CPU ≤ 80 % of cores, disk I/O ≤ 70 %, epoch time matches expectations, HF cache hit ≥ 95 %.
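These targets are easy to turn into an automated check. The sketch below encodes the numeric thresholds listed above; the metric key names and the sample measurements are made up for illustration, and collecting the real values (from nvidia-smi, iostat, and so on) is left out. The non-numeric target, epoch time matching expectations, is omitted.

```python
# Target thresholds from the article; the key names are hypothetical.
# Each entry is (comparison, limit): ">=" means higher is better.
TARGETS = {
    "gpu_sm_util_pct":  (">=", 80),
    "gpu_mem_util_pct": (">=", 60),
    "batch_fetch_ms":   ("<=", 50),
    "worker_cpu_pct":   ("<=", 80),
    "disk_io_util_pct": ("<=", 70),
    "hf_cache_hit_pct": (">=", 95),
}

def check_targets(measured: dict) -> dict:
    """Return {metric: passed} for every known metric present in measured."""
    results = {}
    for name, (op, limit) in TARGETS.items():
        if name not in measured:
            continue  # metric not collected; skip rather than guess
        value = measured[name]
        results[name] = value >= limit if op == ">=" else value <= limit
    return results

# Illustrative numbers only, not measurements from the article.
sample = {"gpu_sm_util_pct": 12, "worker_cpu_pct": 100, "batch_fetch_ms": 300}
report = check_targets(sample)
for metric, ok in report.items():
    print(f"{metric:18s} {'OK' if ok else 'FAIL'}")
```

A check like this could run at the end of each epoch so that a regression in the data pipeline shows up as a failed threshold rather than as an unexplained slowdown.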
Roadmap – short‑term: enable CUDA prefetcher and NUMA‑aware pinning; mid‑term: adopt WebDataset tar shards and distributed samplers; long‑term: move decoding to GPU (e.g., NVIDIA DALI) and explore zero‑I/O training where the whole dataset resides in GPU memory.
Finally, a one‑page cheat sheet summarises dataset size selection, DataLoader defaults, acceleration tricks, monitoring KPIs, and the most frequent pitfalls.
MaGe Linux Operations
