Data Engineering · Beginner

PySpark Fundamentals: Architecture, DataFrames, and Your First Pipeline

Master Apache Spark from the ground up — understand the driver/executor model, RDDs vs DataFrames, schema handling, lazy evaluation, and build a complete CSV-to-Parquet pipeline.

Learnixo · May 7, 2026 · 10 min read
PySpark · Apache Spark · Data Engineering · DataFrames · Parquet · ETL

What Is Apache Spark and Why Do Data Engineers Use It?

When a single machine cannot process your data fast enough — or at all — you need a distributed compute engine. Apache Spark is the de-facto standard for large-scale data processing. It runs on clusters, processes terabytes in minutes, and provides a clean Python API through PySpark.

Unlike Hadoop MapReduce, Spark keeps data in memory between stages, making iterative workloads orders of magnitude faster.

Spark Architecture

Understanding the architecture is not optional. Every performance decision traces back to it.

┌─────────────────────────────────────────────────────┐
│                    Driver Program                   │
│  ┌───────────────────────────────────────────────┐  │
│  │  SparkContext / SparkSession                  │  │
│  │  DAG Scheduler  →  Task Scheduler             │  │
│  └───────────────────────────────────────────────┘  │
└────────────────────────────┬────────────────────────┘
                             │  schedules tasks
                             ▼
            ┌──────────────────────────────────┐
            │         Cluster Manager          │
            │  (YARN / Kubernetes / Standalone)│
            └──────┬────────────────────┬──────┘
                   │                    │
           ┌───────▼──────┐     ┌───────▼──────┐
           │  Executor 1  │     │  Executor 2  │
           │  ┌────────┐  │     │  ┌────────┐  │
           │  │ Task 1 │  │     │  │ Task 3 │  │
           │  │ Task 2 │  │     │  │ Task 4 │  │
           │  └────────┘  │     │  └────────┘  │
           └──────────────┘     └──────────────┘

Driver

The driver is your Python process. It holds the SparkSession, builds the execution plan (DAG), and coordinates work. It never processes data — it only directs. One expensive mistake: collecting large datasets back to the driver with .collect(). That brings millions of rows into a single machine's memory.

Executors

Executors are JVM processes on worker nodes. Each executor runs tasks in parallel across its CPU cores, and holds its partition of data in memory or on disk. Executor count, cores per executor, and memory are the primary tuning knobs.

Cluster Manager

The cluster manager allocates resources. In production you'll use YARN (Hadoop clusters), Kubernetes (cloud-native), or Databricks' managed runtime. For local development, local[*] uses all CPU cores on your machine.

DAG Scheduler

When you call an action (like .write), Spark compiles your transformation chain into a Directed Acyclic Graph of stages. Each stage is a set of tasks that can run in parallel without shuffling data across the network. The DAG scheduler optimizes this graph before execution.

RDDs, DataFrames, and Datasets

Spark has three APIs. Data engineers almost always use DataFrames.

| Concept | Description | When to Use |
|---|---|---|
| RDD | Low-level, untyped distributed collection | Rarely — legacy code or custom partitioning |
| DataFrame | Distributed table with named columns and schema | Default choice — optimized by Catalyst |
| Dataset | Typed DataFrame (Scala/Java only in practice) | Not available in PySpark |

The Catalyst optimizer rewrites your DataFrame query into an optimized physical plan. It applies predicate pushdown, column pruning, join reordering, and more — automatically. This is why DataFrames outperform hand-written RDD code.

Creating a SparkSession

SparkSession is the single entry point for all Spark functionality.

Python
from pyspark.sql import SparkSession

# Local development — uses all available cores
spark = (
    SparkSession.builder
    .appName("CustomerPipeline")
    .master("local[*]")
    # Tune memory for local runs
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    # Enable Delta Lake support
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .getOrCreate()
)

# Always set log level in notebooks to reduce noise
spark.sparkContext.setLogLevel("WARN")

print(spark.version)  # e.g., 3.5.0

In production (Databricks, EMR, Dataproc), the SparkSession is pre-created. Use SparkSession.builder.getOrCreate() to retrieve it without re-creating.

Reading Data

CSV with Schema Inference

Python
df_raw = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("nullValue", "NULL")
    .option("emptyValue", "")
    .csv("s3://my-bucket/raw/customers/2026/")
)

df_raw.printSchema()
# root
#  |-- customer_id: integer (nullable = true)
#  |-- email: string (nullable = true)
#  |-- signup_date: string (nullable = true)   ← dates inferred as strings!
#  |-- revenue: double (nullable = true)

Problem with inferSchema: Spark makes an extra pass over the data to guess the types. It gets dates wrong (they come in as strings), it adds a full scan to every job, and the inferred schema can drift whenever the data changes. Never use inferSchema in production.

Explicit Schema with StructType

Python
from pyspark.sql.types import (
    StructType, StructField,
    StringType, IntegerType, DoubleType, DateType, BooleanType, TimestampType
)

customer_schema = StructType([
    StructField("customer_id",   IntegerType(),   nullable=False),
    StructField("email",         StringType(),    nullable=False),
    StructField("full_name",     StringType(),    nullable=True),
    StructField("signup_date",   DateType(),      nullable=True),
    StructField("country_code",  StringType(),    nullable=True),
    StructField("is_active",     BooleanType(),   nullable=True),
    StructField("lifetime_value",DoubleType(),    nullable=True),
])

df = (
    spark.read
    .schema(customer_schema)
    .option("header", "true")
    .option("dateFormat", "yyyy-MM-dd")
    .option("mode", "DROPMALFORMED")  # options: PERMISSIVE, DROPMALFORMED, FAILFAST
    .csv("s3://my-bucket/raw/customers/2026/")
)

The mode option controls bad record handling:

  • PERMISSIVE (default): sets malformed fields to null and writes the raw row to a _corrupt_record column (the column must be declared in your schema)
  • DROPMALFORMED: silently drops rows that don't match the schema
  • FAILFAST: throws an exception on the first bad row

In production, use PERMISSIVE and route corrupt records to a dead-letter path for inspection.

Reading Parquet

Python
# Parquet stores schema in file metadata — no schema needed
df_orders = spark.read.parquet("s3://my-bucket/silver/orders/")

# Read specific columns only (column pruning happens at file level)
df_orders_slim = spark.read.parquet("s3://my-bucket/silver/orders/").select(
    "order_id", "customer_id", "order_date", "total_amount"
)

Reading JSON

Python
# Multi-line JSON (one object per file)
df_events = (
    spark.read
    .option("multiLine", "true")
    .json("s3://my-bucket/raw/events/")
)

# JSON Lines (one JSON object per line) — default
df_events = spark.read.json("s3://my-bucket/raw/events/")

Reading Delta Lake

Python
# Read current snapshot
df_delta = spark.read.format("delta").load("s3://my-bucket/delta/customers/")

# Read a specific version (time travel)
df_yesterday = (
    spark.read
    .format("delta")
    .option("versionAsOf", 5)
    .load("s3://my-bucket/delta/customers/")
)

# Read at a specific timestamp
df_as_of = (
    spark.read
    .format("delta")
    .option("timestampAsOf", "2026-04-01 00:00:00")
    .load("s3://my-bucket/delta/customers/")
)

Inspecting DataFrames

Python
# Show first N rows (default 20), truncate=False shows full column values
df.show(10, truncate=False)

# Print the schema tree
df.printSchema()

# Count rows — triggers a full scan, use sparingly
print(f"Row count: {df.count():,}")

# Column names and types as a list
print(df.dtypes)
# [('customer_id', 'int'), ('email', 'string'), ...]

# Summary statistics (like pandas describe)
df.describe("lifetime_value").show()

# View the execution plan
# Physical plan only
df.filter("is_active = true").explain()

# Full plan: parsed → analyzed → optimized → physical
df.filter("is_active = true").explain(mode="extended")

Lazy Evaluation: The Key Mental Model

This is the most important concept in Spark. Nothing runs until you call an action.

Python
from pyspark.sql import functions as F

# These are TRANSFORMATIONS — they build a logical plan, no data moves
df_active = df.filter("is_active = true")          # transformation
df_enriched = df_active.withColumn(                 # transformation
    "email_domain",
    F.split(F.col("email"), "@").getItem(1)
)
df_final = df_enriched.select(                      # transformation
    "customer_id", "email", "email_domain", "country_code"
)

# ACTIONS — these trigger actual computation:
df_final.show()            # action: materialize and display
df_final.count()           # action: count rows
df_final.collect()         # action: bring all data to driver (dangerous!)
df_final.write.parquet(…)  # action: write to storage
df_final.first()           # action: get first row

Transformation vs Action quick reference:

| Transformations (lazy) | Actions (eager) |
|---|---|
| select, filter, where | show, collect |
| withColumn, drop | count, first, take |
| groupBy, agg | write, save |
| join, union | foreach, toPandas |
| repartition, coalesce | head |

Because transformations are lazy, Spark can optimize the entire chain before running it. It may reorder operations, push filters down to the file reader, or eliminate unused columns before any data is read.

Writing Output

Writing Parquet (Production Standard)

Python
(
    df_final
    .write
    .mode("overwrite")           # overwrite | append | ignore | errorIfExists
    .option("compression", "snappy")  # snappy | gzip | zstd | none
    .parquet("s3://my-bucket/silver/customers/")
)

Writing with Partitioning

Partitioning splits the output across subdirectories by column value. Queries that filter on the partition column skip entire directories — this is called partition pruning.

Python
(
    df_orders
    .write
    .mode("overwrite")
    .partitionBy("year", "month")   # creates .../year=2026/month=05/part-*.parquet
    .parquet("s3://my-bucket/silver/orders/")
)

Choose partition columns carefully:

  • High-cardinality columns (like customer_id) create millions of tiny files — bad
  • Low-cardinality columns that queries filter on (like country, year, month) — good
  • Aim for partition files between 128MB and 1GB each
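A quick back-of-envelope helper for that file-size target: given an estimated partition size, how many output files to aim for (the 256 MB default is an illustrative midpoint of the range above):

```python
def target_file_count(partition_bytes: int, target_file_bytes: int = 256 * 1024**2) -> int:
    """Number of output files so each lands near the target size (minimum one)."""
    return max(1, round(partition_bytes / target_file_bytes))

# A 10 GB partition at roughly 256 MB per file:
print(target_file_count(10 * 1024**3))  # 40
```

Feed the result into repartition() before the write, as shown in the next section.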

Writing Delta Lake

Python
(
    df_final
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .partitionBy("country_code")
    .save("s3://my-bucket/delta/customers/")
)

Controlling Output File Count

By default Spark writes one file per partition (task). Repartition before writing to control file count:

Python
# Write exactly 10 Parquet files
(
    df_final
    .repartition(10)
    .write
    .mode("overwrite")
    .parquet("s3://my-bucket/silver/customers/")
)

# Repartition by a column — co-locates related data
(
    df_orders
    .repartition(200, "customer_id")
    .write
    .mode("overwrite")
    .parquet("s3://my-bucket/silver/orders/")
)

Complete Pipeline: CSV → Validate → Filter → Parquet

This is a real Bronze-to-Silver pipeline pattern.

Python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (
    StructType, StructField,
    StringType, IntegerType, DoubleType, DateType, BooleanType
)
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# ─── 1. Session ───────────────────────────────────────────────────────────────

spark = (
    SparkSession.builder
    .appName("CustomerSilverPipeline")
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")


# ─── 2. Define Schema ─────────────────────────────────────────────────────────

RAW_SCHEMA = StructType([
    StructField("customer_id",    IntegerType(),  nullable=True),
    StructField("email",          StringType(),   nullable=True),
    StructField("full_name",      StringType(),   nullable=True),
    StructField("signup_date",    DateType(),     nullable=True),
    StructField("country_code",   StringType(),   nullable=True),
    StructField("is_active",      BooleanType(),  nullable=True),
    StructField("lifetime_value", DoubleType(),   nullable=True),
    # PERMISSIVE mode only populates this column if it is declared here
    StructField("_corrupt_record", StringType(),  nullable=True),
])


# ─── 3. Read Raw CSV ──────────────────────────────────────────────────────────

SOURCE_PATH = "s3://my-bucket/raw/customers/2026/05/"
SINK_PATH   = "s3://my-bucket/silver/customers/"

df_raw = (
    spark.read
    .schema(RAW_SCHEMA)
    .option("header", "true")
    .option("dateFormat", "yyyy-MM-dd")
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .csv(SOURCE_PATH)
)

# Cache before the repeated counts/filters below; Spark also requires caching
# before queries that reference only the _corrupt_record column
df_raw = df_raw.cache()

raw_count = df_raw.count()
logger.info(f"Raw rows read: {raw_count:,}")


# ─── 4. Quarantine Corrupt Records ────────────────────────────────────────────

df_corrupt = df_raw.filter(F.col("_corrupt_record").isNotNull())
corrupt_count = df_corrupt.count()

if corrupt_count > 0:
    logger.warning(f"Quarantining {corrupt_count:,} corrupt records")
    (
        df_corrupt
        .write
        .mode("append")
        .parquet("s3://my-bucket/quarantine/customers/")
    )

df_parseable = df_raw.filter(F.col("_corrupt_record").isNull()).drop("_corrupt_record")


# ─── 5. Validate: Reject Nulls on Required Fields ────────────────────────────

REQUIRED_COLUMNS = ["customer_id", "email"]

null_condition = F.lit(False)
for col_name in REQUIRED_COLUMNS:
    null_condition = null_condition | F.col(col_name).isNull()

df_invalid = df_parseable.filter(null_condition)
invalid_count = df_invalid.count()

if invalid_count > 0:
    logger.warning(f"Dropping {invalid_count:,} rows missing required fields")

df_valid = df_parseable.filter(~null_condition)


# ─── 6. Clean and Enrich ──────────────────────────────────────────────────────

df_clean = (
    df_valid
    # Normalize email to lowercase, trim whitespace
    .withColumn("email", F.lower(F.trim(F.col("email"))))
    # Normalize country code to uppercase
    .withColumn("country_code", F.upper(F.trim(F.col("country_code"))))
    # Extract email domain for analytics
    .withColumn(
        "email_domain",
        F.split(F.col("email"), "@").getItem(1)
    )
    # Nullify obviously bad values
    .withColumn(
        "lifetime_value",
        F.when(F.col("lifetime_value") < 0, F.lit(None))
         .otherwise(F.col("lifetime_value"))
    )
    # Add pipeline metadata
    .withColumn("_ingested_at", F.current_timestamp())
    .withColumn("_source_file", F.input_file_name())
)


# ─── 7. Filter: Active Customers Only for Silver ──────────────────────────────

df_silver = df_clean.filter(F.col("is_active") == True)

silver_count = df_silver.count()
logger.info(f"Silver rows to write: {silver_count:,}")


# ─── 8. Write Silver Parquet, Partitioned by Country ──────────────────────────

(
    df_silver
    .repartition(20, "country_code")     # co-locate by partition key
    .write
    .mode("overwrite")
    .option("compression", "snappy")
    .partitionBy("country_code")
    .parquet(SINK_PATH)
)

logger.info(f"Pipeline complete. {silver_count:,} rows written to {SINK_PATH}")


# ─── 9. Verification Read-Back ────────────────────────────────────────────────

df_verify = spark.read.parquet(SINK_PATH)
logger.info(f"Verification count: {df_verify.count():,}")
df_verify.printSchema()
df_verify.show(5, truncate=False)

spark.stop()

Schema Evolution Patterns

When source schemas change, pipelines break. Handle this defensively:

Python
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType

def read_with_schema_check(
    spark: SparkSession,
    path: str,
    expected_schema: StructType,
) -> tuple:
    """
    Read CSV and return (valid_df, missing_cols, extra_cols).
    Lets callers decide how strictly to enforce schema.
    """
    # inferSchema is tolerable here: the goal is to discover what actually arrived
    df = spark.read.option("header", "true").option("inferSchema", "true").csv(path)

    actual_cols   = set(df.columns)
    expected_cols = {f.name for f in expected_schema.fields}

    missing_cols = expected_cols - actual_cols
    extra_cols   = actual_cols - expected_cols

    # Add missing columns as null so downstream processing doesn't break
    for col_name in missing_cols:
        field = next(f for f in expected_schema.fields if f.name == col_name)
        df = df.withColumn(col_name, F.lit(None).cast(field.dataType))

    return df, missing_cols, extra_cols

Key Takeaways

  • The driver plans work; executors do work. Never collect large datasets to the driver.
  • Always use explicit StructType schemas in production — never inferSchema.
  • Spark is lazily evaluated: transformations build a plan; actions execute it.
  • Write Parquet with partitionBy on low-cardinality filter columns for query performance.
  • Always quarantine corrupt records rather than silently dropping them.
  • Use explain() to see what Spark actually plans to do before you run it at scale.

Next up: mastering DataFrame transformations, window functions, and Spark SQL.

Enjoyed this article?

Explore the Data Engineering learning path for more.
