Learnixo

AI Safety & Guardrails · Lesson 7 of 15

Detection: Regex Patterns and Semantic Classifiers

Why Output Detection Matters

Input filtering prevents known attack patterns. But LLMs can produce unsafe content even from innocuous inputs — through model failures, edge cases, or subtle jailbreaks that slipped past input filters. Output detection is the last line of defence before content reaches users.

Output detection answers: "Is this generated text safe to show?"

The answer must come back quickly (latency budget) and accurately (low false positives + low false negatives). These goals conflict, which is why production systems layer multiple approaches.


Approach 1: Classifier-Based Detection

A safety classifier is a model (usually a fine-tuned BERT or similar) trained to label text as safe or unsafe across one or more categories.

Pros and cons

| Aspect | Details | |---|---| | Latency | Very fast — under 50ms on GPU, under 200ms on CPU | | Cost | Near zero after model loading | | Accuracy | High for known categories, poor for novel attacks | | Coverage | Only what it was trained to detect |

Example with a Hugging Face classifier

Python
from transformers import pipeline
import time

# Load a toxicity classifier fine-tuned for safety use cases
# In production, use a model fine-tuned on your domain's policy
toxicity_classifier = pipeline(
    "text-classification",
    model="unitary/toxic-bert",
    device=-1,  # CPU; use 0 for GPU
    truncation=True,
    max_length=512
)

# Multi-label classifier for safety categories
safety_classifier = pipeline(
    "text-classification",
    model="KoalaAI/Text-Moderation",
    device=-1,
    truncation=True,
    max_length=512
)

def classify_output_safety(text: str) -> dict:
    """
    Run text through multiple safety classifiers.
    Returns aggregated verdict with per-classifier scores.
    """
    start = time.time()
    
    # Toxicity check
    toxicity_result = toxicity_classifier(text)[0]
    
    # Broader safety category check
    safety_result = safety_classifier(text)[0]
    
    latency_ms = (time.time() - start) * 1000
    
    # Map to unified verdict
    is_toxic = (
        toxicity_result["label"].upper() == "TOXIC"
        and toxicity_result["score"] > 0.7
    )
    
    is_unsafe_category = (
        safety_result["label"] not in ("OK", "safe")
        and safety_result["score"] > 0.6
    )
    
    return {
        "text_preview": text[:100] + "..." if len(text) > 100 else text,
        "toxicity": {
            "label": toxicity_result["label"],
            "score": round(toxicity_result["score"], 3)
        },
        "safety_category": {
            "label": safety_result["label"],
            "score": round(safety_result["score"], 3)
        },
        "is_unsafe": is_toxic or is_unsafe_category,
        "latency_ms": round(latency_ms, 1),
        "action": "BLOCK" if (is_toxic or is_unsafe_category) else "PASS"
    }

# Usage
result = classify_output_safety("This is a helpful and informative response about Python.")
print(result)

Approach 2: Rule-Based Detection

Regular expressions and pattern matching for high-confidence unsafe signals. Very fast, zero cost, zero latency — but brittle and easily bypassed.

Use rule-based detection for high-confidence known bad patterns, not as a sole defence.

Python
import re
import ipaddress
from typing import Optional

# PII patterns
PII_PATTERNS = {
    "email": re.compile(r'\b[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Z|a-z]{2,}\b'),
    "phone_us": re.compile(r'\b(\+1[-.\s]?)?(\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4})\b'),
    "ssn": re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
    "credit_card": re.compile(r'\b(?:4[0-9]{12}(?:[0-9]{3})?|5[1-5][0-9]{14}|3[47][0-9]{13})\b'),
    "uk_nino": re.compile(r'\b[A-Z]{2}\d{6}[A-Z]\b'),
}

# URLs in outputs (may indicate phishing or data exfiltration)
URL_PATTERN = re.compile(
    r'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+'
)

# Code patterns that suggest dangerous operations
DANGEROUS_CODE_PATTERNS = {
    "shell_injection": re.compile(r'(os\.system|subprocess\.call|subprocess\.run|eval\(|exec\()', re.IGNORECASE),
    "file_deletion": re.compile(r'(rm\s+-rf|rmdir\s+/s|shutil\.rmtree)', re.IGNORECASE),
    "network_exfil": re.compile(r'(curl|wget|requests\.post|httpx\.post)\s+.*https?://', re.IGNORECASE),
}

# Medical advice patterns (flag for professional disclaimer)
MEDICAL_ADVICE_PATTERNS = [
    re.compile(r'\byou\s+should\s+(take|stop\s+taking|increase|decrease)\s+\w+\b', re.IGNORECASE),
    re.compile(r'\bdose\s+(of|is)\s+\d+\s*(mg|mcg|ml)\b', re.IGNORECASE),
    re.compile(r'\bprescrib(e|ed|ing)\b', re.IGNORECASE),
]

def rule_based_output_check(text: str) -> dict:
    """
    Fast rule-based check for known unsafe patterns in LLM output.
    Complements classifier-based approaches.
    """
    findings = {}
    
    # PII detection
    pii_found = {}
    for pii_type, pattern in PII_PATTERNS.items():
        matches = pattern.findall(text)
        if matches:
            pii_found[pii_type] = len(matches)
    if pii_found:
        findings["pii"] = pii_found
    
    # URL detection
    urls = URL_PATTERN.findall(text)
    if urls:
        findings["external_urls"] = urls[:5]  # Cap at 5 for the report
    
    # Dangerous code patterns
    code_flags = {}
    for pattern_name, pattern in DANGEROUS_CODE_PATTERNS.items():
        if pattern.search(text):
            code_flags[pattern_name] = True
    if code_flags:
        findings["dangerous_code"] = code_flags
    
    # Medical advice
    medical_flags = []
    for pattern in MEDICAL_ADVICE_PATTERNS:
        match = pattern.search(text)
        if match:
            medical_flags.append(match.group(0))
    if medical_flags:
        findings["medical_advice_signals"] = medical_flags
    
    # Risk assessment
    high_risk = bool(pii_found.get("ssn") or pii_found.get("credit_card") or code_flags)
    medium_risk = bool(pii_found or urls or medical_flags)
    
    return {
        "findings": findings,
        "risk_level": "HIGH" if high_risk else ("MEDIUM" if medium_risk else "LOW"),
        "action": "BLOCK" if high_risk else ("FLAG" if medium_risk else "PASS"),
        "finding_count": sum(len(v) if isinstance(v, (list, dict)) else 1 for v in findings.values())
    }

# Example
result = rule_based_output_check(
    "You should take 500mg of ibuprofen twice daily. Contact me at user@example.com"
)
print(result)

Approach 3: LLM-as-Judge

Use a second (often more capable) LLM to evaluate whether the first LLM's output is safe. More flexible than classifiers, more accurate for complex policy, but adds latency and cost.

Python
from anthropic import Anthropic
import json

client = Anthropic()

JUDGE_SYSTEM_PROMPT = """
You are a content safety evaluator for an AI assistant.
Your job is to evaluate whether an AI-generated response is safe to show to users.

Evaluate the response against these safety categories:
- HARMFUL_CONTENT: violence, self-harm, dangerous instructions
- HATE_SPEECH: discriminatory content targeting protected groups  
- SEXUAL_CONTENT: explicit sexual material
- PII_EXPOSURE: personal identifiable information belonging to real people
- MEDICAL_ADVICE: specific medical diagnosis or treatment recommendations without disclaimer
- LEGAL_ADVICE: specific legal advice without professional disclaimer
- MISINFORMATION: demonstrably false factual claims stated as fact
- PROMPT_LEAK: reveals system prompt or internal instructions

Respond ONLY with this JSON structure (no markdown):
{
  "safe": true/false,
  "categories_flagged": ["CATEGORY1", "CATEGORY2"],
  "severity": "none|low|medium|high",
  "reasoning": "one sentence explanation",
  "recommended_action": "PASS|FLAG|BLOCK"
}
"""

def llm_judge_output(
    user_query: str,
    ai_response: str,
    context: Optional[str] = None
) -> dict:
    """
    Use a second LLM call to evaluate safety of the first LLM's response.
    Returns a structured safety verdict.
    """
    eval_content = f"""
ORIGINAL USER QUERY:
{user_query}

AI RESPONSE TO EVALUATE:
{ai_response}
"""
    if context:
        eval_content = f"CONTEXT:\n{context}\n\n" + eval_content
    
    try:
        response = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=400,
            system=JUDGE_SYSTEM_PROMPT,
            messages=[{"role": "user", "content": eval_content}]
        )
        
        raw = response.content[0].text.strip()
        result = json.loads(raw)
        return result
    except json.JSONDecodeError:
        return {
            "safe": False,
            "categories_flagged": ["PARSE_ERROR"],
            "severity": "medium",
            "reasoning": "Safety judge returned unparseable response",
            "recommended_action": "FLAG"
        }
    except Exception as e:
        return {
            "safe": False,
            "categories_flagged": ["JUDGE_ERROR"],
            "severity": "medium",
            "reasoning": f"Safety judge error: {str(e)}",
            "recommended_action": "FLAG"
        }

# Usage
verdict = llm_judge_output(
    user_query="What is aspirin?",
    ai_response="Aspirin is an NSAID. You should take 1000mg every 4 hours if you have a fever."
)
print(json.dumps(verdict, indent=2))

Approach 4: OpenAI Moderation API

OpenAI provides a free moderation endpoint that classifies text across multiple harm categories. Useful as a cost-free baseline check.

Python
import openai
import json

openai_client = openai.OpenAI()  # Requires OPENAI_API_KEY env var

def openai_moderation_check(text: str) -> dict:
    """
    Run text through the OpenAI Moderation API.
    Returns category scores and overall flag.
    
    Categories:
    - harassment, harassment/threatening
    - hate, hate/threatening
    - illicit, illicit/violent
    - self-harm, self-harm/instructions, self-harm/intent
    - sexual, sexual/minors
    - violence, violence/graphic
    """
    response = openai_client.moderations.create(
        model="omni-moderation-latest",
        input=text
    )
    
    result = response.results[0]
    
    # Get all flagged categories
    flagged_categories = {
        category: score
        for category, score in result.category_scores.__dict__.items()
        if score > 0.1  # Only include categories with meaningful scores
    }
    
    # Sort by score descending
    flagged_sorted = dict(
        sorted(flagged_categories.items(), key=lambda x: x[1], reverse=True)
    )
    
    return {
        "flagged": result.flagged,
        "categories_flagged": [
            cat for cat, flagged in result.categories.__dict__.items() if flagged
        ],
        "top_category_scores": dict(list(flagged_sorted.items())[:5]),
        "action": "BLOCK" if result.flagged else "PASS",
        "api": "openai_moderation"
    }

# Usage
result = openai_moderation_check("How do I safely dispose of old medication?")
print(json.dumps(result, indent=2))

Approach 5: Azure Content Safety API

Azure Content Safety provides enterprise-grade content moderation with severity levels (0-7) per category. Suitable for regulated industries.

Python
import os
import requests
import json

def azure_content_safety_check(
    text: str,
    endpoint: str = None,
    api_key: str = None
) -> dict:
    """
    Check text with Azure Content Safety API.
    
    Categories and severity:
    - Hate: discrimination, prejudice (0=safe, 2=low, 4=medium, 6=high)
    - Violence: physical harm, dangerous acts
    - Sexual: sexual content
    - SelfHarm: self-harm content
    
    Severity 0-1: safe
    Severity 2-3: low risk
    Severity 4-5: medium risk
    Severity 6-7: high risk
    """
    endpoint = endpoint or os.environ.get("AZURE_CONTENT_SAFETY_ENDPOINT")
    api_key = api_key or os.environ.get("AZURE_CONTENT_SAFETY_KEY")
    
    if not endpoint or not api_key:
        return {"error": "Azure Content Safety credentials not configured"}
    
    url = f"{endpoint}/contentsafety/text:analyze?api-version=2024-09-01"
    
    headers = {
        "Ocp-Apim-Subscription-Key": api_key,
        "Content-Type": "application/json"
    }
    
    payload = {
        "text": text[:10000],  # API limit
        "categories": ["Hate", "Violence", "Sexual", "SelfHarm"],
        "outputType": "FourSeverityLevels"
    }
    
    try:
        response = requests.post(url, headers=headers, json=payload, timeout=10)
        response.raise_for_status()
        data = response.json()
        
        categories = data.get("categoriesAnalysis", [])
        max_severity = max((c["severity"] for c in categories), default=0)
        flagged_categories = [c for c in categories if c["severity"] >= 2]
        
        return {
            "flagged": max_severity >= 4,
            "max_severity": max_severity,
            "categories": [
                {
                    "category": c["category"],
                    "severity": c["severity"],
                    "severity_label": ["safe", "safe", "low", "low", "medium", "medium", "high", "high"][c["severity"]]
                }
                for c in categories
            ],
            "flagged_categories": [c["category"] for c in flagged_categories],
            "action": "BLOCK" if max_severity >= 4 else ("FLAG" if max_severity >= 2 else "PASS"),
            "api": "azure_content_safety"
        }
    
    except requests.RequestException as e:
        return {"error": str(e), "api": "azure_content_safety"}

# Usage
result = azure_content_safety_check("How do I whittle a knife?")
print(json.dumps(result, indent=2))

Putting It Together: Output Safety Pipeline

Python
import time
from anthropic import Anthropic

client = Anthropic()

class OutputSafetyPipeline:
    """
    Multi-layer output safety check.
    Fast checks run first; expensive checks run only if fast checks pass.
    """
    
    def __init__(
        self,
        use_openai_moderation: bool = True,
        use_azure_content_safety: bool = False,
        use_llm_judge: bool = True,
        use_rule_based: bool = True
    ):
        self.use_openai = use_openai_moderation
        self.use_azure = use_azure_content_safety
        self.use_llm_judge = use_llm_judge
        self.use_rules = use_rule_based
    
    def check(self, user_query: str, ai_response: str) -> dict:
        start = time.time()
        checks = {}
        blocked = False
        
        # Layer 1: Rule-based (fastest, free)
        if self.use_rules:
            rule_result = rule_based_output_check(ai_response)
            checks["rule_based"] = rule_result
            if rule_result["action"] == "BLOCK":
                blocked = True
        
        # Layer 2: OpenAI Moderation (fast, free)
        if self.use_openai and not blocked:
            try:
                mod_result = openai_moderation_check(ai_response)
                checks["openai_moderation"] = mod_result
                if mod_result["action"] == "BLOCK":
                    blocked = True
            except Exception as e:
                checks["openai_moderation"] = {"error": str(e)}
        
        # Layer 3: Azure Content Safety (if configured)
        if self.use_azure and not blocked:
            azure_result = azure_content_safety_check(ai_response)
            checks["azure_content_safety"] = azure_result
            if azure_result.get("action") == "BLOCK":
                blocked = True
        
        # Layer 4: LLM judge (slowest, most accurate for complex cases)
        if self.use_llm_judge and not blocked:
            judge_result = llm_judge_output(user_query, ai_response)
            checks["llm_judge"] = judge_result
            if judge_result.get("recommended_action") == "BLOCK":
                blocked = True
        
        total_ms = round((time.time() - start) * 1000, 1)
        
        # Final verdict: BLOCK if any layer blocks; FLAG if any layer flags
        any_flag = any(
            c.get("action") == "FLAG" or c.get("recommended_action") == "FLAG"
            for c in checks.values()
            if isinstance(c, dict)
        )
        
        return {
            "safe_to_show": not blocked,
            "final_action": "BLOCK" if blocked else ("FLAG" if any_flag else "PASS"),
            "checks_run": list(checks.keys()),
            "check_results": checks,
            "total_latency_ms": total_ms,
            "response_length_chars": len(ai_response)
        }


# Usage
pipeline = OutputSafetyPipeline(
    use_openai_moderation=True,
    use_azure_content_safety=False,  # Requires Azure subscription
    use_llm_judge=True,
    use_rule_based=True
)

def safe_generate(user_query: str) -> dict:
    # Generate AI response
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=400,
        messages=[{"role": "user", "content": user_query}]
    )
    ai_response = response.content[0].text
    
    # Run through safety pipeline
    safety_result = pipeline.check(user_query, ai_response)
    
    if safety_result["safe_to_show"]:
        return {"response": ai_response, "safety": safety_result}
    else:
        return {
            "response": "I'm unable to provide a response to that request.",
            "blocked": True,
            "safety": safety_result
        }

result = safe_generate("What is the capital of France?")
print(f"Action: {result['safety']['final_action']}")
if not result.get('blocked'):
    print(f"Response: {result['response'][:100]}")

Summary

| Method | Latency | Cost | Accuracy | Best For | |---|---|---|---|---| | Rule-based | Under 5ms | Free | Low | High-confidence known patterns | | Classifier (BERT) | 50-200ms | Free after load | Medium | Broad toxicity, volume at scale | | OpenAI Moderation | 200-500ms | Free | Medium-High | Standard policy categories | | Azure Content Safety | 200-600ms | Pay-per-call | High | Enterprise, regulated industries | | LLM-as-judge | 1-3s | Pay-per-token | Highest | Complex policy, low volume |

Layer them: rules first, moderation API second, LLM judge only for borderline cases.