扩散自回归变压器在时间序列预测中的应用解析

2024年10月22日 由 alex 发表 167 0

时间序列在各个领域都非常重要,因为它能够提供对未来数据点的准确预测,进而改善决策、资源分配和风险管理。这种能力可显著改善运营并带来战略优势,尤其是在金融、医疗保健和能源管理等领域。


深度神经网络已成为时间序列预测的一种流行而有效的解决方案范例,这反映了人们对利用先进的机器学习技术来处理复杂的序列数据的兴趣与日俱增。


自我监督学习

一种模型通过内部生成监督信号(通常是通过借口任务)从无标记数据中学习的范式。


与需要标记数据的监督学习不同,自监督学习利用数据的内在结构来创建训练所需的标签。


时间序列的自监督学习:

在自监督学习的背景下,时间序列提供了独特的能力,可以开发出从无标签数据中学习通用表征的模型。


这种方法允许模型同时捕捉长期依赖性和局部细节特征,从而增强了时间序列预测能力。然而,有效捕捉这些方面仍然具有挑战性,因此需要像 TimeDART(本文)这样的创新方法来应对这些挑战。


问题

时间序列面临的挑战是如何利用自监督学习方法有效捕捉全局序列依赖性和局部细节特征。


传统方法难以完成这一双重任务,影响了它们学习时间序列数据的全面性和表现力。


TimeDarT

TimeDART 是 Diffusion Auto-regressive Transformer 的缩写,是一种自监督学习方法,专门用于时间序列预测。它旨在通过学习时间序列中过去数据的模式来改进对未来数据点的预测。它就像把时间序列数据分解成更小的片段,即补丁,并把这些补丁作为建模的基本单位。


研究人员使用带有自我关注机制的 Transformer 编码器来理解这些片段之间的依赖关系,从而有效地捕捉数据的整体序列结构。


扩散和去噪这两个过程用于处理每个片段内的细节特征。这两个过程通过添加和去除数据中的噪声来捕捉局部特征(这在所有扩散模型中都是非常典型的过程)。事实上,这有助于模型更好地处理细节模式。


TimeDART 架构:


3


实例规范化和修补嵌入


4


第一步是对输入的多元时间序列数据进行实例归一化(归一化),以确保每个实例的平均值为零,标准差为单位,这有助于保持最终预测的一致性。


时间序列数据被划分为片段,而不是单个点,这样可以捕捉到更全面的局部信息。


为避免信息泄露,我们将补丁长度设置为与步长相等,这有助于确保每个补丁只包含原始序列的非重叠片段。


针对补丁间依赖关系的Transformer编码器


5


我们在架构中采用了基于自我注意的Transformer编码器,这有助于为补丁之间的依赖关系建模。


这种方法通过考虑时间序列数据不同补丁之间的关系,有助于捕捉全局序列依赖关系。


Transformer 编码器的使用使 TimeDART 能够学习有意义的补丁间表示,这对理解时间序列的高层结构至关重要。


class TransformerEncoderBlock(nn.Module):
    def __init__(
        self, d_model: int, num_heads: int, feedforward_dim: int, dropout: float
    ):
        super(TransformerEncoderBlock, self).__init__()
        self.attention = nn.MultiheadAttention(
            embed_dim=d_model, num_heads=num_heads, dropout=dropout, batch_first=True
        )
        self.norm1 = nn.LayerNorm(d_model)
        self.ff = nn.Sequential(
            nn.Linear(d_model, feedforward_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(feedforward_dim, d_model),
        )
        self.conv1 = nn.Conv1d(in_channels=d_model, out_channels=feedforward_dim, kernel_size=1)
        self.activation = nn.GELU()
        self.conv2 = nn.Conv1d(in_channels=feedforward_dim, out_channels=d_model, kernel_size=1)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
    def forward(self, x, mask):
        """
        :param x: [batch_size * num_features, seq_len, d_model]
        :param mask: [1, 1, seq_len, seq_len]
        :return: [batch_size * num_features, seq_len, d_model]
        """
        # Self-attention
        attn_output, _ = self.attention(x, x, x, attn_mask=mask)
        x = self.norm1(x + self.dropout(attn_output))
        # Feed-forward network
        # y = self.dropout(self.activation(self.conv1(y.permute(0, 2, 1))))
        # ff_output = self.conv2(y).permute(0, 2, 1)
        ff_output = self.ff(x)
        output = self.norm2(x + self.dropout(ff_output))
        return output


前向扩散过程


6


在前向扩散过程中,输入块上会出现噪声。这一步骤对于生成自监督信号至关重要,可使模型通过从噪声版本中重建原始数据来学习稳健的表征。


这种噪声有助于模型识别和关注时间序列数据中的内在模式。


class Diffusion(nn.Module):
    def __init__(
        self,
        time_steps: int,
        device: torch.device,
        scheduler: str = "cosine",
    ):
        super(Diffusion, self).__init__()
        self.device = device
        self.time_steps = time_steps
        if scheduler == "cosine":
            self.betas = self._cosine_beta_schedule().to(self.device)
        elif scheduler == "linear":
            self.betas = self._linear_beta_schedule().to(self.device)
        else:
            raise ValueError(f"Invalid scheduler: {scheduler=}")
        self.alpha = 1 - self.betas
        self.gamma = torch.cumprod(self.alpha, dim=0).to(self.device)
    def _cosine_beta_schedule(self, s=0.008):
        steps = self.time_steps + 1
        x = torch.linspace(0, self.time_steps, steps)
        alphas_cumprod = (
            torch.cos(((x / self.time_steps) + s) / (1 + s) * torch.pi * 0.5) ** 2
        )
        alphas_cumprod = alphas_cumprod / alphas_cumprod[0]
        betas = 1 - (alphas_cumprod[1:] / alphas_cumprod[:-1])
        return torch.clip(betas, 0, 0.999)
    def _linear_beta_schedule(self, beta_start=1e-4, beta_end=0.02):
        betas = torch.linspace(beta_start, beta_end, self.time_steps)
        return betas
    def sample_time_steps(self, shape):
        return torch.randint(0, self.time_steps, shape, device=self.device)
    def noise(self, x, t):
        noise = torch.randn_like(x)
        gamma_t = self.gamma[t].unsqueeze(-1)  # [batch_size * num_features, seq_len, 1]
        # x_t = sqrt(gamma_t) * x + sqrt(1 - gamma_t) * noise
        noisy_x = torch.sqrt(gamma_t) * x + torch.sqrt(1 - gamma_t) * noise
        return noisy_x, noise
    def forward(self, x):
        # x: [batch_size * num_features, seq_len, patch_len]
        t = self.sample_time_steps(x.shape[:2])  # [batch_size * num_features, seq_len]
        noisy_x, noise = self.noise(x, t)
        return noisy_x, noise, t


基于交叉注意的去噪解码器

去噪解码器采用交叉注意机制来重建无噪声的原始补丁。


这样就可以调整优化难度,使自我监督任务更加有效,并使模型能够专注于捕捉详细的片段内特征。这种设计提高了模型有效学习局部和全局特征的能力。


7


它接收噪声(作为查询)和编码器的输出(键和值),我们对解码器进行屏蔽,以确保噪声中的第 j 个输入与Transformer编码器的第 j 个输出相对应。


class TransformerDecoderBlock(nn.Module):
    def __init__(
        self, d_model: int, num_heads: int, feedforward_dim: int, dropout: float
    ):
        super(TransformerDecoderBlock, self).__init__()
        self.self_attention = nn.MultiheadAttention(
            embed_dim=d_model, num_heads=num_heads, dropout=dropout, batch_first=True
        )
        self.norm1 = nn.LayerNorm(d_model)
        self.encoder_attention = nn.MultiheadAttention(
            embed_dim=d_model, num_heads=num_heads, dropout=dropout, batch_first=True
        )
        self.norm2 = nn.LayerNorm(d_model)
        self.ff = nn.Sequential(
            nn.Linear(d_model, feedforward_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(feedforward_dim, d_model),
        )
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
    def forward(self, query, key, value, tgt_mask, src_mask):
        """
        :param query: [batch_size * num_features, seq_len, d_model]
        :param key: [batch_size * num_features, seq_len, d_model]
        :param value: [batch_size * num_features, seq_len, d_model]
        :param mask: [1, 1, seq_len, seq_len]
        :return: [batch_size * num_features, seq_len, d_model]
        """
        # Self-attention
        attn_output, _ = self.self_attention(query, query, query, attn_mask=tgt_mask)
        query = self.norm1(query + self.dropout(attn_output))
        # Encoder attention
        attn_output, _ = self.encoder_attention(query, key, value, attn_mask=src_mask)
        query = self.norm2(query + self.dropout(attn_output))
        # Feed-forward network
        ff_output = self.ff(query)
        x = self.norm3(query + self.dropout(ff_output))
        return x


全局依赖关系的自动递归生成


8


其职责是捕捉时间序列中的高层次全局依赖关系 。通过自动回归还原原始序列,模型可以理解  整体的时间模式 和依赖关系,从而提高预测能力。


class DenoisingPatchDecoder(nn.Module):
    def __init__(
        self,
        d_model: int,
        num_heads: int,
        num_layers: int,
        feedforward_dim: int,
        dropout: float,
    ):
        super(DenoisingPatchDecoder, self).__init__()
        self.layers = nn.ModuleList(
            [
                TransformerDecoderBlock(d_model, num_heads, feedforward_dim, dropout)
                for _ in range(num_layers)
            ]
        )
        self.norm = nn.LayerNorm(d_model)
    def forward(self, query, key, value, is_tgt_mask=True, is_src_mask=True):
        seq_len = query.size(1)
        tgt_mask = (
            generate_self_only_mask(seq_len).to(query.device) if is_tgt_mask else None
        )
        src_mask = (
            generate_self_only_mask(seq_len).to(query.device) if is_src_mask else None
        )
        for layer in self.layers:
            query = layer(query, key, value, tgt_mask, src_mask)
        x = self.norm(query)
        return x

class ForecastingHead(nn.Module):
    def __init__(
        self,
        seq_len: int,
        d_model: int,
        pred_len: int,
        dropout: float,
    ):
        super(ForecastingHead, self).__init__()
        self.pred_len = pred_len
        self.flatten = nn.Flatten(start_dim=-2)
        self.forecast_head = nn.Linear(seq_len * d_model, pred_len)
        self.dropout = nn.Dropout(dropout)
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """
        :param x: [batch_size, num_features, seq_len, d_model]
        :return: [batch_size, pred_len, num_features]
        """
        x = self.flatten(x)  # (batch_size, num_features, seq_len * d_model)
        x = self.forecast_head(x)  # (batch_size, num_features, pred_len)
        x = self.dropout(x)  # (batch_size, num_features, pred_len)
        x = x.permute(0, 2, 1)  # (batch_size, pred_len, num_features)
        return x


优化和微调

最后,以自动回归的方式对整个模型进行优化,以获得可针对特定预测任务进行微调的可转移表征。这一步骤可确保模型学习到的表征既全面又能适应各种下游应用,从而在时间序列预测中实现卓越性能。


评估:

数据集


9


为了测试 TimeDART 模型在时间序列预测中的有效性,我们使用八个常用数据集对其进行了评估。这些数据集包括四个 ETT 数据集(ETTh1、ETTh2、ETTm1、ETTm2),以及天气、交易所、电力和交通数据集。


这些数据集包括一系列应用场景,如电力系统、交通网络和天气预报。


结果


10

11

12


文章来源:https://medium.com/towards-artificial-intelligence/diffusion-auto-regressive-transformer-for-effective-self-supervised-time-series-forecasting-31b7d7ba2062
欢迎关注ATYUN官方公众号
商务合作及内容投稿请联系邮箱:bd@atyun.com
评论 登录
热门职位
Maluuba
20000~40000/月
Cisco
25000~30000/月 深圳市
PilotAILabs
30000~60000/年 深圳市
写评论取消
回复取消