Databricks — Delta Lake, PySpark & ML Workflows
Production Databricks guide — Delta Lake architecture, PySpark at scale, structured streaming, Unity Catalog, MLflow integration, Feature Store, and Model Serving. With Python examples throughout.
Databricks is the unified data and AI platform built on Apache Spark. It combines data engineering (Delta Lake, structured streaming), data science (notebooks, MLflow), and ML serving under one platform — running on Azure, AWS, or GCP.
Architecture Overview
┌──────────────────────────────────────────────────────────────┐
│ Databricks Platform │
│ │
│ Notebooks SQL Warehouse Jobs │
│ (Python/R/SQL) (serverless SQL) (scheduled) │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Unity Catalog (Governance) │ │
│ │ catalog.schema.table — access control, lineage │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Delta Lake (Storage Layer) │ │
│ │ ACID transactions, time travel, schema enforcement │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Apache Spark │ │
│ │ Cluster (driver + workers), auto-scaling │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
│ Cloud storage: Azure Data Lake / AWS S3 / GCS │
└──────────────────────────────────────────────────────────────┘

Delta Lake — The Storage Foundation
Delta Lake is an open-source storage layer that adds ACID transactions, versioning, and schema enforcement to Parquet files on cloud object storage.
Why Delta Lake Over Plain Parquet
| | Plain Parquet | Delta Lake |
|--|--------------|------------|
| ACID transactions | No | Yes |
| Schema evolution | Manual | Automatic with enforcement |
| Time travel | No | Yes (query any past version) |
| Streaming + batch | Separate | Unified |
| DELETE / UPDATE / MERGE | Rewrite entire partition | Efficient delta operations |
| Concurrent writes | Data corruption risk | Serialisable isolation |
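Schema enforcement and opt-in evolution are easy to see in a minimal sketch (the /mnt/demo path, columns, and tiny DataFrames below are illustrative):

# spark: the SparkSession provided by the Databricks notebook
base_df = spark.createDataFrame([(1, "alice")], ["id", "name"])
base_df.write.format("delta").mode("overwrite").save("/mnt/demo/customers")

extra_df = spark.createDataFrame([(2, "bob", "gold")], ["id", "name", "tier"])

# Schema enforcement: this append is rejected because 'tier' is not in the table schema
# extra_df.write.format("delta").mode("append").save("/mnt/demo/customers")

# Schema evolution: opting in with mergeSchema adds the column and back-fills nulls
extra_df.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("/mnt/demo/customers")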
Creating and Writing Delta Tables
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp
from delta.tables import DeltaTable
spark = SparkSession.builder.appName("DataPipeline").getOrCreate()
# Create Delta table from DataFrame
df = spark.read.json("/mnt/raw/orders/*.json")
df.write.format("delta") \
.mode("overwrite") \
.partitionBy("year", "month") \
.save("/mnt/bronze/orders")
# Register as a table in Unity Catalog
spark.sql("""
CREATE TABLE IF NOT EXISTS catalog.bronze.orders
USING DELTA
LOCATION '/mnt/bronze/orders'
""")MERGE (Upsert) — The Workhorse
# Upsert: update existing records, insert new ones
delta_table = DeltaTable.forName(spark, "catalog.silver.customers")
updates_df = spark.read.format("delta").load("/mnt/staging/customer_updates")
delta_table.alias("target").merge(
updates_df.alias("source"),
"target.customer_id = source.customer_id"
).whenMatchedUpdate(set={
"name": "source.name",
"email": "source.email",
"updated_at": current_timestamp()
}).whenNotMatchedInsert(values={
"customer_id": "source.customer_id",
"name": "source.name",
"email": "source.email",
"created_at": current_timestamp(),
"updated_at": current_timestamp()
}).execute()

Time Travel — Querying Historical Data
# Query data as of a specific version
df_v3 = spark.read.format("delta") \
.option("versionAsOf", 3) \
.table("catalog.silver.customers")
# Query data as of a specific timestamp
df_yesterday = spark.read.format("delta") \
.option("timestampAsOf", "2026-04-17") \
.table("catalog.silver.customers")
# View table history
spark.sql("DESCRIBE HISTORY catalog.silver.customers").show(10)
# Shows: version, timestamp, userId, operation, operationParameters
# Restore to a previous version (emergency rollback)
delta_table.restoreToVersion(5)
# or: delta_table.restoreToTimestamp("2026-04-17T12:00:00")
# Vacuum: remove old files (keeps 7 days by default)
delta_table.vacuum(retentionHours=168)

Medallion Architecture: Bronze → Silver → Gold
The standard Databricks data organisation pattern:
Raw Sources Bronze Layer Silver Layer Gold Layer
──────────────── ───────────── ───────────── ─────────────
Kafka events → Raw, as-is Cleaned, Business-ready
S3 files No transforms deduped, aggregated
Database CDC Schema enforced validated views
APIs               All history kept   PII masked        served to BI/ML

# Bronze: ingest raw, append-only
def ingest_to_bronze(source_path: str, table: str):
df = spark.read.json(source_path)
df = df.withColumn("_ingested_at", current_timestamp()) \
.withColumn("_source_file", col("_metadata.file_path"))
df.write.format("delta").mode("append").saveAsTable(f"catalog.bronze.{table}")
# Silver: clean and conform
from pyspark.sql.functions import regexp_replace
def bronze_to_silver():
df = spark.table("catalog.bronze.orders")
silver_df = df \
.dropDuplicates(["order_id"]) \
.filter(col("order_id").isNotNull()) \
.withColumn("total", col("total").cast("decimal(12,2)")) \
.withColumn("created_at", col("created_at").cast("timestamp"))
# Apply masking for PII
silver_df = silver_df.withColumn("email",
regexp_replace(col("email"), r"(?<=.{2}).(?=.*@)", "*"))
silver_df.write.format("delta").mode("overwrite") \
.option("overwriteSchema", "true") \
.saveAsTable("catalog.silver.orders")
# Gold: business aggregations
def silver_to_gold():
spark.sql("""
CREATE OR REPLACE TABLE catalog.gold.monthly_revenue AS
SELECT
year(created_at) AS year,
month(created_at) AS month,
COUNT(*) AS order_count,
SUM(total) AS revenue,
AVG(total) AS avg_order_value
FROM catalog.silver.orders
WHERE status = 'completed'
GROUP BY 1, 2
ORDER BY 1, 2
""")Structured Streaming — Real-Time Pipelines
Databricks Structured Streaming processes data continuously as it arrives — Kafka, Event Hubs, cloud storage, or Delta tables.
# Read from Azure Event Hubs (Kafka-compatible endpoint).
# Auth options (kafka.sasl.mechanism, kafka.sasl.jaas.config with the connection string) are omitted for brevity.
stream_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "eventhub.servicebus.windows.net:9093") \
.option("subscribe", "order-events") \
.option("kafka.security.protocol", "SASL_SSL") \
.load()
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
order_schema = StructType([
StructField("order_id", StringType()),
StructField("customer_id", StringType()),
StructField("total", DoubleType()),
StructField("created_at", TimestampType()),
])
parsed = stream_df \
.select(from_json(col("value").cast("string"), order_schema).alias("order")) \
.select("order.*")
# Write to Delta (append) with checkpointing for exactly-once
query = parsed.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/mnt/checkpoints/orders") \
.trigger(processingTime="10 seconds") \
.toTable("catalog.bronze.orders_stream")
query.awaitTermination()
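The same pattern works for files landing in cloud storage via Auto Loader (the cloudFiles source); the paths and target table below are illustrative:

# Incrementally ingest new files as they arrive in cloud storage
files_stream = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.schemaLocation", "/mnt/checkpoints/orders_files/schema") \
    .load("/mnt/raw/orders/")

files_stream.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/mnt/checkpoints/orders_files") \
    .trigger(availableNow=True) \
    .toTable("catalog.bronze.orders_files")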
PySpark Essentials
DataFrames vs SQL
PySpark has two equivalent APIs — use whichever is more readable:
# DataFrame API
from pyspark.sql import functions as F

result = spark.table("catalog.silver.orders") \
    .filter(F.col("status") == "completed") \
    .groupBy("customer_id") \
    .agg(
        F.count("*").alias("order_count"),
        F.sum("total").alias("lifetime_value"),
        F.max("created_at").alias("last_order_at")
    ) \
    .filter(F.col("lifetime_value") > 1000) \
    .orderBy(F.col("lifetime_value").desc())
# SQL API (same result)
result = spark.sql("""
SELECT customer_id,
COUNT(*) AS order_count,
SUM(total) AS lifetime_value,
MAX(created_at) AS last_order_at
FROM catalog.silver.orders
WHERE status = 'completed'
GROUP BY customer_id
HAVING SUM(total) > 1000
ORDER BY lifetime_value DESC
""")Performance: Caching, Partitioning, Broadcast Joins
# Cache frequently accessed DataFrames
customers = spark.table("catalog.silver.customers").cache()
customers.count() # triggers materialisation
# Broadcast join: send small table to all workers (avoid shuffle)
from pyspark.sql.functions import broadcast
result = orders.join(
broadcast(product_lookup), # product_lookup is small — broadcast it
on="product_id"
)
# Coalesce / repartition for write performance
df.coalesce(10).write.format("delta").save(...) # reduce partitions before write
df.repartition(200, col("customer_id")).write...  # repartition by key for co-located reads
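To check that the broadcast hint actually avoided a shuffle, inspect the physical plan of the joined DataFrame from above:

# A BroadcastHashJoin node confirms the broadcast; a SortMergeJoin means a shuffle still happened
result.explain()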
MLflow on Databricks
Databricks includes a managed MLflow server — fully integrated with the workspace. No setup required.
import mlflow
# Experiment is automatically created in your workspace
mlflow.set_experiment("/Users/me@company.com/churn-model")
with mlflow.start_run() as run:
mlflow.log_param("model_type", "lightgbm")
mlflow.log_param("n_estimators", 500)
# Train model on Spark cluster, log to MLflow
import lightgbm as lgb
model = lgb.train(params, train_set)
mlflow.lightgbm.log_model(model, "model")
mlflow.log_metric("auc", evaluate_auc(model, test_set))
# Register to Unity Catalog Model Registry
mlflow.set_registry_uri("databricks-uc")
mlflow.register_model(
f"runs:/{mlflow.active_run().info.run_id}/model",
"main.ml_models.churn_predictor"
)

Feature Store
The Databricks Feature Store maintains a central repository of feature definitions — compute once, use everywhere:
from databricks.feature_store import FeatureStoreClient, FeatureLookup
fs = FeatureStoreClient()
# Compute features first, then create the feature table from their schema
feature_df = compute_customer_features(spark)  # your feature engineering function

fs.create_table(
    name="catalog.features.customer_features",
    primary_keys=["customer_id"],
    schema=feature_df.schema,  # create_table needs a schema (or an initial df)
    description="Computed features for customer churn model"
)

# Write features
fs.write_table(
    name="catalog.features.customer_features",
    df=feature_df,
    mode="merge"  # upsert
)
# Training with Feature Store — automatic feature lookup by primary key
training_set = fs.create_training_set(
df=labels_df, # just the label + primary key
feature_lookups=[
FeatureLookup(
table_name="catalog.features.customer_features",
lookup_key="customer_id"
)
],
label="churned"
)
# A model trained with a Feature Store training set can retrieve its features automatically at serving time (see the sketch below)
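A minimal sketch of that flow, assuming the model and training_set from above (batch_df and the model version are illustrative names):

import mlflow

# Log the model together with its feature metadata so lookups happen automatically
fs.log_model(
    model,
    artifact_path="model",
    flavor=mlflow.lightgbm,
    training_set=training_set,
    registered_model_name="main.ml_models.churn_predictor"
)

# At scoring time, pass only the primary keys; the stored features are joined in for you
scored = fs.score_batch(
    "models:/main.ml_models.churn_predictor/1",
    batch_df  # DataFrame containing customer_id only
)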
Unity Catalog: Governance at Scale
Unity Catalog is the data governance layer — one place to manage access, lineage, and discovery across all data assets.
-- Three-level namespace: catalog.schema.table
USE CATALOG main;
USE SCHEMA silver;
-- Grant access
GRANT SELECT ON TABLE main.silver.customers TO `data-analyst@company.com`;
GRANT SELECT ON TABLE main.silver.customers TO `analysts`;  -- groups are granted by name, without a GROUP keyword
-- Row-level security: define a filter function, then attach it as a row filter
CREATE FUNCTION tenant_filter(tenant_id STRING)
RETURN tenant_id = current_user_tenant_id();  -- current_user_tenant_id() is assumed to be your own mapping UDF

ALTER TABLE main.silver.orders SET ROW FILTER tenant_filter ON (tenant_id);
-- Tag sensitive columns for compliance
ALTER TABLE main.silver.customers
ALTER COLUMN email SET TAGS ('pii' = 'true', 'classification' = 'confidential');
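Tags then become queryable metadata via the catalog's information_schema views, which turns a compliance audit into a simple query:

# List every column tagged as PII in the 'main' catalog
spark.sql("""
    SELECT schema_name, table_name, column_name
    FROM main.information_schema.column_tags
    WHERE tag_name = 'pii' AND tag_value = 'true'
""").show()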
Databricks vs Azure Synapse vs Snowflake

| | Databricks | Azure Synapse | Snowflake |
|--|-----------|---------------|-----------|
| Core strength | Spark + ML | SQL + BI | SQL analytics |
| ML/AI | Native MLflow, Feature Store | Azure ML integration | Snowpark ML |
| Streaming | Structured Streaming | Spark Streaming | Snowpipe |
| SQL performance | Good (Photon) | Excellent (dedicated pool) | Excellent |
| Language | Python/SQL/Scala/R | SQL/Python | SQL/Python (Snowpark) |
| Best for | Data engineering + ML | BI-heavy workloads | SQL analytics, sharing |
Related: MLflow Experiment Tracking — MLflow deep dive
Related: Snowflake Guide — SQL analytics and data sharing
Related: Azure Cloud Integration — Event Hubs, Data Factory
Have a question, correction, or just found this helpful? Leave a note below.