Building a Transformer Model by Hand for Time-Series Forecasting
1. Data
Stock prices are inherently sequential, so a stock price series is used for the forecasting task.
Download the historical data of any stock and process it; here the data of stock 600243 is used.
At this step the batch size is fixed to 1. If you want to change it, you will need to rewrite the data code yourself; see my earlier post:
transformer模型多特征、单特征seq2seq时序预测_m0_57144920的博客-CSDN博客
import pandas as pd
import torch
import numpy as np
from torch.utils.data import TensorDataset
from torch.utils.data import DataLoader

data_path = r'D:\pytorch_learn\second0243pre0243.csv'
elements = ['收盘价', '最高价', '最低价', '开盘价', '前收盘']
# element = ['开盘价']

def single_data():  # build (X, y) pairs; y is column 3 (开盘价) shifted 5 steps ahead, the x-normalization below is left commented out
    data_all = pd.read_csv(data_path, encoding='gbk')
    data_ha = []
    length = len(data_all)
    for index, element in enumerate(elements):
        data_element = data_all[element].values.astype(np.float64)
        data_element = data_element.reshape(length, 1)
        data_ha.append(data_element)
    X_hat = np.concatenate(data_ha, axis=1)
    # X_hat = data_all[element].values.astype(np.float64)
    X_CONVERT = torch.from_numpy(X_hat)
    X = torch.zeros_like(X_CONVERT)
    a = len(X_CONVERT)
    for i in range(a):                      # the CSV is newest-first, so flip it into chronological order
        X[i, :] = X_CONVERT[a - 1 - i, :]
    y = X[5:, 3].type(torch.float32)        # target: column 3, five steps ahead of the corresponding X row
    y = y.reshape(y.shape[0], 1)
    X = X[0:-5, :].type(torch.float32)
    # X -= torch.min(X, dim=0)
    # X /= torch.max(X, dim=0)
    # X -= torch.mean(X, dim=0)
    # X /= torch.std(X, dim=0)
    dataset = TensorDataset(X, y)
    data_loader = DataLoader(dataset, batch_size=64, shuffle=False)
    return data_loader  # each iteration yields X: [64, 5] and y: [64, 1]; one such chunk is treated as a single sequence (batch_size = 1)
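Note that the commented-out normalization lines would not run as written: torch.min(X, dim=0) and torch.max(X, dim=0) return a (values, indices) named tuple rather than a tensor. If you want the min-max normalization, a working sketch (illustrative only, to be placed before TensorDataset(X, y)) would look roughly like this:

    # Min-max normalize each feature column of X to [0, 1]
    x_min = X.min(dim=0).values
    x_max = X.max(dim=0).values
    X = (X - x_min) / (x_max - x_min + 1e-8)   # small epsilon guards against constant columns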
2. Model
import torch
import numpy as np
import torch.nn as nn

d_model = 512   # embedding dimension
d_ff = 2048     # hidden dimension of the feed-forward network
d_k = d_v = 64  # dimension of K (= Q) and of V per head
n_layers = 1    # number of encoder layers
n_heads = 8     # number of attention heads
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        pos_table = np.array([
            [pos / np.power(10000, 2 * i / d_model) for i in range(d_model)]
            if pos != 0 else np.zeros(d_model) for pos in range(max_len)])
        pos_table[1:, 0::2] = np.sin(pos_table[1:, 0::2])  # sine on the even embedding dimensions
        pos_table[1:, 1::2] = np.cos(pos_table[1:, 1::2])  # cosine on the odd embedding dimensions
        self.pos_table = torch.FloatTensor(pos_table)       # pos_table: [max_len, d_model]
    def forward(self, enc_inputs):                           # enc_inputs: [batch_size, seq_len, d_model]
        enc_inputs += self.pos_table[:enc_inputs.size(1), :]
        return self.dropout(enc_inputs)
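For reference, the sinusoidal encoding from "Attention Is All You Need" is usually written as

    PE_{(pos,\,2i)}   = \sin\!\left(\frac{pos}{10000^{2i/d_{\text{model}}}}\right), \qquad
    PE_{(pos,\,2i+1)} = \cos\!\left(\frac{pos}{10000^{2i/d_{\text{model}}}}\right)

The table built above is a simplified variant of this (the exponent is computed per column rather than per sin/cos pair, and position 0 is left as all zeros), which is common in blog implementations and still gives each position a distinct encoding.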
def get_attn_pad_mask(seq_q, seq_k):                     # seq_q: [batch_size, seq_len, feat], seq_k: [batch_size, seq_len, feat]
    batch_size, len_q, _ = seq_q.size()                  # here: 1 * 64 * 5
    batch_size, len_k, _ = seq_k.size()
    # For token inputs the mask would flag padding positions (P = 0) with True:
    # pad_attn_mask = seq_k.data.eq(0).unsqueeze(1)      # [batch_size, 1, len_k]
    # This time-series input has no padding, so nothing is masked (all False, boolean as required by masked_fill_)
    pad_attn_mask = torch.zeros(batch_size, len_q, len_k, dtype=torch.bool)
    return pad_attn_mask                                  # [batch_size, len_q, len_k]
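A minimal illustration of how a boolean mask drives masked_fill_ in the attention below (True positions receive the large negative score and are zeroed out by the softmax):

    import torch

    scores = torch.tensor([[1.0, 2.0, 3.0]])
    mask = torch.tensor([[False, True, False]])       # True marks positions to hide
    scores = scores.masked_fill(mask, -1e9)            # [[1.0, -1e9, 3.0]]
    attn = torch.softmax(scores, dim=-1)               # masked position gets ~0 weight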
class ScaledDotProductAttention(nn.Module):
    def __init__(self):
        super(ScaledDotProductAttention, self).__init__()
    def forward(self, Q, K, V, attn_mask):   # Q: [batch_size, n_heads, len_q, d_k]
                                             # K: [batch_size, n_heads, len_k, d_k]
                                             # V: [batch_size, n_heads, len_v(=len_k), d_v]
                                             # attn_mask: [batch_size, n_heads, seq_len, seq_len]
        scores = torch.matmul(Q, K.transpose(-1, -2)) / np.sqrt(d_k)  # scores: [batch_size, n_heads, len_q, len_k]
        scores.masked_fill_(attn_mask, -1e9)  # masked (padding) positions are pushed to -1e9 so the softmax gives them ~0 weight
        attn = nn.Softmax(dim=-1)(scores)
        context = torch.matmul(attn, V)       # [batch_size, n_heads, len_q, d_v]
        return context, attn
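This forward pass implements the standard scaled dot-product attention from "Attention Is All You Need":

    \mathrm{Attention}(Q, K, V) = \mathrm{softmax}\!\left(\frac{QK^{\top}}{\sqrt{d_k}}\right)V

Dividing by sqrt(d_k) keeps the dot products from growing with the key dimension, which would otherwise push the softmax into regions with very small gradients.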
class MultiHeadAttention(nn.Module):
    def __init__(self):
        super(MultiHeadAttention, self).__init__()
        self.W_Q = nn.Linear(d_model, d_k * n_heads, bias=False)
        self.W_K = nn.Linear(d_model, d_k * n_heads, bias=False)
        self.W_V = nn.Linear(d_model, d_v * n_heads, bias=False)
        self.fc = nn.Linear(n_heads * d_v, d_model, bias=False)
    def forward(self, input_Q, input_K, input_V, attn_mask):   # input_Q: [batch_size, len_q, d_model]
                                                               # input_K: [batch_size, len_k, d_model]
                                                               # input_V: [batch_size, len_v(=len_k), d_model]
                                                               # attn_mask: [batch_size, seq_len, seq_len]
        residual, batch_size = input_Q, input_Q.size(0)
        Q = self.W_Q(input_Q).view(batch_size, -1, n_heads, d_k).transpose(1, 2)   # Q: [batch_size, n_heads, len_q, d_k]
        K = self.W_K(input_K).view(batch_size, -1, n_heads, d_k).transpose(1, 2)   # K: [batch_size, n_heads, len_k, d_k]
        V = self.W_V(input_V).view(batch_size, -1, n_heads, d_v).transpose(1, 2)   # V: [batch_size, n_heads, len_v(=len_k), d_v]
        attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1)                # attn_mask: [batch_size, n_heads, seq_len, seq_len]
        context, attn = ScaledDotProductAttention()(Q, K, V, attn_mask)            # context: [batch_size, n_heads, len_q, d_v]
                                                                                   # attn: [batch_size, n_heads, len_q, len_k]
        context = context.transpose(1, 2).reshape(batch_size, -1, n_heads * d_v)   # context: [batch_size, len_q, n_heads * d_v]
        output = self.fc(context)                                                  # [batch_size, len_q, d_model]
        return nn.LayerNorm(d_model)(output + residual), attn
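One detail worth noting: nn.LayerNorm(d_model) is instantiated inside forward, so a fresh LayerNorm (with default affine parameters) is created on every call and its weights are never registered with the model or updated by the optimizer. It still normalizes, but the more usual pattern is to register it once in __init__. A small helper sketch (illustrative, not the author's code; the names are made up):

    class ResidualLayerNorm(nn.Module):
        """Add & Norm block whose LayerNorm is registered once and therefore trained."""
        def __init__(self, d_model):
            super().__init__()
            self.layer_norm = nn.LayerNorm(d_model)
        def forward(self, sublayer_output, residual):
            return self.layer_norm(sublayer_output + residual)

MultiHeadAttention and PoswiseFeedForwardNet could then build self.add_norm = ResidualLayerNorm(d_model) in __init__ and return self.add_norm(output, residual) in forward.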
class PoswiseFeedForwardNet(nn.Module):
    def __init__(self):
        super(PoswiseFeedForwardNet, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(d_model, d_ff, bias=False),
            nn.ReLU(),
            nn.Linear(d_ff, d_model, bias=False))
    def forward(self, inputs):   # inputs: [batch_size, seq_len, d_model]
        residual = inputs
        output = self.fc(inputs)
        return nn.LayerNorm(d_model)(output + residual)  # [batch_size, seq_len, d_model]
class EncoderLayer(nn.Module):
    def __init__(self):
        super(EncoderLayer, self).__init__()
        self.enc_self_attn = MultiHeadAttention()   # multi-head self-attention
        self.pos_ffn = PoswiseFeedForwardNet()      # position-wise feed-forward network
    def forward(self, enc_inputs, enc_self_attn_mask):   # enc_inputs: [batch_size, src_len, d_model]
                                                         # enc_self_attn_mask: [batch_size, src_len, src_len]
        # enc_inputs is passed in three times and projected by W_Q, W_K, W_V to obtain Q, K, V
        enc_outputs, attn = self.enc_self_attn(enc_inputs, enc_inputs, enc_inputs,   # enc_outputs: [batch_size, src_len, d_model]
                                               enc_self_attn_mask)                   # attn: [batch_size, n_heads, src_len, src_len]
        enc_outputs = self.pos_ffn(enc_outputs)                                      # enc_outputs: [batch_size, src_len, d_model]
        return enc_outputs, attn
class Encoder(nn.Module):
    def __init__(self):
        super(Encoder, self).__init__()
        self.src_emb = nn.Linear(5, d_model)         # project the 5 input features to d_model
        self.pos_emb = PositionalEncoding(d_model)   # add positional information
        self.layers = nn.ModuleList([EncoderLayer() for _ in range(n_layers)])
    def forward(self, enc_inputs):                   # enc_inputs: [batch_size, src_len, 5]
        enc_outputs = self.src_emb(enc_inputs)       # enc_outputs: [batch_size, src_len, d_model]
        # print('1', enc_outputs)
        enc_outputs = self.pos_emb(enc_outputs)      # enc_outputs: [batch_size, src_len, d_model]
        enc_self_attn_mask = get_attn_pad_mask(enc_inputs, enc_inputs)   # [batch_size, src_len, src_len]
        enc_self_attns = []
        for layer in self.layers:
            enc_outputs, enc_self_attn = layer(enc_outputs, enc_self_attn_mask)
            # enc_outputs: [batch_size, src_len, d_model]
            # enc_self_attn: [batch_size, n_heads, src_len, src_len]
            enc_self_attns.append(enc_self_attn)
        return enc_outputs, enc_self_attns
class Transformer(nn.Module):
    def __init__(self):
        super(Transformer, self).__init__()
        self.Encoder = Encoder()
        self.projection = nn.Linear(d_model, 1, bias=False)
        # self.projection1 = nn.Linear(128, 1, bias=False)
    def forward(self, enc_inputs):                              # enc_inputs: [batch_size, src_len, 5]
        enc_outputs, enc_self_attns = self.Encoder(enc_inputs)  # enc_outputs: [batch_size, src_len, d_model]
                                                                # enc_self_attns: [n_layers, batch_size, n_heads, src_len, src_len]
        dec_logits = self.projection(enc_outputs)               # dec_logits: [batch_size, src_len, 1]
        # dec_logits = self.projection1(dec_logits)
        return dec_logits.view(-1, dec_logits.size(-1)), enc_self_attns   # [batch_size * src_len, 1]
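A quick shape check of this encoder-only model on a dummy batch (illustrative; it assumes the classes above are in scope):

    import torch

    model = Transformer()
    dummy = torch.randn(1, 64, 5)     # [batch_size, seq_len, n_features]
    preds, attns = model(dummy)
    print(preds.shape)                # torch.Size([64, 1]): one prediction per time step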
3. Training
from transformerhah import Transformer
import torch.nn as nn
import torch.optim as optim
from data import single_data
import matplotlib.pyplot as plt
import numpy as np
import os
import torch
import copy

png_save_path = r'D:\pytorch_learn\second.24transformer\png'
if not os.path.isdir(png_save_path):
    os.mkdir(png_save_path)
path_train = os.path.join(png_save_path, 'weight.pth')

loader = single_data()
model = Transformer()
criterion = nn.MSELoss()   # mean-squared error on the predicted series
optimizer = optim.SGD(model.parameters(), lr=1e-3, momentum=0.99)

best_loss = 100000
best_epoch = 0
for epoch in range(50):
    epoch_loss = 0
    y_pre = []
    y_true = []
    for X, y in loader:                              # X: [64, 5], y: [64, 1]
        enc_inputs = X.unsqueeze(0)                  # [1, 64, 5]: the whole chunk is fed as one sequence (batch_size = 1)
        # enc_inputs = enc_inputs.squeeze(2)
        outputs, enc_self_attns = model(enc_inputs)  # outputs: [64, 1]
        # print(outputs.shape)
        outputs = outputs.squeeze(1)
        outputs = outputs.unsqueeze(0)               # [1, 64]
        y = y.unsqueeze(0)                           # [1, 64, 1]
        loss = criterion(outputs, y.view(1, -1))
        loss_num = loss.item()
        epoch_loss += loss_num
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
        optimizer.step()
        y_pre.append(outputs.detach().numpy())
        y_true.append(y.detach().numpy())
    if epoch_loss < best_loss:
        best_loss = epoch_loss
        best_epoch = epoch
        best_model_wts = copy.deepcopy(model.state_dict())
        torch.save(best_model_wts, path_train)
    pre = np.concatenate(y_pre, axis=1).squeeze(0)   # predictions (no normalization applied)
    true = np.concatenate(y_true, axis=1).squeeze(2) # targets (no normalization applied)
    true = true.squeeze(0)
    if True:
        plt.plot(true, color="blue", alpha=0.5)
        plt.plot(pre, color="red", alpha=0.5)
        plt.plot(pre - true, color="green", alpha=0.8)
        plt.grid(True, which='both')
        plt.axhline(y=0, color='k')
        # plt.savefig(os.path.join(png_save_path, 'pre.png'))
        plt.savefig(os.path.join(png_save_path, '%d.png' % epoch))
        plt.close()
    print('Epoch:', '%04d' % (epoch + 1), 'loss =', '{:.6f}'.format(epoch_loss))

print('best_loss::|', best_loss, '---best_epoch::|', best_epoch)
train_over_path = os.path.join(png_save_path, 'loss(%d)---epoch(%d).pth' % (best_loss, best_epoch))
os.rename(path_train, train_over_path)
print('*******************over****************************')
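After training, the saved best weights can be loaded back for inference. A minimal sketch (assumes ckpt_path points at the renamed checkpoint file produced above):

    import torch
    from transformerhah import Transformer
    from data import single_data

    ckpt_path = r'D:\pytorch_learn\second.24transformer\png\weight.pth'  # adjust to the renamed loss(...)---epoch(...).pth file

    model = Transformer()
    model.load_state_dict(torch.load(ckpt_path))
    model.eval()

    with torch.no_grad():
        for X, y in single_data():
            preds, _ = model(X.unsqueeze(0))   # preds: [seq_len, 1]
            print(preds[:5].squeeze(1), y[:5].squeeze(1))
            break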
4. Results
(The per-epoch plots saved under png_save_path show the ground truth in blue, the predictions in red, and their difference in green.)
5. Summary
No padding seems to be done here: the seq_len of every batch is 64, but the last batch has fewer than 64 rows and is not padded.
Update 2022.4.24:
Quite a few people have been messaging me lately (probably because everyone is starting on their own projects). If you want the code, leave your email address in a private message and I will send it over when I see it.
Some shortcomings of the code, in hindsight:
The code was written last year; looking back at it now there is plenty of room for improvement, and some parts are rather naive.
1. No GPU support is configured.
2. batch_size is fixed to 1 in the data preprocessing, which makes the loss look large and the training results mediocre (my understanding is that batch_size = 1 is essentially stochastic gradient descent, which converges less smoothly than mini-batch gradient descent). If you are interested you can change this in the data preprocessing code.
3. The hyperparameters are awkward to configure. At the time I just wanted to get the code running and did not think about readability; if I wrote it today I would gather the hyperparameters in one place with a parser library such as argparse, as in the sketch after this list.
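A minimal sketch of that idea (illustrative only; the flag names and defaults simply mirror the constants from section 2):

    import argparse

    def get_args():
        parser = argparse.ArgumentParser(description='Transformer time-series forecasting')
        parser.add_argument('--d_model', type=int, default=512, help='embedding dimension')
        parser.add_argument('--d_ff', type=int, default=2048, help='feed-forward hidden dimension')
        parser.add_argument('--d_k', type=int, default=64, help='dimension of K (= Q) per head')
        parser.add_argument('--d_v', type=int, default=64, help='dimension of V per head')
        parser.add_argument('--n_layers', type=int, default=1, help='number of encoder layers')
        parser.add_argument('--n_heads', type=int, default=8, help='number of attention heads')
        parser.add_argument('--seq_len', type=int, default=64, help='sequence chunk length fed to the model')
        parser.add_argument('--lr', type=float, default=1e-3, help='learning rate')
        parser.add_argument('--epochs', type=int, default=50, help='number of training epochs')
        return parser.parse_args()

    args = get_args()  # e.g. python train.py --n_layers 2 --lr 3e-4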