Unlike the Transformer, BERT builds its Embedding as the sum of three components:
token embeddings, segment embeddings and position embeddings, each of which is a plain Embedding layer.
Suppose the input batch has shape (batch_size, seq_len). The token embeddings map each word of the input sentence to a d_model-dimensional vector, so the output of this layer has shape (batch_size, seq_len, d_model); the segment embeddings map the sentence-segment information of the input to d_model-dimensional vectors, again giving shape (batch_size, seq_len, d_model); the position embeddings map the position of each word in the input to a d_model-dimensional vector.
For example, if the input sentence is [[CLS] my dog is cute [SEP] he likes play ing [SEP]], the corresponding segment information is [1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2] and the position information is [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]. Passing these three inputs through the token embedding, segment embedding and position embedding layers respectively and summing the results gives the output of BERT's Embedding layer.
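As a quick illustration of how those id sequences are built (a minimal sketch; the token list and loop below are only for this example and are not part of the article's code):

tokens = ["[CLS]", "my", "dog", "is", "cute", "[SEP]", "he", "likes", "play", "ing", "[SEP]"]
# Segment ids: start at 1 and move to the next segment after every [SEP]
segment_ids, sid = [], 1
for token in tokens:
    segment_ids.append(sid)
    if token == "[SEP]":
        sid += 1
# Position ids: simply 0 .. len(tokens) - 1
position_ids = list(range(len(tokens)))
print(segment_ids)   # [1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2]
print(position_ids)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]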
Implementation code:
class PositionEmbedding(layers.Layer):
    def __init__(self, seq_len, d_model):
        super(PositionEmbedding, self).__init__()
        self.embedding = layers.Embedding(seq_len, d_model)

    def __call__(self, x):
        # Position ids [0, 1, ..., seq_len - 1] with shape (1, seq_len)
        position_ids = tf.range(x.shape[1], dtype=tf.int32)[tf.newaxis, :]
        position_embeddings = self.embedding(position_ids)
        return position_embeddings

class Embedding(layers.Layer):
    def __init__(self, vocab_size, segment_size, seq_len, d_model, rate=0.1):
        super(Embedding, self).__init__()
        self.seq_len = seq_len
        self.d_model = d_model
        self.token_embedding = layers.Embedding(vocab_size, d_model)
        self.segment_embedding = layers.Embedding(segment_size, d_model)
        self.position_embedding = PositionEmbedding(seq_len, d_model)
        self.ln = layers.LayerNormalization(epsilon=1e-6)
        self.dropout = layers.Dropout(rate)

    def __call__(self, x, segment_ids, training):
        # Map the input token ids into the embedding space
        token_embedding_output = self.token_embedding(x)
        # Map the segment ids (which sentence each token belongs to) into the embedding space
        segment_embedding_output = self.segment_embedding(segment_ids)
        # Map the token positions into the embedding space
        position_embedding_output = self.position_embedding(x)
        # Sum the three embeddings
        output = token_embedding_output + segment_embedding_output + position_embedding_output
        output = self.ln(output)
        output = self.dropout(output, training=training)
        return output
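A minimal shape check of this embedding layer (the sizes below are chosen only for illustration):

# Hypothetical sizes, only to verify the output shape
embedding = Embedding(vocab_size=1000, segment_size=2, seq_len=40, d_model=128)
x = tf.random.uniform((8, 40), minval=1, maxval=1000, dtype=tf.int32)
segment_ids = tf.ones((8, 40), dtype=tf.int32)
print(embedding(x, segment_ids, training=False).shape)  # (8, 40, 128)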
Encoder layer
The Encoder layer in BERT is exactly the same as the Encoder layer in the Transformer.
Implementation code:
# Encoder layer
class Encoder(tf.keras.layers.Layer):
    def __init__(self, num_heads, d_model, d_ff, rate=0.1):
        super(Encoder, self).__init__()
        # Multi-head attention layer
        self.mha = MultiHeadAttention(num_heads, d_model)
        self.dropout1 = layers.Dropout(rate)
        self.ln1 = layers.LayerNormalization(epsilon=1e-6)
        # Feed-forward network layer
        self.ffn = FeedForwardNetwork(d_model, d_ff)
        self.dropout2 = layers.Dropout(rate)
        self.ln2 = layers.LayerNormalization(epsilon=1e-6)

    def __call__(self, x, padding_mask, training):
        # (batch_size, seq_len, d_model) => (batch_size, seq_len, d_model)
        mha_output, _ = self.mha(x, x, x, padding_mask)
        dropout_output1 = self.dropout1(mha_output, training=training)
        ln_output1 = self.ln1(x + dropout_output1)
        # (batch_size, seq_len, d_model) => (batch_size, seq_len, d_model)
        ffn_output = self.ffn(ln_output1)
        dropout_output2 = self.dropout2(ffn_output, training=training)
        ln_output2 = self.ln2(ln_output1 + dropout_output2)
        return ln_output2
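A minimal shape check for a single Encoder layer (it assumes the MultiHeadAttention and FeedForwardNetwork layers from the full bert.py listing below are in scope; the sizes are illustrative):

encoder = Encoder(num_heads=4, d_model=128, d_ff=512)
x = tf.random.uniform((8, 40, 128))
# One encoder layer keeps the (batch_size, seq_len, d_model) shape
print(encoder(x, padding_mask=None, training=False).shape)  # (8, 40, 128)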
BERT model
The main body of BERT is built by stacking several of the Transformer Encoder layers described above.
Implementation code:
class BERT(tf.keras.Model):
    def __init__(self, vocab_size, segment_size, seq_len, num_heads, d_model, d_ff, rate=0.1):
        super(BERT, self).__init__()
        self.embedding = Embedding(vocab_size, segment_size, seq_len, d_model, rate)
        # Stack 2 encoder layers (BERT-Base uses 12)
        self.encoders = [Encoder(num_heads, d_model, d_ff, rate) for _ in range(2)]
        # Output head for the MLM task: a distribution over the vocabulary at every position
        self.dense = layers.Dense(vocab_size, activation="softmax")

    def __call__(self, x, segment_ids, padding_mask, training):
        output = self.embedding(x, segment_ids, training)
        for encoder in self.encoders:
            output = encoder(output, padding_mask, training)
        output = self.dense(output)
        return output
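A minimal end-to-end shape check (hypothetical sizes; note that segment_ids has to be passed together with the token ids):

bert = BERT(vocab_size=1000, segment_size=2, seq_len=40, num_heads=4, d_model=128, d_ff=256)
x = tf.random.uniform((8, 40), minval=1, maxval=1000, dtype=tf.int32)
segment_ids = tf.ones((8, 40), dtype=tf.int32)
# Output: one distribution over the vocabulary per position
print(bert(x, segment_ids, padding_mask=None, training=False).shape)  # (8, 40, 1000)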
Test code
An example of a BERT-based MLM (masked language modeling) task:
bert.py:
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers

# Scaled dot-product attention
# Formula:
# attention(q, k, v) = softmax(q * kT / sqrt(dk) + mask) * v
def scaled_dot_product_attention(q, k, v, mask):
    # q * kT
    matmul_qk = tf.matmul(q, k, transpose_b=True)
    # Scale by dk, where dk = k_dim
    dk = tf.shape(k)[-1]
    scaled_attention = matmul_qk / tf.sqrt(tf.cast(dk, tf.float32))
    # Apply the mask.
    # What about the PAD positions at the end of a sequence? PAD exists only so that computation
    # can be batched; it should not carry any information.
    # So we add a mask of shape [batch, 1, 1, seq_len], with 1 at the padded positions and 0 elsewhere.
    # After multiplying the mask by -1e9, the non-PAD columns are 0 and the PAD columns become a very
    # large negative number.
    # After softmax, the masked positions are therefore close to 0, so when multiplying by V the model
    # no longer attends to (or mixes in) any information from the PAD positions.
    if mask is not None:
        scaled_attention += (mask * -1e9)
    # Attention weights matrix
    attention_weights = tf.nn.softmax(scaled_attention, axis=-1)
    # Attention output
    attention = tf.matmul(attention_weights, v)
    return attention, attention_weights
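# # Test code (illustrative shape check, not in the original article)
# temp_q = tf.random.uniform((1, 4, 8, 16))  # (batch_size, num_heads, seq_len, k_dim)
# temp_k = tf.random.uniform((1, 4, 8, 16))
# temp_v = tf.random.uniform((1, 4, 8, 16))
# attn, attn_weights = scaled_dot_product_attention(temp_q, temp_k, temp_v, None)
# print(attn.shape)          # (1, 4, 8, 16)
# print(attn_weights.shape)  # (1, 4, 8, 8)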
# Multi-head attention layer
class MultiHeadAttention(tf.keras.layers.Layer):
    # d_model is the dimensionality of the word vectors
    # num_heads is the number of heads, i.e. there are num_heads sets of q, k, v matrices
    # seq_len is the sentence length
    def __init__(self, num_heads, d_model):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        # k_dim = dk
        self.k_dim = d_model // num_heads
        # Linear projection layers
        self.wq = layers.Dense(d_model)
        self.wk = layers.Dense(d_model)
        self.wv = layers.Dense(d_model)
        # Output projection layer
        self.dense = tf.keras.layers.Dense(d_model)

    # Split into multiple heads
    def split_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.k_dim))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    # Input shapes:
    # q: (..., seq_len_q, k_dim)
    # k: (..., seq_len_k, k_dim)
    # v: (..., seq_len_v, v_dim)
    # mask: (..., seq_len, seq_len)
    # Output shapes:
    # attention: (..., seq_len_q, v_dim)
    # attention_weights: (..., seq_len_q, seq_len_k)
    def __call__(self, q, k, v, mask):
        batch_size = tf.shape(q)[0]
        # (batch_size, seq_len, d_model) => (batch_size, seq_len, d_model)
        x_q = self.wq(q)
        x_k = self.wk(k)
        x_v = self.wv(v)
        # (batch_size, seq_len, d_model) => (batch_size, num_heads, seq_len, k_dim)
        q = self.split_heads(x_q, batch_size)
        k = self.split_heads(x_k, batch_size)
        v = self.split_heads(x_v, batch_size)
        # attention: (batch_size, num_heads, seq_len, k_dim)
        # attention_weights: (batch_size, num_heads, seq_len, seq_len)
        attention, attention_weights = scaled_dot_product_attention(q, k, v, mask)
        # (batch_size, num_heads, seq_len, k_dim) => (batch_size, seq_len, num_heads, k_dim)
        attention = tf.transpose(attention, perm=[0, 2, 1, 3])
        # (batch_size, seq_len, num_heads, k_dim) => (batch_size, seq_len, d_model)
        attention = tf.reshape(attention, (batch_size, -1, self.d_model))
        # (batch_size, seq_len, d_model) => (batch_size, seq_len, d_model)
        attention = self.dense(attention)
        return attention, attention_weights
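# # Test code (illustrative shape check, not in the original article)
# temp_mha = MultiHeadAttention(num_heads=8, d_model=512)
# y = tf.random.uniform((1, 60, 512))  # (batch_size, seq_len, d_model)
# out, attn = temp_mha(y, y, y, mask=None)
# print(out.shape)   # (1, 60, 512)
# print(attn.shape)  # (1, 8, 60, 60)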
# Feed-forward network layer
class FeedForwardNetwork(tf.keras.layers.Layer):
    def __init__(self, d_model, d_ff):
        super(FeedForwardNetwork, self).__init__()
        self.dense1 = layers.Dense(d_ff, activation="relu")
        self.dense2 = layers.Dense(d_model)

    def __call__(self, x):
        output = self.dense1(x)
        output = self.dense2(output)
        return output

# # Test code
# sample_fnn = FeedForwardNetwork(512, 2048)
# print(sample_fnn(tf.random.uniform((64, 50, 512))).shape)  # (64, 50, 512)
# Encoder layer
class Encoder(tf.keras.layers.Layer):
    def __init__(self, num_heads, d_model, d_ff, rate=0.1):
        super(Encoder, self).__init__()
        # Multi-head attention layer
        self.mha = MultiHeadAttention(num_heads, d_model)
        self.dropout1 = layers.Dropout(rate)
        self.ln1 = layers.LayerNormalization(epsilon=1e-6)
        # Feed-forward network layer
        self.ffn = FeedForwardNetwork(d_model, d_ff)
        self.dropout2 = layers.Dropout(rate)
        self.ln2 = layers.LayerNormalization(epsilon=1e-6)

    def __call__(self, x, padding_mask, training):
        # (batch_size, seq_len, d_model) => (batch_size, seq_len, d_model)
        mha_output, _ = self.mha(x, x, x, padding_mask)
        dropout_output1 = self.dropout1(mha_output, training=training)
        ln_output1 = self.ln1(x + dropout_output1)
        # (batch_size, seq_len, d_model) => (batch_size, seq_len, d_model)
        ffn_output = self.ffn(ln_output1)
        dropout_output2 = self.dropout2(ffn_output, training=training)
        ln_output2 = self.ln2(ln_output1 + dropout_output2)
        return ln_output2
# Fixed (sinusoidal) positional encoding (provided for reference; the BERT model below uses the
# learned PositionEmbedding instead)
def get_position_embedding(sentence_length, d_model):
    pos = np.expand_dims(np.arange(sentence_length), axis=-1)
    i = np.expand_dims(np.arange(d_model), axis=0)
    # angle(pos, i) = pos / 10000^(2 * (i // 2) / d_model)
    result = pos / np.power(10000, np.float32(2 * (i // 2) / d_model))
    # Use sin for the even columns
    result_sin = np.sin(result[:, 0::2])
    # Use cos for the odd columns
    result_cos = np.cos(result[:, 1::2])
    # Concatenate the even and odd columns
    position_embedding = np.concatenate((result_sin, result_cos), axis=-1)
    # Add a batch dimension; the final shape is (1, sentence_length, d_model)
    position_embedding = np.expand_dims(position_embedding, axis=0)
    # Convert the numpy array to a TensorFlow tensor
    return tf.cast(position_embedding, tf.float32)
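# # Test code (illustrative, not in the original article)
# print(get_position_embedding(50, 512).shape)  # (1, 50, 512)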
# Learnable positional encoding
class PositionEmbedding(layers.Layer):
    def __init__(self, seq_len, d_model):
        super(PositionEmbedding, self).__init__()
        self.embedding = layers.Embedding(seq_len, d_model)

    def __call__(self, x):
        # Position ids [0, 1, ..., seq_len - 1] with shape (1, seq_len)
        position_ids = tf.range(x.shape[1], dtype=tf.int32)[tf.newaxis, :]
        position_embeddings = self.embedding(position_ids)
        return position_embeddings
class Embedding(layers.Layer):
    def __init__(self, vocab_size, segment_size, seq_len, d_model, rate=0.1):
        super(Embedding, self).__init__()
        self.seq_len = seq_len
        self.d_model = d_model
        self.token_embedding = layers.Embedding(vocab_size, d_model)
        self.segment_embedding = layers.Embedding(segment_size, d_model)
        self.position_embedding = PositionEmbedding(seq_len, d_model)
        self.ln = layers.LayerNormalization(epsilon=1e-6)
        self.dropout = layers.Dropout(rate)

    def __call__(self, x, segment_ids, training):
        # Map the input token ids into the embedding space
        token_embedding_output = self.token_embedding(x)
        # Map the segment ids (which sentence each token belongs to) into the embedding space
        segment_embedding_output = self.segment_embedding(segment_ids)
        # Map the token positions into the embedding space
        position_embedding_output = self.position_embedding(x)
        # Sum the three embeddings
        output = token_embedding_output + segment_embedding_output + position_embedding_output
        output = self.ln(output)
        output = self.dropout(output, training=training)
        return output
class BERT(tf.keras.Model):
    def __init__(self, vocab_size, segment_size, seq_len, num_heads, d_model, d_ff, rate=0.1):
        super(BERT, self).__init__()
        self.embedding = Embedding(vocab_size, segment_size, seq_len, d_model, rate)
        # Stack 2 encoder layers (BERT-Base uses 12)
        self.encoders = [Encoder(num_heads, d_model, d_ff, rate) for _ in range(2)]
        # Output head for the MLM task: a distribution over the vocabulary at every position
        self.dense = layers.Dense(vocab_size, activation="softmax")

    def __call__(self, x, segment_ids, padding_mask, training):
        output = self.embedding(x, segment_ids, training)
        for encoder in self.encoders:
            output = encoder(output, padding_mask, training)
        output = self.dense(output)
        return output

# # Test code
# bert = BERT(1000, 2, 40, 4, 128, 256)
# x = tf.ones((64, 40), dtype=tf.int32)
# segment_ids = tf.ones((64, 40), dtype=tf.int32)
# bert(x, segment_ids, padding_mask=None, training=False)
train.py:
import time
import numpy as np
import data
import tensorflow as tf
from tensorflow.keras import losses, optimizers, metrics
from bert import BERT

train_x, train_y, weights, vocab_size, tokenizer = data.get_data()
# Model
# Note: the [MASK] token id produced by data.py is vocab_size + 1, so the token embedding needs
# vocab_size + 2 entries; segment_size must be at least 2 because the segment ids below are all 1.
bert = BERT(vocab_size=vocab_size + 2,
            segment_size=2,
            seq_len=40,
            num_heads=4,
            d_model=128,
            d_ff=512,
            rate=0.1)
# Loss function
loss_fn = losses.SparseCategoricalCrossentropy()
# Optimizer
optimizer = optimizers.Adam(0.001)
# Metrics
train_loss = metrics.Mean()
train_accuracy = metrics.SparseCategoricalAccuracy()
directory = "./checkpoint"
checkpoint = tf.train.Checkpoint(bert=bert, optimizer=optimizer)
checkpoint_manager = tf.train.CheckpointManager(checkpoint, directory, max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
    checkpoint.restore(checkpoint_manager.latest_checkpoint)

def create_padding_mask(seq):
    # Mark the positions that are 0 (the PAD positions)
    seq = tf.cast(tf.math.equal(seq, 0), tf.float32)
    # Expand the dimensions so the mask can be broadcast against the attention matrix
    return seq[:, np.newaxis, np.newaxis, :]
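# # Illustrative example (not in the original script): for a padded batch [[5, 3, 0, 0]] the mask is
# # [[[[0., 0., 1., 1.]]]] with shape (1, 1, 1, 4), which broadcasts against the
# # (batch_size, num_heads, seq_len, seq_len) attention scores.
# print(create_padding_mask(tf.constant([[5, 3, 0, 0]])))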
@tf.function
def train_step(input_x, input_y):
    segment_ids = tf.ones_like(input_x)
    padding_mask = create_padding_mask(input_x)
    with tf.GradientTape() as tape:
        predictions = bert(input_x,
                           segment_ids,
                           padding_mask,
                           True)
        # Note: the loss is computed over all positions here (which is why the evaluation output
        # reproduces the unmasked tokens as well); the weights returned by data.get_data() could be
        # passed as sample_weight to restrict the loss to the masked positions, as in standard BERT.
        loss = loss_fn(input_y, predictions)
    gradients = tape.gradient(loss, bert.trainable_variables)
    optimizer.apply_gradients(zip(gradients, bert.trainable_variables))
    train_loss(loss)
    train_accuracy(input_y, predictions)

def train(epochs=100):
    for epoch in range(1, epochs + 1):
        start = time.time()
        train_loss.reset_state()
        train_accuracy.reset_state()
        train_step(train_x, train_y)
        print("epoch:{} loss:{:.6f} accuracy:{:.6f}".format(epoch, train_loss.result(), train_accuracy.result()))
        end = time.time()
        print("time for 1 epoch:{:.6f} secs".format(end - start))
        if epoch % 10 == 0:
            path = checkpoint_manager.save()
            print("save model at {}".format(path))

def evaluate():
    inputs = [413, 1, 120, 121, 9, 29, 51, 1, 122, 3, 123, 124, 52, 1, 53, 54]
    inputs = tf.expand_dims(inputs, axis=0)
    # One forward pass; take the argmax over the vocabulary at every position
    segment_ids = tf.ones_like(inputs)
    padding_mask = create_padding_mask(inputs)
    predictions = bert(inputs, segment_ids, padding_mask, False)
    # Get the predicted word ids
    predictions_id = tf.cast(tf.argmax(predictions, axis=-1), dtype=tf.int32)
    return predictions_id

train()
output = evaluate()
print(output)
print(tokenizer.sequences_to_texts(output.numpy()))
data.py:
import numpy as np
from tensorflow.keras import preprocessing

path = "./data/sample_data.txt"
tokenizer = preprocessing.text.Tokenizer()
text_list = []
with open(path, 'r') as lines:
    for line in lines:
        text = preprocessing.text.text_to_word_sequence(line)
        tokenizer.fit_on_texts(text)
        text_list.append(text)
vocab_size = len(tokenizer.word_index)
# texts_to_sequences on a list of words returns one id per word wrapped in a list, hence the squeeze below
id_list = [tokenizer.texts_to_sequences(text) for text in text_list]
padding_ids = preprocessing.sequence.pad_sequences(id_list, 40, padding="post", truncating="post")
padding_ids = np.squeeze(padding_ids, axis=-1)
# The labels are the original (uncorrupted) token ids
train_y = padding_ids.copy()
# Select about 15% of the non-PAD tokens to be masked
mask = np.random.rand(*padding_ids.shape) < 0.15
mask[padding_ids <= 0] = False
labels = -1 * np.ones(padding_ids.shape, dtype=int)
labels[mask] = padding_ids[mask]
# weights is 1 at the masked positions and 0 elsewhere
weights = np.ones(padding_ids.shape, dtype=int)
weights[labels == -1] = 0
# Of the selected tokens, replace 90% with the [MASK] id (vocab_size + 1) ...
mask1 = mask & (np.random.rand(*padding_ids.shape) < 0.90)
padding_ids[mask1] = vocab_size + 1
# ... and then replace 1/9 of those with a random word id, which gives the usual
# 80% [MASK] / 10% random / 10% unchanged split of the masked positions
mask2 = mask1 & (np.random.rand(*padding_ids.shape) < 1 / 9)
padding_ids[mask2] = np.random.randint(1, vocab_size + 1, mask2.sum())
train_x = padding_ids

def get_data():
    return train_x, train_y, weights, vocab_size, tokenizer
data1.py:
import random
import tensorflow as tf
from tensorflow.keras import preprocessing

# Data preprocessing
# 1. Given the input text, add the [MASK], [CLS] and [SEP] tokens
# 2. Build the segment information for the input text
# 3. Format the data, using inputs and segment_ids as the training data
tokenizer = preprocessing.text.Tokenizer()
# Text before preprocessing
text_list = []
# Text after preprocessing
text_list1 = []
with open("./data/sample_data.txt", 'r') as lines:
    for line in lines:
        line = line.strip()
        line = line.replace('.', '')
        text = preprocessing.text.text_to_word_sequence(line)
        tokenizer.fit_on_texts(text)
        # Split the line into clauses at commas
        split_list = line.split(",")
        text_list.append(split_list)
# Note: the Tokenizer's default filters strip '[' and ']', so these become the tokens "mask", "cls", "sep"
tokenizer.fit_on_texts(["[MASK]", "[CLS]", "[SEP]"])
# Prepend [CLS] to each sentence and append [SEP] to each clause
for text in text_list:
    for i in range(len(text)):
        text[i] += " [SEP]"  # the leading space is required, otherwise the tokens run together
    text.insert(0, "[CLS] ")  # the trailing space is required, otherwise the tokens run together
    texts = ""
    for token in text:
        texts += token
    text_list1.append(texts)
sequences = tokenizer.texts_to_sequences(text_list1)
# Apply the masking
# Exclude the three special tokens from the vocabulary size
vocab_size = len(tokenizer.word_index) - 3
mask_id = tokenizer.word_index.get("mask")
sep_id = tokenizer.word_index.get("sep")
cls_id = tokenizer.word_index.get("cls")
for sequence in sequences:
    for i in range(len(sequence)):
        if sequence[i] == sep_id or sequence[i] == cls_id:
            continue
        prob = random.random()
        # Mask about 15% of the tokens; rescale prob to [0, 1) within that 15% and then apply the
        # 80% [MASK] / 10% random / 10% unchanged split
        if prob > 0.85:
            prob = (1 - prob) / 0.15
            if prob < 0.8:
                sequence[i] = mask_id
            elif prob < 0.9:
                sequence[i] = random.randint(1, vocab_size)
padding_value = 0
segments = []
# Build the corresponding segment information
for data in sequences:
    sid = 1
    s_list = []
    for i in range(len(data)):
        s_list.append(sid)
        # Increase the segment id after each [SEP]
        if data[i] == sep_id:
            sid += 1
    segments.append(s_list)
inputs = preprocessing.sequence.pad_sequences(sequences,
                                              maxlen=40,
                                              padding="post",
                                              truncating="post",
                                              value=padding_value)
segment_ids = preprocessing.sequence.pad_sequences(segments,
                                                   maxlen=40,
                                                   padding="post",
                                                   truncating="post",
                                                   value=padding_value + 1)
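# # Illustrative check (not in the original script): inspect the first preprocessed sample
# print(inputs[0])       # token ids, padded/truncated to length 40
# print(segment_ids[0])  # 1 for the first clause, 2 for the second, and so on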
Test results
Input ids: [413, 1, 120, 121, 9, 29, 51, 1, 122, 3, 123, 124, 52, 1, 53, 54]
Corresponding text: [mask] the heavy showers had washed away the debris and dust heaps before the cabin doors
Output ids: [119 1 120 121 9 29 51 1 122 3 123 124 52 1 53 54]
Corresponding text: where the heavy showers had washed away the debris and dust heaps before the cabin doors
The results show that, after enough training epochs, the model is able to predict the token at the masked position correctly.