Learnixo

Live Coding Interview Prep · Lesson 13 of 16

Implement a Token Bucket Rate Limiter

The Problem

Rate limiting is a common interview topic and essential for AI APIs where every call costs money. Implement a rate limiter that allows bursting while enforcing a long-term rate.

Token Bucket Algorithm:

  • Bucket holds up to capacity tokens
  • Tokens refill at rate tokens/second
  • Each request consumes cost tokens
  • If fewer tokens than cost, reject the request

Token Bucket: Core Implementation

Python
import time

class TokenBucket:
    """Thread-unsafe token bucket — for demonstration."""

    def __init__(self, capacity: float, rate: float):
        """
        capacity: maximum tokens the bucket can hold (burst limit)
        rate: tokens added per second (sustained throughput)
        """
        self.capacity = capacity
        self.rate = rate
        self.tokens = capacity      # Start full
        self.last_refill = time.monotonic()

    def _refill(self):
        """Add tokens based on elapsed time since last refill."""
        now = time.monotonic()
        elapsed = now - self.last_refill
        added = elapsed * self.rate
        self.tokens = min(self.capacity, self.tokens + added)
        self.last_refill = now

    def consume(self, cost: float = 1.0) -> bool:
        """Try to consume `cost` tokens. Returns True if allowed."""
        self._refill()
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False

    def time_until_available(self, cost: float = 1.0) -> float:
        """Seconds until `cost` tokens are available."""
        self._refill()
        if self.tokens >= cost:
            return 0.0
        deficit = cost - self.tokens
        return deficit / self.rate

# Test
bucket = TokenBucket(capacity=10, rate=2)  # 10 token burst, 2 tokens/sec

# Burst: 10 requests succeed immediately
for i in range(10):
    allowed = bucket.consume()
    print(f"Request {i+1}: {'allowed' if allowed else 'rejected'}")

# 11th request fails
allowed = bucket.consume()
print(f"Request 11: {'allowed' if allowed else 'rejected'}")  # rejected

# After 1 second, 2 more tokens available
time.sleep(1)
for i in range(3):
    allowed = bucket.consume()
    print(f"After 1s, request {i+1}: {'allowed' if allowed else 'rejected'}")
# First 2 succeed, 3rd fails

Thread-Safe Version

For multi-threaded applications:

Python
import threading

class ThreadSafeTokenBucket:
    def __init__(self, capacity: float, rate: float):
        self.capacity = capacity
        self.rate = rate
        self.tokens = capacity
        self.last_refill = time.monotonic()
        self._lock = threading.Lock()

    def consume(self, cost: float = 1.0) -> bool:
        with self._lock:
            now = time.monotonic()
            elapsed = now - self.last_refill
            self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
            self.last_refill = now

            if self.tokens >= cost:
                self.tokens -= cost
                return True
            return False

    def time_until_available(self, cost: float = 1.0) -> float:
        with self._lock:
            deficit = max(0, cost - self.tokens)
            return deficit / self.rate

Per-Key Rate Limiter

For APIs, each user/IP gets their own bucket:

Python
from collections import defaultdict

class PerKeyRateLimiter:
    def __init__(self, capacity: float, rate: float):
        self.capacity = capacity
        self.rate = rate
        self._buckets: dict[str, ThreadSafeTokenBucket] = {}
        self._lock = threading.Lock()

    def _get_bucket(self, key: str) -> ThreadSafeTokenBucket:
        with self._lock:
            if key not in self._buckets:
                self._buckets[key] = ThreadSafeTokenBucket(self.capacity, self.rate)
            return self._buckets[key]

    def is_allowed(self, key: str, cost: float = 1.0) -> bool:
        return self._get_bucket(key).consume(cost)

    def retry_after(self, key: str, cost: float = 1.0) -> float:
        return self._get_bucket(key).time_until_available(cost)

# Usage
limiter = PerKeyRateLimiter(capacity=100, rate=10)  # 100 burst, 10/sec

for i in range(5):
    user_id = f"user_{i % 3}"  # 3 different users
    allowed = limiter.is_allowed(user_id)
    print(f"{user_id}: {'allowed' if allowed else 'rejected'}")

Async Rate Limiter

For async applications (FastAPI):

Python
import asyncio

class AsyncTokenBucket:
    def __init__(self, capacity: float, rate: float):
        self.capacity = capacity
        self.rate = rate
        self.tokens = capacity
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def consume(self, cost: float = 1.0) -> bool:
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self.last_refill
            self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
            self.last_refill = now

            if self.tokens >= cost:
                self.tokens -= cost
                return True
            return False

# FastAPI middleware usage
class RateLimitMiddleware:
    def __init__(self, app, capacity=100, rate=10):
        self.app = app
        self.limiters: dict[str, AsyncTokenBucket] = {}
        self.capacity = capacity
        self.rate = rate
        self._lock = asyncio.Lock()

    async def get_limiter(self, key: str) -> AsyncTokenBucket:
        async with self._lock:
            if key not in self.limiters:
                self.limiters[key] = AsyncTokenBucket(self.capacity, self.rate)
            return self.limiters[key]

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        from starlette.requests import Request
        from starlette.responses import JSONResponse

        request = Request(scope, receive)
        ip = request.client.host

        limiter = await self.get_limiter(ip)
        allowed = await limiter.consume()

        if not allowed:
            response = JSONResponse(
                {"error": "rate_limited", "retry_after": 1.0},
                status_code=429,
            )
            await response(scope, receive, send)
            return

        await self.app(scope, receive, send)

Token-Based Limiting for LLMs

For LLM APIs, limit by tokens consumed rather than request count:

Python
async def check_llm_token_limit(
    user_id: str,
    estimated_tokens: int,
    limiter: PerKeyRateLimiter,
) -> bool:
    """Rate limit by token consumption, not request count."""
    # Each token costs 1 unit from the bucket
    return limiter.is_allowed(user_id, cost=estimated_tokens)

# Config: 50,000 tokens per hour per user
tokens_per_hour_limiter = PerKeyRateLimiter(
    capacity=50_000,
    rate=50_000 / 3600,  # tokens per second
)

# A 500-token query
allowed = tokens_per_hour_limiter.is_allowed("user_abc", cost=500)
print(f"Query allowed: {allowed}")

Interview Follow-Ups

Q: What's the difference between token bucket and leaky bucket?

Token bucket allows bursting — if the bucket is full, a burst of requests all succeed immediately. Leaky bucket smooths out traffic — requests are queued and processed at a fixed rate, preventing any burst. Token bucket is better for APIs where occasional bursts are acceptable. Leaky bucket is better when you need consistent throughput (e.g., message processing).

Q: How would you implement this across multiple API servers?

In-memory rate limiters only work for a single process. For multi-instance deployments, store bucket state in Redis. Use a Lua script for atomic check-and-consume to prevent race conditions between instances.

Q: What is a sliding window rate limiter?

Token bucket uses continuous time. Sliding window tracks exact request timestamps within a rolling window (e.g., last 60 seconds). More accurate but uses more memory (stores N timestamps per user). Fixed window resets at specific intervals — simpler but allows a burst at window boundaries.