💬 Sentiment Analysis
Build a movie review sentiment analysis model with an LSTM!
🎯 Project Goals
- Classify movie reviews as positive or negative
- Learn text preprocessing and word embeddings
- Get hands-on experience applying RNNs/LSTMs
📊 Dataset
We use the IMDB movie review dataset:
- 50,000 movie reviews
- Binary classification: positive / negative
- Pre-split into a training set and a test set of 25,000 reviews each
📝 Complete Code
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from collections import Counter
import re
import time
# ==================== Configuration ====================
VOCAB_SIZE = 20000      # vocabulary size
MAX_LEN = 256           # maximum sequence length
EMBED_DIM = 128         # word embedding dimension
HIDDEN_DIM = 256        # LSTM hidden dimension
NUM_LAYERS = 2          # number of LSTM layers
BATCH_SIZE = 64
EPOCHS = 10
LEARNING_RATE = 0.001
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {DEVICE}")
# ==================== Data Preprocessing ====================
def clean_text(text):
    """Clean raw review text."""
    # Lowercase
    text = text.lower()
    # Strip HTML tags
    text = re.sub(r'<[^>]+>', '', text)
    # Remove special characters, keeping only letters and whitespace
    text = re.sub(r'[^a-zA-Z\s]', '', text)
    # Collapse repeated whitespace
    text = ' '.join(text.split())
    return text
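# Optional sanity check (my addition, not part of the original pipeline):
# clean_text lowercases the text and strips HTML tags, punctuation, and digits.
assert clean_text("<br />Great movie!!! 10/10") == "great movie"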
def tokenize(text):
    """Simple whitespace tokenization."""
    return text.split()
# ==================== Build the Vocabulary ====================
class Vocabulary:
    def __init__(self, max_size=None):
        self.word2idx = {'<PAD>': 0, '<UNK>': 1}
        self.idx2word = {0: '<PAD>', 1: '<UNK>'}
        self.max_size = max_size
    def build(self, texts):
        """Build the vocabulary from a list of texts."""
        word_counts = Counter()
        for text in texts:
            word_counts.update(tokenize(text))
        # Keep the most frequent words (reserving 2 slots for <PAD> and <UNK>)
        limit = self.max_size - 2 if self.max_size else None
        most_common = word_counts.most_common(limit)
        for word, _ in most_common:
            idx = len(self.word2idx)
            self.word2idx[word] = idx
            self.idx2word[idx] = word
    def encode(self, text, max_len=None):
        """Convert text to a list of token indices."""
        tokens = tokenize(text)
        if max_len:
            tokens = tokens[:max_len]
        return [self.word2idx.get(token, 1) for token in tokens]  # 1 is <UNK>
    def __len__(self):
        return len(self.word2idx)
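# Optional sanity check (illustrative only, my addition): out-of-vocabulary words map to index 1 (<UNK>).
# demo_vocab = Vocabulary(max_size=10)
# demo_vocab.build(["good movie good"])
# demo_vocab.encode("good unknownword")  # -> [2, 1]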
# ==================== Dataset Class ====================
class SentimentDataset(Dataset):
    def __init__(self, texts, labels, vocab, max_len=256):
        self.texts = texts
        self.labels = labels
        self.vocab = vocab
        self.max_len = max_len
    def __len__(self):
        return len(self.texts)
    def __getitem__(self, idx):
        text = clean_text(self.texts[idx])
        encoded = self.vocab.encode(text, self.max_len)
        return torch.tensor(encoded), torch.tensor(self.labels[idx])
def collate_fn(batch):
    """Collate a batch of variable-length sequences."""
    texts, labels = zip(*batch)
    # Record the length of each sequence
    lengths = torch.tensor([len(t) for t in texts])
    # Pad all sequences to the same length
    texts_padded = pad_sequence(texts, batch_first=True, padding_value=0)
    labels = torch.stack(labels)
    return texts_padded, labels, lengths
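# Optional sanity check (illustrative only, my addition): shorter sequences are right-padded with 0 (<PAD>).
# _demo = [(torch.tensor([1, 2, 3]), torch.tensor(1)), (torch.tensor([4, 5]), torch.tensor(0))]
# collate_fn(_demo)  # -> texts [[1, 2, 3], [4, 5, 0]], labels [1, 0], lengths [3, 2]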
# ==================== Model Definition ====================
class SentimentLSTM(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers, num_classes=1):
        super().__init__()
        # Word embedding layer
        self.embedding = nn.Embedding(
            vocab_size, embed_dim, padding_idx=0
        )
        # LSTM layers
        self.lstm = nn.LSTM(
            embed_dim, hidden_dim, num_layers,
            batch_first=True,
            bidirectional=True,
            dropout=0.5 if num_layers > 1 else 0
        )
        # Attention layer (optional)
        self.attention = nn.Linear(hidden_dim * 2, 1)
        # Classification head
        self.fc = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(hidden_dim, num_classes)
        )
    def attention_pooling(self, lstm_output, mask=None):
        """Attention pooling over the LSTM outputs."""
        # lstm_output: (batch, seq_len, hidden*2)
        scores = self.attention(lstm_output).squeeze(-1)  # (batch, seq_len)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, float('-inf'))
        weights = torch.softmax(scores, dim=1).unsqueeze(-1)  # (batch, seq_len, 1)
        pooled = (lstm_output * weights).sum(dim=1)  # (batch, hidden*2)
        return pooled
    def forward(self, x, lengths=None):
        # x: (batch, seq_len)
        # Word embeddings
        embedded = self.embedding(x)  # (batch, seq_len, embed_dim)
        # LSTM
        if lengths is not None:
            # Use pack_padded_sequence to handle variable-length sequences
            from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
            lengths_cpu = lengths.cpu()
            packed = pack_padded_sequence(
                embedded, lengths_cpu, batch_first=True, enforce_sorted=False
            )
            packed_output, (hidden, cell) = self.lstm(packed)
            lstm_output, _ = pad_packed_sequence(packed_output, batch_first=True)
        else:
            lstm_output, (hidden, cell) = self.lstm(embedded)
        # Take the final hidden states of the last layer (forward and backward directions concatenated)
        # hidden: (num_layers*2, batch, hidden_dim)
        hidden = torch.cat([hidden[-2], hidden[-1]], dim=1)  # (batch, hidden*2)
        # Alternatively, use attention pooling:
        # mask = (x != 0)
        # hidden = self.attention_pooling(lstm_output, mask)
        # Classify
        output = self.fc(hidden)
        return output.squeeze(-1)
# ==================== Prepare Data ====================
# Synthetic data is used here for demonstration; replace it with the real IMDB data in practice.
# The real data can be loaded with: from torchtext.datasets import IMDB
# Synthetic data
print("Preparing synthetic data...")
positive_reviews = [
    "This movie is great! I loved every minute of it.",
    "Amazing film with brilliant acting and story.",
    "One of the best movies I have ever seen.",
    "Fantastic movie, highly recommended!",
    "Wonderful cinematography and excellent plot.",
] * 500
negative_reviews = [
    "Terrible movie, waste of time and money.",
    "The worst film I have ever watched.",
    "Boring and predictable, very disappointing.",
    "Bad acting and poor storyline.",
    "Do not waste your time on this movie.",
] * 500
# Combine the data
texts = positive_reviews + negative_reviews
labels = [1] * len(positive_reviews) + [0] * len(negative_reviews)
# Shuffle
import random
combined = list(zip(texts, labels))
random.shuffle(combined)
texts, labels = zip(*combined)
texts, labels = list(texts), list(labels)
# Split into training and test sets
split_idx = int(0.8 * len(texts))
train_texts, test_texts = texts[:split_idx], texts[split_idx:]
train_labels, test_labels = labels[:split_idx], labels[split_idx:]
print(f"Training set: {len(train_texts)} samples")
print(f"Test set: {len(test_texts)} samples")
# Build the vocabulary
print("Building vocabulary...")
vocab = Vocabulary(max_size=VOCAB_SIZE)
cleaned_texts = [clean_text(t) for t in train_texts]
vocab.build(cleaned_texts)
print(f"Vocabulary size: {len(vocab)}")
# Create the datasets and data loaders
train_dataset = SentimentDataset(train_texts, train_labels, vocab, MAX_LEN)
test_dataset = SentimentDataset(test_texts, test_labels, vocab, MAX_LEN)
train_loader = DataLoader(
    train_dataset, batch_size=BATCH_SIZE, shuffle=True,
    collate_fn=collate_fn, num_workers=0
)
test_loader = DataLoader(
    test_dataset, batch_size=BATCH_SIZE, shuffle=False,
    collate_fn=collate_fn, num_workers=0
)
# ==================== Training ====================
model = SentimentLSTM(
    vocab_size=len(vocab),
    embed_dim=EMBED_DIM,
    hidden_dim=HIDDEN_DIM,
    num_layers=NUM_LAYERS
).to(DEVICE)
print(model)
print(f"Number of parameters: {sum(p.numel() for p in model.parameters()):,}")
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', patience=2)
def train_epoch(model, loader, criterion, optimizer, device):
    model.train()
    total_loss = 0
    correct = 0
    total = 0
    for texts, labels, lengths in loader:
        texts = texts.to(device)
        labels = labels.float().to(device)
        lengths = lengths.to(device)
        optimizer.zero_grad()
        outputs = model(texts, lengths)
        loss = criterion(outputs, labels)
        loss.backward()
        # Gradient clipping
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        total_loss += loss.item()
        predictions = (torch.sigmoid(outputs) > 0.5).long()
        correct += (predictions == labels.long()).sum().item()
        total += labels.size(0)
    return total_loss / len(loader), 100. * correct / total
def evaluate(model, loader, criterion, device):
    model.eval()
    total_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for texts, labels, lengths in loader:
            texts = texts.to(device)
            labels = labels.float().to(device)
            lengths = lengths.to(device)
            outputs = model(texts, lengths)
            loss = criterion(outputs, labels)
            total_loss += loss.item()
            predictions = (torch.sigmoid(outputs) > 0.5).long()
            correct += (predictions == labels.long()).sum().item()
            total += labels.size(0)
    return total_loss / len(loader), 100. * correct / total
# Training loop
best_acc = 0
print("\nStarting training...")
for epoch in range(EPOCHS):
    start_time = time.time()
    train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, DEVICE)
    val_loss, val_acc = evaluate(model, test_loader, criterion, DEVICE)
    elapsed = time.time() - start_time
    print(f"Epoch {epoch+1}/{EPOCHS} ({elapsed:.1f}s)")
    print(f"  Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%")
    print(f"  Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%")
    scheduler.step(val_acc)
    if val_acc > best_acc:
        best_acc = val_acc
        torch.save({
            'model_state_dict': model.state_dict(),
            'vocab': vocab,
        }, 'sentiment_model.pth')
        print(f"  → Saved best model (Acc: {val_acc:.2f}%)")
print(f"\nTraining complete! Best accuracy: {best_acc:.2f}%")
# ==================== Inference Example ====================
def predict_sentiment(model, text, vocab, device):
    """Predict the sentiment of a single review."""
    model.eval()
    # Preprocess
    cleaned = clean_text(text)
    encoded = vocab.encode(cleaned, MAX_LEN)
    tensor = torch.tensor([encoded]).to(device)
    length = torch.tensor([len(encoded)])
    # Predict
    with torch.no_grad():
        output = model(tensor, length)
        prob = torch.sigmoid(output).item()
    sentiment = "Positive 😊" if prob > 0.5 else "Negative 😞"
    return sentiment, prob
# Test the predictions
test_reviews = [
    "This is the best movie I have ever seen! Absolutely amazing!",
    "Terrible film, I want my money back.",
    "It was okay, nothing special but not bad either.",
]
print("\nPrediction examples:")
print("-" * 60)
for review in test_reviews:
    sentiment, prob = predict_sentiment(model, review, vocab, DEVICE)
    print(f"Review: {review[:50]}...")
    print(f"Sentiment: {sentiment} (positive probability: {prob:.2%})")
    print("-" * 60)
🔧 Using the Real IMDB Data
# Load the real IMDB data with torchtext
from torchtext.datasets import IMDB
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
# Tokenizer
tokenizer = get_tokenizer('basic_english')
# Load the data
train_iter, test_iter = IMDB()
# Build the vocabulary
def yield_tokens(data_iter):
    for label, text in data_iter:
        yield tokenizer(text)
vocab = build_vocab_from_iterator(
    yield_tokens(train_iter),
    specials=['<PAD>', '<UNK>'],
    max_tokens=VOCAB_SIZE
)
vocab.set_default_index(vocab['<UNK>'])
# Text and label processing pipelines
text_pipeline = lambda x: vocab(tokenizer(x))
# Note: recent torchtext versions yield integer labels (1 = negative, 2 = positive)
# instead of the strings 'pos'/'neg'; adjust label_pipeline for your version.
label_pipeline = lambda x: 1 if x == 'pos' else 0
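To wire these pipelines into training, the remaining step is batching. Below is a minimal sketch under the newer torchtext API (my own addition; collate_batch is a hypothetical helper, and MAX_LEN, BATCH_SIZE, pad_sequence, and DataLoader are reused from the main script). If train_iter was already consumed while building the vocabulary, re-create it with IMDB() first.
def collate_batch(batch):
    """Convert raw (label, text) pairs into padded index tensors."""
    seqs, labels, lengths = [], [], []
    for label, text in batch:
        ids = torch.tensor(text_pipeline(text)[:MAX_LEN])
        seqs.append(ids)
        lengths.append(len(ids))
        labels.append(label_pipeline(label))
    padded = pad_sequence(seqs, batch_first=True, padding_value=vocab['<PAD>'])
    return padded, torch.tensor(labels, dtype=torch.float), torch.tensor(lengths)
train_loader = DataLoader(
    list(train_iter), batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_batch
)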
📈 Optimization Tips
1. Use pretrained word embeddings
import torchtext
# Load GloVe word vectors
glove = torchtext.vocab.GloVe(name='6B', dim=100)
# Initialize the embedding matrix
# (this uses the custom Vocabulary's word2idx from the main script; the model's
#  embed_dim must match the GloVe dimension, 100 here)
def init_embeddings(vocab, glove, embed_dim):
    embeddings = torch.zeros(len(vocab), embed_dim)
    for word, idx in vocab.word2idx.items():
        if word in glove.stoi:
            embeddings[idx] = glove[word]
        else:
            embeddings[idx] = torch.randn(embed_dim)
    return embeddings
# Initialize the model's embedding layer with the pretrained vectors
pretrained = init_embeddings(vocab, glove, 100)
model.embedding.weight.data.copy_(pretrained)
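A common follow-up decision is whether to keep the pretrained vectors fixed or let training fine-tune them; a minimal sketch of the freezing option (my addition):
# Optionally freeze the embedding layer; leave requires_grad as True to fine-tune instead
model.embedding.weight.requires_grad = False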
2. Use a Transformer
class TransformerClassifier(nn.Module):
    def __init__(self, vocab_size, embed_dim, num_heads, num_layers, num_classes=1):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.pos_encoding = nn.Parameter(torch.randn(1, 512, embed_dim))
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=embed_dim, nhead=num_heads, batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)
        self.fc = nn.Linear(embed_dim, num_classes)
    def forward(self, x):
        embedded = self.embedding(x) + self.pos_encoding[:, :x.size(1), :]
        transformed = self.transformer(embedded)
        pooled = transformed.mean(dim=1)  # mean pooling
        return self.fc(pooled).squeeze(-1)
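One caveat: as written, the encoder attends over, and the mean pooling averages over, padded positions. A minimal sketch of a masked variant of forward, assuming <PAD> has index 0 (my addition, not part of the original):
    def forward(self, x):
        embedded = self.embedding(x) + self.pos_encoding[:, :x.size(1), :]
        pad_mask = (x == 0)  # True at padded positions (assumes <PAD> index 0)
        transformed = self.transformer(embedded, src_key_padding_mask=pad_mask)
        # Average only over the non-padded positions
        valid = (~pad_mask).unsqueeze(-1).float()
        pooled = (transformed * valid).sum(dim=1) / valid.sum(dim=1).clamp(min=1)
        return self.fc(pooled).squeeze(-1)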
3. Use a pretrained language model
# Use the Hugging Face transformers library
from transformers import BertTokenizer, BertModel
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
bert = BertModel.from_pretrained('bert-base-uncased')
class BertClassifier(nn.Module):
    def __init__(self, bert_model, num_classes=1):
        super().__init__()
        self.bert = bert_model
        self.fc = nn.Linear(768, num_classes)
        # Freeze the BERT parameters (optional)
        for param in self.bert.parameters():
            param.requires_grad = False
    def forward(self, input_ids, attention_mask):
        outputs = self.bert(input_ids, attention_mask=attention_mask)
        pooled = outputs.pooler_output
        return self.fc(pooled).squeeze(-1)
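For completeness, a minimal usage sketch (my own addition; the example reviews are illustrative and MAX_LEN is reused from the configuration above):
# Tokenize a small batch and run it through the classifier
texts = ["This is the best movie I have ever seen!", "Terrible film, I want my money back."]
enc = tokenizer(texts, padding=True, truncation=True, max_length=MAX_LEN, return_tensors='pt')
clf = BertClassifier(bert)
with torch.no_grad():
    logits = clf(enc['input_ids'], enc['attention_mask'])
probs = torch.sigmoid(logits)  # probability of the positive class for each review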
🎯 Application Scenarios
- Product review analysis
- Social media monitoring
- Customer feedback classification
- News sentiment analysis
🎉 Congratulations on Completing All the Projects!
You have now worked through the entire PyTorch learning guide! Along the way you covered:
- PyTorch fundamentals
- Building and training neural networks
- Image processing with CNNs
- Text processing with RNNs
- Transfer learning
- End-to-end deep learning projects
Keep exploring, and best of luck on your deep learning journey! 🚀