21-12-25
banq
了解如何在 Spark 数据框中创建新列?
我们生活在大数据时代。收集、存储和传输数据变得非常容易。随着数据量的增加,传统的工具开始变得不够用。
当数据太大而无法通过传统工具和技术进行处理时,我们应该使用允许分布式计算的工具和技术,例如Spark。
Spark 是一种用于大规模数据处理的分析引擎。它让我们在集群上分布数据和计算,以实现显着的性能提升。
PySpark 是Spark的 Py t hon API。它结合了 Python 的简单性和 Spark 的效率,这种合作得到了数据科学家和工程师的高度赞赏。
在本文中,我们将介绍使用 PySpark SQL 模块创建新列的 4 种方法。
导入库并创建 Spark 会话。
from pyspark.sql import SparkSession from pyspark.sql import functions as F spark = SparkSession.builder.getOrCreate() |
我们还在模块中导入了函数,因为我们将在创建列时使用其中的一些函数。
下一步是获取一些数据。我们总是可以通过从外部文件读取数据来创建数据框。在本文中,我们将使用该createDataFrame函数创建我们自己的数据框。
data = [ ("John","Doe",28,45000,1,0,1), ("Jane","Doe",26,52000,1,1,1), ("Matt","Anderson",34,62000,1,0,0), ("Ashley","James",30,58000,1,1,0), ("Amber","Murray",24,48000,1,0,0) ] schema = StructType([ StructField("FirstName",StringType(),True), StructField("LastName",StringType(),True), StructField("Age",IntegerType(),True), StructField("Salary", IntegerType(), True), StructField("Checking", IntegerType(), True), StructField("Savings", IntegerType(), True), StructField("CreditCard", IntegerType(), True) ]) df = spark.createDataFrame(data=data, schema=schema) df.show() |
1.创建一个具有常量值的新列
该withColumn函数可用于创建新列。为了创建一个具有常量值的值,我们需要用lit函数指定值,而不管数据类型如何。
df = df.withColumn("IsCustomer", F.lit(1))df.show() |
withColumn函数的第一个参数是新列的名称,第二个参数指定值。
2.基于其他列创建一个新列
我们可以使用另一列中的值来计算新列的值。该withColumn功能也允许进行计算。
df = df.withColumn( "NumberOfProducts", F.col("Checking") + F.col("Savings") + F.col("CreditCard") )df.select("Checking","Savings","CreditCard","NumberOfProducts").show() |
产品数列是支票、储蓄和信用卡列的总和。我们需要使用该col函数编写列名。
3. 创建条件列
我们可以使用该when函数根据一个或多个条件指定新列的值。
让我们创建一个列来指示客户是否至少拥有一种产品。如果产品数量为 1 个或多个,则新列的值为 1。否则为 0。
df = df.withColumn( "HasProduct", F.when(F.col("NumberOfProducts") >= 1, 1).else(0) )df.select("NumberOfProducts", "HasProduct").show() |
条件col为when函数的第一个参数。然后,我们为符合给定条件的行指定值。为了为不同的条件指定单独的值,我们可以将when函数组合为一个链式操作。不符合任何给定条件的行的值写入otherwise部件中。
4.在select函数内创建一列
该select函数可用于从数据框中选择列。它与 SQL 的 select 语句非常相似。
我们通常在select函数中写入列的名称。我们还可以在select函数中进行计算以创建新列。
df_new = df.select( "FirstName", "LastName", "NumberOfProducts", F.lit(3 - df.NumberOfProducts).alias("Potential") )df_new.show() |
Potential |
alias方法用于为派生列或计算列分配名称。
结论
我们已经介绍了使用 PySpark SQL 模块创建新列的 4 种不同方法。
需要注意的是,Spark 针对大规模数据进行了优化。因此,在处理小规模数据时,您可能看不到任何性能提升。事实上,在处理小型数据集时,Pandas 的性能可能优于 PySpark。