📘 Introduction

When working with large datasets in PySpark, joins can easily become performance bottlenecks. This happens because Spark needs to shuffle data across the cluster to match rows between DataFrames — a costly operation when both datasets are big.

If one of your DataFrames is small, though, there’s a faster way. 🚀 A Broadcast Join lets Spark send that small DataFrame to every worker node, so each node can perform the join locally — no data shuffling required. This simple trick can drastically speed up your joins and make your Spark jobs much more efficient.

In this guide, you’ll learn how broadcast joins work, how they differ from normal joins, and how to use them step by step.

💡 Why Use a Broadcast Join?

You might need a broadcast join when you have:

✅ A large DataFrame (millions of rows)
✅ A small DataFrame (a lookup or reference table)
✅ A need to reduce shuffle and improve performance

Without broadcasting, Spark uses a shuffle join, redistributing both DataFrames across nodes so matching keys align. This shuffle causes heavy network traffic and slows down your job.

💡
Broadcasting avoids this entirely by sending the small DataFrame to all nodes. Every node can then join its partition of the large DataFrame locally — fast and efficient.

✅ Prerequisites

Before getting started, make sure you have:

🐍☑️ Python installed
🔥☑️ A working Spark environment

📦1️⃣ Install Libraries

Install the PySpark package using pip:

pip install pyspark

📥2️⃣ Import Libraries

Start by importing the required Python modules:

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

⚙️3️⃣ Build a Spark Session

Next, initialize your Spark session — this is your entry point to working with DataFrames and Spark SQL:

spark = SparkSession.builder \
    .appName("BroadcastJoinDemo") \
    .getOrCreate()

✍️4️⃣ Create Sample DataFrames

Let’s create two DataFrames — one large and one small — to demonstrate the concept.

Large Dataset:

# "Large" DataFrame (kept tiny here for demonstration)
data_large = [
    (1, "Alice", "NY"),
    (2, "Bob", "CA"),
    (3, "Charlie", "TX"),
    (4, "David", "CA")
]
df_large = spark.createDataFrame(data_large, ["id", "name", "state"])
df_large.show()

Output:

+---+-------+-----+
| id|   name|state|
+---+-------+-----+
|  1|  Alice|   NY|
|  2|    Bob|   CA|
|  3|Charlie|   TX|
|  4|  David|   CA|
+---+-------+-----+

Small Dataset:

# Small lookup DataFrame mapping state codes to full names
data_small = [
    ("NY", "New York"),
    ("CA", "California"),
    ("TX", "Texas")
]
df_small = spark.createDataFrame(data_small, ["state", "state_name"])
df_small.show()

Output:

+-----+----------+
|state|state_name|
+-----+----------+
|   NY|  New York|
|   CA|California|
|   TX|     Texas|
+-----+----------+

💡
These sample DataFrames are intentionally small — just for demonstration purposes. In real-world scenarios, broadcast joins are most useful when one DataFrame is large (millions of rows) and the other is small (a few thousand rows or less, such as a lookup or reference table).
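
A related sizing note: Spark can also broadcast the small side of a join automatically when its estimated size falls below the spark.sql.autoBroadcastJoinThreshold setting, which defaults to 10 MB; the explicit broadcast() hint used later makes the choice deliberate regardless of this setting. Here is a quick sketch of inspecting and adjusting it (the 50 MB value is just an example):

# Check the current auto-broadcast threshold (default is 10 MB)
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

# Raise it to 50 MB, or set it to -1 to disable automatic broadcasting
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)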

🔄5️⃣ Perform a Normal Join (Without Broadcast)

A standard join between these two DataFrames looks like this:

# Left join on the shared "state" column (no broadcast hint)
joined_df = df_large.join(df_small, "state", "left")
joined_df.show()

Output:

+-----+---+-------+----------+
|state| id|   name|state_name|
+-----+---+-------+----------+
|   NY|  1|  Alice|  New York|
|   CA|  2|    Bob|California|
|   TX|  3|Charlie|     Texas|
|   CA|  4|  David|California|
+-----+---+-------+----------+

🧠 What’s happening:
Spark shuffles both DataFrames based on the join key (state). This ensures rows with matching keys are placed together, but it also triggers expensive data movement across the cluster — which can slow things down significantly when one dataset is large.

You can inspect how Spark performs this join using explain():

joined_df.explain()

Output:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [state#81, id#79L, name#80, state_name#93]
   +- SortMergeJoin [state#81], [state#92], LeftOuter
      :- Sort [state#81 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(state#81, 200), ENSURE_REQUIREMENTS, [plan_id=527]
      :     +- Scan ExistingRDD[id#79L,name#80,state#81]
      +- Sort [state#92 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(state#92, 200), ENSURE_REQUIREMENTS, [plan_id=528]
            +- Filter isnotnull(state#92)
               +- Scan ExistingRDD[state#92,state_name#93]

Each Exchange hashpartitioning step in this plan is a shuffle: both DataFrames are redistributed across the cluster by the state key before the SortMergeJoin runs.

⚡6️⃣ Perform a Broadcast Join

Now let’s optimize the join using broadcasting:
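
Wrap the small DataFrame in the broadcast() hint imported in step 2. A minimal sketch (the variable name is just illustrative):

# Broadcast join: df_small is shipped whole to every executor,
# so each partition of df_large is joined locally, with no shuffle
broadcast_joined_df = df_large.join(broadcast(df_small), "state", "left")
broadcast_joined_df.show()

# The joined rows are identical to the normal join above; only the
# physical plan changes. explain() should now show a BroadcastHashJoin
# fed by a BroadcastExchange, with no Exchange hashpartitioning steps.
broadcast_joined_df.explain()

Same result, no shuffle. That is the entire trick, and on real workloads with a genuinely large DataFrame it can speed up the join dramatically.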
