# 🎓 Model Training

This is the most important section! Here we walk through the complete model training workflow.
## 📋 Training Workflow Overview
```python
# 1. Prepare the data
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# 2. Define the model
model = MyModel()

# 3. Define the loss function
criterion = nn.CrossEntropyLoss()

# 4. Define the optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# 5. Training loop
for epoch in range(num_epochs):
    for data, labels in train_loader:
        # Forward pass
        outputs = model(data)
        loss = criterion(outputs, labels)

        # Backward pass
        optimizer.zero_grad()  # zero the gradients
        loss.backward()        # compute gradients
        optimizer.step()       # update parameters
```
## 📉 Loss Functions

A loss function measures the gap between the model's predictions and the ground-truth values.

### Classification Tasks
```python
import torch
import torch.nn as nn

# Multi-class classification: CrossEntropyLoss
# It applies Softmax internally, so the model output should be raw logits (no Softmax)
criterion = nn.CrossEntropyLoss()
outputs = torch.randn(5, 10)             # 5 samples, logits over 10 classes
labels = torch.tensor([0, 3, 2, 1, 5])   # ground-truth labels
loss = criterion(outputs, labels)
print(f"Cross-entropy loss: {loss.item():.4f}")

# Binary classification: BCEWithLogitsLoss
# It applies Sigmoid internally, so no Sigmoid on the model output either
criterion = nn.BCEWithLogitsLoss()
outputs = torch.randn(5, 1)                            # logits for 5 samples
labels = torch.tensor([[0.], [1.], [1.], [0.], [1.]])  # binary labels
loss = criterion(outputs, labels)
print(f"Binary classification loss: {loss.item():.4f}")
```
### Regression Tasks
```python
import torch
import torch.nn as nn

# Mean squared error (MSE)
criterion = nn.MSELoss()
predictions = torch.tensor([2.5, 0.0, 2.1])
targets = torch.tensor([3.0, -0.5, 2.0])
loss = criterion(predictions, targets)
print(f"MSE loss: {loss.item():.4f}")

# Mean absolute error (MAE / L1)
criterion = nn.L1Loss()
loss = criterion(predictions, targets)
print(f"MAE loss: {loss.item():.4f}")
```
### Loss Function Selection Guide

| Task type | Recommended loss |
|---|---|
| Multi-class classification | nn.CrossEntropyLoss() |
| Binary classification | nn.BCEWithLogitsLoss() |
| Regression | nn.MSELoss() or nn.L1Loss() |
| Multi-label classification | nn.BCEWithLogitsLoss() (sketch below) |
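Multi-label classification hasn't appeared in code yet; here is a minimal sketch, as referenced in the table (the shapes and labels are made up for illustration):

```python
import torch
import torch.nn as nn

# Multi-label: each sample can belong to several classes at once,
# so the targets are independent 0/1 indicators per class
criterion = nn.BCEWithLogitsLoss()
outputs = torch.randn(4, 3)            # 4 samples, logits for 3 independent labels
labels = torch.tensor([[1., 0., 1.],
                       [0., 1., 0.],
                       [1., 1., 0.],
                       [0., 0., 1.]])
loss = criterion(outputs, labels)
print(f"Multi-label loss: {loss.item():.4f}")

# At inference time, threshold per class: torch.sigmoid(outputs) > 0.5
```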
## ⚙️ Optimizers

The optimizer decides how to update the parameters from their gradients.

### Common Optimizers
```python
import torch.optim as optim

# SGD: stochastic gradient descent
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

# Adam: the most common choice, adaptive learning rates
optimizer = optim.Adam(model.parameters(), lr=0.001)

# AdamW: Adam with decoupled weight decay (recommended)
optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.01)

# RMSprop: well suited to RNNs
optimizer = optim.RMSprop(model.parameters(), lr=0.001)
```
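To demystify what `optimizer.step()` actually does, here is plain SGD (no momentum) written out by hand on a toy tensor; a conceptual sketch, not production training code:

```python
import torch

lr = 0.1
w = torch.tensor([1.0, -2.0], requires_grad=True)

loss = (w ** 2).sum()   # toy loss; its gradient is 2*w
loss.backward()

with torch.no_grad():   # the update itself must not be tracked by autograd
    w -= lr * w.grad    # p <- p - lr * grad, i.e. one SGD step

w.grad.zero_()          # the per-tensor equivalent of optimizer.zero_grad()
print(w)                # tensor([ 0.8000, -1.6000], requires_grad=True)
```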
### Optimizer Selection Tips

| Scenario | Recommended optimizer |
|---|---|
| Default first choice | Adam or AdamW |
| Large-scale training | SGD + momentum |
| RNN/LSTM | Adam or RMSprop |
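One practical feature worth knowing: all of these optimizers accept parameter groups, so different parts of a model can use different hyperparameters. In this sketch, `model.backbone` and `model.head` are hypothetical submodule names standing in for a pretrained trunk and a freshly initialized classifier:

```python
import torch.optim as optim

optimizer = optim.AdamW([
    {'params': model.backbone.parameters(), 'lr': 1e-4},  # hypothetical pretrained part: small lr
    {'params': model.head.parameters()},                  # hypothetical new part: default lr below
], lr=1e-3, weight_decay=0.01)
```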
### Learning Rate Schedulers

The learning rate doesn't have to stay fixed; a scheduler can adjust it during training:
```python
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR, ReduceLROnPlateau

optimizer = optim.Adam(model.parameters(), lr=0.001)

# Multiply the learning rate by 0.1 every 10 epochs
scheduler = StepLR(optimizer, step_size=10, gamma=0.1)

# Or: reduce the learning rate when the validation loss stops improving
scheduler = ReduceLROnPlateau(optimizer, mode='min', patience=5, factor=0.5)

# In the training loop, call the step() variant that matches your scheduler
# (train_one_epoch and validate are placeholders for your own functions)
for epoch in range(100):
    train_one_epoch()
    val_loss = validate()
    scheduler.step(val_loss)  # for ReduceLROnPlateau (the scheduler defined last above)
    # scheduler.step()        # for StepLR (no metric argument)
```
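When using a scheduler it is worth logging the current learning rate; it can always be read from the optimizer's parameter groups, which is a sketch of the common pattern:

```python
# The current learning rate can always be read from the optimizer itself:
current_lr = optimizer.param_groups[0]['lr']
print(f"Current lr: {current_lr:.6f}")

# Schedulers like StepLR also expose it directly:
# current_lr = scheduler.get_last_lr()[0]
```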
## 🔄 Complete Training Loop
```python
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# ==================== 1. Prepare the data ====================
# Synthetic data
X = torch.randn(1000, 20)        # 1000 samples, 20 features
y = (X.sum(dim=1) > 0).long()    # binary labels
dataset = TensorDataset(X, y)
train_loader = DataLoader(dataset, batch_size=32, shuffle=True)

# ==================== 2. Define the model ====================
class SimpleClassifier(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(20, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 2)  # 2 classes
        )

    def forward(self, x):
        return self.net(x)

model = SimpleClassifier()

# ==================== 3. Loss function and optimizer ====================
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# ==================== 4. Training loop ====================
num_epochs = 20
for epoch in range(num_epochs):
    model.train()  # set training mode
    total_loss = 0
    correct = 0
    total = 0

    for batch_idx, (data, labels) in enumerate(train_loader):
        # Forward pass
        outputs = model(data)
        loss = criterion(outputs, labels)

        # Backward pass
        optimizer.zero_grad()  # zero the gradients (important!)
        loss.backward()        # compute gradients
        optimizer.step()       # update parameters

        # Statistics
        total_loss += loss.item()
        _, predicted = outputs.max(1)
        correct += predicted.eq(labels).sum().item()
        total += labels.size(0)

    # Print progress
    avg_loss = total_loss / len(train_loader)
    accuracy = 100. * correct / total
    print(f"Epoch [{epoch+1}/{num_epochs}] Loss: {avg_loss:.4f} Acc: {accuracy:.2f}%")

print("Training complete!")
```
## 📊 Training and Validation

In real training, the training set and the validation set are handled separately:
```python
import torch

def train_epoch(model, train_loader, criterion, optimizer, device):
    """Train for one epoch."""
    model.train()  # training mode (enables Dropout, updates BatchNorm stats, etc.)
    total_loss = 0
    correct = 0
    total = 0

    for data, labels in train_loader:
        data, labels = data.to(device), labels.to(device)

        # Forward pass
        outputs = model(data)
        loss = criterion(outputs, labels)

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Statistics
        total_loss += loss.item()
        _, predicted = outputs.max(1)
        correct += predicted.eq(labels).sum().item()
        total += labels.size(0)

    return total_loss / len(train_loader), 100. * correct / total

def validate(model, val_loader, criterion, device):
    """Evaluate the model."""
    model.eval()  # eval mode (disables Dropout, freezes BatchNorm stats, etc.)
    total_loss = 0
    correct = 0
    total = 0

    with torch.no_grad():  # no gradient tracking (saves memory and compute)
        for data, labels in val_loader:
            data, labels = data.to(device), labels.to(device)
            outputs = model(data)
            loss = criterion(outputs, labels)

            total_loss += loss.item()
            _, predicted = outputs.max(1)
            correct += predicted.eq(labels).sum().item()
            total += labels.size(0)

    return total_loss / len(val_loader), 100. * correct / total

# Full training procedure
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

best_val_acc = 0
for epoch in range(num_epochs):
    # Train
    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)

    # Validate
    val_loss, val_acc = validate(model, val_loader, criterion, device)

    print(f"Epoch {epoch+1}: "
          f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}% | "
          f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%")

    # Save the best model
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save(model.state_dict(), 'best_model.pth')
        print(f"  → Saved best model (Val Acc: {val_acc:.2f}%)")
```
## ⚠️ Common Mistakes and Pitfalls

### 1. Forgetting to zero the gradients
```python
# ❌ Wrong: gradients accumulate across iterations
for data, labels in train_loader:
    outputs = model(data)
    loss = criterion(outputs, labels)
    loss.backward()
    optimizer.step()

# ✅ Correct: zero the gradients before each iteration
for data, labels in train_loader:
    optimizer.zero_grad()  # zero them here!
    outputs = model(data)
    loss = criterion(outputs, labels)
    loss.backward()
    optimizer.step()
```
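Why this matters can be seen on a single tensor: each `backward()` adds into `.grad` instead of overwriting it.

```python
import torch

w = torch.tensor([1.0], requires_grad=True)

(2 * w).sum().backward()
print(w.grad)   # tensor([2.])

(2 * w).sum().backward()
print(w.grad)   # tensor([4.]) -- accumulated, not replaced

w.grad.zero_()  # this reset is exactly what optimizer.zero_grad() performs
print(w.grad)   # tensor([0.])
```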
### 2. Forgetting to set train/eval mode
```python
# ❌ Wrong: Dropout is still active during validation
val_loss = validate(model, val_loader)

# ✅ Correct: set the mode explicitly
model.train()  # before training
model.eval()   # before validation/testing
```
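A small demo of what the mode actually changes: `nn.Dropout` randomly zeroes activations in train mode (scaling the survivors by 1/(1-p)) and is a no-op in eval mode.

```python
import torch
import torch.nn as nn

drop = nn.Dropout(p=0.5)
x = torch.ones(8)

drop.train()
print(drop(x))  # roughly half the entries zeroed, the rest scaled to 2.0

drop.eval()
print(drop(x))  # all ones: dropout is disabled
```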
### 3. Computing gradients during validation
```python
# ❌ Wrong: wastes memory and compute
def validate(model, val_loader):
    for data, labels in val_loader:
        outputs = model(data)  # gradients are still being tracked

# ✅ Correct: disable gradient tracking
def validate(model, val_loader):
    with torch.no_grad():  # add this line
        for data, labels in val_loader:
            outputs = model(data)
```
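The effect is easy to observe on a tiny model: outputs carry `requires_grad=True` only when autograd is building a graph. (Recent PyTorch versions also offer `torch.inference_mode()`, an even stricter context for pure inference.)

```python
import torch
import torch.nn as nn

model = nn.Linear(4, 2)
x = torch.randn(3, 4)

out = model(x)
print(out.requires_grad)   # True: autograd is building a graph

with torch.no_grad():
    out = model(x)
print(out.requires_grad)   # False: no graph, less memory
```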
### 4. Data and model on different devices
```python
# ❌ Wrong: data on the CPU, model on the GPU
model = model.to('cuda')
outputs = model(data)  # data is still on the CPU -> RuntimeError!

# ✅ Correct: make sure everything is on the same device
model = model.to(device)
data = data.to(device)
labels = labels.to(device)
outputs = model(data)
```
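A quick way to debug this kind of error is to print the devices before the forward pass; this sketch reuses `model`, `data`, and `labels` from the snippet above:

```python
# Print where everything lives before the forward pass:
print(next(model.parameters()).device)  # device of the model's weights
print(data.device)                      # device of the input batch

# Handy pattern: derive the device from the model itself and reuse it
device = next(model.parameters()).device
data, labels = data.to(device), labels.to(device)
```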
## 🎯 Hands-On: MNIST Handwritten Digit Recognition
```python
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

# Device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Data preprocessing
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))  # MNIST mean and std
])

# Load the data
train_dataset = datasets.MNIST('./data', train=True, download=True, transform=transform)
test_dataset = datasets.MNIST('./data', train=False, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# Model
class MNISTNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(784, 512)
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, 10)
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        x = self.flatten(x)
        x = torch.relu(self.fc1(x))
        x = self.dropout(x)
        x = torch.relu(self.fc2(x))
        x = self.dropout(x)
        x = self.fc3(x)
        return x

model = MNISTNet().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    for data, labels in train_loader:
        data, labels = data.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(data)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

    # Testing
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for data, labels in test_loader:
            data, labels = data.to(device), labels.to(device)
            outputs = model(data)
            _, predicted = outputs.max(1)
            correct += predicted.eq(labels).sum().item()
            total += labels.size(0)

    print(f"Epoch {epoch+1}/{num_epochs}, test accuracy: {100.*correct/total:.2f}%")

print("Training complete!")
```
## 🏋️ Exercise
```python
# Exercise: implement a complete training pipeline
# Data: use TensorDataset to create a simple binary classification problem
# Model: a 3-layer fully connected network
# Requirements:
# 1. Split the data into training and validation sets
# 2. Print the training and validation loss and accuracy each epoch
# 3. Save the model that performs best on the validation set

# Your code:
```
## Next Steps

Now that we can train a model, let's learn how to save and load it!