Introduction
The RAPIDS ecosystem is a collection of NVIDIA-backed open-source software that builds on CUDA, NVIDIA's GPU interface layer, to provide higher-level GPU-accelerated abstractions. One key example is spark-rapids, an extension for Spark that enables GPU-based optimizations during Spark query execution. In this article we will walk through the spark-rapids source code as we trace an example plan optimization, focusing on how a basic filter operation is implemented on the GPU.
Development Environment
Working with the spark-rapids repository requires installed CUDA drivers and a compatible GPU. We can satisfy these requirements by provisioning a g4dn.xlarge AWS EC2 instance with the free "Amazon Linux 2 AMI with NVIDIA TESLA GPU Driver" AMI. We can then issue the following commands to set up the environment properly for a VSCode Remote SSH session:
sudo yum install git
git clone https://github.com/NVIDIA/spark-rapids.git
# Note - important to use this version of Maven.
wget https://archive.apache.org/dist/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz
tar xvf apache-maven-3.6.3-bin.tar.gz
echo "export PATH=$PATH:~/apache-maven-3.6.3/bin" >> ~/.bashrc
source ~/.bashrc
# Note - important to use Java 11 with VSCode.
# See: https://github.com/NVIDIA/spark-rapids/issues/9542.
sudo yum install java-11-amazon-corretto.x86_64
sudo yum install jq
cd spark-rapids
git checkout branch-23.04
mvn -U clean install -DskipTests -Dmaven.javadoc.skip
# Note - important to use spark-rapids' bloop instead of the VSCode plugin's bloop.
# See: https://github.com/NVIDIA/spark-rapids/issues/9542.
./build/buildall --generate-bloop --profile=311
ln -s .bloop-spark311 .bloop
With VSCode Remote SSH configured, complete the setup by installing the Scala Metals plugin. Note that it is important to dismiss the popup prompting you to import the Maven build; this forces Metals to use our custom-generated bloop installation.
Plan Modification
Spark exposes SparkSessionExtensions as the entry point for integrating new rules. Callers implement a callable that accepts this class, and its interface methods are then used during session initialization to register custom rules. spark-rapids calls several SparkSessionExtensions methods in its SQLExecPlugin.
class SQLExecPlugin extends (SparkSessionExtensions => Unit) with Logging {
  ...
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectColumnar(columnarOverrides)
    extensions.injectQueryStagePrepRule(queryStagePrepOverrides)
    extensions.injectPlannerStrategy(_ => strategyRules)
  }
  ...
}
Each of these inject calls registers a different category of rules. The rule categories Spark uses include:
1. Columnar Rules - rules that replace operators in the physical plan with columnar versions, and that replace the column-to-row transition operators with custom versions.
2. Query Stage Prep Rules - rules applied during the preparation stage of adaptive query execution.
3. Strategy Rules - rules that tell Spark how to convert a logical plan into a physical plan.
Most of spark-rapids' plan-modification logic lives in its columnar rules, so we will focus on this rule category.
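Before following spark-rapids into its columnar rules, it may help to see the smallest possible version of this registration flow. The sketch below shows a do-nothing columnar rule and the extension callable that registers it, mirroring the shape of SQLExecPlugin above; NoopColumnarRule, MyExtensions, and RegistrationExample are hypothetical names for illustration, not part of spark-rapids.

import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}

// A do-nothing columnar rule: both hooks default to identity, so we only
// override the pre-transition hook to show where a rewrite would go.
class NoopColumnarRule extends ColumnarRule {
  override def preColumnarTransitions: Rule[SparkPlan] = new Rule[SparkPlan] {
    override def apply(plan: SparkPlan): SparkPlan = plan // rewrite operators here
  }
}

// The extension callable that Spark invokes during session initialization.
class MyExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    // injectColumnar takes a SparkSession => ColumnarRule builder.
    extensions.injectColumnar(_ => new NoopColumnarRule)
  }
}

object RegistrationExample {
  def main(args: Array[String]): Unit = {
    // Spark instantiates MyExtensions when the session is built.
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.extensions", classOf[MyExtensions].getName)
      .getOrCreate()
    spark.stop()
  }
}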
Stepping into the columnar override registration code, we can see that the ColumnarOverrideRules class is composed of GpuOverrides and GpuTransitionOverrides members.
case class ColumnarOverrideRules() extends ColumnarRule with Logging {
  lazy val overrides: Rule[SparkPlan] = GpuOverrides()
  lazy val overrideTransitions: Rule[SparkPlan] = new GpuTransitionOverrides()

  override def preColumnarTransitions : Rule[SparkPlan] = overrides

  override def postColumnarTransitions: Rule[SparkPlan] = overrideTransitions
}
Each of these override classes exposes an apply method that the Spark framework invokes at various stages of plan generation. We can see the exact mechanics by which these columnar rules are applied to a query plan by looking at the corresponding section of the Spark codebase.
def apply(plan: SparkPlan): SparkPlan = {
  var preInsertPlan: SparkPlan = plan
  columnarRules.foreach((r : ColumnarRule) =>
    preInsertPlan = r.preColumnarTransitions(preInsertPlan))
  var postInsertPlan = insertTransitions(preInsertPlan)
  columnarRules.reverse.foreach((r : ColumnarRule) =>
    postInsertPlan = r.postColumnarTransitions(postInsertPlan))
  postInsertPlan
}
We see that the preColumnarTransitions rules are applied first. Then, in insertTransitions, the framework automatically inserts row-to-columnar transition operators based on the format each part of the plan requires. Finally, callers get a chance to run another set of postColumnarTransitions rules over the plan with the transitions in place.
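One way to observe this three-phase ordering is with a tracing rule. The sketch below is hypothetical (TracingColumnarRule is not part of Spark or spark-rapids); it simply logs the plan in each hook, so the operators inserted by insertTransitions become visible between the two log lines.

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}

// Hypothetical tracing rule: logs the plan in both hooks so the effect of
// insertTransitions can be seen in the driver log.
class TracingColumnarRule extends ColumnarRule with Logging {
  override def preColumnarTransitions: Rule[SparkPlan] = new Rule[SparkPlan] {
    override def apply(plan: SparkPlan): SparkPlan = {
      logInfo(s"before insertTransitions:\n$plan") // no transition operators yet
      plan
    }
  }
  override def postColumnarTransitions: Rule[SparkPlan] = new Rule[SparkPlan] {
    override def apply(plan: SparkPlan): SparkPlan = {
      logInfo(s"after insertTransitions:\n$plan") // RowToColumnar/ColumnarToRow now present
      plan
    }
  }
}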
Stepping into GpuOverrides, we see that most of the logic is applied in applyOverrides.
private def applyOverrides(plan: SparkPlan, conf: RapidsConf): SparkPlan = {
  val wrap = GpuOverrides.wrapAndTagPlan(plan, conf)
  val detectDeltaCheckpoint = conf.isDetectDeltaCheckpointQueries
  if (conf.isDetectDeltaLogQueries && isDeltaLakeMetadataQuery(plan, detectDeltaCheckpoint)) {
    wrap.entirePlanWillNotWork("Delta Lake metadata queries are not efficient on GPU")
  }
  val reasonsToNotReplaceEntirePlan = wrap.getReasonsNotToReplaceEntirePlan
  if (conf.allowDisableEntirePlan && reasonsToNotReplaceEntirePlan.nonEmpty) {
    if (conf.shouldExplain) {
      logWarning("Can't replace any part of this plan due to: " +
        s"${reasonsToNotReplaceEntirePlan.mkString(",")}")
    }
    plan
  } else {
    val optimizations = GpuOverrides.getOptimizations(wrap, conf)
    wrap.runAfterTagRules()
    if (conf.shouldExplain) {
      wrap.tagForExplain()
      val explain = wrap.explain(conf.shouldExplainAll)
      if (explain.nonEmpty) {
        logWarning(s"\n$explain")
        if (conf.optimizerShouldExplainAll && optimizations.nonEmpty) {
          logWarning(s"Cost-based optimizations applied:\n${optimizations.mkString("\n")}")
        }
      }
    }
    GpuOverrides.doConvertPlan(wrap, conf, optimizations)
  }
}
Three functions drive the optimization:
wrapAndTagPlan - converts the raw SparkPlan into a "wrapped" version that carries GPU-related metadata for every node. The main metadata classes live in the RapidsMeta file. (See the explain sketch after this list.)
getOptimizations - uses a cost-based optimizer to rule out cases where scheduling work on the GPU would make overall query performance worse.
doConvertPlan - converts the wrapped plan back into a SparkPlan containing the applicable GPU operators.
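We can watch the tagging and explain logic above in action without attaching a debugger by enabling the plugin's explain logging. A minimal sketch, assuming the spark-rapids plugin jar (and a compatible GPU) is available; spark.plugins and spark.rapids.sql.explain are the plugin's real configuration keys.

import org.apache.spark.sql.SparkSession

object ExplainExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
      .config("spark.rapids.sql.explain", "ALL") // NONE | NOT_ON_GPU | ALL
      .getOrCreate()

    // Running any query triggers applyOverrides; the will/will-not
    // run-on-GPU report from wrap.explain is written to the driver log.
    spark.range(100).filter("id > 90").count()
    spark.stop()
  }
}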
Now let's watch a complete plan move through generation, using a test case as our example. FilterExprSuite provides some basic plans to examine. For instance, this test issues a couple of simple filters against a DataFrame:
testSparkResultsAreEqual("filter with decimal columns", mixedDf(_), repart = 0) { df =>
df.filter(col("ints") > 90)
.filter(col("decimals").isNotNull)
.select("ints", "strings", "decimals")
}
Examining the raw Spark plan, we can see that this query consists of a scan (the load), a filter, and a project (selecting our columns).
ProjectExec@15413 "Project [ints#118, strings#121, decimals#122]
+- Filter ((isnotnull(ints#118) AND (ints#118 > 90)) AND isnotnull(decimals#122))
+- Scan ExistingRDD[ints#118,longs#119L,doubles#120,strings#121,decimals#122]
After this plan passes through the wrapAndTagPlan stage of spark-rapids' modifications, we see it enriched with additional information indicating which operations will run on the GPU.
GpuProjectExecMeta@15444 "*Exec <ProjectExec> will run on GPU
*Exec <FilterExec> will run on GPU
*Expression <And> ((isnotnull(ints#118) AND (ints#118 > 90)) AND isnotnull(decimals#122)) will run on GPU
*Expression <And> (isnotnull(ints#118) AND (ints#118 > 90)) will run on GPU
*Expression <IsNotNull> isnotnull(ints#118) will run on GPU
*Expression <GreaterThan> (ints#118 > 90) will run on GPU
*Expression <IsNotNull> isnotnull(decimals#122) will run on GPU
! <RDDScanExec> cannot run on GPU because GPU does not currently support the operator class org.apache.spark.sql.execution.RDDScanExec
The doConvertPlan call then transforms our plan back into SparkPlan primitives, with the plan's operators replaced by their GPU equivalents.
GpuProjectExec@15543 "GpuProject [ints#118, strings#121, decimals#122], true
+- GpuFilter ((gpuisnotnull(ints#118) AND (ints#118 > 90)) AND gpuisnotnull(decimals#122)), true
+- Scan ExistingRDD[ints#118,longs#119L,doubles#120,strings#121,decimals#122]
During the Spark-native transition phase, we can see that a RowToColumnar operator has been inserted into the plan automatically (along with a ColumnarToRow at the root), ensuring that the data flowing into GpuFilter is in the appropriate format.
ColumnarToRowExec@15573 "ColumnarToRow
+- GpuProject [ints#118, strings#121, decimals#122], true
+- GpuFilter ((gpuisnotnull(ints#118) AND (ints#118 > 90)) AND gpuisnotnull(decimals#122)), true
+- RowToColumnar
+- Scan ExistingRDD[ints#118,longs#119L,doubles#120,strings#121,decimals#122]
Finally, the postColumnarTransitions rules replace the Spark-native transition operators with optimized spark-rapids versions.
GpuColumnarToRowExec@15599 "GpuColumnarToRow false
+- GpuProject [ints#118, strings#121, decimals#122], true
+- GpuCoalesceBatches targetsize(2147483647)
+- GpuFilter ((gpuisnotnull(ints#118) AND (ints#118 > 90)) AND gpuisnotnull(decimals#122)), true
+- GpuRowToColumnar targetsize(2147483647)
+- Scan ExistingRDD[ints#118,longs#119L,doubles#120,strings#121,decimals#122]
Ultimately, spark-rapids builds a plan that is logically equivalent to the original Spark plan, but with optimized GPU operators inserted for better performance. Now let's dig into the implementation of the GPU filter operator to see how it differs from the native Spark version.
Filter Implementation: Native vs. GPU
The native Spark implementation of the filter operator lives in FilterExec. This operator follows the default Spark pattern of using code generation to produce an optimal pipelined stage execution while minimizing virtual function call overhead. In practice, this means FilterExec implements the CodegenSupport interface by providing a doConsume function that returns the Java code needed to express the filter operation.
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
  val numOutput = metricTerm(ctx, "numOutputRows")

  /**
   * Generates code for `c`, using `in` for input attributes and `attrs` for nullability.
   */
  def genPredicate(c: Expression, in: Seq[ExprCode], attrs: Seq[Attribute]): String = {
    val bound = BindReferences.bindReference(c, attrs)
    val evaluated = evaluateRequiredVariables(child.output, in, c.references)

    // Generate the code for the predicate.
    val ev = ExpressionCanonicalizer.execute(bound).genCode(ctx)
    val nullCheck = if (bound.nullable) {
      s"${ev.isNull} || "
    } else {
      s""
    }

    s"""
       |$evaluated
       |${ev.code}
       |if (${nullCheck}!${ev.value}) continue;
     """.stripMargin
  }

  // To generate the predicates we will follow this algorithm.
  // For each predicate that is not IsNotNull, we will generate them one by one loading attributes
  // as necessary. For each of both attributes, if there is an IsNotNull predicate we will
  // generate that check *before* the predicate. After all of these predicates, we will generate
  // the remaining IsNotNull checks that were not part of other predicates.
  // This has the property of not doing redundant IsNotNull checks and taking better advantage of
  // short-circuiting, not loading attributes until they are needed.
  // This is very perf sensitive.
  // TODO: revisit this. We can consider reordering predicates as well.
  val generatedIsNotNullChecks = new Array[Boolean](notNullPreds.length)
  val extraIsNotNullAttrs = mutable.Set[Attribute]()
  val generated = otherPreds.map { c =>
    val nullChecks = c.references.map { r =>
      val idx = notNullPreds.indexWhere { n => n.asInstanceOf[IsNotNull].child.semanticEquals(r)}
      if (idx != -1 && !generatedIsNotNullChecks(idx)) {
        generatedIsNotNullChecks(idx) = true
        // Use the child's output. The nullability is what the child produced.
        genPredicate(notNullPreds(idx), input, child.output)
      } else if (notNullAttributes.contains(r.exprId) && !extraIsNotNullAttrs.contains(r)) {
        extraIsNotNullAttrs += r
        genPredicate(IsNotNull(r), input, child.output)
      } else {
        ""
      }
    }.mkString("\n").trim

    // Here we use *this* operator's output with this output's nullability since we already
    // enforced them with the IsNotNull checks above.
    s"""
       |$nullChecks
       |${genPredicate(c, input, output)}
     """.stripMargin.trim
  }.mkString("\n")

  val nullChecks = notNullPreds.zipWithIndex.map { case (c, idx) =>
    if (!generatedIsNotNullChecks(idx)) {
      genPredicate(c, input, child.output)
    } else {
      ""
    }
  }.mkString("\n")

  // Reset the isNull to false for the not-null columns, then the followed operators could
  // generate better code (remove dead branches).
  val resultVars = input.zipWithIndex.map { case (ev, i) =>
    if (notNullAttributes.contains(child.output(i).exprId)) {
      ev.isNull = FalseLiteral
    }
    ev
  }

  // Note: wrap in "do { } while(false);", so the generated checks can jump out with "continue;"
  s"""
     |do {
     |  $generated
     |  $nullChecks
     |  $numOutput.add(1);
     |  ${consume(ctx, resultVars)}
     |} while(false);
   """.stripMargin
}
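Since doConsume returns a string of Java source that whole-stage codegen later compiles, it helps to see roughly what that string contains. Below is a hand-simplified, hypothetical sketch for our ints > 90 AND isnotnull(decimals) example; the variable names are invented, and real codegen output is considerably more verbose.

// Hypothetical, simplified codegen output (Java) for the example filter.
do {
  // isnotnull(ints) is generated *before* the predicate that references ints.
  boolean isNull_0 = inputRow.isNullAt(0);
  if (isNull_0) continue;
  int value_0 = inputRow.getInt(0);
  boolean value_1 = value_0 > 90;       // (ints#118 > 90)
  if (!value_1) continue;
  if (inputRow.isNullAt(4)) continue;   // isnotnull(decimals#122)
  numOutputRows.add(1);
  // ...consume(ctx, resultVars) splices the downstream project code here...
} while (false);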
In contrast, GpuFilterExec simply invokes GPU operations against the columnar batches it processes.
override def internalDoExecuteColumnar(): RDD[ColumnarBatch] = {
  val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
  val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
  val opTime = gpuLongMetric(OP_TIME)
  val boundCondition = GpuBindReferences.bindReference(condition, child.output)
  val rdd = child.executeColumnar()
  rdd.map { batch =>
    GpuFilter.filterAndClose(batch, boundCondition, numOutputRows, numOutputBatches, opTime)
  }
}
Going one step further, we can see that the filter operation is implemented using ai.rapids.cudf primitives.
private def doFilter(checkedFilterMask: Option[cudf.ColumnVector],
    cb: ColumnarBatch): ColumnarBatch = {
  checkedFilterMask.map { checkedFilterMask =>
    withResource(checkedFilterMask) { checkedFilterMask =>
      val colTypes = GpuColumnVector.extractTypes(cb)
      withResource(GpuColumnVector.from(cb)) { tbl =>
        withResource(tbl.filter(checkedFilterMask)) { filteredData =>
          GpuColumnVector.from(filteredData, colTypes)
        }
      }
    }
  }.getOrElse {
    // Nothing to filter so it is a NOOP
    GpuColumnVector.incRefCounts(cb)
  }
}
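Underneath, tbl.filter delegates to cudf's Table.filter, which keeps the rows where a boolean mask column is true. The following is a minimal standalone sketch of that primitive, assuming a CUDA-capable machine with the cudf Java bindings on the classpath; the data values are invented for illustration.

import ai.rapids.cudf.{ColumnVector, Table}

object CudfFilterExample {
  def main(args: Array[String]): Unit = {
    // Build a one-column table and a boolean mask representing "ints > 90".
    val ints = ColumnVector.fromInts(85, 91, 95, 100)
    val mask = ColumnVector.fromBooleans(false, true, true, true)
    val table = new Table(ints)

    // Table.filter keeps only the rows where the mask is true: 91, 95, 100.
    val filtered = table.filter(mask)
    println(s"rows after filter: ${filtered.getRowCount}") // 3

    // cudf resources are manually managed; close everything when done.
    filtered.close(); table.close(); mask.close(); ints.close()
  }
}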
Summary
The basic mechanics of the other plan transformations are similar to the example walked through above, but the complexity of their implementation and application should not be underestimated. spark-rapids is a 200,000-line Scala repository supporting GPU transformations at scale, under development since 2019. This article has introduced some of the basic machinery, but truly mastering this codebase requires detailed study and contribution.