PySpark Cheat Sheet

Last updated on Sep 8, 2021

Set Up

Set Up PySpark 1.x

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, HiveContext

Create a SparkContext:

sc = SparkContext()

Create a SQLContext:

sc = SparkContext()
sql_context = SQLContext(sc)

Create a HiveContext:

sc = SparkContext()
hive_context = HiveContext(sc)

Set Up PySpark 2.x

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
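
The builder also accepts an application name and configuration options; a minimal sketch (the app name and setting below are illustrative):

spark = (SparkSession.builder
         .appName("example_app")  # illustrative application name
         .config("spark.sql.shuffle.partitions", "8")  # illustrative setting
         .getOrCreate())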

Set Up PySpark on AWS Glue

from pyspark.context import SparkContext
from awsglue.context import GlueContext

glueContext = GlueContext(SparkContext.getOrCreate())


Load Data

Create a DataFrame from an RDD

Create a DataFrame using the .toDF() function:

population = [ ("Croatia", 4_058_000), ("Oregon", 4_218_000 ) ]

rdd = spark.sparkContext.parallelize(population)

cols = ["state", "population"]

df = rdd.toDF(cols)

Create a DataFrame using the createDataFrame() function:

population = [ ("Croatia", 4_058_000), ("Oregon", 4_218_000 ) ]

cols = ["state", "population"]

df = spark.createDataFrame(data=population, schema=cols)

Create a DataFrame using a combination of the createDataFrame() function and StructType schema:

population = [ ("Croatia", 4_058_000), ("Oregon", 4_218_000 ) ]

pop_schema = StructType([
    StructField("state", StringType(), True),	
    StructField("population", IntegerType(), True)])

df = spark.createDataFrame(data=population, schema=pop_schema)

Create a DataFrame from a Spark Data Source

Load a CSV file:

df = spark.read.csv("sport.csv", sep=";", header=True, inferSchema=True)
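
To avoid relying on schema inference, an explicit schema can be passed instead (a sketch reusing the StructType pattern shown above; the column names are illustrative):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

sport_schema = StructType([
    StructField("sport", StringType(), True),
    StructField("players", IntegerType(), True)])

df = spark.read.csv("sport.csv", sep=";", header=True, schema=sport_schema)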

Read a TXT file:

df = spark.read.text("names.txt")

Read a JSON file:

df = spark.read.json("fruits.json", format="json")

Read a PARQUET file:

df = spark.read.load("stock_prices.parquet")

or

df = spark.read.parquet("stock_prices.parquet")

Create a Glue DynamicFrame

dfg = glueContext.create_dynamic_frame.from_catalog(database="example_database", table_name="example_table")
spark_df = dfg.toDF()
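
A DynamicFrame can also be created straight from files in S3 instead of the Glue Data Catalog (a sketch; the bucket path and format are illustrative):

dfg = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://example-bucket/inputdir/"]},  # illustrative path
    format="json")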


Write Data

Write Data from a DataFrame in PySpark

df_modified.write.json("fruits_modified.jsonl", mode="overwrite")

Convert a DynamicFrame to a DataFrame and Write Data to AWS S3 Files

dfg = glueContext.create_dynamic_frame.from_catalog(database="example_database", table_name="example_table")

Repartition into one partition and write:

df = dfg.toDF().repartition(1)
df.write.parquet("s3://glue-sample-target/outputdir/dfg")

Partition the output by a column and write:

dfg.toDF().write.parquet("s3://glue-sample-target/outputdir/dfg", partitionBy=["example_column"])

Convert a DataFrame to a DynamicFrame and Write Data to AWS S3 Files

from awsglue.dynamicframe import DynamicFrame

dfg = DynamicFrame.fromDF(df, glueContext, "dfg")

glueContext.write_dynamic_frame.from_options(
    frame=dfg,
    connection_type="s3",
    connection_options={"path": "s3://glue-sample-target/outputdir/dfg"},
    format="parquet")


Inspect Data

Display Content

Display DataFrame content:

df.show()

Display DataFrame schema:

df.printSchema()

Display DataFrame as a Pandas DataFrame:

df.toPandas()

Return DataFrame columns:

df.columns

Return the first n rows of a DataFrame:

df.head(n)

Return the first row of a DataFrame:

df.first()

Display DynamicFrame schema:

dfg.printSchema()

Display DynamicFrame content by converting it to a DataFrame:

dfg.toDF().show()

Analyze Content

Generate a basic statistical analysis of a DataFrame:

df.describe().show()

Count the number of rows inside a DataFrame:

df.count()

Count the number of distinct rows:

df.distinct().count()

Print the logical and physical plans:

df.explain()


Add, Remove, and Update Columns

Add Columns

Add columns with Spark native functions:

import pyspark.sql.functions as f

new_df = df.withColumn("column_3_multiplied", 3 * f.col("column_3_original"))

Add columns with user defined functions (UDFs):

import pyspark.sql.functions as f
from pyspark.sql.types import *

def example_func(spots):
    if spots >= 5:
        return "enough free spots"
    else:
        return "not enough free spots"

my_udf = f.udf(example_func, StringType())

cinema_tickets = cinema.withColumn("free_spots", my_udf("spots"))
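
The same UDF can also be declared with the decorator form of f.udf, which keeps the return type next to the function definition (a sketch equivalent to the example above, reusing the imports from that snippet):

@f.udf(returnType=StringType())
def example_func(spots):
    if spots >= 5:
        return "enough free spots"
    return "not enough free spots"

cinema_tickets = cinema.withColumn("free_spots", example_func("spots"))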

Remove Columns

Remove columns using column names:

sports = df.drop("football", "basketball")

Remove columns using chaining:

sports = sports.drop(sports.football).drop(sports.basketball)

Modify Columns

Rename column:

df = df.withColumnRenamed("basketball", "BASKETBALL")

Remove duplicates based on data in a column:

df.drop_duplicates(subset=["basketball"])

Remove rows with missing values based on columns in the DataFrame:

df.na.drop(subset=["basketball", "football"])

Impute missing data:

df.na.fill(25)
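
na.fill() also accepts a dictionary, so each column can get its own replacement value (a sketch; the fill values are illustrative):

df.na.fill({"basketball": 0, "football": 0})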


Select and Modify Data

Select Data

Select a single column:

df.select("basketball")

Select multiple columns:

df.select("basketball", "football")

Select a filtered version of a column:

df.select(df["goals"] >= 2)

Select a modified version of a column:

df.select(df["goals"] + 1)

Select Data with Conditional Arguments

Select using a "when otherwise" clause:

df.select("goals", f.when(df.goals == 0, "boring").otherwise("interesting"))

Select using "like":

df.select("sport", df.sport.like("basketball"))

Select using "between":

df.select(df.goals.between(1, 3))

Select using "startswith" or "endswith":

df.select("sports", df.players.startwith("B"))

df.select(df.players.endswith("s"))

Select a substring:

df.select(df.players.substr(1, 4).alias("nickname"))

Group Data

Group data:

df.groupby("players").count().show()

Group and aggregate data:

df.groupby("players").agg(spark_max("goals"), spark_min("goals"), spark_sum("goals").alias("total_goal_num")).show()

Filter Data

df.filter(df["goals"] > 3)

Sort Data

df.sort("goals", ascending=True).collect()

or

df.sort(df.goals.asc()).collect()

or

df.orderBy(["goals"], ascending = [0,1]).collect()

Repartition Data

Create multiple partitions:

df.repartition(5).rdd.getNumPartitions()

Create a single partition:

df.coalesce(1).rdd.getNumPartitions()

Perform Joins

Perform an inner join:

df = df_1.join(df_2, on=["key"], how="inner")

Perform an inner join with conditions:

df = df_1.join(df_2, df_1.key < df_2.key, how="inner")

Perform an outer join:

df = df_1.join(df_2, on=["key"], how="outer")

Perform a left join:

df = df_1.join(df_2, on=["key"], how="left")

Perform a right join:

df = df_1.join(df_2, on=["key"], how="right")

Perform a left semi join:

df = df_1.join(df_2, on=["key"], how="left_semi")

Perform a left anti join:

df = df_1.join(df_2, on=["key"], how="left_anti")

Query Data

people.createOrReplaceTempView('people')
places.createOrReplaceTempView('places')
spark.sql(
    """
    SELECT *
    FROM people
        INNER JOIN places
            ON people.city = LOWER(places.location)
    """
).show()