Backend Systems · Intermediate

Pandas for Data Pipelines: From CSV to Transformed Output

Master Pandas for real data work — DataFrames, filtering, groupby, merge, cleaning, apply, and building reusable pipeline steps that process CSV, Excel, and JSON data at scale.

Learnixo · May 7, 2026 · 7 min read
Python · Pandas · data pipelines · ETL · data cleaning · DataFrame

Pandas Is Your Data Swiss Army Knife

You'll use Pandas in every role that touches data: ETL pipelines, report generation, data cleaning automation, feature engineering for ML, and analytics scripts. This lesson focuses on the real-world patterns — not just what methods exist, but when and why to use them.


1. Installation

Bash
pip install pandas openpyxl xlrd   # openpyxl for .xlsx, xlrd for .xls

2. Core Concepts

Series — a single column

Python
import pandas as pd

prices = pd.Series([10.5, 9.99, 7.25, 14.0], name="price")
prices.dtype        # float64
prices.shape        # (4,)
prices.mean()       # 10.435
prices.describe()   # count, mean, std, min, quartiles, max

DataFrame — a table

Python
df = pd.DataFrame({
    "name": ["Alice", "Bob", "Carol", "Dave"],
    "age": [30, 25, 35, 28],
    "department": ["Eng", "Sales", "Eng", "Marketing"],
    "salary": [90000, 55000, 95000, 60000],
})

df.shape         # (4, 4)
df.dtypes        # dtype per column
df.info()        # memory usage + types
df.describe()    # numeric stats
df.head(2)       # first 2 rows
df.tail(2)       # last 2 rows

3. Reading Data

Python
# CSV
df = pd.read_csv("data/users.csv")
df = pd.read_csv(
    "data/sales.csv",
    sep=";",                          # semicolon-separated
    encoding="utf-8",
    dtype={"id": str, "amount": float},
    parse_dates=["created_at"],
    usecols=["id", "name", "amount", "created_at"],  # only load these cols
    skiprows=2,                        # skip the first 2 lines of the file
    na_values=["N/A", "-", "null"],   # treat as NaN
)

# Excel
df = pd.read_excel("report.xlsx", sheet_name="Sheet1")
df = pd.read_excel("report.xlsx", sheet_name=0)  # by index

# JSON
df = pd.read_json("data.json")
df = pd.read_json("data.json", orient="records")  # list of dicts

# From a list of dicts (from requests.json())
records = [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 25}]
df = pd.DataFrame(records)
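
For files too large to hold in memory, read_csv can also stream the file in chunks. A minimal sketch (the file name and column here are placeholders):

Python
# chunksize makes read_csv yield DataFrames of up to N rows each
total = 0.0
for chunk in pd.read_csv("data/big_sales.csv", chunksize=100_000):
    total += chunk["amount"].sum()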

4. Selecting Data

Python
# column selection
df["name"]              # Series
df[["name", "salary"]] # DataFrame (double brackets)

# row selection by index
df.loc[0]              # row by label
df.iloc[0]             # row by integer position
df.loc[0:2]            # rows 0-2 by label (inclusive)
df.iloc[0:2]           # rows 0-1 by position (end exclusive)

# cell
df.loc[0, "name"]      # "Alice"
df.iloc[0, 0]          # "Alice"

# multiple columns, specific rows
df.loc[df["department"] == "Eng", ["name", "salary"]]

5. Filtering

Python
# boolean mask
mask = df["age"] > 28
df[mask]

# multiple conditions: use & (and), | (or), ~ (not); always wrap each condition in ()
df[(df["age"] > 28) & (df["department"] == "Eng")]
df[(df["salary"] < 60000) | (df["department"] == "Marketing")]

# isin: membership test
df[df["department"].isin(["Eng", "Marketing"])]

# string operations
df[df["name"].str.startswith("A")]
df[df["name"].str.contains("a", case=False)]

# null checks
df[df["email"].isna()]
df[df["email"].notna()]

# query method (SQL-like syntax)
df.query("age > 28 and department == 'Eng'")
df.query("salary.between(60000, 100000)", engine="python")

6. Data Cleaning

Handling nulls

Python
df.isna().sum()                    # count nulls per column
df.dropna()                        # drop rows with any null
df.dropna(subset=["email"])        # drop rows where email is null
df.fillna(0)                       # fill all nulls with 0
df["age"].fillna(df["age"].mean()) # fill with column mean
df.fillna({"age": 0, "name": "Unknown"})

Removing duplicates

Python
df.duplicated()                    # boolean Series
df.duplicated(subset=["email"])    # duplicates by email
df.drop_duplicates()               # drop duplicate rows (keeps first occurrence)
df.drop_duplicates(subset=["email"], keep="first")

Renaming and dropping columns

Python
df.rename(columns={"name": "full_name", "salary": "annual_salary"})
df.drop(columns=["temp_col", "unused"])
df.drop(columns=df.columns[df.isna().all()])  # drop all-null columns

Type conversion

Python
df["age"] = df["age"].astype(int)
df["salary"] = pd.to_numeric(df["salary"], errors="coerce")  # invalid -> NaN
df["created_at"] = pd.to_datetime(df["created_at"])
df["active"] = df["active"].astype(bool)

String cleaning

Python
df["email"] = df["email"].str.strip().str.lower()
df["name"] = df["name"].str.strip().str.title()
df["phone"] = df["phone"].str.replace(r"\D", "", regex=True)  # digits only

7. Adding and Modifying Columns

Python
# new column
df["bonus"] = df["salary"] * 0.10
df["full_label"] = df["name"] + " (" + df["department"] + ")"
df["seniority"] = df["age"].apply(lambda x: "senior" if x >= 30 else "junior")

# conditional column
import numpy as np
df["tier"] = np.where(df["salary"] >= 80000, "high", "standard")

# pd.cut: bin numeric data into labeled intervals
df["age_group"] = pd.cut(
    df["age"],
    bins=[0, 25, 35, 100],
    labels=["young", "mid", "senior"],
)
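
np.where covers two outcomes; when you need several condition-based buckets (rather than numeric bins), numpy's np.select evaluates a list of conditions in order and picks the first match. A sketch with illustrative thresholds and a made-up pay_band column:

Python
# np.select returns the choice for the first condition that matches per row
conditions = [df["salary"] >= 90000, df["salary"] >= 70000]
choices = ["high", "mid"]
df["pay_band"] = np.select(conditions, choices, default="standard")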

8. apply — Row-wise and Column-wise Transformations

Python
def compute_tax(row: pd.Series) -> float:
    rate = 0.35 if row["salary"] > 80000 else 0.25
    return row["salary"] * rate

df["tax"] = df.apply(compute_tax, axis=1)   # axis=1 = apply per row

# apply to a single column (faster than apply row-wise)
df["email_domain"] = df["email"].map(lambda e: e.split("@")[-1])

# str accessor is fastest for string ops
df["email_domain"] = df["email"].str.split("@").str[-1]

9. groupby — Aggregations

Python
# simple aggregation
df.groupby("department")["salary"].mean()
df.groupby("department")["salary"].agg(["mean", "max", "count"])

# multiple columns
df.groupby(["department", "seniority"])["salary"].mean()

# named aggregations (pandas 0.25+)
result = df.groupby("department").agg(
    avg_salary=("salary", "mean"),
    max_salary=("salary", "max"),
    headcount=("name", "count"),
)

# transform: keep the original index, broadcast group stats to every row
df["dept_avg_salary"] = df.groupby("department")["salary"].transform("mean")
df["salary_vs_avg"] = df["salary"] - df["dept_avg_salary"]

10. merge and concat

merge (SQL-style joins)

Python
employees = pd.DataFrame({
    "emp_id": [1, 2, 3],
    "name": ["Alice", "Bob", "Carol"],
    "dept_id": [10, 20, 10],
})

departments = pd.DataFrame({
    "dept_id": [10, 20, 30],
    "dept_name": ["Engineering", "Sales", "HR"],
})

# inner join (only matching rows)
merged = employees.merge(departments, on="dept_id", how="inner")

# left join (keep all employees, null for missing dept)
merged = employees.merge(departments, on="dept_id", how="left")

# different key names on each side (here, a right table whose key is "id")
merged = employees.merge(
    departments.rename(columns={"dept_id": "id"}),
    left_on="dept_id",
    right_on="id",
    how="left",
)
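
After a left join it's worth checking how many rows actually matched. Passing indicator=True makes merge tag each row's provenance, so unmatched rows are easy to audit. A short sketch:

Python
# indicator=True adds a _merge column: "both", "left_only", or "right_only"
audit = employees.merge(departments, on="dept_id", how="left", indicator=True)
unmatched = audit[audit["_merge"] == "left_only"]   # employees with no department row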

concat — stack rows or columns

Python
# stack rows vertically
combined = pd.concat([df_jan, df_feb, df_mar], ignore_index=True)

# stack columns horizontally
combined = pd.concat([df_base, df_extra], axis=1)

11. Sorting and Indexing

Python
df.sort_values("salary", ascending=False)
df.sort_values(["department", "salary"], ascending=[True, False])

# reset index after filtering
filtered = df[df["salary"] > 70000].reset_index(drop=True)

# set a column as index (using the employees frame from section 10)
emp_indexed = employees.set_index("emp_id")
emp_indexed.loc[1]   # lookup by emp_id

# sort by index
df.sort_index()

12. Writing Output

Python
# CSV
df.to_csv("output/result.csv", index=False, encoding="utf-8")

# Excel
df.to_excel("output/report.xlsx", sheet_name="Results", index=False)

# JSON
df.to_json("output/data.json", orient="records", indent=2)

# multiple sheets
with pd.ExcelWriter("output/report.xlsx", engine="openpyxl") as writer:
    summary.to_excel(writer, sheet_name="Summary", index=False)
    detail.to_excel(writer, sheet_name="Detail", index=False)

13. Complete Pipeline Example

Python
from pathlib import Path
import pandas as pd
import logging

logger = logging.getLogger(__name__)


def load_sales(path: Path) -> pd.DataFrame:
    df = pd.read_csv(
        path,
        parse_dates=["order_date"],
        dtype={"order_id": str},
    )
    logger.info("Loaded %d rows from %s", len(df), path)
    return df


def clean_sales(df: pd.DataFrame) -> pd.DataFrame:
    before = len(df)
    df = df.dropna(subset=["order_id", "amount", "customer_email"])
    df["customer_email"] = df["customer_email"].str.strip().str.lower()
    df["amount"] = pd.to_numeric(df["amount"], errors="coerce")
    df = df[df["amount"] > 0]
    logger.info("Cleaned: %d -> %d rows", before, len(df))
    return df.reset_index(drop=True)


def enrich_sales(df: pd.DataFrame) -> pd.DataFrame:
    df["year_month"] = df["order_date"].dt.to_period("M")
    df["revenue_tier"] = pd.cut(
        df["amount"],
        bins=[0, 100, 500, float("inf")],
        labels=["low", "mid", "high"],
    )
    return df


def summarize_by_month(df: pd.DataFrame) -> pd.DataFrame:
    return (
        df.groupby("year_month")
        .agg(
            total_revenue=("amount", "sum"),
            order_count=("order_id", "count"),
            avg_order=("amount", "mean"),
        )
        .reset_index()
    )


def run_pipeline(input_path: Path, output_path: Path) -> None:
    df = load_sales(input_path)
    df = clean_sales(df)
    df = enrich_sales(df)
    summary = summarize_by_month(df)

    output_path.parent.mkdir(parents=True, exist_ok=True)
    summary.to_csv(output_path, index=False)
    logger.info("Wrote summary to %s", output_path)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    run_pipeline(
        Path("data/sales_2025.csv"),
        Path("output/monthly_summary.csv"),
    )
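
Because every step takes a DataFrame and returns a new one, each step can be unit-tested in isolation. A sketch of one such test against clean_sales as written above:

Python
def test_clean_sales_drops_nonpositive_amounts():
    raw = pd.DataFrame({
        "order_id": ["1", "2"],
        "amount": [10.0, -5.0],
        "customer_email": [" A@X.com ", "b@y.com"],
    })
    cleaned = clean_sales(raw)
    assert len(cleaned) == 1                              # negative amount dropped
    assert cleaned.loc[0, "customer_email"] == "a@x.com"  # trimmed and lowercased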

Exercises

Exercise 1: Load a CSV of employees with columns name, salary, department, hire_date. Find the average salary per department for employees hired after 2022.
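
One possible approach (a sketch; the file name is assumed):

Python
df = pd.read_csv("employees.csv", parse_dates=["hire_date"])
recent = df[df["hire_date"].dt.year > 2022]
avg_salary_by_dept = recent.groupby("department")["salary"].mean()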

Exercise 2: Write a function fill_missing_with_median(df, columns) that fills null values in the given columns with each column's median.
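
A sketch of one implementation:

Python
def fill_missing_with_median(df: pd.DataFrame, columns: list[str]) -> pd.DataFrame:
    df = df.copy()                                   # don't mutate the caller's frame
    for col in columns:
        df[col] = df[col].fillna(df[col].median())
    return df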

Exercise 3: Given two DataFrames — orders (order_id, customer_id, amount) and customers (customer_id, name, country) — produce a summary: total spend per country, sorted descending.
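
One way to chain the pieces (a sketch):

Python
summary = (
    orders.merge(customers, on="customer_id", how="left")
    .groupby("country")["amount"]
    .sum()
    .reset_index(name="total_spend")
    .sort_values("total_spend", ascending=False)
)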


Summary

| Operation | Method |
|-----------|--------|
| Read CSV/Excel/JSON | pd.read_csv, pd.read_excel, pd.read_json |
| Select rows | Boolean mask, .query(), .loc[], .iloc[] |
| Clean nulls | .dropna(), .fillna() |
| Clean strings | .str.strip(), .str.lower(), .str.replace() |
| Transform rows | .apply(fn, axis=1), .map() |
| Aggregate | .groupby().agg() |
| Broadcast stats | .groupby().transform() |
| Join tables | .merge(how="left"/"inner") |
| Stack tables | pd.concat |
| Write output | .to_csv(), .to_excel(), .to_json() |

Next: structured logging and observability for your Python scripts and services.
