Learnixo

AI Safety & Guardrails · Lesson 14 of 15

Rate Limiting and Abuse Prevention

Why Rate Limiting Matters More for AI APIs

A standard REST API rate limit prevents DoS attacks. An AI API rate limit prevents both DoS attacks AND runaway costs. Each LLM call costs money. An attacker (or a misconfigured client) hitting your endpoint 10,000 times will cost you money, not just CPU.

For AI services, rate limit on tokens consumed, not just request count. A user who sends one request with a 50,000-token context costs as much as 50 normal requests.


Token Bucket Algorithm

The token bucket is one of the most widely used rate limiting algorithms. It allows short bursts while still enforcing a long-term average rate.

Concept:

  • A bucket holds tokens up to capacity C
  • Tokens refill at rate R tokens/second
  • Each request consumes T tokens (T = 1 for request-based, T = tokens_used for token-based)
  • If the bucket has fewer than T tokens, the request is rejected
Python
# rate_limiting/token_bucket.py
import time
import asyncio
from dataclasses import dataclass

@dataclass
class TokenBucket:
    """State of a single token bucket.

    Tokens accumulate at ``refill_rate`` per second up to ``capacity``; every
    admitted request spends some of them.
    """

    capacity: float          # Upper bound on the number of stored tokens
    refill_rate: float       # Tokens regained per second
    current_tokens: float    # Balance as of ``last_refill``
    last_refill: float       # Clock reading taken at the most recent refill

    def consume(self, tokens: float = 1.0) -> bool:
        """Top up the bucket for the time elapsed, then try to spend ``tokens``.

        Returns:
            True if the balance covered ``tokens`` (the amount is deducted),
            False otherwise (the refilled balance is left as is).
        """
        now = time.monotonic()

        # Credit whatever accrued since the previous call, capped at capacity.
        replenished = self.current_tokens + (now - self.last_refill) * self.refill_rate
        self.last_refill = now
        self.current_tokens = min(self.capacity, replenished)

        admitted = self.current_tokens >= tokens
        if admitted:
            self.current_tokens -= tokens
        return admitted

# In-memory rate limiter (single process only)
class InMemoryRateLimiter:
    """Keeps one ``TokenBucket`` per key in a plain dict.

    All state lives inside this object, so the limits are not shared with any
    other process.
    """

    def __init__(self):
        self._lock = asyncio.Lock()
        self._buckets: dict[str, TokenBucket] = {}

    async def is_allowed(
        self,
        key: str,
        capacity: float = 100.0,
        refill_rate: float = 10.0,  # 10 requests per second
        cost: float = 1.0,
    ) -> bool:
        """Charge ``cost`` tokens to the bucket for ``key``.

        A bucket is created full the first time a key is seen. ``capacity``
        and ``refill_rate`` are only used at that moment; later calls for the
        same key reuse the existing bucket unchanged.

        Returns:
            True if the request is admitted, False if it is rate limited.
        """
        async with self._lock:
            bucket = self._buckets.get(key)
            if bucket is None:
                bucket = TokenBucket(
                    capacity=capacity,
                    refill_rate=refill_rate,
                    current_tokens=capacity,
                    last_refill=time.monotonic(),
                )
                self._buckets[key] = bucket
            return bucket.consume(cost)

Redis-Backed Distributed Rate Limiter

For multi-instance deployments, the rate limiter state must live in Redis:

Python
# rate_limiting/redis_limiter.py
import time
import redis.asyncio as aioredis
from fastapi import Request, HTTPException

class RedisTokenBucketLimiter:
    """Token-bucket rate limiter whose state lives in a Redis hash.

    Each key maps to a hash with two fields, ``tokens`` and ``last_refill``.
    The refill, check and deduction run inside one Lua script so that
    concurrent callers cannot interleave between the read and the write.
    """

    # Idle buckets are kept for at least this long (seconds).
    _MIN_TTL_SECONDS = 3600

    # KEYS[1] = bucket key
    # ARGV    = capacity, refill_rate, cost, now (epoch seconds), ttl (seconds)
    _LUA_SCRIPT = """
        local key = KEYS[1]
        local capacity = tonumber(ARGV[1])
        local refill_rate = tonumber(ARGV[2])
        local cost = tonumber(ARGV[3])
        local now = tonumber(ARGV[4])
        local ttl = tonumber(ARGV[5])

        -- Load current state; a missing key behaves like a full bucket
        local state = redis.call('HMGET', key, 'tokens', 'last_refill')
        local tokens = tonumber(state[1]) or capacity
        local last_refill = tonumber(state[2]) or now

        -- Refill. `now` comes from the caller's clock, so a caller whose clock
        -- is behind must neither drain tokens nor rewind the stored timestamp.
        local elapsed = math.max(0, now - last_refill)
        tokens = math.min(capacity, tokens + elapsed * refill_rate)

        -- Check and consume
        local allowed = 0
        if tokens >= cost then
            tokens = tokens - cost
            allowed = 1
        end

        -- Persist state; the TTL only cleans up idle keys
        redis.call('HMSET', key, 'tokens', tokens, 'last_refill', math.max(now, last_refill))
        redis.call('EXPIRE', key, ttl)

        return {allowed, math.floor(tokens)}
        """

    def __init__(self, redis: "aioredis.Redis"):
        self.redis = redis

    async def is_allowed(
        self,
        key: str,
        capacity: int = 100,
        refill_rate: float = 10.0,
        cost: int = 1,
    ) -> tuple[bool, dict]:
        """Atomically refill the bucket for ``key`` and try to spend ``cost``.

        Args:
            key: Redis key of the bucket.
            capacity: Maximum number of tokens the bucket can hold.
            refill_rate: Tokens added per second; must be positive.
            cost: Tokens this request needs.

        Returns:
            ``(allowed, metadata)`` where ``metadata`` has:
              * ``limit`` - the capacity passed in,
              * ``remaining`` - whole tokens left after this call,
              * ``reset_after`` - seconds until the bucket will have refilled
                enough for a request of this ``cost`` (0.0 if it already can).

        Raises:
            ValueError: if ``refill_rate`` is not positive.
        """
        if refill_rate <= 0:
            # Reject before touching Redis: a non-positive rate never refills
            # and would make the retry delay undefined.
            raise ValueError("refill_rate must be positive")

        now = time.time()

        # An idle key must outlive the time a drained bucket needs to refill;
        # otherwise expiry would hand back a full bucket too early.
        ttl = max(self._MIN_TTL_SECONDS, int(capacity / refill_rate) + 1)

        result = await self.redis.eval(
            self._LUA_SCRIPT,
            1,  # num keys
            key,
            capacity,
            refill_rate,
            cost,
            now,
            ttl,
        )

        allowed = bool(result[0])
        remaining = result[1]

        # Wait time is driven by the missing tokens, not by the full cost.
        # ``remaining`` is rounded down, so the estimate errs on the long side.
        shortfall = max(0, cost - remaining)

        return allowed, {
            "limit": capacity,
            "remaining": remaining,
            "reset_after": shortfall / refill_rate,
        }

FastAPI Rate Limiting Middleware

Python
# middleware/rate_limit.py
from fastapi import Request, Response
from fastapi.responses import JSONResponse
import os

class AIRateLimitMiddleware:
    """ASGI middleware that rate limits HTTP requests per API key, user or IP.

    Rejected requests get a 429 JSON response; admitted requests are passed
    through with ``x-ratelimit-*`` headers added to the response.

    NOTE(review): ``X-User-ID`` and ``X-API-Key`` are taken from the request
    headers as-is. Nothing in this class verifies them, so a client can get a
    fresh bucket by sending a new value. Confirm that an earlier layer
    authenticates these headers before relying on the higher tiers.
    """

    def __init__(self, app, redis):
        self.app = app
        self.limiter = RedisTokenBucketLimiter(redis)

    @staticmethod
    def _retry_after_header(seconds: float) -> str:
        """Round ``seconds`` up to a whole number of at least 1, as text."""
        whole = int(seconds)
        if seconds > whole:
            whole += 1
        # Truncating would advertise "0" for sub-second waits and invite an
        # immediate retry.
        return str(max(1, whole))

    async def __call__(self, scope, receive, send):
        # Only HTTP traffic is limited; other scope types pass straight through.
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        request = Request(scope, receive)

        # Identify the requester
        user_id = request.headers.get("X-User-ID")
        api_key = request.headers.get("X-API-Key")
        # The client address is not always available; fall back to a shared
        # bucket instead of failing the request.
        ip = request.client.host if request.client else "unknown"

        # Three-tier limits: per API key > per user > per IP
        if api_key:
            key = f"rate:apikey:{api_key}"
            capacity, rate = 1000, 100.0  # High limit for API key clients
        elif user_id:
            key = f"rate:user:{user_id}"
            capacity, rate = 100, 10.0    # Standard user limit
        else:
            key = f"rate:ip:{ip}"
            capacity, rate = 20, 2.0      # Strict limit for anonymous

        allowed, meta = await self.limiter.is_allowed(key, capacity, rate)

        if not allowed:
            response = JSONResponse(
                status_code=429,
                content={
                    "error": "rate_limited",
                    "message": "Too many requests. Please slow down.",
                    "retry_after_seconds": meta["reset_after"],
                },
                headers={
                    "X-RateLimit-Limit": str(meta["limit"]),
                    "X-RateLimit-Remaining": str(meta["remaining"]),
                    "Retry-After": self._retry_after_header(meta["reset_after"]),
                },
            )
            await response(scope, receive, send)
            return

        rate_headers = [
            (b"x-ratelimit-limit", str(meta["limit"]).encode()),
            (b"x-ratelimit-remaining", str(meta["remaining"]).encode()),
        ]
        rate_header_names = {name for name, _ in rate_headers}

        # Add rate limit headers to all responses
        async def send_with_headers(message):
            if message["type"] == "http.response.start":
                # Keep the headers as a list of pairs: going through a dict
                # would collapse repeated headers such as Set-Cookie. Only our
                # own header names are replaced.
                headers = [
                    (name, value)
                    for name, value in message.get("headers", [])
                    if name.lower() not in rate_header_names
                ]
                headers.extend(rate_headers)
                message["headers"] = headers
            await send(message)

        await self.app(scope, receive, send_with_headers)

Token-Based Rate Limiting for AI APIs

For LLM endpoints, limit by tokens consumed, not request count:

Python
@router.post("/api/chat")
async def chat(
    request: ChatRequest,
    raw_request: Request,
    limiter: RedisTokenBucketLimiter = Depends(get_limiter),
):
    """Answer a chat message, charging the caller's token bucket.

    The request is first charged an estimate (message tokens plus 500 for the
    reply). If the limiter refuses, a 429 is raised before the LLM is called.
    After the call, any tokens used beyond the estimate are charged as well.
    """
    user_id = get_user_id(raw_request)

    # Pre-flight estimate: prompt tokens + expected response size
    estimated_tokens = estimate_tokens(request.message) + 500

    # Bucket settings shared by the up-front charge and the later top-up
    bucket = {
        "key": f"tokens:user:{user_id}",
        "capacity": 50_000,    # 50k tokens per hour
        "refill_rate": 14.0,   # ~50k / 3600 tokens/second
    }

    allowed, meta = await limiter.is_allowed(cost=estimated_tokens, **bucket)
    if not allowed:
        raise HTTPException(
            status_code=429,
            detail={
                "error": "token_quota_exceeded",
                "message": f"You've used your hourly token quota. Retry after {meta['reset_after']:.0f} seconds.",
            }
        )

    # Make the LLM call
    response = await generate_response(request)

    # Charge the difference when the call used more than was estimated.
    # The result is ignored: the limiter only deducts when the balance covers
    # the cost, so an overshoot larger than the balance goes uncharged, and an
    # over-estimate is never refunded.
    overshoot = response.usage.total_tokens - estimated_tokens
    if overshoot > 0:
        await limiter.is_allowed(cost=overshoot, **bucket)

    return {"answer": response.text}

Graduated Response

Don't just block — warn users as they approach limits:

Python
async def check_with_graduated_response(
    user_id: str,
    limiter: RedisTokenBucketLimiter,
) -> dict:
    """Charge one request to the user's bucket and describe the outcome.

    Returns one of:
      * ``{"allowed": False, "error": ..., "retry_after": ...}`` when blocked,
      * ``{"allowed": True, "warning": ...}`` when admitted with less than
        10% of the limit remaining,
      * ``{"allowed": True}`` otherwise.
    """
    allowed, meta = await limiter.is_allowed(f"rate:user:{user_id}")

    # Rejected by the limiter -> block
    if not allowed:
        return {
            "allowed": False,
            "error": "rate_limited",
            "retry_after": meta["reset_after"],
        }

    # Admitted, but under 10% remaining -> warn
    remaining = meta["remaining"]
    if remaining < meta["limit"] * 0.1:
        return {
            "allowed": True,
            "warning": f"You have {remaining} requests remaining in this window.",
        }

    return {"allowed": True}

Recommended Limits by Use Case

| Use case | Requests/hour | Tokens/hour | Notes |
|---|---|---|---|
| Anonymous user | 20 | 10,000 | Strict — prevent abuse |
| Authenticated user | 200 | 100,000 | Standard |
| Premium user | 2,000 | 1,000,000 | Paid tier |
| API key (server-to-server) | 10,000 | 5,000,000 | Trusted integration |

Always alert when a single user hits 80% of their limit — this may indicate a bug in their client code or a scraping attack.