Data Engineering · Intermediate

PySpark Structured Streaming: Kafka, Delta Lake, and Real-Time Pipelines

Build production-grade streaming pipelines with PySpark Structured Streaming: Kafka sources, watermarking, trigger strategies, foreachBatch sinks, fault tolerance, and Delta Live Tables.

Learnixo · May 7, 2026 · 10 min read
Tags: PySpark, Apache Spark, Structured Streaming, Kafka, Delta Lake, Real-Time, Data Engineering

Structured Streaming vs DStreams

Apache Spark has two streaming APIs. You should only use one of them.

| Feature | DStreams (legacy) | Structured Streaming (current) |
|---|---|---|
| API | RDD-based micro-batch | DataFrame/SQL-based |
| Fault tolerance | Manual checkpointing | Built-in, automatic |
| Exactly-once | Hard to achieve | Built-in with idempotent sinks |
| Late data | Not supported | Watermarking |
| Performance | Lower (RDD overhead) | Catalyst-optimized |
| Status | Deprecated in Spark 3.x | Active development |

Always use Structured Streaming. DStreams exist only in legacy code.

The key mental model: Structured Streaming treats a live data stream as an unbounded table that grows over time. Every readStream returns a "streaming DataFrame" and you write transformations against it exactly as you would a static DataFrame. The engine handles micro-batches, state, and fault recovery for you.
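
To make the unbounded-table model concrete, here is a minimal sketch in plain Python (no Spark required). The function names and the sample batches are illustrative assumptions, not part of any Spark API. It shows the property the engine relies on: updating a result incrementally, one micro-batch at a time, gives the same answer as running the query once over every row received so far.

```python
from collections import Counter

def run_incrementally(micro_batches):
    """Update a running count per key as each micro-batch arrives."""
    state = Counter()
    for batch in micro_batches:
        state.update(batch)  # only the newly arrived rows are touched
    return state

def run_as_static_table(micro_batches):
    """Treat every row received so far as one static table and count once."""
    all_rows = [row for batch in micro_batches for row in batch]
    return Counter(all_rows)

# Hypothetical stream of event_type values, split into three micro-batches
batches = [["placed", "placed"], ["cancelled"], ["placed", "updated"]]

incremental = run_incrementally(batches)
static = run_as_static_table(batches)
```

Both results are identical, which is why you can write the query as if the input were a static DataFrame and leave the incremental execution to the engine.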

Architecture of a Structured Streaming Pipeline

┌───────────────────────────────────────────────────────────────────────┐
│                    Structured Streaming Engine                        │
│                                                                       │
│  Source                   Processing              Sink                │
│  ┌──────────┐             ┌──────────┐            ┌──────────────┐    │
│  │  Kafka   │──readStream─▶  Filter  │─writeStream▶  Delta Lake  │    │
│  │  Files   │             │  Join    │            │  Kafka       │    │
│  │  Socket  │             │  Agg     │            │  Console     │    │
│  └──────────┘             └──────────┘            └──────────────┘    │
│                                │                                      │
│                     ┌──────────▼──────────┐                           │
│                     │  Checkpoint Store   │                           │
│                     │  (HDFS / S3 / ADLS) │                           │
│                     └─────────────────────┘                           │
└───────────────────────────────────────────────────────────────────────┘

Reading Streams

From Kafka

Python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType

spark = (
    SparkSession.builder
    .appName("KafkaBronzePipeline")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# ─── Basic Kafka read ─────────────────────────────────────────────────────────
df_kafka_raw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribe", "orders.events")           # single topic
    # .option("subscribePattern", "orders.*")        # regex topic pattern
    # .option("assign", '{"orders.events": [0,1,2]}') # specific partitions
    .option("startingOffsets", "latest")             # latest | earliest | json offset
    .option("maxOffsetsPerTrigger", 100_000)         # rate limiting per micro-batch
    .option("failOnDataLoss", "false")               # don't fail if offsets are gone
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
            'username="user" password="secret";')
    .load()
)

# Kafka gives you these columns automatically:
# key (binary), value (binary), topic, partition, offset, timestamp, timestampType, headers
df_kafka_raw.printSchema()
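
The "json offset" form of startingOffsets mentioned in the comments above is a JSON string that maps each topic to per-partition offsets. In that JSON, the Kafka source reads -2 as "earliest" and -1 as "latest". The sketch below builds such a string with the standard json module; the helper name and the offsets are hypothetical.

```python
import json

EARLIEST = -2  # sentinel the Kafka source interprets as "earliest"
LATEST = -1    # sentinel the Kafka source interprets as "latest"

def starting_offsets_json(offsets_by_topic):
    """Build the JSON string expected by the startingOffsets option.

    offsets_by_topic: {topic: {partition_number: offset}}
    Partition numbers become string keys, as JSON requires.
    """
    return json.dumps({
        topic: {str(partition): offset for partition, offset in partitions.items()}
        for topic, partitions in offsets_by_topic.items()
    })

# Hypothetical offsets: resume partition 0 at 1500, replay partition 1 from
# the beginning, and read only new data from partition 2.
offsets = starting_offsets_json(
    {"orders.events": {0: 1500, 1: EARLIEST, 2: LATEST}}
)
# Pass it to the reader as: .option("startingOffsets", offsets)
```

Keep in mind that startingOffsets only applies when a query starts for the first time. A query resumed from a checkpoint continues from the offsets recorded there.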

Deserializing Kafka Messages

Python
# ─── JSON messages without Schema Registry ────────────────────────────────────

ORDER_EVENT_SCHEMA = StructType([
    StructField("order_id",    StringType(),    nullable=False),
    StructField("customer_id", StringType(),    nullable=False),
    StructField("event_type",  StringType(),    nullable=True),   # placed / cancelled / updated
    StructField("total",       StringType(),    nullable=True),   # parse as string, cast later
    StructField("items",       StringType(),    nullable=True),   # JSON array as string
    StructField("event_ts",    TimestampType(), nullable=True),
])

df_orders_parsed = (
    df_kafka_raw
    .select(
        F.col("key").cast("string").alias("kafka_key"),
        F.col("partition").alias("kafka_partition"),
        F.col("offset").alias("kafka_offset"),
        F.col("timestamp").alias("kafka_timestamp"),
        F.from_json(F.col("value").cast("string"), ORDER_EVENT_SCHEMA).alias("data"),
    )
    .select(
        "kafka_key",
        "kafka_partition",
        "kafka_offset",
        "kafka_timestamp",
        "data.*",
    )
    .withColumn("total", F.col("total").cast("double"))
    .withColumn(
        "event_ts",
        F.coalesce(F.col("event_ts"), F.col("kafka_timestamp"))
    )
)

From a File Source (for Development / Landing Zone)

Python
# Streaming file sources need an explicit schema (inference is off by default,
# see spark.sql.streaming.schemaInference)
df_file_stream = (
    spark.readStream
    .format("parquet")
    # Spark monitors this path for new files
    .option("path", "s3://my-bucket/landing/orders/")
    .option("maxFilesPerTrigger", 10)      # process at most 10 files per micro-batch
    .option("latestFirst", "false")         # process oldest files first
    .schema(ORDER_EVENT_SCHEMA)
    .load()
)

# CSV file stream
df_csv_stream = (
    spark.readStream
    .format("csv")
    .option("path", "s3://my-bucket/landing/events/*.csv")
    .option("header", "true")
    .option("maxFilesPerTrigger", 5)
    .schema(ORDER_EVENT_SCHEMA)
    .load()
)

From Socket (Development / Testing Only)

Python
# Never use in production: the socket source is not fault-tolerant
df_socket = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
)

Trigger Types

The trigger controls how often Spark processes a micro-batch.

Python
from pyspark.sql.streaming import DataStreamWriter

# ─── ProcessingTime: run every N seconds ─────────────────────────────────────
query = (
    df_orders_parsed.writeStream
    .trigger(processingTime="30 seconds")   # run every 30s
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "s3://checkpoints/orders-bronze/")
    .start("s3://my-bucket/delta/orders_bronze/")
)

# ─── AvailableNow: process all available data, then stop ──────────────────────
# Replaces the deprecated Trigger.Once; ideal for scheduled batch-streaming jobs
query = (
    df_orders_parsed.writeStream
    .trigger(availableNow=True)
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "s3://checkpoints/orders-bronze/")
    .start("s3://my-bucket/delta/orders_bronze/")
)
query.awaitTermination()   # blocks until all available data is processed

# ─── Continuous: experimental, millisecond latency ────────────────────────────
# Only supports map operations (no aggregations, no stateful ops)
query = (
    df_orders_parsed.writeStream
    .trigger(continuous="1 second")   # checkpoint every 1 second
    .format("memory")
    .queryName("orders_live")
    .outputMode("append")
    .start()
)
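
The scheduling behind a processingTime trigger can be pictured with a small simulation. This is a simplified model in plain Python, based on the documented behavior rather than on Spark's internals: if a micro-batch finishes within the interval, the next one waits for the next interval boundary; if it overruns, the next one starts as soon as it finishes. The function name and the durations are made up for illustration.

```python
def batch_start_times(interval_s, batch_durations_s):
    """Return the start time (in seconds) of each micro-batch.

    interval_s: the processingTime interval
    batch_durations_s: how long each micro-batch takes to run
    """
    starts = []
    start = 0
    for duration in batch_durations_s:
        starts.append(start)
        finished = start + duration
        # First interval boundary strictly after this batch started
        next_boundary = (start // interval_s + 1) * interval_s
        # Wait for the boundary, or start immediately if we overran it
        start = max(finished, next_boundary)
    return starts

# 30-second trigger; the second batch is slow and takes 45 seconds
starts = batch_start_times(30, [10, 45, 5, 5])
```

Here the slow second batch pushes the third one to t=75 instead of t=60, after which the schedule realigns to the 30-second grid at t=90. Batches that regularly overrun the interval are a sign the query is falling behind.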

Watermarking for Late Data

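Without a watermark, Spark must keep aggregation state indefinitely in case late events arrive, and that state eventually exhausts executor memory. A watermark tells Spark how late data is allowed to be: the engine tracks the maximum event time it has seen, subtracts the threshold you set, and discards state (and late events) older than that point.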

Python
# ─── Apply watermark on the event timestamp column ────────────────────────────
df_watermarked = df_orders_parsed.withWatermark(
    "event_ts",      # the column representing event time
    "2 hours"        # tolerate events arriving up to 2 hours late
)

# ─── Aggregation with watermark ───────────────────────────────────────────────
# Spark will maintain window state only for events within the watermark window
df_windowed_agg = (
    df_watermarked
    .groupBy(
        F.window("event_ts", "10 minutes", "5 minutes"),   # 10-min sliding windows, advancing every 5 min
        "event_type",
    )
    .agg(
        F.count("order_id").alias("event_count"),
        F.sum("total").alias("window_revenue"),
    )
    .select(
        F.col("window.start").alias("window_start"),
        F.col("window.end").alias("window_end"),
        "event_type",
        "event_count",
        "window_revenue",
    )
)

Important: Watermarks only affect stateful operations (aggregations, stream-stream joins). They have no effect on stateless pipelines like simple filter + select.
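
The bookkeeping behind a watermark can be sketched in plain Python. This is a simplified model, not Spark's implementation: it uses tumbling windows only, advances the watermark at the end of each micro-batch, and treats any event whose window has already closed as dropped (real Spark only guarantees that data within the threshold is kept). All names and timestamps are illustrative.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def window_start(ts, size):
    """Start of the tumbling window that contains ts."""
    return ts - (ts - EPOCH) % size

def simulate(batches, delay, size):
    watermark = None       # undefined until the first batch completes
    max_event_time = None
    state = {}             # open windows: window start -> event count
    finalized = {}         # windows closed by the watermark
    dropped = []           # events that arrived after their window closed
    for batch in batches:
        for ts in batch:
            if max_event_time is None or ts > max_event_time:
                max_event_time = ts
            start = window_start(ts, size)
            if watermark is not None and start + size <= watermark:
                dropped.append(ts)
                continue
            state[start] = state.get(start, 0) + 1
        if max_event_time is not None:
            # The watermark trails the newest event time by the allowed delay
            watermark = max_event_time - delay
            for start in [s for s in state if s + size <= watermark]:
                finalized[start] = state.pop(start)  # state is freed here
    return state, finalized, dropped, watermark

def at(hour, minute):
    return datetime(2026, 5, 7, hour, minute)

state, finalized, dropped, watermark = simulate(
    batches=[
        [at(12, 0), at(12, 5), at(12, 14)],
        [at(10, 3), at(10, 12), at(15, 0)],   # two late events, one new one
    ],
    delay=timedelta(hours=2),
    size=timedelta(minutes=10),
)
```

After the first batch the watermark sits at 10:14, so the 10:03 event (window 10:00 to 10:10) is dropped, while the 10:12 event still lands in an open window. Once 15:00 arrives, the watermark moves to 13:00 and every older window leaves the state, which is what keeps memory bounded.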

Output Modes

Python
# append   : only new rows are written (safe for non-aggregating pipelines)
# update   : only rows changed since the last trigger (aggregations that change)
# complete : entire result table is rewritten each trigger (small aggregations only)

# Stateless pipeline → always use append
query_bronze = (
    df_orders_parsed.writeStream
    .outputMode("append")
    .format("delta")
    ...
)

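# Aggregation with watermark → append emits each window once, after the watermark passes it.
# The Delta sink accepts append and complete only; for update semantics, write through
# foreachBatch with a MERGE instead.
query_agg = (
    df_windowed_agg.writeStream
    .outputMode("append")
    .format("delta")
    ...
)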

# Complete only makes sense for small aggregated results (dashboard metrics, etc.)
query_complete = (
    df_country_totals.writeStream
    .outputMode("complete")
    .format("memory")
    .queryName("country_totals")
    ...
)
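
To see how update and complete differ for an aggregation, here is a plain-Python sketch of what each mode would hand to the sink on every trigger for a running count. The function names and country codes are illustrative. Append is left out on purpose: for aggregations it requires a watermark and emits a window only once it is final.

```python
from collections import Counter

def emit(mode, totals, changed_keys):
    """Rows written to the sink for one trigger."""
    if mode == "complete":
        return dict(totals)                          # the whole result table
    if mode == "update":
        return {k: totals[k] for k in changed_keys}  # only rows that changed
    raise ValueError(f"unsupported mode in this sketch: {mode}")

def run(mode, micro_batches):
    totals = Counter()
    outputs = []
    for batch in micro_batches:
        totals.update(batch)
        outputs.append(emit(mode, totals, set(batch)))
    return outputs

# Hypothetical stream of country codes in two micro-batches
batches = [["NO", "SE", "NO"], ["SE"]]

update_out = run("update", batches)
complete_out = run("complete", batches)
```

On the second trigger, update writes only the SE row, while complete rewrites both rows. That difference is why complete is only practical when the result table stays small.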

foreachBatch: Custom Sink Logic

foreachBatch gives you a static DataFrame for each micro-batch. Use it when your sink doesn't have a native Structured Streaming connector, or when you need conditional write logic.

Python
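from pyspark.sql import DataFrame, Window
from pyspark.sql import functions as F
from delta.tables import DeltaTable

SILVER_PATH = "s3://my-bucket/delta/orders_silver/"
DQ_LOG_PATH = "s3://my-bucket/delta/dq_log/"

def process_batch(batch_df: DataFrame, batch_id: int) -> None:
    """
    Called once per micro-batch.
    batch_df: static DataFrame for this batch
    batch_id: monotonically increasing batch sequence number
    """
    if batch_df.isEmpty():
        return

    # The batch is scanned several times below, so cache it for the duration.
    batch_df.persist()
    try:
        # ── Data quality checks ──────────────────────────────────────────────
        # coalesce(..., False) routes rows with a NULL total to the invalid set.
        is_valid = F.coalesce(
            F.col("order_id").isNotNull()
            & F.col("customer_id").isNotNull()
            & (F.col("total") >= 0),
            F.lit(False),
        )
        df_valid   = batch_df.filter(is_valid)
        df_invalid = batch_df.filter(~is_valid)

        total_rows    = batch_df.count()
        valid_count   = df_valid.count()
        invalid_count = total_rows - valid_count

        # ── Log DQ metrics ───────────────────────────────────────────────────
        # current_timestamp() is a Column, so it is added with withColumn rather
        # than placed inside the Python row passed to createDataFrame.
        dq_record = spark.createDataFrame(
            [(batch_id, total_rows, valid_count, invalid_count)],
            "batch_id long, total_rows long, valid_rows long, invalid_rows long",
        ).withColumn("logged_at", F.current_timestamp())
        dq_record.write.format("delta").mode("append").save(DQ_LOG_PATH)

        # ── Quarantine invalid rows ──────────────────────────────────────────
        # Note: the DQ log and quarantine appends are not idempotent; a batch
        # replayed after a failure is appended again.
        if invalid_count > 0:
            (
                df_invalid
                .write
                .format("delta")
                .mode("append")
                .save("s3://my-bucket/delta/orders_quarantine/")
            )

        # ── Upsert valid rows into Silver ────────────────────────────────────
        # MERGE fails when several source rows match one target row, so keep
        # only the latest event per order_id. Because the merge is keyed on
        # order_id, replaying a batch rewrites the same rows (idempotent).
        latest_first = Window.partitionBy("order_id").orderBy(
            F.col("event_ts").desc(), F.col("kafka_offset").desc()
        )
        df_latest = (
            df_valid
            .withColumn("_rn", F.row_number().over(latest_first))
            .filter(F.col("_rn") == 1)
            .drop("_rn")
        )

        if DeltaTable.isDeltaTable(spark, SILVER_PATH):
            (
                DeltaTable.forPath(spark, SILVER_PATH).alias("target")
                .merge(df_latest.alias("source"), "target.order_id = source.order_id")
                .whenMatchedUpdateAll()
                .whenNotMatchedInsertAll()
                .execute()
            )
        else:
            df_latest.write.format("delta").mode("overwrite").save(SILVER_PATH)
    finally:
        batch_df.unpersist()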


query = (
    df_orders_parsed.writeStream
    .foreachBatch(process_batch)
    .option("checkpointLocation", "s3://checkpoints/orders-silver/")
    .trigger(processingTime="60 seconds")
    .start()
)
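
Because a micro-batch can be replayed after a failure, any foreachBatch write that is not naturally idempotent needs a guard. One common pattern is to store the last committed batch_id next to the data and skip batches that were already written. The class below is a plain-Python sketch of that idea with hypothetical names; in a real sink the rows and the batch id must be committed in the same transaction.

```python
class IdempotentSink:
    """Toy sink that ignores micro-batches it has already committed."""

    def __init__(self):
        self.rows = []
        self.last_committed_batch = -1

    def write(self, batch_id, rows):
        """Write a batch once; return False when the batch is a replay."""
        if batch_id <= self.last_committed_batch:
            return False                    # replayed batch, nothing to do
        self.rows.extend(rows)
        self.last_committed_batch = batch_id
        return True

sink = IdempotentSink()
first = sink.write(0, ["order-1"])
second = sink.write(1, ["order-2"])
replay = sink.write(1, ["order-2"])   # same batch delivered again after a restart
```

The replayed batch is recognized by its id and skipped, so the sink ends up with each row exactly once even though the engine delivered batch 1 twice.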

Checkpointing and Fault Tolerance

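Checkpointing is mandatory in production. Without a durable checkpoint, a restarted query has no record of the offsets it already processed: it falls back to startingOffsets and either skips data or reprocesses it, and any aggregation state is lost.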

Python
# ─── Always set checkpointLocation ───────────────────────────────────────────
query = (
    df_stream.writeStream
    .option("checkpointLocation", "s3://my-checkpoints/pipeline-name/v1/")
    .format("delta")
    .outputMode("append")
    .start("s3://my-bucket/delta/output/")
)

# The checkpoint stores:
# - Last committed Kafka offsets (source state)
# - Aggregation state (for stateful queries)
# - Write-ahead log of micro-batch IDs (prevents duplicates)

Important rules:

  1. Never change the checkpointLocation between restarts of the same query; a restart with a different location is treated as a brand-new query.
  2. When you change the query logic significantly (schema, source, sink), use a new checkpoint location and handle backfill separately.
  3. With the Delta Lake sink, or with a foreachBatch function that is idempotent, you get end-to-end exactly-once semantics. With a non-idempotent sink, expect at-least-once: a batch replayed after a failure is written twice.
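
The recovery behavior behind these rules can be sketched in plain Python. This is a toy model, not Spark's code: the engine records the offset range of a batch before processing it and marks the batch as committed afterwards, so a batch that failed midway is replayed with the same id and the same range. The class and attribute names are hypothetical.

```python
class CheckpointedQuery:
    """Toy model of a checkpoint: an offset log plus a commit log."""

    def __init__(self, source):
        self.source = source
        self.offset_log = {}     # batch_id -> (start, end), written BEFORE processing
        self.commit_log = set()  # batch ids that completed successfully

    def run_batch(self, max_rows, crash=False):
        last_id = len(self.offset_log) - 1
        if last_id >= 0 and last_id not in self.commit_log:
            batch_id = last_id   # planned but never committed: replay it
        else:
            batch_id = last_id + 1
            start = self.offset_log[last_id][1] if last_id >= 0 else 0
            end = min(start + max_rows, len(self.source))
            self.offset_log[batch_id] = (start, end)
        start, end = self.offset_log[batch_id]
        if crash:
            raise RuntimeError("simulated failure before commit")
        self.commit_log.add(batch_id)
        return batch_id, self.source[start:end]

query = CheckpointedQuery(source=list(range(10)))
first = query.run_batch(4)
try:
    query.run_batch(4, crash=True)   # batch 1 is planned but never committed
except RuntimeError:
    pass
replayed = query.run_batch(4)        # restart: same batch id, same offsets
```

The replay reuses batch id 1 and the identical offset range, which is what lets an idempotent sink detect the duplicate. It also shows why pointing a restart at a different checkpoint location loses this history.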

Monitoring Streaming Queries

Python
# Get reference to a running query
query = df_stream.writeStream.start(...)

# ─── Status ───────────────────────────────────────────────────────────────────
import json

# Current status snapshot
print(json.dumps(query.status, indent=2))
# {
#   "message": "Processing new data",
#   "isDataAvailable": true,
#   "isTriggerActive": true
# }

# ─── Last progress (metrics from last completed micro-batch) ──────────────────
progress = query.lastProgress
if progress:
    print(f"Batch ID         : {progress['batchId']}")
    print(f"Input rows       : {progress['numInputRows']}")
    print(f"Input rows/sec   : {progress['inputRowsPerSecond']:.1f}")
    print(f"Processed rows/s : {progress['processedRowsPerSecond']:.1f}")
    print(f"Trigger duration : {progress['durationMs']['triggerExecution']} ms")
    # Kafka-specific
    if "sources" in progress:
        for src in progress["sources"]:
            print(f"Source: {src.get('description')}")
            print(f"  End offset: {src.get('endOffset')}")

# ─── All recent progress entries ─────────────────────────────────────────────
for p in query.recentProgress[-5:]:
    print(f"Batch {p['batchId']}: {p['numInputRows']} rows")

# ─── Await termination ────────────────────────────────────────────────────────
query.awaitTermination()               # blocks until the query stops or fails
query.awaitTermination(timeout=300)    # blocks for 5 minutes, returns bool

# ─── Stop a query ─────────────────────────────────────────────────────────────
query.stop()

# ─── Exception handling ───────────────────────────────────────────────────────
try:
    query.awaitTermination()
except Exception as e:
    print(f"Query failed: {e}")
    # query.exception() returns the exception that caused failure
    exc = query.exception()
    if exc:
        print(f"Root cause: {exc}")
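
The progress metrics are most useful when turned into a simple health signal. The helper below is a sketch that works on a progress dictionary with the same field names used above (durationMs.triggerExecution, inputRowsPerSecond, processedRowsPerSecond). The function name, the thresholds, and the two sample dictionaries are assumptions made for illustration.

```python
def is_falling_behind(progress, trigger_interval_ms):
    """Flag a query whose batches overrun the trigger or lag the input rate."""
    duration_ms = progress.get("durationMs", {}).get("triggerExecution", 0)
    input_rate = progress.get("inputRowsPerSecond") or 0.0
    processed_rate = progress.get("processedRowsPerSecond") or 0.0
    overran_trigger = duration_ms > trigger_interval_ms
    slower_than_input = input_rate > processed_rate
    return overran_trigger or slower_than_input

# Hypothetical progress snapshots, shaped like query.lastProgress
slow = {
    "batchId": 42,
    "numInputRows": 90000,
    "inputRowsPerSecond": 3000.0,
    "processedRowsPerSecond": 2000.0,
    "durationMs": {"triggerExecution": 45000},
}
healthy = {
    "batchId": 43,
    "numInputRows": 30000,
    "inputRowsPerSecond": 1000.0,
    "processedRowsPerSecond": 6000.0,
    "durationMs": {"triggerExecution": 5000},
}

slow_flag = is_falling_behind(slow, trigger_interval_ms=30000)
healthy_flag = is_falling_behind(healthy, trigger_interval_ms=30000)
```

In practice you would call it with query.lastProgress inside a monitoring loop and alert only when several consecutive batches are flagged, since a single slow batch is usually harmless.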

Delta Live Tables (Declarative Streaming Pipelines)

Delta Live Tables (DLT) is Databricks' higher-level abstraction over Structured Streaming. Instead of writing readStream / writeStream imperatively, you declare your tables and let DLT handle orchestration, retries, monitoring, and lineage.

Python
# In a DLT notebook (Databricks only):
import dlt
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

KAFKA_BOOTSTRAP = spark.conf.get("pipeline.kafka.bootstrap_servers")

ORDER_SCHEMA = StructType([
    StructField("order_id",    StringType(),    nullable=False),
    StructField("customer_id", StringType(),    nullable=False),
    StructField("total",       DoubleType(),    nullable=True),
    StructField("event_type",  StringType(),    nullable=True),
    StructField("event_ts",    TimestampType(), nullable=True),
])


# ─── Bronze: raw ingest from Kafka ────────────────────────────────────────────

@dlt.table(
    name="orders_bronze",
    comment="Raw order events from Kafka, no transformations",
    table_properties={"quality": "bronze"},
)
def orders_bronze():
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP)
        .option("subscribe", "orders.events")
        .option("startingOffsets", "latest")
        .load()
        .select(
            F.col("key").cast("string").alias("kafka_key"),
            F.col("value").cast("string").alias("raw_value"),
            F.col("timestamp").alias("kafka_timestamp"),
            F.col("partition").alias("kafka_partition"),
            F.col("offset").alias("kafka_offset"),
        )
    )


# ─── Silver: parse, validate, enrich ──────────────────────────────────────────

@dlt.expect("valid_order_id",   "order_id IS NOT NULL")
@dlt.expect("valid_customer_id","customer_id IS NOT NULL")
@dlt.expect_or_drop("positive_total", "total >= 0")
@dlt.table(
    name="orders_silver",
    comment="Parsed and validated order events",
    table_properties={"quality": "silver"},
)
def orders_silver():
    return (
        dlt.read_stream("orders_bronze")
        .withColumn("data", F.from_json(F.col("raw_value"), ORDER_SCHEMA))
        .select(
            "kafka_key",
            "kafka_timestamp",
            "kafka_partition",
            "kafka_offset",
            "data.*",
        )
        .withColumn("ingested_at", F.current_timestamp())
        .withWatermark("event_ts", "2 hours")
    )


# ─── Gold: aggregated metrics ──────────────────────────────────────────────────

@dlt.table(
    name="orders_gold_hourly",
    comment="Hourly order metrics for BI",
    table_properties={"quality": "gold"},
)
def orders_gold_hourly():
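    return (
        dlt.read_stream("orders_silver")
        # A watermark belongs to the streaming query, not to the stored table,
        # so it has to be declared again before this aggregation.
        .withWatermark("event_ts", "2 hours")
        .groupBy(
            F.window("event_ts", "1 hour").alias("hour_window"),
            "event_type",
        )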
        .agg(
            F.count("order_id").alias("order_count"),
            F.sum("total").alias("total_revenue"),
            F.avg("total").alias("avg_order_value"),
        )
        .select(
            F.col("hour_window.start").alias("hour_start"),
            F.col("hour_window.end").alias("hour_end"),
            "event_type",
            "order_count",
            "total_revenue",
            "avg_order_value",
        )
    )

Complete Production Pipeline: Kafka to Bronze Delta

Python
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import (
    StructType, StructField, StringType, DoubleType,
    TimestampType, IntegerType, ArrayType
)

# ─── Session ───────────────────────────────────────────────────────────────────
spark = (
    SparkSession.builder
    .appName("OrdersBronzePipeline")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true")
    .getOrCreate()
)

# ─── Constants ────────────────────────────────────────────────────────────────
KAFKA_SERVERS    = "kafka-broker-1:9092,kafka-broker-2:9092"
KAFKA_TOPIC      = "orders.events"
BRONZE_PATH      = "s3://my-bucket/delta/orders_bronze/"
CHECKPOINT_PATH  = "s3://my-checkpoints/orders-bronze/v2/"

ORDER_SCHEMA = StructType([
    StructField("order_id",    StringType(),    nullable=False),
    StructField("customer_id", StringType(),    nullable=False),
    StructField("store_id",    StringType(),    nullable=True),
    StructField("event_type",  StringType(),    nullable=True),
    StructField("total",       DoubleType(),    nullable=True),
    StructField("currency",    StringType(),    nullable=True),
    StructField("items_count", IntegerType(),   nullable=True),
    StructField("event_ts",    TimestampType(), nullable=True),
])

# ─── Read from Kafka ──────────────────────────────────────────────────────────
df_raw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_SERVERS)
    .option("subscribe", KAFKA_TOPIC)
    .option("startingOffsets", "latest")
    .option("maxOffsetsPerTrigger", 50_000)
    .option("failOnDataLoss", "false")
    .load()
)

# ─── Parse and flatten ────────────────────────────────────────────────────────
df_parsed = (
    df_raw
    .select(
        F.col("key").cast("string").alias("kafka_key"),
        F.from_json(F.col("value").cast("string"), ORDER_SCHEMA).alias("payload"),
        F.col("partition").alias("kafka_partition"),
        F.col("offset").alias("kafka_offset"),
        F.col("timestamp").alias("kafka_ingest_ts"),
    )
    .select(
        "kafka_key",
        "kafka_partition",
        "kafka_offset",
        "kafka_ingest_ts",
        "payload.*",
    )
    # Fall back to the Kafka timestamp when the payload has no event_ts
    .withColumn("event_ts", F.coalesce(F.col("event_ts"), F.col("kafka_ingest_ts")))
    # Add partition columns for Delta write
    .withColumn("event_date", F.to_date(F.col("event_ts")))
    .withColumn("event_hour", F.hour(F.col("event_ts")))
)

# ─── Write to Bronze Delta ────────────────────────────────────────────────────
query = (
    df_parsed
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", CHECKPOINT_PATH)
    .option("mergeSchema", "true")
    .partitionBy("event_date", "event_hour")
    .trigger(processingTime="30 seconds")
    .start(BRONZE_PATH)
)

# ─── Wait and monitor ─────────────────────────────────────────────────────────
import time, json

for _ in range(20):
    time.sleep(30)
    if query.lastProgress:
        p = query.lastProgress
        print(
            f"Batch {p['batchId']:4d} | "
            f"Input: {p['numInputRows']:6,} rows | "
            f"Throughput: {p['processedRowsPerSecond']:,.0f} rows/s"
        )
    if not query.isActive:
        if query.exception():
            raise query.exception()
        break

query.awaitTermination()

Key Takeaways

  • Always use Structured Streaming, never DStreams.
  • Set checkpointLocation on every query; without it you have no fault tolerance.
  • Use watermarking for any aggregation over event time to bound state size.
  • AvailableNow trigger replaces Trigger.Once for scheduled batch-streaming jobs.
  • foreachBatch gives you full control over each micro-batch as a static DataFrame, which makes it ideal for upserts, DQ checks, and multi-sink writes.
  • With Delta Lake as your sink, you get schema evolution, time travel, and idempotent writes for free.
  • Delta Live Tables (Databricks) abstracts all the plumbing into declarative @dlt.table definitions.
