人工智能席卷了世界。每周都会出现新的模型、工具和应用程序。开源工具的出现使用户能够通过少量的代码来训练和使用复杂的机器学习模型,真正实现了人工智能的民主化;与此同时,虽然许多现成的模型可能提供出色的预测能力,但它们作为黑盒模型的使用可能会让人工智能好奇的学生无法更深入地了解它们的工作原理以及最初开发它们的原因。这种理解在自然科学中尤其重要,在自然科学中,仅仅知道一个模型是准确的还不够,了解它与其他物理理论的联系、它的局限性以及它对其他系统的普遍性也很重要。在本文中,我们将通过化学的视角探索一种特定机器学习模型(图卷积网络)的基础知识。这并不意味着是一个数学上严格的探索;相反,我们将尝试将网络的特征与自然科学中的传统模型进行比较,并思考为什么它如此有效。
图和图神经网络的需求
在化学或物理学中的模型通常是一个连续函数,比如 y=f(x₁, x₂, x₃, …, xₙ),其中 x₁, x₂, x₃, …, xₙ 是输入,y 是输出。这样一个模型的例子是决定两个点电荷 q₁ 和 q₂ 在相对介电常数 εᵣ 的介质中,被距离 r 分隔时的静电相互作用(或力)的方程,通常称为库仑定律。
如果我们不知道这种关系,但是假设有多个数据点,每个数据点都包含了点电荷之间互动(输出)以及相应的输入,我们可以拟合一个人工神经网络来预测任何给定点电荷在具有特定介电常数的介质中任何给定分离距离时的互动。在这个问题的情况下,虽然忽略了一些重要的注意事项,但创建一个数据驱动的物理问题模型相对比较直接。
现在考虑从分子的结构预测特定性质的问题,比如说水中的溶解度。首先,没有一个明显的输入集来描述一个分子。你可以使用各种特征,如键长、键角、不同种类元素的数量、环的数量等等。然而,并不能保证任何这样的任意集合对所有分子都能很好地工作。
其次,不像点电荷的例子,输入可能不一定存在于一个连续的空间中。例如,我们可以认为甲醇、乙醇和丙醇是一组具有递增链长的分子集合;然而,没有任何东西的概念是介于它们之间的——链长是一个离散参数,没有办法在甲醇和乙醇之间内插得到其他分子。拥有一个连续的输入空间对于计算模型的导数至关重要,这些导数然后可以用于所选性质的优化。
为了克服这些问题,已经提出了各种编码分子的方法。其中一种方法是使用SMILES和SELFIES等方案的文本表示。关于这种表示有大量的文献,我引导感兴趣的读者查阅这篇有帮助的综述。第二种方法涉及将分子表示为图。虽然每种方法都有其优点和缺点,但图表示对于化学来说更直观。
图是一种由节点通过边连接而构成的数学结构,表示节点之间的关系。分子自然而然地适合这种结构——原子变成节点,键变成边。图中的每个节点都通过一个向量来表示,该向量编码了对应原子的属性。通常,一个热编码方案就足够了(下一节将详细讨论)。这些向量可以堆叠起来创建一个节点矩阵。节点之间的关系——由边表示——可以通过一个方形相邻矩阵来勾画,其中每个元素aᵢⱼ要么是1要么是0,取决于节点i和j是否分别通过边连接。对角线元素被设置为1,表示自连接,这使得矩阵适合于卷积(如你将在下一节中看到)。可以开发更复杂的图表示,在这种表示中,边的属性也在一个单独的矩阵中进行一热编码,这些节点和相邻矩阵将作为我们模型的输入。
通常情况下,人工神经网络模型接受一维向量作为输入。对于多维输入,比如图像,一类被称为卷积神经网络的模型被开发出来。在我们的案例中,我们也有二维矩阵作为输入,因此,需要一个修改过的网络模型来接受这些作为输入。图神经网络被开发出来以便对这些节点和邻接矩阵进行操作,将它们转换为合适的一维向量,然后可以通过普通人工神经网络的隐藏层进行传输以生成输出。有许多类型的图神经网络,比如图卷积网络、信息传递网络、图注意网络等等,它们主要在于交换图中节点和边缘间信息的函数方面有所不同。我们将更仔细地观察图卷积网络,因为它们相对简单。
图卷积和池化层
考虑你的输入的初始状态。节点矩阵表示每个原子在每一行的独热编码。为了简便,让我们考虑一个原子序数的独热编码,其中一个原子的原子序数为n,将在第n个索引处有一个1,其余位置为0。邻接矩阵表示节点之间的连接。当前状态下,节点矩阵不能被用作人工神经网络的输入,原因是:(1)它是二维的,(2)它不是排列不变的,(3)它不是唯一的。在这种情况下,排列不变性意味着无论你如何排序节点,输入应该保持不变;当前,同一个分子可以通过节点矩阵的多种排列来表示(假设邻接矩阵中也有恰当的排列)。这是一个问题,因为网络将对不同排列的输入作为不同的输入来处理,而它们应该被作为同一输入来处理。
对于前两个问题有一个简单的解决方案 —— 池化。如果沿列维度对节点矩阵进行池化,那么它将被缩减为一个排列不变的一维向量。通常,这种池化是一个简单的平均池化,这意味着最终池化的向量包含节点矩阵中每一列的平均值。然而,这仍然不能解决第三个问题 —— 对两个异构体的节点矩阵,比如正戊烷和新戊烷进行池化,将产生相同的池化向量。
为了使最终池化的向量唯一,我们需要在节点矩阵中纳入一些邻居信息。在异构体的情况下,虽然它们的化学式是相同的,但它们的结构不是。纳入邻居信息的一个简单方式是对每个节点及其邻居执行某种运算,比如求和。这可以表示为节点和邻接矩阵的乘积。通常,这个和会通过预乘以对角度数矩阵的逆来进行归一化,使其成为一个邻居的平均值。最后,这个乘积被后乘一个权重矩阵,使这个操作可参数化。这整个操作被称为图卷积。图3显示了一个直观且简单的图卷积形式。托马斯·基普夫和马克斯·韦林在他们的工作中提供了一个更加数学严密且数值稳定的图卷积形式,用一个修改过的邻接矩阵归一化方法。将卷积和池化操作的组合也可以被解释为经验团献法的一种非线性形式。
图卷积网络的最终结构如下 —— 首先,为给定的分子计算节点和邻接矩阵。然后对这些节点进行多次图卷积,接着通过池化操作生成一个包含所有分子信息的单一向量。此向量随后传递至标准人工神经网络的隐藏层以产生输出。通过应用于基于回归的损失函数(如均方误差损失)的反向传播,同时确定隐藏层、池化层和卷积层的权重。
代码实现
讨论了与图卷积网络相关的所有关键概念后,我们准备开始使用PyTorch构建一个。尽管存在一个灵活、高性能的图神经网络框架叫做PyTorch Geometric,但我们不打算使用它,因为我们的目标是深入了解并发展我们的理解。
本教程分为四个主要部分 ——(1)使用RDKit自动创建图,(2)将图打包进PyTorch数据集,(3)构建图卷积网络架构,以及(4)训练网络。
使用RDKit创建图
RDKit是一个化学信息学库,它允许快速访问小分子的属性。我们将使用它进行两项任务 —— 获取分子中每个原子的原子序数以对节点矩阵进行独热编码,以及获取邻接矩阵。我们假设分子以它们的SMILES字符串形式提供(这是大多数化学信息学数据的情况)。另外,为了确保节点和邻接矩阵的大小在所有分子中都是统一的 —— 它们本来不会默认统一,因为两者的大小都取决于分子中的原子数量 —— 我们用0来填充矩阵。最后,我们将尝试对上述提出的卷积进行一个小修改 —— 我们将邻接矩阵中的“1”替换为对应键长的倒数。这样,网络将拥有更多关于分子几何结构的信息,并且还将基于邻居的键长对每个节点周围的卷积进行加权。
class Graph:
def __init__(
self, molecule_smiles: str,
node_vec_len: int,
max_atoms: int = None
):
# Store properties
self.smiles = molecule_smiles
self.node_vec_len = node_vec_len
self.max_atoms = max_atoms
# Call helper function to convert SMILES to RDKit mol
self.smiles_to_mol()
# If valid mol is created, generate a graph of the mol
if self.mol is not None:
self.smiles_to_graph()
def smiles_to_mol(self):
# Use MolFromSmiles from RDKit to get molecule object
mol = Chem.MolFromSmiles(self.smiles)
# If a valid mol is not returned, set mol as None and exit
if mol is None:
self.mol = None
return
# Add hydrogens to molecule
self.mol = Chem.AddHs(mol)
def smiles_to_graph(self):
# Get list of atoms in molecule
atoms = self.mol.GetAtoms()
# If max_atoms is not provided, max_atoms is equal to maximum number
# of atoms in this molecule.
if self.max_atoms is None:
n_atoms = len(list(atoms))
else:
n_atoms = self.max_atoms
# Create empty node matrix
node_mat = np.zeros((n_atoms, self.node_vec_len))
# Iterate over atoms and add to node matrix
for atom in atoms:
# Get atom index and atomic number
atom_index = atom.GetIdx()
atom_no = atom.GetAtomicNum()
# Assign to node matrix
node_mat[atom_index, atom_no] = 1
# Get adjacency matrix using RDKit
adj_mat = rdmolops.GetAdjacencyMatrix(self.mol)
self.std_adj_mat = np.copy(adj_mat)
# Get distance matrix using RDKit
dist_mat = molDG.GetMoleculeBoundsMatrix(self.mol)
dist_mat[dist_mat == 0.] = 1
# Get modified adjacency matrix with inverse bond lengths
adj_mat = adj_mat * (1 / dist_mat)
# Pad the adjacency matrix with 0s
dim_add = n_atoms - adj_mat.shape[0]
adj_mat = np.pad(
adj_mat, pad_width=((0, dim_add), (0, dim_add)), mode="constant"
)
# Add an identity matrix to adjacency matrix
# This will make an atom its own neighbor
adj_mat = adj_mat + np.eye(n_atoms)
# Save both matrices
self.node_mat = node_mat
self.adj_mat = adj_mat
PyTorch 提供了一个非常方便的 Dataset 类来存储和访问各种类型的数据。我们将使用它来存储每个分子的节点和邻接矩阵以及输出。值得注意的是,使用这个 Dataset 接口来处理数据并非强制性的;尽管如此,使用这种抽象化的方法可以使后续步骤更加简单。我们需要为我们从 Dataset 类继承的 GraphData 类定义两个主要方法:一个__ len __方法用来获取数据集的大小,以及一个至关重要的 __getitem __方法来获取给定索引的输入和输出。
class GraphData(Dataset):
def __init__(self, dataset_path: str, node_vec_len: int, max_atoms: int):
# Save attributes
self.node_vec_len = node_vec_len
self.max_atoms = max_atoms
# Open dataset file
df = pd.read_csv(dataset_path)
# Create lists
self.indices = df.index.to_list()
self.smiles = df["smiles"].to_list()
self.outputs = df["measured log solubility in mols per litre"].to_list()
def __len__(self):
return len(self.indices)
def __getitem__(self, i: int):
# Get smile
smile = self.smiles[i]
# Create MolGraph object using the Graph abstraction
mol = Graph(smile, self.node_vec_len, self.max_atoms)
# Get node and adjacency matrices
node_mat = torch.Tensor(mol.node_mat)
adj_mat = torch.Tensor(mol.adj_mat)
# Get output
output = torch.Tensor([self.outputs[i]])
return (node_mat, adj_mat), output, smile
由于我们已经定义了自己的自定义方式来返回节点和邻接矩阵、输出以及SMILES字符串,我们需要定义一个自定义函数来整理数据,也就是说,将其打包成一个批次,然后传递给网络。通过一次传递数据的批次,而不是单个数据点来训练神经网络,并使用小批量梯度下降,这种能力为准确性和计算效率之间提供了一个微妙的平衡。我们下面将要定义的整理函数将本质上收集所有数据对象,将它们按类别归类,堆叠在列表中,转换成PyTorch张量,并重新组合这些张量,以便它们以我们GraphData类的相同方式返回。
def collate_graph_dataset(dataset: Dataset):
# Create empty lists of node and adjacency matrices, outputs, and smiles
node_mats = []
adj_mats = []
outputs = []
smiles = []
# Iterate over list and assign each component to the correct list
for i in range(len(dataset)):
(node_mat,adj_mat), output, smile = dataset[i]
node_mats.append(node_mat)
adj_mats.append(adj_mat)
outputs.append(output)
smiles.append(smile)
# Create tensors
node_mats_tensor = torch.cat(node_mats, dim=0)
adj_mats_tensor = torch.cat(adj_mats, dim=0)
outputs_tensor = torch.stack(outputs, dim=0)
# Return tensors
return (node_mats_tensor, adj_mats_tensor), outputs_tensor, smiles
构建图卷积网络架构
完成了代码的数据处理部分之后,我们现在转向构建模型本身。我们将构建我们自己的卷积层和池化层,以便于清晰理解,但在你们中如果有更高级的开发者,可以很容易地用PyTorch Geometric模块中预定义的更复杂层来替换这些。ConvolutionLayer本质上做三件事——(1)从邻接矩阵计算逆对角度矩阵,(2)执行四个矩阵的乘法((D^{-1}ANW)),以及(3)对层输出应用非线性激活函数。如同其他PyTorch类,我们将从Module基类继承,该基类已经定义了像forward这样的方法。
class ConvolutionLayer(nn.Module):
def __init__(self, node_in_len: int, node_out_len: int):
# Call constructor of base class
super().__init__()
# Create linear layer for node matrix
self.conv_linear = nn.Linear(node_in_len, node_out_len)
# Create activation function
self.conv_activation = nn.LeakyReLU()
def forward(self, node_mat, adj_mat):
# Calculate number of neighbors
n_neighbors = adj_mat.sum(dim=-1, keepdims=True)
# Create identity tensor
self.idx_mat = torch.eye(
adj_mat.shape[-2], adj_mat.shape[-1], device=n_neighbors.device
)
# Add new (batch) dimension and expand
idx_mat = self.idx_mat.unsqueeze(0).expand(*adj_mat.shape)
# Get inverse degree matrix
inv_degree_mat = torch.mul(idx_mat, 1 / n_neighbors)
# Perform matrix multiplication: D^(-1)AN
node_fea = torch.bmm(inv_degree_mat, adj_mat)
node_fea = torch.bmm(node_fea, node_mat)
# Perform linear transformation to node features
# (multiplication with W)
node_fea = self.conv_linear(node_fea)
# Apply activation
node_fea = self.conv_activation(node_fea)
return node_fea
接下来,让我们构建池化层(PoolingLayer)。这一层只执行一个操作,即沿第二维度(节点数量)进行平均。
class PoolingLayer(nn.Module):
def __init__(self):
# Call constructor of base class
super().__init__()
def forward(self, node_fea):
# Pool the node matrix
pooled_node_fea = node_fea.mean(dim=1)
return pooled_node_fea
最后,我们将定义并创建ChemGCN类,包含卷积层、池化层和隐藏层的定义。这个类通常应该有一个构造函数,用于定义每一层的结构和顺序,以及一个前向方法,它接受输入(在我们的案例中,是节点和邻接矩阵)并生成输出。我们将对所有层输出应用LeakyReLU激活函数。此外,我们还将使用dropout技术来减少过拟合。
class ChemGCN(nn.Module):
def __init__(
self,
node_vec_len: int,
node_fea_len: int,
hidden_fea_len: int,
n_conv: int,
n_hidden: int,
n_outputs: int,
p_dropout: float = 0.0,
):
# Call constructor of base class
super().__init__()
# Define layers
# Initial transformation from node matrix to node features
self.init_transform = nn.Linear(node_vec_len, node_fea_len)
# Convolution layers
self.conv_layers = nn.ModuleList(
[
ConvolutionLayer(
node_in_len=node_fea_len,
node_out_len=node_fea_len,
)
for i in range(n_conv)
]
)
# Pool convolution outputs
self.pooling = PoolingLayer()
pooled_node_fea_len = node_fea_len
# Pooling activation
self.pooling_activation = nn.LeakyReLU()
# From pooled vector to hidden layers
self.pooled_to_hidden = nn.Linear(pooled_node_fea_len, hidden_fea_len)
# Hidden layer
self.hidden_layer = nn.Linear(hidden_fea_len, hidden_fea_len)
# Hidden layer activation function
self.hidden_activation = nn.LeakyReLU()
# Hidden layer dropout
self.dropout = nn.Dropout(p=p_dropout)
# If hidden layers more than 1, add more hidden layers
self.n_hidden = n_hidden
if self.n_hidden > 1:
self.hidden_layers = nn.ModuleList(
[self.hidden_layer for _ in range(n_hidden - 1)]
)
self.hidden_activation_layers = nn.ModuleList(
[self.hidden_activation for _ in range(n_hidden - 1)]
)
self.hidden_dropout_layers = nn.ModuleList(
[self.dropout for _ in range(n_hidden - 1)]
)
# Final layer going to the output
self.hidden_to_output = nn.Linear(hidden_fea_len, n_outputs)
def forward(self, node_mat, adj_mat):
# Perform initial transform on node_mat
node_fea = self.init_transform(node_mat)
# Perform convolutions
for conv in self.conv_layers:
node_fea = conv(node_fea, adj_mat)
# Perform pooling
pooled_node_fea = self.pooling(node_fea)
pooled_node_fea = self.pooling_activation(pooled_node_fea)
# First hidden layer
hidden_node_fea = self.pooled_to_hidden(pooled_node_fea)
hidden_node_fea = self.hidden_activation(hidden_node_fea)
hidden_node_fea = self.dropout(hidden_node_fea)
# Subsequent hidden layers
if self.n_hidden > 1:
for i in range(self.n_hidden - 1):
hidden_node_fea = self.hidden_layers[i](hidden_node_fea)
hidden_node_fea = self.hidden_activation_layers[i](hidden_node_fea)
hidden_node_fea = self.hidden_dropout_layers[i](hidden_node_fea)
# Output
out = self.hidden_to_output(hidden_node_fea)
return out
训练网络
我们已经构建了训练我们的模型和进行预测所需的工具。在本节中,我们将编写辅助函数来训练和测试我们的模型,并编写一个脚本来运行一个工作流,该工作流会生成图表、构建网络并训练模型。
首先,我们将定义一个标准化器(Standardizer)类来标准化我们的输出。神经网络倾向于处理数值相对较小,彼此变化不大的数字。标准化有助于实现这一点。
class Standardizer:
def __init__(self, X):
self.mean = torch.mean(X)
self.std = torch.std(X)
def standardize(self, X):
Z = (X - self.mean) / (self.std)
return Z
def restore(self, Z):
X = self.mean + Z * self.std
return X
def state(self):
return {"mean": self.mean, "std": self.std}
def load(self, state):
self.mean = state["mean"]
self.std = state["std"]
其次,我们定义一个函数来执行每个时代以下步骤:
该函数返回批次平均损失和平均绝对误差,这可以用来绘制损失曲线。 一个不带有反向传播的类似函数被编写来测试模型。
def train_model(
epoch,
model,
training_dataloader,
optimizer,
loss_fn,
standardizer,
use_GPU,
max_atoms,
node_vec_len,
):
# Create variables to store losses and error
avg_loss = 0
avg_mae = 0
count = 0
# Switch model to train mode
model.train()
# Go over each batch in the dataloader
for i, dataset in enumerate(training_dataloader):
# Unpack data
node_mat = dataset[0][0]
adj_mat = dataset[0][1]
output = dataset[1]
# Reshape inputs
first_dim = int((torch.numel(node_mat)) / (max_atoms * node_vec_len))
node_mat = node_mat.reshape(first_dim, max_atoms, node_vec_len)
adj_mat = adj_mat.reshape(first_dim, max_atoms, max_atoms)
# Standardize output
output_std = standardizer.standardize(output)
# Package inputs and outputs; check if GPU is enabled
if use_GPU:
nn_input = (node_mat.cuda(), adj_mat.cuda())
nn_output = output_std.cuda()
else:
nn_input = (node_mat, adj_mat)
nn_output = output_std
# Compute output from network
nn_prediction = model(*nn_input)
# Calculate loss
loss = loss_fn(nn_output, nn_prediction)
avg_loss += loss
# Calculate MAE
prediction = standardizer.restore(nn_prediction.detach().cpu())
mae = mean_absolute_error(output, prediction)
avg_mae += mae
# Set zero gradients for all tensors
optimizer.zero_grad()
# Do backward prop
loss.backward()
# Update optimizer parameters
optimizer.step()
# Increase count
count += 1
# Calculate avg loss and MAE
avg_loss = avg_loss / count
avg_mae = avg_mae / count
# Print stats
print(
"Epoch: [{0}]\tTraining Loss: [{1:.2f}]\tTraining MAE: [{2:.2f}]"\
.format(
epoch, avg_loss, avg_mae
)
)
# Return loss and MAE
return avg_loss, avg_mae
最后,让我们来编写整个工作流程。这个脚本将调用我们上面定义的所有内容。
#### Fix seeds
np.random.seed(0)
torch.manual_seed(0)
use_GPU = torch.cuda.is_available()
#### Inputs
max_atoms = 200
node_vec_len = 60
train_size = 0.7
batch_size = 32
hidden_nodes = 60
n_conv_layers = 4
n_hidden_layers = 2
learning_rate = 0.01
n_epochs = 50
#### Start by creating dataset
main_path = Path(__file__).resolve().parent
data_path = main_path / "data" / "solubility_data.csv"
dataset = GraphData(dataset_path=data_path, max_atoms=max_atoms,
node_vec_len=node_vec_len)
#### Split data into training and test sets
# Get train and test sizes
dataset_indices = np.arange(0, len(dataset), 1)
train_size = int(np.round(train_size * len(dataset)))
test_size = len(dataset) - train_size
# Randomly sample train and test indices
train_indices = np.random.choice(dataset_indices, size=train_size,
replace=False)
test_indices = np.array(list(set(dataset_indices) - set(train_indices)))
# Create dataoaders
train_sampler = SubsetRandomSampler(train_indices)
test_sampler = SubsetRandomSampler(test_indices)
train_loader = DataLoader(dataset, batch_size=batch_size,
sampler=train_sampler,
collate_fn=collate_graph_dataset)
test_loader = DataLoader(dataset, batch_size=batch_size,
sampler=test_sampler,
collate_fn=collate_graph_dataset)
#### Initialize model, standardizer, optimizer, and loss function
# Model
model = ChemGCN(node_vec_len=node_vec_len, node_fea_len=hidden_nodes,
hidden_fea_len=hidden_nodes, n_conv=n_conv_layers,
n_hidden=n_hidden_layers, n_outputs=1, p_dropout=0.1)
# Transfer to GPU if needed
if use_GPU:
model.cuda()
# Standardizer
outputs = [dataset[i][1] for i in range(len(dataset))]
standardizer = Standardizer(torch.Tensor(outputs))
# Optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
# Loss function
loss_fn = torch.nn.MSELoss()
#### Train the model
loss = []
mae = []
epoch = []
for i in range(n_epochs):
epoch_loss, epoch_mae = train_model(
i,
model,
train_loader,
optimizer,
loss_fn,
standardizer,
use_GPU,
max_atoms,
node_vec_len,
)
loss.append(epoch_loss)
mae.append(epoch_mae)
epoch.append(i)
#### Test the model
# Call test model function
test_loss, test_mae = test_model(model, test_loader, loss_fn, standardizer,
use_GPU, max_atoms, node_vec_len)
#### Print final results
print(f"Training Loss: {loss[-1]:.2f}")
print(f"Training MAE: {mae[-1]:.2f}")
print(f"Test Loss: {test_loss:.2f}")
print(f"Test MAE: {test_mae:.2f}")
结果
在给定的架构和超参数下,一个网络被训练在来自开源DeepChem仓库的溶解度数据集上,该数据集包含约1000个小分子的水溶解度。下面的图显示了一个特定训练-测试分层的训练损失曲线和测试集的对等图。训练和测试集的平均绝对误差分别为0.59和0.58(以对数摩尔/升为单位),低于数据集中预测的基于线性模型的0.69对数摩尔/升。神经网络表现优于线性回归模型并不令人惊讶;尽管如此,这种初步比较使我们确信我们模型所做的预测是合理的。此外,我们仅通过在图中结合基本的结构描述符 —— 原子序数和键长 —— 以及让卷积和池化函数构建这些之间更复杂的关系,就实现了对分子属性的最准确预测。