Delta Lake on Databricks: ACID Transactions, Time Travel & Medallion Architecture
A production-depth guide to Delta Lake — ACID guarantees, MERGE upserts, time travel, schema enforcement, table constraints, OPTIMIZE with Z-ORDER, VACUUM, and a complete Bronze → Silver → Gold pipeline.
Why Delta Lake Instead of Plain Parquet?
Parquet is a columnar file format, not a storage layer with transactional semantics. When you have dozens of concurrent writers, streaming ingestion, and business queries running at the same time on cloud object storage, plain Parquet breaks down fast.
Delta Lake adds a transaction log (the _delta_log/ directory) on top of Parquet files. Every write operation — insert, update, delete, merge — is recorded as a JSON commit entry. That log is the source of truth. It gives you:
| Capability | Plain Parquet | Delta Lake |
|---|---|---|
| ACID transactions | No — concurrent writes corrupt data | Yes — serializable isolation |
| Schema enforcement | None | Configurable strict or merge mode |
| Schema evolution | Manual file rewrites | mergeSchema option |
| Time travel | None | Query any past version by version number or timestamp |
| Streaming + batch unified | Two separate pipelines | One table for both |
| UPDATE / DELETE / MERGE | Rewrite entire partition | Efficient delta operations |
| Scalable metadata | Slow LIST calls on S3/ADLS | Centralized transaction log |
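The ACID row above rests on one mechanism: committing version N means atomically creating the Nth log file, a put-if-absent operation the log store must guarantee. A toy pure-Python sketch of that optimistic concurrency protocol (the `log_store` dict and `try_commit` helper are illustrative, not Delta's actual API):

```python
class CommitConflict(Exception):
    pass

# The log store must support atomic "create this exact file, fail if it
# exists". A dict keyed by version number stands in for that here.
log_store: dict[int, list] = {}

def try_commit(version: int, actions: list) -> None:
    if version in log_store:          # another writer already claimed this version
        raise CommitConflict(f"version {version} already committed")
    log_store[version] = actions      # atomic put-if-absent in the real thing

try_commit(0, [{"add": "part-000.parquet"}])
try:
    try_commit(0, [{"add": "part-xyz.parquet"}])   # concurrent writer loses the race
except CommitConflict:
    # the loser re-reads the log, checks for logical conflicts, retries at version 1
    try_commit(1, [{"add": "part-xyz.parquet"}])

print(sorted(log_store))  # [0, 1]
```

The losing writer does not corrupt anything — it simply rebases on the newer table state and retries, which is why concurrent writers are safe on Delta but not on plain Parquet.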
The transaction log also enables data skipping — Delta tracks min/max statistics per file for indexed columns, so queries that filter on those columns skip irrelevant files entirely.
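A rough sketch of the data-skipping idea, using hypothetical per-file stats (`min_ts`/`max_ts` are illustrative field names, not the transaction log's actual schema):

```python
# Per-file min/max stats for an indexed column let the planner skip files
# whose value range cannot possibly match the filter.
file_stats = [
    {"path": "part-000.parquet", "min_ts": "2026-05-01", "max_ts": "2026-05-03"},
    {"path": "part-001.parquet", "min_ts": "2026-05-04", "max_ts": "2026-05-06"},
    {"path": "part-002.parquet", "min_ts": "2026-05-07", "max_ts": "2026-05-07"},
]

def files_to_scan(stats, lo, hi):
    """Keep only files whose [min, max] range overlaps the filter range."""
    return [f["path"] for f in stats if f["max_ts"] >= lo and f["min_ts"] <= hi]

print(files_to_scan(file_stats, "2026-05-05", "2026-05-07"))
# ['part-001.parquet', 'part-002.parquet'] — part-000 is never read
```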
Creating Delta Tables
There are three common ways to create a Delta table. Each produces the same kind of object — a table backed by Parquet files plus a _delta_log/.
DDL: CREATE TABLE … USING DELTA
-- Managed table — Databricks controls both metadata and data location
CREATE TABLE IF NOT EXISTS catalog.bronze.raw_events (
event_id STRING NOT NULL,
event_type STRING NOT NULL,
user_id STRING,
payload STRING,
event_ts TIMESTAMP NOT NULL,
_ingested_at TIMESTAMP,
event_date DATE GENERATED ALWAYS AS (CAST(event_ts AS DATE))
)
USING DELTA
PARTITIONED BY (event_date) -- partition on the generated column; arbitrary expressions are not allowed here
TBLPROPERTIES (
'delta.logRetentionDuration' = 'interval 30 days',
'delta.dataSkippingNumIndexedCols' = '4'
);

-- External table — you control the data location (common for cloud lake paths)
CREATE TABLE IF NOT EXISTS catalog.bronze.raw_events
USING DELTA
LOCATION 'abfss://bronze@datalake.dfs.core.windows.net/raw_events'
TBLPROPERTIES ('delta.logRetentionDuration' = 'interval 30 days');

DataFrame API: save with format("delta")
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, col, lit
spark = SparkSession.builder.appName("BronzeIngestion").getOrCreate()
raw_df = spark.read.json("abfss://landing@datalake.dfs.core.windows.net/events/2026/05/07/")
# Tag with ingestion metadata before writing
enriched = (
raw_df
.withColumn("_ingested_at", current_timestamp())
.withColumn("_source_file", col("_metadata.file_path"))
)
# Write as Delta, partitioned for efficient time-range queries
(
enriched.write
.format("delta")
.mode("append")
.partitionBy("event_date") # pre-existing string column yyyy-MM-dd
.option("mergeSchema", "false") # strict: reject unexpected columns
.save("abfss://bronze@datalake.dfs.core.windows.net/raw_events")
)

DataFrame API: saveAsTable
# Writes data AND registers the table in Unity Catalog in one call
(
enriched.write
.format("delta")
.mode("append")
.partitionBy("event_date")
.saveAsTable("catalog.bronze.raw_events") # catalog.schema.table
)

saveAsTable is the most convenient for managed tables. save() is preferred when you need fine-grained control over the storage path (external tables).
MERGE INTO: Production Upserts
MERGE INTO is the most powerful Delta Lake operation. It atomically applies inserts, updates, and deletes in a single pass — avoiding the read-and-rewrite-the-whole-partition anti-pattern that plagued plain Parquet pipelines.
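Before diving into the API, the semantics of MERGE can be sketched in plain Python: matched keys are updated (or soft-deleted on a delete marker), unmatched source keys are inserted, and the whole batch swaps in atomically. The `merge` function, `record_type` field, and dict shapes below are illustrative only:

```python
def merge(target: dict, source: list[dict]) -> dict:
    """Apply a batch of change records to target, all-or-nothing."""
    new_state = {k: dict(v) for k, v in target.items()}  # work on a copy
    for row in source:
        key = row["customer_id"]
        payload = {k: v for k, v in row.items() if k != "record_type"}
        if key in new_state:
            if row["record_type"] == "DELETE":
                new_state[key]["is_deleted"] = True        # matched soft delete
            else:
                new_state[key].update(payload)             # matched update
        elif row["record_type"] != "DELETE":
            new_state[key] = {**payload, "is_deleted": False}  # not matched: insert
    return new_state                                       # swap in atomically

target = {"c1": {"customer_id": "c1", "tier": "basic", "is_deleted": False}}
source = [
    {"customer_id": "c1", "tier": "gold", "record_type": "UPDATE"},
    {"customer_id": "c2", "tier": "basic", "record_type": "UPDATE"},
]
result = merge(target, source)
print(result["c1"]["tier"], result["c2"]["tier"])  # gold basic
```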
Full MERGE Syntax
from delta.tables import DeltaTable
from pyspark.sql.functions import current_timestamp
# Reference the target table
silver_customers = DeltaTable.forName(spark, "catalog.silver.customers")
# The incoming batch of changes (could be a stream micro-batch too)
updates_df = (
spark.read
.format("delta")
.load("abfss://staging@datalake.dfs.core.windows.net/customer_changes")
)
(
silver_customers.alias("target")
.merge(
updates_df.alias("source"),
condition="target.customer_id = source.customer_id"
)
# Matched + source is a real update (not a delete marker)
.whenMatchedUpdate(
condition="source.record_type = 'UPDATE'",
set={
"name": "source.name",
"email": "source.email",
"phone": "source.phone",
"tier": "source.tier",
"updated_at": "current_timestamp()",
"row_hash": "source.row_hash",
}
)
# Matched + source is a soft-delete marker
.whenMatchedUpdate(
condition="source.record_type = 'DELETE'",
set={
"is_deleted": "true",
"deleted_at": "current_timestamp()",
}
)
# No existing record — insert it
.whenNotMatchedInsert(
condition="source.record_type != 'DELETE'",
values={
"customer_id": "source.customer_id",
"name": "source.name",
"email": "source.email",
"phone": "source.phone",
"tier": "source.tier",
"is_deleted": "false",
"created_at": "current_timestamp()",
"updated_at": "current_timestamp()",
"row_hash": "source.row_hash",
}
)
# Target rows with no matching source row are left untouched
# (no whenNotMatchedBySource clause needed here)
.execute()
)

MERGE with SQL
MERGE INTO catalog.silver.customers AS target
USING catalog.staging.customer_changes AS source
ON target.customer_id = source.customer_id
WHEN MATCHED AND source.record_type = 'UPDATE' THEN UPDATE SET
target.name = source.name,
target.email = source.email,
target.tier = source.tier,
target.updated_at = current_timestamp()
WHEN MATCHED AND source.record_type = 'DELETE' THEN UPDATE SET
target.is_deleted = true,
target.deleted_at = current_timestamp()
WHEN NOT MATCHED AND source.record_type != 'DELETE' THEN INSERT (
customer_id, name, email, phone, tier,
is_deleted, created_at, updated_at
) VALUES (
source.customer_id, source.name, source.email, source.phone, source.tier,
false, current_timestamp(), current_timestamp()
);

Performance tip: Always filter updates_df to only the changed records before the MERGE. A MERGE that scans the entire source table on every run is the most common performance anti-pattern in Delta pipelines.
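One way to do that filtering is a row hash over the tracked columns — the same `sha2(concat_ws('|', ...))` pattern used later in this guide — so records whose tracked columns are unchanged never reach the MERGE. The names and shapes below are illustrative:

```python
import hashlib

def row_hash(row: dict, cols: list[str]) -> str:
    """Stable digest over the tracked columns (pure-Python analogue of sha2(concat_ws('|', ...)))."""
    return hashlib.sha256("|".join(str(row[c]) for c in cols).encode()).hexdigest()

cols = ["order_id", "total", "status"]
# Hashes of what the target table currently holds, keyed by business key
existing = {"o1": row_hash({"order_id": "o1", "total": 10, "status": "pending"}, cols)}

incoming = [
    {"order_id": "o1", "total": 10, "status": "pending"},  # unchanged -> dropped
    {"order_id": "o2", "total": 5,  "status": "pending"},  # new key   -> kept
    {"order_id": "o3", "total": 7,  "status": "shipped"},  # new key   -> kept
]
changed = [r for r in incoming if existing.get(r["order_id"]) != row_hash(r, cols)]
print([r["order_id"] for r in changed])  # ['o2', 'o3']
```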
Time Travel: Querying Historical Data
Every commit to a Delta table creates a new version. You can query any version at any time — this is time travel.
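Conceptually, "version N" is just the log folded up to commit N — no data is copied per version. A minimal sketch, with tuples standing in for real add/remove actions:

```python
# Each commit is a list of (action, file) pairs, in version order.
commits = [
    [("add", "part-000.parquet")],                                  # version 0
    [("remove", "part-000.parquet"), ("add", "part-001.parquet")],  # version 1
    [("add", "part-002.parquet")],                                  # version 2
]

def snapshot(commits, version_as_of: int) -> set[str]:
    """Files visible at a given version: fold commits 0..version_as_of."""
    files: set[str] = set()
    for actions in commits[: version_as_of + 1]:
        for op, path in actions:
            files.add(path) if op == "add" else files.discard(path)
    return files

print(snapshot(commits, 0))  # {'part-000.parquet'}
print(snapshot(commits, 2) == {"part-001.parquet", "part-002.parquet"})  # True
```

Time travel works for as long as both the log entries and the removed data files are retained — which is exactly what the VACUUM retention rules later in this guide protect.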
VERSION AS OF and TIMESTAMP AS OF
# Query by version number
df_v10 = (
spark.read.format("delta")
.option("versionAsOf", 10)
.table("catalog.silver.customers")
)
# Query by timestamp
df_before_incident = (
spark.read.format("delta")
.option("timestampAsOf", "2026-05-06T14:00:00.000Z")
.table("catalog.silver.customers")
)
# SQL equivalents
spark.sql("""
SELECT * FROM catalog.silver.customers VERSION AS OF 10
""")
spark.sql("""
SELECT * FROM catalog.silver.customers
TIMESTAMP AS OF '2026-05-06T14:00:00'
""")

DESCRIBE HISTORY
# Show the full audit trail for a table
history_df = spark.sql("DESCRIBE HISTORY catalog.silver.customers")
history_df.select(
"version", "timestamp", "userName",
"operation", "operationParameters"
).show(20, truncate=False)

Typical output columns:
version timestamp userName operation operationParameters
------- ------------------- --------------- ---------- ------------------------------------------
15 2026-05-07 09:12:01 etl@company.com MERGE {"predicate":"[...]","matchedPredicates":...}
14 2026-05-07 08:00:00 etl@company.com WRITE {"mode":"Append","partitionBy":"[...]"}
13      2026-05-06 22:00:00 etl@company.com OPTIMIZE   {"predicate":"[]","zOrderBy":"[customer_id]"}

RESTORE TABLE: Emergency Rollback
from delta.tables import DeltaTable
dt = DeltaTable.forName(spark, "catalog.silver.customers")
# Roll back to a specific version
dt.restoreToVersion(12)
# Roll back to a point in time
dt.restoreToTimestamp("2026-05-06T08:00:00.000Z")

-- SQL equivalents
RESTORE TABLE catalog.silver.customers TO VERSION AS OF 12;
RESTORE TABLE catalog.silver.customers TO TIMESTAMP AS OF '2026-05-06T08:00:00';

RESTORE creates a new commit that resets the table to the target state. It does not delete history — the original commits remain visible in DESCRIBE HISTORY.
Schema Enforcement and Evolution
Schema Enforcement: Rejecting Bad Writes
By default, Delta rejects writes whose schema does not match the registered table schema.
# This will raise AnalysisException if `new_column` doesn't exist in the table
try:
bad_df.write.format("delta").mode("append").saveAsTable("catalog.silver.customers")
except Exception as e:
print(f"Schema enforcement blocked the write: {e}")
# AnalysisException: A schema mismatch detected when writing to the Delta table
# To enable schema migration using DataFrameWriter or DataStreamWriter, please set:
# '.option("mergeSchema", "true")'

Schema Evolution: Merging New Columns
When a data source adds a new field, you want Delta to accept it and add the column rather than reject the write.
# Allow new columns to be added automatically
(
updated_df.write
.format("delta")
.mode("append")
.option("mergeSchema", "true") # new columns are added; existing columns are unchanged
.saveAsTable("catalog.silver.customers")
)

-- Column mapping (a table property) unlocks RENAME COLUMN and DROP COLUMN
-- without rewriting data files:
ALTER TABLE catalog.silver.customers
SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name');

-- To make schema evolution automatic for every write and MERGE in a session,
-- use the session config instead (there is no per-table mergeSchema property):
SET spark.databricks.delta.schema.autoMerge.enabled = true;

Rules:
- mergeSchema=true can add new columns or widen types (e.g., INT → LONG).
- It cannot drop columns or narrow types.
- Use overwriteSchema=true only when you intend a full schema replacement (destructive).
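The rules above can be sketched as a schema-merge function — `WIDENINGS` here is an illustrative subset, not Delta's full type-promotion table:

```python
# Allowed widenings (illustrative subset): the existing type may grow,
# never shrink.
WIDENINGS = {("int", "long")}

def merge_schema(table: dict, incoming: dict) -> dict:
    """Merge an incoming write schema into the table schema, or reject it."""
    merged = dict(table)
    for column, typ in incoming.items():
        if column not in merged:
            merged[column] = typ                 # new column: add it
        elif merged[column] == typ:
            pass                                 # identical type: fine
        elif (merged[column], typ) in WIDENINGS:
            merged[column] = typ                 # widen the existing column
        else:                                    # narrowing / incompatible: reject
            raise TypeError(f"cannot change {column}: {merged[column]} -> {typ}")
    return merged

schema = {"order_id": "string", "total": "int"}
print(merge_schema(schema, {"total": "long", "channel": "string"}))
# {'order_id': 'string', 'total': 'long', 'channel': 'string'}
```

Note that a column missing from the incoming schema is simply left alone — mergeSchema never drops anything.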
Table Constraints
Constraints enforce data quality at the storage layer — violations are rejected before data lands.
NOT NULL Constraints
-- NOT NULL is a column-level constraint, applied with ALTER COLUMN
ALTER TABLE catalog.silver.orders
ALTER COLUMN order_id SET NOT NULL;

CHECK Constraints
ALTER TABLE catalog.silver.orders
ADD CONSTRAINT orders_total_positive CHECK (total > 0);
ALTER TABLE catalog.silver.orders
ADD CONSTRAINT valid_status
CHECK (status IN ('pending', 'processing', 'shipped', 'completed', 'cancelled'));
ALTER TABLE catalog.silver.orders
ADD CONSTRAINT created_before_updated
CHECK (created_at <= updated_at);

Attempting to write a row that violates any constraint raises a DeltaInvariantViolationException — the entire batch is rejected, keeping the table clean.
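That all-or-nothing behaviour can be sketched as per-row predicates where one failing row rejects the whole batch. The `CONSTRAINTS` dict below mirrors the SQL constraints above, purely as illustration:

```python
# Each constraint is a predicate evaluated against every row in the batch.
CONSTRAINTS = {
    "orders_id_not_null": lambda r: r["order_id"] is not None,
    "orders_total_positive": lambda r: r["total"] > 0,
    "valid_status": lambda r: r["status"] in {"pending", "processing",
                                              "shipped", "completed", "cancelled"},
}

def validate_batch(rows: list[dict]) -> None:
    """Raise on the first violation — nothing from the batch is committed."""
    for row in rows:
        for name, check in CONSTRAINTS.items():
            if not check(row):
                raise ValueError(f"CHECK constraint {name} violated by {row}")

good = [{"order_id": "o1", "total": 10, "status": "pending"}]
validate_batch(good)  # passes silently

try:
    validate_batch(good + [{"order_id": "o2", "total": -5, "status": "pending"}])
except ValueError as e:
    print("batch rejected:", e)
```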
Generated Columns
Generated columns are computed automatically from other columns — they're stored physically and can be used for partitioning.
CREATE TABLE catalog.silver.events (
event_id STRING NOT NULL,
event_ts TIMESTAMP NOT NULL,
-- Generated column: Delta computes and stores this automatically
event_date DATE GENERATED ALWAYS AS (CAST(event_ts AS DATE)),
payload STRING
)
USING DELTA
PARTITIONED BY (event_date); -- partition on the generated column

# When writing, omit event_date — Delta fills it in
from pyspark.sql.functions import current_timestamp
events_df.drop("event_date").write \
.format("delta") \
.mode("append") \
.saveAsTable("catalog.silver.events")

Delta Table Properties
These TBLPROPERTIES control Delta behavior at the table level:
ALTER TABLE catalog.silver.orders SET TBLPROPERTIES (
-- How long to retain the transaction log (default 30 days)
'delta.logRetentionDuration' = 'interval 60 days',
-- How long to retain data files that are no longer in the latest snapshot
-- (controls minimum safe retention for VACUUM — default 7 days)
'delta.deletedFileRetentionDuration' = 'interval 7 days',
-- Number of columns to collect min/max stats for data skipping (default 32)
'delta.dataSkippingNumIndexedCols' = '6',
-- Auto-optimize: compact small files on write (Databricks-specific)
'delta.autoOptimize.optimizeWrite' = 'true',
-- Auto-optimize: automatically run OPTIMIZE after writes (Databricks-specific)
'delta.autoOptimize.autoCompact' = 'true'
);

OPTIMIZE and Z-ORDER
Over time, a Delta table accumulates many small files — each Spark task writes its own output file. OPTIMIZE compacts them into larger files (target ~1 GB each) and rebuilds the data layout for better query performance.
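The planning step of compaction can be sketched as greedy bin packing toward a target output size. The 1024 MB `TARGET` and `plan_compaction` helper are illustrative — Delta chooses its own grouping:

```python
TARGET = 1024  # MB, illustrative stand-in for the ~1 GB default target

def plan_compaction(file_sizes_mb: list[int], target: int = TARGET) -> list[list[int]]:
    """Greedily group small files into output files of roughly `target` MB."""
    groups, current, current_size = [], [], 0
    for size in sorted(file_sizes_mb, reverse=True):  # largest first
        if current and current_size + size > target:
            groups.append(current)                    # close the full group
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        groups.append(current)
    return groups

small_files = [700, 400, 300, 200, 100, 100]
plan = plan_compaction(small_files)
print(len(plan), [sum(g) for g in plan])  # 3 [700, 1000, 100]
```

Six small files become three outputs — fewer files means fewer tasks and fewer object-store requests per query.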
OPTIMIZE: Compaction
-- Compact all files in the table
OPTIMIZE catalog.silver.orders;
-- Compact only a specific partition
OPTIMIZE catalog.silver.orders
WHERE event_date = '2026-05-07';

Z-ORDER: Multi-Dimensional Clustering
Z-ORDER co-locates rows with similar values for the specified columns in the same files. Queries that filter on those columns skip far more files.
-- Compact AND re-order by customer_id and order_date together
OPTIMIZE catalog.silver.orders
ZORDER BY (customer_id, order_date);

from delta.tables import DeltaTable
dt = DeltaTable.forName(spark, "catalog.silver.orders")
dt.optimize().executeZOrderBy("customer_id", "order_date")

Z-ORDER guidelines:
- Use it on high-cardinality columns that appear in WHERE or JOIN clauses.
- Limit to 2–4 columns — diminishing returns beyond that.
- Z-ORDER applies within a partition; partition first on date, then Z-ORDER on IDs within each date.
- Schedule OPTIMIZE + ZORDER nightly or after large batch loads.
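Under the hood, Z-ordering sorts rows by an interleaved-bit ("Morton") code so rows close in every clustered dimension land in the same files. A two-column sketch of the idea — Delta's actual implementation differs in details such as range-partitioning the values first:

```python
def morton(x: int, y: int, bits: int = 8) -> int:
    """Interleave the low `bits` bits of x and y into one sort key."""
    code = 0
    for i in range(bits):
        code |= ((x >> i) & 1) << (2 * i)        # x occupies even bit positions
        code |= ((y >> i) & 1) << (2 * i + 1)    # y occupies odd bit positions
    return code

rows = [(3, 5), (3, 4), (200, 7), (2, 5)]
zsorted = sorted(rows, key=lambda r: morton(*r))
print(zsorted)  # [(3, 4), (2, 5), (3, 5), (200, 7)] — the outlier sorts last
```

Sorting by a single column would cluster well only on that column; the interleaved key keeps rows that are near in *both* dimensions near each other, which is what lets min/max stats skip files on either filter.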
VACUUM: Reclaiming Storage
VACUUM permanently deletes data files that are no longer part of any table version, freeing storage.
from delta.tables import DeltaTable
dt = DeltaTable.forName(spark, "catalog.silver.orders")
# Default: retain files referenced by versions in the last 7 days
dt.vacuum()
# Explicit retention window (168 hours = the 7-day default; never go below
# 7 days if concurrent readers might still be querying older versions)
dt.vacuum(retentionHours=168)
# DRY RUN is SQL-only — see what would be deleted without deleting it
spark.sql("VACUUM catalog.silver.orders RETAIN 168 HOURS DRY RUN")

-- SQL equivalents
VACUUM catalog.silver.orders RETAIN 168 HOURS;
VACUUM catalog.silver.orders RETAIN 168 HOURS DRY RUN;

Warning: Never set retentionHours below 168 hours (7 days) unless you have confirmed no active streaming queries or long-running reads against the table. Doing so can cause data loss for active readers.
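The retention rule can be sketched as a simple cutoff check over removed-file timestamps — a file is a delete candidate only if it left the current snapshot longer ago than the retention window. The `vacuum_candidates` helper and its inputs are illustrative:

```python
from datetime import datetime, timedelta

def vacuum_candidates(removed_files: dict[str, datetime],
                      now: datetime,
                      retention_hours: float = 168) -> list[str]:
    """Files removed from the snapshot before the cutoff are safe to delete."""
    cutoff = now - timedelta(hours=retention_hours)
    return [path for path, removed_at in removed_files.items()
            if removed_at < cutoff]

now = datetime(2026, 5, 7, 12, 0)
removed = {
    "part-old.parquet": now - timedelta(days=10),    # past retention: delete
    "part-recent.parquet": now - timedelta(days=2),  # still retained for time travel
}
print(vacuum_candidates(removed, now))  # ['part-old.parquet']
```

Shrinking `retention_hours` moves the cutoff forward — which is exactly how an aggressive VACUUM deletes files a time-travel query or slow reader still needs.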
Complete Medallion Architecture: Bronze → Silver → Gold
Here is a production-ready pipeline that wires together every concept above. Each layer builds on the previous one.
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
col, current_timestamp, to_date, lit,
regexp_replace, sha2, concat_ws, count, sum as _sum, avg
)
from pyspark.sql.types import DecimalType, TimestampType
from delta.tables import DeltaTable
spark = SparkSession.builder.appName("MedallionPipeline").getOrCreate()
# ---------------------------------------------------------------------------
# LAYER 1 — BRONZE: Raw append, no transforms, capture ingestion metadata
# ---------------------------------------------------------------------------
def ingest_bronze(source_path: str, target_table: str) -> int:
"""
Read raw JSON files from the landing zone and append them to the
Bronze Delta table exactly as-is. Only ingestion metadata is added.
"""
raw_df = spark.read.json(source_path)
    if raw_df.isEmpty():  # DataFrame.isEmpty() (Spark 3.3+) avoids touching the RDD API
print(f"[Bronze] No new files at {source_path}, skipping.")
return 0
enriched = (
raw_df
.withColumn("_ingested_at", current_timestamp())
.withColumn("_source_file", col("_metadata.file_path"))
.withColumn("_batch_date", to_date(current_timestamp()))
)
(
enriched.write
.format("delta")
.mode("append")
.option("mergeSchema", "false") # strict: fail on unexpected columns
.partitionBy("_batch_date")
.saveAsTable(target_table)
)
record_count = enriched.count()
print(f"[Bronze] Appended {record_count:,} rows to {target_table}")
return record_count
# ---------------------------------------------------------------------------
# LAYER 2 — SILVER: Clean, deduplicate, validate, MERGE upsert
# ---------------------------------------------------------------------------
def bronze_to_silver_orders() -> None:
"""
Transform Bronze raw orders into a clean Silver table.
Uses MERGE to apply incremental updates without reprocessing history.
"""
# Read only the latest batch from Bronze (use _batch_date as watermark)
latest_batch_date = spark.sql(
"SELECT MAX(_batch_date) FROM catalog.bronze.raw_orders"
).collect()[0][0]
bronze_batch = spark.sql(f"""
SELECT *
FROM catalog.bronze.raw_orders
WHERE _batch_date = '{latest_batch_date}'
""")
    # --- Cleaning ---
    # Local imports for the keep-latest dedup below
    from pyspark.sql.window import Window
    from pyspark.sql.functions import row_number
    # Deduplicate on the business key, keeping the LATEST record per order_id.
    # (A bare dropDuplicates keeps an arbitrary row, not the most recent one.
    # ISO-8601 timestamp strings sort chronologically, so this works pre-cast.)
    latest = Window.partitionBy("order_id").orderBy(col("updated_at").desc())
    clean = (
        bronze_batch
        .withColumn("_rn", row_number().over(latest))
        .filter(col("_rn") == 1)
        .drop("_rn")
        # Reject null business keys
        .filter(col("order_id").isNotNull())
        .filter(col("customer_id").isNotNull())
        # Type coercions
        .withColumn("total", col("total").cast(DecimalType(14, 2)))
        .withColumn("created_at", col("created_at").cast(TimestampType()))
        .withColumn("updated_at", col("updated_at").cast(TimestampType()))
        # Mask PII: keep first two chars of email, replace rest before @
        .withColumn("email_masked",
            regexp_replace(col("email"), r"(?<=.{2}).(?=.*@)", "*"))
        # Row hash for change detection
        .withColumn("row_hash",
            sha2(concat_ws("|", col("order_id"), col("total"), col("status")), 256))
    )
# --- MERGE into Silver ---
silver_table = DeltaTable.forName(spark, "catalog.silver.orders")
(
silver_table.alias("t")
.merge(
clean.alias("s"),
"t.order_id = s.order_id"
)
.whenMatchedUpdate(
condition="t.row_hash != s.row_hash", # only update if something changed
set={
"status": "s.status",
"total": "s.total",
"email_masked": "s.email_masked",
"updated_at": "s.updated_at",
"row_hash": "s.row_hash",
"_updated_at": "current_timestamp()",
}
)
.whenNotMatchedInsert(values={
"order_id": "s.order_id",
"customer_id": "s.customer_id",
"status": "s.status",
"total": "s.total",
"email_masked": "s.email_masked",
"created_at": "s.created_at",
"updated_at": "s.updated_at",
"row_hash": "s.row_hash",
"_inserted_at": "current_timestamp()",
"_updated_at": "current_timestamp()",
})
.execute()
)
print("[Silver] MERGE complete for catalog.silver.orders")
# ---------------------------------------------------------------------------
# LAYER 3 — GOLD: Business aggregates, served to BI and ML consumers
# ---------------------------------------------------------------------------
def silver_to_gold_revenue() -> None:
    """
    Aggregate Silver orders into a Gold monthly revenue summary.
    Overwrites each month partition in the rolling window on every run
    (idempotent).
    """
    # Replace only the partitions produced by the SELECT, not the whole table
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
spark.sql("""
INSERT OVERWRITE catalog.gold.monthly_revenue
PARTITION (report_year, report_month)
SELECT
year(created_at) AS report_year,
month(created_at) AS report_month,
COUNT(*) AS order_count,
COUNT(DISTINCT customer_id) AS unique_customers,
SUM(total) AS gross_revenue,
AVG(total) AS avg_order_value,
SUM(CASE WHEN status = 'completed' THEN total ELSE 0 END)
AS net_revenue,
COUNT(CASE WHEN status = 'cancelled' THEN 1 END)
AS cancelled_count,
current_timestamp() AS _refreshed_at
FROM catalog.silver.orders
WHERE status IN ('completed', 'cancelled')
AND created_at >= add_months(current_date(), -3) -- rolling 3-month window
GROUP BY 1, 2
""")
print("[Gold] monthly_revenue refreshed.")
def silver_to_gold_customer_ltv() -> None:
"""
Build a customer lifetime value (LTV) Gold table.
Used by ML feature pipelines and the BI dashboard.
"""
spark.sql("""
CREATE OR REPLACE TABLE catalog.gold.customer_ltv AS
SELECT
customer_id,
COUNT(*) AS total_orders,
SUM(total) AS lifetime_value,
AVG(total) AS avg_order_value,
MAX(created_at) AS last_order_at,
MIN(created_at) AS first_order_at,
datediff(MAX(created_at), MIN(created_at))
AS customer_tenure_days,
current_timestamp() AS _refreshed_at
FROM catalog.silver.orders
WHERE status = 'completed'
AND is_deleted = false
GROUP BY customer_id
""")
print("[Gold] customer_ltv refreshed.")
# ---------------------------------------------------------------------------
# MAINTENANCE: OPTIMIZE + ZORDER + VACUUM
# ---------------------------------------------------------------------------
def run_table_maintenance(table_name: str, zorder_cols: list[str]) -> None:
"""
Compact small files and apply Z-ORDER clustering for query acceleration.
Run nightly after the ETL jobs complete.
"""
dt = DeltaTable.forName(spark, table_name)
print(f"[Maintenance] Running OPTIMIZE on {table_name} ...")
dt.optimize().executeZOrderBy(*zorder_cols)
print(f"[Maintenance] Running VACUUM on {table_name} (retaining 7 days) ...")
dt.vacuum(retentionHours=168)
print(f"[Maintenance] Done for {table_name}")
# ---------------------------------------------------------------------------
# ENTRY POINT: Full pipeline run
# ---------------------------------------------------------------------------
if __name__ == "__main__":
# Bronze ingestion
ingest_bronze(
source_path="abfss://landing@datalake.dfs.core.windows.net/orders/2026/05/07/",
target_table="catalog.bronze.raw_orders"
)
# Silver refinement
bronze_to_silver_orders()
# Gold aggregation
silver_to_gold_revenue()
silver_to_gold_customer_ltv()
# Nightly maintenance
run_table_maintenance("catalog.silver.orders", zorder_cols=["customer_id", "created_at"])
    run_table_maintenance("catalog.bronze.raw_orders", zorder_cols=["order_id"])

Verifying the Pipeline
# Check row counts at each layer
for layer, table in [
("Bronze", "catalog.bronze.raw_orders"),
("Silver", "catalog.silver.orders"),
("Gold", "catalog.gold.monthly_revenue"),
]:
count = spark.table(table).count()
print(f"{layer}: {table} — {count:,} rows")
# Check table history for Silver
spark.sql("DESCRIBE HISTORY catalog.silver.orders") \
.select("version", "timestamp", "operation", "operationMetrics") \
.show(5, truncate=False)
# Spot-check time travel: compare today's Silver against yesterday
current = spark.table("catalog.silver.orders").count()
previous = (
spark.read.format("delta")
.option("timestampAsOf", "2026-05-06")
.table("catalog.silver.orders")
.count()
)
print(f"Row delta since yesterday: {current - previous:+,}")

Key Takeaways
- Delta Lake's transaction log is what separates it from plain Parquet — every operation is atomic and auditable.
- Use MERGE INTO for upserts; filter the source to only changed records before merging.
- DESCRIBE HISTORY and VERSION AS OF give you a full audit trail and easy rollback — use them before and after risky operations.
- Schema enforcement is on by default; opt into mergeSchema only deliberately.
- CHECK constraints and NOT NULL constraints push data quality guarantees into the storage layer.
- Run OPTIMIZE … ZORDER BY nightly on high-query tables; choose Z-ORDER columns based on your most common filter predicates.
- Never vacuum below 7 days; set logRetentionDuration to at least 30 days in production.
Related: Databricks PySpark Advanced — Delta Live Tables, Auto Loader, and streaming
Related: Databricks MLflow and Unity Catalog — ML workflows, governance, and model serving