Python & FastAPI · Lesson 6 of 10
Pandas for Data Pipelines: CSV to Transformed Output
Pandas Is Your Data Swiss Army Knife
You'll use Pandas in every role that touches data: ETL pipelines, report generation, data cleaning automation, feature engineering for ML, and analytics scripts. This lesson focuses on the real-world patterns — not just what methods exist, but when and why to use them.
1. Installation
pip install pandas openpyxl xlrd # openpyxl for .xlsx, xlrd for legacy .xls
2. Core Concepts
Series — a single column
import pandas as pd
prices = pd.Series([10.5, 9.99, 7.25, 14.0], name="price")
prices.dtype # float64
prices.shape # (4,)
prices.mean() # 10.435
prices.describe() # count, mean, std, min, quartiles, max
DataFrame — a table
df = pd.DataFrame({
"name": ["Alice", "Bob", "Carol", "Dave"],
"age": [30, 25, 35, 28],
"department": ["Eng", "Sales", "Eng", "Marketing"],
"salary": [90000, 55000, 95000, 60000],
})
df.shape # (4, 4)
df.dtypes # dtype per column
df.info() # memory usage + types
df.describe() # numeric stats
df.head(2) # first 2 rows
df.tail(2) # last 2 rows
3. Reading Data
# CSV
df = pd.read_csv("data/users.csv")
df = pd.read_csv(
"data/sales.csv",
sep=";", # semicolon-separated
encoding="utf-8",
dtype={"id": str, "amount": float},
parse_dates=["created_at"],
usecols=["id", "name", "amount", "created_at"], # only load these cols
    skiprows=2, # skip the first 2 lines of the file
na_values=["N/A", "-", "null"], # treat as NaN
)
# Excel
df = pd.read_excel("report.xlsx", sheet_name="Sheet1")
df = pd.read_excel("report.xlsx", sheet_name=0) # by index
# JSON
df = pd.read_json("data.json")
df = pd.read_json("data.json", orient="records") # list of dicts
# from a list of dicts (e.g. the result of response.json() from requests)
records = [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]
df = pd.DataFrame(records)
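When a CSV is too large to load at once, read_csv can stream it in chunks; each chunk is an ordinary DataFrame. A minimal sketch — the file name and chunk size are illustrative:
# process a large CSV in 100k-row chunks instead of loading it all at once
total = 0.0
for chunk in pd.read_csv("data/big_sales.csv", chunksize=100_000):
    total += chunk["amount"].sum()  # aggregate incrementally, chunk by chunk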
4. Selecting Data
# column selection
df["name"] # Series
df[["name", "salary"]] # DataFrame (double brackets)
# row selection by index
df.loc[0] # row by label
df.iloc[0] # row by integer position
df.loc[0:2] # rows 0-2 by label (inclusive)
df.iloc[0:2] # rows 0-1 by position (end is exclusive)
# cell
df.loc[0, "name"] # "Alice"
df.iloc[0, 0] # "Alice"
# multiple columns, specific rows
df.loc[df["department"] == "Eng", ["name", "salary"]]
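A gotcha worth flagging: assigning through chained selections like df[mask]["salary"] = ... can silently write to a temporary copy (pandas warns with SettingWithCopyWarning). Use a single .loc call instead; a quick illustration with the columns above:
# wrong: chained indexing may assign to a copy, not to df
# df[df["department"] == "Eng"]["salary"] = 100000
# right: select rows and column in one .loc call
df.loc[df["department"] == "Eng", "salary"] = 100000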
5. Filtering
# boolean mask
mask = df["age"] > 28
df[mask]
# multiple conditions — use & (and) | (or) ~ (not), always wrap in ()
df[(df["age"] > 28) & (df["department"] == "Eng")]
df[(df["salary"] < 60000) | (df["department"] == "Marketing")]
# isin — membership test
df[df["department"].isin(["Eng", "Marketing"])]
# string operations
df[df["name"].str.startswith("A")]
df[df["name"].str.contains("a", case=False)]
# null checks
df[df["email"].isna()]
df[df["email"].notna()]
# query method (SQL-like syntax)
df.query("age > 28 and department == 'Eng'")
df.query("salary.between(60000, 100000)", engine="python")6. Data Cleaning
Handling nulls
df.isna().sum() # count nulls per column
df.dropna() # drop rows with any null
df.dropna(subset=["email"]) # drop rows where email is null
df.fillna(0) # fill all nulls with 0
df["age"].fillna(df["age"].mean()) # fill with column mean
df.fillna({"age": 0, "name": "Unknown"})
Removing duplicates
df.duplicated() # boolean Series
df.duplicated(subset=["email"]) # duplicates by email
df.drop_duplicates() # drop exact duplicate rows (keeps the first occurrence)
df.drop_duplicates(subset=["email"], keep="first")
Renaming and dropping columns
df.rename(columns={"name": "full_name", "salary": "annual_salary"})
df.drop(columns=["temp_col", "unused"])
df.drop(columns=df.columns[df.isna().all()]) # drop all-null columns
Type conversion
df["age"] = df["age"].astype(int)
df["salary"] = pd.to_numeric(df["salary"], errors="coerce") # invalid -> NaN
df["created_at"] = pd.to_datetime(df["created_at"])
df["active"] = df["active"].astype(bool)String cleaning
df["email"] = df["email"].str.strip().str.lower()
df["name"] = df["name"].str.strip().str.title()
df["phone"] = df["phone"].str.replace(r"\D", "", regex=True) # digits only7. Adding and Modifying Columns
7. Adding and Modifying Columns
# new column
df["bonus"] = df["salary"] * 0.10
df["full_label"] = df["name"] + " (" + df["department"] + ")"
df["seniority"] = df["age"].apply(lambda x: "senior" if x >= 30 else "junior")
# conditional column
import numpy as np
df["tier"] = np.where(df["salary"] >= 80000, "high", "standard")
# pd.cut — bin numeric data
df["age_group"] = pd.cut(
df["age"],
bins=[0, 25, 35, 100],
labels=["young", "mid", "senior"],
)
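np.where handles two branches; for three or more tiers, np.select keeps the logic flat. A sketch — the thresholds are illustrative:
# np.select checks conditions in order and takes the first that matches
conditions = [df["salary"] >= 90000, df["salary"] >= 60000]
choices = ["high", "mid"]
df["tier"] = np.select(conditions, choices, default="standard")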
8. apply — Row-wise and Column-wise Transformations
def compute_tax(row: pd.Series) -> float:
rate = 0.35 if row["salary"] > 80000 else 0.25
return row["salary"] * rate
df["tax"] = df.apply(compute_tax, axis=1) # axis=1 = apply per row
# map over a single column (faster than row-wise apply)
df["email_domain"] = df["email"].map(lambda e: e.split("@")[-1])
# str accessor is fastest for string ops
df["email_domain"] = df["email"].str.split("@").str[-1]9. groupby — Aggregations
9. groupby — Aggregations
# simple aggregation
df.groupby("department")["salary"].mean()
df.groupby("department")["salary"].agg(["mean", "max", "count"])
# multiple columns
df.groupby(["department", "seniority"])["salary"].mean()
# named aggregations (pandas 0.25+)
result = df.groupby("department").agg(
avg_salary=("salary", "mean"),
max_salary=("salary", "max"),
headcount=("name", "count"),
)
# transform — keep original index, broadcast group stats
df["dept_avg_salary"] = df.groupby("department")["salary"].transform("mean")
df["salary_vs_avg"] = df["salary"] - df["dept_avg_salary"]10. merge and concat
10. merge and concat
merge (SQL-style joins)
employees = pd.DataFrame({
"emp_id": [1, 2, 3],
"name": ["Alice", "Bob", "Carol"],
"dept_id": [10, 20, 10],
})
departments = pd.DataFrame({
"dept_id": [10, 20, 30],
"dept_name": ["Engineering", "Sales", "HR"],
})
# inner join (only matching rows)
merged = employees.merge(departments, on="dept_id", how="inner")
# left join (keep all employees, null for missing dept)
merged = employees.merge(departments, on="dept_id", how="left")
# when key columns have different names — suppose the right table's key were "id" instead of "dept_id"
merged = employees.merge(
departments,
left_on="dept_id",
right_on="id",
how="left",
)
concat — stack rows or columns
# stack rows vertically
combined = pd.concat([df_jan, df_feb, df_mar], ignore_index=True)
# stack columns horizontally
combined = pd.concat([df_base, df_extra], axis=1)
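In real pipelines it pays to validate merges. merge accepts validate (raises if the relationship isn't what you expect) and indicator=True (records where each row came from):
merged = employees.merge(
    departments,
    on="dept_id",
    how="left",
    validate="m:1",  # error if dept_id is not unique on the right side
    indicator=True,  # adds a _merge column: both / left_only / right_only
)
unmatched = merged[merged["_merge"] == "left_only"]  # employees with no department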
df.sort_values("salary", ascending=False)
df.sort_values(["department", "salary"], ascending=[True, False])
# reset index after filtering
filtered = df[df["salary"] > 70000].reset_index(drop=True)
# set a column as index
df_indexed = df.set_index("emp_id")
df_indexed.loc[1] # lookup by emp_id
# sort by index
df.sort_index()
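For "top N" questions, nlargest and nsmallest are clearer than sorting the whole frame and slicing:
df.nlargest(3, "salary")  # top 3 earners
df.nsmallest(3, "age")    # 3 youngest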
12. Writing Output
# CSV
df.to_csv("output/result.csv", index=False, encoding="utf-8")
# Excel
df.to_excel("output/report.xlsx", sheet_name="Results", index=False)
# JSON
df.to_json("output/data.json", orient="records", indent=2)
# multiple sheets
with pd.ExcelWriter("output/report.xlsx", engine="openpyxl") as writer:
summary.to_excel(writer, sheet_name="Summary", index=False)
    detail.to_excel(writer, sheet_name="Detail", index=False)
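For intermediate artifacts passed between pipeline stages, Parquet usually beats CSV: it preserves dtypes and is compressed and fast to read. It needs an extra engine (pip install pyarrow):
df.to_parquet("output/result.parquet", index=False)  # requires pyarrow or fastparquet
df = pd.read_parquet("output/result.parquet")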
13. Complete Pipeline Example
from pathlib import Path
import pandas as pd
import logging
logger = logging.getLogger(__name__)
def load_sales(path: Path) -> pd.DataFrame:
df = pd.read_csv(
path,
parse_dates=["order_date"],
dtype={"order_id": str},
)
logger.info("Loaded %d rows from %s", len(df), path)
return df
def clean_sales(df: pd.DataFrame) -> pd.DataFrame:
before = len(df)
df = df.dropna(subset=["order_id", "amount", "customer_email"])
df["customer_email"] = df["customer_email"].str.strip().str.lower()
df["amount"] = pd.to_numeric(df["amount"], errors="coerce")
df = df[df["amount"] > 0]
logger.info("Cleaned: %d -> %d rows", before, len(df))
return df.reset_index(drop=True)
def enrich_sales(df: pd.DataFrame) -> pd.DataFrame:
df["year_month"] = df["order_date"].dt.to_period("M")
df["revenue_tier"] = pd.cut(
df["amount"],
bins=[0, 100, 500, float("inf")],
labels=["low", "mid", "high"],
)
return df
def summarize_by_month(df: pd.DataFrame) -> pd.DataFrame:
return (
df.groupby("year_month")
.agg(
total_revenue=("amount", "sum"),
order_count=("order_id", "count"),
avg_order=("amount", "mean"),
)
.reset_index()
)
def run_pipeline(input_path: Path, output_path: Path) -> None:
df = load_sales(input_path)
df = clean_sales(df)
df = enrich_sales(df)
summary = summarize_by_month(df)
output_path.parent.mkdir(parents=True, exist_ok=True)
summary.to_csv(output_path, index=False)
logger.info("Wrote summary to %s", output_path)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
run_pipeline(
Path("data/sales_2025.csv"),
Path("output/monthly_summary.csv"),
    )
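Because each stage takes a DataFrame in and returns a DataFrame out, the pipeline is easy to unit-test. A minimal pytest-style sketch for clean_sales — the import path is hypothetical:
import pandas as pd
from sales_pipeline import clean_sales  # hypothetical module name

def test_clean_sales_drops_bad_rows():
    raw = pd.DataFrame({
        "order_id": ["1", "2", None],
        "amount": [50.0, -5.0, 30.0],
        "customer_email": ["  A@X.COM ", "b@y.com", "c@z.com"],
    })
    cleaned = clean_sales(raw)
    assert len(cleaned) == 1  # negative amount and missing order_id are dropped
    assert cleaned.loc[0, "customer_email"] == "a@x.com"  # stripped and lowercased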
Exercises
Exercise 1: Load a CSV of employees with columns name, salary, department, hire_date. Find the average salary per department for employees hired after 2022.
Exercise 2: Write a function fill_missing_with_median(df, columns) that fills null values in the given columns with each column's median.
Exercise 3: Given two DataFrames — orders (order_id, customer_id, amount) and customers (customer_id, name, country) — produce a summary: total spend per country, sorted descending.
Summary
| Operation | Method |
|-----------|--------|
| Read CSV/Excel/JSON | pd.read_csv, pd.read_excel, pd.read_json |
| Select rows | Boolean mask, .query(), .loc[], .iloc[] |
| Clean nulls | .dropna(), .fillna() |
| Clean strings | .str.strip(), .str.lower(), .str.replace() |
| Transform rows | .apply(fn, axis=1), .map() |
| Aggregate | .groupby().agg() |
| Broadcast stats | .groupby().transform() |
| Join tables | .merge(how="left"/"inner") |
| Stack tables | pd.concat |
| Write output | .to_csv(), .to_excel(), .to_json() |
Next: structured logging and observability for your Python scripts and services.