Multiple Ways to Create New Columns in PySpark DataFrames
This tutorial explains several techniques for adding new columns to PySpark DataFrames—including native Spark functions, user‑defined functions, RDD transformations, Pandas UDFs, and SQL queries—demonstrating data loading and schema handling, with code examples for each method.
Working with terabytes of data often requires Spark, and this article shows how to create new columns in PySpark DataFrames using a variety of approaches.
Setup and data loading: After registering on Databricks and opening a Python notebook, load the MovieLens u.data file and rename its columns:

```python
ratings = spark.read.load("/FileStore/tables/u.data",
                          format="csv", sep="\t",
                          inferSchema="true", header="false")
ratings = ratings.toDF(*["user_id", "movie_id", "rating", "unix_timestamp"])
ratings.show()
```
Using Spark native functions: Import the functions module and use withColumn to derive new columns, such as a scaled rating or an exponential rating, then call show() to view the results:

```python
import pyspark.sql.functions as F

ratings_with_scale10 = ratings.withColumn("ScaledRating", 2 * F.col("rating"))
ratings_with_exp = ratings.withColumn("expRating", 2 * F.exp("rating"))
ratings_with_exp.show()
```
Spark UDFs: Define a plain Python function, wrap it in a Spark UDF (note the StringType import, which the original snippet omitted), apply it with withColumn, and display the new column:

```python
from pyspark.sql.types import StringType

def somefunc(value):
    if value < 3:
        return 'low'
    else:
        return 'high'

udfsomefunc = F.udf(somefunc, StringType())
ratings_with_high_low = ratings.withColumn("high_low", udfsomefunc("rating"))
ratings_with_high_low.show()
Using RDD transformations: Convert the DataFrame to an RDD, apply a row‑wise Python function that adds a new key, and convert the result back to a DataFrame:

```python
import math
from pyspark.sql import Row

def rowwise_function(row):
    row_dict = row.asDict()
    row_dict['Newcol'] = math.exp(row_dict['rating'])
    return Row(**row_dict)

ratings_rdd = ratings.rdd
ratings_rdd_new = ratings_rdd.map(lambda row: rowwise_function(row))
ratings_new_df = sqlContext.createDataFrame(ratings_rdd_new)
ratings_new_df.show()
```
Pandas UDFs (Spark 2.3+): Declare an output schema (the type imports below were missing from the original snippet), decorate a function with @F.pandas_udf, perform a group‑wise operation, and return a Pandas DataFrame:

```python
from pyspark.sql.types import (StructType, StructField,
                               IntegerType, DoubleType)

# Declare the schema of the DataFrame the UDF returns
outSchema = StructType([
    StructField('user_id', IntegerType(), True),
    StructField('movie_id', IntegerType(), True),
    StructField('rating', IntegerType(), True),
    StructField('unix_timestamp', IntegerType(), True),
    StructField('normalized_rating', DoubleType(), True),
])

@F.pandas_udf(outSchema, F.PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    v = pdf.rating
    v = v - v.mean()
    pdf['normalized_rating'] = v
    return pdf

rating_groupwise_normalization = ratings.groupby('movie_id').apply(subtract_mean)
rating_groupwise_normalization.show()
```
SQL method: Register the DataFrame as a temporary table and run a SQL query that creates the new column:

```python
ratings.registerTempTable('ratings_table')
newDF = sqlContext.sql('select *, 2*rating as newCol from ratings_table')
newDF.show()
```
The article concludes that these techniques give you flexible options for column creation in Spark, helping you handle large‑scale data processing tasks efficiently.