The first rule of production AI deployment: know exactly what data you are sending to the model. This lesson establishes a classification framework and builds sanitization pipelines that prevent sensitive data from ever reaching the API.
Data Classification Framework
Every piece of data in your organization falls into one of four tiers:
| Tier | Classification | Can Send to Claude? | Examples |
|---|---|---|---|
| Tier 1 | Public | ✅ Yes | Marketing copy, public docs, open-source code |
| Tier 2 | Internal | ✅ With controls | Internal wikis, non-sensitive code, meeting notes |
| Tier 3 | Confidential | ⚠️ With sanitization | Customer data (sanitized), proprietary algorithms |
| Tier 4 | Restricted | ❌ Never | API keys, passwords, PII, PHI, payment data |
What Must Never Be Sent
# Patterns for Tier 4 ("Restricted") data. These must be caught BEFORE data
# reaches the API.
#
# Maps a category name to a list of regex source strings. Case-insensitive
# matching is requested inline with (?i) so the strings can be handed to
# re.compile() without extra flags.
NEVER_SEND = {
    "api_keys": [
        # Anthropic API keys. "_" is part of the character class so a key
        # containing an underscore is matched in full, not just up to it.
        r"sk-ant-[a-zA-Z0-9_\-]+",
        r"sk-[a-zA-Z0-9]{48}",  # OpenAI API keys
        r"AKIA[0-9A-Z]{16}",  # AWS access keys
        r"ghp_[a-zA-Z0-9]{36}",  # GitHub PATs
    ],
    "credentials": [
        # Password assignments in any capitalisation (password=, PASSWORD:).
        r"(?i)password\s*[:=]\s*['\"][^'\"]+",
        # PEM private keys of any type (RSA, EC, DSA, OPENSSH, ENCRYPTED).
        # When the END marker is present the whole block, body included, is
        # matched; otherwise only the BEGIN line is.
        r"-----BEGIN (?:[A-Z]+ )*PRIVATE KEY-----"
        r"(?:[\s\S]*?-----END (?:[A-Z]+ )*PRIVATE KEY-----)?",
        # Bearer tokens; HTTP headers normally spell it "Bearer".
        r"(?i)bearer\s+[a-zA-Z0-9\-._~+/]+=*",
        # "user:password@" embedded in a URL (e.g. database connection URLs).
        r"://[^\s:/@]+:[^\s/@]+@",
    ],
    "pii": [
        r"\b\d{3}-\d{2}-\d{4}\b",  # SSN
        r"\b\d{16}\b",  # Credit card numbers, unformatted
        r"\b\d{4}[ -]\d{4}[ -]\d{4}[ -]\d{4}\b",  # Credit cards, grouped
        r"\b[A-Z]{2}\d{6,9}\b",  # Passport numbers
    ],
    "phi": [
        r"(?i)diagnosis:\s*.+",  # Medical diagnoses
        r"(?i)patient\s+id:\s*\S+",  # Patient identifiers
        r"(?i)medical\s+record\s+number",  # MRN references
    ],
}
Building a Sanitization Pipeline
import hashlib
import hmac
import re
import secrets
from dataclasses import dataclass
@dataclass
class SanitizationResult:
    """Outcome of a single sanitization pass over one piece of text."""

    # The input text with each detected span replaced by a placeholder.
    sanitized_text: str
    # One dict per replaced span.
    redactions: list[dict]
    # One of "safe", "sanitized", "blocked".
    risk_level: str
class DataSanitizer:
"""Production-grade data sanitization for LLM inputs."""
def __init__(self):
self.patterns = self._compile_patterns()
self.redaction_log: list[dict] = []
def _compile_patterns(self) -> dict[str, list[re.Pattern]]:
compiled = {}
for category, patterns in NEVER_SEND.items():
compiled[category] = [re.compile(p) for p in patterns]
return compiled
def sanitize(self, text: str,
allow_tier: int = 2) -> SanitizationResult:
"""Sanitize text before sending to Claude."""
redactions = []
sanitized = text
# Always block Tier 4 data regardless of allow_tier
for category, patterns in self.patterns.items():
for pattern in patterns:
matches = list(pattern.finditer(sanitized))
for match in reversed(matches): # Reverse to preserve indices
original = match.group()
replacement = self._redact(original, category)
sanitized = (
sanitized[:match.start()]
+ replacement
+ sanitized[match.end():]
)
redactions.append({
"category": category,
"position": match.start(),
"length": len(original),
"replacement": replacement,
"hash": hashlib.sha256(
original.encode()
).hexdigest()[:12],
})
# Determine risk level
if not redactions:
risk = "safe"
elif any(r["category"] in ("api_keys", "credentials")
for r in redactions):
risk = "blocked" # Secrets found — block entirely
else:
risk = "sanitized"
return SanitizationResult(
sanitized_text=sanitized,
redactions=redactions,
risk_level=risk,
)
def _redact(self, value: str, category: str) -> str:
"""Replace sensitive data with a safe placeholder."""
short_hash = hashlib.sha256(value.encode()).hexdigest()[:8]
return f"[REDACTED_{category.upper()}_{short_hash}]"
# Usage: sanitize a config snippet containing an API key, a database URL
# and an SSN, then report what the sanitizer found.
sanitizer = DataSanitizer()

text = """
Please review this configuration:
api_key: sk-ant-abc123-secret-key-here
database_url: postgres://admin:password123@db.example.com/prod
Customer SSN: 123-45-6789
"""

result = sanitizer.sanitize(text)

# One print call; sep="\n" yields the same three output lines.
print(
    f"Risk level: {result.risk_level}",
    f"Redactions: {len(result.redactions)}",
    f"Sanitized text:\n{result.sanitized_text}",
    sep="\n",
)
Sanitization-Aware API Client
Wrap the Anthropic client with automatic sanitization:
class SecureClient:
    """Anthropic client with automatic input sanitization.

    Every piece of text handed to ``messages_create`` is run through a
    DataSanitizer before the underlying client is called. That covers
    string message content, the ``text`` of text blocks in list-form
    content, and the ``system`` prompt.

    Args:
        block_on_secrets: When True (default), raise SecurityError instead
            of sending if the sanitizer rates any text as "blocked". When
            False, the redacted text is sent.
    """

    def __init__(self, block_on_secrets: bool = True):
        # Imported here so the module can be loaded without the SDK.
        from anthropic import Anthropic

        self.client = Anthropic()
        self.sanitizer = DataSanitizer()
        self.block_on_secrets = block_on_secrets

    def _sanitize_text(self, text: str, redactions: list) -> str:
        """Sanitize one string, collecting its redactions.

        Raises:
            SecurityError: if the text is rated "blocked" and
                ``block_on_secrets`` is set.
        """
        result = self.sanitizer.sanitize(text)
        if result.risk_level == "blocked" and self.block_on_secrets:
            raise SecurityError(
                f"Blocked: {len(result.redactions)} secret(s) detected "
                f"in message. Categories: "
                f"{set(r['category'] for r in result.redactions)}"
            )
        redactions.extend(result.redactions)
        return result.sanitized_text

    def _sanitize_content(self, content, redactions: list):
        """Sanitize a string or a list of content blocks.

        In a list, dict blocks with ``"type": "text"`` and a string
        ``"text"`` are sanitized; every other block is passed through
        untouched. Any other content type is returned as-is.
        """
        if isinstance(content, str):
            return self._sanitize_text(content, redactions)
        if isinstance(content, list):
            cleaned = []
            for block in content:
                if (isinstance(block, dict)
                        and block.get("type") == "text"
                        and isinstance(block.get("text"), str)):
                    block = {
                        **block,
                        "text": self._sanitize_text(
                            block["text"], redactions
                        ),
                    }
                # NOTE(review): non-text blocks (images, tool results, ...)
                # are forwarded unsanitized.
                cleaned.append(block)
            return cleaned
        return content

    def messages_create(self, messages: list[dict], **kwargs):
        """Create a message with automatic sanitization.

        Args:
            messages: Message dicts; ``content`` may be a string or a list
                of content blocks.
            **kwargs: Forwarded to ``client.messages.create``. A ``system``
                entry is sanitized the same way as message content.

        Returns:
            Whatever ``self.client.messages.create`` returns.

        Raises:
            SecurityError: if secrets are detected and ``block_on_secrets``
                is set. Nothing is sent in that case.
        """
        all_redactions: list[dict] = []
        sanitized_messages = []

        for msg in messages:
            content = msg.get("content", "")
            if isinstance(content, (str, list)):
                sanitized_messages.append({
                    **msg,
                    "content": self._sanitize_content(
                        content, all_redactions
                    ),
                })
            else:
                sanitized_messages.append(msg)

        # The system prompt reaches the API too, so it gets the same scan.
        if "system" in kwargs:
            kwargs["system"] = self._sanitize_content(
                kwargs["system"], all_redactions
            )

        if all_redactions:
            print(f"⚠️ Sanitized {len(all_redactions)} sensitive items "
                  f"before sending to API")

        return self.client.messages.create(
            messages=sanitized_messages,
            **kwargs
        )
class SecurityError(Exception):
    """Raised when input is blocked because it contains secrets."""
Code-Specific Sanitization
When sending proprietary code for review, strip identifying information:
class CodeSanitizer:
    """Sanitize proprietary code before sending for review.

    Args:
        internal_domain: Domain suffix under which internal hosts live.
            Any ``<host>.<internal_domain>`` name is replaced with
            ``service.internal.example.com``.
    """

    # Private IPv4 ranges: 10/8, 172.16/12 and 192.168/16. The lookarounds
    # stop the pattern from matching inside a longer dotted number, such as
    # the tail of the public address 210.10.1.5 or a 5-part version string.
    _PRIVATE_IP = re.compile(
        r'(?<![\d.])'
        r'(?:10\.\d{1,3}|172\.(?:1[6-9]|2\d|3[01])|192\.168)'
        r'\.\d{1,3}\.\d{1,3}'
        r'(?!\.?\d)'
    )
    _MARKED_COMMENT = re.compile(
        r'#\s*(?:PROPRIETARY|CONFIDENTIAL|COPYRIGHT).*$',
        flags=re.MULTILINE,
    )
    # Group 1 keeps the assignment target; only the quoted value is replaced.
    _ENV_ASSIGNMENT = re.compile(
        r'(os\.environ\[?["\'][^"\']+["\']\]?\s*=\s*)["\'][^"\']+["\']'
    )

    def __init__(self, internal_domain: str = "internal.company.com"):
        self.replacements = {}
        self._internal_host = re.compile(
            r'[a-zA-Z0-9.-]+\.' + re.escape(internal_domain)
        )

    def sanitize_code(self, code: str,
                      strip_comments: bool = False) -> str:
        """Remove sensitive identifiers from code.

        Args:
            code: Source text to scrub.
            strip_comments: When True, ``#`` comments that start with
                PROPRIETARY, CONFIDENTIAL or COPYRIGHT are replaced with
                ``# [comment removed]``.

        Returns:
            The code with internal hostnames, private IPv4 addresses and
            string values assigned to ``os.environ`` entries replaced by
            fixed placeholders.
        """
        # Replace internal domain names
        sanitized = self._internal_host.sub(
            'service.internal.example.com', code
        )

        # Replace internal IP ranges
        sanitized = self._PRIVATE_IP.sub('10.0.0.x', sanitized)

        # Strip proprietary comments if requested
        if strip_comments:
            sanitized = self._MARKED_COMMENT.sub(
                '# [comment removed]', sanitized
            )

        # Replace environment variable values
        sanitized = self._ENV_ASSIGNMENT.sub(r'\1"REDACTED"', sanitized)

        return sanitized
Pre-Flight Check
Run this before every API call in production:
def pre_flight_check(messages: list[dict],
                     system: str = "") -> dict:
    """Comprehensive pre-flight security check.

    Runs the system prompt and every string message content through a
    DataSanitizer and reports where sensitive data was found. Nothing is
    modified or sent.

    Args:
        messages: Message dicts; only string ``content`` values are scanned.
        system: Optional system prompt to scan.

    Returns:
        A dict with:
          - "passed": True when no redaction was needed anywhere.
          - "issues": one entry per location that needed redactions, with
            "location", "redactions", "risk" and "categories" (messages
            also carry "role").
          - "total_redactions": number of redactions across all issues.
          - "max_risk": the most severe risk level seen, "safe" if none.
    """
    # Severity order for risk levels. Taking max() over the raw strings
    # would compare alphabetically and rank "sanitized" above "blocked".
    severity = {"safe": 0, "sanitized": 1, "blocked": 2}

    sanitizer = DataSanitizer()
    issues = []

    # Check system prompt
    if system:
        result = sanitizer.sanitize(system)
        if result.redactions:
            issues.append({
                "location": "system_prompt",
                "redactions": len(result.redactions),
                "risk": result.risk_level,
                "categories": sorted(
                    {r["category"] for r in result.redactions}
                ),
            })

    # Check each message
    for index, msg in enumerate(messages):
        content = msg.get("content", "")
        if not isinstance(content, str):
            continue
        result = sanitizer.sanitize(content)
        if result.redactions:
            issues.append({
                "location": f"message_{index}",
                "role": msg.get("role"),
                "redactions": len(result.redactions),
                "risk": result.risk_level,
                "categories": sorted(
                    {r["category"] for r in result.redactions}
                ),
            })

    return {
        "passed": not issues,
        "issues": issues,
        "total_redactions": sum(issue["redactions"] for issue in issues),
        "max_risk": max(
            (issue["risk"] for issue in issues),
            # An unrecognised level ranks above all known ones (fail closed).
            key=lambda level: severity.get(level, len(severity)),
            default="safe",
        ),
    }
The sanitization pipeline is your first line of defense. In the next lesson, you will build the audit trail that records everything that passes through your system — what was sent, what was returned, and who initiated each request.