📂 Dataset Handling
In deep learning we need to load and process data efficiently. PyTorch provides two core classes for this: Dataset and DataLoader.
🤔 Why Do We Need Dataset and DataLoader?
Consider these problems:
- What if the data is too large to fit in memory?
- How do we split the data into small batches?
- How do we shuffle the data order?
- How do we load data in parallel to speed things up?
Dataset and DataLoader exist to solve exactly these problems!
Dataset: defines where the data lives and how to read it
↓
DataLoader: batching, shuffling, parallel loading
↓
Model training
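Before the details, here is the whole pipeline in miniature. This is just a preview sketch; it uses the built-in TensorDataset (a ready-made Dataset that wraps tensors), which is not covered elsewhere in this section:
import torch
from torch.utils.data import TensorDataset, DataLoader

# Step 1: a Dataset that knows where the data is and how to read one sample
dataset = TensorDataset(torch.randn(256, 4), torch.randint(0, 2, (256,)))
# Step 2: a DataLoader that batches, shuffles, and (optionally) loads in parallel
loader = DataLoader(dataset, batch_size=32, shuffle=True)
# Step 3: the training loop consumes batches
for features, labels in loader:
    print(features.shape, labels.shape)  # torch.Size([32, 4]) torch.Size([32])
    break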
📦 Using Built-in Datasets
PyTorch ships many commonly used datasets that can be used directly:
import torch
from torchvision import datasets, transforms

# Define the preprocessing pipeline
transform = transforms.Compose([
    transforms.ToTensor(),                # convert to tensor
    transforms.Normalize((0.5,), (0.5,))  # normalize
])

# Download and load the MNIST dataset
train_dataset = datasets.MNIST(
    root='./data',       # storage path
    train=True,          # training split
    download=True,       # download automatically
    transform=transform  # apply preprocessing
)
test_dataset = datasets.MNIST(
    root='./data',
    train=False,         # test split
    download=True,
    transform=transform
)

print(f"Training set size: {len(train_dataset)}")  # 60000
print(f"Test set size: {len(test_dataset)}")       # 10000

# Inspect one sample
image, label = train_dataset[0]
print(f"Image shape: {image.shape}")  # torch.Size([1, 28, 28])
print(f"Label: {label}")              # digit label (0-9)
Commonly Used Built-in Datasets
from torchvision import datasets

# Image classification
datasets.MNIST     # handwritten digits
datasets.CIFAR10   # 10-class color images
datasets.CIFAR100  # 100-class color images
datasets.ImageNet  # large-scale image classification

# Text data (via torchtext; note that the torchtext project is no longer actively developed)
# from torchtext import datasets
# datasets.IMDB    # movie-review sentiment analysis
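The same API pattern works for the other torchvision datasets. A minimal sketch loading CIFAR-10 — the only difference from MNIST is the three-channel normalization (the 0.5 values here are just a common simple choice, not dataset statistics):
from torchvision import datasets, transforms

cifar_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))  # 3 channels for RGB
])
cifar_train = datasets.CIFAR10(root='./data', train=True, download=True, transform=cifar_transform)
print(cifar_train[0][0].shape)  # torch.Size([3, 32, 32])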
🛠️ Creating a Custom Dataset
In most cases you will need to load your own data. Just subclass Dataset:
import torch
from torch.utils.data import Dataset

class MyDataset(Dataset):
    def __init__(self, data, labels):
        """Initialize the dataset."""
        self.data = data
        self.labels = labels

    def __len__(self):
        """Return the size of the dataset."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return a single sample."""
        return self.data[idx], self.labels[idx]

# Usage example
X = torch.randn(100, 10)         # 100 samples, 10 features each
y = torch.randint(0, 2, (100,))  # 100 binary labels
dataset = MyDataset(X, y)
print(f"Dataset size: {len(dataset)}")

# Fetch one sample
sample_x, sample_y = dataset[0]
print(f"Sample features: {sample_x.shape}")
print(f"Sample label: {sample_y}")
A Practical Example: Loading Data from a CSV File
import torch
from torch.utils.data import Dataset
import pandas as pd

class CSVDataset(Dataset):
    def __init__(self, csv_file, feature_cols, label_col):
        """
        Load data from a CSV file.

        Args:
            csv_file: path to the CSV file
            feature_cols: list of feature column names
            label_col: name of the label column
        """
        self.df = pd.read_csv(csv_file)
        self.features = torch.tensor(
            self.df[feature_cols].values,
            dtype=torch.float32
        )
        self.labels = torch.tensor(
            self.df[label_col].values,
            dtype=torch.long
        )

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        return self.features[idx], self.labels[idx]

# Usage example (assuming a data.csv file exists)
# dataset = CSVDataset('data.csv', ['feature1', 'feature2'], 'label')
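To try this end-to-end without existing data, you could first generate a small toy CSV. The file name and column names below are just placeholders matching the commented usage above:
import numpy as np
import pandas as pd

# Write a tiny toy CSV so CSVDataset has something to read
df = pd.DataFrame({
    'feature1': np.random.randn(20),
    'feature2': np.random.randn(20),
    'label': np.random.randint(0, 2, size=20)
})
df.to_csv('data.csv', index=False)

dataset = CSVDataset('data.csv', ['feature1', 'feature2'], 'label')
print(len(dataset), dataset[0])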
A Practical Example: an Image-Folder Dataset
import torch
from torch.utils.data import Dataset
from PIL import Image
import os

class ImageFolderDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        """
        Load images from a folder with the structure:
        root_dir/
            class1/
                img1.jpg
                img2.jpg
            class2/
                img3.jpg
                img4.jpg
        """
        self.root_dir = root_dir
        self.transform = transform
        self.samples = []
        self.class_to_idx = {}
        # Collect class folders first so indices stay contiguous even if
        # stray files sit next to them
        class_names = sorted(
            d for d in os.listdir(root_dir)
            if os.path.isdir(os.path.join(root_dir, d))
        )
        for idx, class_name in enumerate(class_names):
            class_dir = os.path.join(root_dir, class_name)
            self.class_to_idx[class_name] = idx
            for img_name in os.listdir(class_dir):
                img_path = os.path.join(class_dir, img_name)
                self.samples.append((img_path, idx))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        img_path, label = self.samples[idx]
        image = Image.open(img_path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, label
💡 Tip
PyTorch already provides an ImageFolder class that does essentially the same thing:
from torchvision.datasets import ImageFolder
dataset = ImageFolder(root='path/to/data', transform=transform)
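ImageFolder also records the class names it discovered, which is handy for mapping predictions back to human-readable labels:
print(dataset.classes)       # e.g. ['class1', 'class2']
print(dataset.class_to_idx)  # e.g. {'class1': 0, 'class2': 1}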
🔄 DataLoader in Detail
DataLoader is responsible for pulling data from a Dataset and organizing it into batches:
from torch.utils.data import DataLoader

# Create a DataLoader
train_loader = DataLoader(
    dataset=train_dataset,  # the dataset
    batch_size=32,          # 32 samples per batch
    shuffle=True,           # shuffle the order
    num_workers=4,          # 4 worker processes loading in parallel
    drop_last=True          # drop the last incomplete batch
)

# Iterate over the data
for batch_idx, (data, labels) in enumerate(train_loader):
    print(f"Batch {batch_idx}: data shape {data.shape}, label shape {labels.shape}")
    # training would go here...
    # output = model(data)
    # loss = criterion(output, labels)
    if batch_idx >= 2:  # only look at the first 3 batches
        break
Common DataLoader Parameters

| Parameter | Description | Default |
|---|---|---|
| batch_size | samples per batch | 1 |
| shuffle | whether to shuffle | False |
| num_workers | number of parallel loading workers | 0 |
| drop_last | drop the last incomplete batch | False |
| pin_memory | use pinned (page-locked) memory for GPU training | False |
| collate_fn | custom batch-collation function | default |
# Full example
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=64,
    shuffle=True,
    num_workers=4,
    pin_memory=True,  # set to True when training on GPU
    drop_last=True
)

# The test set does not need shuffling
test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=64,
    shuffle=False,  # no shuffling at test time
    num_workers=4
)
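pin_memory pays off when you also move batches to the GPU with non_blocking=True, which lets the host-to-device copy overlap with computation. A minimal sketch, assuming a CUDA device is available:
import torch

device = torch.device('cuda')
for data, labels in train_loader:
    # With pinned memory, non_blocking=True makes this copy asynchronous
    data = data.to(device, non_blocking=True)
    labels = labels.to(device, non_blocking=True)
    # ... forward/backward pass ...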
⚠️ Note for Windows Users
On Windows, code that uses num_workers > 0 must be guarded by if __name__ == '__main__':
if __name__ == '__main__':
    loader = DataLoader(dataset, num_workers=4)
    for data, label in loader:
        pass
🔧 Data Preprocessing with transforms
torchvision.transforms offers a rich set of preprocessing utilities:
from torchvision import transforms

# Chain multiple transforms together
transform = transforms.Compose([
    # Image transforms
    transforms.Resize((224, 224)),      # resize
    transforms.RandomCrop(200),         # random crop
    transforms.RandomHorizontalFlip(),  # random horizontal flip
    transforms.RandomRotation(10),      # random rotation within ±10 degrees
    # Convert to tensor (required)
    transforms.ToTensor(),              # [0, 255] → [0, 1]
    # Normalization
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],  # ImageNet means
        std=[0.229, 0.224, 0.225]    # ImageNet standard deviations
    )
])
Common transforms

| Transform | Description |
|---|---|
| ToTensor() | PIL/numpy to tensor, values scaled to [0, 1] |
| Normalize(mean, std) | normalization |
| Resize(size) | resize |
| CenterCrop(size) | center crop |
| RandomCrop(size) | random crop |
| RandomHorizontalFlip() | random horizontal flip |
| RandomVerticalFlip() | random vertical flip |
| RandomRotation(degrees) | random rotation |
| ColorJitter() | color jitter |
Different Transforms for Training and Testing
# Training: use data augmentation
train_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.RandomCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Testing: no random transforms
test_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),  # center crop instead of random crop
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
📊 Splitting a Dataset
from torch.utils.data import random_split

# Suppose we have 1000 samples (data/labels as in the MyDataset example above)
full_dataset = MyDataset(data, labels)

# Split into training and validation sets
train_size = int(0.8 * len(full_dataset))  # 80% training
val_size = len(full_dataset) - train_size  # 20% validation
train_dataset, val_dataset = random_split(
    full_dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(42)  # fix the random seed
)
print(f"Training set: {len(train_dataset)}")
print(f"Validation set: {len(val_dataset)}")
🎯 Complete Example
import torch
from torch.utils.data import Dataset, DataLoader

# 1. Define the dataset
class SimpleDataset(Dataset):
    def __init__(self, num_samples=1000):
        # Simulate some data
        self.X = torch.randn(num_samples, 10)
        self.y = (self.X.sum(dim=1) > 0).long()

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# 2. Create the dataset
dataset = SimpleDataset(1000)

# 3. Split into training/test sets
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(
    dataset, [train_size, test_size]
)

# 4. Create the DataLoaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# 5. Consume the data
print("Training for one epoch:")
for batch_idx, (data, labels) in enumerate(train_loader):
    print(f"  Batch {batch_idx+1}: inputs {data.shape}, labels {labels.shape}")
    # model training would go here...
    # output = model(data)
    # loss = criterion(output, labels)
    # loss.backward()
    # optimizer.step()

print(f"\nTotal number of batches: {len(train_loader)}")
🎨 Advanced Data Augmentation
v2 transforms (recommended, PyTorch 2.0+ / torchvision 0.15+)
import torch
from torchvision.transforms import v2

# The v2 transforms are faster and more flexible
transform = v2.Compose([
    v2.RandomResizedCrop(224, scale=(0.8, 1.0)),
    v2.RandomHorizontalFlip(p=0.5),
    v2.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    v2.RandomRotation(degrees=15),
    v2.ToImage(),                           # replaces the deprecated v2.ToTensor()
    v2.ToDtype(torch.float32, scale=True),  # scale values to [0, 1]
    v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
Custom Transforms
import torch
from torchvision import transforms

class AddGaussianNoise:
    """Add Gaussian noise."""
    def __init__(self, mean=0., std=0.1):
        self.mean = mean
        self.std = std

    def __call__(self, tensor):
        return tensor + torch.randn_like(tensor) * self.std + self.mean

class RandomErasing:
    """Randomly erase a rectangle.

    Note: torchvision also ships transforms.RandomErasing; this is a
    from-scratch version for illustration.
    """
    def __init__(self, p=0.5, scale=(0.02, 0.33)):
        self.p = p
        self.scale = scale

    def __call__(self, tensor):
        if torch.rand(1) > self.p:
            return tensor
        h, w = tensor.shape[-2:]
        area = h * w
        # Sample the erased area and its aspect ratio
        erase_area = torch.empty(1).uniform_(*self.scale).item() * area
        aspect_ratio = torch.empty(1).uniform_(0.3, 3.3).item()
        h_erase = int((erase_area * aspect_ratio) ** 0.5)
        w_erase = int((erase_area / aspect_ratio) ** 0.5)
        if h_erase < h and w_erase < w:
            top = torch.randint(0, h - h_erase, (1,)).item()
            left = torch.randint(0, w - w_erase, (1,)).item()
            tensor[..., top:top+h_erase, left:left+w_erase] = 0
        return tensor

# Use the custom transforms
transform = transforms.Compose([
    transforms.ToTensor(),
    AddGaussianNoise(std=0.05),
    RandomErasing(p=0.3)
])
MixUp and CutMix Augmentation
import torch

def mixup_data(x, y, alpha=0.2):
    """MixUp: blend two samples."""
    if alpha > 0:
        lam = torch.distributions.Beta(alpha, alpha).sample()
    else:
        lam = 1
    batch_size = x.size(0)
    index = torch.randperm(batch_size)
    mixed_x = lam * x + (1 - lam) * x[index]
    y_a, y_b = y, y[index]
    return mixed_x, y_a, y_b, lam

def mixup_criterion(criterion, pred, y_a, y_b, lam):
    """MixUp loss function."""
    return lam * criterion(pred, y_a) + (1 - lam) * criterion(pred, y_b)

# Inside the training loop
for data, labels in train_loader:
    data, labels_a, labels_b, lam = mixup_data(data, labels)
    outputs = model(data)
    loss = mixup_criterion(criterion, outputs, labels_a, labels_b, lam)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
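CutMix, mentioned in the heading, follows the same pattern but pastes a rectangular patch from a shuffled copy of the batch instead of blending pixels. A minimal sketch in the same style (it assumes image batches shaped (N, C, H, W) and reuses mixup_criterion above; recent torchvision versions also ship ready-made v2.MixUp and v2.CutMix transforms):
def cutmix_data(x, y, alpha=1.0):
    """CutMix: paste a random patch from a shuffled copy of the batch."""
    lam = torch.distributions.Beta(alpha, alpha).sample().item() if alpha > 0 else 1.0
    index = torch.randperm(x.size(0))
    h, w = x.shape[-2:]
    # Cut a box whose area fraction is roughly (1 - lam)
    cut_ratio = (1 - lam) ** 0.5
    cut_h, cut_w = int(h * cut_ratio), int(w * cut_ratio)
    # Pick a random box center and clamp the box to the image
    cy, cx = torch.randint(h, (1,)).item(), torch.randint(w, (1,)).item()
    y1, y2 = max(cy - cut_h // 2, 0), min(cy + cut_h // 2, h)
    x1, x2 = max(cx - cut_w // 2, 0), min(cx + cut_w // 2, w)
    x[:, :, y1:y2, x1:x2] = x[index, :, y1:y2, x1:x2]
    # Re-weight the loss by the area actually pasted
    lam = 1 - (y2 - y1) * (x2 - x1) / (h * w)
    return x, y, y[index], lam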
🎲 Samplers in Detail
A Sampler controls the order in which samples are drawn:
Built-in Samplers
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler, WeightedRandomSampler

# MyDataset as defined earlier takes (data, labels)
dataset = MyDataset(torch.randn(1000, 10), torch.randint(0, 2, (1000,)))

# Random sampling (note: sampler and shuffle=True are mutually exclusive)
random_sampler = RandomSampler(dataset)
loader = DataLoader(dataset, sampler=random_sampler, batch_size=32)

# Sequential sampling
sequential_sampler = SequentialSampler(dataset)
loader = DataLoader(dataset, sampler=sequential_sampler, batch_size=32)

# Weighted sampling (for class imbalance)
# Suppose class 0 had 900 samples and class 1 had 100: weight each by inverse frequency
weights = [1.0/900 if label == 0 else 1.0/100 for label in dataset.labels]
sampler = WeightedRandomSampler(weights, num_samples=len(weights), replacement=True)
loader = DataLoader(dataset, sampler=sampler, batch_size=32)
Handling Class Imbalance
import torch
from torch.utils.data import DataLoader, WeightedRandomSampler
from collections import Counter

def make_balanced_sampler(labels):
    """Build a sampler that rebalances the classes."""
    class_counts = Counter(labels)
    class_weights = {cls: 1.0/count for cls, count in class_counts.items()}
    sample_weights = [class_weights[label] for label in labels]
    sampler = WeightedRandomSampler(
        weights=sample_weights,
        num_samples=len(labels),
        replacement=True
    )
    return sampler

# Usage: the labels must be those of `dataset`, one entry per sample
labels = dataset.labels.tolist()  # e.g. heavily imbalanced labels
sampler = make_balanced_sampler(labels)
loader = DataLoader(dataset, sampler=sampler, batch_size=4)
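A quick sanity check on the loader above: count the labels the sampler actually yields over one pass; the classes should come out roughly even:
from collections import Counter

counts = Counter()
for _, batch_labels in loader:
    counts.update(batch_labels.tolist())
print(counts)  # roughly equal counts per class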
Custom Samplers
from torch.utils.data import Sampler
import random

class BalancedBatchSampler(Sampler):
    """Each batch contains the same number of samples from every class."""
    def __init__(self, labels, batch_size, samples_per_class):
        self.labels = labels
        self.batch_size = batch_size
        self.samples_per_class = samples_per_class
        # Group sample indices by class
        self.label_to_indices = {}
        for idx, label in enumerate(labels):
            if label not in self.label_to_indices:
                self.label_to_indices[label] = []
            self.label_to_indices[label].append(idx)

    def __iter__(self):
        # Generate balanced batches
        batches = []
        for _ in range(len(self)):
            batch = []
            for label in self.label_to_indices:
                indices = self.label_to_indices[label]
                batch.extend(random.sample(indices, min(self.samples_per_class, len(indices))))
            random.shuffle(batch)
            batches.append(batch[:self.batch_size])
        for batch in batches:
            yield batch

    def __len__(self):
        return len(self.labels) // self.batch_size
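Because this sampler yields whole index lists rather than single indices, pass it as batch_sampler (which is mutually exclusive with batch_size, shuffle, and drop_last). A sketch, reusing the dataset from above:
batch_sampler = BalancedBatchSampler(dataset.labels.tolist(), batch_size=8, samples_per_class=4)
loader = DataLoader(dataset, batch_sampler=batch_sampler)
for data, labels in loader:
    print(labels.tolist())  # roughly samples_per_class samples of each class
    break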
🧩 Custom collate_fn
collate_fn controls how individual samples are combined into a batch:
Handling Variable-Length Sequences
import torch
from torch.nn.utils.rnn import pad_sequence

def pad_collate_fn(batch):
    """Pad variable-length sequences (tensors) to a common length."""
    sequences, labels = zip(*batch)
    # Record the original length of each sequence
    lengths = [len(seq) for seq in sequences]
    # Padding
    padded_sequences = pad_sequence(sequences, batch_first=True, padding_value=0)
    labels = torch.tensor(labels)
    lengths = torch.tensor(lengths)
    return padded_sequences, labels, lengths

# Usage
loader = DataLoader(dataset, batch_size=32, collate_fn=pad_collate_fn)
Handling Images of Different Sizes
import torch
from torchvision.transforms.functional import resize

def resize_collate_fn(batch, target_size=(224, 224)):
    """Resize images of different sizes to a common size."""
    images, labels = zip(*batch)
    # torch.stack needs tensors, so the images should already be (C, H, W) tensors
    resized_images = [resize(img, list(target_size)) for img in images]
    images = torch.stack(resized_images)
    labels = torch.tensor(labels)
    return images, labels
Handling Dictionary-Style Samples
def dict_collate_fn(batch):
    """Collate samples stored as dictionaries."""
    # `batch` is a list of dictionaries
    collated = {}
    for key in batch[0].keys():
        if isinstance(batch[0][key], torch.Tensor):
            collated[key] = torch.stack([sample[key] for sample in batch])
        else:
            collated[key] = [sample[key] for sample in batch]
    return collated

# Usage
class DictDataset(Dataset):
    def __len__(self):
        return 100  # required by the default sampler

    def __getitem__(self, idx):
        return {
            'image': torch.randn(3, 64, 64),
            'label': torch.tensor(0),
            'filename': f'image_{idx}.jpg'
        }

loader = DataLoader(DictDataset(), batch_size=4, collate_fn=dict_collate_fn)
🔄 Multi-Process Data Loading
worker_init_fn
import random
import numpy as np
import torch

def worker_init_fn(worker_id):
    """Give each worker a different random seed."""
    worker_seed = torch.initial_seed() % 2**32
    np.random.seed(worker_seed)
    random.seed(worker_seed)

loader = DataLoader(
    dataset,
    batch_size=32,
    num_workers=4,
    worker_init_fn=worker_init_fn
)
Persistent Workers
# persistent_workers=True keeps workers alive between epochs,
# avoiding the cost of recreating them at every epoch
loader = DataLoader(
    dataset,
    batch_size=32,
    num_workers=4,
    persistent_workers=True  # PyTorch 1.7+
)
prefetch_factor
# prefetch_factor: number of batches each worker prefetches
loader = DataLoader(
    dataset,
    batch_size=32,
    num_workers=4,
    prefetch_factor=2  # the default is 2
)
📈 IterableDataset (Streaming Data)
For very large datasets or live data streams:
import torch
from torch.utils.data import IterableDataset, DataLoader

class StreamDataset(IterableDataset):
    """Stream a large file line by line."""
    def __init__(self, file_path):
        self.file_path = file_path

    def __iter__(self):
        with open(self.file_path, 'r') as f:
            for line in f:
                # Process one line at a time; the file never has to fit in memory
                data = self.process_line(line)
                yield data

    def process_line(self, line):
        # Parse one line of comma-separated values
        values = [float(x) for x in line.strip().split(',')]
        return torch.tensor(values[:-1]), torch.tensor(values[-1])

# With multiple workers, the data must be sharded explicitly
class ShardedStreamDataset(IterableDataset):
    def __init__(self, file_paths):
        self.file_paths = file_paths

    def __iter__(self):
        worker_info = torch.utils.data.get_worker_info()
        if worker_info is None:
            # Single-process mode
            files = self.file_paths
        else:
            # Multi-process mode: assign different files to different workers
            per_worker = len(self.file_paths) // worker_info.num_workers
            worker_id = worker_info.id
            start = worker_id * per_worker
            end = start + per_worker
            files = self.file_paths[start:end]
        for file_path in files:
            for data in self.read_file(file_path):
                yield data

    def read_file(self, file_path):
        # Same line-by-line parsing as StreamDataset.process_line
        with open(file_path, 'r') as f:
            for line in f:
                values = [float(x) for x in line.strip().split(',')]
                yield torch.tensor(values[:-1]), torch.tensor(values[-1])
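Usage is the same as with a map-style dataset, except that shuffle and samplers are not supported for an IterableDataset; batching still works. A sketch, with placeholder file names standing in for CSV-style shards like the ones parsed above:
loader = DataLoader(
    ShardedStreamDataset(['part1.csv', 'part2.csv']),  # hypothetical shard files
    batch_size=32,
    num_workers=2  # each worker streams its own shard
)
for features, targets in loader:
    print(features.shape, targets.shape)
    break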
🏋️ Exercise
# Exercise: create a custom dataset and load it with a DataLoader
import torch
from torch.utils.data import Dataset, DataLoader

# 1. Create a dataset class with 100 samples:
#    - each sample is a 5-dimensional feature vector
#    - the label is 1 if the features sum to more than 0, otherwise 0
# Your code:

# 2. Create a DataLoader with batch_size=16 and shuffle=True
# Your code:

# 3. Iterate over all batches and print the shape of each
# Your code:

# 4. Create a WeightedRandomSampler to handle imbalanced data
# Your code:

# 5. Implement a custom collate_fn that returns each batch as a dictionary with extra batch info
# Your code:
Click to view the answers
import torch
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from collections import Counter

# Exercise 1
class MyDataset(Dataset):
    def __init__(self, num_samples=100):
        self.features = torch.randn(num_samples, 5)
        self.labels = (self.features.sum(dim=1) > 0).long()

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        return self.features[idx], self.labels[idx]

dataset = MyDataset(100)

# Exercise 2
dataloader = DataLoader(dataset, batch_size=16, shuffle=True)

# Exercise 3
for batch_idx, (features, labels) in enumerate(dataloader):
    print(f"Batch {batch_idx}: features {features.shape}, labels {labels.shape}")

# Exercise 4
labels_list = dataset.labels.tolist()
class_counts = Counter(labels_list)
weights = [1.0/class_counts[label] for label in labels_list]
sampler = WeightedRandomSampler(weights, num_samples=len(weights), replacement=True)
balanced_loader = DataLoader(dataset, sampler=sampler, batch_size=16)

# Exercise 5
def custom_collate(batch):
    features, labels = zip(*batch)
    features = torch.stack(features)
    labels = torch.tensor(labels)
    # Attach batch-level info
    return {
        'features': features,
        'labels': labels,
        'batch_size': len(batch)
    }

custom_loader = DataLoader(dataset, batch_size=16, collate_fn=custom_collate)
for batch in custom_loader:
    print(batch.keys())
    break