AI Systems · Intermediate
RAG Citations and Source Attribution
Attribute answers to source documents in RAG systems. Inline citations, span-level grounding, citation verification, and trust indicators for clinical AI.
Asma Hafeez Khan · May 16, 2026 · 7 min read
Tags: RAG · Citations · Attribution · Grounding · Trust · Clinical AI
Why Citations Matter
In clinical AI, an answer without a source is unacceptable. Citations:
- Allow clinicians to verify AI claims in primary sources
- Provide accountability when AI is wrong
- Help users calibrate trust (Lexicomp vs Reddit)
- Enable auditing of what knowledge the system used
Approach 1: Inline Citation Markers
Instruct the LLM to insert markers [1], [2] in the response text:
Python
from openai import OpenAI
import json
import re
# Module-level OpenAI client; the answer-generation functions in this file
# call it directly.
client = OpenAI()

# System prompt for inline-citation answers: restricts the model to the
# supplied context documents and spells out the [N] marker rules.
CITATION_SYSTEM_PROMPT = """You are a clinical pharmacology assistant. Answer questions using ONLY the provided context documents.
IMPORTANT CITATION RULES:
- After every factual claim, insert the citation marker [N] where N is the document number
- If multiple documents support a claim, use [1][2] or [1,2]
- Never make claims without a citation
- If documents contradict each other, cite both and note the disagreement
Format your response as a clear explanation with inline citations.
End your response with a "Sources used:" section listing which documents you cited."""

# User-message template; filled with str.format using the `context_section`
# and `question` fields.
CITATION_USER_TEMPLATE = """{context_section}
Question: {question}
Answer with inline citations [1], [2], etc.:"""
def format_context_for_citation(documents: list[dict]) -> str:
    """Render retrieved documents as numbered context blocks.

    Each block starts with a ``[Document N: <title>]`` header (N is 1-based;
    the title falls back to ``Document N``), followed by the URL in
    parentheses when the document has a truthy ``url``, then the document's
    ``content`` on the next line. Blocks are separated by a ``---`` rule.
    """

    def render(position: int, entry: dict) -> str:
        label = entry.get("title", f"Document {position}")
        header = f"[Document {position}: {label}]"
        link = entry.get("url")
        if link:
            header = f"{header} ({link})"
        return f"{header}\n{entry['content']}"

    separator = "\n\n---\n\n"
    return separator.join(
        render(position, entry)
        for position, entry in enumerate(documents, start=1)
    )
def answer_with_citations(
    question: str,
    documents: list[dict],
    model: str = "gpt-4o",
) -> dict:
    """
    Generate an answer with inline citations referencing provided documents.

    Args:
        question: The user's question.
        documents: Retrieved documents; each needs a ``content`` key and may
            carry ``title``, ``source``, ``url`` and ``date``.
        model: Chat model name passed to the OpenAI client.

    Returns:
        A dict with:
        - ``answer``: the model's answer text (empty string if the model
          returned no content),
        - ``citations``: one entry per *valid* document number cited in the
          answer, sorted by number,
        - ``documents_used``: number of distinct valid documents cited,
        - ``documents_retrieved``: number of documents supplied.
        When ``documents`` is empty no model call is made and a fixed
        "unable to find" answer is returned.
    """
    if not documents:
        return {
            "answer": "I was unable to find relevant information to answer this question.",
            "citations": [],
            "documents_used": 0,
            "documents_retrieved": 0,
        }

    context_section = format_context_for_citation(documents)
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": CITATION_SYSTEM_PROMPT},
            {
                "role": "user",
                "content": CITATION_USER_TEMPLATE.format(
                    context_section=context_section,
                    question=question,
                ),
            },
        ],
        temperature=0,
    )
    # message.content is optional in the API response; normalise to a string
    # so the marker parsing below cannot fail on None.
    answer_text = response.choices[0].message.content or ""

    # The system prompt allows both "[1][2]" and "[1,2]", so a marker may hold
    # a comma-separated list of document numbers.
    used_numbers = set()
    for group in re.findall(r'\[(\d+(?:\s*,\s*\d+)*)\]', answer_text):
        used_numbers.update(int(part) for part in group.split(","))

    citations = []
    for num in sorted(used_numbers):
        # Ignore numbers that do not correspond to a supplied document
        # (the model can hallucinate markers).
        if not 1 <= num <= len(documents):
            continue
        doc = documents[num - 1]
        content = doc["content"]
        citations.append({
            "number": num,
            "title": doc.get("title", f"Document {num}"),
            "source": doc.get("source", ""),
            "url": doc.get("url", ""),
            "date": doc.get("date", ""),
            # Only mark the snippet as truncated when it actually is.
            "snippet": content[:200] + ("..." if len(content) > 200 else ""),
        })

    return {
        "answer": answer_text,
        "citations": citations,
        # Count only markers that resolved to a real document.
        "documents_used": len(citations),
        "documents_retrieved": len(documents),
    }


# Approach 2: Structured JSON Citations
Return citations as structured data — better for programmatic rendering:
Python
# System prompt for claim-level structured citations.
# NOTE: this string is sent to the model as-is (it is never passed through
# str.format), so the JSON skeleton uses single braces. Doubled braces would
# reach the model literally and show it a malformed example.
STRUCTURED_CITATION_PROMPT = """Answer the question using only the provided context documents.
Return JSON with this exact structure:
{
"answer": "The complete answer text with [citation_id] markers inline",
"claims": [
{
"text": "exact claim from your answer",
"citation_ids": ["doc1", "doc2"],
"confidence": "high|medium|low"
}
],
"citations": [
{
"id": "doc1",
"title": "document title",
"source": "source name",
"relevant_quote": "exact quote from document supporting the claim"
}
],
"unanswered_aspects": ["parts of the question not addressed by the context"]
}"""
def answer_with_structured_citations(
    question: str,
    documents: list[dict],
    model: str = "gpt-4o",
) -> dict:
    """Generate structured citations with claim-level attribution.

    Args:
        question: The user's question.
        documents: Retrieved documents; each needs a ``content`` key. A
            document without an ``id`` is labelled ``doc1``, ``doc2``, ...
            by its 1-based position.
        model: Chat model name passed to the OpenAI client.

    Returns:
        The parsed JSON object produced by the model (expected keys:
        ``answer``, ``claims``, ``citations``, ``unanswered_aspects``).
        When ``documents`` is empty no model call is made and an empty
        structure with a fixed "unable to find" answer is returned.

    Raises:
        ValueError: If the model returns no content or content that is not
            valid JSON (``json.JSONDecodeError`` is a ``ValueError``).
    """
    # Without context the model could only answer from its own knowledge,
    # which defeats source attribution; mirror answer_with_citations instead.
    if not documents:
        return {
            "answer": "I was unable to find relevant information to answer this question.",
            "claims": [],
            "citations": [],
            "unanswered_aspects": [question],
        }

    # Build context with stable IDs
    docs_with_ids = [
        {**doc, "id": doc.get("id", f"doc{position}")}
        for position, doc in enumerate(documents, start=1)
    ]
    context = "\n\n".join(
        f"[ID: {d['id']}] [{d.get('title', 'Untitled')}]\n{d['content']}"
        for d in docs_with_ids
    )

    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": STRUCTURED_CITATION_PROMPT},
            {
                "role": "user",
                "content": f"Context:\n{context}\n\nQuestion: {question}",
            },
        ],
        response_format={"type": "json_object"},
        temperature=0,
    )

    raw = response.choices[0].message.content
    if not raw:
        # json.loads(None) would raise an opaque TypeError; fail clearly.
        raise ValueError("Model returned an empty response; expected a JSON object")
    return json.loads(raw)


# Citation Verification
Verify that cited quotes actually appear in the source documents:
Python
from difflib import SequenceMatcher
def verify_citation(
    claimed_quote: str,
    source_document: str,
    similarity_threshold: float = 0.85,
) -> dict:
    """
    Check if a cited quote genuinely appears in the source.

    Matching is case-insensitive and ignores differences in whitespace.
    An exact substring match is tried first; otherwise a sliding window of
    source words (as many words as the quote, capped at the source length)
    is compared with ``difflib.SequenceMatcher`` to tolerate minor
    paraphrasing.

    Args:
        claimed_quote: The quote the model attributes to the source.
        source_document: Full text of the cited source.
        similarity_threshold: Minimum ratio (0-1) for a fuzzy match to count.

    Returns:
        A dict with ``verified`` (bool), ``match_type`` (``"exact"``,
        ``"fuzzy"`` or ``"not_found"``) and ``similarity`` (float). Non-exact
        results also carry ``best_match``: the closest source window when
        verified, otherwise an empty string. An empty or whitespace-only
        quote is never verified.
    """
    quote_words = claimed_quote.split()
    source_words = source_document.split()

    # An empty quote is a substring of every string, so it must be rejected
    # explicitly rather than reported as an exact match.
    if not quote_words or not source_words:
        return {
            "verified": False,
            "match_type": "not_found",
            "similarity": 0.0,
            "best_match": "",
        }

    # Collapse whitespace runs so line breaks in the source cannot defeat
    # the exact check.
    normalized_quote = " ".join(quote_words).lower()
    normalized_source = " ".join(source_words).lower()

    # Exact match check
    if normalized_quote in normalized_source:
        return {
            "verified": True,
            "match_type": "exact",
            "similarity": 1.0,
        }

    # Fuzzy match — find the most similar window of source words. Cap the
    # window at the source length so a short source is still compared.
    window_size = min(len(quote_words), len(source_words))
    best_ratio = 0.0
    best_match = ""
    for start in range(len(source_words) - window_size + 1):
        window = " ".join(source_words[start:start + window_size])
        ratio = SequenceMatcher(None, normalized_quote, window.lower()).ratio()
        if ratio > best_ratio:
            best_ratio = ratio
            best_match = window

    verified = best_ratio >= similarity_threshold
    return {
        "verified": verified,
        "match_type": "fuzzy" if verified else "not_found",
        "similarity": best_ratio,
        "best_match": best_match if verified else "",
    }
def audit_citations(
    structured_response: dict,
    source_documents: dict,  # id → content mapping
) -> dict:
    """Check every citation of a structured response against its source.

    Each entry under ``structured_response["citations"]`` is looked up by
    ``id`` in ``source_documents``. A missing or empty source is recorded as
    unverified with an ``issue`` note; otherwise the ``relevant_quote`` is
    checked with ``verify_citation`` and its result merged into the entry.

    Returns a summary dict: ``total_citations``, ``verified``, ``failed``,
    ``pass_rate`` (0.0 when there are no citations) and per-citation
    ``details``.
    """
    details = []
    for entry in structured_response.get("citations", []):
        cited_id = entry.get("id")
        quote = entry.get("relevant_quote", "")
        body = source_documents.get(cited_id, "")
        if body:
            outcome = {"citation_id": cited_id, **verify_citation(quote, body)}
        else:
            outcome = {
                "citation_id": cited_id,
                "verified": False,
                "issue": "source document not found",
            }
        details.append(outcome)

    total = len(details)
    failures = sum(1 for item in details if not item["verified"])
    passes = sum(1 for item in details if item["verified"])
    return {
        "total_citations": total,
        "verified": passes,
        "failed": failures,
        # max(..., 1) avoids dividing by zero when nothing was cited.
        "pass_rate": (total - failures) / max(total, 1),
        "details": details,
    }


# Trust Indicators
Assign trust scores to sources based on provenance:
Python
from enum import Enum
from dataclasses import dataclass
class SourceTier(Enum):
    """Provenance tier of a cited source, from most (TIER_1) to least trusted.

    Members are declared best-first, with UNKNOWN last for sources that match
    no known provenance. Plain ``Enum`` members do not support ``<``/``>``
    comparison.
    """

    TIER_1 = "tier_1"  # Gold standard: FDA labels, peer-reviewed primary research
    TIER_2 = "tier_2"  # Clinical references: Lexicomp, Micromedex, UpToDate
    TIER_3 = "tier_3"  # Secondary: clinical guidelines, review articles
    TIER_4 = "tier_4"  # Tertiary: textbooks, educational resources
    UNKNOWN = "unknown"
# Lowercase substrings looked for in a citation's source name / URL, mapped to
# the tier they imply. Lookup returns the first key that matches, so insertion
# order matters when more than one key could match. No key maps to TIER_4.
SOURCE_TIER_MAP = {
    "fda.gov": SourceTier.TIER_1,
    "pubmed": SourceTier.TIER_1,
    "nejm.org": SourceTier.TIER_1,
    "lexicomp": SourceTier.TIER_2,
    "micromedex": SourceTier.TIER_2,
    "uptodate": SourceTier.TIER_2,
    "guidelines": SourceTier.TIER_3,
    "medscape": SourceTier.TIER_3,
}

# Base trust score (0-1) per tier, before any age penalty is applied.
TIER_TRUST_SCORES = {
    SourceTier.TIER_1: 0.95,
    SourceTier.TIER_2: 0.88,
    SourceTier.TIER_3: 0.75,
    SourceTier.TIER_4: 0.60,
    SourceTier.UNKNOWN: 0.50,
}
@dataclass
class AnnotatedCitation:
    """A citation enriched with trust metadata."""

    number: int  # citation number; 0 when the citation dict had none
    title: str
    source: str  # source name used (together with url) for tier classification
    url: str
    date: str  # publication date string as supplied; may be empty
    tier: SourceTier  # provenance tier derived from source / url
    trust_score: float  # tier base score after any age penalty, rounded to 2 decimals
    snippet: str
    age_days: int = 0  # days since `date`; stays 0 when the date is missing or unparseable
def classify_source_tier(source: str, url: str = "") -> SourceTier:
    """Return the tier of the first SOURCE_TIER_MAP key found in source or URL.

    The source name and URL are joined and lowercased, then scanned for each
    map key in the map's own order. ``SourceTier.UNKNOWN`` is returned when
    no key occurs.
    """
    haystack = f"{source} {url}".lower()
    matching_tiers = (
        tier for needle, tier in SOURCE_TIER_MAP.items() if needle in haystack
    )
    return next(matching_tiers, SourceTier.UNKNOWN)
def annotate_citations(citations: list[dict]) -> list[AnnotatedCitation]:
    """Enrich citations with trust scores and tier classification.

    For each citation dict the tier is derived from its ``source`` / ``url``
    and the tier's base trust score is looked up. If ``date`` is a parseable
    ISO-8601 string, the score is multiplied by 0.9 for sources older than
    1825 days (about 5 years) and additionally by 0.8 for sources older than
    3650 days (about 10 years). A missing, malformed or non-string date
    leaves the score unpenalised and ``age_days`` at 0.

    Args:
        citations: Citation dicts; ``number``, ``title``, ``source``,
            ``url``, ``date`` and ``snippet`` are read when present.

    Returns:
        One ``AnnotatedCitation`` per input citation, in input order.
    """
    import datetime

    annotated = []
    for cit in citations:
        tier = classify_source_tier(
            cit.get("source", ""),
            cit.get("url", ""),
        )
        trust = TIER_TRUST_SCORES[tier]

        # Penalize old sources
        doc_date = cit.get("date", "")
        age_days = 0
        pub_date = None
        if doc_date:
            try:
                pub_date = datetime.datetime.fromisoformat(doc_date)
            except (TypeError, ValueError):
                # Malformed or non-string date: best effort, no age penalty.
                pub_date = None

        if pub_date is not None:
            # Subtracting a naive and an aware datetime raises TypeError, so
            # take "now" in the same timezone (or naive) as the parsed date.
            now = datetime.datetime.now(tz=pub_date.tzinfo)
            # A date in the future would give a negative age; clamp to 0.
            age_days = max((now - pub_date).days, 0)
            # Reduce trust for sources older than 5 years
            if age_days > 1825:
                trust *= 0.9
            # Further reduction for sources older than 10 years
            if age_days > 3650:
                trust *= 0.8

        annotated.append(AnnotatedCitation(
            number=cit.get("number", 0),
            title=cit.get("title", ""),
            source=cit.get("source", ""),
            url=cit.get("url", ""),
            date=doc_date,
            tier=tier,
            trust_score=round(trust, 2),
            snippet=cit.get("snippet", ""),
            age_days=age_days,
        ))
    return annotated
def compute_answer_trust_score(annotated_citations: list[AnnotatedCitation]) -> dict:
    """Compute an overall trust score for an answer based on its citations.

    The score blends the best citation's trust (70%) with the average trust
    of all citations (30%).

    Args:
        annotated_citations: Citations with ``trust_score`` and ``tier`` set.

    Returns:
        With no citations: ``score`` 0.0, ``level`` ``"unverified"`` and an
        ``explanation``. Otherwise: ``score`` (rounded to 2 decimals),
        ``level`` (``"high"`` at >= 0.85, ``"medium"`` at >= 0.70, else
        ``"low"``), ``best_tier`` (value of the best tier cited, or
        ``"unknown"`` when no citation has a known tier), ``n_sources`` and
        a human-readable ``explanation``.
    """
    if not annotated_citations:
        return {"score": 0.0, "level": "unverified", "explanation": "No citations provided"}

    trust_scores = [c.trust_score for c in annotated_citations]

    # Weight toward the highest-trust source
    max_trust = max(trust_scores)
    avg_trust = sum(trust_scores) / len(trust_scores)

    # Blend: 70% best source, 30% average
    blended = 0.7 * max_trust + 0.3 * avg_trust

    # Enum members cannot be compared with "<", so rank tiers by their
    # definition order (best tier first, UNKNOWN last). UNKNOWN therefore
    # only wins when no citation has a known tier.
    tier_rank = {tier: rank for rank, tier in enumerate(SourceTier)}
    best_tier = min(
        (c.tier for c in annotated_citations),
        key=tier_rank.__getitem__,
    )

    if blended >= 0.85:
        level = "high"
    elif blended >= 0.70:
        level = "medium"
    else:
        level = "low"

    return {
        "score": round(blended, 2),
        "level": level,
        "best_tier": best_tier.value,
        "n_sources": len(annotated_citations),
        "explanation": (
            f"Based on {len(annotated_citations)} source(s). "
            f"Highest-quality source: {best_tier.value.replace('_', ' ')}."
        ),
    }


# Full Citation-Aware RAG Response
Python
def rag_with_full_citations(
    question: str,
    retriever,
    source_registry: dict,  # doc_id → full document metadata
) -> dict:
    """
    Complete RAG response with citations, verification, and trust scores.

    Steps: embed the question, retrieve the top 5 documents, overlay
    title/source/url/date from ``source_registry``, generate a structured
    answer, audit the cited quotes against the retrieved content, and score
    the citations for trust.

    Args:
        question: The user's question.
        retriever: Object exposing ``retrieve(query_embedding, top_k=...)``
            that returns document dicts with a ``content`` key.
            NOTE(review): interface inferred from the single call below.
        source_registry: Mapping of document ID to metadata dicts.

    Returns:
        A dict with ``answer``, ``citations`` (annotated, as plain dicts),
        ``claims``, ``citation_audit``, ``trust`` and ``unanswered``.
    """
    from openai import OpenAI as OAI

    emb = OAI().embeddings.create(model="text-embedding-3-small", input=[question])
    query_emb = emb.data[0].embedding

    # Retrieve documents
    raw_docs = retriever.retrieve(query_emb, top_k=5)

    # Enrich with metadata from registry
    documents = []
    for position, doc in enumerate(raw_docs, start=1):
        # Pin the ID here, using the same 1-based "docN" fallback as
        # answer_with_structured_citations, so the IDs the model cites and
        # the IDs used for the audit lookup always agree.
        doc_id = doc.get("id") or f"doc{position}"
        meta = source_registry.get(doc_id, {})
        documents.append({
            **doc,
            "id": doc_id,
            "title": meta.get("title", doc.get("title", "Unknown")),
            "source": meta.get("source", ""),
            "url": meta.get("url", ""),
            "date": meta.get("date", ""),
        })

    # Generate answer with structured citations
    structured = answer_with_structured_citations(question, documents)

    # Verify citations
    source_contents = {d["id"]: d["content"] for d in documents}
    audit = audit_citations(structured, source_contents)

    # The model's citation entries carry only id/title/source/quote. Backfill
    # source, url and date from the retrieved document so tier classification
    # and the age penalty have provenance to work with.
    docs_by_id = {d["id"]: d for d in documents}
    enriched_citations = []
    for citation in structured.get("citations", []):
        origin = docs_by_id.get(citation.get("id"), {})
        merged = dict(citation)
        for field in ("source", "url", "date"):
            if not merged.get(field):
                merged[field] = origin.get(field, "")
        enriched_citations.append(merged)

    # Annotate with trust scores
    annotated = annotate_citations(enriched_citations)
    trust = compute_answer_trust_score(annotated)

    return {
        "answer": structured.get("answer", ""),
        "citations": [vars(c) for c in annotated],
        "claims": structured.get("claims", []),
        "citation_audit": audit,
        "trust": trust,
        "unanswered": structured.get("unanswered_aspects", []),
    }


# Found this helpful?
Leave a comment
Have a question, correction, or just found this helpful? Leave a note below.