PySpark Cheat Sheet

A brief list of common PySpark methods and how to use them.
By Ciprian Stratulat • Updated on Jan 17, 2023
blog image

How to Set Up PySpark 1.X

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

 

Create a SparkContext:

sc = SparkContext()

 

Create a SQLContext:

sc = SparkContext()
sql_context = SQLContext(sc)

 

Create a HiveContext:

sc = SparkContext()
hive_context = HiveContext(sc)

 

How to Set Up PySpark 2.x

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

 

Set Up PySpark on AWS Glue

from pyspark.context import SparkContext
from awsglue.context import GlueContext

glueContext = GlueContext(SparkContext.getOrCreate())

 

How to Load Data in PySpark

 

Create a DataFrame from RDD

Create a DataFrame using the .toDF() function:

population = [ ("Croatia", 4_058_000), ("Oregon", 4_218_000 ) ]

rdd = spark.sparkContext.parallelize(population)

cols = ["state", "population"]

df = rdd.toDF(rdd_columns)

Create a DataFrame using the createDataFrame() function:

population = [ ("Croatia", 4_058_000), ("Oregon", 4_218_000 ) ]

cols = ["state", "population"]

df = spark.createDataFrame(data=population, schema=cols)

Create a DataFrame using a combination of the createDataFrame() function and StructType schema:

population = [ ("Croatia", 4_058_000), ("Oregon", 4_218_000 ) ]

pop_schema = StructType([
    StructField("state", StringType(), True),	
    StructField("population", IntegerType(), True)])

df = spark.createDataFrame(data=population, schema=pop_schema)

 

Create a DataFrame from a Spark Data Source

Load a .csv file:

df = spark.read.csv("sport.csv", sep=";", header=True, inferSchema=True)

Read a .txt file:

df = spark.read.text("names.txt")

Read a .json file:

df = spark.read.json("fruits.json", format="json")

Read a .parquet file:

df = spark.read.load("stock_prices.parquet")

or:

df = spark.read.parquet("stock_prices.parquet")

 

Create a Glue DynamicFrame

dfg = glueContext.create_dynamic_frame.from_catalog(database="example_database", table_name="example_table")
spark_df = dfg.toDF()

Article continues below

 

How to Write Data in PySpark

 

Write Data from a DataFrame in PySpark

df_modified.write.json("fruits_modified.jsonl", mode="overwrite")

 

Convert a DynamicFrame to a DataFrame and write data to AWS S3 files

dfg = glueContext.create_dynamic_frame.from_catalog(database="example_database", table_name="example_table")

Repartition into one partition and write:

df = dfg.toDF().repartition(1)
df.write.parquet("s3://glue-sample-target/outputdir/dfg")

Repartition by a column and write:

dfg.toDF().write.parquet("s3://glue-sample-target/outputdir/dfg", partitionBy=["example_column"])

 

Convert a DataFrame to a DynamicFrame and Write Data to AWS S3 Files

dfg = DynamicFrame.fromDF(df, glueContext, "dfg")

glueContext.write_dynamic_frame.from_options(
    frame=dfg,
    connection_type="s3",
    connection_options={"path": "s3://glue-sample-target/outputdir/dfg"},
    format="parquet")

 

How to Inspect Data in PySpark

 

Display Content

Display DataFrame content:

df.show()

Display DataFrame schema:

df.schema()

Display DataFrame as a Pandas DataFrame:

df.toPandas()

Return DataFrame columns:

df.columns

Return the first n rows of a DataFrame:

df.head(n)

Return the first row of a DataFrame:

df.first()

Display DynamicFrame schema:

dfg.printSchema()

Display DynamicFrame content by converting it to a DataFrame:

dfg.toDF().show()

 

How to Analyze Content in PySpark

Analyze a DataFrame

Generate a basic statistical analysis of a DataFrame:

df.describe.show()

Count the number of rows inside a DataFrame:

df.count()

Count the number of distinct rows:

df.distinct().count()

Print the logical and physical plans:

df.explain()

 

How to Add, Remove, and Update Columns in PySpark

 

Add Columns

Add columns with Spark native functions:

import pyspark.sql.functions as f

new_df = df.withColumn("column_3_multiplied", 3 * f.col("column_3_original"))

Add columns with user defined functions (UDFs):

import pyspark.sql.functions as f
from psyspark.sql.types import *

def example_func(filter_value):
	if values >= 5:
		return "enough free spots"
	else:
		return "not enough free spots"

my_udf = f.udf(example_func, StringType())

cinema_tickets = cinema.withColumn("free_spots", my_udf("spots") ) 

 

Remove Columns

Remove columns using column names:

sports = df.drop("football", "basketball")

Remove columns using chaining:

sports = sports.drop(sports.football).drop(sports.basketball)

 

Modify Columns

Rename columns:

df = df.withColumnRenamed("basketball", "BASKETBALL")

Remove duplicates based on data in a column:

df.drop_duplicates(subset=["basketball"])

Remove rows with missing values based on columns in the DataFrame:

df.na.drop(subset=["basketball", "football"])

Impute missing data:

df.na.fill(25)

 

How to Select and Modify Data in PySpark

 

Select Data

Select a single column:

df.select("basketball")

Select multiple columns:

df.select("basketball", "football")

Select a filtered version of a column:

df.select(df["goals"] >= 2)

Select a modified version of a column:

df.select(df["goals"] + 1)

 

Select Data with Conditional Arguments in PySpark

Select using a when-otherwise clause:

df.select("goals", f.when(df.goals == 0, "boring").otherwise("interesting"))

Select using like:

df.select("sport", df.sport.like("basketball"))

Select using between:

df.select(df.goals.between(1, 3))

Select using startswith or endswith:

df.select("sports", df.players.startwith("B"))

df.select(df.players.endswith("s"))

Select a substring (substr):

df.select(df.players.substr(1, 4).alias("nickname"))

 

How to Group Data in PySpark

Group data:

df.groupby("players").count().show()

Group and aggregate data:

df.groupby("players").agg(spark_max("goals"), spark_min("goals"), spark_sum("goals").alias("total_goal_num")).show()

 

How to Filter Data in PySpark

df.filter(df["goals"] > 3)

 

How to Sort Data in PySpark

df.sort("goals", ascending=True).collect()

or

df.sort(df.goals.asc()).collect()

or

df.orderBy(["goals"], ascending = [0,1]).collect()

 

How to Repartition Data in PySpark

Create multiple partitions:

df.repartition(5).rdd.getNumPartitions()

Create a single partition:

df.coalesce(1).rdd.getNumPartitions()

 

How to Perform Joins in PySpark

Perform an inner join:

df = df_1.join(df_2, on=["key"], how="inner")

Perform an inner join with conditions:

df = df_1.join(df_2, df_1.key < df_2.key, how="inner")

Perform an outer join:

df = df_1.join(df_2, on=["key"], how="outer")

Perform a left join:

df = df_1.join(df_2, on=["key"], how="left")

Perform a right join:

df = df_1.join(df_2, on=["key"], how="right")

Perform a left semi join:

df = df_1.join(df_2, on=["key"], how="left_semi")

Perform a left anti join:

df = df_1.join(df_2, on=["key"], how="left_anti")

 

How to Query Data in PySpark

people.createOrReplaceTempView('people')
spark.sql(
    """
    SELECT *
    FROM people
        INNER JOIN places
            ON people.city = LOWER(places.location)
    """
).show()

 

Ciprian Stratulat

CTO | Software Engineer

Ciprian Stratulat

Ciprian is a software engineer and the CTO of Edlitera. As an instructor, Ciprian is a big believer in first building an intuition about a new topic, and then mastering it through guided deliberate practice.

Before Edlitera, Ciprian worked as a Software Engineer in finance, biotech, genomics and e-book publishing. Ciprian holds a degree in Computer Science from Harvard University.