How PyTorch DataLoader Transfers Data Between Processes Using Multiprocessing Queues
This article explains the internal mechanism of PyTorch's DataLoader when using multiple worker processes, detailing how tensors are serialized, shared via multiprocessing.Queue, and reconstructed in the main process to avoid unnecessary memory copies.
When num_workers is set to a non-zero value, PyTorch's DataLoader spawns that many worker processes. Each worker reads samples from the Dataset, places the results into a torch.multiprocessing.Queue, and the main process retrieves the finished batches from this queue.
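As a minimal sketch of this setup (the SquaresDataset class is a hypothetical example, not part of PyTorch), a multi-worker DataLoader can be driven like this:

```python
import torch
from torch.utils.data import DataLoader, Dataset

class SquaresDataset(Dataset):
    """Toy map-style dataset: sample i is the tensor [i * i]."""
    def __init__(self, n):
        self.n = n
    def __len__(self):
        return self.n
    def __getitem__(self, i):
        return torch.tensor([float(i * i)])

# Two worker processes fetch samples and push collated batches
# through a torch.multiprocessing.Queue back to the main process.
loader = DataLoader(SquaresDataset(8), batch_size=4, num_workers=2)
batches = list(loader)

assert len(batches) == 2
assert batches[0].shape == (4, 1)
assert batches[0][3].item() == 9.0
```

Even with multiple workers, the DataLoader reorders results so batches arrive in the original index order.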
The core worker loop looks like this:
```python
while watchdog.is_alive():
    try:
        r = index_queue.get(timeout=MP_STATUS_CHECK_INTERVAL)
    except queue.Empty:
        continue
    if isinstance(r, _ResumeIteration):
        data_queue.put((r, None))
        iteration_end = False
        fetcher = _DatasetKind.create_fetcher(
            dataset_kind, dataset, auto_collation, collate_fn, drop_last)
        continue
    elif r is None:
        assert done_event.is_set() or iteration_end
        break
    elif done_event.is_set() or iteration_end:
        continue
    idx, index = r
    # fetch data ...
    data_queue.put((idx, data))
```

Here data_queue is an instance of torch.multiprocessing.Queue, which relies on Python's pickle module for serialization.
PyTorch tensors overload __reduce_ex__ so that only their metadata (storage reference, storage offset, size, stride, requires_grad, and backward hooks) is pickled. During deserialization, torch._utils._rebuild_tensor_v2 reconstructs the tensor from this metadata, while the actual data stays in shared memory.
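This "metadata over a storage" split is visible even in a single process. The sketch below uses only stock tensor APIs to build a second tensor over the same storage with a different offset, size, and stride:

```python
import torch

base = torch.arange(12, dtype=torch.float32)

# A tensor is just metadata (offset, size, stride) over a storage:
# reuse base's storage with a different view geometry.
view = base.as_strided(size=(3, 2), stride=(4, 1), storage_offset=1)

# Writing through the view is visible in base: both share one storage.
view[0, 0] = 99.0
assert base[1].item() == 99.0
```

Pickling a tensor through torch.multiprocessing serializes exactly this kind of metadata, while the storage itself is handed off through shared memory.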
Tensor storage objects also have a custom reduce function registered in torch/multiprocessing/reduction.py :
```python
def reduce_storage(storage):
    from . import get_sharing_strategy
    if storage.is_cuda:
        raise RuntimeError(
            "Cannot pickle CUDA storage; try pickling a CUDA tensor instead")
    elif get_sharing_strategy() == 'file_system':
        metadata = storage._share_filename_()
        cache_key = metadata[1]
        rebuild = rebuild_storage_filename
        storage._shared_incref()
    elif storage.size() == 0:
        # Empty storages are simply rebuilt from scratch on the other side.
        return (rebuild_storage_empty, (type(storage),))
    else:
        fd, size = storage._share_fd_()
        df = multiprocessing.reduction.DupFd(fd)
        cache_key = fd_id(fd)
        metadata = (df, size)
        rebuild = rebuild_storage_fd

    shared_cache[cache_key] = StorageWeakRef(storage)
    return (rebuild, (type(storage),) + metadata)
```

Depending on the sharing strategy, the storage is exposed either through a named file in shared memory (file_system) or through a duplicated file descriptor (file_descriptor); empty storages are rebuilt from scratch, and CUDA storage cannot be pickled this way.
During collation, if the batch elements are already tensors and the code runs inside a worker, PyTorch allocates a shared‑memory tensor and stacks the batch directly into it, eliminating the extra copy that would otherwise occur when the batch is placed into the queue:
```python
if isinstance(elem, torch.Tensor):
    out = None
    if torch.utils.data.get_worker_info() is not None:
        # Running inside a worker process: allocate the output tensor in
        # shared memory so torch.stack writes straight into it.
        numel = sum([x.numel() for x in batch])
        storage = elem.storage()._new_shared(numel)
        out = elem.new(storage)
    return torch.stack(batch, 0, out=out)
```

Consequently, the DataLoader transfers only lightweight metadata through the queue while the bulk tensor data lives in shared memory, and it further reduces copies by building the collated batch directly in shared memory during the collate step.
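The same trick can be reproduced by hand outside a worker. The sketch below uses the internal _new_shared call shown above (a private API that may change between PyTorch versions) to stack a batch directly into shared memory:

```python
import torch

batch = [torch.full((3, 4), float(i)) for i in range(8)]
elem = batch[0]

# Allocate one shared-memory storage big enough for the whole batch,
# then let torch.stack fill it in place via out=.
numel = sum(x.numel() for x in batch)
storage = elem.storage()._new_shared(numel)
out = elem.new(storage)
stacked = torch.stack(batch, 0, out=out)

assert stacked.is_shared()
assert stacked.shape == (8, 3, 4)
assert torch.equal(stacked[2], batch[2])
```

Because the stacked result is already in shared memory, pickling it into the queue transfers only a handle instead of copying 8 × 3 × 4 floats.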