Detecting Unsafe Outputs
Build multi-layer output safety detection using classifier-based approaches, rule-based filters, LLM-as-judge, and the OpenAI Moderation and Azure Content Safety APIs — with working Python examples.
Why Output Detection Matters
Input filtering prevents known attack patterns. But LLMs can produce unsafe content even from innocuous inputs — through model failures, edge cases, or subtle jailbreaks that slipped past input filters. Output detection is the last line of defence before content reaches users.
Output detection answers: "Is this generated text safe to show?"
The answer must come back quickly (latency budget) and accurately (low false positives + low false negatives). These goals conflict, which is why production systems layer multiple approaches.
Approach 1: Classifier-Based Detection
A safety classifier is a model (usually a fine-tuned BERT or similar) trained to label text as safe or unsafe across one or more categories.
Pros and cons
| Aspect | Details |
|---|---|
| Latency | Very fast — under 50ms on GPU, under 200ms on CPU |
| Cost | Near zero after model loading |
| Accuracy | High for known categories, poor for novel attacks |
| Coverage | Only what it was trained to detect |
Example with a Hugging Face classifier
from transformers import pipeline
import time
# Load a toxicity classifier fine-tuned for safety use cases
# In production, use a model fine-tuned on your domain's policy
# Both pipelines are created once at module load and reused by
# classify_output_safety, so the model-loading cost is paid a single time.
# NOTE(review): truncation=True with max_length=512 presumably means only the
# first 512 tokens of each input are scored — confirm, and chunk longer
# outputs if the tail must be inspected too.
toxicity_classifier = pipeline(
"text-classification",
model="unitary/toxic-bert",
device=-1, # CPU; use 0 for GPU
truncation=True,
max_length=512
)
# Multi-label classifier for safety categories
# Same device and truncation settings as the toxicity classifier above.
safety_classifier = pipeline(
"text-classification",
model="KoalaAI/Text-Moderation",
device=-1,
truncation=True,
max_length=512
)
def classify_output_safety(text: str) -> dict:
    """
    Run text through both safety classifiers and merge their verdicts.

    Args:
        text: The generated text to evaluate.

    Returns:
        A dict with:
        - "text_preview": the first 100 characters of ``text`` followed by
          "..." when the text is longer than 100 characters, else ``text``.
        - "toxicity": top label and score (rounded to 3 places) from
          ``toxicity_classifier``.
        - "safety_category": top label and score (rounded to 3 places) from
          ``safety_classifier``.
        - "is_unsafe": True when either classifier crosses its threshold.
        - "latency_ms": time spent in the two classifier calls, in ms.
        - "action": "BLOCK" when unsafe, otherwise "PASS".
    """
    # perf_counter is monotonic: unlike time.time(), the measured interval
    # cannot go negative or jump when the system clock is adjusted.
    start = time.perf_counter()

    # Each pipeline call returns a list of results; only the first entry
    # (the top prediction) is used.
    toxicity_result = toxicity_classifier(text)[0]
    safety_result = safety_classifier(text)[0]

    latency_ms = (time.perf_counter() - start) * 1000

    # Toxicity counts only when the top label is "toxic" (any casing) with
    # a score above 0.7.
    is_toxic = (
        toxicity_result["label"].upper() == "TOXIC"
        and toxicity_result["score"] > 0.7
    )
    # Any top label other than the benign ones counts when scored above 0.6.
    is_unsafe_category = (
        safety_result["label"] not in ("OK", "safe")
        and safety_result["score"] > 0.6
    )
    is_unsafe = is_toxic or is_unsafe_category

    return {
        "text_preview": (text[:100] + "...") if len(text) > 100 else text,
        "toxicity": {
            "label": toxicity_result["label"],
            "score": round(toxicity_result["score"], 3),
        },
        "safety_category": {
            "label": safety_result["label"],
            "score": round(safety_result["score"], 3),
        },
        "is_unsafe": is_unsafe,
        "latency_ms": round(latency_ms, 1),
        "action": "BLOCK" if is_unsafe else "PASS",
    }
# Usage
result = classify_output_safety("This is a helpful and informative response about Python.")
print(result)
Approach 2: Rule-Based Detection
Regular expressions and pattern matching for high-confidence unsafe signals. Very fast, zero cost, and negligible latency (typically a few milliseconds) — but brittle and easily bypassed.
Use rule-based detection for high-confidence known bad patterns, not as a sole defence.
import re
import ipaddress
from typing import Optional
# PII patterns
PII_PATTERNS = {
    # The TLD class is [A-Za-z]: inside a character class "|" is a literal
    # pipe, not alternation, so it must not appear there.
    "email": re.compile(r'\b[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}\b'),
    "phone_us": re.compile(r'\b(\+1[-.\s]?)?(\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4})\b'),
    "ssn": re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
    # Contiguous Visa (13/16 digits), Mastercard 51-55 and Amex 34/37 numbers.
    "credit_card": re.compile(r'\b(?:4[0-9]{12}(?:[0-9]{3})?|5[1-5][0-9]{14}|3[47][0-9]{13})\b'),
    "uk_nino": re.compile(r'\b[A-Z]{2}\d{6}[A-Z]\b'),
}

# URLs in outputs (may indicate phishing or data exfiltration)
# Matches the scheme and host portion only; path and query are not captured.
URL_PATTERN = re.compile(
    r'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+'
)

# Code patterns that suggest dangerous operations
DANGEROUS_CODE_PATTERNS = {
    # \b before eval/exec stops matches inside longer identifiers such as
    # "retrieval(" or "literal_eval(".
    "shell_injection": re.compile(
        r'(os\.system|subprocess\.call|subprocess\.run|\beval\(|\bexec\()',
        re.IGNORECASE,
    ),
    "file_deletion": re.compile(r'(rm\s+-rf|rmdir\s+/s|shutil\.rmtree)', re.IGNORECASE),
    # The command may be followed by whitespace (curl/wget style) or by an
    # opening parenthesis (requests.post("https://...") style).
    "network_exfil": re.compile(
        r'(curl|wget|requests\.post|httpx\.post)(?:\s+|\s*\().*https?://',
        re.IGNORECASE,
    ),
}

# Medical advice patterns (flag for professional disclaimer)
MEDICAL_ADVICE_PATTERNS = [
    re.compile(r'\byou\s+should\s+(take|stop\s+taking|increase|decrease)\s+\w+\b', re.IGNORECASE),
    re.compile(r'\bdose\s+(of|is)\s+\d+\s*(mg|mcg|ml)\b', re.IGNORECASE),
    re.compile(r'\bprescrib(e|ed|ing)\b', re.IGNORECASE),
]
def rule_based_output_check(text: str) -> dict:
    """
    Scan LLM output against the module's regex tables and grade the risk.

    Four signal families are checked: PII, URLs, dangerous code snippets and
    medication-advice phrasing. Intended to run alongside the classifier
    checks rather than replace them.

    Args:
        text: The generated text to inspect.

    Returns:
        A dict with:
        - "findings": only the signal families that matched.
        - "risk_level": "HIGH", "MEDIUM" or "LOW".
        - "action": "BLOCK", "FLAG" or "PASS" respectively.
        - "finding_count": total number of entries across all findings.
    """
    # Match count per PII type; types with no match are left out.
    pii_found = {}
    for pii_type, regex in PII_PATTERNS.items():
        hit_count = len(regex.findall(text))
        if hit_count:
            pii_found[pii_type] = hit_count

    urls = URL_PATTERN.findall(text)

    code_flags = {
        name: True
        for name, regex in DANGEROUS_CODE_PATTERNS.items()
        if regex.search(text)
    }

    # First matching snippet per medical pattern, in pattern order.
    medical_flags = [
        hit.group(0)
        for hit in (regex.search(text) for regex in MEDICAL_ADVICE_PATTERNS)
        if hit
    ]

    # Assemble the report in a fixed key order, skipping empty families.
    findings = {}
    if pii_found:
        findings["pii"] = pii_found
    if urls:
        findings["external_urls"] = urls[:5]  # Cap at 5 for the report
    if code_flags:
        findings["dangerous_code"] = code_flags
    if medical_flags:
        findings["medical_advice_signals"] = medical_flags

    # SSNs, card numbers and any dangerous code block outright; any other
    # finding is only flagged.
    if pii_found.get("ssn") or pii_found.get("credit_card") or code_flags:
        risk_level, action = "HIGH", "BLOCK"
    elif pii_found or urls or medical_flags:
        risk_level, action = "MEDIUM", "FLAG"
    else:
        risk_level, action = "LOW", "PASS"

    return {
        "findings": findings,
        "risk_level": risk_level,
        "action": action,
        # Every value stored in `findings` above is a list or a dict.
        "finding_count": sum(len(entry) for entry in findings.values()),
    }
# Example
result = rule_based_output_check(
"You should take 500mg of ibuprofen twice daily. Contact me at user@example.com"
)
print(result)
Approach 3: LLM-as-Judge
Use a second (often more capable) LLM to evaluate whether the first LLM's output is safe. More flexible than classifiers, more accurate for complex policy, but adds latency and cost.
from anthropic import Anthropic
import json
# Module-level Anthropic client shared by the functions below.
# NOTE(review): constructed with no arguments, so credentials presumably come
# from the environment — confirm for your deployment.
client = Anthropic()
# System prompt for the judge model. It lists the policy categories to check
# and pins the reply to a bare JSON object, because llm_judge_output passes the
# raw reply straight to json.loads().
JUDGE_SYSTEM_PROMPT = """
You are a content safety evaluator for an AI assistant.
Your job is to evaluate whether an AI-generated response is safe to show to users.
Evaluate the response against these safety categories:
- HARMFUL_CONTENT: violence, self-harm, dangerous instructions
- HATE_SPEECH: discriminatory content targeting protected groups
- SEXUAL_CONTENT: explicit sexual material
- PII_EXPOSURE: personal identifiable information belonging to real people
- MEDICAL_ADVICE: specific medical diagnosis or treatment recommendations without disclaimer
- LEGAL_ADVICE: specific legal advice without professional disclaimer
- MISINFORMATION: demonstrably false factual claims stated as fact
- PROMPT_LEAK: reveals system prompt or internal instructions
Respond ONLY with this JSON structure (no markdown):
{
"safe": true/false,
"categories_flagged": ["CATEGORY1", "CATEGORY2"],
"severity": "none|low|medium|high",
"reasoning": "one sentence explanation",
"recommended_action": "PASS|FLAG|BLOCK"
}
"""
_JUDGE_ACTIONS = ("PASS", "FLAG", "BLOCK")


def _parse_judge_verdict(raw: str) -> dict:
    """Turn the judge's raw reply into a validated verdict dict, or raise ValueError."""
    if not isinstance(raw, str):
        raise ValueError("judge reply is not text")

    cleaned = raw.strip()
    # Models sometimes wrap the JSON in a markdown code fence even when told
    # not to; strip the backticks and an optional "json" language tag.
    if cleaned.startswith("```"):
        cleaned = cleaned.strip("`").strip()
        if cleaned[:4].lower() == "json":
            cleaned = cleaned[4:]

    verdict = json.loads(cleaned)  # json.JSONDecodeError is a ValueError
    if not isinstance(verdict, dict):
        raise ValueError("judge reply is not a JSON object")

    # Callers compare the action against upper-case constants, so normalise
    # casing and reject anything outside the documented set.
    action = str(verdict.get("recommended_action", "")).strip().upper()
    if action not in _JUDGE_ACTIONS:
        raise ValueError("judge reply has no valid recommended_action")
    verdict["recommended_action"] = action
    return verdict


def llm_judge_output(
    user_query: str,
    ai_response: str,
    context: Optional[str] = None
) -> dict:
    """
    Use a second LLM call to evaluate safety of the first LLM's response.

    Args:
        user_query: The user's original request.
        ai_response: The generated response to be judged.
        context: Optional extra context, prepended to the evaluation message.

    Returns:
        The judge's verdict dict ("safe", "categories_flagged", "severity",
        "reasoning", "recommended_action"). "recommended_action" is always
        one of "PASS", "FLAG" or "BLOCK". This function does not raise:
        - a reply that cannot be parsed or validated yields a "PARSE_ERROR"
          verdict with action "FLAG";
        - a failure of the API call yields a "JUDGE_ERROR" verdict with
          action "FLAG".
    """
    eval_content = (
        "\nORIGINAL USER QUERY:\n"
        f"{user_query}\n"
        "AI RESPONSE TO EVALUATE:\n"
        f"{ai_response}\n"
    )
    if context:
        eval_content = f"CONTEXT:\n{context}\n\n" + eval_content

    # Boundary with an external service: any failure here becomes a FLAG
    # verdict instead of propagating to the caller.
    try:
        response = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=400,
            system=JUDGE_SYSTEM_PROMPT,
            messages=[{"role": "user", "content": eval_content}]
        )
        raw = response.content[0].text
    except Exception as e:
        return {
            "safe": False,
            "categories_flagged": ["JUDGE_ERROR"],
            "severity": "medium",
            "reasoning": f"Safety judge error: {str(e)}",
            "recommended_action": "FLAG"
        }

    try:
        return _parse_judge_verdict(raw)
    except ValueError:
        return {
            "safe": False,
            "categories_flagged": ["PARSE_ERROR"],
            "severity": "medium",
            "reasoning": "Safety judge returned unparseable response",
            "recommended_action": "FLAG"
        }
# Usage
verdict = llm_judge_output(
user_query="What is aspirin?",
ai_response="Aspirin is an NSAID. You should take 1000mg every 4 hours if you have a fever."
)
print(json.dumps(verdict, indent=2))
Approach 4: OpenAI Moderation API
OpenAI provides a free moderation endpoint that classifies text across multiple harm categories. Useful as a cost-free baseline check.
import openai
import json
openai_client = openai.OpenAI() # Requires OPENAI_API_KEY env var
def openai_moderation_check(text: str) -> dict:
    """
    Run text through the OpenAI Moderation API.

    Categories:
    - harassment, harassment/threatening
    - hate, hate/threatening
    - illicit, illicit/violent
    - self-harm, self-harm/instructions, self-harm/intent
    - sexual, sexual/minors
    - violence, violence/graphic

    Args:
        text: The text to moderate.

    Returns:
        A dict with:
        - "flagged": the API's overall flag.
        - "categories_flagged": names of the categories the API flagged.
        - "top_category_scores": up to five categories scoring above 0.1,
          highest first.
        - "action": "BLOCK" when flagged, otherwise "PASS".
        - "api": the constant "openai_moderation".

        NOTE(review): category keys are the SDK model's field names, which
        may differ from the API names listed above — verify against your SDK
        version.

    Raises:
        Whatever the OpenAI client raises; errors are not handled here.
    """
    response = openai_client.moderations.create(
        model="omni-moderation-latest",
        input=text
    )
    result = response.results[0]

    # model_dump() is the public way to read a pydantic model as a dict;
    # reaching into __dict__ depends on pydantic internals.
    scores = result.category_scores.model_dump()
    verdicts = result.categories.model_dump()

    # Keep only meaningful scores. A category the API did not score may be
    # None, which would otherwise raise TypeError in the comparison.
    notable_scores = {
        category: score
        for category, score in scores.items()
        if score is not None and score > 0.1
    }
    top_scores = sorted(notable_scores.items(), key=lambda x: x[1], reverse=True)[:5]

    return {
        "flagged": result.flagged,
        "categories_flagged": [cat for cat, flagged in verdicts.items() if flagged],
        "top_category_scores": dict(top_scores),
        "action": "BLOCK" if result.flagged else "PASS",
        "api": "openai_moderation"
    }
# Usage
result = openai_moderation_check("How do I safely dispose of old medication?")
print(json.dumps(result, indent=2))
Approach 5: Azure Content Safety API
Azure Content Safety provides enterprise-grade content moderation with severity levels (0-7) per category; the example below requests the `FourSeverityLevels` output, which reports only 0, 2, 4 and 6. Suitable for regulated industries.
import os
import requests
import json
_AZURE_MAX_TEXT_CHARS = 10000  # per-request text limit of the API
_AZURE_SEVERITY_LABELS = (
    "safe", "safe", "low", "low", "medium", "medium", "high", "high"
)


def azure_content_safety_check(
    text: str,
    endpoint: str = None,
    api_key: str = None
) -> dict:
    """
    Check text with Azure Content Safety API.

    Categories and severity:
    - Hate: discrimination, prejudice (0=safe, 2=low, 4=medium, 6=high)
    - Violence: physical harm, dangerous acts
    - Sexual: sexual content
    - SelfHarm: self-harm content
    Severity 0-1: safe
    Severity 2-3: low risk
    Severity 4-5: medium risk
    Severity 6-7: high risk

    Text longer than the per-request limit is sent in consecutive chunks and
    the highest severity seen per category is reported, so no part of the
    text goes unchecked.

    Args:
        text: The text to analyse.
        endpoint: Service endpoint; falls back to the
            AZURE_CONTENT_SAFETY_ENDPOINT environment variable.
        api_key: Subscription key; falls back to AZURE_CONTENT_SAFETY_KEY.

    Returns:
        On success, a dict with "flagged", "max_severity", "categories",
        "flagged_categories", "action" ("BLOCK" at severity >= 4, "FLAG" at
        >= 2, else "PASS") and "api". On failure, a dict with an "error"
        key and no "action" key.
    """
    endpoint = endpoint or os.environ.get("AZURE_CONTENT_SAFETY_ENDPOINT")
    api_key = api_key or os.environ.get("AZURE_CONTENT_SAFETY_KEY")
    if not endpoint or not api_key:
        return {"error": "Azure Content Safety credentials not configured"}

    # Strip trailing slashes so a configured "https://host/" does not
    # produce "https://host//contentsafety/...".
    url = f"{endpoint.rstrip('/')}/contentsafety/text:analyze?api-version=2024-09-01"
    headers = {
        "Ocp-Apim-Subscription-Key": api_key,
        "Content-Type": "application/json"
    }

    # Split rather than truncate. `or [text]` keeps the single request for
    # empty input, as before.
    chunks = [
        text[i:i + _AZURE_MAX_TEXT_CHARS]
        for i in range(0, len(text), _AZURE_MAX_TEXT_CHARS)
    ] or [text]

    severity_by_category = {}
    try:
        for chunk in chunks:
            payload = {
                "text": chunk,
                "categories": ["Hate", "Violence", "Sexual", "SelfHarm"],
                "outputType": "FourSeverityLevels"
            }
            response = requests.post(url, headers=headers, json=payload, timeout=10)
            response.raise_for_status()
            for entry in response.json().get("categoriesAnalysis", []):
                name = entry.get("category", "Unknown")
                severity = int(entry.get("severity") or 0)
                severity_by_category[name] = max(
                    severity, severity_by_category.get(name, 0)
                )
    except requests.RequestException as e:
        return {"error": str(e), "api": "azure_content_safety"}
    except (ValueError, TypeError, AttributeError) as e:
        # The response body did not have the expected JSON shape.
        return {
            "error": f"Malformed Azure Content Safety response: {e}",
            "api": "azure_content_safety"
        }

    max_severity = max(severity_by_category.values(), default=0)
    last_label = len(_AZURE_SEVERITY_LABELS) - 1
    return {
        "flagged": max_severity >= 4,
        "max_severity": max_severity,
        "categories": [
            {
                "category": name,
                "severity": severity,
                # Clamp so an out-of-range severity cannot raise IndexError.
                "severity_label": _AZURE_SEVERITY_LABELS[min(max(severity, 0), last_label)]
            }
            for name, severity in severity_by_category.items()
        ],
        "flagged_categories": [
            name for name, severity in severity_by_category.items() if severity >= 2
        ],
        "action": "BLOCK" if max_severity >= 4 else ("FLAG" if max_severity >= 2 else "PASS"),
        "api": "azure_content_safety"
    }
# Usage
result = azure_content_safety_check("How do I whittle a knife?")
print(json.dumps(result, indent=2))
Putting It Together: Output Safety Pipeline
import time
from anthropic import Anthropic
client = Anthropic()
class OutputSafetyPipeline:
    """
    Multi-layer output safety check.

    Fast checks run first; expensive checks run only if fast checks pass.
    Layer order: rule-based, OpenAI Moderation, Azure Content Safety,
    LLM judge. Once a layer blocks, the remaining layers are skipped.
    """

    def __init__(
        self,
        use_openai_moderation: bool = True,
        use_azure_content_safety: bool = False,
        use_llm_judge: bool = True,
        use_rule_based: bool = True
    ):
        """Record which layers are enabled; no network calls are made here."""
        self.use_openai = use_openai_moderation
        self.use_azure = use_azure_content_safety
        self.use_llm_judge = use_llm_judge
        self.use_rules = use_rule_based

    def check(self, user_query: str, ai_response: str) -> dict:
        """
        Run the enabled layers over ``ai_response`` and combine the verdicts.

        Args:
            user_query: The user's original request (passed to the LLM judge).
            ai_response: The generated text to check.

        Returns:
            A dict with:
            - "safe_to_show": False only when some layer returned BLOCK.
            - "final_action": "BLOCK" if any layer blocked; otherwise "FLAG"
              if any layer flagged or failed to run; otherwise "PASS".
            - "checks_run", "check_results": which layers ran and their
              raw results.
            - "total_latency_ms", "response_length_chars".
        """
        # perf_counter is monotonic, unlike time.time().
        start = time.perf_counter()
        checks = {}
        blocked = False

        # Layer 1: Rule-based (fastest, free)
        if self.use_rules:
            rule_result = rule_based_output_check(ai_response)
            checks["rule_based"] = rule_result
            if rule_result["action"] == "BLOCK":
                blocked = True

        # Layer 2: OpenAI Moderation (fast, free)
        if self.use_openai and not blocked:
            try:
                mod_result = openai_moderation_check(ai_response)
            except Exception as e:
                mod_result = {"error": str(e)}
            checks["openai_moderation"] = mod_result
            if mod_result.get("action") == "BLOCK":
                blocked = True

        # Layer 3: Azure Content Safety (if configured). Guarded the same
        # way as the OpenAI layer so an outage cannot crash the pipeline.
        if self.use_azure and not blocked:
            try:
                azure_result = azure_content_safety_check(ai_response)
            except Exception as e:
                azure_result = {"error": str(e)}
            checks["azure_content_safety"] = azure_result
            if azure_result.get("action") == "BLOCK":
                blocked = True

        # Layer 4: LLM judge (slowest, most accurate for complex cases)
        if self.use_llm_judge and not blocked:
            judge_result = llm_judge_output(user_query, ai_response)
            if not isinstance(judge_result, dict):
                # The judge's reply parsed as JSON but was not an object.
                judge_result = {"error": "LLM judge returned a non-dict verdict"}
            checks["llm_judge"] = judge_result
            if judge_result.get("recommended_action") == "BLOCK":
                blocked = True

        total_ms = round((time.perf_counter() - start) * 1000, 1)

        # A layer that failed to run is a gap in coverage, not a clean pass:
        # treat its error record like a FLAG, as the LLM judge already does
        # for its own failures.
        any_flag = any(
            "error" in c
            or c.get("action") == "FLAG"
            or c.get("recommended_action") == "FLAG"
            for c in checks.values()
            if isinstance(c, dict)
        )

        return {
            "safe_to_show": not blocked,
            "final_action": "BLOCK" if blocked else ("FLAG" if any_flag else "PASS"),
            "checks_run": list(checks.keys()),
            "check_results": checks,
            "total_latency_ms": total_ms,
            "response_length_chars": len(ai_response)
        }
# Usage
# Module-level pipeline instance; safe_generate below calls pipeline.check()
# on every generated response. Azure is switched off here, so the active
# layers are rule-based, OpenAI Moderation and the LLM judge.
pipeline = OutputSafetyPipeline(
use_openai_moderation=True,
use_azure_content_safety=False, # Requires Azure subscription
use_llm_judge=True,
use_rule_based=True
)
def safe_generate(user_query: str) -> dict:
    """
    Generate a response and release it only if the safety pipeline allows it.

    Args:
        user_query: The user's request, sent to the model as a single user
            message.

    Returns:
        A dict with the same three keys on both paths, so callers can read
        "blocked" directly:
        - "response": the model's text, or a fixed refusal message when the
          pipeline blocked it.
        - "blocked": True when the pipeline blocked the text, else False.
        - "safety": the full result of ``pipeline.check``.
    """
    # Generate AI response
    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=400,
        messages=[{"role": "user", "content": user_query}]
    )
    ai_response = response.content[0].text

    # Run through safety pipeline
    safety_result = pipeline.check(user_query, ai_response)

    if not safety_result["safe_to_show"]:
        return {
            "response": "I'm unable to provide a response to that request.",
            "blocked": True,
            "safety": safety_result
        }
    return {"response": ai_response, "blocked": False, "safety": safety_result}
result = safe_generate("What is the capital of France?")
print(f"Action: {result['safety']['final_action']}")
if not result.get('blocked'):
    print(f"Response: {result['response'][:100]}")
Summary
| Method | Latency | Cost | Accuracy | Best For |
|---|---|---|---|---|
| Rule-based | Under 5ms | Free | Low | High-confidence known patterns |
| Classifier (BERT) | 50-200ms | Free after load | Medium | Broad toxicity, volume at scale |
| OpenAI Moderation | 200-500ms | Free | Medium-High | Standard policy categories |
| Azure Content Safety | 200-600ms | Pay-per-call | High | Enterprise, regulated industries |
| LLM-as-judge | 1-3s | Pay-per-token | Highest | Complex policy, low volume |
Layer them: rules first, moderation API second, LLM judge only for borderline cases.
Found this helpful?
Leave a comment
Have a question, correction, or just found this helpful? Leave a note below.