DataFlint:解决Apache Spark小文件性能问题

2024年01月22日 由 alex 发表 317 0

DataFlint是用于Apache Spark的开源性能监控库。DataFlint拥有一个对于Spark更易于人阅读的用户界面,它可以提示你性能问题,例如小文件的输入输出问题,并且还能提出改进建议!


数据工程师的经验


我第一次使用 Apache Spark 的大数据应用时,我的Spark作业无法完成,因为我错误地对数据进行了分区,并且无意中写入了数百万个非常小的文件到S3。


随着时间的推移,我发现这种开发经验在大数据领域非常常见。我学会了使用工具,比如 Spark UI,但也发现了它(很多)的局限性,主要是它的可读性不是很友好。


示例


让我们尝试用简单的 PySpark 脚本来重现这个确切的场景:


from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Sales Filterer") \
    .master("local[1]") \
    .getOrCreate()
df = spark.read.load("~/data/store_sales")
df_filtered = df.filter(df.ss_quantity > 1)
df_filtered.write \
    .mode("overwrite") \
    .partitionBy("ss_quantity") \
    .parquet("/tmp/store_sales")
spark.stop()


在这个简单的例子中,我们读取了一些与销售相关的数据,我们通过筛选数量大于1的商品,然后按照数量划分来保存数据。


通过静态分析代码,你无法找到任何性能问题,但是在我的机器上,这个简单的脚本大约需要运行1分钟!


输入DataFlint


为了找到并解决这个性能问题,我们可以使用 DataFlint——一个针对 Apache Spark 的开源性能监控库。


要安装它,我们只需添加这两行代码:


.config("spark.jars.packages", "io.dataflint:spark_2.12:0.1.0") \
.config("spark.plugins", "io.dataflint.spark.SparkDataflintPlugin") \


对我们的Python应用程序:


from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Sales Filterer") \
    .config("spark.jars.packages", "io.dataflint:spark_2.12:0.1.0") \
    .config("spark.plugins", "io.dataflint.spark.SparkDataflintPlugin") \
    .master("local[1]") \
    .getOrCreate()
df = spark.read.load("~/data/store_sales")
df_filtered = df.filter(df.ss_quantity > 1)
df_filtered.write \
    .mode("overwrite") \
    .partitionBy("ss_quantity") \
    .parquet("/tmp/store_sales")
spark.stop()


现在,当我们打开 Spark UI 时,我们将看到一个新按钮,用于打开 DataFlint


8


识别性能问题


一旦我们在我们的Spark作业中进入DataFlint,我们可以看到DataFlint在查询运行期间实时地识别出了一个“读取小文件”的问题。


9


当查询结束时,我们可以看到DataFlint还识别出了一个“写入小分区文件”的问题:


10


最后,这个非常简单的查询任务,只过滤了大约273MB的数据,却需要51秒钟才能完成。


11


那么到底发生了什么?我们如何解决这个问题?


小文件在大数据中的输入输出


大数据引擎通常被优化来处理128MB到1GB大小范围的文件,处理像我们案例中的小文件(DataFlint计算的我们读取的每个文件平均大小为85KB)可能会引起各种性能问题,有些问题不如其他问题那么明显。


在我们的情况下,对于HDFS中的每个小文件,Spark在读取时会应用过滤器,然后按照数量划分剩余数据,将每个小文件的各个分区保存回HDFS,结果是写入了更小的文件!


12


那么,在了解了发生了什么之后,我们该如何修复呢?


修复写入小文件问题


在这种情况下,我们将重点解决写入小文件的问题,因为源表文件大小可能不受我们控制。


幸运的是,DataFlint 为我们识别出了 "写入小分区文件 "问题,甚至还提出了修复建议--告诉 spark 根据表分区键在内存中重新分区数据:


13


现在,让我们在代码中应用这一修复:


from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Sales Filterer") \
    .config("spark.jars.packages", "io.dataflint:spark_2.12:0.1.0") \
    .config("spark.plugins", "io.dataflint.spark.SparkDataflintPlugin") \
    .master("local[1]") \
    .getOrCreate()
df = spark.read.load("~/data/store_sales")
df_filtered = df.filter(df.ss_quantity > 1)
df_repartitioned = df_filtered.repartition("ss_quantity") = # THE FIX
df_repartitioned.write \
    .mode("overwrite") \
    .partitionBy("ss_quantity") \
    .parquet("/tmp/store_sales")
spark.stop()


并在 DataFlint 中查看更新后的查询计划:


14


我们可以看到,我们的查询有两个新步骤--按数量散列重新分区到 200 个分区(200 是 spark 默认的重新分区),以及 spark 优化器将其重新分区到 4 个分区。


通过按数量进行哈希分区,我们可以确保每个 spark 分区都有所有必要的数据,以便向该表的分区写入精确的 1 个文件--这意味着所有记录都具有相同的数量值。


15


现在,固定查询只需 15 秒,而不是 51 秒,速度提高了 300%!


16


好了,就到这里了,如果你使用Apache Spark,我建议你亲自尝试下DataFlint。


文章来源:https://medium.com/@menishmueli/fixing-small-files-performance-issues-in-apache-spark-using-dataflint-49ffe3eb755f
欢迎关注ATYUN官方公众号
商务合作及内容投稿请联系邮箱:bd@atyun.com
评论 登录
热门职位
Maluuba
20000~40000/月
Cisco
25000~30000/月 深圳市
PilotAILabs
30000~60000/年 深圳市
写评论取消
回复取消