Commit 9bdff895 by pangyiwen

init

/project3/sample.html
/project3/data
/project3/.idea
/.idea
# How to run

The entry point is in ./tanxin/industry_classifier.py; pass it the dataset file `待抽取关键词文本数据.txt` (the expected line format is sketched below).
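Judging from `MyDataset.read_file` in the code below, each line of the dataset is assumed to hold a category label and the document text separated by a tab, with the label drawn from the ten classes in `label_list`:

```
体育\t<document text>
科技\t<document text>
```

To train, point `data_path` in the `__main__` block at your local copy of the file and run the script (for example, `python ./tanxin/industry_classifier.py`).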
import collections
import os
import random
import torch
from torch import nn
import torchtext.vocab as Vocab
import torch.utils.data as Data
from sklearn.model_selection import train_test_split
import jieba
import time
import torch.nn.functional as F
class MyDataset:
    def __init__(self, data_dir, batch_size=64):
        self.batch_size = batch_size
        self.data_dir = data_dir
        self.label_list = ['体育', '娱乐', '家居', '房产', '教育',
                           '时尚', '时政', '游戏', '科技', '财经']
        self.data = self.read_file()
        self.train_iter, self.test_iter, self.vocab = self.preprocess(self.data)

    def read_file(self):
        with open(self.data_dir, 'rb') as f:
            lines = f.readlines()
        reviews = map(lambda x: x.decode('utf-8').lower().replace('\r\n', ''), lines)
        data = []
        for review in reviews:
            str_array = review.split('\t')
            data.append([str_array[1], self.label_list.index(str_array[0])])
        random.shuffle(data)
        return data
    @staticmethod
    def get_tokenized(data):
        """
        data: list of [string, label]
        """
        def tokenizer(text):
            return jieba.cut(text, cut_all=True)

        reviews = []
        for review, _ in data:
            reviews.append([tokens for tokens in tokenizer(review)])
        return reviews
    @staticmethod
    def get_vocab(tokenized_data):
        counter = collections.Counter([tk for st in tokenized_data for tk in st])
        return Vocab.Vocab(counter, min_freq=5)
    def preprocess(self, data):
        max_l = 350

        def pad(x):
            return x[:max_l] if len(x) > max_l else x + [0] * (max_l - len(x))

        tokenized_data = self.get_tokenized(data)
        vocab = self.get_vocab(tokenized_data)
        features = torch.tensor([pad([vocab.stoi[word] for word in words]) for words in tokenized_data])
        labels = torch.tensor([score for _, score in data])
        X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.25, random_state=0)
        train_set = Data.TensorDataset(X_train, y_train)
        test_set = Data.TensorDataset(X_test, y_test)
        train_iter = Data.DataLoader(train_set, self.batch_size, shuffle=True)
        test_iter = Data.DataLoader(test_set, self.batch_size)
        return train_iter, test_iter, vocab
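
# Note (illustrative comment, not in the original file): each batch drawn from
# train_iter / test_iter is a pair (X, y), where X is a LongTensor of shape
# (batch_size, 350) holding padded token ids and y is a LongTensor of shape
# (batch_size,) holding class indices into label_list.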
class BiLSTM(nn.Module):
    def __init__(self, vocab, embed_size, num_hiddens, num_layers):
        super(BiLSTM, self).__init__()
        self.embedding = nn.Embedding(len(vocab), embed_size)
        # bidirectional=True turns the LSTM into a bidirectional recurrent network
        self.encoder = nn.LSTM(input_size=embed_size,
                               hidden_size=num_hiddens,
                               num_layers=num_layers,
                               bidirectional=True)
        self.attention_layer = SelfAttention(num_hiddens * 2)
        # The attention-pooled representation (2 * num_hiddens) feeds a fully
        # connected layer for 10-way classification
        self.decoder = nn.Linear(2 * num_hiddens, 10)

    def forward(self, inputs):
        # inputs has shape (batch_size, seq_len); the LSTM expects seq_len as the
        # first dimension, so permute before the embedding lookup.
        # embeddings: (seq_len, batch_size, embed_size)
        embeddings = self.embedding(inputs.permute(1, 0))
        # outputs: (seq_len, batch_size, 2 * num_hiddens)
        outputs, _ = self.encoder(embeddings)  # output, (h, c)
        # Attention over the time dimension
        feat, att_score = self.attention_layer(outputs.permute(1, 0, 2))
        outs = self.decoder(feat)
        return outs, att_score
class SelfAttention(nn.Module):
    def __init__(self, num_hiddens):
        super(SelfAttention, self).__init__()
        self.weight_W = nn.Parameter(torch.empty(num_hiddens, num_hiddens), requires_grad=True)
        self.weight_proj = nn.Parameter(torch.empty(num_hiddens, 1), requires_grad=True)
        # Custom weight initialization
        nn.init.uniform_(self.weight_W, -0.1, 0.1)
        nn.init.uniform_(self.weight_proj, -0.1, 0.1)

    def forward(self, x):
        # x: (batch_size, seq_len, num_hiddens)
        u = torch.tanh(torch.matmul(x, self.weight_W))
        att = torch.matmul(u, self.weight_proj)
        att_score = F.softmax(att, dim=1)  # normalize over the time dimension
        scored_x = x * att_score
        feat = torch.sum(scored_x, dim=1)  # weighted sum over time steps
        return feat, att_score
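
# Illustrative shape check (not part of the original file): for a batch of 2
# sequences of length 5 and feature size 2 * num_hiddens = 8, SelfAttention
# returns a pooled feature of shape (2, 8) and attention scores of shape (2, 5, 1):
#
#   layer = SelfAttention(8)
#   feat, score = layer(torch.randn(2, 5, 8))
#   assert feat.shape == (2, 8) and score.shape == (2, 5, 1)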
def train_model(data_dir):
    os.environ["CUDA_VISIBLE_DEVICES"] = "0"
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    dataset = MyDataset(data_dir=data_dir)
    embed_size, num_hiddens, num_layers = 100, 128, 2
    net = BiLSTM(dataset.vocab, embed_size, num_hiddens, num_layers)
    lr, num_epochs = 0.01, 5
    # Only optimize parameters that require gradients. With pretrained word
    # embeddings the embedding layer would be frozen; here it is trained as well.
    optimizer = torch.optim.Adam(filter(lambda p: p.requires_grad, net.parameters()), lr=lr)
    loss = nn.CrossEntropyLoss()
    train(dataset.train_iter, dataset.test_iter, net, loss, optimizer, device, num_epochs)
def train(train_iter, test_iter, net, loss_fn, optimizer, device, num_epochs):
    net = net.to(device)
    print("training on ", device)
    for epoch in range(num_epochs):
        train_l_sum, train_acc_sum, n, start = 0.0, 0.0, 0, time.time()
        # per-epoch batch counter, reset each epoch so the averaged loss is correct
        batch_count = 0
        for X, y in train_iter:
            batch_count += 1
            X = X.to(device)
            y = y.to(device)
            y_hat, _ = net(X)
            loss = loss_fn(y_hat, y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            train_l_sum += loss.cpu().item()
            train_acc_sum += (y_hat.argmax(dim=1) == y).sum().cpu().item()
            n += y.shape[0]
            if batch_count % 10 == 0:
                print('epoch %d, loss %.4f, train acc %.3f'
                      % (epoch + 1, train_l_sum / batch_count, train_acc_sum / n))
        test_acc = evaluate_accuracy(test_iter, net)
        print('epoch %d, loss %.4f, train acc %.3f, test acc %.3f, time %.1f sec'
              % (epoch + 1, train_l_sum / batch_count, train_acc_sum / n, test_acc, time.time() - start))
def evaluate_accuracy(data_iter, net, device=None):
    if device is None and isinstance(net, torch.nn.Module):
        # If no device was given, use the device of net's parameters
        device = list(net.parameters())[0].device
    acc_sum, n = 0.0, 0
    with torch.no_grad():
        for X, y in data_iter:
            if isinstance(net, torch.nn.Module):
                net.eval()  # evaluation mode: disables dropout
                outputs, _ = net(X.to(device))
                acc_sum += (outputs.argmax(dim=1) == y.to(device)).float().sum().cpu().item()
                net.train()  # switch back to training mode
            n += y.shape[0]
    return acc_sum / n
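
# Hypothetical helper, not part of the original commit: classify a single text
# with a trained net and the vocab built by MyDataset. It mirrors the
# preprocessing in MyDataset.preprocess (jieba full-mode tokenization, padding
# or truncation to 350 ids) and maps the argmax back onto the label list.
# To call it, train_model above would need to return net and dataset.vocab.
def predict(net, vocab, text, device, max_l=350):
    net.eval()
    tokens = list(jieba.cut(text, cut_all=True))
    ids = [vocab.stoi[tk] for tk in tokens][:max_l]
    ids = ids + [0] * (max_l - len(ids))
    x = torch.tensor([ids], device=device)  # shape (1, max_l)
    with torch.no_grad():
        logits, att_score = net(x)
    label_list = ['体育', '娱乐', '家居', '房产', '教育',
                  '时尚', '时政', '游戏', '科技', '财经']
    return label_list[logits.argmax(dim=1).item()]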
if __name__ == '__main__':
    data_path = os.path.join(r'D:\JetBrains\workspace\project3\data', '待抽取关键词文本数据.txt')
    train_model(data_path)