Python & FastAPI · Lesson 8 of 10

Async & Concurrency: asyncio, await, and Concurrent I/O

What Problem Does Async Solve?

Your pipeline calls 50 API endpoints. Each takes 200ms. Sequentially: 50 × 200ms = 10 seconds. Concurrently: they all run at once → ~200ms.

Python async solves I/O-bound waiting — network calls, file reads, database queries — where the CPU is idle most of the time. It does not parallelize CPU-bound work (use multiprocessing for that).


1. The Mental Model

Normal (synchronous) code:

call A → wait → result A → call B → wait → result B

Async code:

start A → start B → start C → result A → result B → result C

The event loop is a single-threaded scheduler that switches between coroutines whenever one is waiting for I/O.
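
To make the switching concrete, here is a minimal sketch with two coroutines (names and delays are arbitrary):

Python
import asyncio

async def worker(name: str, delay: float) -> None:
    print(f"{name} started")
    await asyncio.sleep(delay)   # yields control to the event loop
    print(f"{name} finished")

async def main() -> None:
    await asyncio.gather(worker("A", 0.2), worker("B", 0.1))

asyncio.run(main())
# A started
# B started
# B finished
# A finished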


2. Coroutines: async def and await

Python
import asyncio

async def fetch_user(user_id: int) -> dict:
    await asyncio.sleep(0.2)   # simulates network I/O
    return {"id": user_id, "name": f"User {user_id}"}

# run a coroutine
result = asyncio.run(fetch_user(1))
print(result)   # {'id': 1, 'name': 'User 1'}

Rules:

  • async def defines a coroutine function
  • await suspends the coroutine and yields control to the event loop
  • You can only await inside an async def
  • asyncio.run() is the entry point — call it once, in __main__

3. Running Coroutines Concurrently

asyncio.gather — run all, wait for all

Python
import asyncio
import time

async def simulate_api_call(n: int) -> str:
    await asyncio.sleep(0.2)   # I/O wait
    return f"result_{n}"

async def main() -> None:
    start = time.perf_counter()

    # sequential  5 × 200ms = ~1s
    for i in range(5):
        await simulate_api_call(i)

    print(f"Sequential: {time.perf_counter() - start:.2f}s")

    start = time.perf_counter()

    # concurrent  all run at once  ~200ms
    results = await asyncio.gather(
        *(simulate_api_call(i) for i in range(5))
    )

    print(f"Concurrent: {time.perf_counter() - start:.2f}s")
    print(results)   # ['result_0', 'result_1', ...]

asyncio.run(main())
# Sequential: 1.00s
# Concurrent: 0.20s

asyncio.gather with error handling

Python
async def main() -> None:
    tasks = [simulate_api_call(i) for i in range(5)]
    results = await asyncio.gather(
        *tasks,
        return_exceptions=True,   # exceptions are returned, not raised
    )

    for result in results:
        if isinstance(result, Exception):
            print(f"Task failed: {result}")
        else:
            process(result)   # process() is whatever handles a success

4. asyncio.create_task — Fire and Continue

Python
async def background_save(data: dict) -> None:
    await asyncio.sleep(1)
    print("Saved:", data)

async def main() -> None:
    task = asyncio.create_task(background_save({"key": "value"}))
    # continue doing other work while save runs in background
    print("Doing other work...")
    await asyncio.sleep(0.5)
    print("Still working...")
    await task   # wait for task when you need it
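
One caveat: the event loop holds only a weak reference to tasks. If you fire a task and drop the handle, it can be garbage-collected before it finishes, so keep a reference (as the example does with task) until you await or cancel it.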

5. Concurrent HTTP with httpx

Python
import asyncio
import httpx

async def fetch(client: httpx.AsyncClient, url: str) -> dict:
    resp = await client.get(url, timeout=15)
    resp.raise_for_status()
    return resp.json()

async def fetch_all_users(user_ids: list[int]) -> list[dict | BaseException]:
    base = "https://jsonplaceholder.typicode.com/users"

    async with httpx.AsyncClient() as client:
        tasks = [fetch(client, f"{base}/{uid}") for uid in user_ids]
        return await asyncio.gather(*tasks, return_exceptions=True)

users = asyncio.run(fetch_all_users([1, 2, 3, 4, 5]))

Rate-limited concurrent fetching

Python
async def fetch_with_semaphore(
    client: httpx.AsyncClient,
    url: str,
    semaphore: asyncio.Semaphore,
) -> dict:
    async with semaphore:   # at most N concurrent requests
        resp = await client.get(url, timeout=15)
        resp.raise_for_status()
        return resp.json()

async def fetch_bulk(urls: list[str], max_concurrent: int = 10) -> list[dict | BaseException]:
    semaphore = asyncio.Semaphore(max_concurrent)
    async with httpx.AsyncClient() as client:
        tasks = [fetch_with_semaphore(client, url, semaphore) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)
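
A quick usage sketch from synchronous code (the URLs are placeholders):

Python
urls = [f"https://api.example.com/items/{i}" for i in range(100)]
results = asyncio.run(fetch_bulk(urls, max_concurrent=10))
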

6. Async Context Managers

Python
class AsyncDatabaseConnection:
    async def __aenter__(self) -> "AsyncDatabaseConnection":
        await self._connect()
        return self

    async def __aexit__(self, *args: object) -> None:
        await self._disconnect()

    async def _connect(self) -> None:
        await asyncio.sleep(0.01)  # simulate connection
        print("Connected")

    async def _disconnect(self) -> None:
        await asyncio.sleep(0.01)
        print("Disconnected")

    async def query(self, sql: str) -> list[dict]:
        await asyncio.sleep(0.05)
        return [{"result": "data"}]

async def main() -> None:
    async with AsyncDatabaseConnection() as db:
        rows = await db.query("SELECT * FROM users")
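
For simple cases you can skip the class entirely and use contextlib.asynccontextmanager. A minimal sketch, with the connection represented by a placeholder string:

Python
import asyncio
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator

@asynccontextmanager
async def db_connection() -> AsyncIterator[str]:
    await asyncio.sleep(0.01)      # simulate connecting
    print("Connected")
    try:
        yield "connection-handle"  # what `async with ... as` binds
    finally:
        await asyncio.sleep(0.01)  # simulate disconnecting
        print("Disconnected")

async def main() -> None:
    async with db_connection() as conn:
        print("Using", conn)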

7. Async Generators and Iterators

Python
import httpx
from collections.abc import AsyncIterator

async def paginate_api(base_url: str, token: str) -> AsyncIterator[dict]:
    async with httpx.AsyncClient() as client:
        page = 1
        while True:
            resp = await client.get(
                f"{base_url}/records",
                params={"page": page, "per_page": 100},
                headers={"Authorization": f"Bearer {token}"},
                timeout=15,
            )
            resp.raise_for_status()
            data = resp.json()
            items = data.get("items", [])

            if not items:
                break

            for item in items:
                yield item

            page += 1

async def main() -> None:
    token = "..."   # supply your real API token
    async for record in paginate_api("https://api.example.com", token):
        process(record)   # process() is whatever handles one record
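
Async generators also compose with async comprehensions. A small sketch, assuming a trivial countdown generator:

Python
import asyncio
from collections.abc import AsyncIterator

async def countdown(n: int) -> AsyncIterator[int]:
    while n > 0:
        yield n
        await asyncio.sleep(0.01)   # pretend each step waits on I/O
        n -= 1

async def main() -> None:
    values = [v async for v in countdown(3)]   # async list comprehension
    print(values)   # [3, 2, 1]

asyncio.run(main())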

8. aiofiles — Async File I/O

Bash
pip install aiofiles
Python
import asyncio
import json

import aiofiles

async def read_json_file(path: str) -> dict:
    async with aiofiles.open(path, "r", encoding="utf-8") as f:
        content = await f.read()
    return json.loads(content)

async def write_json_file(path: str, data: dict) -> None:
    async with aiofiles.open(path, "w", encoding="utf-8") as f:
        await f.write(json.dumps(data, indent=2))

# read many files concurrently
async def read_all(paths: list[str]) -> list[dict]:
    tasks = [read_json_file(p) for p in paths]
    return await asyncio.gather(*tasks)
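
A note on the design: operating systems expose no portable non-blocking file API, so aiofiles runs each file operation in a thread pool behind the scenes. The benefit is composability with the rest of your async code, not raw throughput.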

9. asyncio.Queue — Producer/Consumer Pattern

Python
import asyncio

async def producer(queue: asyncio.Queue, items: list[str], num_consumers: int) -> None:
    for item in items:
        await queue.put(item)
        print(f"Produced: {item}")
    for _ in range(num_consumers):
        await queue.put(None)   # one sentinel per consumer, or the extras hang forever

async def consumer(queue: asyncio.Queue, worker_id: int) -> None:
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break
        await asyncio.sleep(0.1)   # simulate processing
        print(f"Worker {worker_id} processed: {item}")
        queue.task_done()

async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue(maxsize=10)
    items = [f"record_{i}" for i in range(20)]

    await asyncio.gather(
        producer(queue, items, num_consumers=2),
        consumer(queue, 1),
        consumer(queue, 2),
    )

asyncio.run(main())
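
Sentinels get awkward as the worker count changes. An alternative sketch uses queue.join() plus cancellation; the worker below never exits on its own and is cancelled once the queue drains:

Python
import asyncio

async def worker(queue: asyncio.Queue) -> None:
    while True:
        item = await queue.get()
        await asyncio.sleep(0.1)   # simulate processing
        print(f"Processed: {item}")
        queue.task_done()

async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    workers = [asyncio.create_task(worker(queue)) for _ in range(2)]
    for i in range(20):
        await queue.put(f"record_{i}")
    await queue.join()    # returns once every item is task_done()
    for w in workers:
        w.cancel()        # workers are idle now; stop them

asyncio.run(main())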

10. Async vs Threading vs Multiprocessing

| Use Case | Tool |
|----------|------|
| Many network requests, DB queries | asyncio |
| CPU-heavy work (numpy, parsing) | multiprocessing |
| Blocking third-party libraries | threading |
| Mixed: async + blocking code | asyncio.to_thread |

Running blocking code in async context

Python
import asyncio
from pathlib import Path
import pandas as pd

def load_dataframe(path: Path) -> pd.DataFrame:
    return pd.read_csv(path)  # blocking I/O

async def main() -> None:
    # run blocking call in a thread, don't block the event loop
    df = await asyncio.to_thread(load_dataframe, Path("data.csv"))
    print(df.shape)

asyncio.run(main())
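
asyncio.to_thread keeps the event loop responsive, but a thread will not make CPU-bound work faster. For that, the analogous escape hatch hands the function to a process pool via run_in_executor. A minimal sketch:

Python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def crunch(n: int) -> int:
    return sum(i * i for i in range(n))   # CPU-bound, holds the GIL

async def main() -> None:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, crunch, 10_000_000)
        print(result)

if __name__ == "__main__":   # guard is required for multiprocessing on spawn platforms
    asyncio.run(main())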

11. Error Handling in Async Code

Python
import asyncio

import httpx

async def safe_fetch(client: httpx.AsyncClient, url: str) -> dict | None:
    try:
        resp = await client.get(url, timeout=10)
        resp.raise_for_status()
        return resp.json()
    except httpx.TimeoutException:
        print(f"Timeout: {url}")
        return None
    except httpx.HTTPStatusError as e:
        print(f"HTTP {e.response.status_code}: {url}")
        return None
    except Exception as e:
        print(f"Unexpected error for {url}: {e}")
        raise

async def fetch_all_safe(urls: list[str]) -> list[dict | None]:
    async with httpx.AsyncClient() as client:
        return await asyncio.gather(*(safe_fetch(client, url) for url in urls))

12. Real Pattern: Async Pipeline Step

Python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass

import httpx

@dataclass
class PipelineContext:
    run_id: str
    data: list[dict]

class AsyncStep(ABC):
    @abstractmethod
    async def process(self, ctx: PipelineContext) -> PipelineContext:
        ...

class FetchEnrichmentsStep(AsyncStep):
    def __init__(self, api_url: str, token: str) -> None:
        self._api_url = api_url
        self._token = token

    async def process(self, ctx: PipelineContext) -> PipelineContext:
        async with httpx.AsyncClient() as client:
            semaphore = asyncio.Semaphore(10)
            tasks = [
                self._enrich(client, semaphore, record)
                for record in ctx.data
            ]
            enriched = await asyncio.gather(*tasks, return_exceptions=True)

        valid = [r for r in enriched if isinstance(r, dict)]
        return PipelineContext(run_id=ctx.run_id, data=valid)

    async def _enrich(
        self,
        client: httpx.AsyncClient,
        semaphore: asyncio.Semaphore,
        record: dict,
    ) -> dict:
        async with semaphore:
            resp = await client.get(
                f"{self._api_url}/enrich",
                params={"id": record["id"]},
                headers={"Authorization": f"Bearer {self._token}"},
                timeout=10,
            )
            resp.raise_for_status()
            return {**record, **resp.json()}


async def run_pipeline(steps: list[AsyncStep], initial_data: list[dict]) -> list[dict]:
    ctx = PipelineContext(run_id="run-001", data=initial_data)
    for step in steps:
        ctx = await step.process(ctx)
    return ctx.data
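
Wiring it together might look like this; the URL and token are placeholders:

Python
async def main() -> None:
    steps: list[AsyncStep] = [
        FetchEnrichmentsStep("https://api.example.com", token="..."),
    ]
    enriched = await run_pipeline(steps, [{"id": 1}, {"id": 2}])
    print(f"Enriched {len(enriched)} records")

asyncio.run(main())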

Exercises

Exercise 1: Write an async function download_files(urls: list[str], output_dir: Path) that downloads all files concurrently (max 5 at a time), saving each to output_dir using the URL filename. Use httpx + aiofiles.

Exercise 2: Build an async producer/consumer queue where the producer reads rows from a CSV and puts them in the queue, and 3 worker coroutines each call an API to enrich the row before writing to output.

Exercise 3: Write a function retry_async(coro_fn, max_attempts, backoff) that calls the given coroutine, retries on exception with exponential backoff using asyncio.sleep.


Summary

| Concept | API |
|---------|-----|
| Define coroutine | async def fn(): |
| Wait for I/O | await some_coroutine() |
| Entry point | asyncio.run(main()) |
| Run many at once | asyncio.gather(*tasks) |
| Limit concurrency | asyncio.Semaphore(n) |
| Background task | asyncio.create_task() |
| Async for | async for item in aiter: |
| Async with | async with resource: |
| Run blocking code | await asyncio.to_thread(fn, *args) |
| Async HTTP | httpx.AsyncClient |
| Async file I/O | aiofiles.open() |

Next: building automation scripts and internal tooling with subprocess and Makefile.