使用 PySpark 创建新列的 4 种不同方式 - Soner


了解如何在 Spark 数据框中创建新列?
我们生活在大数据时代。收集、存储和传输数据变得非常容易。随着数据量的增加,传统的工具开始变得不够用。
当数据太大而无法通过传统工具和技术进行处理时,我们应该使用允许分布式计算的工具和技术,例如Spark。
Spark 是一种用于大规模数据处理的分析引擎。它让我们在集群上分布数据和计算,以实现显着的性能提升。
PySpark 是Spark的 Py t hon API。它结合了 Python 的简单性和 Spark 的效率,这种合作得到了数据科学家和工程师的高度赞赏。
在本文中,我们将介绍使用 PySpark SQL 模块创建新列的 4 种方法。
 
导入库并创建 Spark 会话。

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()

我们还在模块中导入了函数,因为我们将在创建列时使用其中的一些函数。
下一步是获取一些数据。我们总是可以通过从外部文件读取数据来创建数据框。在本文中,我们将使用该createDataFrame函数创建我们自己的数据框。

data = [
    ("John","Doe",28,45000,1,0,1),
    (
"Jane","Doe",26,52000,1,1,1),
    (
"Matt","Anderson",34,62000,1,0,0),
    (
"Ashley","James",30,58000,1,1,0),
    (
"Amber","Murray",24,48000,1,0,0)
]
schema = StructType([
    StructField(
"FirstName",StringType(),True),
    StructField(
"LastName",StringType(),True),
    StructField(
"Age",IntegerType(),True),
    StructField(
"Salary", IntegerType(), True),
    StructField(
"Checking", IntegerType(), True),
    StructField(
"Savings", IntegerType(), True),
    StructField(
"CreditCard", IntegerType(), True)
  ])
df = spark.createDataFrame(data=data, schema=schema)
df.show()

 
1.创建一个具有常量值的新列
该withColumn函数可用于创建新列。为了创建一个具有常量值的值,我们需要用lit函数指定值,而不管数据类型如何。
df = df.withColumn("IsCustomer", F.lit(1))df.show()

withColumn函数的第一个参数是新列的名称,第二个参数指定值。
 
2.基于其他列创建一个新列
我们可以使用另一列中的值来计算新列的值。该withColumn功能也允许进行计算。

df = df.withColumn( 
    "NumberOfProducts"
     F.col(
"Checking") + F.col("Savings") + F.col("CreditCard"
)df.select(
"Checking","Savings","CreditCard","NumberOfProducts").show()

产品数列是支票、储蓄和信用卡列的总和。我们需要使用该col函数编写列名。
 
3. 创建条件列
我们可以使用该when函数根据一个或多个条件指定新列的值。

让我们创建一个列来指示客户是否至少拥有一种产品。如果产品数量为 1 个或多个,则新列的值为 1。否则为 0。

df = df.withColumn( 
    "HasProduct"
    F.when(F.col(
"NumberOfProducts") >= 1, 1).else(0) 
)df.select(
"NumberOfProducts", "HasProduct").show()

条件col为when函数的第一个参数。然后,我们为符合给定条件的行指定值。为了为不同的条件指定单独的值,我们可以将when函数组合为一个链式操作。不符合任何给定条件的行的值写入otherwise部件中。
 
4.在select函数内创建一列
该select函数可用于从数据框中选择列。它与 SQL 的 select 语句非常相似。

我们通常在select函数中写入列的名称。我们还可以在select函数中进行计算以创建新列。

df_new = df.select( 
    "FirstName"
   
"LastName"
   
"NumberOfProducts"
    F.lit(3 - df.NumberOfProducts).alias(
"Potential"
)df_new.show()


 

Potential
列显示可以向客户销售多少新产品。因此,它的计算方法是从我们产品组合中的产品总数中减去客户拥有的产品数量,即 3。
alias方法用于为派生列或计算列分配名称。
 
结论
我们已经介绍了使用 PySpark SQL 模块创建新列的 4 种不同方法。
需要注意的是,Spark 针对大规模数据进行了优化。因此,在处理小规模数据时,您可能看不到任何性能提升。事实上,在处理小型数据集时,Pandas 的性能可能优于 PySpark。