介绍
在本文中,我们将探索Transformer架构及其所有组件。我将使用PyTorch构建所有必要的结构和模块。
让我们开始导入所有必要的库。
# Importing libraries
# PyTorch
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, random_split
from torch.utils.tensorboard import SummaryWriter
# Math
import math
# HuggingFace libraries
from datasets import load_dataset
from tokenizers import Tokenizer
from tokenizers.models import WordLevel
from tokenizers.trainers import WordLevelTrainer
from tokenizers.pre_tokenizers import Whitespace
# Pathlib
from pathlib import Path
# typing
from typing import Any
# Library for progress bars in loops
from tqdm import tqdm
# Importing library of warnings
import warnings
在开始编码之前,让我们来看一下 Transformer 架构。
Transformer架构主要包括两个模块:编码器和解码器。让我们进一步了解一下。
编码器:它拥有一个多头注意力机制和完全连接的前馈网络。两个子层周围还有残差连接,加上每个子层输出的层标准化。模型中的所有子层和嵌入层都产生维度?_?????=512的输出。
解码器:解码器的结构与编码器相似,但它增加了第三个子层,该子层对编码块的输出执行多头注意力操作。在解码块的自注意子层中也进行了修改,以避免位置对后续位置的关注。这种掩蔽确保了位置?的预测仅依赖于小于?的位置的已知输出。
编码器和解码器块都重复?次。在原始论文中,他们定义了?=6,我们在这个笔记本中也将定义一个相似的值。
输入嵌入
当我们观察上面的Transformer架构图像时,我们可以看到嵌入表示两个块的第一步。
下面的InputEmbedding类负责将输入文本转换为d_model维的数值向量。为了防止我们的输入嵌入变得极小,我们通过乘以√?_?????对其进行规范化。
在下面的图像中,我们可以看到嵌入是如何创建的。首先,我们有一个句子被分割成符号,我们稍后将探讨符号是什么。然后,令牌ID (标识号 )被转换为嵌入,这些嵌入是高维向量。
# Creating Input Embeddings
class InputEmbeddings(nn.Module):
def __init__(self, d_model: int, vocab_size: int):
super().__init__()
self.d_model = d_model # Dimension of vectors (512)
self.vocab_size = vocab_size # Size of the vocabulary
self.embedding = nn.Embedding(vocab_size, d_model) # PyTorch layer that converts integer indices to dense embeddings
def forward(self, x):
return self.embedding(x) * math.sqrt(self.d_model) # Normalizing the variance of the embeddings
位置编码
在原始论文中,作者将位置编码添加到编码器和解码器块底部的输入嵌入中,因此模型可以获取有关序列中的标记的相对或绝对位置的信息。位置编码与嵌入有相同的维度 ?_model,这样两个向量就可以相加,我们就可以结合词嵌入的语义内容和位置编码的位置信息。
在下面呢,我们将创建一个尺寸为 的PositionalEncoding位置编码矩阵。我们首先用0填充它。然后,我们将正弦函数应用于位置编码矩阵的偶数索引,而余弦函数应用于奇数索引。pe(seq_len, d_model)
我们应用正弦和余弦函数,因为这允许模型根据序列中其他单词的位置来确定一个单词的位置,因为对于任何固定的偏移量?,??ₚₒₛ₊ₖ可以被表示为??ₚₒₛ的线性函数。这是由正弦和余弦函数的属性决定的,其中输入的移位会导致输出中预测的变化。
# Creating the Positional Encoding
class PositionalEncoding(nn.Module):
def __init__(self, d_model: int, seq_len: int, dropout: float) -> None:
super().__init__()
self.d_model = d_model # Dimensionality of the model
self.seq_len = seq_len # Maximum sequence length
self.dropout = nn.Dropout(dropout) # Dropout layer to prevent overfitting
# Creating a positional encoding matrix of shape (seq_len, d_model) filled with zeros
pe = torch.zeros(seq_len, d_model)
# Creating a tensor representing positions (0 to seq_len - 1)
position = torch.arange(0, seq_len, dtype = torch.float).unsqueeze(1) # Transforming 'position' into a 2D tensor['seq_len, 1']
# Creating the division term for the positional encoding formula
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
# Apply sine to even indices in pe
pe[:, 0::2] = torch.sin(position * div_term)
# Apply cosine to odd indices in pe
pe[:, 1::2] = torch.cos(position * div_term)
# Adding an extra dimension at the beginning of pe matrix for batch handling
pe = pe.unsqueeze(0)
# Registering 'pe' as buffer. Buffer is a tensor not considered as a model parameter
self.register_buffer('pe', pe)
def forward(self,x):
# Addind positional encoding to the input tensor X
x = x + (self.pe[:, :x.shape[1], :]).requires_grad_(False)
return self.dropout(x) # Dropout for regularization
层归一化
在我们观察编码器和解码器块时,我们看到了几个称作“加法 & 规范化”(Add & Norm)的规范化层。
下面的LayerNormalization类对输入数据进行层规范化。在其前向传递中,我们计算输入数据的平均值和标准偏差。然后,通过减去平均值,并除以标准偏差加上一个称为epsilon的小数来避免任何0的除数,对输入数据进行规范化。这个过程产生了一个规范化的输出,具有0的均值和1的标准偏差。
然后,我们将规范化的输出通过一个可学习的参数alpha进行缩放,并添加一个称为偏置的可学习参数。训练过程负责调整这些参数。最终结果是一个层规范化的张量,它确保了网络中各层输入的规模是一致的。
# Creating Layer Normalization
class LayerNormalization(nn.Module):
def __init__(self, eps: float = 10**-6) -> None: # We define epsilon as 0.000001 to avoid division by zero
super().__init__()
self.eps = eps
# We define alpha as a trainable parameter and initialize it with ones
self.alpha = nn.Parameter(torch.ones(1)) # One-dimensional tensor that will be used to scale the input data
# We define bias as a trainable parameter and initialize it with zeros
self.bias = nn.Parameter(torch.zeros(1)) # One-dimensional tenso that will be added to the input data
def forward(self, x):
mean = x.mean(dim = -1, keepdim = True) # Computing the mean of the input data. Keeping the number of dimensions unchanged
std = x.std(dim = -1, keepdim = True) # Computing the standard deviation of the input data. Keeping the number of dimensions unchanged
# Returning the normalized input
return self.alpha * (x-mean) / (std + self.eps) + self.bias
前馈网络
在完全连接的前馈网络中,我们进行了两次线性变换,在两次变换之间使用了ReLU激活函数。我们可以用数学公式来表示这个操作:
W₁ 和 W₂ 是权重,而 b₁ 和 b₂ 是两个线性变换的偏差。
在下面的 FeedForwardBlock 中,我们将定义两个线性变换——self.linear_1 和 self.linear_2以及内层 d_ff。输入数据首先会通过 self.linear_1 变换,这增加了它的维度,从 d_model 到 d_ff。此操作的输出通过 ReLU 激活函数,引入非线性,以便网络能够学习更复杂的模式,然后应用 self.dropout 层来缓解过拟合现象。最后的操作是对 dropout 修改后的张量进行 self.linear_2 变换,将其变回原始的 d_model 维度。
# Creating Feed Forward Layers
class FeedForwardBlock(nn.Module):
def __init__(self, d_model: int, d_ff: int, dropout: float) -> None:
super().__init__()
# First linear transformation
self.linear_1 = nn.Linear(d_model, d_ff) # W1 & b1
self.dropout = nn.Dropout(dropout) # Dropout to prevent overfitting
# Second linear transformation
self.linear_2 = nn.Linear(d_ff, d_model) # W2 & b2
def forward(self, x):
# (Batch, seq_len, d_model) --> (batch, seq_len, d_ff) -->(batch, seq_len, d_model)
return self.linear_2(self.dropout(torch.relu(self.linear_1(x))))
多头注意力
多头注意力(Multi-Head Attention)是Transformer最关键的部件。它负责帮助模型理解数据中的复杂关系和模式。
下面的图片展示了多头注意力是如何工作的。它没有包括批次维度,因为它只是为了演示单个句子的处理过程。
多头注意力模块接收输入数据并将其拆分成查询(queries)、键(keys)和值(values),这些数据被组织成矩阵?、?和?。每个矩阵包含了输入的不同方面,并且它们与输入具有相同的维度。
然后,我们分别用各自的权重矩阵?^?、?^?和?^?对每个矩阵进行线性变换。这些变化将产生新的矩阵?′、?′和?′,这些矩阵将被分割成对应于不同头部ℎ的更小矩阵,从而允许模型并行地关注不同表示子空间中的信息。这种分割为每个头部创建了多组查询、键和值。
最后,我们将每个头部连接起来形成一个?矩阵,然后通过另一个权重矩阵??进行变换,以产出多头注意力的输出,一个保持输入维度的矩阵??−?。
# Creating the Multi-Head Attention block
class MultiHeadAttentionBlock(nn.Module):
def __init__(self, d_model: int, h: int, dropout: float) -> None: # h = number of heads
super().__init__()
self.d_model = d_model
self.h = h
# We ensure that the dimensions of the model is divisible by the number of heads
assert d_model % h == 0, 'd_model is not divisible by h'
# d_k is the dimension of each attention head's key, query, and value vectors
self.d_k = d_model // h # d_k formula, like in the original "Attention Is All You Need" paper
# Defining the weight matrices
self.w_q = nn.Linear(d_model, d_model) # W_q
self.w_k = nn.Linear(d_model, d_model) # W_k
self.w_v = nn.Linear(d_model, d_model) # W_v
self.w_o = nn.Linear(d_model, d_model) # W_o
self.dropout = nn.Dropout(dropout) # Dropout layer to avoid overfitting
@staticmethod
def attention(query, key, value, mask, dropout: nn.Dropout):# mask => When we want certain words to NOT interact with others, we "hide" them
d_k = query.shape[-1] # The last dimension of query, key, and value
# We calculate the Attention(Q,K,V) as in the formula in the image above
attention_scores = (query @ key.transpose(-2,-1)) / math.sqrt(d_k) # @ = Matrix multiplication sign in PyTorch
# Before applying the softmax, we apply the mask to hide some interactions between words
if mask is not None: # If a mask IS defined...
attention_scores.masked_fill_(mask == 0, -1e9) # Replace each value where mask is equal to 0 by -1e9
attention_scores = attention_scores.softmax(dim = -1) # Applying softmax
if dropout is not None: # If a dropout IS defined...
attention_scores = dropout(attention_scores) # We apply dropout to prevent overfitting
return (attention_scores @ value), attention_scores # Multiply the output matrix by the V matrix, as in the formula
def forward(self, q, k, v, mask):
query = self.w_q(q) # Q' matrix
key = self.w_k(k) # K' matrix
value = self.w_v(v) # V' matrix
# Splitting results into smaller matrices for the different heads
# Splitting embeddings (third dimension) into h parts
query = query.view(query.shape[0], query.shape[1], self.h, self.d_k).transpose(1,2) # Transpose => bring the head to the second dimension
key = key.view(key.shape[0], key.shape[1], self.h, self.d_k).transpose(1,2) # Transpose => bring the head to the second dimension
value = value.view(value.shape[0], value.shape[1], self.h, self.d_k).transpose(1,2) # Transpose => bring the head to the second dimension
# Obtaining the output and the attention scores
x, self.attention_scores = MultiHeadAttentionBlock.attention(query, key, value, mask, self.dropout)
# Obtaining the H matrix
x = x.transpose(1, 2).contiguous().view(x.shape[0], -1, self.h * self.d_k)
return self.w_o(x) # Multiply the H matrix by the weight matrix W_o, resulting in the MH-A matrix
残差连接
当我们观察Transformer的架构时,我们看到每个子层,包括自注意力和前馈神经网络块,都在将其输出通过加法和标准化层之前,会先将其输出加到其输入上。这种方法在加法和标准化层中将输出与原始输入结合起来。这个过程被称为跳跃连接,它使得Transformer能够通过在反向传播过程中为梯度流提供一条捷径,从而更有效地训练深度网络。
下面的ResidualConnection类负责这个过程。
# Building Residual Connection
class ResidualConnection(nn.Module):
def __init__(self, dropout: float) -> None:
super().__init__()
self.dropout = nn.Dropout(dropout) # We use a dropout layer to prevent overfitting
self.norm = LayerNormalization() # We use a normalization layer
def forward(self, x, sublayer):
# We normalize the input and add it to the original input 'x'. This creates the residual connection process.
return x + self.dropout(sublayer(self.norm(x)))
编码器
我们现在将构建编码器。我们创建了EncoderBlock类,它包括多头注意力和前馈层,以及残差连接。
在原始论文中,编码器模块重复六次。我们创建编码器类作为多个编码器模块的组合。在通过其所有模块处理输入后,我们还添加了层归一化作为最后一步。
# Building Encoder Block
class EncoderBlock(nn.Module):
# This block takes in the MultiHeadAttentionBlock and FeedForwardBlock, as well as the dropout rate for the residual connections
def __init__(self, self_attention_block: MultiHeadAttentionBlock, feed_forward_block: FeedForwardBlock, dropout: float) -> None:
super().__init__()
# Storing the self-attention block and feed-forward block
self.self_attention_block = self_attention_block
self.feed_forward_block = feed_forward_block
self.residual_connections = nn.ModuleList([ResidualConnection(dropout) for _ in range(2)]) # 2 Residual Connections with dropout
def forward(self, x, src_mask):
# Applying the first residual connection with the self-attention block
x = self.residual_connections[0](x, lambda x: self.self_attention_block(x, x, x, src_mask)) # Three 'x's corresponding to query, key, and value inputs plus source mask
# Applying the second residual connection with the feed-forward block
x = self.residual_connections[1](x, self.feed_forward_block)
return x # Output tensor after applying self-attention and feed-forward layers with residual connections.
# Building Encoder
# An Encoder can have several Encoder Blocks
class Encoder(nn.Module):
# The Encoder takes in instances of 'EncoderBlock'
def __init__(self, layers: nn.ModuleList) -> None:
super().__init__()
self.layers = layers # Storing the EncoderBlocks
self.norm = LayerNormalization() # Layer for the normalization of the output of the encoder layers
def forward(self, x, mask):
# Iterating over each EncoderBlock stored in self.layers
for layer in self.layers:
x = layer(x, mask) # Applying each EncoderBlock to the input tensor 'x'
return self.norm(x) # Normalizing output
解码器
同样地,解码器也由若干个解码器模块组成,在原始论文中这些模块重复六次。主要区别在于它增加了一个额外子层,该子层执行多头注意力操作,并带有交叉关注(cross-attention)的成分,使用编码器的输出作为其键(keys)和值(values),同时使用解码器输入作为查询(queries)。
对于输出嵌入,我们可以使用与编码器相同的InputEmbeddings类。你还会注意到,自注意力子层是被遮蔽的,这限制了模型访问序列中的未来元素。
我们将从构建DecoderBlock类开始,然后我们将构建Decoder类,它将组装多个DecoderBlock。
# Building Decoder Block
class DecoderBlock(nn.Module):
# The DecoderBlock takes in two MultiHeadAttentionBlock. One is self-attention, while the other is cross-attention.
# It also takes in the feed-forward block and the dropout rate
def __init__(self, self_attention_block: MultiHeadAttentionBlock, cross_attention_block: MultiHeadAttentionBlock, feed_forward_block: FeedForwardBlock, dropout: float) -> None:
super().__init__()
self.self_attention_block = self_attention_block
self.cross_attention_block = cross_attention_block
self.feed_forward_block = feed_forward_block
self.residual_connections = nn.ModuleList([ResidualConnection(dropout) for _ in range(3)]) # List of three Residual Connections with dropout rate
def forward(self, x, encoder_output, src_mask, tgt_mask):
# Self-Attention block with query, key, and value plus the target language mask
x = self.residual_connections[0](x, lambda x: self.self_attention_block(x, x, x, tgt_mask))
# The Cross-Attention block using two 'encoder_ouput's for key and value plus the source language mask. It also takes in 'x' for Decoder queries
x = self.residual_connections[1](x, lambda x: self.cross_attention_block(x, encoder_output, encoder_output, src_mask))
# Feed-forward block with residual connections
x = self.residual_connections[2](x, self.feed_forward_block)
return x
# Building Decoder
# A Decoder can have several Decoder Blocks
class Decoder(nn.Module):
# The Decoder takes in instances of 'DecoderBlock'
def __init__(self, layers: nn.ModuleList) -> None:
super().__init__()
# Storing the 'DecoderBlock's
self.layers = layers
self.norm = LayerNormalization() # Layer to normalize the output
def forward(self, x, encoder_output, src_mask, tgt_mask):
# Iterating over each DecoderBlock stored in self.layers
for layer in self.layers:
# Applies each DecoderBlock to the input 'x' plus the encoder output and source and target masks
x = layer(x, encoder_output, src_mask, tgt_mask)
return self.norm(x) # Returns normalized output
你可以在解码器图像中看到,在运行了一系列解码器块之后,我们有一个线性层和一个 Softmax 函数来输出概率。下面的 ProjectionLayer 类负责将模型的输出转换为覆盖词汇库的概率分布,在这个分布中,我们从可能的词汇库中选择每个输出令牌。
# Buiding Linear Layer
class ProjectionLayer(nn.Module):
def __init__(self, d_model: int, vocab_size: int) -> None: # Model dimension and the size of the output vocabulary
super().__init__()
self.proj = nn.Linear(d_model, vocab_size) # Linear layer for projecting the feature space of 'd_model' to the output space of 'vocab_size'
def forward(self, x):
return torch.log_softmax(self.proj(x), dim = -1) # Applying the log Softmax function to the output
构建变压器
至此,我们终于集齐了变压器架构的所有组件。现在,我们可以将所有部分集合起来构建变压器了。
在下面的变压器类中,我们将把模型架构的所有组件整合在一起。
# Creating the Transformer Architecture
class Transformer(nn.Module):
# This takes in the encoder and decoder, as well the embeddings for the source and target language.
# It also takes in the Positional Encoding for the source and target language, as well as the projection layer
def __init__(self, encoder: Encoder, decoder: Decoder, src_embed: InputEmbeddings, tgt_embed: InputEmbeddings, src_pos: PositionalEncoding, tgt_pos: PositionalEncoding, projection_layer: ProjectionLayer) -> None:
super().__init__()
self.encoder = encoder
self.decoder = decoder
self.src_embed = src_embed
self.tgt_embed = tgt_embed
self.src_pos = src_pos
self.tgt_pos = tgt_pos
self.projection_layer = projection_layer
# Encoder
def encode(self, src, src_mask):
src = self.src_embed(src) # Applying source embeddings to the input source language
src = self.src_pos(src) # Applying source positional encoding to the source embeddings
return self.encoder(src, src_mask) # Returning the source embeddings plus a source mask to prevent attention to certain elements
# Decoder
def decode(self, encoder_output, src_mask, tgt, tgt_mask):
tgt = self.tgt_embed(tgt) # Applying target embeddings to the input target language (tgt)
tgt = self.tgt_pos(tgt) # Applying target positional encoding to the target embeddings
# Returning the target embeddings, the output of the encoder, and both source and target masks
# The target mask ensures that the model won't 'see' future elements of the sequence
return self.decoder(tgt, encoder_output, src_mask, tgt_mask)
# Applying Projection Layer with the Softmax function to the Decoder output
def project(self, x):
return self.projection_layer(x)
架构终于准备就绪。我们现在定义一个名为 build_transformer 的函数,在这个函数中我们将定义参数以及我们需要的一切,以便为机器翻译任务构建一个完全可操作的 Transformer 模型。
我们将设置与原始论文《Attention Is All You Need》中相同的参数,其中 ?_????? = 512,? = 6,ℎ = 8,dropout rate ?_???? = 0.1,以及 ?_??= 2048。
# Building & Initializing Transformer
# Definin function and its parameter, including model dimension, number of encoder and decoder stacks, heads, etc.
def build_transformer(src_vocab_size: int, tgt_vocab_size: int, src_seq_len: int, tgt_seq_len: int, d_model: int = 512, N: int = 6, h: int = 8, dropout: float = 0.1, d_ff: int = 2048) -> Transformer:
# Creating Embedding layers
src_embed = InputEmbeddings(d_model, src_vocab_size) # Source language (Source Vocabulary to 512-dimensional vectors)
tgt_embed = InputEmbeddings(d_model, tgt_vocab_size) # Target language (Target Vocabulary to 512-dimensional vectors)
# Creating Positional Encoding layers
src_pos = PositionalEncoding(d_model, src_seq_len, dropout) # Positional encoding for the source language embeddings
tgt_pos = PositionalEncoding(d_model, tgt_seq_len, dropout) # Positional encoding for the target language embeddings
# Creating EncoderBlocks
encoder_blocks = [] # Initial list of empty EncoderBlocks
for _ in range(N): # Iterating 'N' times to create 'N' EncoderBlocks (N = 6)
encoder_self_attention_block = MultiHeadAttentionBlock(d_model, h, dropout) # Self-Attention
feed_forward_block = FeedForwardBlock(d_model, d_ff, dropout) # FeedForward
# Combine layers into an EncoderBlock
encoder_block = EncoderBlock(encoder_self_attention_block, feed_forward_block, dropout)
encoder_blocks.append(encoder_block) # Appending EncoderBlock to the list of EncoderBlocks
# Creating DecoderBlocks
decoder_blocks = [] # Initial list of empty DecoderBlocks
for _ in range(N): # Iterating 'N' times to create 'N' DecoderBlocks (N = 6)
decoder_self_attention_block = MultiHeadAttentionBlock(d_model, h, dropout) # Self-Attention
decoder_cross_attention_block = MultiHeadAttentionBlock(d_model, h, dropout) # Cross-Attention
feed_forward_block = FeedForwardBlock(d_model, d_ff, dropout) # FeedForward
# Combining layers into a DecoderBlock
decoder_block = DecoderBlock(decoder_self_attention_block, decoder_cross_attention_block, feed_forward_block, dropout)
decoder_blocks.append(decoder_block) # Appending DecoderBlock to the list of DecoderBlocks
# Creating the Encoder and Decoder by using the EncoderBlocks and DecoderBlocks lists
encoder = Encoder(nn.ModuleList(encoder_blocks))
decoder = Decoder(nn.ModuleList(decoder_blocks))
# Creating projection layer
projection_layer = ProjectionLayer(d_model, tgt_vocab_size) # Map the output of Decoder to the Target Vocabulary Space
# Creating the transformer by combining everything above
transformer = Transformer(encoder, decoder, src_embed, tgt_embed, src_pos, tgt_pos, projection_layer)
# Initialize the parameters
for p in transformer.parameters():
if p.dim() > 1:
nn.init.xavier_uniform_(p)
return transformer # Assembled and initialized Transformer. Ready to be trained and validated!
模型现在已经准备好进行训练!
分词器
对于我们的Transformer模型来说,分词是一个关键的预处理步骤。在这一步中,我们将原始文本转换为模型可以处理的数字格式。
有几种分词策略。我们将使用基于词级的分词,将句子中的每个词转换为一个token。
在对句子进行词元化之后,我们会根据在训练分词器期间,训练语料库中存在的已创建词汇表,将每个词元映射到一个独特的整数ID。每个整数代表词汇表中的一个特定单词。
除了训练语料库中的单词,变换器(Transformers)使用特殊的词元用于特定目的。以下是我们将立即定义的一些特殊词元:
• [UNK]:该词元用于标识序列中未知的单词。
• [PAD]:填充词元,以确保一批次中的所有序列长度相同,因此我们会用这个词元来填充较短的句子。我们使用注意力掩码来“告知”模型在训练期间忽略被填充的词元,因为它们对于任务没有任何实际含义。
• [SOS]:这是一个用来标识句子开始的词元。
• [EOS]:这是一个用来标识句子结束的词元。
在下面的 build_tokenizer 函数中,我们确保了分词器准备好了用于训练模型。它会检查是否存在一个已经训练好的分词器,如果不存在,它会训练一个新的分词器。
# Defining Tokenizer
def build_tokenizer(config, ds, lang):
# Crating a file path for the tokenizer
tokenizer_path = Path(config['tokenizer_file'].format(lang))
# Checking if Tokenizer already exists
if not Path.exists(tokenizer_path):
# If it doesn't exist, we create a new one
tokenizer = Tokenizer(WordLevel(unk_token = '[UNK]')) # Initializing a new world-level tokenizer
tokenizer.pre_tokenizer = Whitespace() # We will split the text into tokens based on whitespace
# Creating a trainer for the new tokenizer
trainer = WordLevelTrainer(special_tokens = ["[UNK]", "[PAD]",
"[SOS]", "[EOS]"], min_frequency = 2) # Defining Word Level strategy and special tokens
# Training new tokenizer on sentences from the dataset and language specified
tokenizer.train_from_iterator(get_all_sentences(ds, lang), trainer = trainer)
tokenizer.save(str(tokenizer_path)) # Saving trained tokenizer to the file path specified at the beginning of the function
else:
tokenizer = Tokenizer.from_file(str(tokenizer_path)) # If the tokenizer already exist, we load it
return tokenizer # Returns the loaded tokenizer or the trained tokenizer
加载数据集
对于这项任务,我们将使用Hugging Face上可用的OpusBooks数据集。这个数据集由两个特征组成,即id和translation。translation特征包含不同语言的句子对,例如西班牙语和葡萄牙语、英语和法语等等。
我们首先定义get_all_sentences函数来遍历数据集,并根据定义的语言对提取句子——我们稍后会做这个工作。
# Iterating through dataset to extract the original sentence and its translation
def get_all_sentences(ds, lang):
for pair in ds:
yield pair['translation'][lang]
get_ds 函数被定义为加载和准备数据集用于训练和验证。在这个函数中,我们构建或加载分词器,拆分数据集,并创建 DataLoaders,以便模型能够成功地批次迭代数据集。这些函数的结果是源语言和目标语言的分词器以及 DataLoader 对象。
def get_ds(config):
# Loading the train portion of the OpusBooks dataset.
# The Language pairs will be defined in the 'config' dictionary we will build later
ds_raw = load_dataset('opus_books', f'{config["lang_src"]}-{config["lang_tgt"]}', split = 'train')
# Building or loading tokenizer for both the source and target languages
tokenizer_src = build_tokenizer(config, ds_raw, config['lang_src'])
tokenizer_tgt = build_tokenizer(config, ds_raw, config['lang_tgt'])
# Splitting the dataset for training and validation
train_ds_size = int(0.9 * len(ds_raw)) # 90% for training
val_ds_size = len(ds_raw) - train_ds_size # 10% for validation
train_ds_raw, val_ds_raw = random_split(ds_raw, [train_ds_size, val_ds_size]) # Randomly splitting the dataset
# Processing data with the BilingualDataset class, which we will define below
train_ds = BilingualDataset(train_ds_raw, tokenizer_src, tokenizer_tgt, config['lang_src'], config['lang_tgt'], config['seq_len'])
val_ds = BilingualDataset(val_ds_raw, tokenizer_src, tokenizer_tgt, config['lang_src'], config['lang_tgt'], config['seq_len'])
# Iterating over the entire dataset and printing the maximum length found in the sentences of both the source and target languages
max_len_src = 0
max_len_tgt = 0
for pair in ds_raw:
src_ids = tokenizer_src.encode(pair['translation'][config['lang_src']]).ids
tgt_ids = tokenizer_src.encode(pair['translation'][config['lang_tgt']]).ids
max_len_src = max(max_len_src, len(src_ids))
max_len_tgt = max(max_len_tgt, len(tgt_ids))
print(f'Max length of source sentence: {max_len_src}')
print(f'Max length of target sentence: {max_len_tgt}')
# Creating dataloaders for the training and validadion sets
# Dataloaders are used to iterate over the dataset in batches during training and validation
train_dataloader = DataLoader(train_ds, batch_size = config['batch_size'], shuffle = True) # Batch size will be defined in the config dictionary
val_dataloader = DataLoader(val_ds, batch_size = 1, shuffle = True)
return train_dataloader, val_dataloader, tokenizer_src, tokenizer_tgt # Returning the DataLoader objects and tokenizers
我们定义了一个 casual_mask 函数来创建编码器的注意力机制所需的掩码。这个掩码防止模型获取序列中未来元素的信息。
我们首先创建一个填充了 1 的正方形网格。我们用 size 参数来确定网格的大小。然后,我们把主对角线以上的所有数字变成 0。一侧的每个数字变成 0,而其余的仍然保持 1。然后,函数翻转所有这些值,将 1 变成 0,将 0 变成 1。这个过程对于预测序列中未来标记的模型至关重要。
def casual_mask(size):
# Creating a square matrix of dimensions 'size x size' filled with ones
mask = torch.triu(torch.ones(1, size, size), diagonal = 1).type(torch.int)
return mask == 0
BilingualDataset类通过为数据集中的目标语言和源语言文本进行分词,并添加所有必要的特殊标记来处理这些文本。这个类还确保两种语言的句子都在最大序列长度之内,并对所有需要的句子进行填充。
class BilingualDataset(Dataset):
# This takes in the dataset contaning sentence pairs, the tokenizers for target and source languages, and the strings of source and target languages
# 'seq_len' defines the sequence length for both languages
def __init__(self, ds, tokenizer_src, tokenizer_tgt, src_lang, tgt_lang, seq_len) -> None:
super().__init__()
self.seq_len = seq_len
self.ds = ds
self.tokenizer_src = tokenizer_src
self.tokenizer_tgt = tokenizer_tgt
self.src_lang = src_lang
self.tgt_lang = tgt_lang
# Defining special tokens by using the target language tokenizer
self.sos_token = torch.tensor([tokenizer_tgt.token_to_id("[SOS]")], dtype=torch.int64)
self.eos_token = torch.tensor([tokenizer_tgt.token_to_id("[EOS]")], dtype=torch.int64)
self.pad_token = torch.tensor([tokenizer_tgt.token_to_id("[PAD]")], dtype=torch.int64)
# Total number of instances in the dataset (some pairs are larger than others)
def __len__(self):
return len(self.ds)
# Using the index to retrive source and target texts
def __getitem__(self, index: Any) -> Any:
src_target_pair = self.ds[index]
src_text = src_target_pair['translation'][self.src_lang]
tgt_text = src_target_pair['translation'][self.tgt_lang]
# Tokenizing source and target texts
enc_input_tokens = self.tokenizer_src.encode(src_text).ids
dec_input_tokens = self.tokenizer_tgt.encode(tgt_text).ids
# Computing how many padding tokens need to be added to the tokenized texts
# Source tokens
enc_num_padding_tokens = self.seq_len - len(enc_input_tokens) - 2 # Subtracting the two '[EOS]' and '[SOS]' special tokens
# Target tokens
dec_num_padding_tokens = self.seq_len - len(dec_input_tokens) - 1 # Subtracting the '[SOS]' special token
# If the texts exceed the 'seq_len' allowed, it will raise an error. This means that one of the sentences in the pair is too long to be processed
# given the current sequence length limit (this will be defined in the config dictionary below)
if enc_num_padding_tokens < 0 or dec_num_padding_tokens < 0:
raise ValueError('Sentence is too long')
# Building the encoder input tensor by combining several elements
encoder_input = torch.cat(
[
self.sos_token, # inserting the '[SOS]' token
torch.tensor(enc_input_tokens, dtype = torch.int64), # Inserting the tokenized source text
self.eos_token, # Inserting the '[EOS]' token
torch.tensor([self.pad_token] * enc_num_padding_tokens, dtype = torch.int64) # Addind padding tokens
]
)
# Building the decoder input tensor by combining several elements
decoder_input = torch.cat(
[
self.sos_token, # inserting the '[SOS]' token
torch.tensor(dec_input_tokens, dtype = torch.int64), # Inserting the tokenized target text
torch.tensor([self.pad_token] * dec_num_padding_tokens, dtype = torch.int64) # Addind padding tokens
]
)
# Creating a label tensor, the expected output for training the model
label = torch.cat(
[
torch.tensor(dec_input_tokens, dtype = torch.int64), # Inserting the tokenized target text
self.eos_token, # Inserting the '[EOS]' token
torch.tensor([self.pad_token] * dec_num_padding_tokens, dtype = torch.int64) # Adding padding tokens
]
)
# Ensuring that the length of each tensor above is equal to the defined 'seq_len'
assert encoder_input.size(0) == self.seq_len
assert decoder_input.size(0) == self.seq_len
assert label.size(0) == self.seq_len
return {
'encoder_input': encoder_input,
'decoder_input': decoder_input,
'encoder_mask': (encoder_input != self.pad_token).unsqueeze(0).unsqueeze(0).int(),
'decoder_mask': (decoder_input != self.pad_token).unsqueeze(0).unsqueeze(0).int() & casual_mask(decoder_input.size(0)),
'label': label,
'src_text': src_text,
'tgt_text': tgt_text
}
验证循环
我们现在将创建两个用于验证循环的函数。验证循环对于评估模型在翻译训练过程中未见过的数据的句子的性能至关重要。
我们将定义两个函数。第一个函数,greedy_decode,通过获取最可能的下一个令牌来给出模型的输出。第二个函数,run_validation,负责运行验证过程,在这个过程中我们对模型的输出进行解码并与目标句子的参考文本进行比较。
# Define function to obtain the most probable next token
def greedy_decode(model, source, source_mask, tokenizer_src, tokenizer_tgt, max_len, device):
# Retrieving the indices from the start and end of sequences of the target tokens
sos_idx = tokenizer_tgt.token_to_id('[SOS]')
eos_idx = tokenizer_tgt.token_to_id('[EOS]')
# Computing the output of the encoder for the source sequence
encoder_output = model.encode(source, source_mask)
# Initializing the decoder input with the Start of Sentence token
decoder_input = torch.empty(1,1).fill_(sos_idx).type_as(source).to(device)
# Looping until the 'max_len', maximum length, is reached
while True:
if decoder_input.size(1) == max_len:
break
# Building a mask for the decoder input
decoder_mask = casual_mask(decoder_input.size(1)).type_as(source_mask).to(device)
# Calculating the output of the decoder
out = model.decode(encoder_output, source_mask, decoder_input, decoder_mask)
# Applying the projection layer to get the probabilities for the next token
prob = model.project(out[:, -1])
# Selecting token with the highest probability
_, next_word = torch.max(prob, dim=1)
decoder_input = torch.cat([decoder_input, torch.empty(1,1). type_as(source).fill_(next_word.item()).to(device)], dim=1)
# If the next token is an End of Sentence token, we finish the loop
if next_word == eos_idx:
break
return decoder_input.squeeze(0) # Sequence of tokens generated by the decoder
# Defining function to evaluate the model on the validation dataset
# num_examples = 2, two examples per run
def run_validation(model, validation_ds, tokenizer_src, tokenizer_tgt, max_len, device, print_msg, global_state, writer, num_examples=2):
model.eval() # Setting model to evaluation mode
count = 0 # Initializing counter to keep track of how many examples have been processed
console_width = 80 # Fixed witdh for printed messages
# Creating evaluation loop
with torch.no_grad(): # Ensuring that no gradients are computed during this process
for batch in validation_ds:
count += 1
encoder_input = batch['encoder_input'].to(device)
encoder_mask = batch['encoder_mask'].to(device)
# Ensuring that the batch_size of the validation set is 1
assert encoder_input.size(0) == 1, 'Batch size must be 1 for validation.'
# Applying the 'greedy_decode' function to get the model's output for the source text of the input batch
model_out = greedy_decode(model, encoder_input, encoder_mask, tokenizer_src, tokenizer_tgt, max_len, device)
# Retrieving source and target texts from the batch
source_text = batch['src_text'][0]
target_text = batch['tgt_text'][0] # True translation
model_out_text = tokenizer_tgt.decode(model_out.detach().cpu().numpy()) # Decoded, human-readable model output
# Printing results
print_msg('-'*console_width)
print_msg(f'SOURCE: {source_text}')
print_msg(f'TARGET: {target_text}')
print_msg(f'PREDICTED: {model_out_text}')
# After two examples, we break the loop
if count == num_examples:
break
训练循环
我们准备在OpusBook数据集上训练我们的Transformer模型以执行英语到意大利语的翻译任务。
首先,我们通过定义get_model函数开始,该函数通过调用我们之前定义的build_transformer函数来加载模型。这个函数使用配置字典来设置一些参数。
# We pass as parameters the config dictionary, the length of the vocabylary of the source language and the target language
def get_model(config, vocab_src_len, vocab_tgt_len):
# Loading model using the 'build_transformer' function.
# We will use the lengths of the source language and target language vocabularies, the 'seq_len', and the dimensionality of the embeddings
model = build_transformer(vocab_src_len, vocab_tgt_len, config['seq_len'], config['seq_len'], config['d_model'])
return model
在接下来,我们将定义两个函数来配置我们的模型和训练过程。
在 get_config 函数中,我们定义了训练过程的关键参数。 batch_size 用于指定一次迭代中使用的训练样例数量, num_epochs 作为整个数据集在变压器上前向和后向传播的次数, lr 作为优化器的学习率,等等。我们还将最终定义来自 OpusBook 数据集的配对, 'lang_src': 'en' 用于选择英语作为源语言, 'lang_tgt': 'it' 用于选择意大利语作为目标语言。
get_weights_file_path 函数构建了用于保存或加载特定时期模型权重的文件路径。
# Define settings for building and training the transformer model
def get_config():
return{
'batch_size': 8,
'num_epochs': 20,
'lr': 10**-4,
'seq_len': 350,
'd_model': 512, # Dimensions of the embeddings in the Transformer. 512 like in the "Attention Is All You Need" paper.
'lang_src': 'en',
'lang_tgt': 'it',
'model_folder': 'weights',
'model_basename': 'tmodel_',
'preload': None,
'tokenizer_file': 'tokenizer_{0}.json',
'experiment_name': 'runs/tmodel'
}
# Function to construct the path for saving and retrieving model weights
def get_weights_file_path(config, epoch: str):
model_folder = config['model_folder'] # Extracting model folder from the config
model_basename = config['model_basename'] # Extracting the base name for model files
model_filename = f"{model_basename}{epoch}.pt" # Building filename
return str(Path('.')/ model_folder/ model_filename) # Combining current directory, the model folder, and the model filename
最终,我们定义了最后一个函数 train_model,该函数接受配置参数作为输入。
在这个函数中,我们将为训练做好一切准备。我们将把模型及其必要的组件加载到 GPU 上以加快训练速度,设定 Adam 优化器,并配置 CrossEntropyLoss 函数来计算模型输出的翻译和数据集中参考翻译之间的差异。
该函数包含了迭代训练批次、执行反向传播和计算梯度所需的每一个循环。我们还会使用它来运行验证函数并保存模型的当前状态。
def train_model(config):
# Setting up device to run on GPU to train faster
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device {device}")
# Creating model directory to store weights
Path(config['model_folder']).mkdir(parents=True, exist_ok=True)
# Retrieving dataloaders and tokenizers for source and target languages using the 'get_ds' function
train_dataloader, val_dataloader, tokenizer_src, tokenizer_tgt = get_ds(config)
# Initializing model on the GPU using the 'get_model' function
model = get_model(config,tokenizer_src.get_vocab_size(), tokenizer_tgt.get_vocab_size()).to(device)
# Tensorboard
writer = SummaryWriter(config['experiment_name'])
# Setting up the Adam optimizer with the specified learning rate from the '
# config' dictionary plus an epsilon value
optimizer = torch.optim.Adam(model.parameters(), lr=config['lr'], eps = 1e-9)
# Initializing epoch and global step variables
initial_epoch = 0
global_step = 0
# Checking if there is a pre-trained model to load
# If true, loads it
if config['preload']:
model_filename = get_weights_file_path(config, config['preload'])
print(f'Preloading model {model_filename}')
state = torch.load(model_filename) # Loading model
# Sets epoch to the saved in the state plus one, to resume from where it stopped
initial_epoch = state['epoch'] + 1
# Loading the optimizer state from the saved model
optimizer.load_state_dict(state['optimizer_state_dict'])
# Loading the global step state from the saved model
global_step = state['global_step']
# Initializing CrossEntropyLoss function for training
# We ignore padding tokens when computing loss, as they are not relevant for the learning process
# We also apply label_smoothing to prevent overfitting
loss_fn = nn.CrossEntropyLoss(ignore_index = tokenizer_src.token_to_id('[PAD]'), label_smoothing = 0.1).to(device)
# Initializing training loop
# Iterating over each epoch from the 'initial_epoch' variable up to
# the number of epochs informed in the config
for epoch in range(initial_epoch, config['num_epochs']):
# Initializing an iterator over the training dataloader
# We also use tqdm to display a progress bar
batch_iterator = tqdm(train_dataloader, desc = f'Processing epoch {epoch:02d}')
# For each batch...
for batch in batch_iterator:
model.train() # Train the model
# Loading input data and masks onto the GPU
encoder_input = batch['encoder_input'].to(device)
decoder_input = batch['decoder_input'].to(device)
encoder_mask = batch['encoder_mask'].to(device)
decoder_mask = batch['decoder_mask'].to(device)
# Running tensors through the Transformer
encoder_output = model.encode(encoder_input, encoder_mask)
decoder_output = model.decode(encoder_output, encoder_mask, decoder_input, decoder_mask)
proj_output = model.project(decoder_output)
# Loading the target labels onto the GPU
label = batch['label'].to(device)
# Computing loss between model's output and true labels
loss = loss_fn(proj_output.view(-1, tokenizer_tgt.get_vocab_size()), label.view(-1))
# Updating progress bar
batch_iterator.set_postfix({f"loss": f"{loss.item():6.3f}"})
writer.add_scalar('train loss', loss.item(), global_step)
writer.flush()
# Performing backpropagation
loss.backward()
# Updating parameters based on the gradients
optimizer.step()
# Clearing the gradients to prepare for the next batch
optimizer.zero_grad()
global_step += 1 # Updating global step count
# We run the 'run_validation' function at the end of each epoch
# to evaluate model performance
run_validation(model, val_dataloader, tokenizer_src, tokenizer_tgt, config['seq_len'], device, lambda msg: batch_iterator.write(msg), global_step, writer)
# Saving model
model_filename = get_weights_file_path(config, f'{epoch:02d}')
# Writting current model state to the 'model_filename'
torch.save({
'epoch': epoch, # Current epoch
'model_state_dict': model.state_dict(),# Current model state
'optimizer_state_dict': optimizer.state_dict(), # Current optimizer state
'global_step': global_step # Current global step
}, model_filename)
我们现在可以训练模型了!
if __name__ == '__main__':
warnings.filterwarnings('ignore') # Filtering warnings
config = get_config() # Retrieving config settings
train_model(config) # Training model with the config arguments
Using device cuda
Downloading builder script:
6.08k/? [00:00<00:00, 391kB/s]
Downloading metadata:
161k/? [00:00<00:00, 11.0MB/s]
Downloading and preparing dataset opus_books/en-it (download: 3.14 MiB, generated: 8.58 MiB, post-processed: Unknown size, total: 11.72 MiB) to /root/.cache/huggingface/datasets/opus_books/en-it/1.0.0/e8f950a4f32dc39b7f9088908216cd2d7e21ac35f893d04d39eb594746af2daf...
Downloading data: 100%
3.30M/3.30M [00:00<00:00, 10.6MB/s]
Dataset opus_books downloaded and prepared to /root/.cache/huggingface/datasets/opus_books/en-it/1.0.0/e8f950a4f32dc39b7f9088908216cd2d7e21ac35f893d04d39eb594746af2daf. Subsequent calls will reuse this data.
Max length of source sentence: 309
Max length of target sentence: 274
....................................................................
结论
在本文中,我们深入探讨了原始Transformer架构,我们使用PyTorch一步一步地实现了它,在英语到意大利语翻译任务中使用OpusBook数据集。