Every production LLM deployment needs an audit trail. When something goes wrong — a customer complaint, a compliance inquiry, a security incident — you need to be able to answer three questions: who asked what, when did they ask it, and how did the model respond?
What to Log
| Field | Required? | Purpose |
|---|---|---|
| Timestamp | ✅ | When the request was made |
| Request ID | ✅ | Unique identifier for correlation |
| User ID | ✅ | Who initiated the request |
| Model | ✅ | Which model version was used |
| System prompt hash | ✅ | Track prompt changes without storing the full prompt |
| Input tokens | ✅ | Cost tracking and anomaly detection |
| Output tokens | ✅ | Cost tracking and anomaly detection |
| Thinking tokens | ✅ | Cost tracking for extended-thinking tokens |
| Latency (ms) | ✅ | Performance monitoring |
| Input content | ⚠️ | May contain PII — store encrypted or hash only |
| Output content | ⚠️ | May contain PII — store encrypted or hash only |
| Sanitization events | ✅ | What was redacted before sending |
| Error details | ✅ | Any API errors or failures |
| Cost (USD) | ✅ | Financial tracking |
Audit Logger Implementation
import json
import time
import uuid
import hashlib
from datetime import datetime, timezone
from dataclasses import dataclass, asdict
@dataclass
class AuditEntry:
    """A single audit-log record describing one LLM API request.

    The fields up to and including ``status`` are required; the remaining
    fields carry defaults so that records for failed requests can omit them.
    """

    # --- Who / what / when -------------------------------------------------
    request_id: str
    timestamp: str
    user_id: str
    model: str
    action: str  # "messages.create", "messages.stream", etc.
    system_prompt_hash: str

    # --- Usage, performance and cost ----------------------------------------
    input_tokens: int
    output_tokens: int
    thinking_tokens: int
    latency_ms: int
    cost_usd: float

    # --- Outcome -------------------------------------------------------------
    status: str  # "success", "error", "blocked"
    error_message: str = ""
    sanitization_count: int = 0

    # --- Correlation aids ----------------------------------------------------
    input_hash: str = ""  # Hash of input for correlation
    output_hash: str = ""  # Hash of output for correlation
    metadata: dict = None  # Optional free-form extras; None when unused
class AuditLogger:
    """Production audit logger for LLM interactions.

    Builds one ``AuditEntry`` per completed or failed request and passes it
    to the ``write`` method of the storage backend given at construction.
    Raw prompt and response content is never stored; only truncated SHA-256
    hashes are recorded.
    """

    # Pricing constants (USD per one million tokens).
    INPUT_COST_PER_M = 5.0  # $5 per 1M input tokens
    OUTPUT_COST_PER_M = 25.0  # $25 per 1M output tokens
    # NOTE(review): thinking tokens are priced like input tokens here. Some
    # providers bill them at the output rate -- confirm against the current
    # price list before relying on cost_usd for billing.
    THINKING_COST_PER_M = 5.0  # Same as input

    # Number of hex characters kept from each SHA-256 digest.
    _HASH_LENGTH = 16

    def __init__(self, storage_backend):
        """Store the backend; it must expose ``write(entry)``."""
        self.storage = storage_backend

    @classmethod
    def _short_hash(cls, text: str) -> str:
        """Return the first ``_HASH_LENGTH`` hex chars of SHA-256 of *text*."""
        return hashlib.sha256(text.encode()).hexdigest()[:cls._HASH_LENGTH]

    def log_request(self, user_id: str, model: str,
                    system_prompt: str, messages: list[dict],
                    response, latency_ms: int,
                    sanitization_count: int = 0) -> str:
        """Log a completed API request.

        Args:
            user_id: Identifier of the user who initiated the request.
            model: Model name that served the request.
            system_prompt: System prompt text; only its hash is stored.
            messages: Conversation messages sent; only their hash is stored.
            response: API response object. Must expose ``usage.input_tokens``,
                ``usage.output_tokens`` and ``content``; an optional
                ``usage.thinking_tokens`` is read when present.
            latency_ms: Measured request latency in milliseconds.
            sanitization_count: Number of redactions applied before sending.

        Returns:
            The freshly generated request ID (a UUID4 string).
        """
        request_id = str(uuid.uuid4())

        usage = response.usage
        input_tokens = usage.input_tokens
        output_tokens = usage.output_tokens
        # The attribute may be missing *or* present but None; both mean zero.
        thinking_tokens = getattr(usage, "thinking_tokens", 0) or 0

        entry = AuditEntry(
            request_id=request_id,
            timestamp=datetime.now(timezone.utc).isoformat(),
            user_id=user_id,
            model=model,
            action="messages.create",
            system_prompt_hash=self._short_hash(system_prompt),
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            thinking_tokens=thinking_tokens,
            latency_ms=latency_ms,
            cost_usd=self._calculate_cost(
                input_tokens, output_tokens, thinking_tokens
            ),
            status="success",
            sanitization_count=sanitization_count,
            # default=str keeps logging from raising TypeError when a message
            # carries content that is not natively JSON-serializable; plain
            # JSON-compatible messages hash exactly as they did before.
            input_hash=self._short_hash(json.dumps(messages, default=str)),
            output_hash=self._short_hash(str(response.content)),
        )
        self.storage.write(entry)
        return request_id

    def log_error(self, user_id: str, model: str,
                  error: Exception) -> str:
        """Log a failed API request.

        Token counts, latency and cost are recorded as zero because no usage
        information is available for a failed call. The error text is
        truncated to 500 characters.

        Returns:
            The freshly generated request ID (a UUID4 string).
        """
        request_id = str(uuid.uuid4())
        entry = AuditEntry(
            request_id=request_id,
            timestamp=datetime.now(timezone.utc).isoformat(),
            user_id=user_id,
            model=model,
            action="messages.create",
            system_prompt_hash="",
            input_tokens=0,
            output_tokens=0,
            thinking_tokens=0,
            latency_ms=0,
            cost_usd=0.0,
            status="error",
            error_message=str(error)[:500],
        )
        self.storage.write(entry)
        return request_id

    def _calculate_cost(self, input_tokens: int, output_tokens: int,
                        thinking_tokens: int) -> float:
        """Return the request cost in USD, rounded to six decimal places.

        Each token class is charged at its own per-million rate, so changing
        ``THINKING_COST_PER_M`` actually affects the result.
        """
        total_per_million = (
            input_tokens * self.INPUT_COST_PER_M
            + thinking_tokens * self.THINKING_COST_PER_M
            + output_tokens * self.OUTPUT_COST_PER_M
        )
        return round(total_per_million / 1_000_000, 6)
Auditing Client Wrapper
class AuditedClient:
    """Anthropic client with automatic audit logging.

    Wraps ``Anthropic().messages.create`` so that every call is recorded
    through the supplied audit logger on behalf of a fixed user.
    """

    def __init__(self, user_id: str, audit_logger: "AuditLogger"):
        """Create the underlying SDK client and remember user and logger."""
        # Imported here rather than at module level, so the SDK is only
        # needed once a client is actually constructed.
        from anthropic import Anthropic

        self.client = Anthropic()
        self.user_id = user_id
        self.logger = audit_logger

    def messages_create(self, model: str, messages: list[dict],
                        system: str = "", **kwargs):
        """Create a message with automatic audit logging.

        Args:
            model: Model name to call.
            messages: Conversation messages to send.
            system: System prompt text.
            **kwargs: Passed through unchanged to ``messages.create``.

        Returns:
            The API response, with the audit request ID attached as
            ``_audit_request_id`` for correlation.

        Raises:
            Exception: Any error raised by the API call is logged through
                ``log_error`` and re-raised unchanged. An error raised while
                writing the success record propagates as-is and is *not*
                logged as an API failure.
        """
        start = time.monotonic()

        # Only the API call is guarded: a failure while writing the audit
        # record must not be misreported as a failed model request.
        try:
            response = self.client.messages.create(
                model=model,
                messages=messages,
                system=system,
                **kwargs
            )
        except Exception as e:
            self.logger.log_error(
                user_id=self.user_id,
                model=model,
                error=e,
            )
            raise

        latency = int((time.monotonic() - start) * 1000)
        request_id = self.logger.log_request(
            user_id=self.user_id,
            model=model,
            system_prompt=system,
            messages=messages,
            response=response,
            latency_ms=latency,
        )
        # Attach request_id to response for correlation
        response._audit_request_id = request_id
        return response
Storage Backends
File-Based (Development)
import os
class FileAuditStorage:
    """File-based audit storage for development.

    Entries are appended as JSON Lines to one file per UTC day, named
    ``audit_YYYY-MM-DD.jsonl``, inside the configured directory.
    """

    def __init__(self, directory: str = "./audit_logs"):
        """Remember *directory* and create it if it does not exist yet."""
        self.directory = directory
        os.makedirs(directory, exist_ok=True)

    def write(self, entry: "AuditEntry"):
        """Append *entry* as one JSON line to the current day's file."""
        # Use the UTC date so file names line up with the UTC timestamps
        # stored inside the entries.
        date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        filepath = os.path.join(self.directory, f"audit_{date_str}.jsonl")
        with open(filepath, "a", encoding="utf-8") as f:
            f.write(json.dumps(asdict(entry)) + "\n")

    def query(self, start_date: str, end_date: str,
              user_id: str = None) -> list[dict]:
        """Query audit entries by date range and optional filters.

        Args:
            start_date: Inclusive lower bound, as an ISO date or timestamp
                prefix (for example ``"2024-01-01"``).
            end_date: Inclusive upper bound in the same form. A date-only
                bound includes every entry recorded on that day.
            user_id: When given, only entries for this user are returned.

        Returns:
            The matching entries as dicts (one per stored JSON line), in
            file-name order and then in write order within each file.
        """
        results = []
        for filename in sorted(os.listdir(self.directory)):
            if not filename.endswith(".jsonl"):
                continue
            filepath = os.path.join(self.directory, filename)
            with open(filepath, encoding="utf-8") as f:
                for line in f:
                    if not line.strip():
                        # Tolerate blank lines instead of failing to parse.
                        continue
                    entry = json.loads(line)
                    timestamp = entry["timestamp"]
                    # Compare only as many characters as each bound supplies.
                    # A plain string comparison would place
                    # "2024-01-31T10:00..." after the bound "2024-01-31" and
                    # silently drop the whole end day.
                    if timestamp[:len(start_date)] < start_date:
                        continue
                    if timestamp[:len(end_date)] > end_date:
                        continue
                    if user_id is not None and entry["user_id"] != user_id:
                        continue
                    results.append(entry)
        return results
Database-Based (Production)
class PostgresAuditStorage:
    """PostgreSQL audit storage for production.

    Holds one psycopg2 connection, creates the ``llm_audit_log`` table and
    its indexes on construction, and inserts one row per audit entry.
    """

    # Idempotent schema setup: table plus indexes on the columns used for
    # filtering (timestamp, user_id, status, cost_usd).
    CREATE_TABLE = """
        CREATE TABLE IF NOT EXISTS llm_audit_log (
            request_id UUID PRIMARY KEY,
            timestamp TIMESTAMPTZ NOT NULL,
            user_id VARCHAR(255) NOT NULL,
            model VARCHAR(100) NOT NULL,
            action VARCHAR(50) NOT NULL,
            system_prompt_hash VARCHAR(16),
            input_tokens INTEGER NOT NULL,
            output_tokens INTEGER NOT NULL,
            thinking_tokens INTEGER NOT NULL DEFAULT 0,
            latency_ms INTEGER NOT NULL,
            cost_usd DECIMAL(10, 6) NOT NULL,
            status VARCHAR(20) NOT NULL,
            error_message TEXT,
            sanitization_count INTEGER DEFAULT 0,
            input_hash VARCHAR(16),
            output_hash VARCHAR(16),
            metadata JSONB,
            created_at TIMESTAMPTZ DEFAULT NOW()
        );
        CREATE INDEX IF NOT EXISTS idx_audit_timestamp
            ON llm_audit_log (timestamp);
        CREATE INDEX IF NOT EXISTS idx_audit_user
            ON llm_audit_log (user_id);
        CREATE INDEX IF NOT EXISTS idx_audit_status
            ON llm_audit_log (status);
        CREATE INDEX IF NOT EXISTS idx_audit_cost
            ON llm_audit_log (cost_usd);
    """

    def __init__(self, connection_string: str):
        """Connect using *connection_string* and ensure the schema exists."""
        import psycopg2

        self.conn = psycopg2.connect(connection_string)
        self._init_table()

    def _init_table(self):
        """Create the audit table and indexes if they are missing."""
        # ``with self.conn`` commits on success and rolls back on error
        # (it does not close the connection).
        with self.conn, self.conn.cursor() as cur:
            cur.execute(self.CREATE_TABLE)

    def write(self, entry: "AuditEntry"):
        """Insert *entry* as one row, committing on success.

        If the insert fails the transaction is rolled back before the
        exception propagates, so the shared connection is not left in an
        aborted-transaction state that would break later writes.
        """
        # Distinguish "no metadata" (NULL) from an explicitly empty dict.
        metadata_json = (
            json.dumps(entry.metadata) if entry.metadata is not None else None
        )
        with self.conn, self.conn.cursor() as cur:
            cur.execute("""
                INSERT INTO llm_audit_log
                (request_id, timestamp, user_id, model, action,
                system_prompt_hash, input_tokens, output_tokens,
                thinking_tokens, latency_ms, cost_usd, status,
                error_message, sanitization_count, input_hash,
                output_hash, metadata)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
                %s, %s, %s, %s, %s, %s, %s)
            """, (
                entry.request_id, entry.timestamp, entry.user_id,
                entry.model, entry.action, entry.system_prompt_hash,
                entry.input_tokens, entry.output_tokens,
                entry.thinking_tokens, entry.latency_ms,
                entry.cost_usd, entry.status, entry.error_message,
                entry.sanitization_count, entry.input_hash,
                entry.output_hash,
                metadata_json,
            ))
Retention Policies
# Retention periods per compliance profile. Each profile maps a record
# category to a human-readable retention period.
RETENTION_POLICIES = {
    "standard": {
        # Who, when, how many tokens
        "audit_metadata": "2 years",
        # Hashes for correlation
        "content_hashes": "1 year",
        # Encrypted full content (if stored)
        "full_content": "90 days",
        # Error details
        "error_logs": "1 year",
    },
    "hipaa": {
        # HIPAA requires 6-year retention
        "audit_metadata": "6 years",
        "content_hashes": "6 years",
        # PHI should not be in logs
        "full_content": "Never stored",
        "error_logs": "6 years",
    },
    "gdpr": {
        # Must justify retention period
        "audit_metadata": "As needed",
        "content_hashes": "As needed",
        # Data minimization principle
        "full_content": "Minimize",
        "error_logs": "As needed",
    },
}
Audit Analysis Queries
-- Daily cost by user over the last 30 days, most expensive user-days first.
-- requests and total_cost cover every logged row. avg_latency is computed
-- over successful requests only, because failed requests are logged with
-- latency_ms = 0 and would otherwise drag the average down. avg_latency is
-- NULL for a user-day with no successful requests.
SELECT user_id,
       DATE(timestamp) AS day,
       COUNT(*) AS requests,
       SUM(cost_usd) AS total_cost,
       AVG(latency_ms) FILTER (WHERE status = 'success') AS avg_latency
FROM llm_audit_log
WHERE timestamp >= NOW() - INTERVAL '30 days'
GROUP BY user_id, DATE(timestamp)
ORDER BY total_cost DESC;
-- Anomaly detection: unusually high token usage.
-- Flags requests from the last 24 hours whose input_tokens exceed the
-- 7-day mean by more than three standard deviations. The baseline uses
-- successful requests only: failed requests are logged with input_tokens = 0,
-- which would lower the mean and inflate the standard deviation.
SELECT request_id, user_id, input_tokens, output_tokens, cost_usd
FROM llm_audit_log
WHERE input_tokens > (
          SELECT AVG(input_tokens) + 3 * STDDEV(input_tokens)
          FROM llm_audit_log
          WHERE timestamp >= NOW() - INTERVAL '7 days'
            AND status = 'success'
      )
  AND timestamp >= NOW() - INTERVAL '24 hours';
-- Error rate by model over the last 24 hours.
-- errors: rows with status = 'error'; total: all rows for the model;
-- error_rate: errors as a percentage of total, rounded to two decimals.
SELECT model,
COUNT(*) FILTER (WHERE status = 'error') as errors,
COUNT(*) as total,
-- 100.0 forces non-integer division before rounding.
ROUND(100.0 * COUNT(*) FILTER (WHERE status = 'error')
/ COUNT(*), 2) as error_rate
FROM llm_audit_log
WHERE timestamp >= NOW() - INTERVAL '24 hours'
GROUP BY model;
A solid audit trail is the foundation for every compliance framework. In the next lesson, you will learn how to map your LLM deployment to specific compliance requirements — GDPR, SOC 2, and HIPAA.