Data Engineering · Intermediate

Delta Lake on Databricks: ACID Transactions, Time Travel & Medallion Architecture

A production-depth guide to Delta Lake — ACID guarantees, MERGE upserts, time travel, schema enforcement, table constraints, OPTIMIZE with Z-ORDER, VACUUM, and a complete Bronze → Silver → Gold pipeline.

Learnixo · May 7, 2026 · 14 min read
Databricks · Delta Lake · PySpark · Data Engineering · Medallion Architecture · Apache Spark

Why Delta Lake Instead of Plain Parquet?

Parquet is a columnar file format, not a storage layer with transactional semantics. When you have dozens of concurrent writers, streaming ingestion, and business queries running at the same time on cloud object storage, plain Parquet breaks down fast.

Delta Lake adds a transaction log (the _delta_log/ directory) on top of Parquet files. Every write operation — insert, update, delete, merge — is recorded as a JSON commit entry. That log is the source of truth. It gives you:

| Capability | Plain Parquet | Delta Lake |
|---|---|---|
| ACID transactions | No — concurrent writes corrupt data | Yes — serialisable isolation |
| Schema enforcement | None | Configurable strict or merge mode |
| Schema evolution | Manual file rewrites | mergeSchema option |
| Time travel | None | Query any past version by version number or timestamp |
| Streaming + batch unified | Two separate pipelines | One table for both |
| UPDATE / DELETE / MERGE | Rewrite entire partition | Efficient delta operations |
| Scalable metadata | Slow LIST calls on S3/ADLS | Centralized transaction log |

The transaction log also enables data skipping — Delta tracks min/max statistics per file for indexed columns, so queries that filter on those columns skip irrelevant files entirely.
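
To watch skipping at work, run a selective filter on one of the indexed columns and inspect the scan. A minimal sketch, reusing the illustrative raw_events path from the examples below:

Python
# A tight filter on an indexed column lets the scan skip files whose
# min/max range cannot contain matching rows
events = (
    spark.read.format("delta")
    .load("abfss://bronze@datalake.dfs.core.windows.net/raw_events")
    .filter("event_ts >= '2026-05-07 00:00:00' AND event_ts < '2026-05-08 00:00:00'")
)
events.explain()  # pushed filters show up in the scan node; the Spark UI
                  # reports how many files were read vs. skipped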


Creating Delta Tables

There are three equivalent ways to create a Delta table. Each produces the same result — a table backed by Parquet files plus a _delta_log/.

DDL: CREATE TABLE … USING DELTA

SQL
-- Managed table: Databricks controls both metadata and data location
CREATE TABLE IF NOT EXISTS catalog.bronze.raw_events (
    event_id     STRING    NOT NULL,
    event_type   STRING    NOT NULL,
    user_id      STRING,
    payload      STRING,
    event_ts     TIMESTAMP NOT NULL,
    -- Arbitrary expressions aren't allowed in PARTITIONED BY;
    -- derive a generated column and partition on that instead
    event_date   DATE      GENERATED ALWAYS AS (CAST(event_ts AS DATE)),
    _ingested_at TIMESTAMP
)
USING DELTA
PARTITIONED BY (event_date)
TBLPROPERTIES (
    'delta.logRetentionDuration'       = 'interval 30 days',
    'delta.dataSkippingNumIndexedCols' = '4'
);
SQL
-- External table: you control the data location (common for cloud lake paths)
CREATE TABLE IF NOT EXISTS catalog.bronze.raw_events
USING DELTA
LOCATION 'abfss://bronze@datalake.dfs.core.windows.net/raw_events'
TBLPROPERTIES ('delta.logRetentionDuration' = 'interval 30 days');

DataFrame API: save with format("delta")

Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, col, lit

spark = SparkSession.builder.appName("BronzeIngestion").getOrCreate()

raw_df = spark.read.json("abfss://landing@datalake.dfs.core.windows.net/events/2026/05/07/")

# Tag with ingestion metadata before writing
enriched = (
    raw_df
    .withColumn("_ingested_at", current_timestamp())
    .withColumn("_source_file", col("_metadata.file_path"))
)

# Write as Delta, partitioned for efficient time-range queries
(
    enriched.write
    .format("delta")
    .mode("append")
    .partitionBy("event_date")        # pre-existing string column yyyy-MM-dd
    .option("mergeSchema", "false")   # strict: reject unexpected columns
    .save("abfss://bronze@datalake.dfs.core.windows.net/raw_events")
)

DataFrame API: saveAsTable

Python
# Writes data AND registers the table in Unity Catalog in one call
(
    enriched.write
    .format("delta")
    .mode("append")
    .partitionBy("event_date")
    .saveAsTable("catalog.bronze.raw_events")    # catalog.schema.table
)

saveAsTable is the most convenient for managed tables. save() is preferred when you need fine-grained control over the storage path (external tables).


MERGE INTO: Production Upserts

MERGE INTO is the most powerful Delta Lake operation. It atomically applies inserts, updates, and deletes in a single pass, avoiding the read-and-rewrite-the-whole-partition anti-pattern that plagued plain Parquet pipelines.

Full MERGE Syntax

Python
from delta.tables import DeltaTable
from pyspark.sql.functions import current_timestamp

# Reference the target table
silver_customers = DeltaTable.forName(spark, "catalog.silver.customers")

# The incoming batch of changes (could be a stream micro-batch too)
updates_df = (
    spark.read
    .format("delta")
    .load("abfss://staging@datalake.dfs.core.windows.net/customer_changes")
)

(
    silver_customers.alias("target")
    .merge(
        updates_df.alias("source"),
        condition="target.customer_id = source.customer_id"
    )
    # Matched + source is a real update (not a delete marker)
    .whenMatchedUpdate(
        condition="source.record_type = 'UPDATE'",
        set={
            "name":          "source.name",
            "email":         "source.email",
            "phone":         "source.phone",
            "tier":          "source.tier",
            "updated_at":    "current_timestamp()",
            "row_hash":      "source.row_hash",
        }
    )
    # Matched + source is a soft-delete marker
    .whenMatchedUpdate(
        condition="source.record_type = 'DELETE'",
        set={
            "is_deleted":  "true",
            "deleted_at":  "current_timestamp()",
        }
    )
    # No existing record, so insert it
    .whenNotMatchedInsert(
        condition="source.record_type != 'DELETE'",
        values={
            "customer_id":  "source.customer_id",
            "name":         "source.name",
            "email":        "source.email",
            "phone":        "source.phone",
            "tier":         "source.tier",
            "is_deleted":   "false",
            "created_at":   "current_timestamp()",
            "updated_at":   "current_timestamp()",
            "row_hash":     "source.row_hash",
        }
    )
    # Target rows with no matching source record are left untouched
    # (add a whenNotMatchedBySource clause only if you need to act on them)
    .execute()
)

MERGE with SQL

SQL
MERGE INTO catalog.silver.customers AS target
USING catalog.staging.customer_changes AS source
ON target.customer_id = source.customer_id

WHEN MATCHED AND source.record_type = 'UPDATE' THEN UPDATE SET
    target.name       = source.name,
    target.email      = source.email,
    target.tier       = source.tier,
    target.updated_at = current_timestamp()

WHEN MATCHED AND source.record_type = 'DELETE' THEN UPDATE SET
    target.is_deleted = true,
    target.deleted_at = current_timestamp()

WHEN NOT MATCHED AND source.record_type != 'DELETE' THEN INSERT (
    customer_id, name, email, phone, tier,
    is_deleted, created_at, updated_at
) VALUES (
    source.customer_id, source.name, source.email, source.phone, source.tier,
    false, current_timestamp(), current_timestamp()
);

Performance tip: Always filter updates_df to only the changed records before the MERGE. A MERGE that scans the entire source table on every run is the most common performance anti-pattern in Delta pipelines.
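
For example, if the change feed carries an updated_at column, keep a watermark from the previous run and merge only the newer slice. A sketch with the watermark hardcoded for illustration (in practice it would live in a control table):

Python
from pyspark.sql.functions import col

# Hypothetical watermark recorded by the previous successful run
last_watermark = "2026-05-06T00:00:00"

changed_only = updates_df.filter(col("updated_at") > last_watermark)

# The MERGE now scans only the changed slice of the source
(
    silver_customers.alias("target")
    .merge(changed_only.alias("source"),
           "target.customer_id = source.customer_id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)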


Time Travel: Querying Historical Data

Every commit to a Delta table creates a new version. You can query any version at any time — this is time travel.

VERSION AS OF and TIMESTAMP AS OF

Python
# Query by version number
df_v10 = (
    spark.read.format("delta")
    .option("versionAsOf", 10)
    .table("catalog.silver.customers")
)

# Query by timestamp
df_before_incident = (
    spark.read.format("delta")
    .option("timestampAsOf", "2026-05-06T14:00:00.000Z")
    .table("catalog.silver.customers")
)

# SQL equivalents
spark.sql("""
    SELECT * FROM catalog.silver.customers VERSION AS OF 10
""")

spark.sql("""
    SELECT * FROM catalog.silver.customers
    TIMESTAMP AS OF '2026-05-06T14:00:00'
""")

DESCRIBE HISTORY

Python
# Show the full audit trail for a table
history_df = spark.sql("DESCRIBE HISTORY catalog.silver.customers")
history_df.select(
    "version", "timestamp", "userName",
    "operation", "operationParameters"
).show(20, truncate=False)

Typical output columns:

version  timestamp            userName         operation   operationParameters
-------  -------------------  ---------------  ----------  ------------------------------------------
15       2026-05-07 09:12:01  etl@company.com  MERGE       {"predicate":"[...]","matchedPredicates":...}
14       2026-05-07 08:00:00  etl@company.com  WRITE       {"mode":"Append","partitionBy":"[...]"}
13       2026-05-06 22:00:00  etl@company.com  OPTIMIZE    {"predicate":"[]","zOrderBy":"[customer_id]"}

RESTORE TABLE: Emergency Rollback

Python
from delta.tables import DeltaTable

dt = DeltaTable.forName(spark, "catalog.silver.customers")

# Roll back to a specific version
dt.restoreToVersion(12)

# Roll back to a point in time
dt.restoreToTimestamp("2026-05-06T08:00:00.000Z")
SQL
-- SQL equivalents
RESTORE TABLE catalog.silver.customers TO VERSION AS OF 12;
RESTORE TABLE catalog.silver.customers TO TIMESTAMP AS OF '2026-05-06T08:00:00';

RESTORE creates a new commit that resets the table to the target state. It does not delete history — the original commits remain visible in DESCRIBE HISTORY.
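
You can verify this directly: after a restore, DESCRIBE HISTORY shows a fresh RESTORE commit sitting on top of the versions it rewound. A quick check (output is illustrative):

Python
spark.sql("DESCRIBE HISTORY catalog.silver.customers") \
    .select("version", "operation", "operationParameters") \
    .show(3, truncate=False)
# version 16 | RESTORE | {"version":"12", ...}   <- the rollback commit
# version 15 | MERGE   | {...}                   <- still visible, still queryable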


Schema Enforcement and Evolution

Schema Enforcement: Rejecting Bad Writes

By default, Delta rejects writes whose schema does not match the registered table schema.

Python
# This will raise AnalysisException if `new_column` doesn't exist in the table
try:
    bad_df.write.format("delta").mode("append").saveAsTable("catalog.silver.customers")
except Exception as e:
    print(f"Schema enforcement blocked the write: {e}")
    # AnalysisException: A schema mismatch detected when writing to the Delta table
    # To enable schema migration using DataFrameWriter or DataStreamWriter, please set:
    # '.option("mergeSchema", "true")'

Schema Evolution: Merging New Columns

When a data source adds a new field, you want Delta to accept it and add the column rather than reject the write.

Python
# Allow new columns to be added automatically
(
    updated_df.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")   # new columns are added; existing columns are unchanged
    .saveAsTable("catalog.silver.customers")
)
SQL
-- Session-wide: let appends and MERGEs evolve the target schema automatically
SET spark.databricks.delta.schema.autoMerge.enabled = true;

-- Column mapping: allows renaming or dropping columns later without rewrites
ALTER TABLE catalog.silver.customers
SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name');

Rules:

  • mergeSchema=true can add new columns and apply safe upcasts (e.g., BYTE → SHORT → INT).
  • It cannot drop columns or narrow types. Use overwriteSchema=true only when you intend a full schema replacement (destructive); see the sketch below.
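
A minimal sketch of the destructive path, assuming you genuinely want to replace both schema and data in one commit (reshaped_df is illustrative):

Python
(
    reshaped_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")   # replaces the schema, not just the rows
    .saveAsTable("catalog.silver.customers")
)
# Older versions keep the old schema, so time travel still works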

Table Constraints

Constraints enforce data quality at the storage layer — violations are rejected before data lands.

NOT NULL Constraints

SQL
ALTER TABLE catalog.silver.orders
    ALTER COLUMN order_id SET NOT NULL;

ALTER TABLE catalog.silver.orders
    ALTER COLUMN customer_id SET NOT NULL;

CHECK Constraints

SQL
ALTER TABLE catalog.silver.orders
    ADD CONSTRAINT orders_total_positive CHECK (total > 0);

ALTER TABLE catalog.silver.orders
    ADD CONSTRAINT valid_status
    CHECK (status IN ('pending', 'processing', 'shipped', 'completed', 'cancelled'));

ALTER TABLE catalog.silver.orders
    ADD CONSTRAINT created_before_updated
    CHECK (created_at <= updated_at);

Attempting to write a row that violates any constraint raises a DeltaInvariantViolationException — the entire batch is rejected, keeping the table clean.
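
A sketch of what that rejection looks like from PySpark; the row is illustrative and assumes the DataFrame otherwise matches the table schema:

Python
from pyspark.sql import Row

# total = -5 violates orders_total_positive (other columns elided for brevity)
bad_batch = spark.createDataFrame([Row(order_id="o-1001", total=-5.00, status="pending")])

try:
    bad_batch.write.format("delta").mode("append").saveAsTable("catalog.silver.orders")
except Exception as e:
    print(f"Batch rejected: {e}")
    # DeltaInvariantViolationException: CHECK constraint orders_total_positive
    # (total > 0) violated by row with values: ...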

Generated Columns

Generated columns are computed automatically from other columns — they're stored physically and can be used for partitioning.

SQL
CREATE TABLE catalog.silver.events (
    event_id   STRING NOT NULL,
    event_ts   TIMESTAMP NOT NULL,
    -- Generated column: Delta computes and stores this automatically
    event_date DATE GENERATED ALWAYS AS (CAST(event_ts AS DATE)),
    payload    STRING
)
USING DELTA
PARTITIONED BY (event_date);   -- partition on the generated column
Python
# When writing, omit event_date; Delta fills it in automatically
(
    events_df.drop("event_date").write
    .format("delta")
    .mode("append")
    .saveAsTable("catalog.silver.events")
)

Delta Table Properties

These TBLPROPERTIES control Delta behavior at the table level:

SQL
ALTER TABLE catalog.silver.orders SET TBLPROPERTIES (
    -- How long to retain the transaction log (default 30 days)
    'delta.logRetentionDuration'          = 'interval 60 days',

    -- How long to retain data files that are no longer in the latest snapshot
    -- (controls minimum safe retention for VACUUM; default 7 days)
    'delta.deletedFileRetentionDuration'  = 'interval 7 days',

    -- Number of columns to collect min/max stats for data skipping (default 32)
    'delta.dataSkippingNumIndexedCols'    = '6',

    -- Auto-optimize: compact small files on write (Databricks-specific)
    'delta.autoOptimize.optimizeWrite'    = 'true',

    -- Auto-optimize: automatically run OPTIMIZE after writes (Databricks-specific)
    'delta.autoOptimize.autoCompact'      = 'true'
);

OPTIMIZE and Z-ORDER

Over time, a Delta table accumulates many small files — each Spark task writes its own output file. OPTIMIZE compacts them into larger files (target ~1 GB each) and rebuilds the data layout for better query performance.

OPTIMIZE: Compaction

SQL
-- Compact all files in the table
OPTIMIZE catalog.silver.orders;

-- Compact only a specific partition
OPTIMIZE catalog.silver.orders
WHERE event_date = '2026-05-07';

Z-ORDER: Multi-Dimensional Clustering

Z-ORDER co-locates rows with similar values for the specified columns in the same files. Queries that filter on those columns skip far more files.

SQL
-- Compact AND re-order by customer_id and order_date together
OPTIMIZE catalog.silver.orders
ZORDER BY (customer_id, order_date);
Python
from delta.tables import DeltaTable

dt = DeltaTable.forName(spark, "catalog.silver.orders")
dt.optimize().executeZOrderBy("customer_id", "order_date")

Z-ORDER guidelines:

  • Use on high-cardinality columns that appear in WHERE or JOIN clauses.
  • Limit to 2–4 columns — diminishing returns beyond that.
  • Z-ORDER applies within a partition; partition first on date, Z-ORDER on IDs within each date.
  • Schedule OPTIMIZE + ZORDER nightly or after large batch loads; the sketch below shows one way to verify the result.
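
To confirm a run actually helped, compare the snapshot's file count before and after. DESCRIBE DETAIL reports numFiles and sizeInBytes for the current version; a small sketch:

Python
# File count before compaction
spark.sql("DESCRIBE DETAIL catalog.silver.orders") \
    .select("numFiles", "sizeInBytes").show()

spark.sql("OPTIMIZE catalog.silver.orders ZORDER BY (customer_id, order_date)")

# numFiles should drop sharply if the table had accumulated small files
spark.sql("DESCRIBE DETAIL catalog.silver.orders") \
    .select("numFiles", "sizeInBytes").show()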

VACUUM: Reclaiming Storage

VACUUM permanently deletes data files that are no longer part of any table version, freeing storage.

Python
from delta.tables import DeltaTable

dt = DeltaTable.forName(spark, "catalog.silver.orders")

# Default: retain files referenced by versions in the last 7 days
dt.vacuum()

# Set the retention explicitly; 168 hours is the 7-day default floor. Never go
# below it if concurrent readers might still be querying older versions.
dt.vacuum(retentionHours=168)   # 7 days = 168 hours

# DRY RUN is SQL-only: preview what would be deleted without deleting it
spark.sql("VACUUM catalog.silver.orders RETAIN 168 HOURS DRY RUN")
SQL
-- SQL equivalents
VACUUM catalog.silver.orders RETAIN 168 HOURS;
VACUUM catalog.silver.orders RETAIN 168 HOURS DRY RUN;

Warning: Never set retentionHours below 168 hours (7 days) unless you have confirmed no active streaming queries or long-running reads against the table. Doing so can cause data loss for active readers.
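
Delta enforces this guard by default: a vacuum below the configured retention fails unless you explicitly disable the safety check. A sketch, with the override shown commented out on purpose:

Python
try:
    dt.vacuum(retentionHours=1)   # below delta.deletedFileRetentionDuration
except Exception as e:
    print(f"Blocked by the retention check: {e}")

# Only if you are certain no stream or long-running reader needs older files:
# spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")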


Complete Medallion Architecture: Bronze → Silver → Gold

Here is a production-ready pipeline that wires together every concept above. Each layer builds on the previous one.

Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, current_timestamp, to_date,
    regexp_replace, sha2, concat_ws, row_number
)
from pyspark.sql.types import DecimalType, TimestampType
from pyspark.sql.window import Window
from delta.tables import DeltaTable

spark = SparkSession.builder.appName("MedallionPipeline").getOrCreate()

# ---------------------------------------------------------------------------
# LAYER 1 - BRONZE: Raw append, no transforms, capture ingestion metadata
# ---------------------------------------------------------------------------

def ingest_bronze(source_path: str, target_table: str) -> int:
    """
    Read raw JSON files from the landing zone and append them to the
    Bronze Delta table exactly as-is. Only ingestion metadata is added.
    """
    raw_df = spark.read.json(source_path)

    if raw_df.isEmpty():
        print(f"[Bronze] No new files at {source_path}, skipping.")
        return 0

    enriched = (
        raw_df
        .withColumn("_ingested_at", current_timestamp())
        .withColumn("_source_file", col("_metadata.file_path"))
        .withColumn("_batch_date",  to_date(current_timestamp()))
    )

    (
        enriched.write
        .format("delta")
        .mode("append")
        .option("mergeSchema", "false")   # strict: fail on unexpected columns
        .partitionBy("_batch_date")
        .saveAsTable(target_table)
    )

    record_count = enriched.count()
    print(f"[Bronze] Appended {record_count:,} rows to {target_table}")
    return record_count


# ---------------------------------------------------------------------------
# LAYER 2 - SILVER: Clean, deduplicate, validate, MERGE upsert
# ---------------------------------------------------------------------------

def bronze_to_silver_orders() -> None:
    """
    Transform Bronze raw orders into a clean Silver table.
    Uses MERGE to apply incremental updates without reprocessing history.
    """
    # Read only the latest batch from Bronze (use _batch_date as watermark)
    latest_batch_date = spark.sql(
        "SELECT MAX(_batch_date) FROM catalog.bronze.raw_orders"
    ).collect()[0][0]

    bronze_batch = spark.sql(f"""
        SELECT *
        FROM catalog.bronze.raw_orders
        WHERE _batch_date = '{latest_batch_date}'
    """)

    # --- Cleaning ---
    clean = (
        bronze_batch
        # Drop duplicates on business key  keep last seen by event timestamp
        .dropDuplicates(["order_id"])
        # Reject null business keys
        .filter(col("order_id").isNotNull())
        .filter(col("customer_id").isNotNull())
        # Type coercions
        .withColumn("total",      col("total").cast(DecimalType(14, 2)))
        .withColumn("created_at", col("created_at").cast(TimestampType()))
        .withColumn("updated_at", col("updated_at").cast(TimestampType()))
        # Mask PII: keep first two chars of email, replace rest before @
        .withColumn("email_masked",
            regexp_replace(col("email"), r"(?<=.{2}).(?=.*@)", "*"))
        # Row hash for change detection
        .withColumn("row_hash",
            sha2(concat_ws("|", col("order_id"), col("total"), col("status")), 256))
    )

    # --- MERGE into Silver ---
    silver_table = DeltaTable.forName(spark, "catalog.silver.orders")

    (
        silver_table.alias("t")
        .merge(
            clean.alias("s"),
            "t.order_id = s.order_id"
        )
        .whenMatchedUpdate(
            condition="t.row_hash != s.row_hash",   # only update if something changed
            set={
                "status":       "s.status",
                "total":        "s.total",
                "email_masked": "s.email_masked",
                "updated_at":   "s.updated_at",
                "row_hash":     "s.row_hash",
                "_updated_at":  "current_timestamp()",
            }
        )
        .whenNotMatchedInsert(values={
            "order_id":     "s.order_id",
            "customer_id":  "s.customer_id",
            "status":       "s.status",
            "total":        "s.total",
            "email_masked": "s.email_masked",
            "created_at":   "s.created_at",
            "updated_at":   "s.updated_at",
            "row_hash":     "s.row_hash",
            "_inserted_at": "current_timestamp()",
            "_updated_at":  "current_timestamp()",
        })
        .execute()
    )

    print("[Silver] MERGE complete for catalog.silver.orders")


# ---------------------------------------------------------------------------
# LAYER 3 - GOLD: Business aggregates, served to BI and ML consumers
# ---------------------------------------------------------------------------

def silver_to_gold_revenue() -> None:
    """
    Aggregate Silver orders into a Gold monthly revenue summary.
    Rebuilds the partitions in a rolling 3-month window on each run
    (idempotent, assuming dynamic partition overwrite).
    """
    # Replace only the partitions produced by the SELECT, not the whole table
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

    spark.sql("""
        INSERT OVERWRITE catalog.gold.monthly_revenue
        PARTITION (report_year, report_month)

        -- Partition columns must come LAST in the SELECT list
        SELECT
            COUNT(*)                         AS order_count,
            COUNT(DISTINCT customer_id)      AS unique_customers,
            SUM(total)                       AS gross_revenue,
            AVG(total)                       AS avg_order_value,
            SUM(CASE WHEN status = 'completed' THEN total ELSE 0 END)
                                             AS net_revenue,
            COUNT(CASE WHEN status = 'cancelled' THEN 1 END)
                                             AS cancelled_count,
            current_timestamp()              AS _refreshed_at,
            year(created_at)                 AS report_year,
            month(created_at)                AS report_month
        FROM catalog.silver.orders
        WHERE status IN ('completed', 'cancelled')
          AND created_at >= add_months(current_date(), -3)  -- rolling 3-month window
        GROUP BY year(created_at), month(created_at)
    """)
    print("[Gold] monthly_revenue refreshed.")


def silver_to_gold_customer_ltv() -> None:
    """
    Build a customer lifetime value (LTV) Gold table.
    Used by ML feature pipelines and the BI dashboard.
    """
    spark.sql("""
        CREATE OR REPLACE TABLE catalog.gold.customer_ltv AS
        SELECT
            customer_id,
            COUNT(*)                     AS total_orders,
            SUM(total)                   AS lifetime_value,
            AVG(total)                   AS avg_order_value,
            MAX(created_at)              AS last_order_at,
            MIN(created_at)              AS first_order_at,
            datediff(MAX(created_at), MIN(created_at))
                                         AS customer_tenure_days,
            current_timestamp()          AS _refreshed_at
        FROM catalog.silver.orders
        WHERE status = 'completed'
        GROUP BY customer_id
    """)
    print("[Gold] customer_ltv refreshed.")


# ---------------------------------------------------------------------------
# MAINTENANCE: OPTIMIZE + ZORDER + VACUUM
# ---------------------------------------------------------------------------

def run_table_maintenance(table_name: str, zorder_cols: list[str]) -> None:
    """
    Compact small files and apply Z-ORDER clustering for query acceleration.
    Run nightly after the ETL jobs complete.
    """
    dt = DeltaTable.forName(spark, table_name)

    print(f"[Maintenance] Running OPTIMIZE on {table_name} ...")
    dt.optimize().executeZOrderBy(*zorder_cols)

    print(f"[Maintenance] Running VACUUM on {table_name} (retaining 7 days) ...")
    dt.vacuum(retentionHours=168)

    print(f"[Maintenance] Done for {table_name}")


# ---------------------------------------------------------------------------
# ENTRY POINT: Full pipeline run
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    # Bronze ingestion
    ingest_bronze(
        source_path="abfss://landing@datalake.dfs.core.windows.net/orders/2026/05/07/",
        target_table="catalog.bronze.raw_orders"
    )

    # Silver refinement
    bronze_to_silver_orders()

    # Gold aggregation
    silver_to_gold_revenue()
    silver_to_gold_customer_ltv()

    # Nightly maintenance
    run_table_maintenance("catalog.silver.orders",  zorder_cols=["customer_id", "created_at"])
    run_table_maintenance("catalog.bronze.raw_orders", zorder_cols=["order_id"])

Verifying the Pipeline

Python
# Check row counts at each layer
for layer, table in [
    ("Bronze", "catalog.bronze.raw_orders"),
    ("Silver", "catalog.silver.orders"),
    ("Gold",   "catalog.gold.monthly_revenue"),
]:
    n_rows = spark.table(table).count()
    print(f"{layer}: {table} — {n_rows:,} rows")

# Check table history for Silver
spark.sql("DESCRIBE HISTORY catalog.silver.orders") \
    .select("version", "timestamp", "operation", "operationMetrics") \
    .show(5, truncate=False)

# Spot-check time travel: compare today's Silver against yesterday
current  = spark.table("catalog.silver.orders").count()
previous = (
    spark.read.format("delta")
    .option("timestampAsOf", "2026-05-06")
    .table("catalog.silver.orders")
    .count()
)
print(f"Row delta since yesterday: {current - previous:+,}")

Key Takeaways

  • Delta Lake's transaction log is what separates it from plain Parquet — every operation is atomic and auditable.
  • Use MERGE INTO for upserts; filter the source to only changed records before merging.
  • DESCRIBE HISTORY and VERSION AS OF give you a full audit trail and easy rollback — use them before and after risky operations.
  • Schema enforcement is on by default; opt into mergeSchema only deliberately.
  • CHECK constraints and NOT NULL constraints push data quality guarantees into the storage layer.
  • Run OPTIMIZE … ZORDER BY nightly on high-query tables; choose Z-ORDER columns based on your most common filter predicates.
  • Never vacuum below 7 days; set logRetentionDuration to at least 30 days in production.

Related: Databricks PySpark Advanced — Delta Live Tables, Auto Loader, and streaming
Related: Databricks MLflow and Unity Catalog — ML workflows, governance, and model serving

Enjoyed this article?

Explore the Data Engineering learning path for more.
