Python & FastAPI · Lesson 8 of 10
Async & Concurrency: asyncio, await, and Concurrent I/O
What Problem Does Async Solve?
Your pipeline calls 50 API endpoints. Each takes 200ms. Sequentially: 50 × 200ms = 10 seconds. Concurrently: they all run at once → ~200ms.
Python async solves I/O-bound waiting — network calls, file reads, database queries — where the CPU is idle most of the time. It does not parallelize CPU-bound work (use multiprocessing for that).
1. The Mental Model
Normal (synchronous) code:
call A → wait → result A → call B → wait → result B

Async code:

start A → start B → start C → result A → result B → result C

The event loop is a single-threaded scheduler that switches between coroutines at each await point, whenever the current coroutine is waiting on I/O.
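A minimal sketch of that interleaving (the names and timings are illustrative; the syntax and asyncio.gather are covered in the next sections):

```python
import asyncio

async def call(name: str) -> str:
    print(f"start {name}")
    await asyncio.sleep(0.2)   # suspension point: the loop switches to another coroutine
    print(f"result {name}")
    return name

async def main() -> None:
    # prints: start A, start B, start C, then result A, result B, result C
    await asyncio.gather(call("A"), call("B"), call("C"))

asyncio.run(main())   # total runtime ~0.2s, not 0.6s
```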
2. Coroutines: async def and await
import asyncio
async def fetch_user(user_id: int) -> dict:
await asyncio.sleep(0.2) # simulates network I/O
return {"id": user_id, "name": f"User {user_id}"}
# run a coroutine
result = asyncio.run(fetch_user(1))
print(result)  # {'id': 1, 'name': 'User 1'}

Rules:
- async def defines a coroutine function
- await suspends the coroutine and yields control to the event loop
- You can only await inside an async def
- asyncio.run() is the entry point — call it once, in __main__
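One pitfall follows directly from these rules: calling a coroutine function without await creates a coroutine object but never runs it. A minimal illustration:

```python
import asyncio

async def fetch_user(user_id: int) -> dict:
    await asyncio.sleep(0.2)
    return {"id": user_id}

fetch_user(1)   # creates a coroutine object, never runs it:
                # "RuntimeWarning: coroutine 'fetch_user' was never awaited"

if __name__ == "__main__":
    print(asyncio.run(fetch_user(1)))  # correct: run once, from __main__
```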
3. Running Coroutines Concurrently
asyncio.gather — run all, wait for all
import asyncio
import time
async def simulate_api_call(n: int) -> str:
await asyncio.sleep(0.2) # I/O wait
return f"result_{n}"
async def main() -> None:
start = time.perf_counter()
# sequential — 5 × 200ms = ~1s
for i in range(5):
await simulate_api_call(i)
print(f"Sequential: {time.perf_counter() - start:.2f}s")
start = time.perf_counter()
# concurrent — all run at once → ~200ms
results = await asyncio.gather(
*(simulate_api_call(i) for i in range(5))
)
print(f"Concurrent: {time.perf_counter() - start:.2f}s")
print(results) # ['result_0', 'result_1', ...]
asyncio.run(main())
# Sequential: 1.00s
# Concurrent: 0.20s

asyncio.gather with error handling
# tasks: a list of coroutines or tasks created earlier
results = await asyncio.gather(
*tasks,
return_exceptions=True, # exceptions are returned, not raised
)
for result in results:
if isinstance(result, Exception):
print(f"Task failed: {result}")
else:
        process(result)

4. asyncio.create_task — Fire and Continue
async def background_save(data: dict) -> None:
await asyncio.sleep(1)
print("Saved:", data)
async def main() -> None:
task = asyncio.create_task(background_save({"key": "value"}))
# continue doing other work while save runs in background
print("Doing other work...")
await asyncio.sleep(0.5)
print("Still working...")
    await task  # wait for the task when you need it
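One caveat, noted in the asyncio documentation: the event loop holds only weak references to tasks, so a fire-and-forget task with no reference to it can be garbage-collected before it finishes. The usual fix is to keep strong references, for example in a set. A minimal sketch (spawn is a hypothetical helper name):

```python
background_tasks: set[asyncio.Task] = set()

def spawn(coro) -> asyncio.Task:
    task = asyncio.create_task(coro)
    background_tasks.add(task)                        # strong reference keeps it alive
    task.add_done_callback(background_tasks.discard)  # drop the reference when done
    return task
```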
5. Concurrent HTTP with httpx

import asyncio
import httpx
async def fetch(client: httpx.AsyncClient, url: str) -> dict:
resp = await client.get(url, timeout=15)
resp.raise_for_status()
return resp.json()
async def fetch_all_users(user_ids: list[int]) -> list[dict]:
base = "https://jsonplaceholder.typicode.com/users"
async with httpx.AsyncClient() as client:
tasks = [fetch(client, f"{base}/{uid}") for uid in user_ids]
return await asyncio.gather(*tasks, return_exceptions=True)
users = asyncio.run(fetch_all_users([1, 2, 3, 4, 5]))

Rate-limited concurrent fetching
async def fetch_with_semaphore(
client: httpx.AsyncClient,
url: str,
semaphore: asyncio.Semaphore,
) -> dict:
async with semaphore: # at most N concurrent requests
resp = await client.get(url, timeout=15)
resp.raise_for_status()
return resp.json()
async def fetch_bulk(urls: list[str], max_concurrent: int = 10) -> list[dict]:
semaphore = asyncio.Semaphore(max_concurrent)
async with httpx.AsyncClient() as client:
tasks = [fetch_with_semaphore(client, url, semaphore) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)
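Usage is a single call; the URLs here are hypothetical:

```python
urls = [f"https://api.example.com/items/{i}" for i in range(100)]
results = asyncio.run(fetch_bulk(urls, max_concurrent=10))
```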
6. Async Context Managers

class AsyncDatabaseConnection:
async def __aenter__(self) -> "AsyncDatabaseConnection":
await self._connect()
return self
async def __aexit__(self, *args: object) -> None:
await self._disconnect()
async def _connect(self) -> None:
await asyncio.sleep(0.01) # simulate connection
print("Connected")
async def _disconnect(self) -> None:
await asyncio.sleep(0.01)
print("Disconnected")
async def query(self, sql: str) -> list[dict]:
await asyncio.sleep(0.05)
return [{"result": "data"}]
async def main() -> None:
async with AsyncDatabaseConnection() as db:
        rows = await db.query("SELECT * FROM users")
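The same protocol needs less boilerplate with contextlib.asynccontextmanager from the standard library. A sketch of an equivalent connection manager (the sleeps stand in for real connect/disconnect work):

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def database_connection():
    await asyncio.sleep(0.01)   # simulate connecting
    print("Connected")
    try:
        yield                   # the async-with body runs here
    finally:
        await asyncio.sleep(0.01)
        print("Disconnected")

# usage: async with database_connection(): ...
```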
7. Async Generators and Iterators

from collections.abc import AsyncIterator
async def paginate_api(base_url: str, token: str) -> AsyncIterator[dict]:
async with httpx.AsyncClient() as client:
page = 1
while True:
resp = await client.get(
f"{base_url}/records",
params={"page": page, "per_page": 100},
headers={"Authorization": f"Bearer {token}"},
timeout=15,
)
resp.raise_for_status()
data = resp.json()
items = data.get("items", [])
if not items:
break
for item in items:
yield item
page += 1
async def main() -> None:
    token = "..."  # placeholder API token
    async for record in paginate_api("https://api.example.com", token):
        process(record)
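Async generators also work with async comprehensions (PEP 530), for example to collect everything into a list. A sketch, assuming each record carries an id field:

```python
async def all_record_ids(token: str) -> list[int]:
    # the async list comprehension drives the generator to exhaustion
    return [rec["id"] async for rec in paginate_api("https://api.example.com", token)]
```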
8. aiofiles — Async File I/O

pip install aiofiles

import aiofiles
import json
async def read_json_file(path: str) -> dict:
async with aiofiles.open(path, "r", encoding="utf-8") as f:
content = await f.read()
return json.loads(content)
async def write_json_file(path: str, data: dict) -> None:
async with aiofiles.open(path, "w", encoding="utf-8") as f:
await f.write(json.dumps(data, indent=2))
# read many files concurrently
async def read_all(paths: list[str]) -> list[dict]:
tasks = [read_json_file(p) for p in paths]
    return await asyncio.gather(*tasks)
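Note that aiofiles doesn't make the underlying OS calls non-blocking; it runs them in a thread pool behind the scenes, which is still enough to keep the event loop responsive. Usage, with hypothetical paths:

```python
configs = asyncio.run(read_all(["configs/a.json", "configs/b.json"]))
```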
9. asyncio.Queue — Producer/Consumer Pattern

import asyncio
async def producer(queue: asyncio.Queue, items: list[str], num_consumers: int) -> None:
    for item in items:
        await queue.put(item)
        print(f"Produced: {item}")
    for _ in range(num_consumers):
        await queue.put(None)  # one sentinel per consumer, or the others wait forever
async def consumer(queue: asyncio.Queue, worker_id: int) -> None:
while True:
item = await queue.get()
if item is None:
queue.task_done()
break
await asyncio.sleep(0.1) # simulate processing
print(f"Worker {worker_id} processed: {item}")
queue.task_done()
async def main() -> None:
queue: asyncio.Queue = asyncio.Queue(maxsize=10)
items = [f"record_{i}" for i in range(20)]
await asyncio.gather(
        producer(queue, items, num_consumers=2),
consumer(queue, 1),
consumer(queue, 2),
)
asyncio.run(main())
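A variant worth knowing, sketched below with the same consumer definition: skip the sentinels, await queue.join() (it returns once task_done() has been called for every item), then cancel the consumers.

```python
async def main_with_join() -> None:
    queue: asyncio.Queue = asyncio.Queue(maxsize=10)
    workers = [asyncio.create_task(consumer(queue, i)) for i in (1, 2)]
    for i in range(20):
        await queue.put(f"record_{i}")
    await queue.join()   # resumes once every item has been task_done()
    for w in workers:
        w.cancel()       # consumers never see a sentinel; cancel them instead
```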
10. Async vs Threading vs Multiprocessing

| Use Case | Tool |
|----------|------|
| Many network requests, DB queries | asyncio |
| CPU-heavy work (numpy, parsing) | multiprocessing |
| Blocking third-party libraries | threading |
| Mixed: async + blocking code | asyncio.to_thread |
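For the CPU-heavy row, a minimal sketch using a process pool from the standard library (heavy_parse is a stand-in for real CPU-bound work):

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def heavy_parse(raw: str) -> int:
    # CPU-bound work: runs in a separate process, not on the event loop
    return sum(len(line) for line in raw.splitlines())

async def main() -> None:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, heavy_parse, "a\nbb\nccc")
    print(result)  # 6

if __name__ == "__main__":
    asyncio.run(main())  # the __main__ guard is required for process pools
```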
Running blocking code in async context
import asyncio
from pathlib import Path
import pandas as pd
def load_dataframe(path: Path) -> pd.DataFrame:
return pd.read_csv(path) # blocking I/O
async def main() -> None:
# run blocking call in a thread, don't block the event loop
df = await asyncio.to_thread(load_dataframe, Path("data.csv"))
print(df.shape)
asyncio.run(main())

11. Error Handling in Async Code
async def safe_fetch(client: httpx.AsyncClient, url: str) -> dict | None:
try:
resp = await client.get(url, timeout=10)
resp.raise_for_status()
return resp.json()
except httpx.TimeoutException:
print(f"Timeout: {url}")
return None
except httpx.HTTPStatusError as e:
print(f"HTTP {e.response.status_code}: {url}")
return None
except Exception as e:
print(f"Unexpected error for {url}: {e}")
raise
async def fetch_all_safe(urls: list[str]) -> list[dict | None]:
async with httpx.AsyncClient() as client:
        return await asyncio.gather(*(safe_fetch(client, url) for url in urls))

12. Real Pattern: Async Pipeline Step
from abc import ABC, abstractmethod
from dataclasses import dataclass
@dataclass
class PipelineContext:
run_id: str
data: list[dict]
class AsyncStep(ABC):
@abstractmethod
async def process(self, ctx: PipelineContext) -> PipelineContext:
...
class FetchEnrichmentsStep(AsyncStep):
def __init__(self, api_url: str, token: str) -> None:
self._api_url = api_url
self._token = token
async def process(self, ctx: PipelineContext) -> PipelineContext:
async with httpx.AsyncClient() as client:
semaphore = asyncio.Semaphore(10)
tasks = [
self._enrich(client, semaphore, record)
for record in ctx.data
]
enriched = await asyncio.gather(*tasks, return_exceptions=True)
valid = [r for r in enriched if isinstance(r, dict)]
return PipelineContext(run_id=ctx.run_id, data=valid)
async def _enrich(
self,
client: httpx.AsyncClient,
semaphore: asyncio.Semaphore,
record: dict,
) -> dict:
async with semaphore:
resp = await client.get(
f"{self._api_url}/enrich",
params={"id": record["id"]},
headers={"Authorization": f"Bearer {self._token}"},
timeout=10,
)
resp.raise_for_status()
return {**record, **resp.json()}
async def run_pipeline(steps: list[AsyncStep], initial_data: list[dict]) -> list[dict]:
ctx = PipelineContext(run_id="run-001", data=initial_data)
for step in steps:
ctx = await step.process(ctx)
    return ctx.data
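Wiring it together might look like this; the URL and token are placeholders:

```python
steps: list[AsyncStep] = [
    FetchEnrichmentsStep("https://api.example.com", token="..."),
]
records = asyncio.run(run_pipeline(steps, initial_data=[{"id": 1}, {"id": 2}]))
```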
Exercises

Exercise 1: Write an async function download_files(urls: list[str], output_dir: Path) that downloads all files concurrently (max 5 at a time), saving each to output_dir using the URL filename. Use httpx + aiofiles.
Exercise 2: Build an async producer/consumer queue where the producer reads rows from a CSV and puts them in the queue, and 3 worker coroutines each call an API to enrich the row before writing to output.
Exercise 3: Write a function retry_async(coro_fn, max_attempts, backoff) that calls the given coroutine function, retrying on exception with exponential backoff using asyncio.sleep.
Summary
| Concept | API |
|---------|-----|
| Define coroutine | async def fn(): |
| Wait for I/O | await some_coroutine() |
| Entry point | asyncio.run(main()) |
| Run many at once | asyncio.gather(*tasks) |
| Limit concurrency | asyncio.Semaphore(n) |
| Background task | asyncio.create_task() |
| Async for | async for item in aiter: |
| Async with | async with resource: |
| Run blocking code | await asyncio.to_thread(fn, *args) |
| Async HTTP | httpx.AsyncClient |
| Async file I/O | aiofiles.open() |
Next: building automation scripts and internal tooling with subprocess and Makefile.