Back to blog
Data Engineeringadvanced

Project: Multi-Database E-commerce Architecture

Build a production-grade e-commerce backend that combines PostgreSQL, MongoDB, Redis, and Elasticsearch โ€” each database doing what it does best. Full schema, queries, and architecture decisions.

LearnixoApril 17, 20269 min read
ProjectPostgreSQLMongoDBRedisElasticsearchArchitectureDatabase Design
Share:๐•

What You'll Build

A production e-commerce backend that uses four databases in harmony, each selected for the access patterns it handles best:

| Database | Responsibility | |---|---| | PostgreSQL | Orders, payments, inventory, users โ€” ACID-critical | | MongoDB | Product catalog โ€” variable attributes per category | | Redis | Cart sessions, product cache, rate limiting, leaderboard | | Elasticsearch | Product search, filtering, autocomplete |

This is how companies like Shopify, Zalando, and ASOS actually structure their data layer.


Architecture Overview

Client
  โ†“
API Gateway (rate limiting via Redis)
  โ†“
โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                  Application Services                โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚  User/Auth   โ”‚  Order/Pay   โ”‚  Catalog  โ”‚  Search   โ”‚
โ”‚  Service     โ”‚  Service     โ”‚  Service  โ”‚  Service  โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚  PostgreSQL  โ”‚  PostgreSQL  โ”‚  MongoDB  โ”‚  Elastic  โ”‚
โ”‚  (users,     โ”‚  (orders,    โ”‚  (product โ”‚  search   โ”‚
โ”‚   sessions)  โ”‚   payments,  โ”‚   catalog)โ”‚  index    โ”‚
โ”‚              โ”‚   inventory) โ”‚           โ”‚           โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ดโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
                        โ†‘
                  Redis (shared)
              cart ยท cache ยท rate limit

Step 1: PostgreSQL โ€” Transactional Core

Schema

SQL
-- Users & auth
CREATE TABLE users (
  id          UUID        PRIMARY KEY DEFAULT gen_random_uuid(),
  email       TEXT        NOT NULL UNIQUE,
  password_hash TEXT,
  display_name TEXT,
  plan        TEXT        NOT NULL DEFAULT 'free',
  created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Inventory (source of truth)
CREATE TABLE inventory (
  sku         TEXT        PRIMARY KEY,
  name        TEXT        NOT NULL,
  stock       INT         NOT NULL DEFAULT 0 CHECK (stock >= 0),
  reserved    INT         NOT NULL DEFAULT 0 CHECK (reserved >= 0),
  unit_price_cents INT    NOT NULL,
  updated_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Orders
CREATE TABLE orders (
  id          UUID        PRIMARY KEY DEFAULT gen_random_uuid(),
  user_id     UUID        NOT NULL REFERENCES users(id),
  status      TEXT        NOT NULL DEFAULT 'pending'
              CHECK (status IN ('pending','confirmed','shipped','delivered','cancelled','refunded')),
  total_cents INT         NOT NULL,
  currency    CHAR(3)     NOT NULL DEFAULT 'USD',
  metadata    JSONB       DEFAULT '{}',
  created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE TABLE order_items (
  id          UUID        PRIMARY KEY DEFAULT gen_random_uuid(),
  order_id    UUID        NOT NULL REFERENCES orders(id) ON DELETE CASCADE,
  sku         TEXT        NOT NULL REFERENCES inventory(sku),
  quantity    INT         NOT NULL CHECK (quantity > 0),
  unit_price_cents INT   NOT NULL,  -- snapshot at order time
  product_name TEXT       NOT NULL  -- snapshot (product name can change)
);

-- Payments
CREATE TABLE payments (
  id              UUID    PRIMARY KEY DEFAULT gen_random_uuid(),
  order_id        UUID    NOT NULL REFERENCES orders(id),
  provider        TEXT    NOT NULL,  -- 'stripe', 'paypal'
  provider_ref    TEXT    NOT NULL UNIQUE,
  amount_cents    INT     NOT NULL,
  status          TEXT    NOT NULL DEFAULT 'pending'
                  CHECK (status IN ('pending','succeeded','failed','refunded')),
  processed_at    TIMESTAMPTZ,
  created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Indexes
CREATE INDEX idx_orders_user ON orders(user_id, created_at DESC);
CREATE INDEX idx_orders_status ON orders(status, created_at DESC)
WHERE status IN ('pending','confirmed');
CREATE INDEX idx_order_items_order ON order_items(order_id);
CREATE INDEX idx_payments_order ON payments(order_id);

Order Placement Transaction

SQL
-- Atomic: reserve stock + create order in one transaction
CREATE OR REPLACE FUNCTION place_order(
  p_user_id   UUID,
  p_items     JSONB    -- [{"sku":"P001","qty":2},...]
) RETURNS UUID AS $$
DECLARE
  v_order_id  UUID := gen_random_uuid();
  v_total     INT  := 0;
  v_item      JSONB;
  v_sku       TEXT;
  v_qty       INT;
  v_price     INT;
  v_name      TEXT;
BEGIN
  -- 1. Lock and validate inventory for each item
  FOR v_item IN SELECT * FROM jsonb_array_elements(p_items)
  LOOP
    v_sku := v_item->>'sku';
    v_qty := (v_item->>'qty')::INT;

    SELECT unit_price_cents, name INTO v_price, v_name
    FROM inventory
    WHERE sku = v_sku AND (stock - reserved) >= v_qty
    FOR UPDATE;  -- row-level lock

    IF NOT FOUND THEN
      RAISE EXCEPTION 'Insufficient stock for SKU %', v_sku;
    END IF;

    -- Reserve inventory
    UPDATE inventory
    SET reserved = reserved + v_qty, updated_at = NOW()
    WHERE sku = v_sku;

    v_total := v_total + (v_price * v_qty);
  END LOOP;

  -- 2. Create order
  INSERT INTO orders (id, user_id, total_cents)
  VALUES (v_order_id, p_user_id, v_total);

  -- 3. Create order items
  FOR v_item IN SELECT * FROM jsonb_array_elements(p_items)
  LOOP
    v_sku := v_item->>'sku';
    v_qty := (v_item->>'qty')::INT;
    SELECT unit_price_cents, name INTO v_price, v_name
    FROM inventory WHERE sku = v_sku;

    INSERT INTO order_items (order_id, sku, quantity, unit_price_cents, product_name)
    VALUES (v_order_id, v_sku, v_qty, v_price, v_name);
  END LOOP;

  RETURN v_order_id;
END;
$$ LANGUAGE plpgsql;

-- Usage
SELECT place_order(
  'user-uuid-here',
  '[{"sku":"LAPTOP-001","qty":1},{"sku":"MOUSE-001","qty":2}]'
);

Step 2: MongoDB โ€” Product Catalog

Product attributes vary dramatically by category โ€” a laptop has ram_gb, a shirt has size and color. MongoDB handles this naturally.

JAVASCRIPT
// Product schema โ€” flexible attrs per category
const productSchema = {
  _id: ObjectId,
  sku: String,           // matches PostgreSQL inventory.sku
  name: String,
  slug: String,          // URL-friendly
  category: String,
  brand: String,
  description: String,
  images: [{ url: String, alt: String, isPrimary: Boolean }],
  attrs: Object,         // category-specific: { ram_gb: 32, screen_inch: 14 }
  variants: [{
    sku: String,
    attrs: Object,       // variant-specific: { color: 'black', size: 'L' }
    priceCents: Number,
  }],
  tags: [String],
  ratings: {
    avg: Number,
    count: Number,
  },
  publishedAt: Date,
  updatedAt: Date,
}

// Insert laptop
db.products.insertOne({
  sku: "LAPTOP-001",
  name: "ThinkPad X1 Carbon Gen 12",
  slug: "thinkpad-x1-carbon-gen-12",
  category: "laptops",
  brand: "Lenovo",
  description: "Ultra-light business laptop with 12th Gen Intel Core i7.",
  images: [
    { url: "/images/thinkpad-x1-front.jpg", alt: "Front view", isPrimary: true },
    { url: "/images/thinkpad-x1-side.jpg",  alt: "Side view",  isPrimary: false },
  ],
  attrs: { ram_gb: 32, storage_gb: 1000, screen_inch: 14, weight_kg: 1.12,
           cpu: "Intel Core i7-1260P", os: "Windows 11 Pro", battery_wh: 57 },
  tags: ["laptop", "business", "ultralight", "lenovo"],
  ratings: { avg: 4.7, count: 284 },
  publishedAt: new Date(),
  updatedAt: new Date(),
})

// Query: find all laptops with 32GB+ RAM, sorted by rating
db.products.find({
  category: "laptops",
  "attrs.ram_gb": { $gte: 32 },
  publishedAt: { $lte: new Date() }
}).sort({ "ratings.avg": -1 }).limit(20)

// Index for this query
db.products.createIndex({ category: 1, "attrs.ram_gb": 1, "ratings.avg": -1 })
db.products.createIndex({ slug: 1 }, { unique: true })
db.products.createIndex({ tags: 1 })

Step 3: Redis โ€” Cart, Cache, Rate Limiting

Python
import redis
import json
from datetime import timedelta

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# โ”€โ”€ Cart (Hash per user) โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def add_to_cart(user_id: str, sku: str, qty: int):
    key = f"cart:{user_id}"
    existing = r.hget(key, sku)
    new_qty = (int(existing) if existing else 0) + qty
    r.hset(key, sku, new_qty)
    r.expire(key, int(timedelta(days=30).total_seconds()))

def get_cart(user_id: str) -> dict:
    return r.hgetall(f"cart:{user_id}")  # { "SKU-001": "2", "SKU-002": "1" }

def remove_from_cart(user_id: str, sku: str):
    r.hdel(f"cart:{user_id}", sku)

def clear_cart(user_id: str):
    r.delete(f"cart:{user_id}")

# โ”€โ”€ Product cache (String with TTL) โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def cache_product(sku: str, product: dict, ttl_minutes: int = 15):
    r.setex(f"product:{sku}", timedelta(minutes=ttl_minutes), json.dumps(product))

def get_cached_product(sku: str) -> dict | None:
    data = r.get(f"product:{sku}")
    return json.loads(data) if data else None

# โ”€โ”€ Rate limiting (Sliding window) โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
RATE_LIMIT_SCRIPT = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
redis.call('ZREMRANGEBYSCORE', key, 0, now - window * 1000)
local count = redis.call('ZCARD', key)
if count < limit then
    redis.call('ZADD', key, now, now)
    redis.call('EXPIRE', key, window)
    return 1
else
    return 0
end
"""
_rate_limit_sha = None

def is_allowed(user_id: str, limit: int = 100, window_seconds: int = 60) -> bool:
    global _rate_limit_sha
    if not _rate_limit_sha:
        _rate_limit_sha = r.script_load(RATE_LIMIT_SCRIPT)
    import time
    now_ms = int(time.time() * 1000)
    result = r.evalsha(_rate_limit_sha, 1,
        f"ratelimit:{user_id}", limit, window_seconds, now_ms)
    return bool(result)

# โ”€โ”€ Leaderboard (Sorted Set) โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def add_xp(user_id: str, xp: int):
    r.zincrby("leaderboard:daily", xp, user_id)

def get_top_users(n: int = 10):
    return r.zrevrange("leaderboard:daily", 0, n - 1, withscores=True)

def get_rank(user_id: str) -> int:
    rank = r.zrevrank("leaderboard:daily", user_id)
    return rank + 1 if rank is not None else -1

Step 4: Elasticsearch โ€” Product Search

Python
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")

# Create index with mapping
es.indices.create(index="products", body={
    "mappings": {
        "properties": {
            "sku":         { "type": "keyword" },
            "name":        { "type": "text",    "analyzer": "english",
                             "fields": { "raw": { "type": "keyword" } } },
            "category":    { "type": "keyword" },
            "brand":       { "type": "keyword" },
            "description": { "type": "text",    "analyzer": "english" },
            "tags":        { "type": "keyword" },
            "priceCents":  { "type": "integer" },
            "rating":      { "type": "float" },
            "attrs":       { "type": "object",  "dynamic": True },
        }
    },
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 1,
    }
})

# Search with filters + facets + full-text
def search_products(query: str, category: str = None,
                    min_price: int = None, max_price: int = None,
                    sort_by: str = "_score", page: int = 1, size: int = 20):
    must = [{ "multi_match": {
        "query": query,
        "fields": ["name^3", "brand^2", "description", "tags"],
        "fuzziness": "AUTO"
    }}]
    filters = []
    if category:
        filters.append({ "term": { "category": category } })
    if min_price or max_price:
        price_range = {}
        if min_price: price_range["gte"] = min_price
        if max_price: price_range["lte"] = max_price
        filters.append({ "range": { "priceCents": price_range } })

    return es.search(index="products", body={
        "from": (page - 1) * size,
        "size": size,
        "query": {
            "bool": { "must": must, "filter": filters }
        },
        "sort": [{ sort_by: "desc" }, "_score"],
        "aggs": {
            "by_category": { "terms": { "field": "category", "size": 20 } },
            "by_brand":    { "terms": { "field": "brand",    "size": 20 } },
            "price_range": {
                "range": {
                    "field": "priceCents",
                    "ranges": [
                        { "key": "Under $50",   "to": 5000 },
                        { "key": "$50โ€“$200",    "from": 5000,  "to": 20000 },
                        { "key": "$200โ€“$500",   "from": 20000, "to": 50000 },
                        { "key": "Over $500",   "from": 50000 },
                    ]
                }
            }
        },
        "highlight": {
            "fields": { "name": {}, "description": { "fragment_size": 150 } }
        },
        "suggest": {
            "did_you_mean": {
                "text": query,
                "phrase": { "field": "name", "size": 3 }
            }
        }
    })

Keeping Databases in Sync

When MongoDB product data changes, Elasticsearch must be updated.

Python
# Event-driven sync using Change Streams (MongoDB) โ†’ Elasticsearch
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

async def sync_to_elasticsearch():
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    db = client.myapp

    async with db.products.watch([
        { "$match": { "operationType": { "$in": ["insert", "update", "replace", "delete"] } } }
    ]) as stream:
        async for change in stream:
            op = change["operationType"]

            if op in ("insert", "replace"):
                doc = change["fullDocument"]
                es.index(index="products", id=str(doc["_id"]), document={
                    "sku":         doc["sku"],
                    "name":        doc["name"],
                    "category":    doc["category"],
                    "brand":       doc.get("brand"),
                    "description": doc.get("description"),
                    "tags":        doc.get("tags", []),
                    "priceCents":  doc.get("priceCents"),
                    "rating":      doc.get("ratings", {}).get("avg"),
                    "attrs":       doc.get("attrs", {}),
                })

            elif op == "update":
                doc_id = str(change["documentKey"]["_id"])
                updated = change.get("updateDescription", {}).get("updatedFields", {})
                if updated:
                    es.update(index="products", id=doc_id, doc=updated)

            elif op == "delete":
                doc_id = str(change["documentKey"]["_id"])
                es.delete(index="products", id=doc_id, ignore=[404])

asyncio.run(sync_to_elasticsearch())

Final Architecture Summary

Checkout flow:
1. GET /cart            โ†’ Redis HGETALL cart:{userId}
2. GET /products/{sku}  โ†’ Redis cache โ†’ MongoDB (cache miss)
3. POST /orders         โ†’ PostgreSQL transaction (reserve stock + create order)
4. POST /payments       โ†’ PostgreSQL (record payment)
5. ORDER CONFIRMED      โ†’ Redis (clear cart) + PostgreSQL (deduct inventory)

Search flow:
1. GET /search?q=laptop โ†’ Elasticsearch (full-text + filters + facets)
2. GET /products/{slug} โ†’ Redis cache โ†’ MongoDB (full product details)

Admin flow:
1. Revenue analytics    โ†’ PostgreSQL (JOIN orders + order_items + users)
2. Inventory management โ†’ PostgreSQL (stock levels, reservations)
3. Product updates      โ†’ MongoDB โ†’ Change Stream โ†’ Elasticsearch sync

This architecture scales each layer independently. Product search can handle 10,000 req/s on Elasticsearch without touching PostgreSQL. Cart operations hit Redis at sub-millisecond latency. Transactional integrity is never compromised because all money/inventory operations stay in PostgreSQL.

Enjoyed this article?

Explore the Data Engineering learning path for more.

Found this helpful?

Share:๐•

Leave a comment

Have a question, correction, or just found this helpful? Leave a note below.