BERT: Bidirectional Encoder Representations from Transformers

Basic Concepts: The Embedding Layer

Unlike the Transformer, BERT builds its input representation as the sum of three embeddings:

The token embeddings, segment embeddings, and position embeddings are each simple Embedding layers.

Suppose the input batch has shape (batch_size, seq_len). The token embeddings map each word of the input sentence to a d_model-dimensional vector, so the output of that layer has shape (batch_size, seq_len, d_model). The segment embeddings map the sentence-segment information of the input to d_model-dimensional vectors, again giving shape (batch_size, seq_len, d_model). The position embeddings map the position of each word in the sentence to a d_model-dimensional vector (broadcast across the batch).

For example, given the input sentence [[CLS] my dog is cute [SEP] he likes play ing [SEP]], the corresponding segment information is [1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2] and the position information is [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]. These three sequences are fed through the token embedding, segment embedding, and position embedding layers respectively, and the results are summed; the sum is the output of BERT's embedding layer.

Implementation:

class PositionEmbedding(layers.Layer):
    def __init__(self, seq_len, d_model):
        super(PositionEmbedding, self).__init__()
        self.embedding = layers.Embedding(seq_len, d_model)

    def __call__(self, x):
        position_ids = tf.range(x.shape[1], dtype=tf.int32)[tf.newaxis, :]
        position_embeddings = self.embedding(position_ids)
        return position_embeddings


class Embedding(layers.Layer):
    def __init__(self, vocab_size, segment_size, seq_len, d_model, rate=0.1):
        super(Embedding, self).__init__()
        self.seq_len = seq_len
        self.d_model = d_model
        self.token_embedding = layers.Embedding(vocab_size, d_model)
        self.segment_embedding = layers.Embedding(segment_size, d_model)
        self.position_embedding = PositionEmbedding(seq_len, d_model)

        self.ln = layers.LayerNormalization(epsilon=1e-6)
        self.dropout = layers.Dropout(rate)

    def __call__(self, x, segment_ids, training):
        # Map the input token ids into the embedding space
        token_embedding_output = self.token_embedding(x)
        # Map the segment ids (which sentence each token belongs to) into the embedding space
        segment_embedding_output = self.segment_embedding(segment_ids)
        # Map the token positions into the embedding space
        position_embedding_output = self.position_embedding(x)
        # Sum the three embeddings
        output = token_embedding_output + segment_embedding_output + position_embedding_output

        output = self.ln(output)
        output = self.dropout(output, training=training)
        return output
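As a quick illustration, here is a minimal usage sketch of this embedding layer on the example sentence above, assuming the imports from the full bert.py listing further below (tensorflow as tf and tensorflow.keras.layers); the token ids and layer sizes are made up for illustration:

# A minimal usage sketch; the token ids and sizes below are illustrative only.
token_ids = tf.constant([[101, 5, 6, 7, 8, 102, 9, 10, 11, 12, 102]])  # "[CLS] my dog is cute [SEP] he likes play ing [SEP]"
segment_ids = tf.constant([[1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2]])         # sentence A = 1, sentence B = 2
embedding = Embedding(vocab_size=1000, segment_size=3, seq_len=40, d_model=128)  # segment ids 0..2 allowed
output = embedding(token_ids, segment_ids, training=False)
print(output.shape)  # (1, 11, 128): one d_model-dimensional vector per token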
The Encoder Layer

The encoder layer in BERT is identical to the encoder layer in the Transformer: multi-head self-attention followed by a position-wise feed-forward network, each wrapped in a residual connection and layer normalization.

Implementation:

# Encoder layer
class Encoder(tf.keras.layers.Layer):
    def __init__(self, num_heads, d_model, d_ff, rate=0.1):
        super(Encoder, self).__init__()
        # Multi-head attention sublayer
        self.mha = MultiHeadAttention(num_heads, d_model)
        self.dropout1 = layers.Dropout(rate)
        self.ln1 = layers.LayerNormalization(epsilon=1e-6)
        # Feed-forward sublayer
        self.ffn = FeedForwardNetwork(d_model, d_ff)
        self.dropout2 = layers.Dropout(rate)
        self.ln2 = layers.LayerNormalization(epsilon=1e-6)

    def __call__(self, x, padding_mask, training):
        # (batch_size, seq_len, d_model)=>(batch_size, seq_len, d_model)
        mha_output, _ = self.mha(x, x, x, padding_mask)
        dropout_output1 = self.dropout1(mha_output, training=training)
        ln_output1 = self.ln1(x + dropout_output1)
        # (batch_size, seq_len, d_model)=>(batch_size, seq_len, d_model)
        ffn_output = self.ffn(ln_output1)
        dropout_output2 = self.dropout2(ffn_output, training=training)
        ln_output2 = self.ln2(ln_output1 + dropout_output2)
        return ln_output2
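As a quick check, a minimal usage sketch with hypothetical sizes (it relies on the MultiHeadAttention and FeedForwardNetwork layers defined in the full bert.py listing below):

# A minimal usage sketch; the sizes are illustrative only.
encoder = Encoder(num_heads=4, d_model=128, d_ff=512)
x = tf.random.uniform((8, 40, 128))  # (batch_size, seq_len, d_model)
output = encoder(x, padding_mask=None, training=False)
print(output.shape)  # (8, 40, 128): the encoder preserves the input shape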
The BERT Model

The main body of BERT is a stack of Transformer encoder layers (two in the implementation below), with a softmax layer over the vocabulary on top for the masked language modeling task.

Implementation:

class BERT(tf.keras.Model):
    def __init__(self, vocab_size, segment_size, seq_len, num_heads, d_model, d_ff, rate=0.1):
        super(BERT, self).__init__()
        self.embedding = Embedding(vocab_size, segment_size, seq_len, d_model, rate)
        # A stack of (here, two) Transformer encoder layers
        self.encoders = [Encoder(num_heads, d_model, d_ff, rate) for _ in range(2)]
        # MLM head: a probability distribution over the vocabulary at every position
        self.dense = layers.Dense(vocab_size, activation="softmax")

    def __call__(self, x, segment_ids, padding_mask, training):
        output = self.embedding(x, segment_ids, training)
        for encoder in self.encoders:
            output = encoder(output, padding_mask, training)
        output = self.dense(output)
        return output
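A minimal usage sketch with hypothetical hyperparameters (the full training and evaluation scripts follow in the next section):

# A minimal usage sketch; the hyperparameters are illustrative only.
bert = BERT(vocab_size=1000, segment_size=2, seq_len=40, num_heads=4, d_model=128, d_ff=256)
x = tf.ones((8, 40), dtype=tf.int32)             # dummy token ids
segment_ids = tf.zeros((8, 40), dtype=tf.int32)  # a single segment
output = bert(x, segment_ids, padding_mask=None, training=False)
print(output.shape)  # (8, 40, 1000): a probability distribution over the vocabulary at every position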
Test Code

A complete example of a masked language modeling (MLM) task based on BERT:
bert.py:

import numpy as np
import tensorflow as tf
from tensorflow.keras import layers


# Scaled dot-product attention
# Formula:
# attention(q, k, v) = softmax(q @ k^T / sqrt(dk) + mask) @ v
def scaled_dot_product_attention(q, k, v, mask):
    # q @ k^T
    matmul_qk = tf.matmul(q, k, transpose_b=True)
    # Scale by dk, where dk = k_dim
    dk = tf.shape(k)[-1]
    scaled_attention = matmul_qk / tf.sqrt(tf.cast(dk, tf.float32))
    # Apply the mask.
    # What about positions that are only PAD fillers? PAD exists purely so the batch can be
    # computed in parallel and should carry no information. We therefore add a mask of shape
    # [batch, 1, 1, seq_len] that is 1 at padded positions and 0 elsewhere.
    # After multiplying by -1e9, non-PAD columns are unchanged while PAD columns become a very
    # large negative number, so after the softmax the masked positions are close to 0 and the
    # PAD information is (almost) ignored when multiplying with V.
    if mask is not None:
        scaled_attention += (mask * -1e9)
    # Attention weights matrix
    attention_weights = tf.nn.softmax(scaled_attention, axis=-1)
    # Weighted sum of the values
    attention = tf.matmul(attention_weights, v)
    return attention, attention_weights


# Multi-head attention layer
class MultiHeadAttention(tf.keras.layers.Layer):
    # d_model: dimensionality of the token vectors
    # num_heads: number of attention heads, i.e. num_heads sets of q, k, v projections
    # seq_len: sentence length
    def __init__(self, num_heads, d_model):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        # k_dim = dk
        self.k_dim = d_model // num_heads
        # Linear projections for q, k and v
        self.wq = layers.Dense(d_model)
        self.wk = layers.Dense(d_model)
        self.wv = layers.Dense(d_model)
        # Output projection
        self.dense = tf.keras.layers.Dense(d_model)

    # Split the last dimension into multiple heads
    def split_heads(self, x, batch_size):
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.k_dim))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    # Input shapes:
    # q(..., seq_len_q, k_dim)
    # k(..., seq_len_k, k_dim)
    # v(..., seq_len_v, v_dim)
    # mask(..., seq_len, seq_len)
    # Output shapes:
    # attention(..., seq_len_q, v_dim)
    # attention_weights(..., seq_len_q, seq_len_k)
    def __call__(self, q, k, v, mask):
        batch_size = tf.shape(q)[0]
        # (batch_size, seq_len, d_model)=>(batch_size, seq_len, d_model)
        x_q = self.wq(q)
        x_k = self.wk(k)
        x_v = self.wv(v)
        #
        # (batch_size, seq_len, d_model)=>(batch_size, num_heads, seq_len, k_dim)
        q = self.split_heads(x_q, batch_size)
        k = self.split_heads(x_k, batch_size)
        v = self.split_heads(x_v, batch_size)
        # attention:(batch_size, num_heads, seq_len, k_dim)
        # attention_weights:(batch_size, num_heads, seq_len, seq_len)
        attention, attention_weights = scaled_dot_product_attention(q, k, v, mask)
        # (batch_size, num_heads, seq_len, k_dim)=>(batch_size, seq_len, num_heads, k_dim)
        attention = tf.transpose(attention, perm=[0, 2, 1, 3])
        # (batch_size, seq_len, num_heads, k_dim)=>(batch_size, seq_len, d_model)
        attention = tf.reshape(attention, (batch_size, -1, self.d_model))
        # (batch_size, seq_len, d_model)=>(batch_size, seq_len, d_model)
        attention = self.dense(attention)
        return attention, attention_weights


# Position-wise feed-forward network
class FeedForwardNetwork(tf.keras.layers.Layer):
    def __init__(self, d_model, d_ff):
        super(FeedForwardNetwork, self).__init__()
        self.dense1 = layers.Dense(d_ff, activation="relu")
        self.dense2 = layers.Dense(d_model)

    def __call__(self, x):
        output = self.dense1(x)
        output = self.dense2(output)
        return output


# # Test code
# sample_fnn = FeedForwardNetwork(512, 2048)
# print(sample_fnn(tf.random.uniform((64, 50, 512))).shape)


# Encoder layer
class Encoder(tf.keras.layers.Layer):
    def __init__(self, num_heads, d_model, d_ff, rate=0.1):
        super(Encoder, self).__init__()
        # Multi-head attention sublayer
        self.mha = MultiHeadAttention(num_heads, d_model)
        self.dropout1 = layers.Dropout(rate)
        self.ln1 = layers.LayerNormalization(epsilon=1e-6)
        # Feed-forward sublayer
        self.ffn = FeedForwardNetwork(d_model, d_ff)
        self.dropout2 = layers.Dropout(rate)
        self.ln2 = layers.LayerNormalization(epsilon=1e-6)

    def __call__(self, x, padding_mask, training):
        # (batch_size, seq_len, d_model)=>(batch_size, seq_len, d_model)
        mha_output, _ = self.mha(x, x, x, padding_mask)
        dropout_output1 = self.dropout1(mha_output, training=training)
        ln_output1 = self.ln1(x + dropout_output1)
        # (batch_size, seq_len, d_model)=>(batch_size, seq_len, d_model)
        ffn_output = self.ffn(ln_output1)
        dropout_output2 = self.dropout2(ffn_output, training=training)
        ln_output2 = self.ln2(ln_output1 + dropout_output2)
        return ln_output2


# Fixed sinusoidal position encoding (defined for reference; the model below uses the
# learned PositionEmbedding instead)
def get_position_embedding(sentence_length, d_model):
    pos = np.expand_dims(np.arange(sentence_length), axis=-1)
    i = np.expand_dims(np.arange(d_model), axis=0)
    # pos / 10000^(2i/d_model)
    result = pos / np.power(10000, np.float32(2 * (i // 2) / d_model))
    # Even columns use sin
    result_sin = np.sin(result[:, 0::2])
    # Odd columns use cos
    result_cos = np.cos(result[:, 1::2])
    # Concatenate the sin and cos halves (concatenated rather than interleaved)
    position_embedding = np.concatenate((result_sin, result_cos), axis=-1)
    # Add a batch dimension; the final shape is (1, sentence_length, d_model)
    position_embedding = np.expand_dims(position_embedding, axis=0)
    # Convert the numpy array to a TensorFlow tensor
    return tf.cast(position_embedding, tf.float32)


# Learned position embedding
class PositionEmbedding(layers.Layer):
    def __init__(self, seq_len, d_model):
        super(PositionEmbedding, self).__init__()
        self.embedding = layers.Embedding(seq_len, d_model)

    def __call__(self, x):
        position_ids = tf.range(x.shape[1], dtype=tf.int32)[tf.newaxis, :]
        position_embeddings = self.embedding(position_ids)
        return position_embeddings


class Embedding(layers.Layer):
    def __init__(self, vocab_size, segment_size, seq_len, d_model, rate=0.1):
        super(Embedding, self).__init__()
        self.seq_len = seq_len
        self.d_model = d_model
        self.token_embedding = layers.Embedding(vocab_size, d_model)
        self.segment_embedding = layers.Embedding(segment_size, d_model)
        self.position_embedding = PositionEmbedding(seq_len, d_model)

        self.ln = layers.LayerNormalization(epsilon=1e-6)
        self.dropout = layers.Dropout(rate)

    def __call__(self, x, segment_ids, training):
        # Map the input token ids into the embedding space
        token_embedding_output = self.token_embedding(x)
        # Map the segment ids (which sentence each token belongs to) into the embedding space
        segment_embedding_output = self.segment_embedding(segment_ids)
        # Map the token positions into the embedding space
        position_embedding_output = self.position_embedding(x)
        # Sum the three embeddings
        output = token_embedding_output + segment_embedding_output + position_embedding_output

        output = self.ln(output)
        output = self.dropout(output, training=training)
        return output


class BERT(tf.keras.Model):
    def __init__(self, vocab_size, segment_size, seq_len, num_heads, d_model, d_ff, rate=0.1):
        super(BERT, self).__init__()
        self.embedding = Embedding(vocab_size, segment_size, seq_len, d_model, rate)
        # A stack of (here, two) Transformer encoder layers
        self.encoders = [Encoder(num_heads, d_model, d_ff, rate) for _ in range(2)]
        # MLM head: a probability distribution over the vocabulary at every position
        self.dense = layers.Dense(vocab_size, activation="softmax")

    def __call__(self, x, segment_ids, padding_mask, training):
        output = self.embedding(x, segment_ids, training)
        for encoder in self.encoders:
            output = encoder(output, padding_mask, training)
        output = self.dense(output)
        return output


# # Test code
# bert = BERT(1000, 2, 40, 4, 128, 256)
# x = tf.ones((64, 40))
# segment_ids = tf.zeros((64, 40))
# bert(x, segment_ids, padding_mask=None, training=False)

train.py (note: it imports the data-preparation module as train_data1; the get_data() it calls matches the data.py script listed further below):

import time

import numpy as np

import train_data1
import tensorflow as tf
from tensorflow.keras import losses, optimizers, metrics
from BERT.bert import BERT

train_x, train_y, weights, vocab_size, tokenizer = train_data1.get_data()

# 模型
bert = BERT(vocab_size=vocab_size + 2,  # +2 because index vocab_size + 1 is used as the [MASK] id
            segment_size=2,             # the segment ids below are all 1, so the embedding needs 2 entries
            seq_len=40,
            num_heads=4,
            d_model=128,
            d_ff=512,
            rate=0.1)

# Loss function
loss_fn = losses.SparseCategoricalCrossentropy()
# Optimizer
optimizer = optimizers.Adam(0.001)
# Metrics
train_loss = metrics.Mean()
train_accuracy = metrics.SparseCategoricalAccuracy()

directory = "./checkpoint"
checkpoint = tf.train.Checkpoint(bert=bert, optimizer=optimizer)
checkpoint_manager = tf.train.CheckpointManager(checkpoint, directory, max_to_keep=3)

if checkpoint_manager.latest_checkpoint:
    checkpoint.restore(checkpoint_manager.latest_checkpoint)


def create_padding_mark(seq):
    # Positions equal to 0 are padding
    seq = tf.cast(tf.math.equal(seq, 0), tf.float32)
    # Expand dims so the mask broadcasts over the attention matrix: (batch, 1, 1, seq_len)
    return seq[:, np.newaxis, np.newaxis, :]


@tf.function
def train_step(input_x, input_y):
    segment_ids = tf.ones_like(input_x)
    padding_mask = create_padding_mark(input_x)
    with tf.GradientTape() as tape:
        predictions = bert(input_x,
                           segment_ids,
                           padding_mask,
                           True)
        loss = loss_fn(input_y, predictions)
    gradients = tape.gradient(loss, bert.trainable_variables)
    optimizer.apply_gradients(zip(gradients, bert.trainable_variables))

    train_loss(loss)
    train_accuracy(input_y, predictions)


def train(epochs=100):
    for epoch in range(1, epochs + 1):
        start = time.time()
        train_loss.reset_state()
        train_accuracy.reset_state()
        train_step(train_x, train_y)
        print("epoch:{} loss:{:.6f} accuracy:{:.6f}".format(epoch, train_loss.result(), train_accuracy.result()))
        end = time.time()
        print("times in 1 epoch:{:.6f} secs".format(end - start))
        if epoch % 10 == 0:
            path = checkpoint_manager.save()
            print("save model at {}".format(path))


def evaluate():
    inputs = [413, 1, 120, 121, 9, 29, 51, 1, 122, 3, 123, 124, 52, 1, 53, 54]
    inputs = tf.expand_dims(inputs, axis=0)
    # The maximum sentence length is 40; the model predicts all positions in a single forward pass
    segment_ids = tf.ones_like(inputs)
    padding_mask = create_padding_mark(inputs)
    predictions = bert(inputs, segment_ids, padding_mask, False)
    # Take the most likely token id at each position
    predictions_id = tf.cast(tf.argmax(predictions, axis=-1), dtype=tf.int32)
    return predictions_id


train()
output = evaluate()
print(output)
print(tokenizer.sequences_to_texts(output.numpy()))
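Note that get_data() also returns weights, which is 1 at the positions selected for prediction and 0 elsewhere, but train_step above never uses it, so the loss is computed over every position, including unmasked tokens and padding. Below is a hedged sketch, reusing the objects defined above, of how the loss and accuracy could be restricted to the masked positions as in standard MLM training:

# A sketch only: restrict the loss and accuracy to the masked positions via sample_weight.
@tf.function
def train_step_masked(input_x, input_y, sample_weights):
    segment_ids = tf.ones_like(input_x)
    padding_mask = create_padding_mark(input_x)
    sample_weights = tf.cast(sample_weights, tf.float32)
    with tf.GradientTape() as tape:
        predictions = bert(input_x, segment_ids, padding_mask, True)
        # sample_weight zeroes out every position that was not selected for prediction
        loss = loss_fn(input_y, predictions, sample_weight=sample_weights)
    gradients = tape.gradient(loss, bert.trainable_variables)
    optimizer.apply_gradients(zip(gradients, bert.trainable_variables))
    train_loss(loss)
    train_accuracy(input_y, predictions, sample_weight=sample_weights)

# Usage: train_step_masked(train_x, train_y, weights)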

data.py:

import numpy as np
from tensorflow.keras import preprocessing

path = "./data/sample_data.txt"
tokenizer = preprocessing.text.Tokenizer()
text_list = []
with open(path, 'r') as lines:
    for line in lines:
        text = preprocessing.text.text_to_word_sequence(line)
        tokenizer.fit_on_texts(text)
        text_list.append(text)
vocab_size = len(tokenizer.word_index)

id_list = [tokenizer.texts_to_sequences(text) for text in text_list]
padding_ids = preprocessing.sequence.pad_sequences(id_list, 40, padding="post", truncating="post")
padding_ids = np.squeeze(padding_ids, axis=-1)

# train_y keeps the original (unmasked) token ids as the prediction targets
train_y = padding_ids.copy()
# Select roughly 15% of the non-padding positions for prediction
mask = np.random.rand(*padding_ids.shape) < 0.15
mask[padding_ids <= 0] = False

# labels holds the original token id at selected positions and -1 elsewhere
labels = -1 * np.ones(padding_ids.shape, dtype=int)
labels[mask] = padding_ids[mask]

# weights is 1 at selected positions and 0 elsewhere
weights = np.ones(padding_ids.shape, dtype=int)
weights[labels == -1] = 0

# Of the selected positions, ~90% are first replaced with the [MASK] id (vocab_size + 1);
# ~1/9 of those are then overwritten with a random token, giving the usual
# 80% [MASK] / 10% random / 10% unchanged split
mask1 = mask & (np.random.rand(*padding_ids.shape) < 0.90)
padding_ids[mask1] = vocab_size + 1
mask2 = mask1 & (np.random.rand(*padding_ids.shape) < 1 / 9)
padding_ids[mask2] = np.random.randint(1, vocab_size + 1, mask2.sum())

train_x = padding_ids


def get_data():
    return train_x, train_y, weights, vocab_size, tokenizer
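A rough sanity check on this preprocessing (a sketch, to be run at the end of the script), verifying that roughly 15% of the non-padding tokens are selected and that the selected tokens follow the 80% / 10% / 10% split:

# A sketch only: the numbers are approximate, since the split is random.
selected = mask.sum()
print("selected fraction:", selected / (train_y > 0).sum())           # ~0.15
print("[MASK] fraction:", (train_x[mask] == vocab_size + 1).mean())   # ~0.8
print("unchanged fraction:", (train_x[mask] == train_y[mask]).mean()) # ~0.1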

data1.py:

import random

import tensorflow as tf
from tensorflow.keras import preprocessing


# Data preprocessing:
# 1. Given the input text, add the [MASK], [CLS] and [SEP] tokens
# 2. Build the segment information for the input text
# 3. Format the data, using inputs and segment_ids as the training data
tokenizer = preprocessing.text.Tokenizer()
# Text before preprocessing
text_list = []
# Text after preprocessing
text_list1 = []
with open("./data/sample_data.txt", 'r') as lines:
    for line in lines:
        line = line.strip()
        line = line.replace('.', '')
        text = preprocessing.text.text_to_word_sequence(line)
        tokenizer.fit_on_texts(text)
        split_list = line.split(",")
        text_list.append(split_list)
    tokenizer.fit_on_texts(["[MASK]", "[CLS]", "[SEP]"])
# Add [CLS] at the start of each sentence and [SEP] after each clause
for text in text_list:
    for i in range(len(text)):
        text[i] += " [SEP]"  # the leading space is required; without it tokenization breaks
    text.insert(0, "[CLS] ")  # the trailing space is required; without it tokenization breaks
    texts = ""
    for token in text:
        texts += token
    text_list1.append(texts)
sequences = tokenizer.texts_to_sequences(text_list1)
# Apply the MLM masking
vocab_size = len(tokenizer.word_index) - 3
mask_id = tokenizer.word_index.get("mask")
sep_id = tokenizer.word_index.get("sep")
cls_id = tokenizer.word_index.get("cls")
for sequence in sequences:
    for i in range(len(sequence)):
        if sequence[i] == sep_id or sequence[i] == cls_id:
            continue
        # Select roughly 15% of tokens for prediction
        prob = random.random()
        if prob > 0.85:
            # Rescale prob to [0, 1) within the selected tokens:
            # 80% become [MASK], 10% become a random token, 10% are left unchanged
            prob = (1 - prob) / 0.15
            if prob < 0.8:
                sequence[i] = mask_id
            elif prob < 0.9:
                sequence[i] = random.randint(1, vocab_size)
padding_value = 0
segments = []
# Build the corresponding segment information
for data in sequences:
    sid = 1
    s_list = []
    for i in range(len(data)):
        s_list.append(sid)
        # Increment the segment id after each [SEP]
        if data[i] == sep_id:
            sid += 1
    segments.append(s_list)

inputs = preprocessing.sequence.pad_sequences(sequences,
                                              maxlen=40,
                                              padding="post",
                                              truncating="post",
                                              value=padding_value)
segment_ids = preprocessing.sequence.pad_sequences(segments,
                                                   maxlen=40,
                                                   padding="post",
                                                   truncating="post",
                                                   value=padding_value + 1)
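To see what this alternative preprocessing produces, a small inspection sketch (assuming the script above has just been run):

# A sketch only: inspect the first preprocessed example.
print(inputs[0])                                 # token ids, zero-padded to length 40
print(segment_ids[0])                            # 1 for the first clause, 2 for the second; padded positions are also 1 here
print(tokenizer.sequences_to_texts(inputs[:1]))  # the ids back as text (padding ids are skipped)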

Test Results

Input ids: [413, 1, 120, 121, 9, 29, 51, 1, 122, 3, 123, 124, 52, 1, 53, 54]
Input text: [mask] the heavy showers had washed away the debris and dust heaps before the cabin doors
Output ids: [119 1 120 121 9 29 51 1 122 3 123 124 52 1 53 54]
Output text: where the heavy showers had washed away the debris and dust heaps before the cabin doors

The results show that, after enough training epochs, the model can correctly predict the token at the masked position.
