Multiple Ways to Create New Columns in PySpark DataFrames
This tutorial explains several techniques for adding new columns to PySpark DataFrames: native Spark functions, user-defined functions, RDD transformations, Pandas UDFs, and SQL queries. It also covers data loading and schema handling, with a code example for each method.
Working with terabytes of data often requires Spark, and this article shows how to create new columns in PySpark DataFrames using a variety of approaches.
Setup and data loading : After registering on Databricks and opening a Python notebook, load the MovieLens u.data file (tab-separated, no header row), name its four columns, and display the result:
ratings = spark.read.load("/FileStore/tables/u.data", format="csv", sep="\t", inferSchema="true", header="false")
ratings = ratings.toDF(*['user_id','movie_id','rating','unix_timestamp'])
ratings.show()
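To make the file layout concrete, here is a minimal plain-Python sketch of the same parsing step. It assumes u.data holds tab-separated lines with four integer fields in the order user_id, movie_id, rating, unix_timestamp. The sample rows are made up for illustration, and the helper name parse_ratings is hypothetical.

```python
import csv
import io

# Column names assigned in the article via toDF(...)
COLUMNS = ["user_id", "movie_id", "rating", "unix_timestamp"]

# Illustrative rows in the assumed u.data layout (tab-separated, no header).
# These values are made up for the example, not read from the real file.
SAMPLE = "1\t10\t4\t880000000\n2\t10\t2\t880000100\n"

def parse_ratings(text):
    """Parse tab-separated rating lines into dicts with integer values."""
    reader = csv.reader(io.StringIO(text), delimiter="\t")
    return [dict(zip(COLUMNS, map(int, fields))) for fields in reader if fields]

rows = parse_ratings(SAMPLE)
print(rows[0])
```

For data shaped like this, inferSchema should produce integer columns, which is consistent with the IntegerType fields declared for the Pandas UDF schema later in the article.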
Using Spark native functions : Import the functions module and build the new column from existing ones with withColumn, for example a scaled rating or an exponential rating, then call show() to view the results:
import pyspark.sql.functions as F
# Rating multiplied by 2
ratings_with_scale10 = ratings.withColumn("ScaledRating", 2*F.col("rating"))
ratings_with_scale10.show()
# Twice the exponential of the rating
ratings_with_exp = ratings.withColumn("expRating", 2*F.exp("rating"))
ratings_with_exp.show()
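As a sanity check on what the two expressions produce, the same arithmetic can be worked in plain Python. This sketch assumes ratings are integers from 1 to 5 (an assumption about the dataset, not stated in the article) and uses math.exp as a stand-in for F.exp.

```python
import math

# Assumed rating values (1 to 5); adjust if the data differs.
rating_values = [1, 2, 3, 4, 5]

# Same arithmetic as 2*F.col("rating") and 2*F.exp("rating"), one value at a time
scaled = [2 * r for r in rating_values]
exp_scaled = [2 * math.exp(r) for r in rating_values]

for r, s, e in zip(rating_values, scaled, exp_scaled):
    print(f"rating={r}  ScaledRating={s}  expRating={e:.3f}")
```

Doubling an integer keeps it an integer, while the exponential always yields a floating-point value, so the two new columns can be expected to differ in type as well as in magnitude.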
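Spark UDFs : Define a Python function, convert it to a Spark UDF with an explicit return type, apply it to a column, and display the new column:
from pyspark.sql.types import StringType

def somefunc(value):
    # Label a rating as 'low' (below 3) or 'high'
    if value < 3:
        return 'low'
    else:
        return 'high'

# Wrap the Python function as a UDF that returns a string
udfsomefunc = F.udf(somefunc, StringType())
ratings_with_high_low = ratings.withColumn("high_low", udfsomefunc("rating"))
ratings_with_high_low.show()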
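Because a UDF body is ordinary Python, its logic can be exercised without Spark before it is wrapped with F.udf. The sketch below does that and adds a guard for missing values: Spark passes SQL nulls to Python UDFs as None, and None < 3 raises a TypeError in Python 3. The name somefunc_null_safe is hypothetical and not part of the article.

```python
def somefunc_null_safe(value):
    """Return 'low' for ratings below 3, 'high' otherwise, None for missing input."""
    if value is None:
        return None  # would surface as a null in the resulting column
    return 'low' if value < 3 else 'high'

# Plain-Python checks of the labeling logic, including a missing value
labels = [somefunc_null_safe(v) for v in [1, 2, 3, 5, None]]
print(labels)
```

It would be wrapped the same way as the article's function, for example F.udf(somefunc_null_safe, StringType()).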
Using RDD transformations : Convert the DataFrame to an RDD, apply a row‑wise Python function that adds a new key, and convert back to a DataFrame:
import math
from pyspark.sql import Row

def rowwise_function(row):
    # Convert the Row to a dict, add the new field, and rebuild a Row
    row_dict = row.asDict()
    row_dict['Newcol'] = math.exp(row_dict['rating'])
    return Row(**row_dict)

ratings_rdd = ratings.rdd
ratings_rdd_new = ratings_rdd.map(rowwise_function)
ratings_new_df = spark.createDataFrame(ratings_rdd_new)
ratings_new_df.show()
Pandas UDFs (Spark 2.3+) : Declare an output schema, decorate a function with @F.pandas_udf, perform the group-wise operation on the Pandas DataFrame received for each group, and return a Pandas DataFrame:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

# Declare the schema of the DataFrame returned for each group
outSchema = StructType([
    StructField('user_id', IntegerType(), True),
    StructField('movie_id', IntegerType(), True),
    StructField('rating', IntegerType(), True),
    StructField('unix_timestamp', IntegerType(), True),
    StructField('normalized_rating', DoubleType(), True)
])

# Note: Spark 3.0+ deprecates GROUPED_MAP in favor of groupby(...).applyInPandas(func, schema)
@F.pandas_udf(outSchema, F.PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    # pdf holds all rows for one movie_id as a Pandas DataFrame
    v = pdf.rating
    v = v - v.mean()
    pdf['normalized_rating'] = v
    return pdf

rating_groupwise_normalization = ratings.groupby('movie_id').apply(subtract_mean)
rating_groupwise_normalization.show()
SQL method : Register the DataFrame as a temporary view (table) and run a SQL query to create a column:
# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView replaces it
ratings.createOrReplaceTempView('ratings_table')
newDF = spark.sql('select *, 2*rating as newCol from ratings_table')
newDF.show()
The article concludes that these techniques give you flexible options for creating columns in Spark when processing data at scale.
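The query used in the SQL method is standard SQL, so its shape can be tried locally. The sketch below uses Python's built-in sqlite3 module as a stand-in for Spark SQL. That substitution is an assumption made purely for illustration: SQLite is not Spark, and column types may differ. The rows are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "create table ratings_table "
    "(user_id integer, movie_id integer, rating integer, unix_timestamp integer)"
)
# Made-up rows for illustration only
conn.executemany(
    "insert into ratings_table values (?, ?, ?, ?)",
    [(1, 10, 4, 880000000), (2, 10, 2, 880000100)],
)

# Same statement as in the article: keep every column and add newCol
cursor = conn.execute("select *, 2*rating as newCol from ratings_table")
column_names = [d[0] for d in cursor.description]
result = cursor.fetchall()
conn.close()

print(column_names)
print(result)
```

The select * keeps the four original columns and the aliased expression appends newCol as a fifth, which is the same effect withColumn has in the DataFrame API.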
Python Programming Learning Circle
A global community of Chinese Python developers offering technical articles, columns, original video tutorials, and problem sets. Topics include web full‑stack development, web scraping, data analysis, natural language processing, image processing, machine learning, automated testing, DevOps automation, and big data.
