PySpark Fundamentals: Architecture, DataFrames, and Your First Pipeline
Master Apache Spark from the ground up: understand the driver/executor model, RDDs vs DataFrames, schema handling, lazy evaluation, and build a complete CSV-to-Parquet pipeline.
What Is Apache Spark and Why Do Data Engineers Use It?
When a single machine cannot process your data fast enough, or at all, you need a distributed compute engine. Apache Spark is the de facto standard for large-scale data processing. It runs on clusters, processes terabytes in minutes, and provides a clean Python API through PySpark.
Unlike Hadoop MapReduce, Spark keeps data in memory between stages, making iterative workloads orders of magnitude faster.
Spark Architecture
Understanding the architecture is not optional. Every performance decision traces back to it.
┌─────────────────────────────────────────────────────────┐
│                     Driver Program                      │
│  ┌───────────────────────────────────────────────────┐  │
│  │ SparkContext / SparkSession                       │  │
│  │ DAG Scheduler → Task Scheduler                     │  │
│  └───────────────────────────────────────────────────┘  │
└────────────────────────────┬────────────────────────────┘
                             │ schedules tasks
                             ▼
             ┌───────────────────────────────────┐
             │          Cluster Manager          │
             │ (YARN / Kubernetes / Standalone)  │
             └────────┬─────────────────┬────────┘
                      │                 │
              ┌───────▼──────┐  ┌───────▼──────┐
              │  Executor 1  │  │  Executor 2  │
              │  ┌────────┐  │  │  ┌────────┐  │
              │  │ Task 1 │  │  │  │ Task 3 │  │
              │  │ Task 2 │  │  │  │ Task 4 │  │
              │  └────────┘  │  │  └────────┘  │
              └──────────────┘  └──────────────┘

Driver
The driver is your Python process. It holds the SparkSession, builds the execution plan (the DAG), and coordinates work. It never processes data; it only directs. One expensive mistake is collecting large datasets back to the driver with .collect(), which pulls millions of rows into a single machine's memory.
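When you only need a peek at the data on the driver, prefer bounded alternatives. A minimal sketch, assuming df is a DataFrame like the ones created later in this article:

# Pull only a bounded sample to the driver instead of the whole dataset
preview = df.limit(100).collect()   # at most 100 Row objects on the driver
first_five = df.take(5)             # small, bounded list of rows
df.show(20, truncate=False)         # prints a preview; the data stays distributed

# If you need pandas locally, bound the size first
pdf = df.limit(10_000).toPandas()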
Executors
Executors are JVM processes on worker nodes. Each executor runs tasks in parallel across its CPU cores and holds its share of the data partitions in memory or on disk. Executor count, cores per executor, and memory per executor are the primary tuning knobs.
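As a hedged sketch of where those knobs live, they map to standard Spark configuration properties. The app name and values below are placeholders, and on managed platforms (Databricks, EMR, Dataproc) the platform usually sets them for you:

from pyspark.sql import SparkSession

# Illustrative executor sizing via Spark conf; values are examples, not recommendations
spark = (
    SparkSession.builder
    .appName("ExecutorSizingExample")
    .config("spark.executor.instances", "10")   # executor count (static allocation)
    .config("spark.executor.cores", "4")        # parallel tasks per executor
    .config("spark.executor.memory", "8g")      # memory per executor
    .getOrCreate()
)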
Cluster Manager
The cluster manager allocates resources. In production you'll use YARN (Hadoop clusters), Kubernetes (cloud-native), or Databricks' managed runtime. For local development, local[*] uses all CPU cores on your machine.
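For orientation, the .master() setting is what selects the cluster manager. A sketch of common values (the standalone URL is a placeholder); on managed platforms you normally don't set it at all:

from pyspark.sql import SparkSession

# Local development: all cores on this machine
builder = SparkSession.builder.master("local[*]")

# YARN: the resource manager address comes from the Hadoop/YARN client config
builder = SparkSession.builder.master("yarn")

# Standalone cluster at a known host/port (placeholder URL)
builder = SparkSession.builder.master("spark://spark-master:7077")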
DAG Scheduler
When you call an action (like .write), Spark compiles your transformation chain into a Directed Acyclic Graph of stages. Each stage is a set of tasks that can run in parallel without shuffling data across the network. The DAG scheduler optimizes this graph before execution.
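A small hedged illustration, assuming the df and country_code column used later in this article: a wide operation such as groupBy forces a shuffle, which marks a stage boundary and shows up as an Exchange node in the plan.

# Rows with the same key must meet on one executor, so Spark inserts a shuffle.
# In the printed physical plan, look for "Exchange hashpartitioning(country_code, ...)";
# that is where one stage ends and the next begins.
df.groupBy("country_code").count().explain()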
RDDs, DataFrames, and Datasets
Spark has three APIs. Data engineers almost always use DataFrames.
| Concept | Description | When to Use |
|---|---|---|
| RDD | Low-level, untyped distributed collection | Rarely: legacy code or custom partitioning |
| DataFrame | Distributed table with named columns and a schema | Default choice, optimized by Catalyst |
| Dataset | Typed DataFrame (Scala/Java only in practice) | Not available in PySpark |
The Catalyst optimizer rewrites your DataFrame query into an optimized physical plan. It applies predicate pushdown, column pruning, join reordering, and more, automatically. This is why DataFrames outperform hand-written RDD code.
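A hedged illustration of Catalyst at work, reusing the orders path that appears later in this article; the filter and column list are arbitrary:

df_orders = spark.read.parquet("s3://my-bucket/silver/orders/")

(
    df_orders
    .filter("order_date >= '2026-01-01'")
    .select("order_id", "total_amount")
    .explain()
)
# In the scan node of the physical plan, look for PushedFilters (the date predicate
# pushed into the Parquet reader) and a ReadSchema limited to the selected columns:
# pushdown and pruning happened before any data left the files.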
Creating a SparkSession
SparkSession is the single entry point for all Spark functionality.
from pyspark.sql import SparkSession
# Local development: uses all available cores
spark = (
SparkSession.builder
.appName("CustomerPipeline")
.master("local[*]")
# Tune memory for local runs
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "4g")
# Enable Delta Lake support
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config(
"spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog",
)
.getOrCreate()
)
# Always set log level in notebooks to reduce noise
spark.sparkContext.setLogLevel("WARN")
print(spark.version)  # e.g., 3.5.0

In production (Databricks, EMR, Dataproc), the SparkSession is usually pre-created. Call SparkSession.builder.getOrCreate() to retrieve the existing session instead of building a new one.
Reading Data
CSV with Schema Inference
df_raw = (
spark.read
.option("header", "true")
.option("inferSchema", "true")
.option("nullValue", "NULL")
.option("emptyValue", "")
.csv("s3://my-bucket/raw/customers/2026/")
)
df_raw.printSchema()
# root
# |-- customer_id: integer (nullable = true)
# |-- email: string (nullable = true)
# |-- signup_date: string (nullable = true)   <- dates inferred as strings!
# |-- revenue: double (nullable = true)

Problem with inferSchema: Spark makes an extra pass over the input just to guess types, on every run. It gets dates wrong (they arrive as strings), and the inferred schema can silently change when the data changes. Never use inferSchema in production.
Explicit Schema with StructType
from pyspark.sql.types import (
StructType, StructField,
StringType, IntegerType, DoubleType, DateType, BooleanType, TimestampType
)
customer_schema = StructType([
StructField("customer_id", IntegerType(), nullable=False),
StructField("email", StringType(), nullable=False),
StructField("full_name", StringType(), nullable=True),
StructField("signup_date", DateType(), nullable=True),
StructField("country_code", StringType(), nullable=True),
StructField("is_active", BooleanType(), nullable=True),
StructField("lifetime_value",DoubleType(), nullable=True),
])
df = (
spark.read
.schema(customer_schema)
.option("header", "true")
.option("dateFormat", "yyyy-MM-dd")
.option("mode", "DROPMALFORMED") # options: PERMISSIVE, DROPMALFORMED, FAILFAST
.csv("s3://my-bucket/raw/customers/2026/")
)

The mode option controls bad record handling:
- PERMISSIVE (default): puts bad rows in a _corrupt_record column
- DROPMALFORMED: silently drops rows that don't match the schema
- FAILFAST: throws an exception on the first bad row
In production, use PERMISSIVE and route corrupt records to a dead-letter path for inspection.
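One caveat worth a quick sketch, assuming the customer_schema defined above: with an explicit schema, PERMISSIVE only captures bad rows if the corrupt-record column is itself part of the schema.

from pyspark.sql.types import StringType

# PERMISSIVE needs a destination column for unparseable rows, so add it to the schema
schema_with_corrupt = customer_schema.add("_corrupt_record", StringType(), True)

df = (
    spark.read
    .schema(schema_with_corrupt)
    .option("header", "true")
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .csv("s3://my-bucket/raw/customers/2026/")
)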
Reading Parquet
# Parquet stores the schema in file metadata; no schema needed
df_orders = spark.read.parquet("s3://my-bucket/silver/orders/")
# Read specific columns only (column pruning happens at file level)
df_orders_slim = spark.read.parquet("s3://my-bucket/silver/orders/").select(
"order_id", "customer_id", "order_date", "total_amount"
)

Reading JSON
# Multi-line JSON (one object per file)
df_events = (
spark.read
.option("multiLine", "true")
.json("s3://my-bucket/raw/events/")
)
# JSON Lines (one JSON object per line) is the default
df_events = spark.read.json("s3://my-bucket/raw/events/")

Reading Delta Lake
# Read current snapshot
df_delta = spark.read.format("delta").load("s3://my-bucket/delta/customers/")
# Read a specific version (time travel)
df_yesterday = (
spark.read
.format("delta")
.option("versionAsOf", 5)
.load("s3://my-bucket/delta/customers/")
)
# Read at a specific timestamp
df_as_of = (
spark.read
.format("delta")
.option("timestampAsOf", "2026-04-01 00:00:00")
.load("s3://my-bucket/delta/customers/")
)

Inspecting DataFrames
# Show first N rows (default 20), truncate=False shows full column values
df.show(10, truncate=False)
# Print the schema tree
df.printSchema()
# Count rows ā triggers a full scan, use sparingly
print(f"Row count: {df.count():,}")
# Column names and types as a list
print(df.dtypes)
# [('customer_id', 'int'), ('email', 'string'), ...]
# Summary statistics (like pandas describe)
df.describe("lifetime_value").show()
# View the execution plan
# Physical plan only
df.filter("is_active = true").explain()
# Full plan: parsed → analyzed → optimized → physical
df.filter("is_active = true").explain(mode="extended")

Lazy Evaluation: The Key Mental Model
This is the most important concept in Spark. Nothing runs until you call an action.
from pyspark.sql import functions as F

# These are TRANSFORMATIONS: they build a logical plan, no data moves
df_active = df.filter("is_active = true") # transformation
df_enriched = df_active.withColumn( # transformation
"email_domain",
F.split(F.col("email"), "@").getItem(1)
)
df_final = df_enriched.select( # transformation
"customer_id", "email", "email_domain", "country_code"
)
# ACTIONS: these trigger actual computation
df_final.show() # action: materialize and display
df_final.count() # action: count rows
df_final.collect() # action: bring all data to driver (dangerous!)
df_final.write.parquet(...)  # action: write to storage
df_final.first()  # action: get first row

Transformation vs Action quick reference:
| Transformations (lazy) | Actions (eager) |
|---|---|
| select, filter, where | show, collect |
| withColumn, drop | count, first, take |
| groupBy, agg | write, save |
| join, union | foreach, toPandas |
| repartition, coalesce | head, foreachPartition |
Because transformations are lazy, Spark can optimize the entire chain before running it. It may reorder operations, push filters down to the file reader, or eliminate unused columns before any data is read.
Writing Output
Writing Parquet (Production Standard)
(
df_final
.write
.mode("overwrite") # overwrite | append | ignore | errorIfExists
.option("compression", "snappy") # snappy | gzip | zstd | none
.parquet("s3://my-bucket/silver/customers/")
)

Writing with Partitioning
Partitioning splits the output across subdirectories by column value. Queries that filter on the partition column skip entire directories; this is called partition pruning.
(
df_orders
.write
.mode("overwrite")
.partitionBy("year", "month") # creates .../year=2026/month=05/part-*.parquet
.parquet("s3://my-bucket/silver/orders/")
)

Choose partition columns carefully:
- High-cardinality columns (like customer_id) create millions of tiny files: bad
- Low-cardinality columns that queries filter on (like country, year, month): good
- Aim for partition files between 128MB and 1GB each
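A hedged sketch of the payoff, reusing the orders path from the example above: a read that filters on a partition column only lists and scans the matching directories.

# Only the .../year=2026/... directories are read; everything else is skipped
# before any file is opened (partition pruning)
df_2026 = (
    spark.read
    .parquet("s3://my-bucket/silver/orders/")
    .filter("year = 2026")
)
df_2026.explain()  # the scan node lists PartitionFilters on year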
Writing Delta Lake
(
df_final
.write
.format("delta")
.mode("overwrite")
.option("overwriteSchema", "true")
.partitionBy("country_code")
.save("s3://my-bucket/delta/customers/")
)

Controlling Output File Count
By default Spark writes one file per partition (task). Repartition before writing to control file count:
# Write exactly 10 Parquet files
(
df_final
.repartition(10)
.write
.mode("overwrite")
.parquet("s3://my-bucket/silver/customers/")
)
# Repartition by a column: co-locates related data
(
df_orders
.repartition(200, "customer_id")
.write
.mode("overwrite")
.parquet("s3://my-bucket/silver/orders/")
)

Complete Pipeline: CSV → Validate → Filter → Parquet
This is a real Bronze-to-Silver pipeline pattern.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (
StructType, StructField,
StringType, IntegerType, DoubleType, DateType, BooleanType
)
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ─── 1. Session ───────────────────────────────────────────────────────────────
spark = (
SparkSession.builder
.appName("CustomerSilverPipeline")
.config("spark.sql.adaptive.enabled", "true")
.getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")
# ─── 2. Define Schema ─────────────────────────────────────────────────────────
RAW_SCHEMA = StructType([
StructField("customer_id", IntegerType(), nullable=True),
StructField("email", StringType(), nullable=True),
StructField("full_name", StringType(), nullable=True),
StructField("signup_date", DateType(), nullable=True),
StructField("country_code", StringType(), nullable=True),
StructField("is_active", BooleanType(), nullable=True),
StructField("lifetime_value", DoubleType(), nullable=True),
    # Needed so PERMISSIVE mode has a column to hold unparseable rows (see step 4)
    StructField("_corrupt_record", StringType(), nullable=True),
])
# ─── 3. Read Raw CSV ──────────────────────────────────────────────────────────
SOURCE_PATH = "s3://my-bucket/raw/customers/2026/05/"
SINK_PATH = "s3://my-bucket/silver/customers/"
df_raw = (
spark.read
.schema(RAW_SCHEMA)
.option("header", "true")
.option("dateFormat", "yyyy-MM-dd")
.option("mode", "PERMISSIVE")
.option("columnNameOfCorruptRecord", "_corrupt_record")
.csv(SOURCE_PATH)
)
# Cache the parsed result so the corrupt-record-only filter in step 4 doesn't hit
# Spark's restriction on querying just the internal corrupt record column
df_raw = df_raw.cache()
raw_count = df_raw.count()
logger.info(f"Raw rows read: {raw_count:,}")
# ─── 4. Quarantine Corrupt Records ────────────────────────────────────────────
df_corrupt = df_raw.filter(F.col("_corrupt_record").isNotNull())
corrupt_count = df_corrupt.count()
if corrupt_count > 0:
logger.warning(f"Quarantining {corrupt_count:,} corrupt records")
(
df_corrupt
.write
.mode("append")
.parquet("s3://my-bucket/quarantine/customers/")
)
df_parseable = df_raw.filter(F.col("_corrupt_record").isNull()).drop("_corrupt_record")
# ─── 5. Validate: Reject Nulls on Required Fields ─────────────────────────────
REQUIRED_COLUMNS = ["customer_id", "email"]
null_condition = F.lit(False)
for col_name in REQUIRED_COLUMNS:
null_condition = null_condition | F.col(col_name).isNull()
df_invalid = df_parseable.filter(null_condition)
invalid_count = df_invalid.count()
if invalid_count > 0:
logger.warning(f"Dropping {invalid_count:,} rows missing required fields")
df_valid = df_parseable.filter(~null_condition)
# ─── 6. Clean and Enrich ──────────────────────────────────────────────────────
df_clean = (
df_valid
# Normalize email to lowercase, trim whitespace
.withColumn("email", F.lower(F.trim(F.col("email"))))
# Normalize country code to uppercase
.withColumn("country_code", F.upper(F.trim(F.col("country_code"))))
# Extract email domain for analytics
.withColumn(
"email_domain",
F.split(F.col("email"), "@").getItem(1)
)
# Nullify obviously bad values
.withColumn(
"lifetime_value",
F.when(F.col("lifetime_value") < 0, F.lit(None))
.otherwise(F.col("lifetime_value"))
)
# Add pipeline metadata
.withColumn("_ingested_at", F.current_timestamp())
.withColumn("_source_file", F.input_file_name())
)
# ─── 7. Filter: Active Customers Only for Silver ──────────────────────────────
df_silver = df_clean.filter(F.col("is_active") == True)
silver_count = df_silver.count()
logger.info(f"Silver rows to write: {silver_count:,}")
# ─── 8. Write Silver Parquet, Partitioned by Country ──────────────────────────
(
df_silver
.repartition(20, "country_code") # co-locate by partition key
.write
.mode("overwrite")
.option("compression", "snappy")
.partitionBy("country_code")
.parquet(SINK_PATH)
)
logger.info(f"Pipeline complete. {silver_count:,} rows written to {SINK_PATH}")
# ─── 9. Verification Read-Back ────────────────────────────────────────────────
df_verify = spark.read.parquet(SINK_PATH)
logger.info(f"Verification count: {df_verify.count():,}")
df_verify.printSchema()
df_verify.show(5, truncate=False)
spark.stop()

Schema Evolution Patterns
When source schemas change, pipelines break. Handle this defensively:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType
def read_with_schema_check(
spark: SparkSession,
path: str,
expected_schema: StructType,
) -> tuple:
"""
Read CSV and return (valid_df, missing_cols, extra_cols).
Lets callers decide how strictly to enforce schema.
"""
df = spark.read.option("header", "true").option("inferSchema", "true").csv(path)
actual_cols = set(df.columns)
expected_cols = {f.name for f in expected_schema.fields}
missing_cols = expected_cols - actual_cols
extra_cols = actual_cols - expected_cols
# Add missing columns as null so downstream processing doesn't break
for col_name in missing_cols:
field = next(f for f in expected_schema.fields if f.name == col_name)
df = df.withColumn(col_name, F.lit(None).cast(field.dataType))
    return df, missing_cols, extra_cols

Key Takeaways
- The driver plans work; executors do work. Never collect large datasets to the driver.
- Always use explicit StructType schemas in production; never inferSchema.
- Spark is lazily evaluated: transformations build a plan; actions execute it.
- Write Parquet with partitionBy on low-cardinality filter columns for query performance.
- Always quarantine corrupt records rather than silently dropping them.
- Use explain() to see what Spark actually plans to do before you run it at scale.
Next up: mastering DataFrame transformations, window functions, and Spark SQL.