Learnixo

Databricks — Delta Lake & PySpark · Lesson 3 of 4

Advanced PySpark: DLT, Auto Loader & Optimization

Databricks Is Not Plain Spark

A lot of PySpark tutorials teach you how to write SparkSession.builder.getOrCreate() and submit jobs to a cluster. Databricks wraps Spark in a platform that adds utilities, managed infrastructure, and declarative pipeline frameworks that change how you build pipelines.

This guide covers the Databricks-specific patterns that don't exist in vanilla Spark: dbutils, parameterized notebooks, Delta Live Tables (DLT), and Auto Loader — plus the cluster configuration decisions that have the biggest impact on cost and performance.


dbutils: The Databricks Utility Belt

dbutils is a Databricks-only Python module pre-injected into every notebook. It gives you access to the file system, secrets, and notebook orchestration without writing boilerplate SDK code.

dbutils.fs: File System Operations

Python
# List files in cloud storage (ADLS Gen2 / S3 / GCS)
files = dbutils.fs.ls("abfss://bronze@datalake.dfs.core.windows.net/raw_events/")
for f in files:
    print(f.name, f.size, f.modificationTime)

# Move a file (staging  processed)
dbutils.fs.mv(
    "abfss://landing@datalake.dfs.core.windows.net/orders/file.json",
    "abfss://processed@datalake.dfs.core.windows.net/orders/file.json"
)

# Copy a directory recursively
dbutils.fs.cp(
    "abfss://bronze@datalake.dfs.core.windows.net/snapshot/",
    "abfss://backup@datalake.dfs.core.windows.net/snapshot_2026_05_07/",
    recurse=True
)

# Delete old files (recurse=True for directories)
dbutils.fs.rm("abfss://landing@datalake.dfs.core.windows.net/orders/old_batch/", recurse=True)

# Mount cloud storage (classic workspaces without Unity Catalog)
dbutils.fs.mount(
    source="wasbs://container@storageaccount.blob.core.windows.net",
    mount_point="/mnt/raw",
    extra_configs={
        "fs.azure.account.key.storageaccount.blob.core.windows.net":
            dbutils.secrets.get(scope="azure-kv", key="storage-account-key")
    }
)

dbutils.secrets: Credential Management

Never hardcode passwords, connection strings, or API keys in notebooks. Use Databricks secret scopes backed by Azure Key Vault or Databricks-managed secrets.

Python
# Read a secret  the value is never printed in notebook output
jdbc_password   = dbutils.secrets.get(scope="prod-secrets", key="sql-db-password")
kafka_api_key   = dbutils.secrets.get(scope="prod-secrets", key="confluent-api-key")
openai_api_key  = dbutils.secrets.get(scope="prod-secrets", key="openai-api-key")

# List all keys in a scope (values are hidden)
dbutils.secrets.list("prod-secrets")
# Returns: [SecretMetadata(key='sql-db-password'), SecretMetadata(key='confluent-api-key'), ...]

# Use in JDBC connection
df = (
    spark.read
    .format("jdbc")
    .option("url",      "jdbc:sqlserver://sqlserver.database.windows.net:1433;database=OrdersDB")
    .option("dbtable",  "dbo.Orders")
    .option("user",     "etl_user")
    .option("password", jdbc_password)
    .option("driver",   "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .load()
)

dbutils.widgets: Parameterized Notebooks

Widgets turn a notebook into a reusable function you can call from a Job, a parent notebook, or the Databricks REST API — passing different parameters each time.

Python
# Declare widgets at the top of the notebook
dbutils.widgets.text(
    name="start_date",
    defaultValue="2026-05-01",
    label="Start Date (yyyy-MM-dd)"
)
dbutils.widgets.text(
    name="end_date",
    defaultValue="2026-05-07",
    label="End Date (yyyy-MM-dd)"
)
dbutils.widgets.dropdown(
    name="environment",
    defaultValue="dev",
    choices=["dev", "staging", "prod"],
    label="Target Environment"
)
dbutils.widgets.combobox(
    name="table_suffix",
    defaultValue="orders",
    choices=["orders", "events", "customers"],
    label="Table Name Suffix"
)

# Read widget values anywhere in the notebook
start_date  = dbutils.widgets.get("start_date")
end_date    = dbutils.widgets.get("end_date")
environment = dbutils.widgets.get("environment")

catalog     = "prod" if environment == "prod" else "dev"
table_name  = f"{catalog}.silver.{dbutils.widgets.get('table_suffix')}"

print(f"Processing {table_name} from {start_date} to {end_date}")

# Filter data using the widget parameters
df = spark.sql(f"""
    SELECT * FROM {table_name}
    WHERE created_at BETWEEN '{start_date}' AND '{end_date}'
""")

dbutils.notebook: Orchestrating Notebooks

Python
# Run a child notebook and pass parameters to it
result = dbutils.notebook.run(
    path="/Pipelines/ingest_bronze",
    timeout_seconds=3600,
    arguments={
        "start_date":  start_date,
        "end_date":    end_date,
        "environment": environment,
    }
)
print(f"Child notebook returned: {result}")

# Child notebook signals completion by calling:
dbutils.notebook.exit("SUCCESS: 12,450 rows ingested")

# For error propagation:
try:
    dbutils.notebook.run("/Pipelines/risky_job", timeout_seconds=600, arguments={})
except Exception as e:
    print(f"Child notebook failed: {e}")
    raise   # re-raise to fail the parent job

Notebook Workflows vs Databricks Jobs

| | Notebook Workflows (dbutils.notebook.run) | Databricks Jobs | |---|---|---| | Orchestration | Python code inside a notebook | JSON/YAML job definition via UI or Terraform | | Parallelism | Sequential or manual threading | Native parallel task graph | | Scheduling | None (triggered by parent) | Cron, file arrival trigger, continuous | | Monitoring | Notebook output | Job run history, metrics, alerts | | Cluster reuse | Shares caller's cluster | Job clusters or existing interactive clusters | | Best for | Ad-hoc ETL chains, simple fan-out | Production scheduled pipelines |

For production, prefer Databricks Jobs with a task graph over chaining notebooks with dbutils.notebook.run. Jobs give you a visual DAG, retry policies, email/Slack alerts, and detailed metrics.


Delta Live Tables (DLT): Declarative Pipelines

DLT is Databricks' managed pipeline framework. Instead of writing code that orchestrates reads, transforms, and writes, you declare what each table is — DLT figures out execution order, manages checkpoints, and monitors data quality automatically.

The @dlt.table Decorator

Python
import dlt
from pyspark.sql.functions import col, current_timestamp, to_date, sha2, concat_ws
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# ─── BRONZE LAYER ────────────────────────────────────────────────────────────

@dlt.table(
    name="raw_orders",
    comment="Raw order events ingested from cloud storage via Auto Loader",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact":   "true",
    }
)
def raw_orders():
    """
    Auto Loader: incrementally ingest new JSON files from ADLS Gen2.
    DLT manages the checkpoint automatically.
    """
    return (
        spark.readStream
        .format("cloudFiles")                    # Auto Loader format
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation",
                "abfss://checkpoints@lake.dfs.core.windows.net/dlt/raw_orders/schema")
        .option("cloudFiles.inferColumnTypes", "true")
        .load("abfss://landing@lake.dfs.core.windows.net/orders/")
        .withColumn("_ingested_at", current_timestamp())
        .withColumn("_source_file", col("_metadata.file_path"))
    )

@dlt.expect: Data Quality Expectations

Expectations are named assertions on a table. They let you monitor data quality without stopping your pipeline — you choose the action (warn, drop, or fail).

Python
# ─── SILVER LAYER ────────────────────────────────────────────────────────────

@dlt.table(
    name="clean_orders",
    comment="Validated and deduplicated orders — Silver layer"
)
@dlt.expect("order_id is not null",    "order_id IS NOT NULL")
@dlt.expect("customer_id is not null", "customer_id IS NOT NULL")
@dlt.expect("valid status",
    "status IN ('pending', 'processing', 'shipped', 'completed', 'cancelled')")
@dlt.expect_or_drop("total is positive", "total > 0")     # drop rows that fail
@dlt.expect_or_fail("no duplicate order_id",              # fail the pipeline if violated
    "COUNT(order_id) OVER (PARTITION BY order_id) = 1")
def clean_orders():
    """
    Clean raw orders: type-cast, validate, deduplicate.
    Expectations are evaluated and metrics surfaced in the DLT pipeline UI.
    """
    return (
        dlt.read_stream("raw_orders")              # reference upstream DLT table
        .dropDuplicates(["order_id"])
        .withColumn("total",      col("total").cast(DoubleType()))
        .withColumn("created_at", col("created_at").cast(TimestampType()))
        .withColumn("order_date", to_date(col("created_at")))
        .withColumn("row_hash",
            sha2(concat_ws("|", col("order_id"), col("total"), col("status")), 256))
    )


# Expectation action reference:
# @dlt.expect(name, condition)            warn in metrics, keep bad rows
# @dlt.expect_or_drop(name, condition)    silently drop rows that fail
# @dlt.expect_or_fail(name, condition)    halt the pipeline on violation

Materialized Views in DLT

For batch (non-streaming) aggregations, use @dlt.table with dlt.read() instead of dlt.read_stream():

Python
# ─── GOLD LAYER ──────────────────────────────────────────────────────────────

@dlt.table(
    name="monthly_revenue",
    comment="Monthly revenue summary — Gold layer, refreshed on each pipeline run"
)
def monthly_revenue():
    return (
        dlt.read("clean_orders")                   # batch read, not streaming
        .filter(col("status") == "completed")
        .groupBy(
            col("order_date").substr(1, 7).alias("month")   # "2026-05"
        )
        .agg(
            {"total": "sum", "order_id": "count"}
        )
        .withColumnRenamed("sum(total)",     "revenue")
        .withColumnRenamed("count(order_id)", "order_count")
    )

Auto Loader: Incremental File Ingestion

Auto Loader (cloudFiles format) is Databricks' solution for scalable incremental ingestion from cloud object storage. It tracks which files have been processed using a checkpoint and a file notification service (Azure Event Grid or S3 Event Notifications), so each file is processed exactly once — even when millions of files accumulate.

Auto Loader Outside DLT (Structured Streaming)

Python
# Schema hint: tell Auto Loader the expected schema (avoids full schema inference on every restart)
order_schema = StructType([
    StructField("order_id",    StringType(),  nullable=False),
    StructField("customer_id", StringType(),  nullable=False),
    StructField("total",       DoubleType(),  nullable=True),
    StructField("status",      StringType(),  nullable=True),
    StructField("created_at",  TimestampType(), nullable=True),
])

stream_df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    # Schema location: Auto Loader stores inferred schema here for schema evolution tracking
    .option("cloudFiles.schemaLocation",
            "abfss://checkpoints@lake.dfs.core.windows.net/autoloader/orders/schema")
    # File notification mode: Azure Event Grid / SQS (faster than directory listing)
    .option("cloudFiles.useNotifications", "true")
    .option("cloudFiles.subscriptionId",  dbutils.secrets.get("az-secrets", "subscription-id"))
    .option("cloudFiles.tenantId",        dbutils.secrets.get("az-secrets", "tenant-id"))
    .option("cloudFiles.clientId",        dbutils.secrets.get("az-secrets", "sp-client-id"))
    .option("cloudFiles.clientSecret",    dbutils.secrets.get("az-secrets", "sp-client-secret"))
    # If schema evolves (new columns added), merge rather than fail
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .schema(order_schema)
    .load("abfss://landing@lake.dfs.core.windows.net/orders/")
)

# Write to Bronze Delta table  checkpoint ensures exactly-once processing
query = (
    stream_df
    .withColumn("_ingested_at", current_timestamp())
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation",
            "abfss://checkpoints@lake.dfs.core.windows.net/autoloader/orders/checkpoint")
    .trigger(availableNow=True)     # process all backlog then stop (batch-like behavior)
    .toTable("catalog.bronze.raw_orders")
)

query.awaitTermination()

Auto Loader Trigger Modes

Python
# Process all available files then stop (scheduled batch pattern)
.trigger(availableNow=True)

# Run continuously, process new files every 30 seconds
.trigger(processingTime="30 seconds")

# Run once and stop (deprecated  use availableNow instead)
.trigger(once=True)

Reading Kafka Streams into DLT

Python
import dlt
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

ORDER_EVENT_SCHEMA = StructType([
    StructField("order_id",    StringType()),
    StructField("customer_id", StringType()),
    StructField("total",       DoubleType()),
    StructField("event_type",  StringType()),
    StructField("event_ts",    TimestampType()),
])

@dlt.table(
    name="raw_order_events_kafka",
    comment="Raw Kafka events from the orders topic — Bronze layer"
)
def raw_order_events_kafka():
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers",
                dbutils.secrets.get("kafka-secrets", "bootstrap-servers"))
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism",    "PLAIN")
        .option("kafka.sasl.jaas.config",
                f'org.apache.kafka.common.security.plain.PlainLoginModule required '
                f'username="{dbutils.secrets.get("kafka-secrets", "api-key")}" '
                f'password="{dbutils.secrets.get("kafka-secrets", "api-secret")}";')
        .option("subscribe",          "order-events")
        .option("startingOffsets",    "latest")
        .option("failOnDataLoss",     "false")
        .load()
        # Kafka value is binary  cast to string and parse JSON
        .select(
            col("offset"),
            col("partition"),
            col("timestamp").alias("kafka_ts"),
            from_json(col("value").cast("string"), ORDER_EVENT_SCHEMA).alias("data")
        )
        .select("offset", "partition", "kafka_ts", "data.*")
    )


@dlt.table(name="clean_order_events")
@dlt.expect_or_drop("valid order_id", "order_id IS NOT NULL")
@dlt.expect_or_drop("valid total",    "total > 0")
def clean_order_events():
    return dlt.read_stream("raw_order_events_kafka")

Structured Streaming Checkpoints

Checkpoints are how Spark Structured Streaming achieves fault-tolerance and exactly-once processing. The checkpoint directory stores:

  • Offsets: which source positions have been read
  • Commits: which micro-batches have been committed to the sink
  • State: aggregation state for stateful operations (windows, deduplication)
Python
# Checkpoint best practices
query = (
    clean_df.writeStream
    .format("delta")
    .outputMode("append")
    # Store checkpoint on reliable cloud storage  NOT local disk
    .option("checkpointLocation",
            "abfss://checkpoints@lake.dfs.core.windows.net/streams/orders/v1")
    # Never change the checkpoint path after production deployment
    # If you need to change the query, create a new checkpoint path (v2, v3)
    .trigger(processingTime="60 seconds")
    .toTable("catalog.silver.orders_stream")
)

# Monitor query progress
import time
for _ in range(5):
    progress = query.lastProgress
    if progress:
        print(f"Batch: {progress['batchId']} | "
              f"Input rows: {progress['numInputRows']} | "
              f"Processing rate: {progress['processedRowsPerSecond']:.0f} rows/s")
    time.sleep(30)

PySpark Performance on Databricks

Photon Engine

Photon is Databricks' native vectorized query engine, written in C++. It replaces the JVM-based Spark SQL engine for supported operations, giving 2–8x speedup on SQL workloads at no code change.

Python
# Photon is enabled at the cluster level  no code change needed
# Enable via: Cluster Configuration  Runtime  "Use Photon Acceleration"
# Or in cluster JSON:
# "spark_conf": {"spark.databricks.photon.enabled": "true"}

# Photon accelerates: aggregations, joins, sort, filter, columnar scan
# Photon does NOT accelerate: Python UDFs, RDD operations, map/flatMap
# Best practice: eliminate Python UDFs  replace with SQL functions where possible

# Instead of a Python UDF:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# BAD  bypasses Photon
@udf(StringType())
def clean_email_udf(email: str) -> str:
    return email.lower().strip() if email else None

# GOOD  Photon-accelerated built-in functions
from pyspark.sql.functions import lower, trim
df = df.withColumn("email", lower(trim(col("email"))))

Liquid Clustering vs Z-ORDER

Liquid Clustering is the newer alternative to Z-ORDER + partition-based layouts. It uses space-filling curves on the cluster columns and is designed for tables that change query patterns over time.

SQL
-- Enable liquid clustering on a new table
CREATE TABLE catalog.silver.events
CLUSTER BY (customer_id, event_date)
USING DELTA;

-- Enable on an existing table (migrates incrementally on next OPTIMIZE)
ALTER TABLE catalog.silver.events
CLUSTER BY (customer_id, event_date);

-- Optimize runs the same  liquid clustering is applied automatically
OPTIMIZE catalog.silver.events;

| | Z-ORDER | Liquid Clustering | |---|---|---| | Setup | OPTIMIZE … ZORDER BY | CLUSTER BY on table DDL | | Partition required | Usually yes (partition + Z-ORDER) | No partitions needed | | Reoptimization | Full OPTIMIZE run | Incremental on each OPTIMIZE | | Column changes | Re-run full OPTIMIZE | ALTER TABLE … CLUSTER BY | | Best for | Stable access patterns | Evolving access patterns |


Cluster Configuration

Single Node vs Multi-Node

Single Node cluster:
  Driver = Worker (same VM)
  No network shuffle between nodes
  Best for: ML training on pandas/sklearn, small ETL, notebook exploration

Multi-Node cluster:
  Driver VM + N Worker VMs
  Data distributed across workers via Shuffle
  Best for: large-scale PySpark jobs, DLT pipelines, SQL warehouses

Autoscaling

JSON
{
  "autoscale": {
    "min_workers": 2,
    "max_workers": 20
  },
  "spark_conf": {
    "spark.databricks.adaptive.autoBroadcastJoinThreshold": "50MB",
    "spark.sql.shuffle.partitions": "auto"
  }
}

Autoscaling scales up when task queues are long and scales down after idle timeout. For streaming jobs, disable autoscaling — it can cause lag spikes. For batch ETL, autoscaling is strongly recommended.

Spot Instances (Cost Reduction)

JSON
{
  "aws_attributes": {
    "availability": "SPOT_WITH_FALLBACK",
    "spot_bid_price_percent": 100,
    "first_on_demand": 1
  }
}

SPOT_WITH_FALLBACK: uses spot for workers, on-demand for the driver. If spot capacity is unavailable, it falls back to on-demand. This is the standard production configuration — it cuts compute costs by 60–80% without reliability risk.

Job Clusters vs Interactive Clusters

| | Interactive Cluster | Job Cluster | |---|---|---| | Lifecycle | Manually started/stopped | Created at job start, terminated at job end | | Cost | Runs idle between jobs | Pay only for job runtime | | Warm-up time | None (already running) | 3–6 minutes to provision | | Use for | Notebook development, ad hoc queries | Scheduled production jobs | | Sharing | Multiple users, multiple notebooks | Single job (no sharing) |

Python
# Best practice for scheduled jobs:
# 1. Develop on an interactive cluster
# 2. In the Job definition, specify a "new_cluster" config (job cluster)
# 3. For time-sensitive jobs where 5 minutes startup matters, use a
#    "existing_cluster_id" to reuse a warm interactive cluster

Complete DLT Pipeline: Auto Loader → Bronze → Silver

This is a single DLT pipeline file that wires together Auto Loader ingestion with Bronze and Silver layers, full data quality expectations, and a Kafka stream merged into the same Silver table.

Python
import dlt
from pyspark.sql.functions import (
    col, current_timestamp, to_date, lower, trim,
    sha2, concat_ws, from_json, coalesce, lit
)
from pyspark.sql.types import (
    StructType, StructField, StringType, DoubleType, TimestampType
)

# ── Shared schema ─────────────────────────────────────────────────────────────

ORDER_SCHEMA = StructType([
    StructField("order_id",    StringType(),  False),
    StructField("customer_id", StringType(),  False),
    StructField("total",       DoubleType(),  True),
    StructField("status",      StringType(),  True),
    StructField("email",       StringType(),  True),
    StructField("created_at",  TimestampType(), True),
])

# ── Bronze: Auto Loader from ADLS ─────────────────────────────────────────────

@dlt.table(
    name="bronze_orders_files",
    comment="Raw order JSON files ingested incrementally via Auto Loader",
    table_properties={"delta.autoOptimize.optimizeWrite": "true"}
)
def bronze_orders_files():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format",         "json")
        .option("cloudFiles.schemaLocation",
                "abfss://checkpoints@lake.dfs.core.windows.net/dlt/schema/orders_files")
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
        .option("cloudFiles.inferColumnTypes",    "true")
        .schema(ORDER_SCHEMA)
        .load("abfss://landing@lake.dfs.core.windows.net/orders/")
        .withColumn("_ingested_at", current_timestamp())
        .withColumn("_source",      lit("files"))
    )

# ── Bronze: Kafka stream ───────────────────────────────────────────────────────

@dlt.table(
    name="bronze_orders_kafka",
    comment="Real-time order events consumed from Kafka"
)
def bronze_orders_kafka():
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers",
                dbutils.secrets.get("kafka-secrets", "bootstrap-servers"))
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism",    "PLAIN")
        .option("kafka.sasl.jaas.config",
                f'org.apache.kafka.common.security.plain.PlainLoginModule required '
                f'username="{dbutils.secrets.get("kafka-secrets", "key")}" '
                f'password="{dbutils.secrets.get("kafka-secrets", "secret")}";')
        .option("subscribe",       "order-events")
        .option("startingOffsets", "latest")
        .option("failOnDataLoss",  "false")
        .load()
        .select(
            from_json(col("value").cast("string"), ORDER_SCHEMA).alias("d"),
            col("timestamp").alias("_kafka_ts")
        )
        .select("d.*", "_kafka_ts")
        .withColumn("_ingested_at", current_timestamp())
        .withColumn("_source",      lit("kafka"))
    )

# ── Silver: Union both sources, validate, clean ───────────────────────────────

@dlt.table(
    name="silver_orders",
    comment="Validated and cleaned orders from all sources — Silver layer",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.dataSkippingNumIndexedCols": "5",
    }
)
@dlt.expect("order_id not null",    "order_id IS NOT NULL")
@dlt.expect("customer_id not null", "customer_id IS NOT NULL")
@dlt.expect_or_drop("positive total",   "total > 0")
@dlt.expect_or_drop("valid status",
    "status IN ('pending','processing','shipped','completed','cancelled')")
def silver_orders():
    # Union the two Bronze sources
    files_stream = dlt.read_stream("bronze_orders_files")
    kafka_stream = dlt.read_stream("bronze_orders_kafka")

    combined = files_stream.unionByName(kafka_stream, allowMissingColumns=True)

    return (
        combined
        .dropDuplicates(["order_id"])
        .withColumn("email",      lower(trim(col("email"))))
        .withColumn("order_date", to_date(col("created_at")))
        .withColumn("row_hash",
            sha2(concat_ws("|", col("order_id"), col("total"), col("status")), 256))
        .withColumn("_cleaned_at", current_timestamp())
    )

Deploying the DLT Pipeline

Python
# DLT pipelines are deployed via Databricks UI or the REST API / Terraform
# Terraform example:
#
# resource "databricks_pipeline" "orders_dlt" {
#   name = "orders-medallion-pipeline"
#   storage = "abfss://dlt@lake.dfs.core.windows.net/pipelines/orders"
#   cluster {
#     autoscale { min_workers = 1; max_workers = 5 }
#     node_type_id = "Standard_DS3_v2"
#   }
#   library { notebook { path = "/Pipelines/orders_dlt_pipeline" } }
#   continuous = false   # triggered mode: run on schedule
#   channel = "CURRENT"
# }

Key Takeaways

  • dbutils.secrets is non-negotiable for credentials — never hardcode connection strings in notebooks.
  • dbutils.widgets turns notebooks into parameterized functions; use them for every notebook that runs in production.
  • DLT @dlt.expect_or_drop is the right default for data quality — warn in metrics, don't silently keep bad data.
  • Auto Loader with availableNow=True trigger gives you cost-efficient scheduled batch ingestion with exactly-once guarantees.
  • Photon eliminates Python UDFs — replace them with built-in SQL functions wherever possible.
  • Liquid Clustering beats Z-ORDER for tables where query patterns evolve; use Z-ORDER for stable, well-understood access patterns.
  • Use job clusters for production scheduled pipelines; interactive clusters are for development only.

Related: Delta Lake Deep Dive — ACID transactions, MERGE, time travel
Related: Databricks MLflow and Unity Catalog — ML workflows, governance, model serving