Back to blog
Backend Systemsintermediate

Python Automation & Tooling: subprocess, Pipelines, and Framework Patterns

Build real automation scripts with subprocess, schedule tasks, run shell pipelines, create internal tooling frameworks, and apply the advanced Python patterns used in professional engineering roles.

LearnixoMay 7, 20268 min read
Pythonautomationsubprocesspipelinestoolingscriptingframeworks
Share:𝕏

Python as a Tooling Language

In many engineering roles, Python isn't just for web APIs or data science — it's the glue language. It orchestrates other tools, runs shell commands, automates deployments, builds internal CLIs, and ties together systems that don't speak to each other. This lesson is about writing Python that runs other things.


1. subprocess — Running Shell Commands

subprocess is the standard library module for running external processes.

subprocess.run — simple, synchronous

Python
import subprocess

# run a command, wait for it to finish
result = subprocess.run(
    ["git", "status"],
    capture_output=True,
    text=True,
    check=True,          # raises CalledProcessError if exit code != 0
)

print(result.stdout)     # normal output
print(result.stderr)     # error output
print(result.returncode) # 0 = success

Handling failure

Python
try:
    result = subprocess.run(
        ["pytest", "tests/"],
        capture_output=True,
        text=True,
        check=True,
    )
    print("Tests passed")
    print(result.stdout)
except subprocess.CalledProcessError as e:
    print(f"Tests failed (exit {e.returncode})")
    print(e.stdout)
    print(e.stderr)

Passing input to a process

Python
result = subprocess.run(
    ["python", "-c", "import sys; print(sys.stdin.read().upper())"],
    input="hello world",
    capture_output=True,
    text=True,
)
print(result.stdout)   # HELLO WORLD

Running shell commands with pipes

Python
# shell=True lets you use pipes and shell features
# CAUTION: never use shell=True with user-supplied input  command injection risk
result = subprocess.run(
    "git log --oneline | head -5",
    shell=True,
    capture_output=True,
    text=True,
)

Safe pipeline without shell=True

Python
p1 = subprocess.Popen(["git", "log", "--oneline"], stdout=subprocess.PIPE)
p2 = subprocess.Popen(["head", "-5"], stdin=p1.stdout, stdout=subprocess.PIPE)
p1.stdout.close()
output, _ = p2.communicate()
print(output.decode())

2. Environment Variables in subprocesses

Python
import os
import subprocess

# inherit current env + add/override specific vars
env = os.environ.copy()
env["MY_VAR"] = "custom_value"
env["PYTHONDONTWRITEBYTECODE"] = "1"

result = subprocess.run(["python", "script.py"], env=env, capture_output=True, text=True)

3. Running Long-Running Processes with Popen

Python
import subprocess
import sys

def stream_output(command: list[str]) -> int:
    """Run a command, stream output to stdout in real time."""
    with subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1,
    ) as proc:
        for line in proc.stdout:
            sys.stdout.write(line)
            sys.stdout.flush()

    return proc.returncode


# example: stream a long build
exit_code = stream_output(["npm", "run", "build"])
if exit_code != 0:
    raise SystemExit(f"Build failed with exit code {exit_code}")

4. shutil — High-Level File Operations

Python
import shutil
from pathlib import Path

# copy file
shutil.copy("src/config.py", "backup/config.py.bak")
shutil.copy2("src/file.txt", "dst/file.txt")    # copy2 preserves metadata

# copy directory tree
shutil.copytree("src/", "dst/", dirs_exist_ok=True)

# move
shutil.move("old_dir/", "new_dir/")

# delete directory tree
shutil.rmtree("build/")

# create zip archive
shutil.make_archive("archive", "zip", root_dir="dist/")

# extract archive
shutil.unpack_archive("archive.zip", extract_dir="output/")

# disk usage
total, used, free = shutil.disk_usage("/")
print(f"Free: {free // (1024**3)} GB")

# find executable
python_path = shutil.which("python3")   # None if not found

5. Watching Files for Changes

Bash
pip install watchdog
Python
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileModifiedEvent
import time

class ReloadHandler(FileSystemEventHandler):
    def on_modified(self, event: FileModifiedEvent) -> None:
        if event.src_path.endswith(".py"):
            print(f"Changed: {event.src_path} — reloading...")
            # trigger your reload action

observer = Observer()
observer.schedule(ReloadHandler(), path="src/", recursive=True)
observer.start()

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    observer.stop()

observer.join()

6. Scheduling Tasks

Simple cron-like scheduling with schedule

Bash
pip install schedule
Python
import schedule
import time

def run_daily_report() -> None:
    print("Generating daily report...")
    # ... pipeline logic ...

def sync_data() -> None:
    print("Syncing data...")

schedule.every().day.at("09:00").do(run_daily_report)
schedule.every(30).minutes.do(sync_data)
schedule.every().monday.do(lambda: print("Weekly task"))

while True:
    schedule.run_pending()
    time.sleep(30)

APScheduler for production

Bash
pip install apscheduler
Python
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger

scheduler = BlockingScheduler()

@scheduler.scheduled_job(CronTrigger.from_crontab("0 9 * * *"))
def daily_report():
    print("Running daily report")

@scheduler.scheduled_job("interval", minutes=30)
def sync():
    print("Syncing")

scheduler.start()

7. Building a CLI Framework with Typer + Rich

For internal tooling with beautiful terminal output:

Bash
pip install typer rich
Python
from pathlib import Path
import typer
from rich.console import Console
from rich.table import Table
from rich.progress import track
from typing_extensions import Annotated

app = typer.Typer(name="pipeline-tools", help="Internal data pipeline CLI")
console = Console()

@app.command()
def status() -> None:
    """Show pipeline run status."""
    table = Table(title="Recent Pipeline Runs")
    table.add_column("Run ID", style="cyan")
    table.add_column("Status", style="green")
    table.add_column("Records")
    table.add_column("Duration")

    runs = [
        ("run-001", "success", "12,450", "3.2s"),
        ("run-002", "success", "9,800", "2.8s"),
        ("run-003", "failed", "0", "0.4s"),
    ]

    for run in runs:
        status_style = "red" if run[1] == "failed" else "green"
        table.add_row(run[0], f"[{status_style}]{run[1]}[/]", run[2], run[3])

    console.print(table)


@app.command()
def run(
    source: Annotated[str, typer.Argument(help="Data source name")],
    dry_run: Annotated[bool, typer.Option("--dry-run")] = False,
) -> None:
    """Run the pipeline for a data source."""
    if dry_run:
        console.print(f"[yellow]DRY RUN:[/] Would process {source}")
        return

    with console.status(f"Processing {source}..."):
        for step in track(["load", "clean", "enrich", "export"], description="Steps"):
            import time; time.sleep(0.3)  # simulate work

    console.print(f"[green]Done![/] Processed {source}")


if __name__ == "__main__":
    app()

8. Plugin Architecture Pattern

Internal frameworks often need extensibility. The plugin pattern lets users register handlers.

Python
from __future__ import annotations
from typing import Callable, TypeVar

T = TypeVar("T")
ProcessorFn = Callable[[dict], dict]

class PipelineRegistry:
    _processors: dict[str, list[ProcessorFn]] = {}

    @classmethod
    def register(cls, stage: str) -> Callable[[ProcessorFn], ProcessorFn]:
        def decorator(fn: ProcessorFn) -> ProcessorFn:
            cls._processors.setdefault(stage, []).append(fn)
            return fn
        return decorator

    @classmethod
    def run_stage(cls, stage: str, data: dict) -> dict:
        for fn in cls._processors.get(stage, []):
            data = fn(data)
        return data


# users register their processors
@PipelineRegistry.register("transform")
def normalize_email(record: dict) -> dict:
    if "email" in record:
        record["email"] = record["email"].lower().strip()
    return record

@PipelineRegistry.register("transform")
def add_timestamp(record: dict) -> dict:
    from datetime import datetime
    record["processed_at"] = datetime.utcnow().isoformat()
    return record

# framework runs registered processors
record = {"email": "  USER@EXAMPLE.COM  ", "name": "Alice"}
result = PipelineRegistry.run_stage("transform", record)
# {"email": "user@example.com", "name": "Alice", "processed_at": "..."}

9. Configuration Management Pattern

Python
from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import json
import os


@dataclass
class PipelineConfig:
    source_url: str
    output_dir: Path
    batch_size: int = 100
    max_workers: int = 4
    retry_attempts: int = 3
    tags: list[str] = field(default_factory=list)
    extra: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_file(cls, path: Path) -> "PipelineConfig":
        data = json.loads(path.read_text())
        data["output_dir"] = Path(data["output_dir"])
        return cls(**data)

    @classmethod
    def from_env(cls) -> "PipelineConfig":
        return cls(
            source_url=os.environ["PIPELINE_SOURCE_URL"],
            output_dir=Path(os.environ.get("PIPELINE_OUTPUT_DIR", "output")),
            batch_size=int(os.environ.get("PIPELINE_BATCH_SIZE", "100")),
            max_workers=int(os.environ.get("PIPELINE_MAX_WORKERS", "4")),
        )

    def to_file(self, path: Path) -> None:
        data = {
            **self.__dict__,
            "output_dir": str(self.output_dir),
        }
        path.write_text(json.dumps(data, indent=2))

10. A Complete Automation Script

This script runs a full data pipeline: fetch → validate → process → upload.

Python
#!/usr/bin/env python3
"""
run_pipeline.py — fetch, process, and upload data in one command.

Usage:
    python run_pipeline.py --source prod --dry-run
    python run_pipeline.py --source staging --output-dir /tmp/output
"""
from __future__ import annotations

import logging
import subprocess
import sys
from pathlib import Path

import typer
from typing_extensions import Annotated

app = typer.Typer()
logger = logging.getLogger("pipeline")


def setup_logging(verbose: bool) -> None:
    level = logging.DEBUG if verbose else logging.INFO
    logging.basicConfig(
        level=level,
        format="%(asctime)s | %(levelname)-8s | %(message)s",
    )


def run_step(name: str, command: list[str]) -> None:
    logger.info("Step: %s", name)
    try:
        result = subprocess.run(command, capture_output=True, text=True, check=True)
        if result.stdout:
            logger.debug(result.stdout.strip())
    except subprocess.CalledProcessError as e:
        logger.error("Step '%s' failed:\n%s", name, e.stderr)
        raise typer.Exit(code=1)


@app.command()
def run(
    source: Annotated[str, typer.Option(help="Data source env (prod/staging)")] = "staging",
    output_dir: Annotated[Path, typer.Option(help="Output directory")] = Path("output"),
    dry_run: Annotated[bool, typer.Option("--dry-run")] = False,
    verbose: Annotated[bool, typer.Option("--verbose", "-v")] = False,
) -> None:
    """Run the full data pipeline."""
    setup_logging(verbose)

    output_dir.mkdir(parents=True, exist_ok=True)
    logger.info("Pipeline starting | source=%s | output=%s", source, output_dir)

    if dry_run:
        logger.info("DRY RUN — no changes will be made")
        return

    steps = [
        ("lint", ["ruff", "check", "src/"]),
        ("test", ["pytest", "tests/", "-q"]),
    ]

    for step_name, cmd in steps:
        run_step(step_name, cmd)

    logger.info("Pipeline complete")


if __name__ == "__main__":
    app()

11. Makefile for Python Projects

A Makefile is the universal task runner — works on Linux, macOS, and WSL.

MAKEFILE
.DEFAULT_GOAL := help
PYTHON = poetry run python
PYTEST = poetry run pytest

.PHONY: help install dev test lint format typecheck run clean

help:
	@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}'

install: ## Install production deps
	poetry install --only main

dev: ## Install all deps including dev
	poetry install

test: ## Run tests with coverage
	$(PYTEST) --cov=src --cov-report=term-missing -q

lint: ## Run ruff linter
	poetry run ruff check src tests

format: ## Format code
	poetry run black src tests
	poetry run ruff check --fix src tests

typecheck: ## Run mypy
	poetry run mypy src

run: ## Run the pipeline CLI
	$(PYTHON) -m my_project.cli

clean: ## Remove build artifacts
	rm -rf dist/ build/ .coverage htmlcov/ __pycache__ .pytest_cache .mypy_cache
Bash
make help      # list all targets
make dev       # set up dev environment
make test      # run tests
make lint      # check code quality

Exercises

Exercise 1: Write a git_info() function that uses subprocess.run to return the current branch name, last commit hash, and whether the working tree is clean.

Exercise 2: Build a CLI with Typer that has a watch command — it monitors a directory for new .csv files and automatically runs a processing function on each new file.

Exercise 3: Create a plugin-based data transformer using the PipelineRegistry pattern, with at least 3 registered transformers: strip_whitespace, parse_dates, and drop_empty_rows.


Summary

| Tool | Use Case | |------|---------| | subprocess.run | Run commands, capture output, check exit code | | subprocess.Popen | Stream long-running process output | | shutil | Copy, move, archive files and directories | | watchdog | React to filesystem changes | | schedule / APScheduler | Run tasks on a cron schedule | | Typer + Rich | Beautiful internal CLI tools | | Plugin registry | Extensible framework internals | | Config dataclass | Typed config from file or env | | Makefile | Universal task runner for teams |

You've now completed the full Python foundation. The next lesson applies everything to building production REST APIs with FastAPI.

Enjoyed this article?

Explore the Backend Systems learning path for more.

Found this helpful?

Share:𝕏

Leave a comment

Have a question, correction, or just found this helpful? Leave a note below.