💬 Sentiment Analysis

Build a movie review sentiment analysis model with an LSTM!

🎯 Project Goals

  • Decide whether a movie review is positive or negative
  • Learn text preprocessing and word embeddings
  • Apply RNNs/LSTMs to a real task

📊 Dataset

We use the IMDB movie review dataset (a minimal sketch for loading the raw files follows the list below):

  • 50,000 movie reviews
  • Binary classification: positive/negative
  • Pre-split into 25,000 training and 25,000 test reviews
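
The raw dataset is distributed by Stanford (http://ai.stanford.edu/~amaas/data/sentiment/) as an aclImdb/ archive with pos/ and neg/ subfolders for each split. If you prefer the raw files over torchtext, the sketch below shows one way to read them; it assumes the archive has been extracted next to your script.

import os

def load_imdb_split(root):
    """Read texts and labels from an extracted aclImdb/{train,test} directory."""
    texts, labels = [], []
    for label_name, label in [('pos', 1), ('neg', 0)]:
        folder = os.path.join(root, label_name)
        for fname in sorted(os.listdir(folder)):
            with open(os.path.join(folder, fname), encoding='utf-8') as f:
                texts.append(f.read())
            labels.append(label)
    return texts, labels

# train_texts, train_labels = load_imdb_split('aclImdb/train')
# test_texts, test_labels = load_imdb_split('aclImdb/test')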

📝 Complete Code

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from collections import Counter
import re
import time

# ==================== Configuration ====================
VOCAB_SIZE = 20000      # vocabulary size
MAX_LEN = 256           # maximum sequence length
EMBED_DIM = 128         # word embedding dimension
HIDDEN_DIM = 256        # LSTM hidden state dimension
NUM_LAYERS = 2          # number of LSTM layers
BATCH_SIZE = 64
EPOCHS = 10
LEARNING_RATE = 0.001
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

print(f"Using device: {DEVICE}")

# ==================== Data Preprocessing ====================
def clean_text(text):
    """Clean raw review text."""
    # lowercase
    text = text.lower()
    # strip HTML tags
    text = re.sub(r'<[^>]+>', '', text)
    # drop special characters, keeping letters and whitespace
    text = re.sub(r'[^a-zA-Z\s]', '', text)
    # collapse repeated whitespace
    text = ' '.join(text.split())
    return text

def tokenize(text):
    """Naive whitespace tokenization."""
    return text.split()

# ==================== Vocabulary ====================
class Vocabulary:
    def __init__(self, max_size=None):
        self.word2idx = {'<PAD>': 0, '<UNK>': 1}
        self.idx2word = {0: '<PAD>', 1: '<UNK>'}
        self.max_size = max_size
    
    def build(self, texts):
        """Build the vocabulary from an iterable of texts."""
        word_counts = Counter()
        for text in texts:
            word_counts.update(tokenize(text))

        # keep the most frequent words, reserving slots for the two special tokens
        limit = self.max_size - 2 if self.max_size else None
        most_common = word_counts.most_common(limit)
        
        for word, _ in most_common:
            idx = len(self.word2idx)
            self.word2idx[word] = idx
            self.idx2word[idx] = word
    
    def encode(self, text, max_len=None):
        """Convert text into a list of token indices."""
        tokens = tokenize(text)
        if max_len:
            tokens = tokens[:max_len]
        return [self.word2idx.get(token, 1) for token in tokens]  # 1 maps to <UNK>
    
    def __len__(self):
        return len(self.word2idx)

# ==================== Dataset ====================
class SentimentDataset(Dataset):
    def __init__(self, texts, labels, vocab, max_len=256):
        self.texts = texts
        self.labels = labels
        self.vocab = vocab
        self.max_len = max_len
    
    def __len__(self):
        return len(self.texts)
    
    def __getitem__(self, idx):
        text = clean_text(self.texts[idx])
        encoded = self.vocab.encode(text, self.max_len)
        return torch.tensor(encoded), torch.tensor(self.labels[idx])

def collate_fn(batch):
    """Collate variable-length sequences into a padded batch."""
    texts, labels = zip(*batch)

    # length of each sequence (needed later for pack_padded_sequence)
    lengths = torch.tensor([len(t) for t in texts])

    # pad every sequence to the length of the longest one in the batch
    texts_padded = pad_sequence(texts, batch_first=True, padding_value=0)
    labels = torch.stack(labels)
    
    return texts_padded, labels, lengths

# ==================== Model Definition ====================
class SentimentLSTM(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers, num_classes=1):
        super().__init__()
        
        # word embedding layer
        self.embedding = nn.Embedding(
            vocab_size, embed_dim, padding_idx=0
        )
        
        # LSTM layers
        self.lstm = nn.LSTM(
            embed_dim, hidden_dim, num_layers,
            batch_first=True,
            bidirectional=True,
            dropout=0.5 if num_layers > 1 else 0
        )
        
        # attention layer (optional)
        self.attention = nn.Linear(hidden_dim * 2, 1)
        
        # classification head
        self.fc = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(hidden_dim, num_classes)
        )
    
    def attention_pooling(self, lstm_output, mask=None):
        """Attention pooling over the LSTM outputs."""
        # lstm_output: (batch, seq_len, hidden*2)
        scores = self.attention(lstm_output).squeeze(-1)  # (batch, seq_len)
        
        if mask is not None:
            scores = scores.masked_fill(mask == 0, float('-inf'))
        
        weights = torch.softmax(scores, dim=1).unsqueeze(-1)  # (batch, seq_len, 1)
        pooled = (lstm_output * weights).sum(dim=1)  # (batch, hidden*2)
        return pooled
    
    def forward(self, x, lengths=None):
        # x: (batch, seq_len)
        
        # word embeddings
        embedded = self.embedding(x)  # (batch, seq_len, embed_dim)
        
        # LSTM
        if lengths is not None:
            # use pack_padded_sequence to handle variable-length sequences
            from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
            
            lengths_cpu = lengths.cpu()
            packed = pack_padded_sequence(
                embedded, lengths_cpu, batch_first=True, enforce_sorted=False
            )
            packed_output, (hidden, cell) = self.lstm(packed)
            lstm_output, _ = pad_packed_sequence(packed_output, batch_first=True)
        else:
            lstm_output, (hidden, cell) = self.lstm(embedded)
        
        # final hidden state of the last layer (forward and backward directions concatenated)
        # hidden: (num_layers*2, batch, hidden_dim)
        hidden = torch.cat([hidden[-2], hidden[-1]], dim=1)  # (batch, hidden*2)
        
        # alternatively, use attention pooling over lstm_output:
        # mask = (x != 0)
        # hidden = self.attention_pooling(lstm_output, mask)
        
        # classification
        output = self.fc(hidden)
        return output.squeeze(-1)

# ==================== Data Preparation ====================
# Synthetic data is used here for demonstration; replace it with the real IMDB data.
# The real data can be loaded with: from torchtext.datasets import IMDB

# synthetic data
print("Preparing synthetic data...")
positive_reviews = [
    "This movie is great! I loved every minute of it.",
    "Amazing film with brilliant acting and story.",
    "One of the best movies I have ever seen.",
    "Fantastic movie, highly recommended!",
    "Wonderful cinematography and excellent plot.",
] * 500

negative_reviews = [
    "Terrible movie, waste of time and money.",
    "The worst film I have ever watched.",
    "Boring and predictable, very disappointing.",
    "Bad acting and poor storyline.",
    "Do not waste your time on this movie.",
] * 500

# combine the two classes
texts = positive_reviews + negative_reviews
labels = [1] * len(positive_reviews) + [0] * len(negative_reviews)

# shuffle the data
import random
combined = list(zip(texts, labels))
random.shuffle(combined)
texts, labels = zip(*combined)
texts, labels = list(texts), list(labels)

# split into training and test sets
split_idx = int(0.8 * len(texts))
train_texts, test_texts = texts[:split_idx], texts[split_idx:]
train_labels, test_labels = labels[:split_idx], labels[split_idx:]

print(f"训练集: {len(train_texts)} 样本")
print(f"测试集: {len(test_texts)} 样本")

# build the vocabulary
print("Building vocabulary...")
vocab = Vocabulary(max_size=VOCAB_SIZE)
cleaned_texts = [clean_text(t) for t in train_texts]
vocab.build(cleaned_texts)
print(f"Vocabulary size: {len(vocab)}")

# create datasets and data loaders
train_dataset = SentimentDataset(train_texts, train_labels, vocab, MAX_LEN)
test_dataset = SentimentDataset(test_texts, test_labels, vocab, MAX_LEN)

train_loader = DataLoader(
    train_dataset, batch_size=BATCH_SIZE, shuffle=True, 
    collate_fn=collate_fn, num_workers=0
)
test_loader = DataLoader(
    test_dataset, batch_size=BATCH_SIZE, shuffle=False,
    collate_fn=collate_fn, num_workers=0
)

# ==================== Training ====================
model = SentimentLSTM(
    vocab_size=len(vocab),
    embed_dim=EMBED_DIM,
    hidden_dim=HIDDEN_DIM,
    num_layers=NUM_LAYERS
).to(DEVICE)

print(model)
print(f"参数量: {sum(p.numel() for p in model.parameters()):,}")

criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', patience=2)

def train_epoch(model, loader, criterion, optimizer, device):
    model.train()
    total_loss = 0
    correct = 0
    total = 0
    
    for texts, labels, lengths in loader:
        texts = texts.to(device)
        labels = labels.float().to(device)
        lengths = lengths.to(device)
        
        optimizer.zero_grad()
        outputs = model(texts, lengths)
        loss = criterion(outputs, labels)
        loss.backward()
        
        # gradient clipping
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        
        optimizer.step()
        
        total_loss += loss.item()
        predictions = (torch.sigmoid(outputs) > 0.5).long()
        correct += (predictions == labels.long()).sum().item()
        total += labels.size(0)
    
    return total_loss / len(loader), 100. * correct / total

def evaluate(model, loader, criterion, device):
    model.eval()
    total_loss = 0
    correct = 0
    total = 0
    
    with torch.no_grad():
        for texts, labels, lengths in loader:
            texts = texts.to(device)
            labels = labels.float().to(device)
            lengths = lengths.to(device)
            
            outputs = model(texts, lengths)
            loss = criterion(outputs, labels)
            
            total_loss += loss.item()
            predictions = (torch.sigmoid(outputs) > 0.5).long()
            correct += (predictions == labels.long()).sum().item()
            total += labels.size(0)
    
    return total_loss / len(loader), 100. * correct / total

# training loop
best_acc = 0
print("\nStarting training...")

for epoch in range(EPOCHS):
    start_time = time.time()
    
    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, DEVICE)
    val_loss, val_acc = evaluate(model, test_loader, criterion, DEVICE)
    
    elapsed = time.time() - start_time
    
    print(f"Epoch {epoch+1}/{EPOCHS} ({elapsed:.1f}s)")
    print(f"  Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%")
    print(f"  Val   Loss: {val_loss:.4f}, Val   Acc: {val_acc:.2f}%")
    
    scheduler.step(val_acc)
    
    if val_acc > best_acc:
        best_acc = val_acc
        torch.save({
            'model_state_dict': model.state_dict(),
            'vocab': vocab,
        }, 'sentiment_model.pth')
        print(f"  → 保存最佳模型 (Acc: {val_acc:.2f}%)")

print(f"\n训练完成! 最佳准确率: {best_acc:.2f}%")

# ==================== Inference Example ====================
def predict_sentiment(model, text, vocab, device):
    """Predict the sentiment of a single piece of text."""
    model.eval()
    
    # preprocess
    cleaned = clean_text(text)
    encoded = vocab.encode(cleaned, MAX_LEN)
    tensor = torch.tensor([encoded]).to(device)
    length = torch.tensor([len(encoded)])
    
    # predict
    with torch.no_grad():
        output = model(tensor, length)
        prob = torch.sigmoid(output).item()
    
    sentiment = "正面 😊" if prob > 0.5 else "负面 😞"
    return sentiment, prob

# try a few predictions
test_reviews = [
    "This is the best movie I have ever seen! Absolutely amazing!",
    "Terrible film, I want my money back.",
    "It was okay, nothing special but not bad either.",
]

print("\n预测示例:")
print("-" * 60)
for review in test_reviews:
    sentiment, prob = predict_sentiment(model, review, vocab, DEVICE)
    print(f"评论: {review[:50]}...")
    print(f"情感: {sentiment} (置信度: {prob:.2%})")
    print("-" * 60)

🔧 Using the Real IMDB Data

# load the real IMDB data with torchtext
from torchtext.datasets import IMDB
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

# tokenizer
tokenizer = get_tokenizer('basic_english')

# load the train/test splits
train_iter, test_iter = IMDB()

# build the vocabulary
def yield_tokens(data_iter):
    for label, text in data_iter:
        yield tokenizer(text)

vocab = build_vocab_from_iterator(
    yield_tokens(train_iter),
    specials=['<PAD>', '<UNK>'],
    max_tokens=VOCAB_SIZE
)
vocab.set_default_index(vocab['<UNK>'])

# text and label processing pipelines
text_pipeline = lambda x: vocab(tokenizer(x))
# Note: recent torchtext versions yield integer labels (1 = negative, 2 = positive),
# while older versions yield 'pos'/'neg' strings; adjust this mapping to your version.
label_pipeline = lambda x: 1 if x == 'pos' else 0
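
The snippet stops at the pipelines. One way to wire them into a collate function and DataLoader, in the same style as the main script, is sketched below; it assumes the newer torchtext IMDB iterator, which yields (label, text) pairs with integer labels (1 = negative, 2 = positive).

def collate_imdb(batch):
    """Pad a batch of raw (label, text) pairs from the IMDB iterator."""
    texts, labels = [], []
    for label, text in batch:
        ids = text_pipeline(text)[:MAX_LEN]
        texts.append(torch.tensor(ids, dtype=torch.long))
        labels.append(1 if label == 2 else 0)  # adjust if your version yields 'pos'/'neg'
    lengths = torch.tensor([len(t) for t in texts])
    padded = pad_sequence(texts, batch_first=True, padding_value=vocab['<PAD>'])
    return padded, torch.tensor(labels), lengths

train_loader = DataLoader(
    list(IMDB(split='train')),  # materialize a fresh iterator
    batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_imdb
)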

📈 Optimization Tips

1. Use pretrained word vectors

import torchtext
# load GloVe word vectors
glove = torchtext.vocab.GloVe(name='6B', dim=100)

# build an embedding matrix initialized from GloVe
def init_embeddings(vocab, glove, embed_dim):
    embeddings = torch.zeros(len(vocab), embed_dim)
    for word, idx in vocab.word2idx.items():
        if word in glove.stoi:
            embeddings[idx] = glove[word]
        else:
            embeddings[idx] = torch.randn(embed_dim)
    return embeddings

# initialize the model's embedding layer with the pretrained vectors
# (the model's embed_dim must equal the GloVe dimension, 100 here)
pretrained = init_embeddings(vocab, glove, 100)
model.embedding.weight.data.copy_(pretrained)
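
Since the LSTM model above was configured with EMBED_DIM = 128, copying 100-dimensional GloVe vectors into it would fail; either build the model with embed_dim=100 or pick a matching GloVe variant. A slightly tidier alternative, sketched here, constructs the layer directly from the pretrained matrix:

# Build the embedding layer straight from the pretrained matrix.
# freeze=True locks the vectors; freeze=False lets them be fine-tuned.
model.embedding = nn.Embedding.from_pretrained(pretrained, freeze=False, padding_idx=0)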

2. Use a Transformer

class TransformerClassifier(nn.Module):
    def __init__(self, vocab_size, embed_dim, num_heads, num_layers, num_classes=1):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.pos_encoding = nn.Parameter(torch.randn(1, 512, embed_dim))
        
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=embed_dim, nhead=num_heads, batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)
        
        self.fc = nn.Linear(embed_dim, num_classes)
    
    def forward(self, x):
        embedded = self.embedding(x) + self.pos_encoding[:, :x.size(1), :]
        transformed = self.transformer(embedded)
        pooled = transformed.mean(dim=1)  # mean pooling over the sequence
        return self.fc(pooled).squeeze(-1)
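
A quick usage sketch with hypothetical hyperparameters (embed_dim must be divisible by num_heads). Note that this simple classifier ignores padding; for real training you would also pass src_key_padding_mask=(x == 0) to the encoder and exclude padded positions from the mean pooling.

# Instantiate the classifier and run a forward pass on a dummy padded batch.
tf_model = TransformerClassifier(
    vocab_size=len(vocab), embed_dim=128, num_heads=4, num_layers=2
).to(DEVICE)

dummy = torch.randint(1, len(vocab), (8, 100)).to(DEVICE)  # (batch, seq_len)
print(tf_model(dummy).shape)  # torch.Size([8])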

3. Use a pretrained language model

# use the Hugging Face transformers library
from transformers import BertTokenizer, BertModel

tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
bert = BertModel.from_pretrained('bert-base-uncased')

class BertClassifier(nn.Module):
    def __init__(self, bert_model, num_classes=1):
        super().__init__()
        self.bert = bert_model
        self.fc = nn.Linear(768, num_classes)
        
        # freeze the BERT parameters (optional)
        for param in self.bert.parameters():
            param.requires_grad = False
    
    def forward(self, input_ids, attention_mask):
        outputs = self.bert(input_ids, attention_mask=attention_mask)
        pooled = outputs.pooler_output
        return self.fc(pooled).squeeze(-1)
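
A usage sketch for the BERT classifier; the Hugging Face tokenizer produces the padded input_ids and attention_mask the model expects.

# Tokenize a small batch and run a forward pass.
reviews = ["This movie was fantastic!", "Worst film I have ever seen."]
enc = tokenizer(reviews, padding=True, truncation=True, max_length=256, return_tensors='pt')

bert_clf = BertClassifier(bert).to(DEVICE)
with torch.no_grad():
    logits = bert_clf(enc['input_ids'].to(DEVICE), enc['attention_mask'].to(DEVICE))
print(torch.sigmoid(logits))  # probability of each review being positive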

🎯 Use Cases

  • Product review analysis
  • Social media monitoring
  • Customer feedback classification
  • News sentiment analysis

🎉 Congratulations on Completing All the Projects!

You have now worked through the entire PyTorch learning guide and covered:

  • PyTorch fundamentals
  • Building and training neural networks
  • CNNs for image processing
  • RNNs for text processing
  • Transfer learning
  • End-to-end deep learning projects

Keep exploring, and best of luck on your deep learning journey! 🚀
