Databricks — Delta Lake, PySpark & ML Workflows

Production Databricks guide — Delta Lake architecture, PySpark at scale, structured streaming, Unity Catalog, MLflow integration, Feature Store, and Model Serving. With Python examples throughout.

SystemForge · April 18, 2026 · 7 min read
Tags: Databricks, Delta Lake, PySpark, Apache Spark, MLflow, Unity Catalog, Data Engineering, MLOps

Databricks is a unified data and AI platform built on Apache Spark. It brings data engineering (Delta Lake, Structured Streaming), data science (notebooks, MLflow), and ML serving together in one workspace, and runs on Azure, AWS, or GCP.


Architecture Overview

┌──────────────────────────────────────────────────────────────┐
│                    Databricks Platform                        │
│                                                              │
│  Notebooks          SQL Warehouse        Jobs                │
│  (Python/R/SQL)     (serverless SQL)     (scheduled)         │
│                                                              │
│  ┌──────────────────────────────────────────────────────┐   │
│  │               Unity Catalog (Governance)              │   │
│  │  catalog.schema.table  — access control, lineage      │   │
│  └──────────────────────────────────────────────────────┘   │
│                                                              │
│  ┌──────────────────────────────────────────────────────┐   │
│  │               Delta Lake (Storage Layer)              │   │
│  │  ACID transactions, time travel, schema enforcement   │   │
│  └──────────────────────────────────────────────────────┘   │
│                                                              │
│  ┌──────────────────────────────────────────────────────┐   │
│  │                  Apache Spark                         │   │
│  │  Cluster (driver + workers), auto-scaling             │   │
│  └──────────────────────────────────────────────────────┘   │
│                                                              │
│  Cloud storage: Azure Data Lake / AWS S3 / GCS               │
└──────────────────────────────────────────────────────────────┘

Delta Lake — The Storage Foundation

Delta Lake is an open-source storage layer that adds ACID transactions, versioning, and schema enforcement to Parquet files on cloud object storage.

Why Delta Lake Over Plain Parquet

| | Plain Parquet | Delta Lake |
|--|--------------|-----------|
| ACID transactions | No | Yes |
| Schema evolution | Manual | Opt-in evolution, with schema enforcement on write |
| Time travel | No | Yes (query any retained past version) |
| Streaming + batch | Separate | Unified |
| DELETE / UPDATE / MERGE | Rewrite entire partition | Rewrites only the affected files |
| Concurrent writes | Data corruption risk | Optimistic concurrency with serialisable writes |

Creating and Writing Delta Tables

Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp
from delta.tables import DeltaTable

spark = SparkSession.builder.appName("DataPipeline").getOrCreate()

# Create Delta table from DataFrame
df = spark.read.json("/mnt/raw/orders/*.json")

df.write.format("delta") \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .save("/mnt/bronze/orders")

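# Register as a table in Unity Catalog
# Note: Unity Catalog external tables generally expect a cloud storage URI
# (abfss://, s3://, gs://) under a registered external location, not a /mnt mount path.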
spark.sql("""
    CREATE TABLE IF NOT EXISTS catalog.bronze.orders
    USING DELTA
    LOCATION '/mnt/bronze/orders'
""")

MERGE (Upsert) — The Workhorse

Python
# Upsert: update existing records, insert new ones
delta_table = DeltaTable.forName(spark, "catalog.silver.customers")

updates_df = spark.read.format("delta").load("/mnt/staging/customer_updates")

delta_table.alias("target").merge(
    updates_df.alias("source"),
    "target.customer_id = source.customer_id"
).whenMatchedUpdate(set={
    "name":       "source.name",
    "email":      "source.email",
    "updated_at": current_timestamp()
}).whenNotMatchedInsert(values={
    "customer_id": "source.customer_id",
    "name":        "source.name",
    "email":       "source.email",
    "created_at":  current_timestamp(),
    "updated_at":  current_timestamp()
}).execute()
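
The matched / not-matched logic of the MERGE above can be sketched in plain Python, with no Spark required. This only illustrates the semantics, not how Delta executes the operation; `merge_upsert` and the sample rows are hypothetical.

```python
from datetime import datetime, timezone


def merge_upsert(target, source, key, now=None):
    """Return a new list of rows: matched keys updated, unmatched keys inserted."""
    now = now or datetime.now(timezone.utc)
    merged = {row[key]: dict(row) for row in target}  # copy, so target is untouched
    for row in source:
        if row[key] in merged:
            # whenMatchedUpdate: overwrite the listed columns, keep created_at
            merged[row[key]].update(name=row["name"], email=row["email"], updated_at=now)
        else:
            # whenNotMatchedInsert: a new row gets both timestamps
            merged[row[key]] = {**row, "created_at": now, "updated_at": now}
    return list(merged.values())


target = [{"customer_id": 1, "name": "Ada", "email": "ada@old.example",
           "created_at": None, "updated_at": None}]
source = [{"customer_id": 1, "name": "Ada", "email": "ada@new.example"},
          {"customer_id": 2, "name": "Bob", "email": "bob@example.com"}]

result = merge_upsert(target, source, key="customer_id")
print([(r["customer_id"], r["email"]) for r in result])
```

One difference worth keeping in mind: this toy version lets a later source row silently win when the source contains duplicate keys, whereas a Delta MERGE fails if several source rows match the same target row, so deduplicate the source first.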

Time Travel — Querying Historical Data

Python
# Query data as of a specific version
df_v3 = spark.read.format("delta") \
    .option("versionAsOf", 3) \
    .table("catalog.silver.customers")

# Query data as of a specific timestamp
df_yesterday = spark.read.format("delta") \
    .option("timestampAsOf", "2026-04-17") \
    .table("catalog.silver.customers")

# View table history
spark.sql("DESCRIBE HISTORY catalog.silver.customers").show(10)
# Shows: version, timestamp, userId, operation, operationParameters

# Restore to a previous version (emergency rollback)
delta_table.restoreToVersion(5)
# or: delta_table.restoreToTimestamp("2026-04-17T12:00:00")

# Vacuum: remove old files (keeps 7 days by default)
delta_table.vacuum(retentionHours=168)
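
Time travel works because every commit is recorded in the table's transaction log as a set of data files added and removed. The following is a toy model of that idea in plain Python; the `commits` structure and the `snapshot_at` helper are hypothetical and far simpler than Delta's real log format.

```python
# Each commit lists data files added and removed (toy stand-in for the Delta log).
commits = [
    {"add": ["part-0.parquet"], "remove": []},                  # version 0: initial write
    {"add": ["part-1.parquet"], "remove": []},                  # version 1: append
    {"add": ["part-2.parquet"], "remove": ["part-0.parquet"]},  # version 2: rewrite after an UPDATE
]


def snapshot_at(log, version):
    """Replay commits 0..version and return the set of live data files."""
    if not 0 <= version < len(log):
        raise ValueError(f"version {version} not in log")
    live = set()
    for commit in log[: version + 1]:
        live |= set(commit["add"])
        live -= set(commit["remove"])
    return live


print(sorted(snapshot_at(commits, 1)))  # files visible when reading version 1
print(sorted(snapshot_at(commits, 2)))  # files visible when reading version 2
```

In this model, vacuum corresponds to physically deleting files such as `part-0.parquet` once they are older than the retention window and no longer part of the current snapshot, which is why versions older than the retention period can stop being queryable.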

Medallion Architecture: Bronze → Silver → Gold

The standard Databricks data organisation pattern:

Raw Sources                Bronze Layer           Silver Layer          Gold Layer
────────────────           ─────────────          ─────────────         ─────────────
Kafka events      →        Raw, as-is             Cleaned,              Business-ready
S3 files                   No transforms          deduped,              aggregated
Database CDC               Schema enforced        validated             views
APIs                       All history kept       PII masked            served to BI/ML

Python
from pyspark.sql.functions import col, current_timestamp, regexp_replace

# Bronze: ingest raw, append-only
def ingest_to_bronze(source_path: str, table: str):
    df = spark.read.json(source_path)
    df = df.withColumn("_ingested_at", current_timestamp()) \
           .withColumn("_source_file", col("_metadata.file_path"))
    df.write.format("delta").mode("append").saveAsTable(f"catalog.bronze.{table}")

# Silver: clean and conform
def bronze_to_silver():
    df = spark.table("catalog.bronze.orders")

    silver_df = df \
        .dropDuplicates(["order_id"]) \
        .filter(col("order_id").isNotNull()) \
        .withColumn("total", col("total").cast("decimal(12,2)")) \
        .withColumn("created_at", col("created_at").cast("timestamp"))

    # Apply masking for PII
    silver_df = silver_df.withColumn("email",
        regexp_replace(col("email"), r"(?<=.{2}).(?=.*@)", "*"))

    silver_df.write.format("delta").mode("overwrite") \
        .option("overwriteSchema", "true") \
        .saveAsTable("catalog.silver.orders")

# Gold: business aggregations
def silver_to_gold():
    spark.sql("""
        CREATE OR REPLACE TABLE catalog.gold.monthly_revenue AS
        SELECT
            year(created_at)  AS year,
            month(created_at) AS month,
            COUNT(*)           AS order_count,
            SUM(total)         AS revenue,
            AVG(total)         AS avg_order_value
        FROM catalog.silver.orders
        WHERE status = 'completed'
        GROUP BY 1, 2
        ORDER BY 1, 2
    """)

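The email masking pattern used in the silver step can be checked locally before running it on a cluster. This sketch uses Python's `re` module as a stand-in; Spark's `regexp_replace` uses Java regex, which should treat this particular pattern the same way, but that is an assumption worth verifying on real data. `mask_email` is a hypothetical helper.

```python
import re

# Same pattern as the silver step: mask every character after the first two
# that still has an '@' somewhere to its right (the rest of the local part).
MASK_PATTERN = re.compile(r"(?<=.{2}).(?=.*@)")


def mask_email(email):
    """Return the email with the local part masked after its first two characters."""
    return MASK_PATTERN.sub("*", email)


for sample in ["alice@example.com", "bo@example.com", "not-an-email"]:
    print(sample, "->", mask_email(sample))
```

Note that local parts of two characters or fewer pass through unmasked, which may or may not satisfy your PII policy.
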
Structured Streaming — Real-Time Pipelines

Structured Streaming processes data incrementally as it arrives from sources such as Kafka, Event Hubs, cloud storage, or Delta tables.

Python
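# Read from Azure Event Hubs (Kafka-compatible)
# Note: a real connection also needs kafka.sasl.mechanism and kafka.sasl.jaas.config
# (with the Event Hubs connection string); they are omitted here.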
stream_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "eventhub.servicebus.windows.net:9093") \
    .option("subscribe", "order-events") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .load()

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

order_schema = StructType([
    StructField("order_id",    StringType()),
    StructField("customer_id", StringType()),
    StructField("total",       DoubleType()),
    StructField("created_at",  TimestampType()),
])

parsed = stream_df \
    .select(from_json(col("value").cast("string"), order_schema).alias("order")) \
    .select("order.*")

# Write to Delta (append) with checkpointing for exactly-once
query = parsed.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/mnt/checkpoints/orders") \
    .trigger(processingTime="10 seconds") \
    .toTable("catalog.bronze.orders_stream")

query.awaitTermination()
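
Checkpointing supports exactly-once delivery because the stream records which batch it has committed, and the sink skips any batch it has already applied when a restart replays it. A toy model of that handshake in plain Python follows; the `IdempotentSink` class is hypothetical and stands in for the checkpoint plus the Delta sink.

```python
class IdempotentSink:
    """Toy sink that remembers the last committed batch id, like a checkpoint."""

    def __init__(self):
        self.rows = []
        self.last_committed_batch = -1

    def write_batch(self, batch_id, batch_rows):
        # A replayed batch (after a crash and restart) is skipped, not re-applied.
        if batch_id <= self.last_committed_batch:
            return False
        self.rows.extend(batch_rows)
        self.last_committed_batch = batch_id
        return True


sink = IdempotentSink()
sink.write_batch(0, [{"order_id": "a"}])
sink.write_batch(1, [{"order_id": "b"}])
# Simulate a restart that replays batch 1 before continuing with batch 2.
replayed = sink.write_batch(1, [{"order_id": "b"}])
sink.write_batch(2, [{"order_id": "c"}])

print(replayed, [r["order_id"] for r in sink.rows])
```

The practical consequence is that the checkpoint location must be stable and unique per query: deleting it or sharing it between queries breaks this bookkeeping.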

PySpark Essentials

DataFrames vs SQL

PySpark has two equivalent APIs — use whichever is more readable:

Python
# DataFrame API
# Use the F namespace so sum/max resolve to Spark functions, not Python built-ins
from pyspark.sql import functions as F

result = spark.table("catalog.silver.orders") \
    .filter(F.col("status") == "completed") \
    .groupBy("customer_id") \
    .agg(
        F.count("*").alias("order_count"),
        F.sum("total").alias("lifetime_value"),
        F.max("created_at").alias("last_order_at")
    ) \
    .filter(F.col("lifetime_value") > 1000) \
    .orderBy(F.col("lifetime_value").desc())

# SQL API (same result)
result = spark.sql("""
    SELECT customer_id,
           COUNT(*)    AS order_count,
           SUM(total)  AS lifetime_value,
           MAX(created_at) AS last_order_at
    FROM catalog.silver.orders
    WHERE status = 'completed'
    GROUP BY customer_id
    HAVING SUM(total) > 1000
    ORDER BY lifetime_value DESC
""")

Performance: Caching, Partitioning, Broadcast Joins

Python
# Cache frequently accessed DataFrames
customers = spark.table("catalog.silver.customers").cache()
customers.count()  # triggers materialisation

# Broadcast join: send small table to all workers (avoid shuffle)
from pyspark.sql.functions import broadcast

result = orders.join(
    broadcast(product_lookup),   # product_lookup is small, so broadcast it
    on="product_id"
)

# Coalesce / repartition for write performance
df.coalesce(10).write.format("delta").save(...)    # reduce partitions before write
df.repartition(200, col("customer_id")).write...   # repartition by key for co-located reads
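
Why a broadcast join avoids a shuffle can be seen in a small plain-Python model: each worker holds a full copy of the small table as a hash map and joins its own partition of the large table locally. The partitions and rows here are made-up sample data, and `broadcast_hash_join` is a hypothetical helper, not a Spark API.

```python
def broadcast_hash_join(large_partitions, small_rows, key):
    """Inner-join each partition of the large side against a shared lookup map."""
    lookup = {row[key]: row for row in small_rows}  # the "broadcast" copy
    joined = []
    for partition in large_partitions:              # one iteration = one worker, no data exchange
        for row in partition:
            match = lookup.get(row[key])
            if match is not None:
                joined.append({**row, **match})
    return joined


order_partitions = [
    [{"order_id": 1, "product_id": "p1"}, {"order_id": 2, "product_id": "p2"}],
    [{"order_id": 3, "product_id": "p1"}, {"order_id": 4, "product_id": "p9"}],
]
products = [{"product_id": "p1", "name": "Widget"},
            {"product_id": "p2", "name": "Gadget"}]

rows = broadcast_hash_join(order_partitions, products, key="product_id")
print([(r["order_id"], r["name"]) for r in rows])
```

The sketch assumes the small side has unique keys and fits comfortably in memory on every worker; when that stops being true, a shuffle-based join is usually the safer choice.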

MLflow on Databricks

Databricks includes a managed MLflow server — fully integrated with the workspace. No setup required.

Python
import mlflow

# Experiment is automatically created in your workspace
mlflow.set_experiment("/Users/me@company.com/churn-model")

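with mlflow.start_run() as run:
    mlflow.log_param("model_type", "lightgbm")
    mlflow.log_param("n_estimators", 500)

    # Train the model (lgb.train runs on the driver node) and log it to MLflow.
    # params, train_set, test_set and evaluate_auc are assumed to be defined elsewhere.
    import lightgbm as lgb
    model = lgb.train(params, train_set)

    mlflow.lightgbm.log_model(model, "model")
    mlflow.log_metric("auc", evaluate_auc(model, test_set))

# Register to Unity Catalog Model Registry
# (Unity Catalog generally expects the logged model to include a signature)
# Use the run captured above: mlflow.active_run() returns None once the with-block exits
mlflow.set_registry_uri("databricks-uc")
mlflow.register_model(
    f"runs:/{run.info.run_id}/model",
    "main.ml_models.churn_predictor"
)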

Feature Store

The Databricks Feature Store maintains a central repository of feature definitions — compute once, use everywhere:

Python
from databricks.feature_store import FeatureLookup, FeatureStoreClient

fs = FeatureStoreClient()

# Compute features first so the table can be created with their schema
feature_df = compute_customer_features(spark)   # your feature engineering function

# Create a feature table (create_table needs either a schema or a DataFrame)
fs.create_table(
    name="catalog.features.customer_features",
    primary_keys=["customer_id"],
    schema=feature_df.schema,
    description="Computed features for customer churn model"
)

# Write features
fs.write_table(
    name="catalog.features.customer_features",
    df=feature_df,
    mode="merge"   # upsert
)

# Training with Feature Store: automatic feature lookup by primary key
training_set = fs.create_training_set(
    df=labels_df,          # just the label + primary key
    feature_lookups=[
        FeatureLookup(
            table_name="catalog.features.customer_features",
            lookup_key="customer_id"
        )
    ],
    label="churned"
)

# A model logged with fs.log_model(...) packages these lookups, so features can be retrieved automatically at serving time

Unity Catalog: Governance at Scale

Unity Catalog is the data governance layer — one place to manage access, lineage, and discovery across all data assets.

SQL
-- Three-level namespace: catalog.schema.table
USE CATALOG main;
USE SCHEMA silver;

-- Grant access
GRANT SELECT ON TABLE main.silver.customers TO `data-analyst@company.com`;
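GRANT SELECT ON TABLE main.silver.customers TO `analysts`;  -- groups are granted by name, like users

-- Row-level security: attach a SQL UDF as a row filter
-- (current_user_tenant_id() is a placeholder for your own tenant lookup, not a built-in)
CREATE FUNCTION main.silver.tenant_filter(tenant_id STRING)
  RETURN tenant_id = current_user_tenant_id();
ALTER TABLE main.silver.orders SET ROW FILTER main.silver.tenant_filter ON (tenant_id);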

-- Tag sensitive columns for compliance
ALTER TABLE main.silver.customers
  ALTER COLUMN email SET TAGS ('pii' = 'true', 'classification' = 'confidential');

Databricks vs Azure Synapse vs Snowflake

| | Databricks | Azure Synapse | Snowflake |
|--|-----------|--------------|----------|
| Core strength | Spark + ML | SQL + BI | SQL analytics |
| ML/AI | Native MLflow, Feature Store | Azure ML integration | Snowpark ML |
| Streaming | Structured Streaming | Spark Streaming | Snowpipe |
| SQL performance | Good (Photon) | Excellent (dedicated pool) | Excellent |
| Language | Python/SQL/Scala/R | SQL/Python | SQL/Python (Snowpark) |
| Best for | Data engineering + ML | BI-heavy workloads | SQL analytics, sharing |


Related: MLflow Experiment Tracking — MLflow deep dive
Related: Snowflake Guide — SQL analytics and data sharing
Related: Azure Cloud Integration — Event Hubs, Data Factory
