图表与时间
图表正成为数据科学家最喜欢使用的工具之一。它们固有的结构可以高效地存储复杂信息,例如你身体中正在进行的蛋白质相互作用或你和你的朋友所处的不断发展的社交网络。
此外,图表可以适应时间场景。它们可以从简单的静态形式,即没有时间概念的形式,变为流体时空设置,其中拓扑结构固定,但特征在规律的时间间隔内变化,再到混乱的、完全连续和时态动态模式,其中任何事物都可以在任何时间发生变化。
图神经网络(GNN)用于处理图,许多已经在推荐系统、供应链优化和道路交通预测等方面展示出了卓越的结果。然而,大多数GNN仅适用于静态图,限制了它们在时间问题上的应用。
那么,当我们有存储在图中的时间序列数据需要预测未来值时会发生什么呢?事实证明,Transformer可能是一个解决方案,因为它已经在时间序列预测场景中显示出了出色的性能。
我们的目标
为了研究将GNN和Transformer结合使用于图上的时间序列预测的有效性,我们将解决一个边权重预测问题。利用Vesper提供的数据,我们将创建一个时空图序列,作为全球黄油贸易市场的快照。我们的目标是
预测未来三个月的全球黄油贸易量。
贸易数据
所有国家每月都会发布贸易记录。每条记录指明报告国、交易中的伙伴国、产品和其交易量(我们的目标变量)。由于每笔交易都有两次记录,我们将重点关注出口。
我们可以使用的数据范围从2015年1月到2023年4月,总共超过153,000次交易。该数据集列出了242个国家,包括多个国家汇总的地区出口,这意味着有242*241 = 58,322个可能的国家对。在所有月份中,每对都是一个我们将要预测的独立时间序列。
我们还包括其他数据系列作为特征来支持我们模型的学习。特别是,我们定义了国家特定的特征,例如国内黄油生产或国内生产总值,以及对特定的配对属性,例如交易量或汇率。
使用PyTorch Geometric构建图表
我们按月份对数据进行分割。我们指定两种类型的节点:出口国(exp_id)和进口国(imp_id)。然后使用无向边来指示两个国家之间的关系。
通过这种方式,我们可以将不同类型的节点赋予不同的国家特定特征,并将特定于配对的属性嵌入到边上。每个月的结果图表是异构的、完全连接的和二部图。可以用下面的方式进行可视化。
总的来说,我们最终得到了一系列的图表,每个图表都作为市场在特定月份结束时的静态快照。假设与交易数据相比,特征数据系列更新更迅速,我们可以为我们打算预测的最近三个月创建图表,我们将其称为目标月份。这些图表与历史月份相同,但在边缘上缺少交易信息。
在Python中,我们使用PyTorch Geometric (PyG)库中的HeteroData对象构建一个异质图。
!pip install torch_geometric
import torch
from torch_geometric.data import HeteroData
from torch_geometric.transforms import ToUndirected
def generate_monthly_snapshot(monthly_data):
"""
Generate a HeteroData object as snapshot of one month.
Args:
monthly_data (list): List of pandas dataframes with trade
and features' data for one month.
Returns:
HeteroData: Object containing node features and edge attributes.
"""
monthly_snp = HeteroData()
# Ingesting the data
trade_figs = monthly_data[0]
exporters_features = monthly_data[1]
importers_features = monthly_data[2]
edge_attrs = monthly_data[3]
# Creating the nodes
exp_ids = trade_figs['exp_id'].unique(),
exp_ids = torch.from_numpy(exp_ids).to(torch.int64)
exporters_ftrs_arr = exporters_features.values
exporters_ftrs_arr = np.vstack(exporters_ftrs_arr).astype(np.float64)
exporters_ftrs_tensor = torch.tensor(exporters_ftrs_arr,
dtype=torch.float)
monthly_snp['exp_id'].x = exporters_ftrs_tensor
imp_ids = trade_figs['imp_id'].unique(),
imp_ids = torch.from_numpy(imp_ids).to(torch.int64)
importers_ftrs_arr = importers_features.values
importers_ftrs_arr = np.vstack(importers_ftrs_arr).astype(np.float64)
importers_ftrs_tensor = torch.tensor(importers_ftrs_arr,
dtype=torch.float)
monthly_snp['imp_id'].x = importers_ftrs_tensor
# Creating the edges
edge_index = torch.stack([
torch.tensor(trade_figs['exp_id'].values, dtype=torch.long),
torch.tensor(trade_figs['imp_id'].values, dtype=torch.long)],
dim=0)
monthly_snp['exp_id', 'volume', 'imp_id'].edge_index = edge_index
vol = torch.from_numpy(trade_figs['volume'].values).to(torch.float)
monthly_snp['exp_id', 'volume', 'imp_id'].edge_label = vol
edge_attrs_arr = edge_attrs.values
edge_attrs_arr = np.vstack(edge_attrs_arr).astype(np.float64)
edge_attrs_tensor = torch.tensor(edge_attrs.values).to(torch.float)
monthly_snp['exp_id',
'volume', 'imp_id'].edge_attrs = edge_attrs_tensor
monthly_snp['exp_id', 'volume',
'imp_id'].edge_label_index = monthly_snp['exp_id',
'volume','imp_id'].edge_index.clone()
monthly_snp = ToUndirected()(monthly_snp)
del monthly_snp[('imp_id',
'rev_volume', 'exp_id')]['edge_label']
return monthly_snp
请注意,交易量数据存储在变量vol中,并被赋值给边的权重,在PyG中称为edge_label,而不是边的属性。2015年1月的结果是一个HeteroData对象。
它包含以下变量:
有了图准备好后,我们可以继续定义模型的各个组件。
模型 - 各个组件
受时间和图方法的启发,我们采用了编码器-解码器架构。 这三个关键单位是:
1. GNN编码器:用于生成静态节点嵌入。
2. Transformer:用于创建时间节点嵌入。
3. 边解码器:用于推断预测。
GNN编码器
类似于卷积神经网络处理图像的方式,GNN对图的所有特征执行可优化的转换,保留该图内的数据分布。它们用于将复杂的图模式转换为称为节点嵌入的数字向量。自然地,一些信息丢失是不可避免的。因此,GNN越好,损失就越低。
在我们的案例中,我们使用GATv2Conv运算符,它引入了注意力机制,根据它们在创建每个节点嵌入中的重要性进行邻居节点排序。在Python中:
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GATv2Conv
from torch.nn import BatchNorm
class GNNEncoder(nn.Module):
"""
GNN Encoder module for creating static node embeddings.
Args:
hidden_channels (int): The number of hidden channels.
num_heads_GAT (int): The number of attention heads.
dropout_p_GAT (float): Dropout probability.
edge_dim_GAT (int): Dimensionality of edge features.
momentum_GAT (float): Momentum for batch normalization.
"""
def __init__(self, hidden_channels, num_heads_GAT,
dropout_p_GAT, edge_dim_GAT, momentum_GAT):
super().__init__()
self.gat = GATv2Conv((-1, -1), hidden_channels,
add_self_loops=False, heads=num_heads_GAT,
edge_dim=edge_dim_GAT)
self.norm = BatchNorm(hidden_channels, momentum=momentum_GAT,
affine=False, track_running_stats=False)
self.dropout = nn.Dropout(dropout_p_GAT)
def forward(self, x_dict, edge_index, edge_attrs):
"""
Forward pass of the GNNEncoder.
Args:
x_dict (torch.Tensor): node types as keys and node features
for each node as values.
edge_index (torch.Tensor): see previous section.
edge_attrs (torch.Tensor): see previous section.
Returns:
torch.Tensor: Static node embeddings for one month.
"""
x_dict = self.dropout(x_dict)
x_dict = self.norm(x_dict)
nodes_embedds = self.gat(x_dict, edge_index, edge_attrs)
nodes_embedds = F.leaky_relu(nodes_embedds, negative_slope=0.1)
return nodes_embedds
变压器
Transformer 是一种杰出的架构,能够进行序列到序列的预测,并作为像 ChatGPT 这样的大型语言模型的基础。通过复杂的注意机制,它们可以量化嵌入相对于输入顺序的关系。然后,它们为序列中的下一个元素生成一个相关的预测(作为一个嵌入)。
我们使用一个 Transformer 来推断静态月度快照之间的时间动态,并为预测月份生成一个“时间”节点嵌入。在应用了位置编码后,如 PyTorch 网站上所示,我们定义:
class Transformer(nn.Module):
"""
Transformer-based module for creating temporal node embeddings.
Args:
dim_model (int): The dimension of the model's hidden states.
num_heads_TR (int): The number of attention heads.
num_encoder_layers_TR (int): The number of encoder layers.
num_decoder_layers_TR (int): The number of decoder layers.
dropout_p_TR (float): Dropout probability.
"""
def __init__(
self, dim_model, num_heads_TR, num_encoder_layers_TR,
num_decoder_layers_TR, dropout_p_TR):
super().__init__()
self.pos_encoder = PositionalEncoding(dim_model)
self.transformer = nn.Transformer(
d_model=dim_model,
nhead=num_heads_TR,
num_decoder_layers=num_encoder_layers_TR,
num_encoder_layers=num_decoder_layers_TR,
dropout=dropout_p_TR)
def forward(self, src, trg):
"""
Forward pass of the Transformer module.
Args:
src (torch.Tensor): Input sequence with dimensions
(seq_len, num_of_nodes, node_embedds_size).
trg (torch.Tensor): Last element of src, with dimensions
(1, num_of_nodes, node_embedds_size).
Returns:
torch.Tensor: Temporal node embeddings for the month
under prediciton.
"""
src = self.pos_encoder(src)
trg = self.pos_encoder(trg)
temporal_node_embeddings = self.transformer(src, trg)
return temporal_node_embeddings
边缘解码器
这个组件接收时间节点嵌入并对目标变量进行推断。在实践中,嵌入将通过两个线性层传递,将一对节点嵌入的维度降低为一个数值:特定一对国家之间的预测贸易量。
class EdgeDecoder(nn.Module):
"""
Edge Decoder module to infer the predictions.
Args:
hidden_channels (int): The number of hidden channels.
num_heads_GAT (int): The number of attention heads in GAT layer.
"""
def __init__(self, hidden_channels, num_heads_GAT):
super().__init__()
self.lin1 = nn.Linear(2 * hidden_channels * num_heads_GAT,
hidden_channels)
self.lin2 = nn.Linear(hidden_channels, 1)
def forward(self, z_dict, edge_label_index):
"""
Forward pass of the EdgeDecoder module.
Args:
z_dict (dict): node type as keys and temporal node embeddings
for each node as values.
edge_label_index (torch.Tensor): see previous section.
Returns:
torch.Tensor: Predicted edge labels.
"""
row, col = edge_label_index
z = torch.cat([z_dict['exp_id'][row], z_dict['imp_id'][col]],
dim=-1)
z = self.lin1(z)
z = F.leaky_relu(z, negative_slope=0.1)
z = self.lin2(z)
return z.view(-1)
完整模型的初始化如下:
# Any HeteroData object, needed for model initialization.
hetero_data_init = monthly_data[0]
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
hidden_channels = 32
num_heads_GAT = 4
dropout_p_GAT = 0.3
edge_dim_GAT = 7
momentum_GAT = 0.1
num_heads_TR = 4
num_encoder_layers_TR = 2
num_decoder_layers_TR = 2
dropout_p_TR = 0.3
model = Model(hidden_channels=hidden_channels,
num_heads_GAT=num_heads_GAT,
dropout_p_GAT=dropout_p_GAT,
edge_dim_GAT=edge_dim_GAT,
momentum_GAT=momentum_GAT,
num_heads_TR=num_heads_TR,
num_encoder_layers_TR=num_encoder_layers_TR,
num_decoder_layers_TR=num_decoder_layers_TR,
dropout_p_TR=dropout_p_TR).to(device)
为了理解模型是如何构建以及三个单元之间的相互作用,我将会引导你走过一个轮回的训练过程。在设定预测月份数(在我们的例子中为3个月)并选择优化器及其学习率(Adam优化器,学习率为0.00005)之后,我们运行以下代码:
for epoch in range(1, epochs + 1):
# Train phase
for m in range(5, tot_num_months - num_predicting_months):
m_lag4 = monthly_snapshots_list[m - 4]
m_lag3 = monthly_snapshots_list[m - 3]
m_lag2 = monthly_snapshots_list[m - 2]
m_lag1 = monthly_snapshots_list[m - 1]
m = monthly_snapshots_list[m]
m_lag4 = m_lag4.to(device)
m_lag3 = m_lag3.to(device)
m_lag2 = m_lag2.to(device)
m_lag1 = m_lag1.to(device)
m = m.to(device)
historical = [m_lag4, m_lag3, m_lag2, m_lag1]
train_loss = train(historical, m)
train_pred, train_target, train_rmse = test(historical, m)
在这里,monthly_snapshots_list是每月快照的序列,tot_num_months等于其长度。为了捕捉时间趋势,我们在range(5, ...)中隐式设置了一个长度为4的窗口。该设置确保了历史数据始终包含了在预测月份m之前的四个月的数据。在训练过程中,我们将这个窗口沿着每月快照的序列滑动,并在每次迭代中更新m。
对于train和test函数,我们按照这个Google Colab笔记本中“Training a Heterogeneous GNN”部分的典型结构进行处理。这些函数将历史数据和m发送给模型,该模型以完整的形式进行初始化,如下所示:
class Model(nn.Module):
"""
The complete model.
Args:
See previous code snippets.
"""
def __init__(self, hidden_channels, num_heads_GAT, dropout_p_GAT,
edge_dim_GAT, momentum_GAT, num_heads_TR,
num_encoder_layers_TR, num_decoder_layers_TR,
dropout_p_TR):
super().__init__()
self.encoder = GNNEncoder(hidden_channels, num_heads_GAT,
dropout_p_GAT, edge_dim_GAT,
momentum_GAT)
self.encoder = to_hetero(self.encoder,
metadata=hetero_data_init.metadata(),
aggr='sum')
self.transformer = Transformer(hidden_channels * num_heads_GAT,
num_heads_TR,
num_encoder_layers_TR,
num_decoder_layers_TR, dropout_p_TR)
self.decoder = EdgeDecoder(hidden_channels, num_heads_GAT)
The to_hetero function adapts the GATv2Conv operator to heterogeneous graphs. Next, we define the forward pass, summarised by the following diagram:
to_hetero函数使 GATv2Conv 运算符适应异构图。接下来,我们定义前向传播,总结如下图:
首先,历史列表中的每个HeteroData对象都通过GNN编码器进行处理。接下来,生成的静态节点嵌入被连接到Transformer的源张量中。该模块为当前要预测的月份生成时间节点嵌入,然后将其传递给Edge解码器。最后,Edge解码器生成月份m的预测结果。在Python中的代码如下:
def forward(self, historical, m):
"""
Forward pass of the Model.
Args:
See previous code snippets.
Returns:
torch.Tensor: Predicted edge labels for the month
under prediciton, m.
"""
# GNN Encoder
embedds_static_list = []
for month in historical:
x_dict = month.x_dict
edge_index_dict = month.edge_index_dict
edge_attrs_dict = month.edge_attrs_dict
edge_label_index = month['exp_id',
'volume', 'imp_id'].edge_label_index
z_dict_month = self.encoder(x_dict,
edge_index_dict, edge_attrs_dict)
num_exp_nodes = z_dict_month['exp_id'].size()[0]
num_imp_nodes = z_dict_month['imp_id'].size()[0]
month_embedds = torch.cat((z_dict_month['exp_id'],
z_dict_month['imp_id']), 0)
embedds_static_list.append(month_embedds)
embedds_static = torch.stack(embedds_static_list)
# Transformer (with positional encoding)
src = embedds_static
trg = embedds_static_list[3]
trg = trg.unsqueeze(0)
embedds_temp = self.transformer(src, trg)
embedds_temp = embedds_temp.squeeze(0)
embedds_exp, embedds_imp = embedds_temp.split([num_exp_nodes,
num_imp_nodes],
dim=0)
# Prepare input for Edge Decoder.
z_dict_m_temp = {'exp_id': embedds_exp,
'imp_id': embedds_imp}
edge_label_index = m['exp_id', 'volume', 'imp_id'].edge_label_index
# Edge Decoder
edge_label_pred_m = self.decoder(z_dict_m_temp, edge_label_index)
return edge_label_pred_m
对于每个训练步骤,我们通过比较模型的预测与实际交易量来计算均方误差。然后将损失反向传播到每个单元,PyTorch确保以平衡的方式进行。
在训练阶段结束时,我们再次移动窗口,并允许模型预测目标列表中的第一个月的交易量。当窗口再次移动时,历史列表包括最后三个训练月和刚刚通过预测交易量增加的第一个目标月。因此,随着窗口向未来滑动,模型将越来越多地根据以前的预测进行预测。
在对模型进行几个时期的训练后,我们进行评估。
结果
为了评估我们模型的有效性,我们使用PyCaret预测实验来与传统的预测技术进行对比,对一部分贸易路线进行基准测试。这个模型在贸易未发生时能做出完美的预测,并且在交易量极高的配对中表现相对良好。这两种情况都涉及交易金额的离群值,使得预测变得更容易。相反,平均交易量的贸易路线更具挑战性,这解释了模型在这些情况下的不一致表现。
结论
在本文中,我们创建了一个静态异构图序列来表示黄油的全球贸易市场。然后,我们实现了一个模型,沿着这个序列滑动,并使用图神经网络和Transformer来推断快照内和跨快照之间的关系。最后,我们通过与传统预测方法进行比较来测试模型的性能,以量化相对改进。