大数据处理的10个必备PySpark命令

2025年01月20日 由 佚名 发表 43 0

10 Essential PySpark Commands for Big Data Processing
作者提供的图片 | Ideogram

 

PySpark结合了两个世界的最佳:Python语言和库的简洁性,以及Apache Spark的可扩展性。

本文通过代码示例列出了10个实用的PySpark命令,用于在Python项目中加速大数据处理管道。

 

初始设置

 
为了说明所介绍的10个命令的使用,我们导入必要的库,初始化一个Spark会话,并加载公开可用的企鹅数据集,该数据集使用数值和分类特征描述了属于三个物种的企鹅标本。数据集最初被加载到一个Pandas DataFrame中。

!pip install pyspark pandasfrom pyspark.sql import SparkSessionimport pandas as pdspark = SparkSession.builder \    .appName("PenguinAnalysis") \    .config("spark.driver.memory", "2g") \    .getOrCreate()pdf = pd.read_csv("https://raw.githubusercontent.com/gakudo-ai/open-datasets/refs/heads/main/penguins.csv")

 

大数据处理的10个必备PySpark命令

 

1. 将数据加载到Spark DataFrame中

Spark DataFrame与Pandas DataFrame的不同之处在于其分布式特性、惰性求值和不可变性。转换Pandas DataFrame就像使用createDataFrame命令一样简单。

df = spark.createDataFrame(pdf)

 

这种数据结构更适合于幕后并行数据处理,同时保持许多常规DataFrame的特性。

 

2. 选择和过滤数据

使用selectfilter函数可以在Spark DataFrame中选择特定列(特征)并过滤出符合指定条件的行(实例)。此示例选择并过滤出体重超过5公斤的Gentoo物种的企鹅。

df.select("species", "body_mass_g").filter(df.species == "Gentoo").filter(df.body_mass_g > 5000).show()

 

3. 分组和聚合数据

按类别分组数据通常是为了执行聚合操作,如获取每个类别的平均值或其他统计数据。此示例展示了如何使用groupByagg命令来获取每个物种的平均测量值摘要。

df.groupBy("species").agg({"flipper_length_mm": "mean", "body_mass_g": "mean"}).show()

 

4. 窗口函数

窗口函数在与当前行相关的行上执行计算,例如排名或运行总计。此示例展示了如何创建一个窗口函数,将数据按物种分区,然后在每个物种内按体重对企鹅进行排名。

from pyspark.sql.window import Windowfrom pyspark.sql.functions import rank, colwindowSpec = Window.partitionBy("species").orderBy(col("body_mass_g").desc())df.withColumn("mass_rank", rank().over(windowSpec)).show()

 

5. 连接操作

类似于SQL的表连接操作,PySpark连接操作基于指定的公共列合并两个DataFrame,使用多种策略如左连接、右连接、外连接等。此示例构建了一个包含每个物种总数的DataFrame,然后基于“连接列”物种进行左连接,将与其物种相关的计数附加到原始数据集的每个观测值。

species_stats = df.groupBy("species").count()df.join(species_stats, "species", "left").show()

 

6. 用户定义函数

PySpark的udf允许创建用户定义函数,本质上是自定义的lambda函数,一旦定义,可以用于对DataFrame中的列应用复杂的转换。此示例定义并应用一个用户定义函数,将企鹅体重映射为描述企鹅大小的新分类变量,如大或小。

from pyspark.sql.functions import udffrom pyspark.sql.types import StringTypesize_category = udf(lambda x: "Large" if x > 4500 else "Small", StringType())df.withColumn("size_class", size_category(df.body_mass_g)).show()

 

7. 透视表

透视表用于将列中的类别(如性别)转换为多个列,旨在描述每个类别的聚合或摘要统计。下面我们创建一个透视表,分别按性别计算并显示每个物种的平均喙长:

df.groupBy("species").pivot("sex").agg({"bill_length_mm": "avg"}).show()

 

8. 处理缺失值

使用filldropna函数可以填充缺失值或清理数据集中包含缺失值的行。如果我们想填充一个特定列的缺失值,然后删除其他列中包含缺失值的观测值,它们也可以联合使用:

df.na.fill({"sex": "Unknown"}).dropna().show()

 

9. 保存处理后的数据集

当然,一旦我们对数据集应用了一些处理操作,我们可以将其保存为多种格式,如parquet:

large_penguins = df.filter(df.body_mass_g > 4500)large_penguins.write.mode("overwrite").parquet("large_penguins.parquet")

 

10. 执行SQL查询

此列表中的最后一个命令允许您将SQL查询嵌入到PySpark命令中,并在Spark DataFrame的临时视图上运行它们。这种方法结合了SQL的灵活性和PySpark的分布式处理。

df.createOrReplaceTempView("penguins")spark.sql("""    SELECT species, island,            AVG(body_mass_g) as avg_mass,           AVG(flipper_length_mm) as avg_flipper    FROM penguins     GROUP BY species, island    ORDER BY avg_mass DESC""").show()

 

我们强烈建议您在Jupyter或Google Colab笔记本中逐一尝试这10个示例命令,以便查看结果输出。数据集可以直接读取并加载到您的代码中。

    文章来源:https://www.kdnuggets.com/10-essential-pyspark-commands-big-data-processing
    欢迎关注ATYUN官方公众号
    商务合作及内容投稿请联系邮箱:bd@atyun.com
    评论 登录
    热门职位
    Maluuba
    20000~40000/月
    Cisco
    25000~30000/月 深圳市
    PilotAILabs
    30000~60000/年 深圳市
    写评论取消
    回复取消