Advanced PySpark on Databricks: Delta Live Tables, Auto Loader & Streaming
Production PySpark patterns on Databricks — dbutils, parameterized notebooks, Delta Live Tables with data quality expectations, Auto Loader for incremental ingestion, Kafka streaming, cluster tuning, and Photon.
Databricks Is Not Plain Spark
A lot of PySpark tutorials teach you how to write SparkSession.builder.getOrCreate() and submit jobs to a cluster. Databricks wraps Spark in a platform that adds utilities, managed infrastructure, and declarative pipeline frameworks that change how you build pipelines.
This guide covers the Databricks-specific patterns that don't exist in vanilla Spark: dbutils, parameterized notebooks, Delta Live Tables (DLT), and Auto Loader — plus the cluster configuration decisions that have the biggest impact on cost and performance.
dbutils: The Databricks Utility Belt
dbutils is a Databricks-only Python module pre-injected into every notebook. It gives you access to the file system, secrets, and notebook orchestration without writing boilerplate SDK code.
dbutils.fs: File System Operations
# List files in cloud storage (ADLS Gen2 / S3 / GCS)
files = dbutils.fs.ls("abfss://bronze@datalake.dfs.core.windows.net/raw_events/")
for f in files:
print(f.name, f.size, f.modificationTime)
# Move a file (staging → processed)
dbutils.fs.mv(
"abfss://landing@datalake.dfs.core.windows.net/orders/file.json",
"abfss://processed@datalake.dfs.core.windows.net/orders/file.json"
)
# Copy a directory recursively
dbutils.fs.cp(
"abfss://bronze@datalake.dfs.core.windows.net/snapshot/",
"abfss://backup@datalake.dfs.core.windows.net/snapshot_2026_05_07/",
recurse=True
)
# Delete old files (recurse=True for directories)
dbutils.fs.rm("abfss://landing@datalake.dfs.core.windows.net/orders/old_batch/", recurse=True)
# Mount cloud storage (classic workspaces without Unity Catalog)
dbutils.fs.mount(
source="wasbs://container@storageaccount.blob.core.windows.net",
mount_point="/mnt/raw",
extra_configs={
"fs.azure.account.key.storageaccount.blob.core.windows.net":
dbutils.secrets.get(scope="azure-kv", key="storage-account-key")
}
)

dbutils.secrets: Credential Management
Never hardcode passwords, connection strings, or API keys in notebooks. Use Databricks secret scopes backed by Azure Key Vault or Databricks-managed secrets.
# Read a secret — the value is never printed in notebook output
jdbc_password = dbutils.secrets.get(scope="prod-secrets", key="sql-db-password")
kafka_api_key = dbutils.secrets.get(scope="prod-secrets", key="confluent-api-key")
openai_api_key = dbutils.secrets.get(scope="prod-secrets", key="openai-api-key")
# List all keys in a scope (values are hidden)
dbutils.secrets.list("prod-secrets")
# Returns: [SecretMetadata(key='sql-db-password'), SecretMetadata(key='confluent-api-key'), ...]
# Use in JDBC connection
df = (
spark.read
.format("jdbc")
.option("url", "jdbc:sqlserver://sqlserver.database.windows.net:1433;database=OrdersDB")
.option("dbtable", "dbo.Orders")
.option("user", "etl_user")
.option("password", jdbc_password)
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
.load()
)

dbutils.widgets: Parameterized Notebooks
Widgets turn a notebook into a reusable function you can call from a Job, a parent notebook, or the Databricks REST API — passing different parameters each time.
# Declare widgets at the top of the notebook
dbutils.widgets.text(
name="start_date",
defaultValue="2026-05-01",
label="Start Date (yyyy-MM-dd)"
)
dbutils.widgets.text(
name="end_date",
defaultValue="2026-05-07",
label="End Date (yyyy-MM-dd)"
)
dbutils.widgets.dropdown(
name="environment",
defaultValue="dev",
choices=["dev", "staging", "prod"],
label="Target Environment"
)
dbutils.widgets.combobox(
name="table_suffix",
defaultValue="orders",
choices=["orders", "events", "customers"],
label="Table Name Suffix"
)
# Read widget values anywhere in the notebook
start_date = dbutils.widgets.get("start_date")
end_date = dbutils.widgets.get("end_date")
environment = dbutils.widgets.get("environment")
catalog = "prod" if environment == "prod" else "dev"
table_name = f"{catalog}.silver.{dbutils.widgets.get('table_suffix')}"
print(f"Processing {table_name} from {start_date} to {end_date}")
# Filter data using the widget parameters
df = spark.sql(f"""
SELECT * FROM {table_name}
WHERE created_at BETWEEN '{start_date}' AND '{end_date}'
""")dbutils.notebook: Orchestrating Notebooks
# Run a child notebook and pass parameters to it
result = dbutils.notebook.run(
path="/Pipelines/ingest_bronze",
timeout_seconds=3600,
arguments={
"start_date": start_date,
"end_date": end_date,
"environment": environment,
}
)
print(f"Child notebook returned: {result}")
# Child notebook signals completion by calling:
dbutils.notebook.exit("SUCCESS: 12,450 rows ingested")
# For error propagation:
try:
dbutils.notebook.run("/Pipelines/risky_job", timeout_seconds=600, arguments={})
except Exception as e:
print(f"Child notebook failed: {e}")
raise  # re-raise to fail the parent job

Notebook Workflows vs Databricks Jobs
| | Notebook Workflows (dbutils.notebook.run) | Databricks Jobs |
|---|---|---|
| Orchestration | Python code inside a notebook | JSON/YAML job definition via UI or Terraform |
| Parallelism | Sequential or manual threading | Native parallel task graph |
| Scheduling | None (triggered by parent) | Cron, file arrival trigger, continuous |
| Monitoring | Notebook output | Job run history, metrics, alerts |
| Cluster reuse | Shares caller's cluster | Job clusters or existing interactive clusters |
| Best for | Ad-hoc ETL chains, simple fan-out | Production scheduled pipelines |
For production, prefer Databricks Jobs with a task graph over chaining notebooks with dbutils.notebook.run. Jobs give you a visual DAG, retry policies, email/Slack alerts, and detailed metrics.
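A minimal sketch of such a job definition (Jobs API 2.1 JSON — the job name, notebook paths, node type, and schedule here are illustrative): two dependent notebook tasks share one job cluster, with a retry policy and a failure alert.

{
  "name": "orders-medallion-job",
  "job_clusters": [
    {
      "job_cluster_key": "etl_cluster",
      "new_cluster": {
        "spark_version": "14.3.x-scala2.12",
        "node_type_id": "Standard_DS3_v2",
        "num_workers": 4
      }
    }
  ],
  "tasks": [
    {
      "task_key": "ingest_bronze",
      "job_cluster_key": "etl_cluster",
      "notebook_task": {
        "notebook_path": "/Pipelines/ingest_bronze",
        "base_parameters": { "environment": "prod", "start_date": "2026-05-01" }
      },
      "max_retries": 2
    },
    {
      "task_key": "build_silver",
      "depends_on": [ { "task_key": "ingest_bronze" } ],
      "job_cluster_key": "etl_cluster",
      "notebook_task": { "notebook_path": "/Pipelines/build_silver" }
    }
  ],
  "schedule": { "quartz_cron_expression": "0 0 2 * * ?", "timezone_id": "UTC" },
  "email_notifications": { "on_failure": ["data-eng-alerts@example.com"] }
}

The depends_on edges are what produce the task DAG; retries, alerts, and scheduling live in the job definition instead of in notebook code.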
Delta Live Tables (DLT): Declarative Pipelines
DLT is Databricks' managed pipeline framework. Instead of writing code that orchestrates reads, transforms, and writes, you declare what each table is — DLT figures out execution order, manages checkpoints, and monitors data quality automatically.
The @dlt.table Decorator
import dlt
from pyspark.sql.functions import col, current_timestamp, to_date, date_format, sha2, concat_ws
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
# ─── BRONZE LAYER ────────────────────────────────────────────────────────────
@dlt.table(
name="raw_orders",
comment="Raw order events ingested from cloud storage via Auto Loader",
table_properties={
"delta.autoOptimize.optimizeWrite": "true",
"delta.autoOptimize.autoCompact": "true",
}
)
def raw_orders():
"""
Auto Loader: incrementally ingest new JSON files from ADLS Gen2.
DLT manages the checkpoint automatically.
"""
return (
spark.readStream
.format("cloudFiles") # Auto Loader format
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation",
"abfss://checkpoints@lake.dfs.core.windows.net/dlt/raw_orders/schema")
.option("cloudFiles.inferColumnTypes", "true")
.load("abfss://landing@lake.dfs.core.windows.net/orders/")
.withColumn("_ingested_at", current_timestamp())
.withColumn("_source_file", col("_metadata.file_path"))
)

@dlt.expect: Data Quality Expectations
Expectations are named assertions on a table. They let you monitor data quality without stopping your pipeline — you choose the action (warn, drop, or fail).
# ─── SILVER LAYER ────────────────────────────────────────────────────────────
@dlt.table(
name="clean_orders",
comment="Validated and deduplicated orders — Silver layer"
)
@dlt.expect("order_id is not null", "order_id IS NOT NULL")
@dlt.expect("customer_id is not null", "customer_id IS NOT NULL")
@dlt.expect("valid status",
"status IN ('pending', 'processing', 'shipped', 'completed', 'cancelled')")
@dlt.expect_or_drop("total is positive", "total > 0") # drop rows that fail
@dlt.expect_or_fail("no duplicate order_id", # fail the pipeline if violated
"COUNT(order_id) OVER (PARTITION BY order_id) = 1")
def clean_orders():
"""
Clean raw orders: type-cast, validate, deduplicate.
Expectations are evaluated and metrics surfaced in the DLT pipeline UI.
"""
return (
dlt.read_stream("raw_orders") # reference upstream DLT table
.dropDuplicates(["order_id"])
.withColumn("total", col("total").cast(DoubleType()))
.withColumn("created_at", col("created_at").cast(TimestampType()))
.withColumn("order_date", to_date(col("created_at")))
.withColumn("row_hash",
sha2(concat_ws("|", col("order_id"), col("total"), col("status")), 256))
)
# Expectation action reference:
# @dlt.expect(name, condition) → warn in metrics, keep bad rows
# @dlt.expect_or_drop(name, condition) → silently drop rows that fail
# @dlt.expect_or_fail(name, condition)  → halt the pipeline on violation

Materialized Views in DLT
For batch (non-streaming) aggregations, use @dlt.table with dlt.read() instead of dlt.read_stream():
# ─── GOLD LAYER ──────────────────────────────────────────────────────────────
@dlt.table(
name="monthly_revenue",
comment="Monthly revenue summary — Gold layer, refreshed on each pipeline run"
)
def monthly_revenue():
return (
dlt.read("clean_orders") # batch read, not streaming
.filter(col("status") == "completed")
.groupBy(
col("order_date").substr(1, 7).alias("month") # "2026-05"
)
.agg(
{"total": "sum", "order_id": "count"}
)
.withColumnRenamed("sum(total)", "revenue")
.withColumnRenamed("count(order_id)", "order_count")
)

Auto Loader: Incremental File Ingestion
Auto Loader (the cloudFiles format) is Databricks' solution for scalable incremental ingestion from cloud object storage. It tracks which files have already been processed in a checkpoint — optionally backed by a file notification service (Azure Event Grid or S3 Event Notifications) for faster file discovery — so each file is processed exactly once, even when millions of files accumulate.
Auto Loader Outside DLT (Structured Streaming)
from pyspark.sql.functions import current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Schema hint: tell Auto Loader the expected schema (avoids full schema inference on every restart)
order_schema = StructType([
StructField("order_id", StringType(), nullable=False),
StructField("customer_id", StringType(), nullable=False),
StructField("total", DoubleType(), nullable=True),
StructField("status", StringType(), nullable=True),
StructField("created_at", TimestampType(), nullable=True),
])
stream_df = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
# Schema location: Auto Loader stores inferred schema here for schema evolution tracking
.option("cloudFiles.schemaLocation",
"abfss://checkpoints@lake.dfs.core.windows.net/autoloader/orders/schema")
# File notification mode: Azure Event Grid / SQS (faster than directory listing)
.option("cloudFiles.useNotifications", "true")
.option("cloudFiles.subscriptionId", dbutils.secrets.get("az-secrets", "subscription-id"))
.option("cloudFiles.tenantId", dbutils.secrets.get("az-secrets", "tenant-id"))
.option("cloudFiles.clientId", dbutils.secrets.get("az-secrets", "sp-client-id"))
.option("cloudFiles.clientSecret", dbutils.secrets.get("az-secrets", "sp-client-secret"))
# If schema evolves (new columns added), merge rather than fail
.option("cloudFiles.schemaEvolutionMode", "addNewColumns")
.schema(order_schema)
.load("abfss://landing@lake.dfs.core.windows.net/orders/")
)
# Write to Bronze Delta table — checkpoint ensures exactly-once processing
query = (
stream_df
.withColumn("_ingested_at", current_timestamp())
.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation",
"abfss://checkpoints@lake.dfs.core.windows.net/autoloader/orders/checkpoint")
.trigger(availableNow=True) # process all backlog then stop (batch-like behavior)
.toTable("catalog.bronze.raw_orders")
)
query.awaitTermination()

Auto Loader Trigger Modes
# Process all available files then stop (scheduled batch pattern)
.trigger(availableNow=True)
# Run continuously, process new files every 30 seconds
.trigger(processingTime="30 seconds")
# Run once and stop (deprecated — use availableNow instead)
.trigger(once=True)

Reading Kafka Streams into DLT
import dlt
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
ORDER_EVENT_SCHEMA = StructType([
StructField("order_id", StringType()),
StructField("customer_id", StringType()),
StructField("total", DoubleType()),
StructField("event_type", StringType()),
StructField("event_ts", TimestampType()),
])
@dlt.table(
name="raw_order_events_kafka",
comment="Raw Kafka events from the orders topic — Bronze layer"
)
def raw_order_events_kafka():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers",
dbutils.secrets.get("kafka-secrets", "bootstrap-servers"))
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.sasl.jaas.config",
f'org.apache.kafka.common.security.plain.PlainLoginModule required '
f'username="{dbutils.secrets.get("kafka-secrets", "api-key")}" '
f'password="{dbutils.secrets.get("kafka-secrets", "api-secret")}";')
.option("subscribe", "order-events")
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
.load()
# Kafka value is binary — cast to string and parse JSON
.select(
col("offset"),
col("partition"),
col("timestamp").alias("kafka_ts"),
from_json(col("value").cast("string"), ORDER_EVENT_SCHEMA).alias("data")
)
.select("offset", "partition", "kafka_ts", "data.*")
)
@dlt.table(name="clean_order_events")
@dlt.expect_or_drop("valid order_id", "order_id IS NOT NULL")
@dlt.expect_or_drop("valid total", "total > 0")
def clean_order_events():
return dlt.read_stream("raw_order_events_kafka")

Structured Streaming Checkpoints
Checkpoints are how Spark Structured Streaming achieves fault tolerance and exactly-once processing. The checkpoint directory — which you can inspect directly on cloud storage, as sketched after this list — stores:
- Offsets: which source positions have been read
- Commits: which micro-batches have been committed to the sink
- State: aggregation state for stateful operations (windows, deduplication)
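A quick way to see that layout with dbutils.fs — a minimal sketch, assuming the checkpoint path used in the next example already exists:

# Peek inside a streaming checkpoint directory (path matches the write below)
checkpoint_path = "abfss://checkpoints@lake.dfs.core.windows.net/streams/orders/v1"
for entry in dbutils.fs.ls(checkpoint_path):
    print(entry.name)
# Typically lists something like:
#   commits/    <- micro-batches committed to the sink
#   metadata    <- the query's unique id
#   offsets/    <- source offsets per micro-batch
#   state/      <- stateful operator state (windows, dropDuplicates)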
# Checkpoint best practices
query = (
clean_df.writeStream
.format("delta")
.outputMode("append")
# Store checkpoint on reliable cloud storage — NOT local disk
.option("checkpointLocation",
"abfss://checkpoints@lake.dfs.core.windows.net/streams/orders/v1")
# Never change the checkpoint path after production deployment
# If you need to change the query, create a new checkpoint path (v2, v3)
.trigger(processingTime="60 seconds")
.toTable("catalog.silver.orders_stream")
)
# Monitor query progress
import time
for _ in range(5):
progress = query.lastProgress
if progress:
print(f"Batch: {progress['batchId']} | "
f"Input rows: {progress['numInputRows']} | "
f"Processing rate: {progress['processedRowsPerSecond']:.0f} rows/s")
time.sleep(30)

PySpark Performance on Databricks
Photon Engine
Photon is Databricks' native vectorized query engine, written in C++. For supported operations it replaces the JVM-based Spark SQL execution engine, typically delivering a 2–8x speedup on SQL workloads with no code changes.
# Photon is enabled at the cluster level — no code change needed
# Enable via: Cluster Configuration → Runtime → "Use Photon Acceleration"
# Or in cluster JSON:
# "spark_conf": {"spark.databricks.photon.enabled": "true"}
# Photon accelerates: aggregations, joins, sort, filter, columnar scan
# Photon does NOT accelerate: Python UDFs, RDD operations, map/flatMap
# Best practice: eliminate Python UDFs — replace with SQL functions where possible
# Instead of a Python UDF:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# BAD — bypasses Photon
@udf(StringType())
def clean_email_udf(email: str) -> str:
return email.lower().strip() if email else None
# GOOD — Photon-accelerated built-in functions
from pyspark.sql.functions import lower, trim
df = df.withColumn("email", lower(trim(col("email"))))

Liquid Clustering vs Z-ORDER
Liquid Clustering is the newer alternative to Z-ORDER and partition-based layouts. It uses space-filling curves over the clustering columns and is designed for tables whose query patterns change over time.
-- Enable liquid clustering on a new table
CREATE TABLE catalog.silver.events
CLUSTER BY (customer_id, event_date)
USING DELTA;
-- Enable on an existing table (migrates incrementally on next OPTIMIZE)
ALTER TABLE catalog.silver.events
CLUSTER BY (customer_id, event_date);
-- Optimize runs the same — liquid clustering is applied automatically
OPTIMIZE catalog.silver.events;

| | Z-ORDER | Liquid Clustering |
|---|---|---|
| Setup | OPTIMIZE … ZORDER BY | CLUSTER BY on table DDL |
| Partition required | Usually yes (partition + Z-ORDER) | No partitions needed |
| Reoptimization | Full OPTIMIZE run | Incremental on each OPTIMIZE |
| Column changes | Re-run full OPTIMIZE | ALTER TABLE … CLUSTER BY |
| Best for | Stable access patterns | Evolving access patterns |
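For comparison, a minimal Z-ORDER sketch (the table name here is hypothetical — Z-ORDER cannot be combined with CLUSTER BY on the same table, and it has to be re-run as new data arrives):

-- Legacy layout: OPTIMIZE rewrites files, co-locating rows by the Z-ORDER columns
OPTIMIZE catalog.silver.events_zordered
ZORDER BY (customer_id, event_date);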
Cluster Configuration
Single Node vs Multi-Node
Single Node cluster:
Driver = Worker (same VM)
No network shuffle between nodes
Best for: ML training on pandas/sklearn, small ETL, notebook exploration
Multi-Node cluster:
Driver VM + N Worker VMs
Data distributed across workers via Shuffle
Best for: large-scale PySpark jobs, DLT pipelines, SQL warehouses

Autoscaling
{
"autoscale": {
"min_workers": 2,
"max_workers": 20
},
"spark_conf": {
"spark.databricks.adaptive.autoBroadcastJoinThreshold": "50MB",
"spark.sql.shuffle.partitions": "auto"
}
}

Autoscaling scales up when task queues are long and scales down after an idle timeout. For streaming jobs, disable autoscaling — it can cause lag spikes. For batch ETL, autoscaling is strongly recommended.
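For a streaming cluster, the equivalent (illustrative) configuration pins a fixed worker count instead of an autoscale range:

{
  "num_workers": 8,
  "spark_conf": {
    "spark.sql.shuffle.partitions": "64"
  }
}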
Spot Instances (Cost Reduction)
{
"aws_attributes": {
"availability": "SPOT_WITH_FALLBACK",
"spot_bid_price_percent": 100,
"first_on_demand": 1
}
}

SPOT_WITH_FALLBACK uses spot instances for workers and on-demand for the driver (first_on_demand: 1). If spot capacity is unavailable, it falls back to on-demand. This is a standard production configuration — it can cut compute costs by 60–80% with little reliability risk.
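The block above is AWS-specific; on Azure the equivalent (illustrative) configuration uses azure_attributes, where a spot_bid_max_price of -1 caps the bid at the current on-demand price:

{
  "azure_attributes": {
    "availability": "SPOT_WITH_FALLBACK_AZURE",
    "first_on_demand": 1,
    "spot_bid_max_price": -1
  }
}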
Job Clusters vs Interactive Clusters
| | Interactive Cluster | Job Cluster |
|---|---|---|
| Lifecycle | Manually started/stopped | Created at job start, terminated at job end |
| Cost | Runs idle between jobs | Pay only for job runtime |
| Warm-up time | None (already running) | 3–6 minutes to provision |
| Use for | Notebook development, ad hoc queries | Scheduled production jobs |
| Sharing | Multiple users, multiple notebooks | Single job (no sharing) |
# Best practice for scheduled jobs:
# 1. Develop on an interactive cluster
# 2. In the Job definition, specify a "new_cluster" config (job cluster)
# 3. For time-sensitive jobs where 5 minutes startup matters, use a
# "existing_cluster_id" to reuse a warm interactive clusterComplete DLT Pipeline: Auto Loader → Bronze → Silver
This is a single DLT pipeline file that wires together Auto Loader ingestion with Bronze and Silver layers, full data quality expectations, and a Kafka stream merged into the same Silver table.
import dlt
from pyspark.sql.functions import (
col, current_timestamp, to_date, lower, trim,
sha2, concat_ws, from_json, coalesce, lit
)
from pyspark.sql.types import (
StructType, StructField, StringType, DoubleType, TimestampType
)
# ── Shared schema ─────────────────────────────────────────────────────────────
ORDER_SCHEMA = StructType([
StructField("order_id", StringType(), False),
StructField("customer_id", StringType(), False),
StructField("total", DoubleType(), True),
StructField("status", StringType(), True),
StructField("email", StringType(), True),
StructField("created_at", TimestampType(), True),
])
# ── Bronze: Auto Loader from ADLS ─────────────────────────────────────────────
@dlt.table(
name="bronze_orders_files",
comment="Raw order JSON files ingested incrementally via Auto Loader",
table_properties={"delta.autoOptimize.optimizeWrite": "true"}
)
def bronze_orders_files():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation",
"abfss://checkpoints@lake.dfs.core.windows.net/dlt/schema/orders_files")
.option("cloudFiles.schemaEvolutionMode", "addNewColumns")
.option("cloudFiles.inferColumnTypes", "true")
.schema(ORDER_SCHEMA)
.load("abfss://landing@lake.dfs.core.windows.net/orders/")
.withColumn("_ingested_at", current_timestamp())
.withColumn("_source", lit("files"))
)
# ── Bronze: Kafka stream ───────────────────────────────────────────────────────
@dlt.table(
name="bronze_orders_kafka",
comment="Real-time order events consumed from Kafka"
)
def bronze_orders_kafka():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers",
dbutils.secrets.get("kafka-secrets", "bootstrap-servers"))
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.sasl.jaas.config",
f'org.apache.kafka.common.security.plain.PlainLoginModule required '
f'username="{dbutils.secrets.get("kafka-secrets", "key")}" '
f'password="{dbutils.secrets.get("kafka-secrets", "secret")}";')
.option("subscribe", "order-events")
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
.load()
.select(
from_json(col("value").cast("string"), ORDER_SCHEMA).alias("d"),
col("timestamp").alias("_kafka_ts")
)
.select("d.*", "_kafka_ts")
.withColumn("_ingested_at", current_timestamp())
.withColumn("_source", lit("kafka"))
)
# ── Silver: Union both sources, validate, clean ───────────────────────────────
@dlt.table(
name="silver_orders",
comment="Validated and cleaned orders from all sources — Silver layer",
table_properties={
"delta.autoOptimize.optimizeWrite": "true",
"delta.dataSkippingNumIndexedCols": "5",
}
)
@dlt.expect("order_id not null", "order_id IS NOT NULL")
@dlt.expect("customer_id not null", "customer_id IS NOT NULL")
@dlt.expect_or_drop("positive total", "total > 0")
@dlt.expect_or_drop("valid status",
"status IN ('pending','processing','shipped','completed','cancelled')")
def silver_orders():
# Union the two Bronze sources
files_stream = dlt.read_stream("bronze_orders_files")
kafka_stream = dlt.read_stream("bronze_orders_kafka")
combined = files_stream.unionByName(kafka_stream, allowMissingColumns=True)
return (
combined
.dropDuplicates(["order_id"])
.withColumn("email", lower(trim(col("email"))))
.withColumn("order_date", to_date(col("created_at")))
.withColumn("row_hash",
sha2(concat_ws("|", col("order_id"), col("total"), col("status")), 256))
.withColumn("_cleaned_at", current_timestamp())
)

Deploying the DLT Pipeline
# DLT pipelines are deployed via Databricks UI or the REST API / Terraform
# Terraform example:
#
# resource "databricks_pipeline" "orders_dlt" {
# name = "orders-medallion-pipeline"
# storage = "abfss://dlt@lake.dfs.core.windows.net/pipelines/orders"
# cluster {
# autoscale { min_workers = 1; max_workers = 5 }
# node_type_id = "Standard_DS3_v2"
# }
# library { notebook { path = "/Pipelines/orders_dlt_pipeline" } }
# continuous = false # triggered mode: run on schedule
# channel = "CURRENT"
# }

Key Takeaways
- dbutils.secrets is non-negotiable for credentials — never hardcode connection strings in notebooks.
- dbutils.widgets turns notebooks into parameterized functions; use them for every notebook that runs in production.
- DLT @dlt.expect_or_drop is the right default for data quality — violations are surfaced in metrics and bad rows are dropped rather than silently kept.
- Auto Loader with the availableNow=True trigger gives you cost-efficient scheduled batch ingestion with exactly-once guarantees.
- Photon does not accelerate Python UDFs — replace them with built-in SQL functions wherever possible.
- Liquid Clustering beats Z-ORDER for tables where query patterns evolve; use Z-ORDER for stable, well-understood access patterns.
- Use job clusters for production scheduled pipelines; interactive clusters are for development only.
Related: Delta Lake Deep Dive — ACID transactions, MERGE, time travel
Related: Databricks MLflow and Unity Catalog — ML workflows, governance, model serving