PySpark结合了两个世界的最佳:Python语言和库的简洁性,以及Apache Spark的可扩展性。
本文通过代码示例列出了10个实用的PySpark命令,用于在Python项目中加速大数据处理管道。
为了说明所介绍的10个命令的使用,我们导入必要的库,初始化一个Spark会话,并加载公开可用的企鹅数据集,该数据集使用数值和分类特征描述了属于三个物种的企鹅标本。数据集最初被加载到一个Pandas DataFrame中。
!pip install pyspark pandasfrom pyspark.sql import SparkSessionimport pandas as pdspark = SparkSession.builder \ .appName("PenguinAnalysis") \ .config("spark.driver.memory", "2g") \ .getOrCreate()pdf = pd.read_csv("https://raw.githubusercontent.com/gakudo-ai/open-datasets/refs/heads/main/penguins.csv")
Spark DataFrame与Pandas DataFrame的不同之处在于其分布式特性、惰性求值和不可变性。转换Pandas DataFrame就像使用createDataFrame命令一样简单。
df = spark.createDataFrame(pdf)
这种数据结构更适合于幕后并行数据处理,同时保持许多常规DataFrame的特性。
使用select和filter函数可以在Spark DataFrame中选择特定列(特征)并过滤出符合指定条件的行(实例)。此示例选择并过滤出体重超过5公斤的Gentoo物种的企鹅。
df.select("species", "body_mass_g").filter(df.species == "Gentoo").filter(df.body_mass_g > 5000).show()
按类别分组数据通常是为了执行聚合操作,如获取每个类别的平均值或其他统计数据。此示例展示了如何使用groupBy和agg命令来获取每个物种的平均测量值摘要。
df.groupBy("species").agg({"flipper_length_mm": "mean", "body_mass_g": "mean"}).show()
窗口函数在与当前行相关的行上执行计算,例如排名或运行总计。此示例展示了如何创建一个窗口函数,将数据按物种分区,然后在每个物种内按体重对企鹅进行排名。
from pyspark.sql.window import Windowfrom pyspark.sql.functions import rank, colwindowSpec = Window.partitionBy("species").orderBy(col("body_mass_g").desc())df.withColumn("mass_rank", rank().over(windowSpec)).show()
类似于SQL的表连接操作,PySpark连接操作基于指定的公共列合并两个DataFrame,使用多种策略如左连接、右连接、外连接等。此示例构建了一个包含每个物种总数的DataFrame,然后基于“连接列”物种进行左连接,将与其物种相关的计数附加到原始数据集的每个观测值。
species_stats = df.groupBy("species").count()df.join(species_stats, "species", "left").show()
PySpark的udf允许创建用户定义函数,本质上是自定义的lambda函数,一旦定义,可以用于对DataFrame中的列应用复杂的转换。此示例定义并应用一个用户定义函数,将企鹅体重映射为描述企鹅大小的新分类变量,如大或小。
from pyspark.sql.functions import udffrom pyspark.sql.types import StringTypesize_category = udf(lambda x: "Large" if x > 4500 else "Small", StringType())df.withColumn("size_class", size_category(df.body_mass_g)).show()
透视表用于将列中的类别(如性别)转换为多个列,旨在描述每个类别的聚合或摘要统计。下面我们创建一个透视表,分别按性别计算并显示每个物种的平均喙长:
df.groupBy("species").pivot("sex").agg({"bill_length_mm": "avg"}).show()
使用fill和dropna函数可以填充缺失值或清理数据集中包含缺失值的行。如果我们想填充一个特定列的缺失值,然后删除其他列中包含缺失值的观测值,它们也可以联合使用:
df.na.fill({"sex": "Unknown"}).dropna().show()
当然,一旦我们对数据集应用了一些处理操作,我们可以将其保存为多种格式,如parquet:
large_penguins = df.filter(df.body_mass_g > 4500)large_penguins.write.mode("overwrite").parquet("large_penguins.parquet")
此列表中的最后一个命令允许您将SQL查询嵌入到PySpark命令中,并在Spark DataFrame的临时视图上运行它们。这种方法结合了SQL的灵活性和PySpark的分布式处理。
df.createOrReplaceTempView("penguins")spark.sql(""" SELECT species, island, AVG(body_mass_g) as avg_mass, AVG(flipper_length_mm) as avg_flipper FROM penguins GROUP BY species, island ORDER BY avg_mass DESC""").show()
我们强烈建议您在Jupyter或Google Colab笔记本中逐一尝试这10个示例命令,以便查看结果输出。数据集可以直接读取并加载到您的代码中。