📂 Working with Datasets

In deep learning we need to load and process data efficiently. PyTorch provides two core classes for this: Dataset and DataLoader.

🤔 Why Do We Need Dataset and DataLoader?

Consider these problems:

  • What if the data is too large to fit in memory?
  • How do we split the data into mini-batches?
  • How do we shuffle the data?
  • How do we load data in parallel to speed things up?

Dataset and DataLoader exist to solve exactly these problems!

Dataset: defines where the data lives and how to read one sample
     ↓
DataLoader: batching, shuffling, parallel loading
     ↓
model training
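
In code, this whole pipeline is only a few lines. A minimal sketch with a toy in-memory dataset (TensorDataset wraps plain tensors; the shapes here are made up):

import torch
from torch.utils.data import TensorDataset, DataLoader

# Dataset: 8 samples with 3 features each, plus binary labels
dataset = TensorDataset(torch.randn(8, 3), torch.randint(0, 2, (8,)))

# DataLoader: batches and shuffles the samples
loader = DataLoader(dataset, batch_size=4, shuffle=True)

# The training loop consumes one batch at a time
for x, y in loader:
    print(x.shape, y.shape)  # torch.Size([4, 3]) torch.Size([4])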

📦 Using Built-in Datasets

PyTorch ships with many common datasets that you can use directly:

import torch
from torchvision import datasets, transforms

# Define the preprocessing pipeline
transform = transforms.Compose([
    transforms.ToTensor(),  # convert to a tensor
    transforms.Normalize((0.5,), (0.5,))  # normalize
])

# Download and load the MNIST dataset
train_dataset = datasets.MNIST(
    root='./data',          # storage path
    train=True,             # training split
    download=True,          # download automatically
    transform=transform     # apply the preprocessing
)

test_dataset = datasets.MNIST(
    root='./data',
    train=False,            # test split
    download=True,
    transform=transform
)

print(f"Training set size: {len(train_dataset)}")  # 60000
print(f"Test set size: {len(test_dataset)}")       # 10000

# Inspect one sample
image, label = train_dataset[0]
print(f"Image shape: {image.shape}")  # torch.Size([1, 28, 28])
print(f"Label: {label}")              # a digit label (0-9)

Common built-in datasets

from torchvision import datasets

# Image classification
datasets.MNIST       # handwritten digits
datasets.CIFAR10     # color images, 10 classes
datasets.CIFAR100    # color images, 100 classes
datasets.ImageNet    # large-scale image classification

# Text data (via torchtext; note that torchtext is no longer actively developed)
# from torchtext import datasets
# datasets.IMDB      # movie-review sentiment analysis

🛠️ Creating a Custom Dataset

Most of the time you will need to load your own data. Just subclass Dataset:

import torch
from torch.utils.data import Dataset

class MyDataset(Dataset):
    def __init__(self, data, labels):
        """
        Initialize the dataset
        """
        self.data = data
        self.labels = labels
    
    def __len__(self):
        """
        Return the size of the dataset
        """
        return len(self.data)
    
    def __getitem__(self, idx):
        """
        Return a single sample
        """
        return self.data[idx], self.labels[idx]

# Usage example
X = torch.randn(100, 10)  # 100 samples, 10 features each
y = torch.randint(0, 2, (100,))  # 100 binary labels

dataset = MyDataset(X, y)
print(f"Dataset size: {len(dataset)}")

# Fetch one sample
sample_x, sample_y = dataset[0]
print(f"Sample features: {sample_x.shape}")
print(f"Sample label: {sample_y}")

A real example: loading data from a CSV file

import torch
from torch.utils.data import Dataset
import pandas as pd

class CSVDataset(Dataset):
    def __init__(self, csv_file, feature_cols, label_col):
        """
        从CSV文件加载数据
        
        参数:
            csv_file: CSV文件路径
            feature_cols: 特征列名列表
            label_col: 标签列名
        """
        self.df = pd.read_csv(csv_file)
        self.features = torch.tensor(
            self.df[feature_cols].values, 
            dtype=torch.float32
        )
        self.labels = torch.tensor(
            self.df[label_col].values, 
            dtype=torch.long
        )
    
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx):
        return self.features[idx], self.labels[idx]

# Usage example (assuming a data.csv file exists)
# dataset = CSVDataset('data.csv', ['feature1', 'feature2'], 'label')

A real example: an image-folder dataset

import torch
from torch.utils.data import Dataset
from PIL import Image
import os

class ImageFolderDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        """
        从文件夹加载图片
        
        文件夹结构:
        root_dir/
            class1/
                img1.jpg
                img2.jpg
            class2/
                img3.jpg
                img4.jpg
        """
        self.root_dir = root_dir
        self.transform = transform
        self.samples = []
        self.class_to_idx = {}
        
        # Walk over every class folder
        for idx, class_name in enumerate(sorted(os.listdir(root_dir))):
            class_dir = os.path.join(root_dir, class_name)
            if os.path.isdir(class_dir):
                self.class_to_idx[class_name] = idx
                for img_name in os.listdir(class_dir):
                    img_path = os.path.join(class_dir, img_name)
                    self.samples.append((img_path, idx))
    
    def __len__(self):
        return len(self.samples)
    
    def __getitem__(self, idx):
        img_path, label = self.samples[idx]
        image = Image.open(img_path).convert('RGB')
        
        if self.transform:
            image = self.transform(image)
        
        return image, label

💡 Tip

PyTorch already provides an ImageFolder class that does essentially the same thing:

from torchvision.datasets import ImageFolder
dataset = ImageFolder(root='path/to/data', transform=transform)
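
ImageFolder also records the classes it discovered, which is handy for mapping predictions back to class names:

print(dataset.classes)       # e.g. ['class1', 'class2']
print(dataset.class_to_idx)  # e.g. {'class1': 0, 'class2': 1}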

🔄 DataLoader in Detail

DataLoader pulls samples from a Dataset and assembles them into batches:

from torch.utils.data import DataLoader

# Create the DataLoader
train_loader = DataLoader(
    dataset=train_dataset,  # the dataset
    batch_size=32,          # 32 samples per batch
    shuffle=True,           # shuffle the order
    num_workers=4,          # 4 worker processes load in parallel
    drop_last=True          # drop the last incomplete batch
)

# Iterate over the data
for batch_idx, (data, labels) in enumerate(train_loader):
    print(f"Batch {batch_idx}: data shape {data.shape}, label shape {labels.shape}")
    
    # training happens here...
    # output = model(data)
    # loss = criterion(output, labels)
    
    if batch_idx >= 2:  # only look at the first 3 batches
        break

Common DataLoader parameters

Parameter       Description                                    Default
batch_size      Number of samples per batch                    1
shuffle         Reshuffle the data every epoch                 False
num_workers     Number of worker processes for loading         0
drop_last       Drop the last incomplete batch                 False
pin_memory      Use pinned memory (useful for GPU training)    False
collate_fn      Custom batch-assembly function                 None (default collation)

# Full example
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=64,
    shuffle=True,
    num_workers=4,
    pin_memory=True,  # set to True for GPU training (see the sketch below)
    drop_last=True
)

# The test set does not need shuffling
test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=64,
    shuffle=False,  # no shuffling at test time
    num_workers=4
)
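
pin_memory pays off when paired with non-blocking host-to-device copies in the training loop. A short sketch, assuming a CUDA device and the train_loader above:

import torch

device = torch.device('cuda')

for data, labels in train_loader:
    # With pin_memory=True these copies can overlap with GPU computation
    data = data.to(device, non_blocking=True)
    labels = labels.to(device, non_blocking=True)
    # ... forward / backward pass here ...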

⚠️ A Note for Windows Users

On Windows, when using num_workers > 0 you must wrap your code in an if __name__ == '__main__': guard:

if __name__ == '__main__':
    loader = DataLoader(dataset, num_workers=4)
    for data, label in loader:
        pass

🔧 Preprocessing with transforms

torchvision.transforms offers a rich set of preprocessing operations:

from torchvision import transforms

# Chain multiple transforms together
transform = transforms.Compose([
    # image transforms
    transforms.Resize((224, 224)),      # resize
    transforms.RandomCrop(200),          # random crop
    transforms.RandomHorizontalFlip(),   # random horizontal flip
    transforms.RandomRotation(10),       # random rotation of up to ±10 degrees
    
    # convert to a tensor (required)
    transforms.ToTensor(),               # [0,255] → [0,1]
    
    # normalize
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],      # ImageNet means
        std=[0.229, 0.224, 0.225]        # ImageNet standard deviations
    )
])

Common transforms

Transform                  Description
ToTensor()                 PIL image/ndarray to tensor, values scaled to [0,1]
Normalize(mean, std)       Normalize with the given mean and std
Resize(size)               Resize
CenterCrop(size)           Center crop
RandomCrop(size)           Random crop
RandomHorizontalFlip()     Random horizontal flip
RandomVerticalFlip()       Random vertical flip
RandomRotation(degrees)    Random rotation
ColorJitter()              Random color jitter
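
To see what ToTensor and Normalize actually do to the pixel values, here is a quick check on a dummy mid-gray PIL image (the printed values are approximate):

from PIL import Image
import numpy as np
from torchvision import transforms

img = Image.fromarray(np.full((4, 4, 3), 128, dtype=np.uint8))  # uniform gray image

t = transforms.ToTensor()(img)
print(t[0, 0, 0].item())  # ~0.502: 128/255, scaled from [0,255] to [0,1]

n = transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])(t)
print(n[0, 0, 0].item())  # ~0.004: (0.502 - 0.5) / 0.5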

Use different transforms for training and testing

# Training: apply data augmentation
train_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.RandomCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Testing: no random transforms
test_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),  # center crop instead of random crop
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

📊 Splitting a Dataset

from torch.utils.data import random_split

# Suppose we have 1000 samples
full_dataset = MyDataset(data, labels)

# Split into training and validation sets
train_size = int(0.8 * len(full_dataset))  # 80% training
val_size = len(full_dataset) - train_size   # 20% validation

train_dataset, val_dataset = random_split(
    full_dataset, 
    [train_size, val_size],
    generator=torch.Generator().manual_seed(42)  # fix the random seed
)

print(f"Training set: {len(train_dataset)}")
print(f"Validation set: {len(val_dataset)}")

🎯 A Complete Example

import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

# 1. Define the dataset
class SimpleDataset(Dataset):
    def __init__(self, num_samples=1000):
        # simulate some data
        self.X = torch.randn(num_samples, 10)
        self.y = (self.X.sum(dim=1) > 0).long()
    
    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# 2. Create the dataset
dataset = SimpleDataset(1000)

# 3. Split into train/test sets
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = torch.utils.data.random_split(
    dataset, [train_size, test_size]
)

# 4. Create the DataLoaders
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# 5. Use the data
print("Training for one epoch:")
for batch_idx, (data, labels) in enumerate(train_loader):
    print(f"  Batch {batch_idx+1}: input {data.shape}, labels {labels.shape}")
    
    # model training happens here...
    # output = model(data)
    # loss = criterion(output, labels)
    # loss.backward()
    # optimizer.step()

print(f"\nTotal number of batches: {len(train_loader)}")

🎨 Advanced Data Augmentation

v2 transforms (recommended, PyTorch 2.0+)

import torch
from torchvision.transforms import v2

# The v2 transforms are faster and more flexible
transform = v2.Compose([
    v2.RandomResizedCrop(224, scale=(0.8, 1.0)),
    v2.RandomHorizontalFlip(p=0.5),
    v2.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    v2.RandomRotation(degrees=15),
    v2.ToImage(),                           # v2 replacement for the deprecated ToTensor()
    v2.ToDtype(torch.float32, scale=True),  # convert to float and scale to [0,1]
    v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

Custom transforms

import torch
from torchvision import transforms

class AddGaussianNoise:
    """添加高斯噪声"""
    def __init__(self, mean=0., std=0.1):
        self.mean = mean
        self.std = std
    
    def __call__(self, tensor):
        return tensor + torch.randn_like(tensor) * self.std + self.mean

class RandomErasing:
    """随机遮挡"""
    def __init__(self, p=0.5, scale=(0.02, 0.33)):
        self.p = p
        self.scale = scale
    
    def __call__(self, tensor):
        if torch.rand(1) > self.p:
            return tensor
        
        h, w = tensor.shape[-2:]
        area = h * w
        erase_area = torch.empty(1).uniform_(*self.scale).item() * area
        aspect_ratio = torch.empty(1).uniform_(0.3, 3.3).item()
        
        h_erase = int((erase_area * aspect_ratio) ** 0.5)
        w_erase = int((erase_area / aspect_ratio) ** 0.5)
        
        if h_erase < h and w_erase < w:
            top = torch.randint(0, h - h_erase, (1,)).item()
            left = torch.randint(0, w - w_erase, (1,)).item()
            tensor[..., top:top+h_erase, left:left+w_erase] = 0
        
        return tensor

# Use the custom transforms
transform = transforms.Compose([
    transforms.ToTensor(),
    AddGaussianNoise(std=0.05),
    RandomErasing(p=0.3)
])

MixUp and CutMix augmentation

import torch
import torch.nn.functional as F

def mixup_data(x, y, alpha=0.2):
    """MixUp: blend pairs of samples"""
    if alpha > 0:
        lam = torch.distributions.Beta(alpha, alpha).sample()
    else:
        lam = 1
    
    batch_size = x.size(0)
    index = torch.randperm(batch_size)
    
    mixed_x = lam * x + (1 - lam) * x[index]
    y_a, y_b = y, y[index]
    
    return mixed_x, y_a, y_b, lam

def mixup_criterion(criterion, pred, y_a, y_b, lam):
    """MixUp loss"""
    return lam * criterion(pred, y_a) + (1 - lam) * criterion(pred, y_b)

# In the training loop (model, criterion, and optimizer defined elsewhere)
for data, labels in train_loader:
    data, labels_a, labels_b, lam = mixup_data(data, labels)
    
    outputs = model(data)
    loss = mixup_criterion(criterion, outputs, labels_a, labels_b, lam)
    
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
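
CutMix follows the same recipe but mixes in image space: cut a random box out of each image, paste in the corresponding patch from a shuffled copy of the batch, and set lam to the fraction of the image that survives. A sketch (it reuses mixup_criterion above for the loss):

def cutmix_data(x, y, alpha=1.0):
    """CutMix: paste a random patch from a shuffled copy of the batch"""
    lam = torch.distributions.Beta(alpha, alpha).sample().item()
    index = torch.randperm(x.size(0))
    
    h, w = x.shape[-2:]
    # box size derived from lam, placed at a random center
    cut_h, cut_w = int(h * (1 - lam) ** 0.5), int(w * (1 - lam) ** 0.5)
    cy, cx = torch.randint(h, (1,)).item(), torch.randint(w, (1,)).item()
    top, left = max(cy - cut_h // 2, 0), max(cx - cut_w // 2, 0)
    bottom, right = min(cy + cut_h // 2, h), min(cx + cut_w // 2, w)
    
    x = x.clone()
    x[..., top:bottom, left:right] = x[index][..., top:bottom, left:right]
    # correct lam for the area that was actually kept
    lam = 1 - (bottom - top) * (right - left) / (h * w)
    return x, y, y[index], lam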

🎲 Samplers in Detail

A Sampler controls the order in which samples are drawn:

Built-in samplers

import torch
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler, WeightedRandomSampler

# MyDataset from earlier: 1000 samples with 10 features each, and binary labels
dataset = MyDataset(torch.randn(1000, 10), torch.randint(0, 2, (1000,)))

# random sampling
random_sampler = RandomSampler(dataset)
loader = DataLoader(dataset, sampler=random_sampler, batch_size=32)

# sequential sampling
sequential_sampler = SequentialSampler(dataset)
loader = DataLoader(dataset, sampler=sequential_sampler, batch_size=32)

# weighted sampling (for class imbalance)
# suppose class 0 has 900 samples and class 1 has 100
weights = [1.0/900 if label == 0 else 1.0/100 for label in dataset.labels]
sampler = WeightedRandomSampler(weights, num_samples=len(weights), replacement=True)
loader = DataLoader(dataset, sampler=sampler, batch_size=32)

Handling class imbalance

import torch
from torch.utils.data import DataLoader, WeightedRandomSampler
from collections import Counter

def make_balanced_sampler(labels):
    """Build a class-balanced sampler"""
    class_counts = Counter(labels)
    class_weights = {cls: 1.0/count for cls, count in class_counts.items()}
    sample_weights = [class_weights[label] for label in labels]
    
    sampler = WeightedRandomSampler(
        weights=sample_weights,
        num_samples=len(labels),
        replacement=True
    )
    return sampler

# Usage (the labels must match the dataset passed to the DataLoader)
labels = [0, 0, 0, 0, 0, 1, 1, 1]  # imbalanced
sampler = make_balanced_sampler(labels)
loader = DataLoader(dataset, sampler=sampler, batch_size=4)

A custom Sampler

from torch.utils.data import Sampler
import random

class BalancedBatchSampler(Sampler):
    """Each batch contains the same number of samples from every class.
    Note that __iter__ yields whole batches of indices, so pass this to
    DataLoader via batch_sampler rather than sampler."""
    def __init__(self, labels, batch_size, samples_per_class):
        self.labels = labels
        self.batch_size = batch_size
        self.samples_per_class = samples_per_class
        
        # group indices by class
        self.label_to_indices = {}
        for idx, label in enumerate(labels):
            if label not in self.label_to_indices:
                self.label_to_indices[label] = []
            self.label_to_indices[label].append(idx)
    
    def __iter__(self):
        # generate balanced batches
        batches = []
        for _ in range(len(self)):
            batch = []
            for label in self.label_to_indices:
                indices = self.label_to_indices[label]
                batch.extend(random.sample(indices, min(self.samples_per_class, len(indices))))
            random.shuffle(batch)
            batches.append(batch[:self.batch_size])
        
        for batch in batches:
            yield batch
    
    def __len__(self):
        return len(self.labels) // self.batch_size
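
Because it yields lists of indices rather than single indices, this sampler plugs into DataLoader through the batch_sampler argument, which replaces batch_size, shuffle, sampler, and drop_last. A usage sketch with made-up labels (the dataset's labels must line up with them):

labels = [0] * 50 + [1] * 50
batch_sampler = BalancedBatchSampler(labels, batch_size=8, samples_per_class=4)
loader = DataLoader(dataset, batch_sampler=batch_sampler)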

🧩 Custom collate_fn

collate_fn controls how individual samples are combined into a batch:

Padding variable-length sequences

import torch
from torch.nn.utils.rnn import pad_sequence

def pad_collate_fn(batch):
    """Pad variable-length sequences to a common length"""
    sequences, labels = zip(*batch)
    
    # record the original length of every sequence
    lengths = [len(seq) for seq in sequences]
    
    # padding
    padded_sequences = pad_sequence(sequences, batch_first=True, padding_value=0)
    
    labels = torch.tensor(labels)
    lengths = torch.tensor(lengths)
    
    return padded_sequences, labels, lengths

# Usage
loader = DataLoader(dataset, batch_size=32, collate_fn=pad_collate_fn)

Handling images of different sizes

import torch
from torchvision.transforms.functional import resize

def resize_collate_fn(batch, target_size=(224, 224)):
    """Resize differently-sized images to a common size"""
    images, labels = zip(*batch)
    
    resized_images = [resize(img, list(target_size)) for img in images]
    images = torch.stack(resized_images)
    labels = torch.tensor(labels)
    
    return images, labels
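
DataLoader calls collate_fn with the batch as its only argument, so the target_size default above is what gets used; to pick a different size, bind it with functools.partial:

from functools import partial

loader = DataLoader(dataset, batch_size=32,
                    collate_fn=partial(resize_collate_fn, target_size=(128, 128)))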

Handling dictionary samples

def dict_collate_fn(batch):
    """Collate samples that come as dictionaries"""
    # batch is a list of dicts
    collated = {}
    
    for key in batch[0].keys():
        if isinstance(batch[0][key], torch.Tensor):
            collated[key] = torch.stack([sample[key] for sample in batch])
        else:
            collated[key] = [sample[key] for sample in batch]
    
    return collated

# Usage
class DictDataset(Dataset):
    def __len__(self):
        return 8  # map-style datasets need __len__ for the default sampler
    
    def __getitem__(self, idx):
        return {
            'image': torch.randn(3, 64, 64),
            'label': torch.tensor(0),
            'filename': f'image_{idx}.jpg'
        }

loader = DataLoader(DictDataset(), batch_size=4, collate_fn=dict_collate_fn)

🔄 Multi-Process Data Loading

worker_init_fn

import random

import numpy as np
import torch

def worker_init_fn(worker_id):
    """Give every worker its own random seed"""
    worker_seed = torch.initial_seed() % 2**32
    np.random.seed(worker_seed)
    random.seed(worker_seed)

loader = DataLoader(
    dataset,
    batch_size=32,
    num_workers=4,
    worker_init_fn=worker_init_fn
)

Persistent workers

# persistent_workers=True keeps workers alive between epochs,
# avoiding the cost of recreating them every epoch
loader = DataLoader(
    dataset,
    batch_size=32,
    num_workers=4,
    persistent_workers=True  # PyTorch 1.7+
)

prefetch_factor

# prefetch_factor: number of batches each worker prefetches
loader = DataLoader(
    dataset,
    batch_size=32,
    num_workers=4,
    prefetch_factor=2  # the default is 2
)

📈 IterableDataset (Streaming Data)

For very large datasets or live data streams:

from torch.utils.data import IterableDataset, DataLoader

class StreamDataset(IterableDataset):
    """Stream a large file line by line"""
    def __init__(self, file_path):
        self.file_path = file_path
    
    def __iter__(self):
        with open(self.file_path, 'r') as f:
            for line in f:
                # process one line at a time; the file is never fully in memory
                data = self.process_line(line)
                yield data
    
    def process_line(self, line):
        # parse one line of data
        values = [float(x) for x in line.strip().split(',')]
        return torch.tensor(values[:-1]), torch.tensor(values[-1])

# With multiple workers, the data has to be sharded explicitly
class ShardedStreamDataset(IterableDataset):
    def __init__(self, file_paths):
        self.file_paths = file_paths
    
    def __iter__(self):
        worker_info = torch.utils.data.get_worker_info()
        
        if worker_info is None:
            # single-process mode: this process reads every file
            files = self.file_paths
        else:
            # multi-process mode: stride the file list across workers so
            # every file is covered even when the count is not divisible
            files = self.file_paths[worker_info.id::worker_info.num_workers]
        
        for file_path in files:
            yield from self.read_file(file_path)
    
    def read_file(self, file_path):
        # same line-by-line parsing as StreamDataset above
        with open(file_path, 'r') as f:
            for line in f:
                values = [float(x) for x in line.strip().split(',')]
                yield torch.tensor(values[:-1]), torch.tensor(values[-1])
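
Wrapped in a DataLoader, each worker below iterates only over its own shard, and the default collation still assembles the yielded (features, target) pairs into batches (the shard file names are placeholders):

files = [f'shard_{i}.csv' for i in range(8)]  # hypothetical shard files
dataset = ShardedStreamDataset(files)
loader = DataLoader(dataset, batch_size=32, num_workers=4)

for features, targets in loader:
    pass  # each batch comes from one worker's shard at a time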

🏋️ Exercises

# Exercises: build a custom dataset and use it with a DataLoader

import torch
from torch.utils.data import Dataset, DataLoader

# 1. Create a dataset class with 100 samples
#    - each sample is a 5-dimensional feature vector
#    - the label is 1 when the features sum to more than 0, otherwise 0
# Your code:


# 2. Create a DataLoader with batch_size=16 and shuffle=True
# Your code:


# 3. Iterate over all batches and print the shape of each one
# Your code:


# 4. Create a WeightedRandomSampler to handle imbalanced data
# Your code:


# 5. Implement a custom collate_fn that returns each batch as a dictionary with extra batch information
# Your code:

Click to view the answers
import torch
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from collections import Counter

# Exercise 1
class MyDataset(Dataset):
    def __init__(self, num_samples=100):
        self.features = torch.randn(num_samples, 5)
        self.labels = (self.features.sum(dim=1) > 0).long()
    
    def __len__(self):
        return len(self.features)
    
    def __getitem__(self, idx):
        return self.features[idx], self.labels[idx]

dataset = MyDataset(100)

# Exercise 2
dataloader = DataLoader(dataset, batch_size=16, shuffle=True)

# Exercise 3
for batch_idx, (features, labels) in enumerate(dataloader):
    print(f"Batch {batch_idx}: features {features.shape}, labels {labels.shape}")

# Exercise 4
labels_list = dataset.labels.tolist()
class_counts = Counter(labels_list)
weights = [1.0/class_counts[label] for label in labels_list]
sampler = WeightedRandomSampler(weights, num_samples=len(weights), replacement=True)
balanced_loader = DataLoader(dataset, sampler=sampler, batch_size=16)

# Exercise 5
def custom_collate(batch):
    features, labels = zip(*batch)
    features = torch.stack(features)
    labels = torch.tensor(labels)
    # attach extra batch information
    return {
        'features': features,
        'labels': labels,
        'batch_size': len(batch)
    }

custom_loader = DataLoader(dataset, batch_size=16, collate_fn=custom_collate)
for batch in custom_loader:
    print(batch.keys())
    break

🎉 Basics Complete!

Congratulations on finishing the basics! You now know:

  • How to install and configure PyTorch
  • How to create and manipulate tensors
  • How autograd works
  • How to load and process datasets

Next, let's move on to the intermediate section and learn how to build and train neural networks!
