import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
# Legacy tensor-type alias (CPU float32 tensors).
dtype = torch.FloatTensor

# Training corpus: in each sentence the first two words are the input
# context and the last word is the prediction target.
sentences = ["i like dog", "i love coffee", "i hate milk"]

# Vocabulary: every distinct word of the corpus, in first-occurrence order.
# dict.fromkeys de-duplicates while keeping insertion order, so each word
# gets the same index on every run (iterating a set of strings does not
# guarantee a stable order between interpreter runs).
word_list = list(dict.fromkeys(" ".join(sentences).split()))

# Two lookup tables over the same enumeration:
# word -> index and index -> word.
word_dict = {w: i for i, w in enumerate(word_list)}
number_dict = dict(enumerate(word_list))
n_class = len(word_dict)  # vocabulary size

# NNLM hyper-parameters
n_step = 2  # n-1 in the paper: number of context words used to predict the next word
n_hidden = 2  # h in the paper: number of hidden units
m = 2  # m in the paper: word-embedding dimension
# make_batch separates the leading words of each sentence (the model input)
# from its last word (the prediction target).
def make_batch(sentences, vocab=None):
    """Convert sentences into context-index lists and target indices.

    Args:
        sentences: iterable of whitespace-separated sentences.
        vocab: optional word -> index mapping. When omitted, the
            module-level ``word_dict`` is used.

    Returns:
        A tuple ``(input_batch, target_batch)``. ``input_batch[i]`` is the
        list of indices of every word of sentence ``i`` except the last;
        ``target_batch[i]`` is the index of its last word.

    Raises:
        KeyError: if a word is not present in the vocabulary.
        IndexError: if a sentence contains no words.
    """
    if vocab is None:
        vocab = word_dict

    input_batch = []  # context word indices, one list per sentence
    target_batch = []  # index of the word to predict, one per sentence
    for sen in sentences:
        words = sen.split()  # split on whitespace
        # [:-1] keeps everything but the last word; [-1] is the target.
        context = [vocab[w] for w in words[:-1]]
        target = vocab[words[-1]]
        input_batch.append(context)
        target_batch.append(target)
    return input_batch, target_batch
# NNLM model
class NNLM(nn.Module):
    """Feed-forward neural language model.

    For a batch of context-word indices the model looks up and concatenates
    the word embeddings into ``x`` and returns unnormalised class scores

        y = b + x @ W + tanh(d + x @ H) @ U

    Args:
        vocab_size: number of words (rows of the embedding table ``C`` and
            number of output classes). Defaults to the global ``n_class``.
        embed_dim: embedding size. Defaults to the global ``m``.
        context_size: number of context words per sample. Defaults to the
            global ``n_step``.
        hidden_size: number of hidden units. Defaults to the global
            ``n_hidden``.
    """

    def __init__(self, vocab_size=None, embed_dim=None, context_size=None,
                 hidden_size=None):
        super().__init__()
        # Fall back to the module-level hyper-parameters so that the
        # argument-less call ``NNLM()`` keeps working.
        self.vocab_size = n_class if vocab_size is None else vocab_size
        self.embed_dim = m if embed_dim is None else embed_dim
        self.context_size = n_step if context_size is None else context_size
        self.hidden_size = n_hidden if hidden_size is None else hidden_size

        flat_dim = self.context_size * self.embed_dim

        # Embedding table C: one embed_dim-sized vector per vocabulary word.
        self.C = nn.Embedding(self.vocab_size, self.embed_dim)
        # nn.Parameter attributes are registered as module parameters.
        # H: concatenated context embeddings -> hidden layer.
        self.H = nn.Parameter(torch.randn(flat_dim, self.hidden_size, dtype=torch.float32))
        # W: direct connection from concatenated embeddings to the output.
        self.W = nn.Parameter(torch.randn(flat_dim, self.vocab_size, dtype=torch.float32))
        # d: hidden-layer bias.
        self.d = nn.Parameter(torch.randn(self.hidden_size, dtype=torch.float32))
        # U: hidden layer -> output.
        self.U = nn.Parameter(torch.randn(self.hidden_size, self.vocab_size, dtype=torch.float32))
        # b: output bias.
        self.b = nn.Parameter(torch.randn(self.vocab_size, dtype=torch.float32))

    def forward(self, X):
        """Return class scores of shape [batch_size, vocab_size] for index batch ``X``."""
        X = self.C(X)  # embedding lookup
        X = X.view(-1, self.context_size * self.embed_dim)  # [batch_size, context_size * embed_dim]
        hidden = torch.tanh(self.d + torch.mm(X, self.H))  # [batch_size, hidden_size]
        output = self.b + torch.mm(X, self.W) + torch.mm(hidden, self.U)  # [batch_size, vocab_size]
        return output
model = NNLM()  # build the model

# Cross-entropy loss over raw class scores and integer class targets.
criterion = nn.CrossEntropyLoss()
# Adam optimiser with learning rate 0.001.
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Plain tensors take part in autograd directly, so the deprecated
# Variable wrapper is not needed.
input_batch, target_batch = make_batch(sentences)
input_batch = torch.LongTensor(input_batch)
target_batch = torch.LongTensor(target_batch)

# Training: 5000 full-batch iterations.
for epoch in range(5000):
    optimizer.zero_grad()  # clear gradients accumulated by the previous step
    output = model(input_batch)
    # output: [batch_size, n_class]; target_batch: [batch_size] (class indices, not one-hot)
    loss = criterion(output, target_batch)
    if (epoch + 1) % 1000 == 0:  # report the loss every 1000 iterations
        print('Epoch:', '%04d' % (epoch + 1), 'cost =', '{:.6f}'.format(loss.item()))
    # Back-propagate, then let the optimiser update the parameters.
    loss.backward()
    optimizer.step()

# Prediction: index of the highest score per sample. No gradients are
# needed here, so the forward pass runs under no_grad.
with torch.no_grad():
    predict = model(input_batch).max(1, keepdim=True)[1]

# squeeze(1) removes only the kept class dimension, so the result stays
# iterable even when the batch holds a single sentence.
print([sen.split()[:2] for sen in sentences], '->', [number_dict[n.item()] for n in predict.squeeze(1)])
以上就是一个最简单的NLP入门示例。
下面是一个简单的文本分类示例。
import time
from datetime import timedelta
from datahelper.data_process import DataProcess
from config.lr_config import LrConfig
from lr_model import LrModel
import tensorflow as tf
def get_time_dif(start_time):
    """Return the time elapsed since ``start_time`` as a ``timedelta``.

    ``start_time`` is a timestamp as returned by ``time.time()``; the
    elapsed time is rounded to whole seconds.
    """
    whole_seconds = int(round(time.time() - start_time))
    return timedelta(seconds=whole_seconds)
def evaluate(sess, x_, y_, batch_size=128):
    """Evaluate the global ``model`` on a dataset, batch by batch.

    Args:
        sess: session object whose ``run`` executes the model's loss and
            accuracy ops.
        x_: features of the evaluation set.
        y_: labels of the evaluation set.
        batch_size: batch size handed to ``data_get.batch_iter``
            (default 128).

    Returns:
        ``(mean_loss, mean_accuracy)``, each averaged over all samples by
        weighting every batch result with the batch length.

    Raises:
        ValueError: if ``x_`` is empty (the averages would be undefined).
    """
    data_len = len(x_)
    if data_len == 0:
        raise ValueError("evaluate() needs at least one sample")

    total_loss = 0
    total_acc = 0
    for batch_xs, batch_ys in data_get.batch_iter(x_, y_, batch_size):
        batch_len = len(batch_xs)
        loss, acc = sess.run(
            [model.loss, model.accuracy],
            feed_dict={model.x: batch_xs, model.y_: batch_ys},
        )
        # Weight by batch length so a shorter batch does not skew the mean.
        total_loss += loss * batch_len
        total_acc += acc * batch_len
    return total_loss / data_len, total_acc / data_len
def get_data():
    """Load the train/validation split and convert the features to arrays.

    Reads the split from the global ``data_get`` and calls ``toarray()`` on
    both feature matrices.

    Returns:
        ``(X_train, X_test, y_train, y_test, n_features)`` where
        ``n_features`` is the length of the first training row.
    """
    print("Loading training and validation data...")
    train_x, test_x, train_y, test_y = data_get.provide_data()
    dense_train = train_x.toarray()
    dense_test = test_x.toarray()
    feature_count = len(dense_train[0])
    return dense_train, dense_test, train_y, test_y, feature_count
def train(X_train, X_test, y_train, y_test):
    """Train the global ``model`` with periodic validation and early stopping.

    Every ``config.print_per_batch`` batches the loss/accuracy on the
    training data and on the validation data are printed; whenever the
    validation accuracy beats the best value so far, a checkpoint is saved
    to ``config.lr_save_path``. Training stops early once more than 1000
    batches have passed without such an improvement.

    Args:
        X_train: training features.
        X_test: validation features.
        y_train: training labels.
        y_test: validation labels.
    """
    saver = tf.train.Saver()

    print("Training and evaluating...")
    started_at = time.time()
    batches_done = 0  # total number of processed batches
    best_val_acc = 0.0  # best validation accuracy so far
    last_gain_at = 0  # batch count at the last improvement
    patience = 1000  # stop after this many batches without improvement
    report_fmt = ('Iter: {0:>6}, Train Loss: {1:>6.2}, Train Acc: {2:>7.2%}, '
                  'Val Loss: {3:>6.2}, Val Acc: {4:>7.2%}, Time: {5} {6}')

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        for _ in range(config.num_epochs):
            for batch_xs, batch_ys in data_get.batch_iter(X_train, y_train):
                if batches_done % config.print_per_batch == 0:
                    loss_train, acc_train = sess.run(
                        [model.loss, model.accuracy],
                        feed_dict={model.x: X_train, model.y_: y_train},
                    )
                    loss_val, acc_val = evaluate(sess, X_test, y_test)
                    improved = acc_val > best_val_acc
                    if improved:
                        # Keep the best result seen so far.
                        best_val_acc = acc_val
                        last_gain_at = batches_done
                        saver.save(sess=sess, save_path=config.lr_save_path)
                    marker = "*" if improved else ""
                    elapsed = get_time_dif(started_at)
                    print(report_fmt.format(batches_done, loss_train, acc_train,
                                            loss_val, acc_val, elapsed, marker))

                sess.run(model.train_step, feed_dict={model.x: batch_xs, model.y_: batch_ys})
                batches_done += 1

                if batches_done - last_gain_at > patience:
                    # Validation accuracy has stalled: stop training early.
                    print("No optimization for a long time, auto-stopping...")
                    break
            else:
                # The epoch finished without early stopping; go on.
                continue
            # Reached only when the inner loop hit the early-stop break.
            break
# TODO: implement later if needed
def test():
    """Placeholder for evaluation on a separate test set (not implemented).

    Currently a single corpus is supplied and split into a training set and
    a validation set. Alternatives would be to supply two corpora (one split
    into training/validation sets with sklearn, plus a separate set tested
    here), or to supply training, validation and test sets and run the test
    set here.
    """
if __name__ == "__main__":
    # config, data_get and model are deliberately module-level names:
    # get_data(), evaluate() and train() read them as globals.
    config = LrConfig()
    data_get = DataProcess(
        config.dataset_path,
        config.stopwords_path,
        config.tfidf_model_save_path,
    )

    # The last element returned by get_data() is the feature length,
    # which the model needs at construction time.
    X_train, X_test, y_train, y_test, seq_length = get_data()
    model = LrModel(config, seq_length)

    train(X_train, X_test, y_train, y_test)
import tensorflow as tf
import joblib
import jieba
from config.lr_config import LrConfig
from lr_model import LrModel
def pre_data(data, config):
    """Segment ``data`` with jieba and remove stop words.

    Args:
        data: raw text to segment.
        config: object exposing ``stopwords_path``, the path of a UTF-8
            text file that is read line by line, one stop word per line.

    Returns:
        A one-element list holding the remaining tokens joined by single
        spaces.
    """
    with open(config.stopwords_path, 'r', encoding='utf-8') as f:
        # Strip only the line terminator. Slicing off the last character
        # would truncate the final stop word when the file does not end
        # with a newline. A set gives O(1) membership tests below.
        stopwords = {line.rstrip('\n') for line in f}

    tokens = [word for word in jieba.cut(data) if word not in stopwords]
    return [' '.join(tokens)]
def read_categories(path=None):
    """Read the category names from the first line of the categories file.

    The first line holds the names separated by ``|``.

    Args:
        path: optional path of the categories file. When omitted, the
            global ``config.categories_save_path`` is used.

    Returns:
        The list of category names, without the line terminator.

    Raises:
        ValueError: if the file is empty.
    """
    if path is None:
        path = config.categories_save_path

    with open(path, 'r', encoding='utf-8') as f:
        first_line = f.readline()  # only the first line is needed

    if not first_line:
        raise ValueError("categories file is empty: {}".format(path))

    # Drop the line terminator so it does not end up in the last name.
    return first_line.rstrip('\n').split('|')
def predict_line(data, categories):
    """Predict the category of the given feature rows.

    Restores the checkpoint stored at the global ``config.lr_save_path``
    and runs the global ``model``'s ``y_pred_cls`` op on ``data``.

    Args:
        data: feature rows fed to ``model.x``.
        categories: sequence of category names, indexed by predicted class.

    Returns:
        The category name of the first row's predicted class.
    """
    # The context manager closes the session (and frees its resources)
    # even if restoring or running the model raises.
    with tf.Session() as session:
        session.run(tf.global_variables_initializer())
        saver = tf.train.Saver()
        saver.restore(sess=session, save_path=config.lr_save_path)
        y_pred_cls = session.run(model.y_pred_cls, feed_dict={model.x: data})
    return categories[y_pred_cls[0]]
if __name__ == "__main__":
    # config and model are deliberately module-level names:
    # read_categories() and predict_line() read them as globals.
    data = "教育"
    config = LrConfig()

    # Segment the text, then vectorise it with the saved TF-IDF model.
    line = pre_data(data, config)
    tfidf_model = joblib.load(config.tfidf_model_save_path)
    X_test = tfidf_model.transform(line).toarray()

    # The model's input size is the length of one feature row.
    model = LrModel(config, len(X_test[0]))

    categories = read_categories()
    print(predict_line(X_test, categories))
模型总共分了十个类别,可以对你想预测的文本进行类别判断。
如果机器有情感....
下面是生成对话机器人:
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)