The biggest difficulty in knowledge graph reasoning is the uncertainty of dimensionality: some nodes have no connections at all while others have hundreds, so they cannot simply be fitted into a 256-dimensional vector.
Here is what I think is a good solution:
Convert the connections to a uniform length! How? With the IFT (inverse Fourier transform).
Your approach can indeed form a powerful framework for embedding the variable structure of a knowledge graph in a way that is compatible with transformers and deep learning models. Using the inverse Fourier transform (IFT) to represent each node's connections as a fixed-size array is a particularly creative idea: it lets knowledge graph data integrate seamlessly into transformer architectures, where logical flow is essential.
Below is a step-by-step workflow for building, training, and running inference with IFT-based knowledge graph representations and a transformer for logical reasoning.
1. Encode the nodes of the knowledge graph with the IFT
a. Define a node signal: represent each node's connections as a binary vector over all nodes in the graph, with a 1 wherever an edge exists.
b. Apply the Fourier transform to capture the connection pattern: the FFT of this binary signal summarizes the node's connectivity in the frequency domain.
c. Embed each node with its IFT representation: keep only the first few frequency components and apply the inverse FFT, yielding a fixed-size, real-valued embedding no matter how many connections the node has (see the sketch right after this list).
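To make step 1 concrete, here is a minimal, self-contained sketch. The toy graph size and the neighbor indices are made up purely for illustration; the full script further down does the same thing through its node_to_signal and create_node_embedding functions. A node with one connection and a node with six connections both come out as vectors of the same fixed length:

import numpy as np

# Toy illustration of step 1: nodes with very different degrees map to
# embeddings of identical size. The numbers below are placeholders.
num_nodes = 12        # assumed number of nodes in a toy graph
embedding_size = 4    # fixed embedding length, independent of node degree

def embed_connections(neighbor_indices, num_nodes, embedding_size):
    # a. node signal: binary vector marking which nodes this node connects to
    signal = np.zeros(num_nodes)
    signal[neighbor_indices] = 1
    # b. Fourier transform of the connection pattern
    spectrum = np.fft.fft(signal)
    # c. keep the first `embedding_size` coefficients and invert them
    return np.fft.ifft(spectrum[:embedding_size]).real

sparse_node = embed_connections([3], num_nodes, embedding_size)                 # 1 connection
dense_node = embed_connections([0, 2, 5, 7, 9, 11], num_nodes, embedding_size)  # 6 connections
print(sparse_node.shape, dense_node.shape)  # both print (4,)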
2. Design a transformer model for logical flow
With every node embedded in a space of the same fixed dimension (256 in the discussion above, 4 in the toy code below), the transformer can process sequences of nodes that represent queries, paths, or logical flows through the knowledge graph.
a. Input representation for logical queries: a query or reasoning path becomes a sequence of node embeddings.
b. Encode path and sequence information: add positional encodings so the model knows the order of the nodes along a path.
c. Train the transformer model: given the embeddings of all but the last node on a path, predict the last node with a cross-entropy loss over the node vocabulary (a sketch of one such training pair follows this list).
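As a concrete illustration of steps 2a-2c, here is a minimal sketch of how a single path through the graph becomes one training pair. The random embeddings and the tiny index map are placeholders for illustration only; the full script below builds these tensors for every BFS path using the IFT embeddings from step 1:

import numpy as np
import torch

embedding_size = 4
# Placeholder embeddings; in the real script these come from the IFT step
toy_embeddings = {
    "Obama": np.random.randn(embedding_size),
    "President": np.random.randn(embedding_size),
    "White House": np.random.randn(embedding_size),
}
toy_index = {"Obama": 0, "President": 1, "White House": 2}

# One reasoning path from the graph: Obama -> President -> White House
path = ["Obama", "President", "White House"]
# Input: embeddings of every node except the last, stacked into a (seq_len, embedding_size) tensor
input_seq = torch.stack([torch.tensor(toy_embeddings[n], dtype=torch.float32) for n in path[:-1]])
# Target: the class index of the node the model should predict next
target = torch.tensor(toy_index[path[-1]])
print(input_seq.shape, target.item())  # torch.Size([2, 4]) 2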
3. Reason over logical query paths with the transformer
During inference, you can exploit the trained transformer's ability to recognize logical flow in the node embeddings.
a. Construct a logical query: express the question as a short path of nodes, for example ["President", "2024"].
b. Embed the path and make a prediction: pad the path, look up the node embeddings, and run them through the model.
c. Interpret the model output: the highest-scoring index in the output layer corresponds to the predicted node (a compact version of these three steps follows this list).
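Steps 3a-3c correspond to the query block at the end of the full script. Here is a compact version, assuming that script has already been run so that model, node_embeddings, pad_sequence, max_seq_length, and idx_to_node are all in scope:

import torch

# a. Construct the logical query as a short path of node names
query = ["President", "2024"]
# b. Pad the path, look up the node embeddings, and run them through the model
padded = pad_sequence(query, max_seq_length)
embeds = torch.stack([torch.tensor(node_embeddings[n], dtype=torch.float32) for n in padded])
model.eval()
with torch.no_grad():
    logits = model(embeds.unsqueeze(0))  # unsqueeze adds the batch dimension
# c. The highest-scoring index corresponds to the predicted node
print(idx_to_node[logits.argmax(dim=1).item()])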
Workflow summary
Preprocessing: turn each node's adjacency into a binary signal, apply the FFT, and invert the truncated spectrum to obtain a fixed-size embedding; generate training paths from the graph by breadth-first search.
Model design: a transformer encoder with positional encoding, followed by a linear layer that scores every node in the graph.
Training: predict the last node of each path from the embeddings of the preceding nodes, using a cross-entropy loss.
Inference: embed a query path, feed it through the trained model, and read off the highest-scoring node as the answer.
This approach relies on the consistency of fixed-size node embeddings, so the transformer can handle knowledge graphs with highly variable connectivity. By capturing the structural richness of the graph, it strengthens logical reasoning while staying compatible with standard neural network architectures.
Here is the code:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
# Define the text-based knowledge graph
knowledge_graph = {
    # People and Roles
    "Obama": ["President", "Senator"],
    "Biden": ["President", "Vice President", "Candidate", "Senator"],
    "Trump": ["President", "Candidate", "Businessman"],
    "Harris": ["Vice President", "Candidate", "Senator"],
    "Clinton": ["Secretary of State", "Senator", "Candidate"],
    "Bush": ["President", "Governor"],
    "Lincoln": ["President"],
    "Reagan": ["President", "Governor"],
    # Roles and Offices
    "President": ["2024", "White House", "Washington DC"],
    "Vice President": ["Washington DC"],
    "Secretary of State": ["State Department"],
    "Senator": ["Congress"],
    "Governor": ["State Capitol"],
    "Congress": ["Washington DC"],
    # Locations
    "Washington DC": ["White House", "President", "Vice President", "State Department", "Congress"],
    "White House": ["President", "US"],
    "State Department": ["Washington DC"],
    "State Capitol": ["Governor"],
    # Organizations and Business Roles
    "Businessman": ["Company"],
    "Company": ["CEO", "Headquarters"],
    "CEO": ["Company"],
    # Countries and Regions
    "US": ["White House", "Congress", "State Department"],
    "Russia": ["Kremlin", "President"],
    "China": ["Beijing", "President"],
    # Candidates by Election Year
    "Candidate": ["President"],
    "2024": ["Harris", "Biden"],
    "2020": ["Trump", "Biden"],
    "2016": ["Trump", "Clinton"],
    "2008": ["Obama", "Biden", "Clinton"],
    "2000": ["Bush"],
    "1980": ["Reagan"],
    # Locations by country
    "Kremlin": ["President"],
    "Beijing": ["President"],
    # Add relationships to cover locations and entities more consistently
    "Headquarters": ["Company"],
}
# Create node mappings
nodes = list(knowledge_graph.keys())
node_to_idx = {node: idx for idx, node in enumerate(nodes)}
idx_to_node = {idx: node for node, idx in node_to_idx.items()}
# Create node embeddings using Fourier Transform and Inverse Fourier Transform
def node_to_signal(node, graph, size):
    # Binary signal over all nodes: 1 wherever this node has an outgoing edge
    signal = np.zeros(size)
    for conn in graph.get(node, []):
        idx = node_to_idx[conn]
        signal[idx % size] = 1
    return signal

def create_node_embedding(signal, embedding_size):
    # FFT the connection signal, keep the first `embedding_size` coefficients,
    # and invert them to obtain a fixed-size real-valued embedding
    fft_result = np.fft.fft(signal)
    embedding = np.fft.ifft(fft_result[:embedding_size]).real
    return embedding

size = len(nodes)
embedding_size = 4
node_embeddings = {
    node: create_node_embedding(node_to_signal(node, knowledge_graph, size), embedding_size)
    for node in nodes
}
# Generate sequences from the knowledge graph
def generate_sequences(graph, max_length):
    # Breadth-first search from every node, collecting simple paths
    # of up to max_length nodes
    sequences = []
    for start_node in graph:
        queue = [(start_node, [start_node])]
        while queue:
            current_node, path = queue.pop(0)
            if len(path) >= max_length:
                continue
            for neighbor in graph[current_node]:
                if neighbor not in path:
                    new_path = path + [neighbor]
                    sequences.append(new_path)
                    queue.append((neighbor, new_path))
    return sequences
sequences = generate_sequences(knowledge_graph, max_length=4)
# Prepare input and target tensors
def pad_sequence(seq, max_length):
    return seq + ["<PAD>"] * (max_length - len(seq))

# Add a <PAD> token to the node mappings
if "<PAD>" not in node_to_idx:
    pad_idx = len(node_to_idx)
    node_to_idx["<PAD>"] = pad_idx
    idx_to_node[pad_idx] = "<PAD>"

# Initialize embedding for <PAD> token
node_embeddings["<PAD>"] = np.zeros(embedding_size)
max_seq_length = max(len(seq) - 1 for seq in sequences if len(seq) > 1)
input_sequences = []
target_nodes = []
for seq in sequences:
    if len(seq) > 1:
        input_sequences.append(seq[:-1])
        target_nodes.append(seq[-1])
input_tensors = []
for seq in input_sequences:
    padded_seq = pad_sequence(seq, max_seq_length)
    embedding_seq = [torch.tensor(node_embeddings[node], dtype=torch.float32) for node in padded_seq]
    input_tensors.append(torch.stack(embedding_seq))
input_tensors = torch.stack(input_tensors)
target_indices = torch.tensor([node_to_idx[node] for node in target_nodes], dtype=torch.long)
# Define the transformer model
class PositionalEncoding(nn.Module):
    def __init__(self, embedding_dim, max_len=500):
        super(PositionalEncoding, self).__init__()
        # Standard sinusoidal positional encoding, stored as (max_len, 1, embedding_dim)
        pe = torch.zeros(max_len, embedding_dim)
        position = torch.arange(0, max_len).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, embedding_dim, 2).float() * (-np.log(10000.0) / embedding_dim))
        pe[:, 0::2] = torch.sin(position * div_term)
        if embedding_dim % 2 == 1:
            pe[:, 1::2] = torch.cos(position * div_term)[:, :pe[:, 1::2].shape[1]]
        else:
            pe[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe.unsqueeze(1))

    def forward(self, x):
        # x: (seq_len, batch_size, embedding_dim)
        x = x + self.pe[:x.size(0)]
        return x

class TransformerModel(nn.Module):
    def __init__(self, embedding_dim, nhead, num_layers, num_nodes):
        super(TransformerModel, self).__init__()
        self.pos_encoder = PositionalEncoding(embedding_dim)
        encoder_layers = nn.TransformerEncoderLayer(embedding_dim, nhead)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, num_layers)
        self.decoder = nn.Linear(embedding_dim, num_nodes)

    def forward(self, src):
        src = src.permute(1, 0, 2)  # (seq_len, batch_size, embedding_dim)
        src = self.pos_encoder(src)
        output = self.transformer_encoder(src)
        output = output[-1, :, :]  # take the representation at the last position (no padding mask is applied)
        output = self.decoder(output)
        return output
num_nodes = len(node_to_idx)
embedding_dim = embedding_size # Set embedding_dim to embedding_size
model = TransformerModel(embedding_dim, nhead=2, num_layers=2, num_nodes=num_nodes)
# Training the model
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
losses = []
for epoch in range(400):
    model.train()
    optimizer.zero_grad()
    output = model(input_tensors)
    loss = criterion(output, target_indices)
    losses.append(loss.item())
    loss.backward()
    optimizer.step()
    if epoch % 20 == 0:
        print(f'Epoch {epoch}, Loss {loss.item():.4f}')
# Evaluating the model
model.eval()
with torch.no_grad():
    output = model(input_tensors)
    predictions = torch.argmax(output, dim=1)
    accuracy = (predictions == target_indices).float().mean()
    print(f'Accuracy: {accuracy.item():.4f}')
# Testing the model on the question "Who is almost the president in the year 2024?"
# Define the query sequence: ["President", "2024"]
query_sequence = ["President", "2024"]
padded_query = pad_sequence(query_sequence, max_seq_length)
embedding_seq = [torch.tensor(node_embeddings.get(node, np.zeros(embedding_size)), dtype=torch.float32) for node in padded_query]
input_tensor = torch.stack(embedding_seq).unsqueeze(0) # Add batch dimension
model.eval()
with torch.no_grad():
    output = model(input_tensor)
predicted_idx = torch.argmax(output, dim=1).item()
predicted_node = idx_to_node[predicted_idx]
print(f'Predicted answer: {predicted_node}')
Output:
Epoch 0, Loss 3.8112
Epoch 20, Loss 2.9358
Epoch 40, Loss 2.6116
Epoch 60, Loss 2.4373
Epoch 80, Loss 2.2315
Epoch 100, Loss 2.1335
Epoch 120, Loss 1.9916
Epoch 140, Loss 1.9691
Epoch 160, Loss 1.9247
Epoch 180, Loss 1.8379
Epoch 200, Loss 1.8687
Epoch 220, Loss 1.8979
Epoch 240, Loss 1.8913
Epoch 260, Loss 1.9347
Epoch 280, Loss 1.8863
Epoch 300, Loss 1.7876
Epoch 320, Loss 1.8959
Epoch 340, Loss 1.7988
Epoch 360, Loss 1.8072
Epoch 380, Loss 1.8047
Accuracy: 0.3718
Predicted answer: Harris
Loss plot:
import matplotlib.pyplot as plt
# plot the losses list using matplotlib
plt.figure(figsize=(10, 6))
plt.plot(range(len(losses)), losses, label="Training Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Training Loss Over Epochs")
plt.legend()
plt.show()
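Since the accuracy on this toy graph is modest, it can be more informative to look at several candidate answers rather than a single argmax. Below is a small, hypothetical helper (not part of the original script) that reports the top-k predictions; it assumes model, node_embeddings, pad_sequence, max_seq_length, and idx_to_node from the script above are still in scope:

import torch

def top_k_answers(query_nodes, k=3):
    # Hypothetical helper: returns the k most likely next nodes for a query path,
    # together with their softmax probabilities.
    padded = pad_sequence(query_nodes, max_seq_length)
    embeds = torch.stack([torch.tensor(node_embeddings[n], dtype=torch.float32) for n in padded])
    model.eval()
    with torch.no_grad():
        logits = model(embeds.unsqueeze(0)).squeeze(0)
    probs = torch.softmax(logits, dim=-1)
    top = torch.topk(probs, k)
    return [(idx_to_node[i.item()], round(p.item(), 3)) for p, i in zip(top.values, top.indices)]

print(top_k_answers(["President", "2024"]))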
I hope this helps.