DataFlint是用于Apache Spark的开源性能监控库。DataFlint拥有一个对于Spark更易于人阅读的用户界面,它可以提示你性能问题,例如小文件的输入输出问题,并且还能提出改进建议!
数据工程师的经验
我第一次使用 Apache Spark 的大数据应用时,我的Spark作业无法完成,因为我错误地对数据进行了分区,并且无意中写入了数百万个非常小的文件到S3。
随着时间的推移,我发现这种开发经验在大数据领域非常常见。我学会了使用工具,比如 Spark UI,但也发现了它(很多)的局限性,主要是它的可读性不是很友好。
示例
让我们尝试用简单的 PySpark 脚本来重现这个确切的场景:
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Sales Filterer") \
.master("local[1]") \
.getOrCreate()
df = spark.read.load("~/data/store_sales")
df_filtered = df.filter(df.ss_quantity > 1)
df_filtered.write \
.mode("overwrite") \
.partitionBy("ss_quantity") \
.parquet("/tmp/store_sales")
spark.stop()
在这个简单的例子中,我们读取了一些与销售相关的数据,我们通过筛选数量大于1的商品,然后按照数量划分来保存数据。
通过静态分析代码,你无法找到任何性能问题,但是在我的机器上,这个简单的脚本大约需要运行1分钟!
输入DataFlint
为了找到并解决这个性能问题,我们可以使用 DataFlint——一个针对 Apache Spark 的开源性能监控库。
要安装它,我们只需添加这两行代码:
.config("spark.jars.packages", "io.dataflint:spark_2.12:0.1.0") \
.config("spark.plugins", "io.dataflint.spark.SparkDataflintPlugin") \
对我们的Python应用程序:
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Sales Filterer") \
.config("spark.jars.packages", "io.dataflint:spark_2.12:0.1.0") \
.config("spark.plugins", "io.dataflint.spark.SparkDataflintPlugin") \
.master("local[1]") \
.getOrCreate()
df = spark.read.load("~/data/store_sales")
df_filtered = df.filter(df.ss_quantity > 1)
df_filtered.write \
.mode("overwrite") \
.partitionBy("ss_quantity") \
.parquet("/tmp/store_sales")
spark.stop()
现在,当我们打开 Spark UI 时,我们将看到一个新按钮,用于打开 DataFlint
识别性能问题
一旦我们在我们的Spark作业中进入DataFlint,我们可以看到DataFlint在查询运行期间实时地识别出了一个“读取小文件”的问题。
当查询结束时,我们可以看到DataFlint还识别出了一个“写入小分区文件”的问题:
最后,这个非常简单的查询任务,只过滤了大约273MB的数据,却需要51秒钟才能完成。
那么到底发生了什么?我们如何解决这个问题?
小文件在大数据中的输入输出
大数据引擎通常被优化来处理128MB到1GB大小范围的文件,处理像我们案例中的小文件(DataFlint计算的我们读取的每个文件平均大小为85KB)可能会引起各种性能问题,有些问题不如其他问题那么明显。
在我们的情况下,对于HDFS中的每个小文件,Spark在读取时会应用过滤器,然后按照数量划分剩余数据,将每个小文件的各个分区保存回HDFS,结果是写入了更小的文件!
那么,在了解了发生了什么之后,我们该如何修复呢?
修复写入小文件问题
在这种情况下,我们将重点解决写入小文件的问题,因为源表文件大小可能不受我们控制。
幸运的是,DataFlint 为我们识别出了 "写入小分区文件 "问题,甚至还提出了修复建议--告诉 spark 根据表分区键在内存中重新分区数据:
现在,让我们在代码中应用这一修复:
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Sales Filterer") \
.config("spark.jars.packages", "io.dataflint:spark_2.12:0.1.0") \
.config("spark.plugins", "io.dataflint.spark.SparkDataflintPlugin") \
.master("local[1]") \
.getOrCreate()
df = spark.read.load("~/data/store_sales")
df_filtered = df.filter(df.ss_quantity > 1)
df_repartitioned = df_filtered.repartition("ss_quantity") = # THE FIX
df_repartitioned.write \
.mode("overwrite") \
.partitionBy("ss_quantity") \
.parquet("/tmp/store_sales")
spark.stop()
并在 DataFlint 中查看更新后的查询计划:
我们可以看到,我们的查询有两个新步骤--按数量散列重新分区到 200 个分区(200 是 spark 默认的重新分区),以及 spark 优化器将其重新分区到 4 个分区。
通过按数量进行哈希分区,我们可以确保每个 spark 分区都有所有必要的数据,以便向该表的分区写入精确的 1 个文件--这意味着所有记录都具有相同的数量值。
现在,固定查询只需 15 秒,而不是 51 秒,速度提高了 300%!
好了,就到这里了,如果你使用Apache Spark,我建议你亲自尝试下DataFlint。