Rate Limiting and Abuse Prevention
Implement token-bucket rate limiting for AI APIs to control costs and prevent abuse. Covers a Redis-backed token-bucket limiter, per-API-key, per-user, and per-IP limits, and a graduated response.
Why Rate Limiting Matters More for AI APIs
A standard REST API rate limit prevents DoS attacks. An AI API rate limit prevents both DoS attacks AND runaway costs. Each LLM call costs money. An attacker (or a misconfigured client) hitting your endpoint 10,000 times will cost you money, not just CPU.
For AI services, rate limit on tokens consumed, not just request count. A user who sends one request with a 50,000-token context costs as much as 50 normal requests.
Token Bucket Algorithm
The token bucket is the standard rate limiting algorithm. It allows bursting while enforcing a long-term rate limit.
Concept:
- A bucket holds tokens up to capacity C
- Tokens refill at rate R tokens/second
- Each request consumes T tokens (T = 1 for request-based, T = tokens_used for token-based)
- If the bucket has fewer than T tokens, the request is rejected
# rate_limiting/token_bucket.py
import time
import asyncio
from dataclasses import dataclass
@dataclass
class TokenBucket:
    """Token bucket that is refilled lazily each time ``consume`` is called.

    Tokens accrue at ``refill_rate`` per second, capped at ``capacity``. The
    accrual is computed from the time elapsed since ``last_refill`` rather
    than by a background timer.
    """

    capacity: float  # Maximum tokens in bucket
    refill_rate: float  # Tokens added per second
    current_tokens: float  # Current token count
    last_refill: float  # time.monotonic() timestamp of last refill

    def consume(self, tokens: float = 1.0) -> bool:
        """Try to consume tokens.

        Args:
            tokens: Number of tokens the request costs. Must be >= 0.

        Returns:
            True if the tokens were deducted, False if the bucket held fewer
            than ``tokens`` (nothing is deducted in that case).

        Raises:
            ValueError: If ``tokens`` is negative. A negative cost would
                otherwise add tokens to the bucket.
        """
        if tokens < 0:
            raise ValueError("tokens must be non-negative")

        # Credit the tokens earned since the last call, capped at capacity.
        now = time.monotonic()
        earned = (now - self.last_refill) * self.refill_rate
        self.current_tokens = min(self.capacity, self.current_tokens + earned)
        self.last_refill = now

        if self.current_tokens < tokens:
            return False
        self.current_tokens -= tokens
        return True
# In-memory rate limiter (single process only)
class InMemoryRateLimiter:
    """Keep one ``TokenBucket`` per key in a process-local dict.

    State lives in this object only, so limits are not shared between
    processes. Buckets are never removed from ``_buckets`` by this class.
    """

    def __init__(self):
        self._buckets: dict[str, TokenBucket] = {}
        # Serializes bucket creation and consumption across coroutines.
        self._lock = asyncio.Lock()

    async def is_allowed(
        self,
        key: str,
        capacity: float = 100.0,
        refill_rate: float = 10.0,  # 10 requests per second
        cost: float = 1.0,
    ) -> bool:
        """Consume ``cost`` tokens from the bucket for ``key``.

        A bucket is created full on the first call for a key. ``capacity``
        and ``refill_rate`` are only applied at creation; later calls for
        the same key reuse the existing bucket unchanged.

        Returns:
            True if the request is allowed, False if it is rate limited.
        """
        async with self._lock:
            bucket = self._buckets.get(key)
            if bucket is None:
                bucket = TokenBucket(
                    capacity=capacity,
                    refill_rate=refill_rate,
                    current_tokens=capacity,
                    last_refill=time.monotonic(),
                )
                self._buckets[key] = bucket
            return bucket.consume(cost)


# Redis-Backed Distributed Rate Limiter
For multi-instance deployments, the rate limiter state must live in Redis:
# rate_limiting/redis_limiter.py
import time
import redis.asyncio as aioredis
from fastapi import Request, HTTPException
class RedisTokenBucketLimiter:
    """Token-bucket rate limiter whose per-key state is stored in a Redis hash.

    Each key is a hash with the fields ``tokens`` and ``last_refill``. The
    refill, check, and consume steps run inside a single Lua script.
    """

    # KEYS[1] = bucket key; ARGV = capacity, refill_rate, cost, now (seconds).
    # Reply: {allowed (0/1), floor(tokens), tostring(tokens)}. The exact token
    # count is returned as a string because Redis truncates Lua numbers to
    # integers in script replies.
    _LUA_SCRIPT = """
    local key = KEYS[1]
    local capacity = tonumber(ARGV[1])
    local refill_rate = tonumber(ARGV[2])
    local cost = tonumber(ARGV[3])
    local now = tonumber(ARGV[4])

    -- Get current state (a missing key starts as a full bucket)
    local current = redis.call('HMGET', key, 'tokens', 'last_refill')
    local tokens = tonumber(current[1]) or capacity
    local last_refill = tonumber(current[2]) or now

    -- Refill tokens. Elapsed time is clamped at zero so a caller whose
    -- clock is behind the stored timestamp cannot remove tokens.
    local elapsed = math.max(0, now - last_refill)
    tokens = math.min(capacity, tokens + elapsed * refill_rate)

    -- Check if allowed
    local allowed = 0
    if tokens >= cost then
        tokens = tokens - cost
        allowed = 1
    end

    -- Save state. The key lives at least one hour and at least as long as
    -- a full refill takes, so expiry never hands back tokens early.
    local ttl = math.max(3600, math.ceil(capacity / refill_rate))
    redis.call('HSET', key, 'tokens', tokens,
               'last_refill', math.max(now, last_refill))
    redis.call('EXPIRE', key, ttl)

    return {allowed, math.floor(tokens), tostring(tokens)}
    """

    def __init__(self, redis: "aioredis.Redis"):
        self.redis = redis

    async def is_allowed(
        self,
        key: str,
        capacity: int = 100,
        refill_rate: float = 10.0,
        cost: int = 1,
    ) -> tuple[bool, dict]:
        """Atomically check and consume ``cost`` tokens for ``key``.

        Args:
            key: Redis key of the bucket.
            capacity: Maximum number of tokens the bucket can hold.
            refill_rate: Tokens added per second. Must be positive.
            cost: Tokens this request consumes. Must be >= 0.

        Returns:
            ``(allowed, metadata)`` where metadata has the keys ``limit``
            (the capacity), ``remaining`` (whole tokens left) and
            ``reset_after`` (seconds until the bucket holds ``cost`` tokens;
            0.0 when the request was allowed).

        Raises:
            ValueError: If ``refill_rate`` is not positive or ``cost`` is
                negative.
        """
        if refill_rate <= 0:
            raise ValueError("refill_rate must be positive")
        if cost < 0:
            raise ValueError("cost must be non-negative")

        now = time.time()
        result = await self.redis.eval(
            self._LUA_SCRIPT,
            1,  # num keys
            key,
            capacity,
            refill_rate,
            cost,
            now,
        )

        allowed = bool(result[0])
        remaining = int(result[1])
        exact_tokens = float(result[2])

        # When denied, the wait is the time needed to refill the shortfall,
        # not the time to refill the whole cost.
        if allowed:
            reset_after = 0.0
        else:
            reset_after = max(0.0, (cost - exact_tokens) / refill_rate)

        return allowed, {
            "limit": capacity,
            "remaining": remaining,
            "reset_after": reset_after,
        }


# FastAPI Rate Limiting Middleware
# middleware/rate_limit.py
from fastapi import Request, Response
from fastapi.responses import JSONResponse
import os
class AIRateLimitMiddleware:
    """ASGI middleware that rate limits HTTP requests per API key, user, or IP.

    Non-HTTP scopes pass straight through. Rejected requests get a 429 JSON
    response; allowed requests are forwarded with ``x-ratelimit-*`` headers
    added to the response.
    """

    def __init__(self, app, redis):
        self.app = app
        self.limiter = RedisTokenBucketLimiter(redis)

    @staticmethod
    def _select_bucket(request):
        """Return ``(key, capacity, refill_rate)`` for the requester."""
        # NOTE(review): both identity headers are used as sent by the client.
        # Unless something upstream validates them, a caller can pick a new
        # value per request and get a fresh, larger bucket each time.
        user_id = request.headers.get("X-User-ID")
        api_key = request.headers.get("X-API-Key")

        # Three-tier limits: per API key > per user > per IP
        if api_key:
            return f"rate:apikey:{api_key}", 1000, 100.0  # High limit for API key clients
        if user_id:
            return f"rate:user:{user_id}", 100, 10.0  # Standard user limit

        # request.client is None when the server supplies no client address.
        ip = request.client.host if request.client else "unknown"
        return f"rate:ip:{ip}", 20, 2.0  # Strict limit for anonymous

    async def __call__(self, scope, receive, send):
        import math  # local import: math is not among this module's imports

        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        request = Request(scope, receive)
        key, capacity, rate = self._select_bucket(request)
        allowed, meta = await self.limiter.is_allowed(key, capacity, rate)

        if not allowed:
            # Round up, with a floor of one second: truncating a sub-second
            # wait would tell clients to retry immediately.
            retry_after = max(1, math.ceil(meta["reset_after"]))
            response = JSONResponse(
                status_code=429,
                content={
                    "error": "rate_limited",
                    "message": "Too many requests. Please slow down.",
                    "retry_after_seconds": meta["reset_after"],
                },
                headers={
                    "X-RateLimit-Limit": str(meta["limit"]),
                    "X-RateLimit-Remaining": str(meta["remaining"]),
                    "Retry-After": str(retry_after),
                },
            )
            await response(scope, receive, send)
            return

        rate_limit_headers = [
            (b"x-ratelimit-limit", str(meta["limit"]).encode()),
            (b"x-ratelimit-remaining", str(meta["remaining"]).encode()),
        ]
        replaced_names = {name for name, _ in rate_limit_headers}

        # Add rate limit headers to all responses
        async def send_with_headers(message):
            if message["type"] == "http.response.start":
                # Keep the headers as a list of pairs: converting to a dict
                # would drop repeated headers such as multiple Set-Cookie.
                kept = [
                    (name, value)
                    for name, value in message.get("headers", [])
                    if name.lower() not in replaced_names
                ]
                message = {**message, "headers": kept + rate_limit_headers}
            await send(message)

        await self.app(scope, receive, send_with_headers)


# Token-Based Rate Limiting for AI APIs
For LLM endpoints, limit by tokens consumed, not request count:
@router.post("/api/chat")
async def chat(
    request: ChatRequest,
    raw_request: Request,
    limiter: RedisTokenBucketLimiter = Depends(get_limiter),
):
    """Answer a chat message, charging the user's token bucket for LLM usage.

    The bucket is charged an estimate before the LLM call and, if the call
    used more tokens than estimated, charged the difference afterwards.

    Raises:
        HTTPException: 429 when the bucket cannot cover the estimate.
    """
    user_id = get_user_id(raw_request)

    # One place for the bucket settings so both charges hit the same bucket.
    bucket = {
        "key": f"tokens:user:{user_id}",
        "capacity": 50_000,  # 50k tokens per hour
        "refill_rate": 14.0,  # ~50k / 3600 tokens/second
    }

    # Pre-flight token estimate (before calling LLM)
    estimated_tokens = estimate_tokens(request.message) + 500  # prompt + expected response

    # Rate limit by estimated tokens
    allowed, meta = await limiter.is_allowed(cost=estimated_tokens, **bucket)
    if not allowed:
        raise HTTPException(
            status_code=429,
            detail={
                "error": "token_quota_exceeded",
                "message": f"You've used your hourly token quota. Retry after {meta['reset_after']:.0f} seconds.",
            }
        )

    # Make the LLM call
    response = await generate_response(request)

    # Update with actual tokens used (correct the estimate)
    actual_tokens = response.usage.total_tokens
    correction = actual_tokens - estimated_tokens
    if correction > 0:
        charged, correction_meta = await limiter.is_allowed(cost=correction, **bucket)
        # A denied call deducts nothing, so the overage would go unbilled.
        # Charge whatever is left instead, as a best-effort debit.
        if not charged and correction_meta["remaining"] > 0:
            await limiter.is_allowed(cost=correction_meta["remaining"], **bucket)
    # NOTE(review): an over-estimate (correction < 0) is not refunded;
    # is_allowed as called here offers no way to credit tokens back.

    return {"answer": response.text}


# Graduated Response
Don't just block — warn users as they approach limits:
async def check_with_graduated_response(
    user_id: str,
    limiter: "RedisTokenBucketLimiter",
) -> dict:
    """Check the user's rate limit and warn before the limit is reached.

    Consumes one request from the ``rate:user:<user_id>`` bucket using the
    limiter's default settings.

    Returns:
        ``{"allowed": False, "error": ..., "retry_after": ...}`` when the
        request is rate limited; ``{"allowed": True, "warning": ...}`` when
        it is allowed but under 10% of the limit remains; otherwise
        ``{"allowed": True}``.
    """
    allowed, meta = await limiter.is_allowed(f"rate:user:{user_id}")

    # Not allowed — block
    if not allowed:
        return {
            "allowed": False,
            "error": "rate_limited",
            "retry_after": meta["reset_after"],
        }

    # Under 10% remaining — warn
    if meta["remaining"] < meta["limit"] * 0.1:
        return {
            "allowed": True,
            "warning": f"You have {meta['remaining']} requests remaining in this window.",
        }

    return {"allowed": True}


# Recommended Limits by Use Case
| Use case | Requests/hour | Tokens/hour | Notes |
|---|---|---|---|
| Anonymous user | 20 | 10,000 | Strict — prevent abuse |
| Authenticated user | 200 | 100,000 | Standard |
| Premium user | 2,000 | 1,000,000 | Paid tier |
| API key (server-to-server) | 10,000 | 5,000,000 | Trusted integration |
Always alert when a single user hits 80% of their limit — it may indicate a bug in their client code or a scraping attack.
Found this helpful?
Leave a comment
Have a question, correction, or just found this helpful? Leave a note below.