SQL & NoSQL Databases: Complete Guide · Lesson 9 of 9

Project: Multi-Database E-commerce Backend

What You'll Build

A production e-commerce backend that uses four databases in harmony, each selected for the access patterns it handles best:

| Database | Responsibility |
|---|---|
| PostgreSQL | Orders, payments, inventory, users — ACID-critical |
| MongoDB | Product catalog — variable attributes per category |
| Redis | Cart sessions, product cache, rate limiting, leaderboard |
| Elasticsearch | Product search, filtering, autocomplete |

This is how companies like Shopify, Zalando, and ASOS actually structure their data layer.


Architecture Overview

Client
  ↓
API Gateway (rate limiting via Redis)
  ↓
┌─────────────────────────────────────────────────────┐
│                  Application Services                │
├──────────────┬──────────────┬───────────┬───────────┤
│  User/Auth   │  Order/Pay   │  Catalog  │  Search   │
│  Service     │  Service     │  Service  │  Service  │
├──────────────┼──────────────┼───────────┼───────────┤
│  PostgreSQL  │  PostgreSQL  │  MongoDB  │  Elastic  │
│  (users,     │  (orders,    │  (product │  search   │
│   sessions)  │   payments,  │   catalog)│  index    │
│              │   inventory) │           │           │
└──────────────┴──────────────┴───────────┴───────────┘
                        ↑
                  Redis (shared)
              cart · cache · rate limit

Step 1: PostgreSQL — Transactional Core

Schema

SQL
-- Users & auth: one row per account.
CREATE TABLE users (
  id          UUID        PRIMARY KEY DEFAULT gen_random_uuid(),
  email       TEXT        NOT NULL UNIQUE,  -- login identifier; one account per address
  password_hash TEXT,     -- nullable — presumably for OAuth/SSO accounts with no local password; confirm
  display_name TEXT,
  plan        TEXT        NOT NULL DEFAULT 'free',  -- subscription tier, free-form text
  created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Inventory (source of truth for stock levels and base prices)
CREATE TABLE inventory (
  sku         TEXT        PRIMARY KEY,
  name        TEXT        NOT NULL,
  stock       INT         NOT NULL DEFAULT 0 CHECK (stock >= 0),     -- physical units on hand
  reserved    INT         NOT NULL DEFAULT 0 CHECK (reserved >= 0),  -- units held by pending orders
  unit_price_cents INT    NOT NULL CHECK (unit_price_cents >= 0),    -- price in cents
  updated_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  -- place_order treats (stock - reserved) as the sellable quantity;
  -- enforce the invariant in the schema so no writer can break it.
  CONSTRAINT inventory_reserved_le_stock_check CHECK (reserved <= stock)
);

-- Orders: one row per checkout. All money is stored as integer cents
-- to avoid floating-point rounding.
CREATE TABLE orders (
  id          UUID        PRIMARY KEY DEFAULT gen_random_uuid(),
  user_id     UUID        NOT NULL REFERENCES users(id),
  -- Lifecycle states enforced in the schema rather than app code.
  status      TEXT        NOT NULL DEFAULT 'pending'
              CHECK (status IN ('pending','confirmed','shipped','delivered','cancelled','refunded')),
  total_cents INT         NOT NULL,            -- sum of line-item snapshots, in cents
  currency    CHAR(3)     NOT NULL DEFAULT 'USD',  -- ISO 4217 currency code
  metadata    JSONB       DEFAULT '{}',        -- free-form extras; nullable
  created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Line items. Price and name are snapshotted at order time so later
-- catalog edits do not rewrite historical orders.
CREATE TABLE order_items (
  id          UUID        PRIMARY KEY DEFAULT gen_random_uuid(),
  order_id    UUID        NOT NULL REFERENCES orders(id) ON DELETE CASCADE,  -- items die with their order
  sku         TEXT        NOT NULL REFERENCES inventory(sku),
  quantity    INT         NOT NULL CHECK (quantity > 0),
  unit_price_cents INT   NOT NULL,  -- snapshot at order time
  product_name TEXT       NOT NULL  -- snapshot (product name can change)
);

-- Payments: one row per payment attempt against an order.
CREATE TABLE payments (
  id              UUID    PRIMARY KEY DEFAULT gen_random_uuid(),
  order_id        UUID    NOT NULL REFERENCES orders(id),
  provider        TEXT    NOT NULL,  -- 'stripe', 'paypal'
  provider_ref    TEXT    NOT NULL UNIQUE,  -- provider's transaction id; UNIQUE prevents recording the same charge twice
  amount_cents    INT     NOT NULL,         -- in cents, same unit as orders.total_cents
  status          TEXT    NOT NULL DEFAULT 'pending'
                  CHECK (status IN ('pending','succeeded','failed','refunded')),
  processed_at    TIMESTAMPTZ,              -- NULL until the provider settles/fails the charge
  created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Indexes
-- Per-user order history, newest first (presumably the "my orders" listing).
CREATE INDEX idx_orders_user ON orders(user_id, created_at DESC);
-- Partial index: only the in-flight states, keeping the index small.
CREATE INDEX idx_orders_status ON orders(status, created_at DESC)
WHERE status IN ('pending','confirmed');
CREATE INDEX idx_order_items_order ON order_items(order_id);  -- order -> items lookup
CREATE INDEX idx_payments_order ON payments(order_id);        -- order -> payments lookup

Order Placement Transaction

SQL
-- Atomic order placement: reserve stock + create the order and its line
-- items in one transaction. Any failure (unknown SKU, insufficient stock,
-- bad quantity) raises and rolls everything back.
CREATE OR REPLACE FUNCTION place_order(
  p_user_id   UUID,
  p_items     JSONB    -- [{"sku":"P001","qty":2},...]
) RETURNS UUID AS $$
DECLARE
  v_order_id  UUID := gen_random_uuid();
  v_total     INT  := 0;
  v_item      JSONB;
  v_sku       TEXT;
  v_qty       INT;
  v_price     INT;
  v_name      TEXT;
BEGIN
  -- Create the order shell first so order_items can reference it;
  -- the real total is written once every item has been priced.
  INSERT INTO orders (id, user_id, total_cents)
  VALUES (v_order_id, p_user_id, 0);

  -- Process items in deterministic SKU order so concurrent orders always
  -- acquire inventory row locks in the same sequence. Locking in
  -- client-supplied order can deadlock when two orders share SKUs.
  FOR v_item IN
    SELECT value FROM jsonb_array_elements(p_items) ORDER BY value->>'sku'
  LOOP
    v_sku := v_item->>'sku';
    v_qty := (v_item->>'qty')::INT;

    -- Reject missing/zero/negative quantities: a negative qty would
    -- otherwise *release* reservations through the arithmetic below.
    IF v_qty IS NULL OR v_qty <= 0 THEN
      RAISE EXCEPTION 'Invalid quantity for SKU %', v_sku;
    END IF;

    -- Reserve atomically: the UPDATE takes the row lock, enforces
    -- availability, and returns the price/name snapshot in one statement
    -- (no separate SELECT ... FOR UPDATE or second read loop needed).
    UPDATE inventory
    SET reserved = reserved + v_qty, updated_at = NOW()
    WHERE sku = v_sku AND (stock - reserved) >= v_qty
    RETURNING unit_price_cents, name INTO v_price, v_name;

    IF NOT FOUND THEN
      RAISE EXCEPTION 'Insufficient stock for SKU %', v_sku;
    END IF;

    INSERT INTO order_items (order_id, sku, quantity, unit_price_cents, product_name)
    VALUES (v_order_id, v_sku, v_qty, v_price, v_name);

    v_total := v_total + (v_price * v_qty);
  END LOOP;

  -- Finalize the order total from the snapshotted prices (in cents).
  UPDATE orders SET total_cents = v_total, updated_at = NOW()
  WHERE id = v_order_id;

  RETURN v_order_id;
END;
$$ LANGUAGE plpgsql;

-- Usage
SELECT place_order(
  '8f14e45f-ceea-467f-a8cb-4f5d04b0a6e2'::uuid,
  '[{"sku":"LAPTOP-001","qty":1},{"sku":"MOUSE-001","qty":2}]'::jsonb
);

Step 2: MongoDB — Product Catalog

Product attributes vary dramatically by category — a laptop has ram_gb, a shirt has size and color. MongoDB handles this naturally.

JAVASCRIPT
// Product schema — flexible attrs per category
const productSchema = {
  _id: ObjectId,
  sku: String,           // matches PostgreSQL inventory.sku
  name: String,
  slug: String,          // URL-friendly
  category: String,
  brand: String,
  description: String,
  images: [{ url: String, alt: String, isPrimary: Boolean }],
  attrs: Object,         // category-specific: { ram_gb: 32, screen_inch: 14 }
  variants: [{
    sku: String,
    attrs: Object,       // variant-specific: { color: 'black', size: 'L' }
    priceCents: Number,
  }],
  tags: [String],
  ratings: {
    avg: Number,
    count: Number,
  },
  publishedAt: Date,
  updatedAt: Date,
}

// Insert laptop
db.products.insertOne({
  sku: "LAPTOP-001",
  name: "ThinkPad X1 Carbon Gen 12",
  slug: "thinkpad-x1-carbon-gen-12",
  category: "laptops",
  brand: "Lenovo",
  description: "Ultra-light business laptop with 12th Gen Intel Core i7.",
  images: [
    { url: "/images/thinkpad-x1-front.jpg", alt: "Front view", isPrimary: true },
    { url: "/images/thinkpad-x1-side.jpg",  alt: "Side view",  isPrimary: false },
  ],
  attrs: { ram_gb: 32, storage_gb: 1000, screen_inch: 14, weight_kg: 1.12,
           cpu: "Intel Core i7-1260P", os: "Windows 11 Pro", battery_wh: 57 },
  tags: ["laptop", "business", "ultralight", "lenovo"],
  ratings: { avg: 4.7, count: 284 },
  publishedAt: new Date(),
  updatedAt: new Date(),
})

// Query: find all laptops with 32GB+ RAM, sorted by rating
db.products.find({
  category: "laptops",
  "attrs.ram_gb": { $gte: 32 },
  publishedAt: { $lte: new Date() }
}).sort({ "ratings.avg": -1 }).limit(20)

// Index for this query
db.products.createIndex({ category: 1, "attrs.ram_gb": 1, "ratings.avg": -1 })
db.products.createIndex({ slug: 1 }, { unique: true })
db.products.createIndex({ tags: 1 })

Step 3: Redis — Cart, Cache, Rate Limiting

Python
import redis
import json
from datetime import timedelta

# decode_responses=True makes the client return str instead of bytes,
# so hash fields/values read back as "2" rather than b"2".
r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# ── Cart (Hash per user) ──────────────────────────────────────────────
def add_to_cart(user_id: str, sku: str, qty: int) -> None:
    """Add qty units of sku to the user's cart (30-day TTL, refreshed on write).

    Uses HINCRBY so two concurrent adds can't lose an update — the previous
    HGET-then-HSET read-modify-write was racy. Both commands go through a
    pipeline to save a round trip.
    """
    key = f"cart:{user_id}"
    pipe = r.pipeline()
    pipe.hincrby(key, sku, qty)
    pipe.expire(key, int(timedelta(days=30).total_seconds()))
    pipe.execute()

def get_cart(user_id: str) -> dict:
    """Return the whole cart hash, e.g. { "SKU-001": "2", "SKU-002": "1" }."""
    cart_key = f"cart:{user_id}"
    return r.hgetall(cart_key)

def remove_from_cart(user_id: str, sku: str):
    """Drop a single SKU line from the user's cart."""
    cart_key = f"cart:{user_id}"
    r.hdel(cart_key, sku)

def clear_cart(user_id: str):
    """Delete the user's entire cart hash (e.g. after checkout)."""
    cart_key = f"cart:{user_id}"
    r.delete(cart_key)

# ── Product cache (String with TTL) ──────────────────────────────────
def cache_product(sku: str, product: dict, ttl_minutes: int = 15):
    """Cache a product document as JSON under product:{sku} with a TTL."""
    payload = json.dumps(product)
    r.setex(f"product:{sku}", timedelta(minutes=ttl_minutes), payload)

def get_cached_product(sku: str) -> dict | None:
    """Return the cached product dict, or None on a cache miss."""
    raw = r.get(f"product:{sku}")
    if not raw:
        return None
    return json.loads(raw)

# ── Rate limiting (Sliding window) ───────────────────────────────────
# Sliding-window limiter: a ZSET per user holds one entry per request,
# scored by request time in ms. The Lua script runs atomically server-side.
RATE_LIMIT_SCRIPT = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local member = ARGV[4]
-- Evict entries that fell out of the window (scores are in ms).
redis.call('ZREMRANGEBYSCORE', key, 0, now - window * 1000)
local count = redis.call('ZCARD', key)
if count < limit then
    -- The member must be unique per request: using the timestamp alone
    -- collapses same-millisecond requests into one ZSET entry, letting
    -- bursts evade the limit.
    redis.call('ZADD', key, now, member)
    redis.call('EXPIRE', key, window)
    return 1
else
    return 0
end
"""
_rate_limit_sha = None  # lazily-loaded script SHA, cached per process

def is_allowed(user_id: str, limit: int = 100, window_seconds: int = 60) -> bool:
    """Return True if this request fits within limit per window_seconds."""
    global _rate_limit_sha
    if not _rate_limit_sha:
        _rate_limit_sha = r.script_load(RATE_LIMIT_SCRIPT)
    import time, uuid
    now_ms = int(time.time() * 1000)
    member = f"{now_ms}-{uuid.uuid4().hex}"  # unique per request (see script)
    result = r.evalsha(_rate_limit_sha, 1,
        f"ratelimit:{user_id}", limit, window_seconds, now_ms, member)
    return bool(result)

# ── Leaderboard (Sorted Set) ──────────────────────────────────────────
def add_xp(user_id: str, xp: int):
    """Add xp to the user's score on the daily leaderboard sorted set."""
    r.zincrby("leaderboard:daily", xp, user_id)

def get_top_users(n: int = 10):
    """Return the top-n (member, score) pairs, highest score first."""
    return r.zrevrange("leaderboard:daily", 0, n - 1, withscores=True)

def get_rank(user_id: str) -> int:
    """Return the user's 1-based rank (best score = 1), or -1 if absent."""
    position = r.zrevrank("leaderboard:daily", user_id)
    if position is None:
        return -1
    return position + 1

Step 4: Elasticsearch — Product Search

Python
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")

# Create index with an explicit mapping (keyword = exact filter/facet
# fields, text = analyzed full-text fields).
# NOTE(review): indices.create raises if "products" already exists — guard
# with an exists check before re-running this script; confirm intent.
es.indices.create(index="products", body={
    "mappings": {
        "properties": {
            "sku":         { "type": "keyword" },
            # name.raw keyword sub-field enables exact sort/aggregation
            # alongside analyzed search on name.
            "name":        { "type": "text",    "analyzer": "english",
                             "fields": { "raw": { "type": "keyword" } } },
            "category":    { "type": "keyword" },
            "brand":       { "type": "keyword" },
            "description": { "type": "text",    "analyzer": "english" },
            "tags":        { "type": "keyword" },
            "priceCents":  { "type": "integer" },  # price in cents
            "rating":      { "type": "float" },
            # Category-specific attrs; dynamic mapping indexes new keys
            # without a mapping change.
            "attrs":       { "type": "object",  "dynamic": True },
        }
    },
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 1,
    }
})

# Search with filters + facets + full-text
def search_products(query: str, category: str = None,
                    min_price: int = None, max_price: int = None,
                    sort_by: str = "_score", page: int = 1, size: int = 20):
    """Full-text product search with optional structured filters.

    Args:
        query:      free text, matched against name/brand/description/tags.
        category:   exact category filter (keyword field), or None.
        min_price:  inclusive lower price bound in cents, or None.
        max_price:  inclusive upper price bound in cents, or None.
        sort_by:    field to sort descending on; "_score" = pure relevance.
        page, size: 1-based pagination.

    Returns the raw Elasticsearch response (hits + aggregations + suggest).
    """
    must = [{ "multi_match": {
        "query": query,
        "fields": ["name^3", "brand^2", "description", "tags"],  # boost name/brand
        "fuzziness": "AUTO"  # tolerate typos
    }}]
    filters = []
    if category:
        filters.append({ "term": { "category": category } })
    # Compare against None explicitly: 0 is falsy but a valid price bound,
    # and `if min_price:` would silently drop a 0-cent lower limit.
    if min_price is not None or max_price is not None:
        price_range = {}
        if min_price is not None:
            price_range["gte"] = min_price
        if max_price is not None:
            price_range["lte"] = max_price
        filters.append({ "range": { "priceCents": price_range } })

    return es.search(index="products", body={
        "from": (page - 1) * size,
        "size": size,
        "query": {
            # must = scored full-text; filter = unscored, cacheable predicates
            "bool": { "must": must, "filter": filters }
        },
        "sort": [{ sort_by: "desc" }, "_score"],  # _score breaks field-sort ties
        "aggs": {
            "by_category": { "terms": { "field": "category", "size": 20 } },
            "by_brand":    { "terms": { "field": "brand",    "size": 20 } },
            "price_range": {
                "range": {
                    "field": "priceCents",
                    "ranges": [
                        { "key": "Under $50",   "to": 5000 },
                        { "key": "$50–$200",    "from": 5000,  "to": 20000 },
                        { "key": "$200–$500",   "from": 20000, "to": 50000 },
                        { "key": "Over $500",   "from": 50000 },
                    ]
                }
            }
        },
        "highlight": {
            "fields": { "name": {}, "description": { "fragment_size": 150 } }
        },
        "suggest": {
            "did_you_mean": {
                "text": query,
                "phrase": { "field": "name", "size": 3 }
            }
        }
    })

Keeping Databases in Sync

When MongoDB product data changes, Elasticsearch must be updated.

Python
# Event-driven sync using Change Streams (MongoDB → Elasticsearch)
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

def _to_es_doc(doc: dict) -> dict:
    """Map a MongoDB product document onto the Elasticsearch index schema."""
    return {
        "sku":         doc["sku"],
        "name":        doc["name"],
        "category":    doc["category"],
        "brand":       doc.get("brand"),
        "description": doc.get("description"),
        "tags":        doc.get("tags", []),
        "priceCents":  doc.get("priceCents"),
        "rating":      doc.get("ratings", {}).get("avg"),  # flattened: ratings.avg -> rating
        "attrs":       doc.get("attrs", {}),
    }

async def sync_to_elasticsearch():
    """Tail the products change stream and mirror every change into ES."""
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    db = client.myapp

    # full_document="updateLookup" makes update events carry the complete
    # post-update document so we can reindex it whole. Passing Mongo's
    # dotted updatedFields (e.g. "ratings.avg") straight to es.update was
    # wrong: the ES mapping uses different names ("rating"), so partial
    # updates created bogus fields and left real ones stale; removedFields
    # was ignored entirely.
    async with db.products.watch(
        [{ "$match": { "operationType": { "$in": ["insert", "update", "replace", "delete"] } } }],
        full_document="updateLookup",
    ) as stream:
        async for change in stream:
            op = change["operationType"]

            if op in ("insert", "update", "replace"):
                doc = change.get("fullDocument")
                if doc is None:
                    # Document vanished between the update and the lookup;
                    # the trailing delete event will clean up the index.
                    continue
                es.index(index="products", id=str(doc["_id"]),
                         document=_to_es_doc(doc))

            elif op == "delete":
                doc_id = str(change["documentKey"]["_id"])
                es.delete(index="products", id=doc_id, ignore=[404])

asyncio.run(sync_to_elasticsearch())

Final Architecture Summary

Checkout flow:
1. GET /cart            → Redis HGETALL cart:{userId}
2. GET /products/{sku}  → Redis cache → MongoDB (cache miss)
3. POST /orders         → PostgreSQL transaction (reserve stock + create order)
4. POST /payments       → PostgreSQL (record payment)
5. ORDER CONFIRMED      → Redis (clear cart) + PostgreSQL (deduct inventory)

Search flow:
1. GET /search?q=laptop → Elasticsearch (full-text + filters + facets)
2. GET /products/{slug} → Redis cache → MongoDB (full product details)

Admin flow:
1. Revenue analytics    → PostgreSQL (JOIN orders + order_items + users)
2. Inventory management → PostgreSQL (stock levels, reservations)
3. Product updates      → MongoDB → Change Stream → Elasticsearch sync

This architecture scales each layer independently. Product search can handle 10,000 req/s on Elasticsearch without touching PostgreSQL. Cart operations hit Redis at sub-millisecond latency. Transactional integrity is never compromised because all money/inventory operations stay in PostgreSQL.