Live Coding Interview Prep · Lesson 13 of 16
Implement a Token Bucket Rate Limiter
The Problem
Rate limiting is a common interview topic and essential for AI APIs where every call costs money. Implement a rate limiter that allows bursting while enforcing a long-term rate.
Token Bucket Algorithm:
- Bucket holds up to
capacitytokens - Tokens refill at
ratetokens/second - Each request consumes
costtokens - If fewer tokens than
cost, reject the request
Token Bucket: Core Implementation
import time
class TokenBucket:
"""Thread-unsafe token bucket — for demonstration."""
def __init__(self, capacity: float, rate: float):
"""
capacity: maximum tokens the bucket can hold (burst limit)
rate: tokens added per second (sustained throughput)
"""
self.capacity = capacity
self.rate = rate
self.tokens = capacity # Start full
self.last_refill = time.monotonic()
def _refill(self):
"""Add tokens based on elapsed time since last refill."""
now = time.monotonic()
elapsed = now - self.last_refill
added = elapsed * self.rate
self.tokens = min(self.capacity, self.tokens + added)
self.last_refill = now
def consume(self, cost: float = 1.0) -> bool:
"""Try to consume `cost` tokens. Returns True if allowed."""
self._refill()
if self.tokens >= cost:
self.tokens -= cost
return True
return False
def time_until_available(self, cost: float = 1.0) -> float:
"""Seconds until `cost` tokens are available."""
self._refill()
if self.tokens >= cost:
return 0.0
deficit = cost - self.tokens
return deficit / self.rate
# Test
bucket = TokenBucket(capacity=10, rate=2) # 10 token burst, 2 tokens/sec
# Burst: 10 requests succeed immediately
for i in range(10):
allowed = bucket.consume()
print(f"Request {i+1}: {'allowed' if allowed else 'rejected'}")
# 11th request fails
allowed = bucket.consume()
print(f"Request 11: {'allowed' if allowed else 'rejected'}") # rejected
# After 1 second, 2 more tokens available
time.sleep(1)
for i in range(3):
allowed = bucket.consume()
print(f"After 1s, request {i+1}: {'allowed' if allowed else 'rejected'}")
# First 2 succeed, 3rd failsThread-Safe Version
For multi-threaded applications:
import threading
class ThreadSafeTokenBucket:
def __init__(self, capacity: float, rate: float):
self.capacity = capacity
self.rate = rate
self.tokens = capacity
self.last_refill = time.monotonic()
self._lock = threading.Lock()
def consume(self, cost: float = 1.0) -> bool:
with self._lock:
now = time.monotonic()
elapsed = now - self.last_refill
self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
self.last_refill = now
if self.tokens >= cost:
self.tokens -= cost
return True
return False
def time_until_available(self, cost: float = 1.0) -> float:
with self._lock:
deficit = max(0, cost - self.tokens)
return deficit / self.ratePer-Key Rate Limiter
For APIs, each user/IP gets their own bucket:
from collections import defaultdict
class PerKeyRateLimiter:
def __init__(self, capacity: float, rate: float):
self.capacity = capacity
self.rate = rate
self._buckets: dict[str, ThreadSafeTokenBucket] = {}
self._lock = threading.Lock()
def _get_bucket(self, key: str) -> ThreadSafeTokenBucket:
with self._lock:
if key not in self._buckets:
self._buckets[key] = ThreadSafeTokenBucket(self.capacity, self.rate)
return self._buckets[key]
def is_allowed(self, key: str, cost: float = 1.0) -> bool:
return self._get_bucket(key).consume(cost)
def retry_after(self, key: str, cost: float = 1.0) -> float:
return self._get_bucket(key).time_until_available(cost)
# Usage
limiter = PerKeyRateLimiter(capacity=100, rate=10) # 100 burst, 10/sec
for i in range(5):
user_id = f"user_{i % 3}" # 3 different users
allowed = limiter.is_allowed(user_id)
print(f"{user_id}: {'allowed' if allowed else 'rejected'}")Async Rate Limiter
For async applications (FastAPI):
import asyncio
class AsyncTokenBucket:
def __init__(self, capacity: float, rate: float):
self.capacity = capacity
self.rate = rate
self.tokens = capacity
self.last_refill = time.monotonic()
self._lock = asyncio.Lock()
async def consume(self, cost: float = 1.0) -> bool:
async with self._lock:
now = time.monotonic()
elapsed = now - self.last_refill
self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
self.last_refill = now
if self.tokens >= cost:
self.tokens -= cost
return True
return False
# FastAPI middleware usage
class RateLimitMiddleware:
def __init__(self, app, capacity=100, rate=10):
self.app = app
self.limiters: dict[str, AsyncTokenBucket] = {}
self.capacity = capacity
self.rate = rate
self._lock = asyncio.Lock()
async def get_limiter(self, key: str) -> AsyncTokenBucket:
async with self._lock:
if key not in self.limiters:
self.limiters[key] = AsyncTokenBucket(self.capacity, self.rate)
return self.limiters[key]
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
await self.app(scope, receive, send)
return
from starlette.requests import Request
from starlette.responses import JSONResponse
request = Request(scope, receive)
ip = request.client.host
limiter = await self.get_limiter(ip)
allowed = await limiter.consume()
if not allowed:
response = JSONResponse(
{"error": "rate_limited", "retry_after": 1.0},
status_code=429,
)
await response(scope, receive, send)
return
await self.app(scope, receive, send)Token-Based Limiting for LLMs
For LLM APIs, limit by tokens consumed rather than request count:
async def check_llm_token_limit(
user_id: str,
estimated_tokens: int,
limiter: PerKeyRateLimiter,
) -> bool:
"""Rate limit by token consumption, not request count."""
# Each token costs 1 unit from the bucket
return limiter.is_allowed(user_id, cost=estimated_tokens)
# Config: 50,000 tokens per hour per user
tokens_per_hour_limiter = PerKeyRateLimiter(
capacity=50_000,
rate=50_000 / 3600, # tokens per second
)
# A 500-token query
allowed = tokens_per_hour_limiter.is_allowed("user_abc", cost=500)
print(f"Query allowed: {allowed}")Interview Follow-Ups
Q: What's the difference between token bucket and leaky bucket?
Token bucket allows bursting — if the bucket is full, a burst of requests all succeed immediately. Leaky bucket smooths out traffic — requests are queued and processed at a fixed rate, preventing any burst. Token bucket is better for APIs where occasional bursts are acceptable. Leaky bucket is better when you need consistent throughput (e.g., message processing).
Q: How would you implement this across multiple API servers?
In-memory rate limiters only work for a single process. For multi-instance deployments, store bucket state in Redis. Use a Lua script for atomic check-and-consume to prevent race conditions between instances.
Q: What is a sliding window rate limiter?
Token bucket uses continuous time. Sliding window tracks exact request timestamps within a rolling window (e.g., last 60 seconds). More accurate but uses more memory (stores N timestamps per user). Fixed window resets at specific intervals — simpler but allows a burst at window boundaries.