Python Automation & Tooling: subprocess, Pipelines, and Framework Patterns
Build real automation scripts with subprocess, schedule tasks, run shell pipelines, create internal tooling frameworks, and apply the advanced Python patterns used in professional engineering roles.
Python as a Tooling Language
In many engineering roles, Python isn't just for web APIs or data science — it's the glue language. It orchestrates other tools, runs shell commands, automates deployments, builds internal CLIs, and ties together systems that don't speak to each other. This lesson is about writing Python that runs other things.
1. subprocess — Running Shell Commands
subprocess is the standard library module for running external processes.
subprocess.run — simple, synchronous
import subprocess
# run a command, wait for it to finish
result = subprocess.run(
    ["git", "status"],
    capture_output=True,
    text=True,
    check=True,  # raises CalledProcessError if exit code != 0
)

print(result.stdout)      # normal output
print(result.stderr)      # error output
print(result.returncode)  # 0 = success

Handling failure
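Besides a non-zero exit code, the other common failure mode is a command that hangs. `subprocess.run` accepts a `timeout` in seconds and raises `subprocess.TimeoutExpired` when it elapses — a minimal sketch:

```python
import subprocess
import sys

try:
    subprocess.run(
        [sys.executable, "-c", "import time; time.sleep(60)"],
        timeout=2,  # seconds; the child is killed when this elapses
        capture_output=True,
    )
except subprocess.TimeoutExpired as e:
    print(f"Timed out after {e.timeout}s")
```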
try:
    result = subprocess.run(
        ["pytest", "tests/"],
        capture_output=True,
        text=True,
        check=True,
    )
    print("Tests passed")
    print(result.stdout)
except subprocess.CalledProcessError as e:
    print(f"Tests failed (exit {e.returncode})")
    print(e.stdout)
    print(e.stderr)

Passing input to a process
result = subprocess.run(
    ["python", "-c", "import sys; print(sys.stdin.read().upper())"],
    input="hello world",
    capture_output=True,
    text=True,
)
print(result.stdout)  # HELLO WORLD

Running shell commands with pipes
# shell=True lets you use pipes and shell features
# CAUTION: never use shell=True with user-supplied input — command injection risk
result = subprocess.run(
    "git log --oneline | head -5",
    shell=True,
    capture_output=True,
    text=True,
)

Safe pipeline without shell=True
p1 = subprocess.Popen(["git", "log", "--oneline"], stdout=subprocess.PIPE)
p2 = subprocess.Popen(["head", "-5"], stdin=p1.stdout, stdout=subprocess.PIPE)
p1.stdout.close()  # let p1 receive SIGPIPE if p2 exits first
output, _ = p2.communicate()
print(output.decode())

2. Environment Variables in subprocesses
import os
import subprocess
# inherit current env + add/override specific vars
env = os.environ.copy()
env["MY_VAR"] = "custom_value"
env["PYTHONDONTWRITEBYTECODE"] = "1"
result = subprocess.run(["python", "script.py"], env=env, capture_output=True, text=True)

3. Running Long-Running Processes with Popen
import subprocess
import sys
def stream_output(command: list[str]) -> int:
    """Run a command, stream output to stdout in real time."""
    with subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr into stdout
        text=True,
        bufsize=1,  # line-buffered
    ) as proc:
        for line in proc.stdout:
            sys.stdout.write(line)
            sys.stdout.flush()
    # leaving the `with` block waits for the process, so returncode is set
    return proc.returncode

# example: stream a long build
exit_code = stream_output(["npm", "run", "build"])
if exit_code != 0:
    raise SystemExit(f"Build failed with exit code {exit_code}")

4. shutil — High-Level File Operations
import shutil
from pathlib import Path
# copy file
shutil.copy("src/config.py", "backup/config.py.bak")
shutil.copy2("src/file.txt", "dst/file.txt") # copy2 preserves metadata
# copy directory tree
shutil.copytree("src/", "dst/", dirs_exist_ok=True)
# move
shutil.move("old_dir/", "new_dir/")
# delete directory tree
shutil.rmtree("build/")
# create zip archive
shutil.make_archive("archive", "zip", root_dir="dist/")
# extract archive
shutil.unpack_archive("archive.zip", extract_dir="output/")
# disk usage
total, used, free = shutil.disk_usage("/")
print(f"Free: {free // (1024**3)} GB")
# find executable
python_path = shutil.which("python3")  # None if not found

5. Watching Files for Changes
pip install watchdog

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileModifiedEvent
import time
class ReloadHandler(FileSystemEventHandler):
    def on_modified(self, event: FileModifiedEvent) -> None:
        if event.src_path.endswith(".py"):
            print(f"Changed: {event.src_path} — reloading...")
            # trigger your reload action

observer = Observer()
observer.schedule(ReloadHandler(), path="src/", recursive=True)
observer.start()
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    observer.stop()
observer.join()

6. Scheduling Tasks
Simple cron-like scheduling with schedule
pip install schedule

import schedule
import time
def run_daily_report() -> None:
    print("Generating daily report...")
    # ... pipeline logic ...

def sync_data() -> None:
    print("Syncing data...")

schedule.every().day.at("09:00").do(run_daily_report)
schedule.every(30).minutes.do(sync_data)
schedule.every().monday.do(lambda: print("Weekly task"))

while True:
    schedule.run_pending()
    time.sleep(30)

APScheduler for production
pip install apscheduler

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
scheduler = BlockingScheduler()
@scheduler.scheduled_job(CronTrigger.from_crontab("0 9 * * *"))
def daily_report():
    print("Running daily report")

@scheduler.scheduled_job("interval", minutes=30)
def sync():
    print("Syncing")

scheduler.start()

7. Building a CLI Framework with Typer + Rich
For internal tooling with beautiful terminal output:
pip install typer rich
import typer
from rich.console import Console
from rich.table import Table
from rich.progress import track
from typing_extensions import Annotated
app = typer.Typer(name="pipeline-tools", help="Internal data pipeline CLI")
console = Console()
@app.command()
def status() -> None:
    """Show pipeline run status."""
    table = Table(title="Recent Pipeline Runs")
    table.add_column("Run ID", style="cyan")
    table.add_column("Status", style="green")
    table.add_column("Records")
    table.add_column("Duration")

    runs = [
        ("run-001", "success", "12,450", "3.2s"),
        ("run-002", "success", "9,800", "2.8s"),
        ("run-003", "failed", "0", "0.4s"),
    ]
    for run in runs:
        status_style = "red" if run[1] == "failed" else "green"
        table.add_row(run[0], f"[{status_style}]{run[1]}[/]", run[2], run[3])
    console.print(table)

@app.command()
def run(
    source: Annotated[str, typer.Argument(help="Data source name")],
    dry_run: Annotated[bool, typer.Option("--dry-run")] = False,
) -> None:
    """Run the pipeline for a data source."""
    if dry_run:
        console.print(f"[yellow]DRY RUN:[/] Would process {source}")
        return
    # track() renders its own live progress bar; don't nest it inside
    # console.status() — Rich allows only one live display at a time
    for step in track(["load", "clean", "enrich", "export"], description="Steps"):
        import time; time.sleep(0.3)  # simulate work
    console.print(f"[green]Done![/] Processed {source}")

if __name__ == "__main__":
    app()

8. Plugin Architecture Pattern
Internal frameworks often need extensibility. The plugin pattern lets users register handlers.
from __future__ import annotations
from typing import Callable
ProcessorFn = Callable[[dict], dict]
class PipelineRegistry:
    _processors: dict[str, list[ProcessorFn]] = {}

    @classmethod
    def register(cls, stage: str) -> Callable[[ProcessorFn], ProcessorFn]:
        def decorator(fn: ProcessorFn) -> ProcessorFn:
            cls._processors.setdefault(stage, []).append(fn)
            return fn
        return decorator

    @classmethod
    def run_stage(cls, stage: str, data: dict) -> dict:
        for fn in cls._processors.get(stage, []):
            data = fn(data)
        return data

# users register their processors
@PipelineRegistry.register("transform")
def normalize_email(record: dict) -> dict:
    if "email" in record:
        record["email"] = record["email"].lower().strip()
    return record

@PipelineRegistry.register("transform")
def add_timestamp(record: dict) -> dict:
    from datetime import datetime, timezone
    record["processed_at"] = datetime.now(timezone.utc).isoformat()
    return record

# framework runs registered processors
record = {"email": " USER@EXAMPLE.COM ", "name": "Alice"}
result = PipelineRegistry.run_stage("transform", record)
# {"email": "user@example.com", "name": "Alice", "processed_at": "..."}

9. Configuration Management Pattern
from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import json
import os
@dataclass
class PipelineConfig:
    source_url: str
    output_dir: Path
    batch_size: int = 100
    max_workers: int = 4
    retry_attempts: int = 3
    tags: list[str] = field(default_factory=list)
    extra: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_file(cls, path: Path) -> PipelineConfig:
        data = json.loads(path.read_text())
        data["output_dir"] = Path(data["output_dir"])
        return cls(**data)

    @classmethod
    def from_env(cls) -> PipelineConfig:
        return cls(
            source_url=os.environ["PIPELINE_SOURCE_URL"],
            output_dir=Path(os.environ.get("PIPELINE_OUTPUT_DIR", "output")),
            batch_size=int(os.environ.get("PIPELINE_BATCH_SIZE", "100")),
            max_workers=int(os.environ.get("PIPELINE_MAX_WORKERS", "4")),
        )

    def to_file(self, path: Path) -> None:
        data = {
            **self.__dict__,
            "output_dir": str(self.output_dir),
        }
        path.write_text(json.dumps(data, indent=2))

10. A Complete Automation Script
This script runs a full data pipeline: fetch → validate → process → upload.
#!/usr/bin/env python3
"""
run_pipeline.py — fetch, process, and upload data in one command.
Usage:
python run_pipeline.py --source prod --dry-run
python run_pipeline.py --source staging --output-dir /tmp/output
"""
from __future__ import annotations
import logging
import subprocess
from pathlib import Path
import typer
from typing_extensions import Annotated
app = typer.Typer()
logger = logging.getLogger("pipeline")
def setup_logging(verbose: bool) -> None:
    level = logging.DEBUG if verbose else logging.INFO
    logging.basicConfig(
        level=level,
        format="%(asctime)s | %(levelname)-8s | %(message)s",
    )

def run_step(name: str, command: list[str]) -> None:
    logger.info("Step: %s", name)
    try:
        result = subprocess.run(command, capture_output=True, text=True, check=True)
        if result.stdout:
            logger.debug(result.stdout.strip())
    except subprocess.CalledProcessError as e:
        logger.error("Step '%s' failed:\n%s", name, e.stderr)
        raise typer.Exit(code=1)

@app.command()
def run(
    source: Annotated[str, typer.Option(help="Data source env (prod/staging)")] = "staging",
    output_dir: Annotated[Path, typer.Option(help="Output directory")] = Path("output"),
    dry_run: Annotated[bool, typer.Option("--dry-run")] = False,
    verbose: Annotated[bool, typer.Option("--verbose", "-v")] = False,
) -> None:
    """Run the full data pipeline."""
    setup_logging(verbose)
    output_dir.mkdir(parents=True, exist_ok=True)
    logger.info("Pipeline starting | source=%s | output=%s", source, output_dir)
    if dry_run:
        logger.info("DRY RUN — no changes will be made")
        return
    steps = [
        ("lint", ["ruff", "check", "src/"]),
        ("test", ["pytest", "tests/", "-q"]),
    ]
    for step_name, cmd in steps:
        run_step(step_name, cmd)
    logger.info("Pipeline complete")

if __name__ == "__main__":
    app()

11. Makefile for Python Projects
A Makefile is the universal task runner — works on Linux, macOS, and WSL.
.DEFAULT_GOAL := help
PYTHON = poetry run python
PYTEST = poetry run pytest
.PHONY: help install dev test lint format typecheck run clean
help:
	@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}'

install: ## Install production deps
	poetry install --only main

dev: ## Install all deps including dev
	poetry install

test: ## Run tests with coverage
	$(PYTEST) --cov=src --cov-report=term-missing -q

lint: ## Run ruff linter
	poetry run ruff check src tests

format: ## Format code
	poetry run black src tests
	poetry run ruff check --fix src tests

typecheck: ## Run mypy
	poetry run mypy src

run: ## Run the pipeline CLI
	$(PYTHON) -m my_project.cli

clean: ## Remove build artifacts
	rm -rf dist/ build/ .coverage htmlcov/ __pycache__ .pytest_cache .mypy_cache

make help # list all targets
make dev # set up dev environment
make test # run tests
make lint # check code quality

Exercises
Exercise 1: Write a git_info() function that uses subprocess.run to return the current branch name, last commit hash, and whether the working tree is clean.
Exercise 2: Build a CLI with Typer that has a watch command — it monitors a directory for new .csv files and automatically runs a processing function on each new file.
Exercise 3: Create a plugin-based data transformer using the PipelineRegistry pattern, with at least 3 registered transformers: strip_whitespace, parse_dates, and drop_empty_rows.
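One possible starting sketch for Exercise 1 (assuming git is on your PATH and the script runs inside a repository — the helper name `_git` is just a suggestion):

```python
import subprocess

def _git(*args: str) -> str:
    """Run a git subcommand and return its stripped stdout."""
    result = subprocess.run(
        ["git", *args], capture_output=True, text=True, check=True
    )
    return result.stdout.strip()

def git_info() -> dict:
    """Current branch, last commit hash, and whether the tree is clean."""
    return {
        "branch": _git("rev-parse", "--abbrev-ref", "HEAD"),
        "commit": _git("rev-parse", "--short", "HEAD"),
        # `git status --porcelain` prints nothing when the tree is clean
        "clean": _git("status", "--porcelain") == "",
    }
```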
Summary
| Tool | Use Case |
|------|---------|
| subprocess.run | Run commands, capture output, check exit code |
| subprocess.Popen | Stream long-running process output |
| shutil | Copy, move, archive files and directories |
| watchdog | React to filesystem changes |
| schedule / APScheduler | Run tasks on a cron schedule |
| Typer + Rich | Beautiful internal CLI tools |
| Plugin registry | Extensible framework internals |
| Config dataclass | Typed config from file or env |
| Makefile | Universal task runner for teams |
You've now completed the full Python foundation. The next lesson applies everything to building production REST APIs with FastAPI.