🖼️ 图像分类器
使用迁移学习构建一个强大的图像分类器!
🎯 项目目标
- 使用预训练模型进行迁移学习
- 实现一个能分类多种图片的模型
- 学会数据增强和模型微调
📊 数据集
我们使用 CIFAR-10 数据集:
- 60,000张32×32彩色图片
- 10个类别:飞机、汽车、鸟、猫、鹿、狗、青蛙、马、船、卡车
📝 完整代码
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models
import matplotlib.pyplot as plt
import time
# ==================== Configuration ====================
BATCH_SIZE = 32  # samples per mini-batch for both loaders
EPOCHS = 20  # stage-1 (head-only) training epochs
LEARNING_RATE = 0.001  # Adam learning rate for the classifier head
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')  # prefer GPU when available
NUM_CLASSES = 10  # CIFAR-10 category count
# Class names (Chinese), index-aligned with CIFAR-10 labels:
# plane, car, bird, cat, deer, dog, frog, horse, ship, truck
CLASSES = ['飞机', '汽车', '鸟', '猫', '鹿', '狗', '青蛙', '马', '船', '卡车']
print(f"使用设备: {DEVICE}")
# ==================== Data preprocessing ====================
# Training set: augmentation to reduce overfitting
train_transform = transforms.Compose([
    transforms.Resize(224),  # ResNet expects 224x224 input
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    transforms.ToTensor(),
    # ImageNet channel statistics, matching the pretrained backbone
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
# Test set: only resize + normalization (no augmentation)
test_transform = transforms.Compose([
    transforms.Resize(224),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
# ==================== 加载数据 ====================
train_dataset = datasets.CIFAR10(
root='./data', train=True, download=True, transform=train_transform
)
test_dataset = datasets.CIFAR10(
root='./data', train=False, download=True, transform=test_transform
)
train_loader = DataLoader(
train_dataset, batch_size=BATCH_SIZE, shuffle=True,
num_workers=4, pin_memory=True
)
test_loader = DataLoader(
test_dataset, batch_size=BATCH_SIZE, shuffle=False,
num_workers=4, pin_memory=True
)
print(f"训练集: {len(train_dataset)} 样本")
print(f"测试集: {len(test_dataset)} 样本")
# ==================== 构建模型 ====================
def create_model(num_classes=10, freeze_backbone=True, weights='IMAGENET1K_V1'):
    """Build a transfer-learning classifier on top of ResNet-18.

    Parameters:
        num_classes: number of output classes.
        freeze_backbone: if True, freeze all pretrained layers so only the
            new classifier head receives gradients.
        weights: torchvision weights spec for the backbone (default keeps the
            original ImageNet-pretrained behavior); pass None to skip the
            pretrained download, e.g. for offline use or tests.

    Returns:
        An nn.Module whose final ``fc`` is replaced by a fresh 2-layer MLP head.
    """
    model = models.resnet18(weights=weights)
    # Freeze the pretrained feature extractor; the head created below has
    # fresh parameters with requires_grad=True by default.
    if freeze_backbone:
        for param in model.parameters():
            param.requires_grad = False
    # Replace the final classification layer with a small MLP head.
    # Dropout regularizes since CIFAR-10 is tiny relative to ImageNet.
    num_features = model.fc.in_features
    model.fc = nn.Sequential(
        nn.Dropout(0.5),
        nn.Linear(num_features, 256),
        nn.ReLU(),
        nn.Dropout(0.3),
        nn.Linear(256, num_classes)
    )
    return model
model = create_model(NUM_CLASSES, freeze_backbone=True)
model = model.to(DEVICE)
# Parameter statistics: with the backbone frozen, only the new fc head is trainable
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"总参数: {total_params:,}")
print(f"可训练参数: {trainable_params:,}")
# ==================== Training setup ====================
criterion = nn.CrossEntropyLoss()
# Optimize only the classifier head; decay lr by 10x every 7 epochs
optimizer = optim.Adam(model.fc.parameters(), lr=LEARNING_RATE)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)
# ==================== 训练函数 ====================
def train_epoch(model, loader, criterion, optimizer, device):
    """Run one optimization pass over `loader`.

    Returns:
        (mean_loss, accuracy_percent) averaged over all samples seen.
    """
    model.train()
    loss_sum = 0.0
    hits = 0
    seen = 0
    for batch_x, batch_y in loader:
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)
        optimizer.zero_grad()
        logits = model(batch_x)
        batch_loss = criterion(logits, batch_y)
        batch_loss.backward()
        optimizer.step()
        # Accumulate sample-weighted loss and the number of correct predictions.
        n = batch_x.size(0)
        loss_sum += batch_loss.item() * n
        seen += batch_y.size(0)
        hits += (logits.max(1)[1] == batch_y).sum().item()
    return loss_sum / seen, 100. * hits / seen
def evaluate(model, loader, criterion, device):
    """Compute (mean_loss, accuracy_percent) over `loader` without weight updates."""
    model.eval()
    total_loss, num_correct, num_seen = 0.0, 0, 0
    # Disable autograd for inference: less memory, faster.
    with torch.no_grad():
        for features, targets in loader:
            features, targets = features.to(device), targets.to(device)
            logits = model(features)
            total_loss += criterion(logits, targets).item() * features.size(0)
            num_seen += targets.size(0)
            num_correct += (logits.max(1)[1] == targets).sum().item()
    mean_loss = total_loss / num_seen
    accuracy = 100. * num_correct / num_seen
    return mean_loss, accuracy
# ==================== 训练循环 ====================
print("\n" + "="*50)
print("阶段1: 只训练分类层")
print("="*50)
history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': []}
best_acc = 0
start_time = time.time()
for epoch in range(EPOCHS):
train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, DEVICE)
val_loss, val_acc = evaluate(model, test_loader, criterion, DEVICE)
history['train_loss'].append(train_loss)
history['train_acc'].append(train_acc)
history['val_loss'].append(val_loss)
history['val_acc'].append(val_acc)
print(f"Epoch {epoch+1}/{EPOCHS}")
print(f" Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%")
print(f" Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%")
# 保存最佳模型
if val_acc > best_acc:
best_acc = val_acc
torch.save(model.state_dict(), 'best_classifier.pth')
print(f" → 保存最佳模型 (Acc: {val_acc:.2f}%)")
scheduler.step()
elapsed_time = time.time() - start_time
print(f"\n训练完成! 耗时: {elapsed_time/60:.2f}分钟")
print(f"最佳验证准确率: {best_acc:.2f}%")
# ==================== Stage 2: fine-tuning (optional) ====================
print("\n" + "="*50)
print("阶段2: 微调整个网络")
print("="*50)
# Unfreeze every layer so the whole backbone is updated
for param in model.parameters():
    param.requires_grad = True
# Much smaller learning rate so fine-tuning does not wreck pretrained features.
# NOTE(review): the StepLR scheduler above still points at the old optimizer,
# so stage 2 runs at a constant 1e-5 — confirm this is intended.
optimizer = optim.Adam(model.parameters(), lr=1e-5)
for epoch in range(5):
    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, DEVICE)
    val_loss, val_acc = evaluate(model, test_loader, criterion, DEVICE)
    print(f"Epoch {epoch+1}/5")
    print(f" Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%")
    print(f" Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%")
    # Keep checkpointing against the stage-1 best accuracy.
    if val_acc > best_acc:
        best_acc = val_acc
        torch.save(model.state_dict(), 'best_classifier.pth')
        print(f" → 保存最佳模型 (Acc: {val_acc:.2f}%)")
print(f"\n最终最佳准确率: {best_acc:.2f}%")
# ==================== 可视化结果 ====================
def plot_training_history(history):
    """Plot loss and accuracy curves side by side and save them to disk."""
    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(14, 5))
    # (axis, train key, val key, y-label, title) for each panel.
    panels = [
        (loss_ax, 'train_loss', 'val_loss', 'Loss', 'Training and Validation Loss'),
        (acc_ax, 'train_acc', 'val_acc', 'Accuracy (%)', 'Training and Validation Accuracy'),
    ]
    for ax, train_key, val_key, ylabel, title in panels:
        ax.plot(history[train_key], label='Train', linewidth=2)
        ax.plot(history[val_key], label='Validation', linewidth=2)
        ax.set_xlabel('Epoch')
        ax.set_ylabel(ylabel)
        ax.set_title(title)
        ax.legend()
        ax.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.savefig('training_history.png', dpi=150)
    plt.show()
# plot_training_history(history)
🔍 预测与可视化
import torch
import matplotlib.pyplot as plt
from torchvision import transforms, datasets
import numpy as np
def predict_and_visualize(model, dataset, device, num_samples=10):
    """Run the model on random dataset samples and plot predictions vs. truth."""
    model.eval()
    fig, axes = plt.subplots(2, 5, figsize=(15, 6))
    axes = axes.flatten()
    # Undo ImageNet normalization so images display with natural colors.
    inv_normalize = transforms.Normalize(
        mean=[-0.485/0.229, -0.456/0.224, -0.406/0.225],
        std=[1/0.229, 1/0.224, 1/0.225]
    )
    chosen = np.random.choice(len(dataset), num_samples, replace=False)
    for panel, sample_idx in enumerate(chosen):
        image, true_label = dataset[sample_idx]
        # Forward pass without autograd.
        with torch.no_grad():
            logits = model(image.unsqueeze(0).to(device))
            probs = torch.softmax(logits, dim=1)
            pred_label = logits.argmax(dim=1).item()
            confidence = probs[0][pred_label].item()
        # De-normalize, move channels last, and clamp into the displayable range.
        shown = np.clip(inv_normalize(image).permute(1, 2, 0).numpy(), 0, 1)
        axes[panel].imshow(shown)
        # Green title = correct prediction, red = wrong.
        color = 'green' if pred_label == true_label else 'red'
        axes[panel].set_title(
            f'预测: {CLASSES[pred_label]}\n'
            f'实际: {CLASSES[true_label]}\n'
            f'置信度: {confidence:.1%}',
            color=color, fontsize=10
        )
        axes[panel].axis('off')
    plt.tight_layout()
    plt.savefig('predictions.png', dpi=150)
    plt.show()
# Reload the best checkpoint saved during training
model = create_model(NUM_CLASSES)
model.load_state_dict(torch.load('best_classifier.pth'))
model = model.to(DEVICE)
# predict_and_visualize(model, test_dataset, DEVICE)
📊 混淆矩阵
import torch
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
import seaborn as sns
def plot_confusion_matrix(model, loader, device, classes):
    """Plot a confusion matrix over `loader` and print per-class accuracy."""
    model.eval()
    predictions = []
    truths = []
    # Collect predictions for the whole loader without autograd.
    with torch.no_grad():
        for batch_x, batch_y in loader:
            logits = model(batch_x.to(device))
            predictions.extend(logits.argmax(dim=1).cpu().numpy())
            truths.extend(batch_y.numpy())
    # Rows = true class, columns = predicted class.
    cm = confusion_matrix(truths, predictions)
    plt.figure(figsize=(12, 10))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=classes, yticklabels=classes)
    plt.xlabel('预测类别')
    plt.ylabel('真实类别')
    plt.title('混淆矩阵')
    plt.tight_layout()
    plt.savefig('confusion_matrix.png', dpi=150)
    plt.show()
    # Per-class accuracy = diagonal / row sum.
    class_acc = cm.diagonal() / cm.sum(axis=1)
    for i, (cls, acc) in enumerate(zip(classes, class_acc)):
        print(f"{cls}: {acc:.2%}")
# plot_confusion_matrix(model, test_loader, DEVICE, CLASSES)
📈 优化建议
提高准确率
- 更强的预训练模型
# 使用更大的模型
model = models.resnet50(weights='IMAGENET1K_V1')
# 或
model = models.efficientnet_b0(weights='IMAGENET1K_V1')
- 更多数据增强
train_transform = transforms.Compose([
transforms.Resize(256),
transforms.RandomCrop(224),
transforms.RandomHorizontalFlip(),
transforms.RandomVerticalFlip(p=0.1),
transforms.RandomRotation(20),
transforms.ColorJitter(0.3, 0.3, 0.3, 0.1),
transforms.RandomGrayscale(p=0.1),
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
- 使用更好的优化策略
optimizer = optim.AdamW(model.parameters(), lr=3e-4, weight_decay=0.01)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS)
🎯 应用到自己的数据
如果你有自己的图片数据:
from torchvision.datasets import ImageFolder
# 组织数据目录结构:
# data/
# train/
# class1/
# img1.jpg
# class2/
# img2.jpg
# test/
# class1/
# img3.jpg
# class2/
# img4.jpg
train_dataset = ImageFolder('data/train', transform=train_transform)
test_dataset = ImageFolder('data/test', transform=test_transform)
# 获取类别名称
classes = train_dataset.classes
num_classes = len(classes)
下一个项目
恭喜完成图像分类!接下来挑战情感分析处理文本数据!