使用GPU加速Spark:NVIDIA RAPIDS代码演练

2023年11月03日 由 alex 发表 470 0

介绍


RAPIDs生态系统是由NVIDIA支持的开源软件集合,利用NVIDIA的GPU接口层CUDA,提供更高级的GPU加速抽象。其中一个关键示例是spark-rapids。spark-rapids是Spark的扩展,可以在Spark查询执行过程中实现基于GPU的优化。在本文中,我们将参考spark-rapids源代码,在展示一个示例计划优化的过程中,重点关注基本过滤操作在GPU上的实现。


开发环境


与spark-rapids存储库一起工作需要安装CUDA驱动程序和兼容的GPU。我们可以通过分配一个g4dn.xlarge AWS EC2实例,并使用免费的“带有NVIDIA TESLA GPU驱动程序的Amazon Linux 2 AMI” AMI来满足这些要求。然后,我们可以发出以下命令来正确设置我们的环境,以进行VSCode远程SSH会话:


sudo yum install git
git clone <https://github.com/NVIDIA/spark-rapids.git>clone <https://github.com/NVIDIA/spark-rapids.git>
# Note - important to use this version of Maven.
wget <https://archive.apache.org/dist/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz>
tar xvf apache-maven-3.6.3-bin.tar.gz
echo "export PATH=$PATH:~/apache-maven-3.6.3/bin" >> ~/.bashrc
source ~/.bashrc
# Note - important to use Java 11 with VSCode.
# See: <https://github.com/NVIDIA/spark-rapids/issues/9542>.
sudo yum install java-11-amazon-corretto.x86_64
sudo yum install jq
cd spark-rapids
git checkout branch-23.04
mvn -U clean install -DskipTests -Dmaven.javadoc.skip
# Note - important to use spark-rapid's bloop instead of VSCode plugin bloop.
# See: <https://github.com/NVIDIA/spark-rapids/issues/9542>.
./build/buildall --generate-bloop --profile=311
ln -s .bloop-spark311 .bloop


配置了VSCode远程SSH后,通过安装Scala Metals插件来完成设置。请注意,拒绝弹出提示窗口以导入Maven构建是很重要的,这将强制Metals使用我们自定义生成的bloop安装。


计划修改


Spark将SparkSessionExtension暴露为集成新规则的入口点。调用者实现一个接受该类的可调用对象,然后在会话初始化过程中使用接口方法来注册自定义规则。spark-rapids在其SQLExecPlugin中调用了几个SparkSessionExtension方法。


class SQLExecPlugin extends (SparkSessionExtensions => Unit) with Logging {
...
override def apply(extensions: SparkSessionExtensions): Unit = {
  extensions.injectColumnar(columnarOverrides)
  extensions.injectQueryStagePrepRule(queryStagePrepOverrides)
  extensions.injectPlannerStrategy(_ => strategyRules)
}
...
}


每个这些inject调用都代表了不同的规则类别。Spark使用的规则类别包括:


1. Columnar Rules(列规则) - 用于将物理计划中的运算符替换为列式版本,并用自定义版本替换列到行转换运算符的规则。
2. Query Stage Prep Rules(查询阶段准备规则) - 在自适应查询执行准备阶段应用的规则。
3. Strategy Rules(策略规则) - 告诉Spark如何将逻辑计划转换为物理计划的规则。


spark-rapids的大部分计划修改逻辑都存在于列规则中,因此我们将重点关注这个规则类别。


在进入列重写注册代码时,我们可以看到ColumnarOverrideRules类是由GpuOverrides和GpuTransitionOverrides套件成员组成的。


case class ColumnarOverrideRules() extends ColumnarRule with Logging {
  lazy val overrides: Rule[SparkPlan] = GpuOverrides()
  lazy val overrideTransitions: Rule[SparkPlan] = new GpuTransitionOverrides()
  override def preColumnarTransitions : Rule[SparkPlan] = overrides
  
  override def postColumnarTransitions: Rule[SparkPlan] = overrideTransitions
}


这些覆盖类中的每个类都暴露了一个应用方法,该方法在Spark框架在计划生成的各个阶段调用。我们可以通过查看Spark代码库中相应部分,来了解将这些列式规则应用于查询计划的确切机制。


def apply(plan: SparkPlan): SparkPlan = {apply(plan: SparkPlan): SparkPlan = {
    var preInsertPlan: SparkPlan = plan
    columnarRules.foreach((r : ColumnarRule) =>
      preInsertPlan = r.preColumnarTransitions(preInsertPlan))
    var postInsertPlan = insertTransitions(preInsertPlan)
    columnarRules.reverse.foreach((r : ColumnarRule) =>
      postInsertPlan = r.postColumnarTransitions(postInsertPlan))
    postInsertPlan
  }


我们看到首先应用的是preColumnarRules。然后,在insertTransitions中,框架会根据计划所需的格式自动插入行到列转换操作符。最后,调用者有机会对带有转换的计划执行另一组postColumnarTransitions规则。


进入GpuOverrides,我们看到大部分逻辑在applyOverrides中应用。


private def applyOverrides(plan: SparkPlan, conf: RapidsConf): SparkPlan = {applyOverrides(plan: SparkPlan, conf: RapidsConf): SparkPlan = {
    val wrap = GpuOverrides.wrapAndTagPlan(plan, conf)
    val detectDeltaCheckpoint = conf.isDetectDeltaCheckpointQueries
    if (conf.isDetectDeltaLogQueries && isDeltaLakeMetadataQuery(plan, detectDeltaCheckpoint)) {
      wrap.entirePlanWillNotWork("Delta Lake metadata queries are not efficient on GPU")
    }
    val reasonsToNotReplaceEntirePlan = wrap.getReasonsNotToReplaceEntirePlan
    if (conf.allowDisableEntirePlan && reasonsToNotReplaceEntirePlan.nonEmpty) {
      if (conf.shouldExplain) {
        logWarning("Can't replace any part of this plan due to: " +
            s"${reasonsToNotReplaceEntirePlan.mkString(",")}")
      }
      plan
    } else {
      val optimizations = GpuOverrides.getOptimizations(wrap, conf)
      wrap.runAfterTagRules()
      if (conf.shouldExplain) {
        wrap.tagForExplain()
        val explain = wrap.explain(conf.shouldExplainAll)
        if (explain.nonEmpty) {
          logWarning(s"\\n$explain")
          if (conf.optimizerShouldExplainAll && optimizations.nonEmpty) {
            logWarning(s"Cost-based optimizations applied:\\n${optimizations.mkString("\\n")}")
          }
        }
      }
      GpuOverrides.doConvertPlan(wrap, conf, optimizations)
    }
  }


这些函数驱动优化:


wrapAndTagPlan - 将原始的SparkPlan转换为包含每个节点GPU相关元数据的“包装”版本。主要的元数据类在RapidsMeta文件中。
getOptimizations - 使用基于成本的优化器消除GPU调度导致整体查询性能变差的情况。
doConvertPlan - 将包装后的计划转换回包含适用的GPU运算符的SparkPlan。


现在让我们来看一个完整的计划在生成过程中的情况。我们将使用一个测试用例作为示例。FilterExprSuite提供了一些基本的计划供检查。例如,此测试向DataFrame发出了几个简单的过滤器:


testSparkResultsAreEqual("filter with decimal columns", mixedDf(_), repart = 0) { df =>
    df.filter(col("ints") > 90)
      .filter(col("decimals").isNotNull)
      .select("ints", "strings", "decimals")
  }


检查原始的Spark计划,我们可以看到此查询由一个扫描操作(加载)、一个过滤操作,以及一个项目操作(选择我们的列)组成。


ProjectExec@15413 "Project [ints#118, strings#121, decimals#122]
+- Filter ((isnotnull(ints#118) AND (ints#118 > 90)) AND isnotnull(decimals#122))
   +- Scan ExistingRDD[ints#118,longs#119L,doubles#120,strings#121,decimals#122]


在此计划经过spark-rapids的修改的wrapAndTagPlan阶段后,我们看到计划被丰富了额外的信息,指示哪些操作将在GPU上运行。


GpuProjectExecMeta@15444 "*Exec <ProjectExec> will run on GPU
  *Exec <FilterExec> will run on GPU
    *Expression <And> ((isnotnull(ints#118) AND (ints#118 > 90)) AND isnotnull(decimals#122)) will run on GPU
      *Expression <And> (isnotnull(ints#118) AND (ints#118 > 90)) will run on GPU
        *Expression <IsNotNull> isnotnull(ints#118) will run on GPU
        *Expression <GreaterThan> (ints#118 > 90) will run on GPU
      *Expression <IsNotNull> isnotnull(decimals#122) will run on GPU
    ! <RDDScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.RDDScanExec


将convertPlan的调用将我们的计划转换为SparkPlan原语,并且计划中的运算符已被替换为其GPU等效物。


GpuProjectExec@15543 "GpuProject [ints#118, strings#121, decimals#122], true
+- GpuFilter ((gpuisnotnull(ints#118) AND (ints#118 > 90)) AND gpuisnotnull(decimals#122)), true
   +- Scan ExistingRDD[ints#118,longs#119L,doubles#120,strings#121,decimals#122]


在Spark原生过渡阶段期间,我们可以看到自动向计划中插入了一个RowToColumnar运算符,以确保传入GpuFilter的数据处于适当的格式。


ColumnarToRowExec@15573 "ColumnarToRow
+- GpuProject [ints#118, strings#121, decimals#122], true
   +- GpuFilter ((gpuisnotnull(ints#118) AND (ints#118 > 90)) AND gpuisnotnull(decimals#122)), true
      +- RowToColumnar
         +- Scan ExistingRDD[ints#118,longs#119L,doubles#120,strings#121,decimals#122]


最后,postColumnarTransitions规则将Spark原生的转换操作符替换为优化后的spark-rapids版本。


GpuColumnarToRowExec@15599 "GpuColumnarToRow false
+- GpuProject [ints#118, strings#121, decimals#122], true
   +- GpuCoalesceBatches targetsize(2147483647)
      +- GpuFilter ((gpuisnotnull(ints#118) AND (ints#118 > 90)) AND gpuisnotnull(decimals#122)), true
         +- GpuRowToColumnar targetsize(2147483647)
            +- Scan ExistingRDD[ints#118,longs#119L,doubles#120,strings#121,decimals#122]


最终,spark-rapids构建了一个与原始的Spark计划在逻辑上等价的计划,但插入了经过优化的GPU运算符以提供更好的性能。现在让我们深入了解GPU过滤器运算符的实现,以了解它与原生Spark版本的区别。


过滤器实现——本机与GPU


过滤器运算符的本机Spark实现是在FilterExec中完成的。这个运算符遵循默认的Spark模式,使用代码生成来创建一个最佳管道阶段执行,同时尽量减少虚函数调用的开销。实际上,这意味着FilterExec通过提供一个doConsume函数来实现CodegenSupport接口,该函数返回表示过滤操作所需的Scala代码。


override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
    val numOutput = metricTerm(ctx, "numOutputRows")
    /**
     * Generates code for `c`, using `in` for input attributes and `attrs` for nullability.
     */
    def genPredicate(c: Expression, in: Seq[ExprCode], attrs: Seq[Attribute]): String = {
      val bound = BindReferences.bindReference(c, attrs)
      val evaluated = evaluateRequiredVariables(child.output, in, c.references)
      // Generate the code for the predicate.
      val ev = ExpressionCanonicalizer.execute(bound).genCode(ctx)
      val nullCheck = if (bound.nullable) {
        s"${ev.isNull} || "
      } else {
        s""
      }
      s"""
         |$evaluated
         |${ev.code}
         |if (${nullCheck}!${ev.value}) continue;
       """.stripMargin
    }
    // To generate the predicates we will follow this algorithm.
    // For each predicate that is not IsNotNull, we will generate them one by one loading attributes
    // as necessary. For each of both attributes, if there is an IsNotNull predicate we will
    // generate that check *before* the predicate. After all of these predicates, we will generate
    // the remaining IsNotNull checks that were not part of other predicates.
    // This has the property of not doing redundant IsNotNull checks and taking better advantage of
    // short-circuiting, not loading attributes until they are needed.
    // This is very perf sensitive.
    // TODO: revisit this. We can consider reordering predicates as well.
    val generatedIsNotNullChecks = new Array[Boolean](notNullPreds.length)
    val extraIsNotNullAttrs = mutable.Set[Attribute]()
    val generated = otherPreds.map { c =>
      val nullChecks = c.references.map { r =>
        val idx = notNullPreds.indexWhere { n => n.asInstanceOf[IsNotNull].child.semanticEquals(r)}
        if (idx != -1 && !generatedIsNotNullChecks(idx)) {
          generatedIsNotNullChecks(idx) = true
          // Use the child's output. The nullability is what the child produced.
          genPredicate(notNullPreds(idx), input, child.output)
        } else if (notNullAttributes.contains(r.exprId) && !extraIsNotNullAttrs.contains(r)) {
          extraIsNotNullAttrs += r
          genPredicate(IsNotNull(r), input, child.output)
        } else {
          ""
        }
      }.mkString("\n").trim
      // Here we use *this* operator's output with this output's nullability since we already
      // enforced them with the IsNotNull checks above.
      s"""
         |$nullChecks
         |${genPredicate(c, input, output)}
       """.stripMargin.trim
    }.mkString("\n")
    val nullChecks = notNullPreds.zipWithIndex.map { case (c, idx) =>
      if (!generatedIsNotNullChecks(idx)) {
        genPredicate(c, input, child.output)
      } else {
        ""
      }
    }.mkString("\n")
    // Reset the isNull to false for the not-null columns, then the followed operators could
    // generate better code (remove dead branches).
    val resultVars = input.zipWithIndex.map { case (ev, i) =>
      if (notNullAttributes.contains(child.output(i).exprId)) {
        ev.isNull = FalseLiteral
      }
      ev
    }
    // Note: wrap in "do { } while(false);", so the generated checks can jump out with "continue;"
    s"""
       |do {
       |  $generated
       |  $nullChecks
       |  $numOutput.add(1);
       |  ${consume(ctx, resultVars)}
       |} while(false);
     """.stripMargin
  }


相反,GpuFilterExec仅对其正在处理的列式批次调用GPU操作。


override def internalDoExecuteColumnar(): RDD[ColumnarBatch] = {internalDoExecuteColumnar(): RDD[ColumnarBatch] = {
    val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
    val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
    val opTime = gpuLongMetric(OP_TIME)
    val boundCondition = GpuBindReferences.bindReference(condition, child.output)
    val rdd = child.executeColumnar()
    rdd.map { batch =>
      GpuFilter.filterAndClose(batch, boundCondition, numOutputRows, numOutputBatches, opTime)
    }
  }


进一步迈出一步,我们可以看到过滤操作是使用 ai.rapids.cudf 的基元实现的。


private def doFilter(checkedFilterMask: Option[cudf.ColumnVector],doFilter(checkedFilterMask: Option[cudf.ColumnVector],
      cb: ColumnarBatch): ColumnarBatch = {
    checkedFilterMask.map { checkedFilterMask =>
      withResource(checkedFilterMask) { checkedFilterMask =>
        val colTypes = GpuColumnVector.extractTypes(cb)
        withResource(GpuColumnVector.from(cb)) { tbl =>
          withResource(tbl.filter(checkedFilterMask)) { filteredData =>
            GpuColumnVector.from(filteredData, colTypes)
          }
        }
      }
    }.getOrElse {
      // Nothing to filter so it is a NOOP
      GpuColumnVector.incRefCounts(cb)
    }
  }


总结


其他计划转换的基本机制与上面介绍的示例类似。但是,它们的实现和应用的复杂性不能被低估。spark-rapids是一个支持大规模GPU转换的Scala仓库,拥有20万行代码,自2019年以来一直在开发中。本文对一些机制进行了基本介绍,但要真正掌握这个代码库,需要进行详细的研究和贡献。


文章来源:https://medium.com/itnext/accelerating-spark-with-gpus-nvidia-rapids-code-walkthrough-447c50d15b2a
欢迎关注ATYUN官方公众号
商务合作及内容投稿请联系邮箱:bd@atyun.com
评论 登录
热门职位
Maluuba
20000~40000/月
Cisco
25000~30000/月 深圳市
PilotAILabs
30000~60000/年 深圳市
写评论取消
回复取消