Project: Multi-Database E-commerce Architecture
Build a production-grade e-commerce backend that combines PostgreSQL, MongoDB, Redis, and Elasticsearch, with each database doing what it does best. Full schema, queries, and architecture decisions.
What You'll Build
A production e-commerce backend that uses four databases in harmony, each selected for the access patterns it handles best:
| Database | Responsibility |
|---|---|
| PostgreSQL | Orders, payments, inventory, users (ACID-critical) |
| MongoDB | Product catalog (variable attributes per category) |
| Redis | Cart sessions, product cache, rate limiting, leaderboard |
| Elasticsearch | Product search, filtering, autocomplete |
This polyglot approach mirrors how large retailers such as Shopify, Zalando, and ASOS structure their data layers.
Architecture Overview
Client
  ↓
API Gateway (rate limiting via Redis)
  ↓
┌─────────────────────────────────────────────────────┐
│                Application Services                 │
├──────────────┬──────────────┬───────────┬───────────┤
│  User/Auth   │  Order/Pay   │  Catalog  │  Search   │
│   Service    │   Service    │  Service  │  Service  │
├──────────────┼──────────────┼───────────┼───────────┤
│  PostgreSQL  │  PostgreSQL  │  MongoDB  │  Elastic  │
│  (users,     │  (orders,    │  (product │  search   │
│   sessions)  │   payments,  │   catalog)│  index    │
│              │   inventory) │           │           │
└──────────────┴──────────────┴───────────┴───────────┘
  ↓
Redis (shared)
cart · cache · rate limit

Step 1: PostgreSQL – Transactional Core
Schema
-- Users & auth
CREATE TABLE users (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    email TEXT NOT NULL UNIQUE,
    password_hash TEXT,
    display_name TEXT,
    plan TEXT NOT NULL DEFAULT 'free',
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Inventory (source of truth)
CREATE TABLE inventory (
    sku TEXT PRIMARY KEY,
    name TEXT NOT NULL,
    stock INT NOT NULL DEFAULT 0 CHECK (stock >= 0),
    reserved INT NOT NULL DEFAULT 0 CHECK (reserved >= 0),
    unit_price_cents INT NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Orders
CREATE TABLE orders (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id UUID NOT NULL REFERENCES users(id),
    status TEXT NOT NULL DEFAULT 'pending'
        CHECK (status IN ('pending','confirmed','shipped','delivered','cancelled','refunded')),
    total_cents INT NOT NULL,
    currency CHAR(3) NOT NULL DEFAULT 'USD',
    metadata JSONB DEFAULT '{}',
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE TABLE order_items (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    order_id UUID NOT NULL REFERENCES orders(id) ON DELETE CASCADE,
    sku TEXT NOT NULL REFERENCES inventory(sku),
    quantity INT NOT NULL CHECK (quantity > 0),
    unit_price_cents INT NOT NULL,  -- snapshot at order time
    product_name TEXT NOT NULL      -- snapshot (product name can change)
);

-- Payments
CREATE TABLE payments (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    order_id UUID NOT NULL REFERENCES orders(id),
    provider TEXT NOT NULL,  -- 'stripe', 'paypal'
    provider_ref TEXT NOT NULL UNIQUE,
    amount_cents INT NOT NULL,
    status TEXT NOT NULL DEFAULT 'pending'
        CHECK (status IN ('pending','succeeded','failed','refunded')),
    processed_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Indexes
CREATE INDEX idx_orders_user ON orders(user_id, created_at DESC);
CREATE INDEX idx_orders_status ON orders(status, created_at DESC)
    WHERE status IN ('pending','confirmed');
CREATE INDEX idx_order_items_order ON order_items(order_id);
CREATE INDEX idx_payments_order ON payments(order_id);

Order Placement Transaction
-- Atomic: reserve stock + create order in one transaction
CREATE OR REPLACE FUNCTION place_order(
    p_user_id UUID,
    p_items JSONB  -- [{"sku":"P001","qty":2},...]
) RETURNS UUID AS $$
DECLARE
    v_order_id UUID := gen_random_uuid();
    v_total INT := 0;
    v_item JSONB;
    v_sku TEXT;
    v_qty INT;
    v_price INT;
    v_name TEXT;
BEGIN
    -- 1. Lock and validate inventory for each item
    FOR v_item IN SELECT * FROM jsonb_array_elements(p_items)
    LOOP
        v_sku := v_item->>'sku';
        v_qty := (v_item->>'qty')::INT;

        SELECT unit_price_cents, name INTO v_price, v_name
        FROM inventory
        WHERE sku = v_sku AND (stock - reserved) >= v_qty
        FOR UPDATE;  -- row-level lock

        IF NOT FOUND THEN
            RAISE EXCEPTION 'Insufficient stock for SKU %', v_sku;
        END IF;

        -- Reserve inventory
        UPDATE inventory
        SET reserved = reserved + v_qty, updated_at = NOW()
        WHERE sku = v_sku;

        v_total := v_total + (v_price * v_qty);
    END LOOP;

    -- 2. Create order
    INSERT INTO orders (id, user_id, total_cents)
    VALUES (v_order_id, p_user_id, v_total);

    -- 3. Create order items (rows are still locked, so this re-read is consistent)
    FOR v_item IN SELECT * FROM jsonb_array_elements(p_items)
    LOOP
        v_sku := v_item->>'sku';
        v_qty := (v_item->>'qty')::INT;

        SELECT unit_price_cents, name INTO v_price, v_name
        FROM inventory WHERE sku = v_sku;

        INSERT INTO order_items (order_id, sku, quantity, unit_price_cents, product_name)
        VALUES (v_order_id, v_sku, v_qty, v_price, v_name);
    END LOOP;

    RETURN v_order_id;
END;
$$ LANGUAGE plpgsql;
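The heart of `place_order` is the invariant `stock - reserved >= qty`, checked under a row lock. As a sanity check, the same reservation logic can be modeled in plain Python against in-memory data (hypothetical helper and data, for illustration only; the real enforcement lives in the transaction above):

```python
class InsufficientStock(Exception):
    pass

def reserve_items(inventory: dict, items: list) -> int:
    """Validate every line first, then apply reservations: all-or-nothing,
    mimicking the transaction's rollback-on-exception behavior."""
    for item in items:
        row = inventory.get(item["sku"])
        if row is None or row["stock"] - row["reserved"] < item["qty"]:
            raise InsufficientStock(item["sku"])
    total = 0
    for item in items:
        row = inventory[item["sku"]]
        row["reserved"] += item["qty"]          # mirror of the UPDATE step
        total += row["unit_price_cents"] * item["qty"]
    return total

inv = {"LAPTOP-001": {"stock": 5, "reserved": 4, "unit_price_cents": 150000}}
order_total = reserve_items(inv, [{"sku": "LAPTOP-001", "qty": 1}])  # reserves the last available unit
```

Validating all lines before mutating anything is what the database gives you for free via rollback; in application code you have to stage it explicitly.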
-- Usage
SELECT place_order(
    'user-uuid-here',
    '[{"sku":"LAPTOP-001","qty":1},{"sku":"MOUSE-001","qty":2}]'
);

Step 2: MongoDB – Product Catalog
Product attributes vary dramatically by category: a laptop has ram_gb, a shirt has size and color. MongoDB handles this naturally.
// Product schema – flexible attrs per category
const productSchema = {
  _id: ObjectId,
  sku: String,          // matches PostgreSQL inventory.sku
  name: String,
  slug: String,         // URL-friendly
  category: String,
  brand: String,
  description: String,
  images: [{ url: String, alt: String, isPrimary: Boolean }],
  attrs: Object,        // category-specific: { ram_gb: 32, screen_inch: 14 }
  variants: [{
    sku: String,
    attrs: Object,      // variant-specific: { color: 'black', size: 'L' }
    priceCents: Number,
  }],
  tags: [String],
  ratings: {
    avg: Number,
    count: Number,
  },
  publishedAt: Date,
  updatedAt: Date,
}
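The slug is stored denormalized on each product. The article doesn't prescribe a slug algorithm, but a minimal helper along these lines (an assumption, shown in Python) would produce values like the one in the insert below:

```python
import re

def slugify(name: str) -> str:
    """Lowercase, collapse runs of non-alphanumerics into '-', trim edges."""
    return re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")

slugify("ThinkPad X1 Carbon Gen 12")  # -> "thinkpad-x1-carbon-gen-12"
```

Generating the slug once at write time (rather than on every read) is what makes the unique index on slug below safe to rely on for URL lookups.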
// Insert laptop
db.products.insertOne({
  sku: "LAPTOP-001",
  name: "ThinkPad X1 Carbon Gen 12",
  slug: "thinkpad-x1-carbon-gen-12",
  category: "laptops",
  brand: "Lenovo",
  description: "Ultra-light business laptop with 12th Gen Intel Core i7.",
  images: [
    { url: "/images/thinkpad-x1-front.jpg", alt: "Front view", isPrimary: true },
    { url: "/images/thinkpad-x1-side.jpg", alt: "Side view", isPrimary: false },
  ],
  attrs: { ram_gb: 32, storage_gb: 1000, screen_inch: 14, weight_kg: 1.12,
           cpu: "Intel Core i7-1260P", os: "Windows 11 Pro", battery_wh: 57 },
  tags: ["laptop", "business", "ultralight", "lenovo"],
  ratings: { avg: 4.7, count: 284 },
  publishedAt: new Date(),
  updatedAt: new Date(),
})
// Query: find all laptops with 32GB+ RAM, sorted by rating
db.products.find({
  category: "laptops",
  "attrs.ram_gb": { $gte: 32 },
  publishedAt: { $lte: new Date() }
}).sort({ "ratings.avg": -1 }).limit(20)

// Indexes for this query and for slug/tag lookups
db.products.createIndex({ category: 1, "attrs.ram_gb": 1, "ratings.avg": -1 })
db.products.createIndex({ slug: 1 }, { unique: true })
db.products.createIndex({ tags: 1 })

Step 3: Redis – Cart, Cache, Rate Limiting
import redis
import json
from datetime import timedelta
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# ── Cart (Hash per user) ─────────────────────────────────────────────
def add_to_cart(user_id: str, sku: str, qty: int):
    key = f"cart:{user_id}"
    existing = r.hget(key, sku)
    new_qty = (int(existing) if existing else 0) + qty
    r.hset(key, sku, new_qty)
    r.expire(key, int(timedelta(days=30).total_seconds()))

def get_cart(user_id: str) -> dict:
    return r.hgetall(f"cart:{user_id}")  # { "SKU-001": "2", "SKU-002": "1" }

def remove_from_cart(user_id: str, sku: str):
    r.hdel(f"cart:{user_id}", sku)

def clear_cart(user_id: str):
    r.delete(f"cart:{user_id}")
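One gotcha worth noting: with decode_responses=True, HGETALL returns quantities as strings, so checkout code has to cast before doing arithmetic. A small helper (hypothetical name, illustration only) that prices a raw cart:

```python
def cart_total_cents(cart: dict, prices: dict) -> int:
    """Sum qty * unit price, casting Redis's string values to int.

    `cart` is the raw HGETALL result, e.g. {"SKU-001": "2"};
    `prices` maps sku -> unit price in cents.
    """
    return sum(int(qty) * prices[sku] for sku, qty in cart.items())

cart_total_cents({"SKU-001": "2", "SKU-002": "1"},
                 {"SKU-001": 1999, "SKU-002": 4999})
```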
# ── Product cache (String with TTL) ──────────────────────────────────
def cache_product(sku: str, product: dict, ttl_minutes: int = 15):
    r.setex(f"product:{sku}", timedelta(minutes=ttl_minutes), json.dumps(product))

def get_cached_product(sku: str) -> dict | None:
    data = r.get(f"product:{sku}")
    return json.loads(data) if data else None
# ── Rate limiting (Sliding window) ───────────────────────────────────
RATE_LIMIT_SCRIPT = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
redis.call('ZREMRANGEBYSCORE', key, 0, now - window * 1000)
local count = redis.call('ZCARD', key)
if count < limit then
    -- member = timestamp; same-millisecond requests collapse into one entry
    redis.call('ZADD', key, now, now)
    redis.call('EXPIRE', key, window)
    return 1
else
    return 0
end
"""
_rate_limit_sha = None
def is_allowed(user_id: str, limit: int = 100, window_seconds: int = 60) -> bool:
    global _rate_limit_sha
    if not _rate_limit_sha:
        _rate_limit_sha = r.script_load(RATE_LIMIT_SCRIPT)
    import time
    now_ms = int(time.time() * 1000)
    result = r.evalsha(_rate_limit_sha, 1,
                       f"ratelimit:{user_id}", limit, window_seconds, now_ms)
    return bool(result)
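To make the script's semantics concrete, here is the same sliding-window logic as an in-memory Python model (illustration only; the Lua script exists precisely so the check-and-add runs atomically inside Redis):

```python
class SlidingWindowLimiter:
    """In-memory model of the Lua sliding-window rate limiter."""

    def __init__(self, limit: int, window_ms: int):
        self.limit = limit
        self.window_ms = window_ms
        self.hits: list = []  # hit timestamps in ms (the sorted set)

    def allow(self, now_ms: int) -> bool:
        # Drop hits outside the window (ZREMRANGEBYSCORE).
        self.hits = [t for t in self.hits if t > now_ms - self.window_ms]
        if len(self.hits) < self.limit:   # ZCARD < limit
            self.hits.append(now_ms)      # ZADD
            return True
        return False
```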
# ── Leaderboard (Sorted Set) ─────────────────────────────────────────
def add_xp(user_id: str, xp: int):
    r.zincrby("leaderboard:daily", xp, user_id)

def get_top_users(n: int = 10):
    return r.zrevrange("leaderboard:daily", 0, n - 1, withscores=True)

def get_rank(user_id: str) -> int:
    rank = r.zrevrank("leaderboard:daily", user_id)
    return rank + 1 if rank is not None else -1

Step 4: Elasticsearch – Product Search
from elasticsearch import Elasticsearch
es = Elasticsearch("http://localhost:9200")
# Create index with mapping
es.indices.create(index="products", body={
    "mappings": {
        "properties": {
            "sku": { "type": "keyword" },
            "name": { "type": "text", "analyzer": "english",
                      "fields": { "raw": { "type": "keyword" } } },
            "category": { "type": "keyword" },
            "brand": { "type": "keyword" },
            "description": { "type": "text", "analyzer": "english" },
            "tags": { "type": "keyword" },
            "priceCents": { "type": "integer" },
            "rating": { "type": "float" },
            "attrs": { "type": "object", "dynamic": True },
        }
    },
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 1,
    }
})
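Prices are indexed in cents, which is why the facet boundaries in the search aggregation below are 5000, 20000, and 50000. A client-side helper that maps a price onto the same buckets might look like this (hypothetical; labels are illustrative):

```python
def price_bucket(cents: int) -> str:
    """Map a price in cents onto the same buckets the range aggregation uses.
    Elasticsearch range buckets are inclusive of 'from' and exclusive of 'to'."""
    if cents < 5000:
        return "Under $50"
    if cents < 20000:
        return "$50-$200"
    if cents < 50000:
        return "$200-$500"
    return "Over $500"
```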
# Search with filters + facets + full-text
def search_products(query: str, category: str = None,
                    min_price: int = None, max_price: int = None,
                    sort_by: str = "_score", page: int = 1, size: int = 20):
    must = [{ "multi_match": {
        "query": query,
        "fields": ["name^3", "brand^2", "description", "tags"],
        "fuzziness": "AUTO"
    }}]
    filters = []
    if category:
        filters.append({ "term": { "category": category } })
    if min_price is not None or max_price is not None:  # 0 is a valid bound
        price_range = {}
        if min_price is not None: price_range["gte"] = min_price
        if max_price is not None: price_range["lte"] = max_price
        filters.append({ "range": { "priceCents": price_range } })
    return es.search(index="products", body={
        "from": (page - 1) * size,
        "size": size,
        "query": {
            "bool": { "must": must, "filter": filters }
        },
        "sort": [{ sort_by: "desc" }, "_score"],
        "aggs": {
            "by_category": { "terms": { "field": "category", "size": 20 } },
            "by_brand": { "terms": { "field": "brand", "size": 20 } },
            "price_range": {
                "range": {
                    "field": "priceCents",
                    "ranges": [
                        { "key": "Under $50", "to": 5000 },
                        { "key": "$50–$200", "from": 5000, "to": 20000 },
                        { "key": "$200–$500", "from": 20000, "to": 50000 },
                        { "key": "Over $500", "from": 50000 },
                    ]
                }
            }
        },
        "highlight": {
            "fields": { "name": {}, "description": { "fragment_size": 150 } }
        },
        "suggest": {
            "did_you_mean": {
                "text": query,
                "phrase": { "field": "name", "size": 3 }
            }
        }
    })

Keeping Databases in Sync
When MongoDB product data changes, Elasticsearch must be updated.
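One way to keep the sync maintainable is to factor the Mongo-to-Elasticsearch field mapping into a pure function that can be tested without either database running (a sketch using the field names from the catalog schema; priceCents is read defensively since the sample documents keep prices on variants):

```python
def product_to_es_doc(doc: dict) -> dict:
    """Flatten a MongoDB product document into the Elasticsearch index shape."""
    return {
        "sku": doc["sku"],
        "name": doc["name"],
        "category": doc["category"],
        "brand": doc.get("brand"),
        "description": doc.get("description"),
        "tags": doc.get("tags", []),
        "priceCents": doc.get("priceCents"),          # may live on variants instead
        "rating": doc.get("ratings", {}).get("avg"),  # denormalize nested rating
        "attrs": doc.get("attrs", {}),
    }
```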
# Event-driven sync: MongoDB Change Streams → Elasticsearch
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

async def sync_to_elasticsearch():
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    db = client.myapp
    async with db.products.watch([
        { "$match": { "operationType": { "$in": ["insert", "update", "replace", "delete"] } } }
    ]) as stream:
        async for change in stream:
            op = change["operationType"]
            if op in ("insert", "replace"):
                doc = change["fullDocument"]
                es.index(index="products", id=str(doc["_id"]), document={
                    "sku": doc["sku"],
                    "name": doc["name"],
                    "category": doc["category"],
                    "brand": doc.get("brand"),
                    "description": doc.get("description"),
                    "tags": doc.get("tags", []),
                    "priceCents": doc.get("priceCents"),
                    "rating": doc.get("ratings", {}).get("avg"),
                    "attrs": doc.get("attrs", {}),
                })
            elif op == "update":
                doc_id = str(change["documentKey"]["_id"])
                updated = change.get("updateDescription", {}).get("updatedFields", {})
                if updated:
                    es.update(index="products", id=doc_id, doc=updated)
            elif op == "delete":
                doc_id = str(change["documentKey"]["_id"])
                es.delete(index="products", id=doc_id, ignore=[404])

asyncio.run(sync_to_elasticsearch())

Final Architecture Summary
Checkout flow:
1. GET /cart → Redis HGETALL cart:{userId}
2. GET /products/{sku} → Redis cache → MongoDB (cache miss)
3. POST /orders → PostgreSQL transaction (reserve stock + create order)
4. POST /payments → PostgreSQL (record payment)
5. ORDER CONFIRMED → Redis (clear cart) + PostgreSQL (deduct inventory)
Search flow:
1. GET /search?q=laptop → Elasticsearch (full-text + filters + facets)
2. GET /products/{slug} → Redis cache → MongoDB (full product details)
Admin flow:
1. Revenue analytics → PostgreSQL (JOIN orders + order_items + users)
2. Inventory management → PostgreSQL (stock levels, reservations)
3. Product updates → MongoDB → Change Stream → Elasticsearch sync

This architecture lets each layer scale independently. Product search can absorb heavy read traffic on Elasticsearch alone without touching PostgreSQL, cart operations hit Redis at sub-millisecond latency, and transactional integrity is never at risk because every money and inventory operation stays in PostgreSQL.