Data Engineering · Advanced

Advanced PySpark on Databricks: Delta Live Tables, Auto Loader & Streaming

Production PySpark patterns on Databricks — dbutils, parameterized notebooks, Delta Live Tables with data quality expectations, Auto Loader for incremental ingestion, Kafka streaming, cluster tuning, and Photon.

Learnixo · May 7, 2026 · 13 min read
Databricks · PySpark · Delta Live Tables · Auto Loader · Kafka · Structured Streaming · Data Engineering

Databricks Is Not Plain Spark

A lot of PySpark tutorials teach you how to write SparkSession.builder.getOrCreate() and submit jobs to a cluster. Databricks wraps Spark in a platform that adds utilities, managed infrastructure, and declarative pipeline frameworks that change how you build pipelines.

This guide covers the Databricks-specific patterns that don't exist in vanilla Spark: dbutils, parameterized notebooks, Delta Live Tables (DLT), and Auto Loader — plus the cluster configuration decisions that have the biggest impact on cost and performance.


dbutils: The Databricks Utility Belt

dbutils is a Databricks-only utility object pre-injected into every notebook. It gives you access to the file system, secrets, and notebook orchestration without writing boilerplate SDK code.

dbutils.fs: File System Operations

Python
# List files in cloud storage (ADLS Gen2 / S3 / GCS)
files = dbutils.fs.ls("abfss://bronze@datalake.dfs.core.windows.net/raw_events/")
for f in files:
    print(f.name, f.size, f.modificationTime)

# Move a file (staging → processed)
dbutils.fs.mv(
    "abfss://landing@datalake.dfs.core.windows.net/orders/file.json",
    "abfss://processed@datalake.dfs.core.windows.net/orders/file.json"
)

# Copy a directory recursively
dbutils.fs.cp(
    "abfss://bronze@datalake.dfs.core.windows.net/snapshot/",
    "abfss://backup@datalake.dfs.core.windows.net/snapshot_2026_05_07/",
    recurse=True
)

# Delete old files (recurse=True for directories)
dbutils.fs.rm("abfss://landing@datalake.dfs.core.windows.net/orders/old_batch/", recurse=True)

# Mount cloud storage (classic workspaces without Unity Catalog)
dbutils.fs.mount(
    source="wasbs://container@storageaccount.blob.core.windows.net",
    mount_point="/mnt/raw",
    extra_configs={
        "fs.azure.account.key.storageaccount.blob.core.windows.net":
            dbutils.secrets.get(scope="azure-kv", key="storage-account-key")
    }
)

dbutils.secrets: Credential Management

Never hardcode passwords, connection strings, or API keys in notebooks. Use Databricks secret scopes backed by Azure Key Vault or Databricks-managed secrets.

Python
# Read a secret; the value is never printed in notebook output
jdbc_password   = dbutils.secrets.get(scope="prod-secrets", key="sql-db-password")
kafka_api_key   = dbutils.secrets.get(scope="prod-secrets", key="confluent-api-key")
openai_api_key  = dbutils.secrets.get(scope="prod-secrets", key="openai-api-key")

# List all keys in a scope (values are hidden)
dbutils.secrets.list("prod-secrets")
# Returns: [SecretMetadata(key='sql-db-password'), SecretMetadata(key='confluent-api-key'), ...]

# Use in JDBC connection
df = (
    spark.read
    .format("jdbc")
    .option("url",      "jdbc:sqlserver://sqlserver.database.windows.net:1433;database=OrdersDB")
    .option("dbtable",  "dbo.Orders")
    .option("user",     "etl_user")
    .option("password", jdbc_password)
    .option("driver",   "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .load()
)

dbutils.widgets: Parameterized Notebooks

Widgets turn a notebook into a reusable function you can call from a Job, a parent notebook, or the Databricks REST API — passing different parameters each time.

Python
# Declare widgets at the top of the notebook
dbutils.widgets.text(
    name="start_date",
    defaultValue="2026-05-01",
    label="Start Date (yyyy-MM-dd)"
)
dbutils.widgets.text(
    name="end_date",
    defaultValue="2026-05-07",
    label="End Date (yyyy-MM-dd)"
)
dbutils.widgets.dropdown(
    name="environment",
    defaultValue="dev",
    choices=["dev", "staging", "prod"],
    label="Target Environment"
)
dbutils.widgets.combobox(
    name="table_suffix",
    defaultValue="orders",
    choices=["orders", "events", "customers"],
    label="Table Name Suffix"
)

# Read widget values anywhere in the notebook
start_date  = dbutils.widgets.get("start_date")
end_date    = dbutils.widgets.get("end_date")
environment = dbutils.widgets.get("environment")

catalog     = "prod" if environment == "prod" else "dev"
table_name  = f"{catalog}.silver.{dbutils.widgets.get('table_suffix')}"

print(f"Processing {table_name} from {start_date} to {end_date}")

# Filter data using the widget parameters
df = spark.sql(f"""
    SELECT * FROM {table_name}
    WHERE created_at BETWEEN '{start_date}' AND '{end_date}'
""")

dbutils.notebook: Orchestrating Notebooks

Python
# Run a child notebook and pass parameters to it
result = dbutils.notebook.run(
    path="/Pipelines/ingest_bronze",
    timeout_seconds=3600,
    arguments={
        "start_date":  start_date,
        "end_date":    end_date,
        "environment": environment,
    }
)
print(f"Child notebook returned: {result}")

# Inside the child notebook, signal completion (and return a string) with:
dbutils.notebook.exit("SUCCESS: 12,450 rows ingested")

# For error propagation:
try:
    dbutils.notebook.run("/Pipelines/risky_job", timeout_seconds=600, arguments={})
except Exception as e:
    print(f"Child notebook failed: {e}")
    raise   # re-raise to fail the parent job

Notebook Workflows vs Databricks Jobs

| | Notebook Workflows (dbutils.notebook.run) | Databricks Jobs |
|---|---|---|
| Orchestration | Python code inside a notebook | JSON/YAML job definition via UI or Terraform |
| Parallelism | Sequential or manual threading | Native parallel task graph |
| Scheduling | None (triggered by parent) | Cron, file arrival trigger, continuous |
| Monitoring | Notebook output | Job run history, metrics, alerts |
| Cluster reuse | Shares caller's cluster | Job clusters or existing interactive clusters |
| Best for | Ad-hoc ETL chains, simple fan-out | Production scheduled pipelines |
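
Because dbutils.notebook.run is a blocking call, the "manual threading" row above means exactly that: to fan out child notebooks in parallel, you wrap the calls in threads yourself. A minimal sketch (the child notebook path and region list are illustrative):

Python
from concurrent.futures import ThreadPoolExecutor

REGIONS = ["us", "eu", "apac"]    # hypothetical fan-out dimension

def run_region(region: str) -> str:
    # Each blocking call gets its own thread; all children share the caller's cluster
    return dbutils.notebook.run(
        path="/Pipelines/ingest_region",    # illustrative child notebook
        timeout_seconds=3600,
        arguments={"region": region},
    )

with ThreadPoolExecutor(max_workers=len(REGIONS)) as pool:
    results = list(pool.map(run_region, REGIONS))

print(results)    # one exit value per child notebook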

For production, prefer Databricks Jobs with a task graph over chaining notebooks with dbutils.notebook.run. Jobs give you a visual DAG, retry policies, email/Slack alerts, and detailed metrics.
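
As a sketch, a two-task graph in a Jobs JSON definition looks roughly like this (names, cluster sizing, and schedule are illustrative):

JSON
{
  "name": "orders-nightly",
  "tasks": [
    {
      "task_key": "ingest_bronze",
      "notebook_task": { "notebook_path": "/Pipelines/ingest_bronze" },
      "job_cluster_key": "etl_cluster"
    },
    {
      "task_key": "build_silver",
      "depends_on": [ { "task_key": "ingest_bronze" } ],
      "notebook_task": { "notebook_path": "/Pipelines/build_silver" },
      "job_cluster_key": "etl_cluster"
    }
  ],
  "job_clusters": [
    {
      "job_cluster_key": "etl_cluster",
      "new_cluster": {
        "spark_version": "14.3.x-scala2.12",
        "node_type_id": "Standard_DS3_v2",
        "num_workers": 4
      }
    }
  ],
  "schedule": {
    "quartz_cron_expression": "0 0 2 * * ?",
    "timezone_id": "UTC"
  }
}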


Delta Live Tables (DLT): Declarative Pipelines

DLT is Databricks' managed pipeline framework. Instead of writing code that orchestrates reads, transforms, and writes, you declare what each table is — DLT figures out execution order, manages checkpoints, and monitors data quality automatically.

The @dlt.table Decorator

Python
import dlt
from pyspark.sql.functions import col, current_timestamp, to_date, sha2, concat_ws
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# ─── BRONZE LAYER ────────────────────────────────────────────────────────────

@dlt.table(
    name="raw_orders",
    comment="Raw order events ingested from cloud storage via Auto Loader",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact":   "true",
    }
)
def raw_orders():
    """
    Auto Loader: incrementally ingest new JSON files from ADLS Gen2.
    DLT manages the checkpoint automatically.
    """
    return (
        spark.readStream
        .format("cloudFiles")                    # Auto Loader format
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation",
                "abfss://checkpoints@lake.dfs.core.windows.net/dlt/raw_orders/schema")
        .option("cloudFiles.inferColumnTypes", "true")
        .load("abfss://landing@lake.dfs.core.windows.net/orders/")
        .withColumn("_ingested_at", current_timestamp())
        .withColumn("_source_file", col("_metadata.file_path"))
    )

@dlt.expect: Data Quality Expectations

Expectations are named assertions on a table. They let you monitor data quality without stopping your pipeline — you choose the action (warn, drop, or fail).

Python
# ─── SILVER LAYER ────────────────────────────────────────────────────────────

@dlt.table(
    name="clean_orders",
    comment="Validated and deduplicated orders — Silver layer"
)
@dlt.expect("order_id is not null",    "order_id IS NOT NULL")
@dlt.expect("customer_id is not null", "customer_id IS NOT NULL")
@dlt.expect("valid status",
    "status IN ('pending', 'processing', 'shipped', 'completed', 'cancelled')")
@dlt.expect_or_drop("total is positive", "total > 0")     # drop rows that fail
@dlt.expect_or_fail("no duplicate order_id",              # fail the pipeline if violated
    "COUNT(order_id) OVER (PARTITION BY order_id) = 1")
def clean_orders():
    """
    Clean raw orders: type-cast, validate, deduplicate.
    Expectations are evaluated and metrics surfaced in the DLT pipeline UI.
    """
    return (
        dlt.read_stream("raw_orders")              # reference upstream DLT table
        .dropDuplicates(["order_id"])
        .withColumn("total",      col("total").cast(DoubleType()))
        .withColumn("created_at", col("created_at").cast(TimestampType()))
        .withColumn("order_date", to_date(col("created_at")))
        .withColumn("row_hash",
            sha2(concat_ws("|", col("order_id"), col("total"), col("status")), 256))
    )


# Expectation action reference:
# @dlt.expect(name, condition)          → record metrics, keep failing rows
# @dlt.expect_or_drop(name, condition)  → drop failing rows (counted in metrics)
# @dlt.expect_or_fail(name, condition)  → halt the pipeline on violation

Materialized Views in DLT

For batch (non-streaming) aggregations, use @dlt.table with dlt.read() instead of dlt.read_stream(). DLT fully recomputes the result on each pipeline update, which is what makes it a materialized view:

Python
# ─── GOLD LAYER ──────────────────────────────────────────────────────────────

@dlt.table(
    name="monthly_revenue",
    comment="Monthly revenue summary — Gold layer, refreshed on each pipeline run"
)
def monthly_revenue():
    return (
        dlt.read("clean_orders")                   # batch read, not streaming
        .filter(col("status") == "completed")
        .groupBy(
            col("order_date").substr(1, 7).alias("month")   # "2026-05"
        )
        .agg(
            {"total": "sum", "order_id": "count"}
        )
        .withColumnRenamed("sum(total)",     "revenue")
        .withColumnRenamed("count(order_id)", "order_count")
    )

Auto Loader: Incremental File Ingestion

Auto Loader (the cloudFiles format) is Databricks' solution for scalable incremental ingestion from cloud object storage. It records which files have already been processed in its checkpoint, and discovers new files either by directory listing (the default) or through a file notification service (Azure Event Grid or S3 event notifications), so each file is processed exactly once even when millions of files accumulate.

Auto Loader Outside DLT (Structured Streaming)

Python
from pyspark.sql.functions import current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Provide an explicit schema up front; this avoids full schema inference on every restart
order_schema = StructType([
    StructField("order_id",    StringType(),  nullable=False),
    StructField("customer_id", StringType(),  nullable=False),
    StructField("total",       DoubleType(),  nullable=True),
    StructField("status",      StringType(),  nullable=True),
    StructField("created_at",  TimestampType(), nullable=True),
])

stream_df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    # Schema location: Auto Loader stores inferred schema here for schema evolution tracking
    .option("cloudFiles.schemaLocation",
            "abfss://checkpoints@lake.dfs.core.windows.net/autoloader/orders/schema")
    # File notification mode: Azure Event Grid / SQS (faster than directory listing)
    .option("cloudFiles.useNotifications", "true")
    .option("cloudFiles.resourceGroup",   "rg-datalake")   # storage account's resource group (illustrative name)
    .option("cloudFiles.subscriptionId",  dbutils.secrets.get("az-secrets", "subscription-id"))
    .option("cloudFiles.tenantId",        dbutils.secrets.get("az-secrets", "tenant-id"))
    .option("cloudFiles.clientId",        dbutils.secrets.get("az-secrets", "sp-client-id"))
    .option("cloudFiles.clientSecret",    dbutils.secrets.get("az-secrets", "sp-client-secret"))
    # If schema evolves (new columns added), merge rather than fail
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .schema(order_schema)
    .load("abfss://landing@lake.dfs.core.windows.net/orders/")
)

# Write to a Bronze Delta table; the checkpoint ensures exactly-once processing
query = (
    stream_df
    .withColumn("_ingested_at", current_timestamp())
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation",
            "abfss://checkpoints@lake.dfs.core.windows.net/autoloader/orders/checkpoint")
    .trigger(availableNow=True)     # process all backlog then stop (batch-like behavior)
    .toTable("catalog.bronze.raw_orders")
)

query.awaitTermination()

Auto Loader Trigger Modes

Python
# Process all available files then stop (scheduled batch pattern)
.trigger(availableNow=True)

# Run continuously, process new files every 30 seconds
.trigger(processingTime="30 seconds")

# Run once and stop (deprecated; use availableNow instead)
.trigger(once=True)

Reading Kafka Streams into DLT

Python
import dlt
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

ORDER_EVENT_SCHEMA = StructType([
    StructField("order_id",    StringType()),
    StructField("customer_id", StringType()),
    StructField("total",       DoubleType()),
    StructField("event_type",  StringType()),
    StructField("event_ts",    TimestampType()),
])

@dlt.table(
    name="raw_order_events_kafka",
    comment="Raw Kafka events from the orders topic — Bronze layer"
)
def raw_order_events_kafka():
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers",
                dbutils.secrets.get("kafka-secrets", "bootstrap-servers"))
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism",    "PLAIN")
        .option("kafka.sasl.jaas.config",
                f'org.apache.kafka.common.security.plain.PlainLoginModule required '
                f'username="{dbutils.secrets.get("kafka-secrets", "api-key")}" '
                f'password="{dbutils.secrets.get("kafka-secrets", "api-secret")}";')
        .option("subscribe",          "order-events")
        .option("startingOffsets",    "latest")
        .option("failOnDataLoss",     "false")
        .load()
        # Kafka value is binary → cast to string and parse JSON
        .select(
            col("offset"),
            col("partition"),
            col("timestamp").alias("kafka_ts"),
            from_json(col("value").cast("string"), ORDER_EVENT_SCHEMA).alias("data")
        )
        .select("offset", "partition", "kafka_ts", "data.*")
    )


@dlt.table(name="clean_order_events")
@dlt.expect_or_drop("valid order_id", "order_id IS NOT NULL")
@dlt.expect_or_drop("valid total",    "total > 0")
def clean_order_events():
    return dlt.read_stream("raw_order_events_kafka")

Structured Streaming Checkpoints

Checkpoints are how Spark Structured Streaming achieves fault-tolerance and exactly-once processing. The checkpoint directory stores:

  • Offsets: which source positions have been read
  • Commits: which micro-batches have been committed to the sink
  • State: aggregation state for stateful operations (windows, deduplication)
Python
# Checkpoint best practices
query = (
    clean_df.writeStream
    .format("delta")
    .outputMode("append")
    # Store the checkpoint on reliable cloud storage, NOT local disk
    .option("checkpointLocation",
            "abfss://checkpoints@lake.dfs.core.windows.net/streams/orders/v1")
    # Never change the checkpoint path after production deployment
    # If you need to change the query, create a new checkpoint path (v2, v3)
    .trigger(processingTime="60 seconds")
    .toTable("catalog.silver.orders_stream")
)

# Monitor query progress
import time
for _ in range(5):
    progress = query.lastProgress
    if progress:
        print(f"Batch: {progress['batchId']} | "
              f"Input rows: {progress['numInputRows']} | "
              f"Processing rate: {progress['processedRowsPerSecond']:.0f} rows/s")
    time.sleep(30)
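
To see these pieces on disk, you can list the checkpoint directory with dbutils.fs; a minimal sketch reusing the checkpoint path from the example above:

Python
# Typical entries: offsets/, commits/, state/ (stateful queries only), metadata
for entry in dbutils.fs.ls(
    "abfss://checkpoints@lake.dfs.core.windows.net/streams/orders/v1"
):
    print(entry.name, entry.size)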

PySpark Performance on Databricks

Photon Engine

Photon is Databricks' native vectorized query engine, written in C++. It replaces the JVM-based Spark SQL engine for supported operations, giving a 2–8x speedup on SQL workloads with no code changes.

Python
# Photon is enabled at the cluster level; no code change is needed
# Enable via: Cluster Configuration → Runtime → "Use Photon Acceleration"
# Or in the cluster API JSON:
# "runtime_engine": "PHOTON"

# Photon accelerates: aggregations, joins, sort, filter, columnar scan
# Photon does NOT accelerate: Python UDFs, RDD operations, map/flatMap
# Best practice: eliminate Python UDFs; replace them with built-in functions where possible

# Instead of a Python UDF:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# BAD: bypasses Photon (the UDF runs row-by-row in a Python worker)
@udf(StringType())
def clean_email_udf(email: str) -> str:
    return email.lower().strip() if email else None

# GOOD: Photon-accelerated built-in functions
from pyspark.sql.functions import col, lower, trim
df = df.withColumn("email", lower(trim(col("email"))))

Liquid Clustering vs Z-ORDER

Liquid Clustering is the newer alternative to Hive-style partitioning combined with Z-ORDER. It clusters data on the chosen columns using space-filling curves and is designed for tables whose query patterns evolve over time.

SQL
-- Enable liquid clustering on a new table
CREATE TABLE catalog.silver.events
CLUSTER BY (customer_id, event_date)
USING DELTA;

-- Enable on an existing table (migrates incrementally on next OPTIMIZE)
ALTER TABLE catalog.silver.events
CLUSTER BY (customer_id, event_date);

-- OPTIMIZE runs as usual; liquid clustering is applied incrementally
OPTIMIZE catalog.silver.events;

| | Z-ORDER | Liquid Clustering |
|---|---|---|
| Setup | OPTIMIZE … ZORDER BY | CLUSTER BY on table DDL |
| Partition required | Usually yes (partition + Z-ORDER) | No partitions needed |
| Reoptimization | Full OPTIMIZE run | Incremental on each OPTIMIZE |
| Column changes | Re-run full OPTIMIZE | ALTER TABLE … CLUSTER BY |
| Best for | Stable access patterns | Evolving access patterns |
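
For comparison, Z-ORDER is a maintenance command you re-run as data arrives; a sketch, assuming the table is partitioned by event_date:

SQL
-- Z-ORDER the most recent partitions as part of scheduled maintenance
OPTIMIZE catalog.silver.events
WHERE event_date >= current_date() - INTERVAL 7 DAYS
ZORDER BY (customer_id);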


Cluster Configuration

Single Node vs Multi-Node

Single Node cluster:
  Driver = Worker (same VM)
  No network shuffle between nodes
  Best for: ML training on pandas/sklearn, small ETL, notebook exploration

Multi-Node cluster:
  Driver VM + N Worker VMs
  Data distributed across workers via shuffle
  Best for: large-scale PySpark jobs, DLT pipelines, SQL warehouses
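
In cluster JSON, a Single Node cluster is expressed as zero workers plus the single-node profile; a sketch (the node type is illustrative):

JSON
{
  "num_workers": 0,
  "spark_conf": {
    "spark.databricks.cluster.profile": "singleNode",
    "spark.master": "local[*]"
  },
  "custom_tags": { "ResourceClass": "SingleNode" },
  "node_type_id": "Standard_DS3_v2"
}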

Autoscaling

JSON
{
  "autoscale": {
    "min_workers": 2,
    "max_workers": 20
  },
  "spark_conf": {
    "spark.databricks.adaptive.autoBroadcastJoinThreshold": "50MB",
    "spark.sql.shuffle.partitions": "auto"
  }
}

Autoscaling adds workers when the task queue backs up and removes them after an idle timeout. For batch ETL, autoscaling is strongly recommended. For streaming jobs on standard clusters, disable it and pin the cluster size (classic autoscaling doesn't scale down cleanly mid-stream and can cause lag spikes); DLT pipelines offer enhanced autoscaling built for streaming workloads.
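
A minimal fixed-size configuration for streaming (worker and partition counts are illustrative; for stateful queries the shuffle partition count is effectively baked into the checkpoint, so set it deliberately up front):

JSON
{
  "num_workers": 8,
  "spark_conf": {
    "spark.sql.shuffle.partitions": "64"
  }
}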

Spot Instances (Cost Reduction)

JSON
{
  "aws_attributes": {
    "availability": "SPOT_WITH_FALLBACK",
    "spot_bid_price_percent": 100,
    "first_on_demand": 1
  }
}

SPOT_WITH_FALLBACK uses spot instances for workers and on-demand for the driver (first_on_demand: 1). If spot capacity is unavailable, it falls back to on-demand. This is the standard production configuration; it cuts worker compute costs by 60–80% with minimal reliability risk.
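
On Azure workspaces the equivalent block uses azure_attributes; a sketch (a spot_bid_max_price of -1 means "pay up to the on-demand price"):

JSON
{
  "azure_attributes": {
    "availability": "SPOT_WITH_FALLBACK_AZURE",
    "first_on_demand": 1,
    "spot_bid_max_price": -1
  }
}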

Job Clusters vs Interactive Clusters

| | Interactive Cluster | Job Cluster |
|---|---|---|
| Lifecycle | Manually started/stopped | Created at job start, terminated at job end |
| Cost | Runs idle between jobs | Pay only for job runtime |
| Warm-up time | None (already running) | 3–6 minutes to provision |
| Use for | Notebook development, ad hoc queries | Scheduled production jobs |
| Sharing | Multiple users, multiple notebooks | Single job (no sharing) |

Python
# Best practice for scheduled jobs:
# 1. Develop on an interactive cluster
# 2. In the Job definition, specify a "new_cluster" config (job cluster)
# 3. For time-sensitive jobs where a few minutes of startup latency matters,
#    use an "existing_cluster_id" to reuse a warm interactive cluster

Complete DLT Pipeline: Auto Loader → Bronze → Silver

This is a single DLT pipeline file that wires together Auto Loader ingestion with Bronze and Silver layers, full data quality expectations, and a Kafka stream merged into the same Silver table.

Python
import dlt
from pyspark.sql.functions import (
    col, current_timestamp, to_date, lower, trim,
    sha2, concat_ws, from_json, coalesce, lit
)
from pyspark.sql.types import (
    StructType, StructField, StringType, DoubleType, TimestampType
)

# ── Shared schema ─────────────────────────────────────────────────────────────

ORDER_SCHEMA = StructType([
    StructField("order_id",    StringType(),  False),
    StructField("customer_id", StringType(),  False),
    StructField("total",       DoubleType(),  True),
    StructField("status",      StringType(),  True),
    StructField("email",       StringType(),  True),
    StructField("created_at",  TimestampType(), True),
])

# ── Bronze: Auto Loader from ADLS ─────────────────────────────────────────────

@dlt.table(
    name="bronze_orders_files",
    comment="Raw order JSON files ingested incrementally via Auto Loader",
    table_properties={"delta.autoOptimize.optimizeWrite": "true"}
)
def bronze_orders_files():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format",         "json")
        .option("cloudFiles.schemaLocation",
                "abfss://checkpoints@lake.dfs.core.windows.net/dlt/schema/orders_files")
        .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
        .option("cloudFiles.inferColumnTypes",    "true")
        .schema(ORDER_SCHEMA)
        .load("abfss://landing@lake.dfs.core.windows.net/orders/")
        .withColumn("_ingested_at", current_timestamp())
        .withColumn("_source",      lit("files"))
    )

# ── Bronze: Kafka stream ───────────────────────────────────────────────────────

@dlt.table(
    name="bronze_orders_kafka",
    comment="Real-time order events consumed from Kafka"
)
def bronze_orders_kafka():
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers",
                dbutils.secrets.get("kafka-secrets", "bootstrap-servers"))
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism",    "PLAIN")
        .option("kafka.sasl.jaas.config",
                f'org.apache.kafka.common.security.plain.PlainLoginModule required '
                f'username="{dbutils.secrets.get("kafka-secrets", "key")}" '
                f'password="{dbutils.secrets.get("kafka-secrets", "secret")}";')
        .option("subscribe",       "order-events")
        .option("startingOffsets", "latest")
        .option("failOnDataLoss",  "false")
        .load()
        .select(
            from_json(col("value").cast("string"), ORDER_SCHEMA).alias("d"),
            col("timestamp").alias("_kafka_ts")
        )
        .select("d.*", "_kafka_ts")
        .withColumn("_ingested_at", current_timestamp())
        .withColumn("_source",      lit("kafka"))
    )

# ── Silver: Union both sources, validate, clean ───────────────────────────────

@dlt.table(
    name="silver_orders",
    comment="Validated and cleaned orders from all sources — Silver layer",
    table_properties={
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.dataSkippingNumIndexedCols": "5",
    }
)
@dlt.expect("order_id not null",    "order_id IS NOT NULL")
@dlt.expect("customer_id not null", "customer_id IS NOT NULL")
@dlt.expect_or_drop("positive total",   "total > 0")
@dlt.expect_or_drop("valid status",
    "status IN ('pending','processing','shipped','completed','cancelled')")
def silver_orders():
    # Union the two Bronze sources
    files_stream = dlt.read_stream("bronze_orders_files")
    kafka_stream = dlt.read_stream("bronze_orders_kafka")

    combined = files_stream.unionByName(kafka_stream, allowMissingColumns=True)

    return (
        combined
        # dropDuplicates on a stream keeps state for every key ever seen;
        # add a watermark in high-volume pipelines to bound that state
        .dropDuplicates(["order_id"])
        .withColumn("email",      lower(trim(col("email"))))
        .withColumn("order_date", to_date(col("created_at")))
        .withColumn("row_hash",
            sha2(concat_ws("|", col("order_id"), col("total"), col("status")), 256))
        .withColumn("_cleaned_at", current_timestamp())
    )

Deploying the DLT Pipeline

Python
# DLT pipelines are deployed via Databricks UI or the REST API / Terraform
# Terraform example:
#
# resource "databricks_pipeline" "orders_dlt" {
#   name = "orders-medallion-pipeline"
#   storage = "abfss://dlt@lake.dfs.core.windows.net/pipelines/orders"
#   cluster {
#     autoscale {
#       min_workers = 1
#       max_workers = 5
#     }
#     node_type_id = "Standard_DS3_v2"
#   }
#   library { notebook { path = "/Pipelines/orders_dlt_pipeline" } }
#   continuous = false   # triggered mode: run on schedule
#   channel = "CURRENT"
# }
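
The same pipeline can also be created over the Pipelines REST API; a minimal sketch with requests, where the workspace URL and token are placeholders you must supply:

Python
import requests

resp = requests.post(
    "https://<workspace-url>/api/2.0/pipelines",    # your workspace URL
    headers={"Authorization": "Bearer <token>"},    # PAT or OAuth token
    json={
        "name": "orders-medallion-pipeline",
        "storage": "abfss://dlt@lake.dfs.core.windows.net/pipelines/orders",
        "libraries": [{"notebook": {"path": "/Pipelines/orders_dlt_pipeline"}}],
        "clusters": [{"autoscale": {"min_workers": 1, "max_workers": 5}}],
        "continuous": False,    # triggered mode
    },
)
resp.raise_for_status()
print(resp.json()["pipeline_id"])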

Key Takeaways

  • dbutils.secrets is non-negotiable for credentials — never hardcode connection strings in notebooks.
  • dbutils.widgets turns notebooks into parameterized functions; use them for every notebook that runs in production.
  • DLT's @dlt.expect_or_drop is a sensible default for data quality: failing rows are dropped and counted in pipeline metrics rather than silently flowing downstream.
  • Auto Loader with availableNow=True trigger gives you cost-efficient scheduled batch ingestion with exactly-once guarantees.
  • Photon cannot accelerate Python UDFs; replace them with built-in SQL functions wherever possible.
  • Liquid Clustering beats Z-ORDER for tables where query patterns evolve; use Z-ORDER for stable, well-understood access patterns.
  • Use job clusters for production scheduled pipelines; interactive clusters are for development only.

Related: Delta Lake Deep Dive — ACID transactions, MERGE, time travel
Related: Databricks MLflow and Unity Catalog — ML workflows, governance, model serving
