PySpark Structured Streaming: Kafka, Delta Lake, and Real-Time Pipelines
Build production-grade streaming pipelines with PySpark Structured Streaming: Kafka sources, watermarking, trigger strategies, foreachBatch sinks, fault tolerance, and Delta Live Tables.
Structured Streaming vs DStreams
Apache Spark has two streaming APIs. You should only use one of them.
| Feature | DStreams (legacy) | Structured Streaming (current) |
|---|---|---|
| API | RDD-based micro-batch | DataFrame/SQL-based |
| Fault tolerance | Manual checkpointing | Built-in, automatic |
| Exactly-once | Hard to achieve | Built-in with idempotent sinks |
| Late data | Not supported | Watermarking |
| Performance | Lower (RDD overhead) | Catalyst-optimized |
| Status | Deprecated in Spark 3.x | Active development |
Always use Structured Streaming. DStreams exist only in legacy code.
The key mental model: Structured Streaming treats a live data stream as an unbounded table that grows over time. Every readStream returns a "streaming DataFrame" and you write transformations against it exactly as you would a static DataFrame. The engine handles micro-batches, state, and fault recovery for you.
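To make the model concrete, here is a minimal sketch (the session name and the built-in rate source are illustrative): the transformation is written once with the ordinary DataFrame API, and Spark plans it incrementally because the input happens to be a stream.
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.appName("UnboundedTableDemo").getOrCreate()
# The "rate" source appends (timestamp, value) rows forever: an unbounded table
df_demo_stream = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 10)
    .load()
)
# Exactly the same API you would use on a static DataFrame
df_demo_even = (
    df_demo_stream
    .filter(F.col("value") % 2 == 0)
    .select("timestamp", "value")
)
df_demo_even.isStreaming  # True: Spark will execute this incrementally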
Architecture of a Structured Streaming Pipeline
┌────────────────────────────────────────────────────────────────────────┐
│                      Structured Streaming Engine                       │
│                                                                        │
│    Source                 Processing                Sink               │
│  ┌──────────┐            ┌──────────┐          ┌──────────────┐        │
│  │ Kafka    │─readStream▶│ Filter   │─writeStream▶ Delta Lake │        │
│  │ Files    │            │ Join     │          │ Kafka        │        │
│  │ Socket   │            │ Agg      │          │ Console      │        │
│  └──────────┘            └──────────┘          └──────────────┘        │
│                               │                                        │
│                    ┌──────────┼──────────┐                             │
│                    │  Checkpoint Store   │                             │
│                    │ (HDFS / S3 / ADLS)  │                             │
│                    └─────────────────────┘                             │
└────────────────────────────────────────────────────────────────────────┘
Reading Streams
From Kafka
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType
spark = (
SparkSession.builder
.appName("KafkaBronzePipeline")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
.getOrCreate()
)
# ─── Basic Kafka read ─────────────────────────────────────────────────────────
df_kafka_raw = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
.option("subscribe", "orders.events") # single topic
# .option("subscribePattern", "orders.*") # regex topic pattern
# .option("assign", '{"orders.events": [0,1,2]}') # specific partitions
.option("startingOffsets", "latest") # latest | earliest | json offset
.option("maxOffsetsPerTrigger", 100_000) # rate limiting per micro-batch
.option("failOnDataLoss", "false") # don't fail if offsets are gone
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required "
'username="user" password="secret";')
.load()
)
# Kafka gives you these columns automatically:
# key (binary), value (binary), topic, partition, offset, timestamp, timestampType, headers
df_kafka_raw.printSchema()
Deserializing Kafka Messages
# ─── JSON messages without Schema Registry ────────────────────────────────────
ORDER_EVENT_SCHEMA = StructType([
StructField("order_id", StringType(), nullable=False),
StructField("customer_id", StringType(), nullable=False),
StructField("event_type", StringType(), nullable=True), # placed / cancelled / updated
StructField("total", StringType(), nullable=True), # parse as string, cast later
StructField("items", StringType(), nullable=True), # JSON array as string
StructField("event_ts", TimestampType(), nullable=True),
])
df_orders_parsed = (
df_kafka_raw
.select(
F.col("key").cast("string").alias("kafka_key"),
F.col("partition").alias("kafka_partition"),
F.col("offset").alias("kafka_offset"),
F.col("timestamp").alias("kafka_timestamp"),
F.from_json(F.col("value").cast("string"), ORDER_EVENT_SCHEMA).alias("data"),
)
.select(
"kafka_key",
"kafka_partition",
"kafka_offset",
"kafka_timestamp",
"data.*",
)
.withColumn("total", F.col("total").cast("double"))
.withColumn(
"event_ts",
F.coalesce(F.col("event_ts"), F.col("kafka_timestamp"))
)
)
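If your topics carry Avro instead of JSON, the same pattern applies with from_avro from pyspark.sql.avro.functions. A hedged sketch, assuming the spark-avro package is on the classpath, the writer schema has been fetched from your registry out of band, and the producer uses Confluent's wire format with a 5-byte magic/schema-id prefix (all assumptions, not part of the pipeline above):
from pyspark.sql.avro.functions import from_avro
# Assumed writer schema, normally fetched from the Schema Registry
avro_schema_json = """{"type": "record", "name": "OrderEvent", "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "total",    "type": "double"}
]}"""
df_avro = (
    df_kafka_raw
    # Confluent wire format: strip 1 magic byte + 4 schema-id bytes
    .withColumn("avro_payload", F.expr("substring(value, 6, length(value) - 5)"))
    .select(from_avro(F.col("avro_payload"), avro_schema_json).alias("data"))
    .select("data.*")
)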
From a File Source (for Development / Landing Zone)
# Streaming file sources require an explicit schema (no inference by default)
df_file_stream = (
spark.readStream
.format("parquet")
# Spark monitors this path for new files
.option("path", "s3://my-bucket/landing/orders/")
.option("maxFilesPerTrigger", 10) # process at most 10 files per micro-batch
.option("latestFirst", "false") # process oldest files first
.schema(ORDER_EVENT_SCHEMA)
.load()
)
# CSV file stream
df_csv_stream = (
spark.readStream
.format("csv")
.option("path", "s3://my-bucket/landing/events/*.csv")
.option("header", "true")
.option("maxFilesPerTrigger", 5)
.schema(ORDER_EVENT_SCHEMA)
.load()
)
From Socket (Development / Testing Only)
# Never use in production; not fault-tolerant
df_socket = (
spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
)
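For quickly inspecting such a dev stream, the console sink is the usual companion; a minimal sketch (development only, like the socket source itself):
debug_query = (
    df_socket.writeStream
    .format("console")
    .option("truncate", "false")  # print full column values
    .option("numRows", 20)        # rows shown per micro-batch
    .outputMode("append")
    .start()
)
debug_query.awaitTermination(60)  # watch the output for a minute, then move on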
Trigger Types
The trigger controls how often Spark processes a micro-batch.
# ─── ProcessingTime: run every N seconds ──────────────────────────────────────
query = (
df_orders_parsed.writeStream
.trigger(processingTime="30 seconds") # run every 30s
.format("delta")
.outputMode("append")
.option("checkpointLocation", "s3://checkpoints/orders-bronze/")
.start("s3://my-bucket/delta/orders_bronze/")
)
# ─── AvailableNow: process all available data, then stop ──────────────────────
# Replaces the deprecated Trigger.Once; ideal for scheduled batch-streaming jobs
query = (
df_orders_parsed.writeStream
.trigger(availableNow=True)
.format("delta")
.outputMode("append")
.option("checkpointLocation", "s3://checkpoints/orders-bronze/")
.start("s3://my-bucket/delta/orders_bronze/")
)
query.awaitTermination() # blocks until all available data is processed
# ─── Continuous: experimental, millisecond latency ────────────────────────────
# Only supports map operations (no aggregations, no stateful ops)
query = (
df_orders_parsed.writeStream
.trigger(continuous="1 second") # checkpoint every 1 second
.format("memory")
.queryName("orders_live")
.outputMode("append")
.start()
)
Watermarking for Late Data
Without watermarking, Spark must keep state forever to handle late events, which eventually causes OOM errors. Watermarking tells Spark: "discard any state older than N time units."
# ─── Apply watermark on the event timestamp column ────────────────────────────
df_watermarked = df_orders_parsed.withWatermark(
"event_ts", # the column representing event time
"2 hours" # tolerate events arriving up to 2 hours late
)
# ─── Aggregation with watermark ───────────────────────────────────────────────
# Spark will maintain window state only for events within the watermark window
df_windowed_agg = (
df_watermarked
.groupBy(
F.window("event_ts", "10 minutes", "5 minutes"), # 10-min tumbling windows, slide 5m
"event_type",
)
.agg(
F.count("order_id").alias("event_count"),
F.sum("total").alias("window_revenue"),
)
.select(
F.col("window.start").alias("window_start"),
F.col("window.end").alias("window_end"),
"event_type",
"event_count",
"window_revenue",
)
)
Important: Watermarks only affect stateful operations (aggregations, stream-stream joins). They have no effect on stateless pipelines like simple filter + select.
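For stream-stream joins in particular, both sides need a watermark and the join condition needs an explicit time bound, otherwise Spark would have to buffer each stream's state indefinitely. A sketch, assuming a hypothetical payments.events topic and PAYMENT_SCHEMA (neither appears elsewhere in this pipeline):
from pyspark.sql.types import DoubleType
PAYMENT_SCHEMA = StructType([            # hypothetical payment payload
    StructField("order_id", StringType()),
    StructField("amount", DoubleType()),
    StructField("pay_ts", TimestampType()),
])
df_payments = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribe", "payments.events")  # hypothetical topic
    .load()
    .select(F.from_json(F.col("value").cast("string"), PAYMENT_SCHEMA).alias("p"))
    .select("p.*")
    .withWatermark("pay_ts", "1 hour")
)
# Time-bounded equi-join: each side's state can be dropped once the
# watermark passes the interval
df_orders_with_payments = df_watermarked.alias("o").join(
    df_payments.alias("p"),
    F.expr("""
        o.order_id = p.order_id AND
        p.pay_ts BETWEEN o.event_ts AND o.event_ts + INTERVAL 1 HOUR
    """),
    "inner",
)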
Output Modes
# append   - only new rows are written (safe for non-aggregating pipelines)
# update   - only changed rows since last trigger (aggregations that change)
# complete - entire result table is rewritten each trigger (small aggregations only)
# Stateless pipeline: always use append
query_bronze = (
df_orders_parsed.writeStream
.outputMode("append")
.format("delta")
...
)
# Aggregation with watermark: use update (efficient) or complete (full rewrite)
query_agg = (
df_windowed_agg.writeStream
.outputMode("update")
.format("delta")
...
)
# Complete only makes sense for small aggregated results (dashboard metrics, etc.)
query_complete = (
df_country_totals.writeStream
.outputMode("complete")
.format("memory")
.queryName("country_totals")
...
)
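The architecture diagram also lists Kafka as a sink. Writing a stream back to Kafka only requires shaping the DataFrame into key/value columns (string or binary); a sketch, assuming a hypothetical orders.metrics downstream topic:
query_kafka = (
    df_windowed_agg
    .select(
        F.col("event_type").alias("key"),
        F.to_json(F.struct(
            "window_start", "window_end", "event_count", "window_revenue"
        )).alias("value"),
    )
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("topic", "orders.metrics")  # hypothetical output topic
    .option("checkpointLocation", "s3://checkpoints/orders-metrics/")
    .outputMode("update")
    .start()
)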
foreachBatch: Custom Sink Logic
foreachBatch gives you a static DataFrame for each micro-batch. Use it when your sink doesn't have a native Structured Streaming connector, or when you need conditional write logic.
from pyspark.sql import DataFrame
from delta.tables import DeltaTable
SILVER_PATH = "s3://my-bucket/delta/orders_silver/"
DQ_LOG_PATH = "s3://my-bucket/delta/dq_log/"
def process_batch(batch_df: DataFrame, batch_id: int) -> None:
    """
    Called once per micro-batch.
    batch_df: static DataFrame for this batch
    batch_id: monotonically increasing batch sequence number
    """
    if batch_df.isEmpty():
        return
    # The batch is scanned several times below (counts, subtract, writes),
    # so cache it to avoid recomputing the source read
    batch_df.persist()
    # ── Data quality checks ───────────────────────────────────────────────────
    total_rows = batch_df.count()
    df_valid = batch_df.filter(
        F.col("order_id").isNotNull() &
        F.col("customer_id").isNotNull() &
        (F.col("total") >= 0)
    )
    df_invalid = batch_df.subtract(df_valid)
    valid_count = df_valid.count()
    invalid_count = df_invalid.count()
    # ── Log DQ metrics ────────────────────────────────────────────────────────
    # Note: the timestamp is added via withColumn; a Column expression such as
    # F.current_timestamp() cannot be passed as literal data to createDataFrame
    dq_record = spark.createDataFrame([{
        "batch_id": batch_id,
        "total_rows": total_rows,
        "valid_rows": valid_count,
        "invalid_rows": invalid_count,
    }]).withColumn("logged_at", F.current_timestamp())
    dq_record.write.format("delta").mode("append").save(DQ_LOG_PATH)
    # ── Quarantine invalid rows ───────────────────────────────────────────────
    if invalid_count > 0:
        (
            df_invalid
            .write
            .format("delta")
            .mode("append")
            .save("s3://my-bucket/delta/orders_quarantine/")
        )
    # ── Upsert valid rows into Silver (idempotent: replaying a batch merges
    #    the same keys again instead of duplicating them) ─────────────────────
    if DeltaTable.isDeltaTable(spark, SILVER_PATH):
        delta_silver = DeltaTable.forPath(spark, SILVER_PATH)
        (
            delta_silver.alias("target")
            .merge(
                df_valid.alias("source"),
                "target.order_id = source.order_id"
            )
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )
    else:
        df_valid.write.format("delta").mode("overwrite").save(SILVER_PATH)
    batch_df.unpersist()
query = (
df_orders_parsed.writeStream
.foreachBatch(process_batch)
.option("checkpointLocation", "s3://checkpoints/orders-silver/")
.trigger(processingTime="60 seconds")
.start()
)
Checkpointing and Fault Tolerance
Checkpointing is mandatory in production. Without it, a restarted query has no memory of its offsets and starts over from whatever startingOffsets dictates, reprocessing or silently skipping data.
# ─── Always set checkpointLocation ────────────────────────────────────────────
query = (
df_stream.writeStream
.option("checkpointLocation", "s3://my-checkpoints/pipeline-name/v1/")
.format("delta")
.outputMode("append")
.start("s3://my-bucket/delta/output/")
)
# The checkpoint stores:
# - Last committed Kafka offsets (source state)
# - Aggregation state (for stateful queries)
# - Write-ahead log of micro-batch IDs (prevents duplicates)
Important rules:
- Never change the checkpointLocation between restarts; it must match the original query.
- When you change the query logic significantly (schema, source, sink), you must use a new checkpoint location and handle backfill separately.
- With Delta Lake sinks and idempotent foreachBatch, you get exactly-once semantics (see the sketch below). Without Delta, you may get at-least-once.
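Delta Lake can make foreachBatch writes idempotent across batch replays via its txnAppId / txnVersion writer options: if a restarted query re-delivers a batch_id that was already committed under the same app id, Delta skips the duplicate write. A sketch (the app id and path are illustrative):
from pyspark.sql import DataFrame
def write_batch_idempotent(batch_df: DataFrame, batch_id: int) -> None:
    (
        batch_df.write
        .format("delta")
        # Same (txnAppId, txnVersion) pair => Delta ignores the replayed write
        .option("txnAppId", "orders-silver-writer")  # stable per logical stream
        .option("txnVersion", batch_id)              # monotonically increasing
        .mode("append")
        .save("s3://my-bucket/delta/orders_silver/")
    )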
Monitoring Streaming Queries
# Get reference to a running query
query = df_stream.writeStream.start(...)
# ─── Status ───────────────────────────────────────────────────────────────────
import json
# Current status snapshot
print(json.dumps(query.status, indent=2))
# {
# "message": "Processing new data",
# "isDataAvailable": true,
# "isTriggerActive": true
# }
# ─── Last progress (metrics from last completed micro-batch) ──────────────────
progress = query.lastProgress
if progress:
    print(f"Batch ID         : {progress['batchId']}")
    print(f"Input rows       : {progress['numInputRows']}")
    print(f"Input rows/sec   : {progress['inputRowsPerSecond']:.1f}")
    print(f"Processed rows/s : {progress['processedRowsPerSecond']:.1f}")
    # Durations live under 'durationMs', keyed by phase
    print(f"Trigger duration : {progress['durationMs']['triggerExecution']} ms")
    # Kafka-specific
    if "sources" in progress:
        for src in progress["sources"]:
            print(f"Source: {src.get('description')}")
            print(f"  End offset: {src.get('endOffset')}")
# ─── All recent progress entries ──────────────────────────────────────────────
for p in query.recentProgress[-5:]:
    print(f"Batch {p['batchId']}: {p['numInputRows']} rows")
# ─── Await termination ────────────────────────────────────────────────────────
query.awaitTermination() # blocks forever
query.awaitTermination(timeout=300) # blocks for 5 minutes, returns bool
# ─── Stop a query ─────────────────────────────────────────────────────────────
query.stop()
# ─── Exception handling ───────────────────────────────────────────────────────
try:
    query.awaitTermination()
except Exception as e:
    print(f"Query failed: {e}")
    # query.exception() returns the exception that caused the failure
    exc = query.exception()
    if exc:
        print(f"Root cause: {exc}")
Delta Live Tables (Declarative Streaming Pipelines)
Delta Live Tables (DLT) is Databricks' higher-level abstraction over Structured Streaming. Instead of writing readStream / writeStream imperatively, you declare your tables and let DLT handle orchestration, retries, monitoring, and lineage.
# In a DLT notebook (Databricks only):
import dlt
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
KAFKA_BOOTSTRAP = spark.conf.get("pipeline.kafka.bootstrap_servers")
ORDER_SCHEMA = StructType([
StructField("order_id", StringType(), nullable=False),
StructField("customer_id", StringType(), nullable=False),
StructField("total", DoubleType(), nullable=True),
StructField("event_type", StringType(), nullable=True),
StructField("event_ts", TimestampType(), nullable=True),
])
# ─── Bronze: raw ingest from Kafka ────────────────────────────────────────────
@dlt.table(
name="orders_bronze",
comment="Raw order events from Kafka, no transformations",
table_properties={"quality": "bronze"},
)
def orders_bronze():
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP)
        .option("subscribe", "orders.events")
        .option("startingOffsets", "latest")
        .load()
        .select(
            F.col("key").cast("string").alias("kafka_key"),
            F.col("value").cast("string").alias("raw_value"),
            F.col("timestamp").alias("kafka_timestamp"),
            F.col("partition").alias("kafka_partition"),
            F.col("offset").alias("kafka_offset"),
        )
    )
# ─── Silver: parse, validate, enrich ──────────────────────────────────────────
@dlt.expect("valid_order_id", "order_id IS NOT NULL")
@dlt.expect("valid_customer_id","customer_id IS NOT NULL")
@dlt.expect_or_drop("positive_total", "total >= 0")
@dlt.table(
name="orders_silver",
comment="Parsed and validated order events",
table_properties={"quality": "silver"},
)
def orders_silver():
return (
dlt.read_stream("orders_bronze")
.withColumn("data", F.from_json(F.col("raw_value"), ORDER_SCHEMA))
.select(
"kafka_key",
"kafka_timestamp",
"kafka_partition",
"kafka_offset",
"data.*",
)
.withColumn("ingested_at", F.current_timestamp())
.withWatermark("event_ts", "2 hours")
)
# ─── Gold: aggregated metrics ─────────────────────────────────────────────────
@dlt.table(
name="orders_gold_hourly",
comment="Hourly order metrics for BI",
table_properties={"quality": "gold"},
)
def orders_gold_hourly():
    return (
        dlt.read_stream("orders_silver")
        # Re-apply the watermark: it does not survive materialization of the
        # silver table, and append-mode streaming aggregation requires one
        .withWatermark("event_ts", "2 hours")
        .groupBy(
            F.window("event_ts", "1 hour").alias("hour_window"),
            "event_type",
        )
        .agg(
            F.count("order_id").alias("order_count"),
            F.sum("total").alias("total_revenue"),
            F.avg("total").alias("avg_order_value"),
        )
        .select(
            F.col("hour_window.start").alias("hour_start"),
            F.col("hour_window.end").alias("hour_end"),
            "event_type",
            "order_count",
            "total_revenue",
            "avg_order_value",
        )
    )
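Expectations can also be bundled. A sketch using dlt.expect_all_or_drop, which drops rows violating any of the named constraints while still reporting per-constraint counts in the DLT event log (the table name here is illustrative):
@dlt.table(name="orders_silver_strict", comment="Strictly validated orders")
@dlt.expect_all_or_drop({
    "valid_order_id": "order_id IS NOT NULL",
    "valid_customer_id": "customer_id IS NOT NULL",
    "positive_total": "total >= 0",
})
def orders_silver_strict():
    return dlt.read_stream("orders_silver")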
Complete Production Pipeline: Kafka to Bronze Delta
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import (
StructType, StructField, StringType, DoubleType,
TimestampType, IntegerType
)
# ─── Session ──────────────────────────────────────────────────────────────────
spark = (
SparkSession.builder
.appName("OrdersBronzePipeline")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config("spark.databricks.delta.schema.autoMerge.enabled", "true")
.getOrCreate()
)
# ─── Constants ────────────────────────────────────────────────────────────────
KAFKA_SERVERS = "kafka-broker-1:9092,kafka-broker-2:9092"
KAFKA_TOPIC = "orders.events"
BRONZE_PATH = "s3://my-bucket/delta/orders_bronze/"
CHECKPOINT_PATH = "s3://my-checkpoints/orders-bronze/v2/"
ORDER_SCHEMA = StructType([
StructField("order_id", StringType(), nullable=False),
StructField("customer_id", StringType(), nullable=False),
StructField("store_id", StringType(), nullable=True),
StructField("event_type", StringType(), nullable=True),
StructField("total", DoubleType(), nullable=True),
StructField("currency", StringType(), nullable=True),
StructField("items_count", IntegerType(), nullable=True),
StructField("event_ts", TimestampType(), nullable=True),
])
# ─── Read from Kafka ──────────────────────────────────────────────────────────
df_raw = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", KAFKA_SERVERS)
.option("subscribe", KAFKA_TOPIC)
.option("startingOffsets", "latest")
.option("maxOffsetsPerTrigger", 50_000)
.option("failOnDataLoss", "false")
.load()
)
# ─── Parse and flatten ────────────────────────────────────────────────────────
df_parsed = (
df_raw
.select(
F.col("key").cast("string").alias("kafka_key"),
F.from_json(F.col("value").cast("string"), ORDER_SCHEMA).alias("payload"),
F.col("partition").alias("kafka_partition"),
F.col("offset").alias("kafka_offset"),
F.col("timestamp").alias("kafka_ingest_ts"),
)
.select(
"kafka_key",
"kafka_partition",
"kafka_offset",
"kafka_ingest_ts",
"payload.*",
)
# Coalesce event_ts fallback to kafka ingest time
.withColumn("event_ts", F.coalesce(F.col("event_ts"), F.col("kafka_ingest_ts")))
# Add partition columns for Delta write
.withColumn("event_date", F.to_date(F.col("event_ts")))
.withColumn("event_hour", F.hour(F.col("event_ts")))
)
# ─── Write to Bronze Delta ────────────────────────────────────────────────────
query = (
df_parsed
.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", CHECKPOINT_PATH)
.option("mergeSchema", "true")
.partitionBy("event_date", "event_hour")
.trigger(processingTime="30 seconds")
.start(BRONZE_PATH)
)
# ─── Wait and monitor ─────────────────────────────────────────────────────────
import time
for _ in range(20):
    time.sleep(30)
    if query.lastProgress:
        p = query.lastProgress
        print(
            f"Batch {p['batchId']:4d} | "
            f"Input: {p['numInputRows']:6,} rows | "
            f"Throughput: {p['processedRowsPerSecond']:,.0f} rows/s"
        )
    if not query.isActive:
        exc = query.exception()
        if exc:
            raise exc
        break
query.awaitTermination()
Key Takeaways
- Always use Structured Streaming, never DStreams.
- Set checkpointLocation on every query; without it you have no fault tolerance.
- Use watermarking for any aggregation over event time to bound state size.
- The AvailableNow trigger replaces Trigger.Once for scheduled batch-streaming jobs.
- foreachBatch gives you full control over each micro-batch as a static DataFrame; ideal for upserts, DQ checks, and multi-sink writes.
- With Delta Lake as your sink, you get schema evolution, time travel, and idempotent writes for free.
- Delta Live Tables (Databricks) abstracts all the plumbing into declarative @dlt.table definitions.