Lesson 12 of 46 ~25 min
Course progress
0%

Data Classification & Sanitization

Understand what data can and cannot be sent to Claude — PII, secrets, proprietary code, HIPAA data. Build production data sanitization pipelines.

The first rule of production AI deployment: know exactly what data you are sending to the model. This lesson establishes a classification framework and builds sanitization pipelines that prevent sensitive data from ever reaching the API.

Data Classification Framework

Every piece of data in your organization falls into one of four tiers:

| Tier | Classification | Can Send to Claude? | Examples |
| --- | --- | --- | --- |
| Tier 1 | Public | ✅ Yes | Marketing copy, public docs, open-source code |
| Tier 2 | Internal | ✅ With controls | Internal wikis, non-sensitive code, meeting notes |
| Tier 3 | Confidential | ⚠️ With sanitization | Customer data (sanitized), proprietary algorithms |
| Tier 4 | Restricted | ❌ Never | API keys, passwords, PII, PHI, payment data |

What Must Never Be Sent

# Regex sources, grouped by category, for data that must be caught BEFORE it
# reaches the API. Each string is compiled with re.compile(); inline flags
# such as (?i) must therefore stay at the very start of a pattern.
NEVER_SEND = {
    "api_keys": [
        # "_" is included so a key is not cut short at its first underscore.
        r"sk-ant-[a-zA-Z0-9_\-]+",         # Anthropic API keys
        r"sk-[a-zA-Z0-9]{48}",              # OpenAI API keys
        r"AKIA[0-9A-Z]{16}",               # AWS access keys
        r"ghp_[a-zA-Z0-9]{36}",            # GitHub PATs
    ],
    "credentials": [
        # Case-insensitive (PASSWORD=, Password:) and consumes the closing
        # quote so no dangling quote is left behind after redaction.
        r"(?i)password\s*[:=]\s*['\"][^'\"]+['\"]?",  # Password assignments
        # Match the whole PEM block (any "<LABEL> PRIVATE KEY"), not just the
        # header, so the key body is redacted too. If the END line is missing
        # (truncated paste) the header alone still matches.
        r"-----BEGIN (?:[A-Z]+ )?PRIVATE KEY-----"
        r"(?:[\s\S]*?-----END (?:[A-Z]+ )?PRIVATE KEY-----)?",  # Private keys
        # HTTP auth schemes are case-insensitive; "Bearer" is the usual form.
        r"(?i)bearer\s+[a-zA-Z0-9\-._~+/]+=*",  # Bearer tokens
        # user:password embedded in a URL, e.g. postgres://user:pw@host/db.
        r"(?<=://)[^\s:/@]+:[^\s/@]+(?=@)",  # URL-embedded credentials
    ],
    "pii": [
        r"\b\d{3}-\d{2}-\d{4}\b",          # SSN
        # 16 digits, optionally grouped in fours by spaces or dashes.
        r"\b(?:\d{4}[ -]?){3}\d{4}\b",      # Credit card numbers
        r"\b[A-Z]{2}\d{6,9}\b",             # Passport numbers
    ],
    "phi": [
        r"(?i)diagnosis:\s*.+",              # Medical diagnoses
        r"(?i)patient\s+id:\s*\S+",          # Patient identifiers
        r"(?i)medical\s+record\s+number",    # MRN references
    ],
}

Building a Sanitization Pipeline

import re
import hashlib
from dataclasses import dataclass

@dataclass
class SanitizationResult:
    """Outcome of one sanitization pass over a piece of text."""

    # The input text with every detected match replaced by a placeholder.
    sanitized_text: str
    # One dict per replacement that was made.
    redactions: list[dict]
    # One of "safe", "sanitized", or "blocked".
    risk_level: str

class DataSanitizer:
    """Detect and redact sensitive data in text destined for an LLM.

    Patterns come from the module-level ``NEVER_SEND`` mapping and are
    compiled once per instance. ``redaction_log`` starts empty; none of the
    methods in this class write to it.
    """

    def __init__(self):
        self.patterns = self._compile_patterns()
        self.redaction_log: list[dict] = []

    def _compile_patterns(self) -> dict[str, list[re.Pattern]]:
        """Return ``NEVER_SEND`` with every regex source string compiled."""
        return {
            category: [re.compile(source) for source in sources]
            for category, sources in NEVER_SEND.items()
        }

    def sanitize(self, text: str,
                 allow_tier: int = 2) -> SanitizationResult:
        """Redact every pattern match in *text* and classify the outcome.

        Patterns are applied one after another, each to the output of the
        previous one, so a recorded ``position`` refers to the text as it
        stood when that pattern ran, not necessarily to the original input.
        Within one pattern, redactions are recorded last match first.

        Args:
            text: The text to scan.
            allow_tier: Accepted but not consulted; every pattern is always
                applied.

        Returns:
            A ``SanitizationResult`` whose ``risk_level`` is ``"safe"`` when
            nothing matched, ``"blocked"`` when any match belongs to the
            ``api_keys`` or ``credentials`` category, and ``"sanitized"``
            otherwise.
        """
        entries: list[dict] = []
        current = text

        for category, regexes in self.patterns.items():
            for regex in regexes:
                pieces: list[str] = []
                hits: list[dict] = []
                cursor = 0

                # Rebuild the text left to right around each match.
                for found in regex.finditer(current):
                    secret = found.group()
                    placeholder = self._redact(secret, category)
                    pieces.append(current[cursor:found.start()])
                    pieces.append(placeholder)
                    cursor = found.end()
                    digest = hashlib.sha256(secret.encode()).hexdigest()
                    hits.append({
                        "category": category,
                        "position": found.start(),
                        "length": len(secret),
                        "replacement": placeholder,
                        "hash": digest[:12],
                    })

                pieces.append(current[cursor:])
                current = "".join(pieces)
                # Record this pattern's redactions from last match to first.
                entries.extend(reversed(hits))

        if not entries:
            risk = "safe"
        else:
            hit_categories = {entry["category"] for entry in entries}
            # Any secret means the text should be blocked entirely.
            if hit_categories & {"api_keys", "credentials"}:
                risk = "blocked"
            else:
                risk = "sanitized"

        return SanitizationResult(
            sanitized_text=current,
            redactions=entries,
            risk_level=risk,
        )

    def _redact(self, value: str, category: str) -> str:
        """Build the placeholder that stands in for *value*.

        The placeholder embeds the upper-cased category and the first eight
        hex digits of the SHA-256 of *value*, so equal values map to equal
        placeholders.
        """
        # NOTE(review): an unsalted, truncated hash of a low-entropy value
        # (e.g. an SSN) may be guessable by brute force - confirm this is
        # acceptable before relying on it.
        digest = hashlib.sha256(value.encode()).hexdigest()
        return f"[REDACTED_{category.upper()}_{digest[:8]}]"

# Usage: push a sample configuration snippet through the sanitizer and
# report the risk level, the number of redactions, and the redacted text.
sanitizer = DataSanitizer()

text = """
Please review this configuration:
  api_key: sk-ant-abc123-secret-key-here
  database_url: postgres://admin:password123@db.example.com/prod
  Customer SSN: 123-45-6789
"""

result = sanitizer.sanitize(text)
print(
    f"Risk level: {result.risk_level}",
    f"Redactions: {len(result.redactions)}",
    f"Sanitized text:\n{result.sanitized_text}",
    sep="\n",
)

Sanitization-Aware API Client

Wrap the Anthropic client with automatic sanitization:

class SecureClient:
    """Anthropic client with automatic input sanitization.

    Every piece of text headed for the API is run through ``DataSanitizer``:
    string message content, ``{"type": "text"}`` blocks inside list-form
    content, and the ``system`` prompt when one is passed.
    """

    def __init__(self, block_on_secrets: bool = True):
        """Create the underlying client and sanitizer.

        Args:
            block_on_secrets: When true, ``messages_create`` raises
                ``SecurityError`` instead of sending text whose sanitizer
                risk level is ``"blocked"``.
        """
        from anthropic import Anthropic
        self.client = Anthropic()
        self.sanitizer = DataSanitizer()
        self.block_on_secrets = block_on_secrets

    def messages_create(self, messages: list[dict], **kwargs) -> dict:
        """Create a message with automatic sanitization.

        Args:
            messages: Conversation messages. ``content`` may be a string or a
                list of content blocks; the caller's dicts are not mutated.
            **kwargs: Forwarded to ``client.messages.create``. A ``system``
                value (string or list of text blocks) is sanitized first.

        Returns:
            Whatever ``client.messages.create`` returns, unchanged.
            NOTE(review): annotated ``dict``, but the SDK likely returns its
            own message object - confirm.

        Raises:
            SecurityError: If ``block_on_secrets`` is set and any sanitized
                text is classified ``"blocked"``.
        """
        all_redactions: list[dict] = []
        sanitized_messages = []

        for msg in messages:
            content = msg.get("content", "")
            if isinstance(content, (str, list)):
                sanitized_messages.append({
                    **msg,
                    "content": self._sanitize_content(content, all_redactions),
                })
            else:
                sanitized_messages.append(msg)

        # The system prompt travels in kwargs and is just as able to carry
        # secrets as a message, so it goes through the same checks.
        if "system" in kwargs:
            kwargs["system"] = self._sanitize_content(
                kwargs["system"], all_redactions
            )

        if all_redactions:
            print(f"⚠️ Sanitized {len(all_redactions)} sensitive items "
                  f"before sending to API")

        response = self.client.messages.create(
            messages=sanitized_messages,
            **kwargs
        )

        return response

    def _sanitize_content(self, content, redactions: list[dict]):
        """Sanitize a string or a list of content blocks; pass others through."""
        if isinstance(content, str):
            return self._sanitize_text(content, redactions)
        if isinstance(content, list):
            return [self._sanitize_block(block, redactions) for block in content]
        return content

    def _sanitize_block(self, block, redactions: list[dict]):
        """Return a sanitized copy of a dict text block; leave others as-is.

        Only plain-dict blocks with ``type == "text"`` and a string ``text``
        are handled. Images, tool blocks, and non-dict block objects are
        returned unchanged.
        """
        if (isinstance(block, dict)
                and block.get("type") == "text"
                and isinstance(block.get("text"), str)):
            return {
                **block,
                "text": self._sanitize_text(block["text"], redactions),
            }
        return block

    def _sanitize_text(self, text: str, redactions: list[dict]) -> str:
        """Sanitize one string, enforce blocking, and collect its redactions."""
        result = self.sanitizer.sanitize(text)

        if result.risk_level == "blocked" and self.block_on_secrets:
            raise SecurityError(
                f"Blocked: {len(result.redactions)} secret(s) detected "
                f"in message. Categories: "
                f"{set(r['category'] for r in result.redactions)}"
            )

        redactions.extend(result.redactions)
        return result.sanitized_text

class SecurityError(Exception):
    """Raised when input is refused because secrets were detected in it."""

Code-Specific Sanitization

When sending proprietary code for review, strip identifying information:

class CodeSanitizer:
    """Sanitize proprietary code before sending for review."""

    # RFC 1918 private IPv4 ranges: 10/8, 172.16/12 and 192.168/16.
    # The lookarounds stop the pattern from biting into a longer dotted
    # number, e.g. turning the public address 110.1.2.3 into "110.0.0.x".
    _PRIVATE_IPV4 = re.compile(
        r'(?<![\d.])'
        r'(?:10\.\d{1,3}|172\.(?:1[6-9]|2\d|3[01])|192\.168)'
        r'\.\d{1,3}\.\d{1,3}'
        r'(?!\.?\d)'
    )

    def __init__(self):
        # Not read or written by sanitize_code; kept for existing callers.
        self.replacements = {}

    def sanitize_code(self, code: str,
                      strip_comments: bool = False) -> str:
        """Remove sensitive identifiers from code.

        Args:
            code: Source text to sanitize.
            strip_comments: When true, ``#`` comments that start with
                PROPRIETARY, CONFIDENTIAL or COPYRIGHT are replaced by
                ``# [comment removed]``.

        Returns:
            The code with ``*.internal.company.com`` hosts, private IPv4
            addresses, and string values assigned to ``os.environ`` entries
            replaced by placeholders.
        """
        sanitized = code

        # Replace internal domain names
        sanitized = re.sub(
            r'[a-zA-Z0-9.-]+\.internal\.company\.com',
            'service.internal.example.com',
            sanitized
        )

        # Replace internal (private-range) IP addresses
        sanitized = self._PRIVATE_IPV4.sub('10.0.0.x', sanitized)

        # Strip proprietary comments if requested
        if strip_comments:
            sanitized = re.sub(
                r'#\s*(?:PROPRIETARY|CONFIDENTIAL|COPYRIGHT).*$',
                '# [comment removed]',
                sanitized,
                flags=re.MULTILINE
            )

        # Replace environment variable values
        sanitized = re.sub(
            r'(os\.environ\[?["\'][^"\']+["\']\]?\s*=\s*)["\'][^"\']+["\']',
            r'\1"REDACTED"',
            sanitized
        )

        return sanitized

Pre-Flight Check

Run this before every API call in production:

def pre_flight_check(messages: list[dict],
                     system: str = "") -> dict:
    """Comprehensive pre-flight security check.

    Scans the system prompt and every string-valued message ``content`` with
    a fresh ``DataSanitizer`` and reports what would be redacted. Nothing is
    modified or sent; non-string content is not inspected.

    Args:
        messages: Conversation messages to inspect.
        system: Optional system prompt; skipped when empty.

    Returns:
        A dict with:
            ``passed``: True when no location produced a redaction.
            ``issues``: One dict per offending location, holding
                ``location``, ``redactions`` (count), ``risk``,
                ``categories``, and, for messages, ``role``.
            ``total_redactions``: Sum of the redaction counts.
            ``max_risk``: The most severe risk seen, ``"safe"`` if none.
    """
    # Rank the labels explicitly: max() on the raw strings would compare
    # them alphabetically ("blocked" < "safe" < "sanitized") and so report
    # "sanitized" even when a blocked secret was found.
    severity = {"safe": 0, "sanitized": 1, "blocked": 2}

    sanitizer = DataSanitizer()
    issues = []

    # Check system prompt
    if system:
        result = sanitizer.sanitize(system)
        if result.redactions:
            issues.append({
                "location": "system_prompt",
                "redactions": len(result.redactions),
                "risk": result.risk_level,
                "categories": sorted(
                    {r["category"] for r in result.redactions}
                ),
            })

    # Check each message
    for i, msg in enumerate(messages):
        content = msg.get("content", "")
        if not isinstance(content, str):
            continue
        result = sanitizer.sanitize(content)
        if result.redactions:
            issues.append({
                "location": f"message_{i}",
                "role": msg.get("role"),
                "redactions": len(result.redactions),
                "risk": result.risk_level,
                "categories": sorted(
                    {r["category"] for r in result.redactions}
                ),
            })

    return {
        "passed": not issues,
        "issues": issues,
        "total_redactions": sum(issue["redactions"] for issue in issues),
        # An unrecognised label ranks above every known one (fail closed).
        "max_risk": max(
            (issue["risk"] for issue in issues),
            key=lambda label: severity.get(label, len(severity)),
            default="safe",
        ),
    }

The sanitization pipeline is your first line of defense. In the next lesson, you will build the audit trail that records everything that passes through your system — what was sent, what was returned, and who initiated each request.