Lesson 24 of 46 ~25 min
Course progress
0%

Audit Trail Implementation

Implement comprehensive audit logging for LLM interactions — what to log, storage requirements, retention policies, and search and analysis patterns.

Every production LLM deployment needs an audit trail. When something goes wrong — a customer complaint, a compliance inquiry, a security incident — you need to answer: who asked what, when, and what did the model respond?

What to Log

| Field | Required? | Purpose |
| --- | --- | --- |
| Timestamp | | When the request was made |
| Request ID | | Unique identifier for correlation |
| User ID | | Who initiated the request |
| Model | | Which model version was used |
| System prompt hash | | Track prompt changes without storing full prompt |
| Input tokens | | Cost tracking and anomaly detection |
| Output tokens | | Cost tracking and anomaly detection |
| Thinking tokens | | Thinking cost tracking |
| Latency (ms) | | Performance monitoring |
| Input content | ⚠️ | May contain PII — store encrypted or hash only |
| Output content | ⚠️ | May contain PII — store encrypted or hash only |
| Sanitization events | | What was redacted before sending |
| Error details | | Any API errors or failures |
| Cost (USD) | | Financial tracking |

Audit Logger Implementation

import json
import time
import uuid
import hashlib
from datetime import datetime, timezone
from dataclasses import dataclass, asdict

@dataclass
class AuditEntry:
    """One audit-log record describing a single LLM API request.

    Only hashes of the prompt and response are carried here, not the text
    itself. Field order is part of the interface: instances may be built
    positionally and ``asdict`` emits keys in this order.
    """

    request_id: str            # Unique identifier used to correlate records
    timestamp: str             # ISO-8601 string of when the request was logged
    user_id: str               # Who initiated the request
    model: str                 # Model identifier passed to the API
    action: str  # "messages.create", "messages.stream", etc.
    system_prompt_hash: str    # Truncated hash of the system prompt
    input_tokens: int
    output_tokens: int
    thinking_tokens: int
    latency_ms: int            # Request duration in milliseconds
    cost_usd: float            # Estimated cost of the request
    status: str  # "success", "error", "blocked"
    error_message: str = ""    # Populated only for failed requests
    sanitization_count: int = 0  # Number of redactions applied before sending
    input_hash: str = ""       # Hash of input for correlation
    output_hash: str = ""      # Hash of output for correlation
    # Optional free-form extras. The annotation is quoted so that it stays
    # valid on interpreters without the ``X | Y`` union syntax.
    metadata: "dict | None" = None

class AuditLogger:
    """Production audit logger for LLM interactions.

    Builds an ``AuditEntry`` for each completed or failed request and hands
    it to the storage backend's ``write`` method. The prompt and response
    are recorded as truncated SHA-256 digests rather than as text.
    """

    # Pricing constants (USD per one million tokens).
    # NOTE(review): hard-coded rates; confirm against the provider's current
    # pricing, including how thinking tokens are billed.
    INPUT_COST_PER_M = 5.0     # $5 per 1M input tokens
    OUTPUT_COST_PER_M = 25.0   # $25 per 1M output tokens
    THINKING_COST_PER_M = 5.0  # Same as input

    # Number of hex characters kept from each SHA-256 digest.
    HASH_LENGTH = 16

    def __init__(self, storage_backend):
        """Store the backend; it must expose ``write(entry)``."""
        self.storage = storage_backend

    @classmethod
    def _short_hash(cls, text: str) -> str:
        """Return the first ``HASH_LENGTH`` hex chars of SHA-256 of *text*."""
        return hashlib.sha256(text.encode()).hexdigest()[:cls.HASH_LENGTH]

    def log_request(self, user_id: str, model: str,
                    system_prompt: str, messages: list[dict],
                    response, latency_ms: int,
                    sanitization_count: int = 0) -> str:
        """Log a completed API request.

        Args:
            user_id: Identifier of the caller.
            model: Model name that served the request.
            system_prompt: System prompt text; only its hash is recorded.
            messages: Conversation messages; only their hash is recorded.
            response: API response exposing ``usage.input_tokens``,
                ``usage.output_tokens`` and ``content``.
            latency_ms: Request duration in milliseconds.
            sanitization_count: Number of redactions applied before sending.

        Returns:
            The generated request ID of the written audit entry.
        """
        request_id = str(uuid.uuid4())

        usage = response.usage
        input_tokens = usage.input_tokens
        output_tokens = usage.output_tokens
        # Treat a missing attribute and an explicit None the same way so the
        # cost arithmetic below never sees a non-number.
        thinking_tokens = getattr(usage, 'thinking_tokens', 0) or 0

        cost = self._calculate_cost(
            input_tokens, output_tokens, thinking_tokens
        )

        # default=str keeps hashing from raising on message content that is
        # not JSON-native; the output is unchanged for plain dict/list/str
        # input, so existing hashes stay comparable.
        serialized_messages = json.dumps(messages, default=str)

        entry = AuditEntry(
            request_id=request_id,
            timestamp=datetime.now(timezone.utc).isoformat(),
            user_id=user_id,
            model=model,
            action="messages.create",
            system_prompt_hash=self._short_hash(system_prompt),
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            thinking_tokens=thinking_tokens,
            latency_ms=latency_ms,
            cost_usd=cost,
            status="success",
            sanitization_count=sanitization_count,
            input_hash=self._short_hash(serialized_messages),
            output_hash=self._short_hash(str(response.content)),
        )

        self.storage.write(entry)
        return request_id

    def log_error(self, user_id: str, model: str,
                  error: Exception, latency_ms: int = 0) -> str:
        """Log a failed API request.

        Args:
            user_id: Identifier of the caller.
            model: Model name the request targeted.
            error: The exception raised by the request.
            latency_ms: Time spent before the failure, in milliseconds.
                Defaults to 0 when the caller did not measure it.

        Returns:
            The generated request ID of the written audit entry.
        """
        request_id = str(uuid.uuid4())

        entry = AuditEntry(
            request_id=request_id,
            timestamp=datetime.now(timezone.utc).isoformat(),
            user_id=user_id,
            model=model,
            action="messages.create",
            system_prompt_hash="",
            input_tokens=0,
            output_tokens=0,
            thinking_tokens=0,
            latency_ms=latency_ms,
            cost_usd=0.0,
            status="error",
            # Cap the message so one huge error cannot bloat the log.
            error_message=str(error)[:500],
        )

        self.storage.write(entry)
        return request_id

    def _calculate_cost(self, input_tokens: int, output_tokens: int,
                        thinking_tokens: int) -> float:
        """Return the estimated request cost in USD, rounded to 6 places.

        Each token class is priced with its own per-million rate so that
        changing ``THINKING_COST_PER_M`` actually takes effect.
        """
        input_cost = input_tokens * self.INPUT_COST_PER_M / 1_000_000
        thinking_cost = thinking_tokens * self.THINKING_COST_PER_M / 1_000_000
        output_cost = output_tokens * self.OUTPUT_COST_PER_M / 1_000_000
        return round(input_cost + thinking_cost + output_cost, 6)

Auditing Client Wrapper

class AuditedClient:
    """Anthropic client with automatic audit logging."""

    def __init__(self, user_id: str, audit_logger: "AuditLogger"):
        """Create the underlying SDK client and bind it to one user.

        Args:
            user_id: Identifier recorded on every audit entry.
            audit_logger: Logger exposing ``log_request`` and ``log_error``.
        """
        # Imported here rather than at module level, so the SDK is only
        # needed once a client is actually constructed.
        from anthropic import Anthropic
        self.client = Anthropic()
        self.user_id = user_id
        self.logger = audit_logger

    def messages_create(self, model: str, messages: list[dict],
                        system: str = "", **kwargs):
        """Create a message with automatic audit logging.

        Args:
            model: Model name to call.
            messages: Conversation messages forwarded to the API.
            system: System prompt forwarded to the API.
            **kwargs: Extra arguments passed through to ``messages.create``.

        Returns:
            The API response, with ``_audit_request_id`` set to the ID of
            the audit entry written for it.

        Raises:
            Exception: Whatever the API call raises, after an error entry
                has been logged. Exceptions from the audit logger itself
                propagate unchanged.
        """
        start = time.monotonic()

        # Only the API call sits inside the try: a failure while writing the
        # success entry must not be recorded as an API error.
        try:
            response = self.client.messages.create(
                model=model,
                messages=messages,
                system=system,
                **kwargs
            )
        except Exception as e:
            self.logger.log_error(
                user_id=self.user_id,
                model=model,
                error=e,
            )
            raise

        latency = int((time.monotonic() - start) * 1000)

        request_id = self.logger.log_request(
            user_id=self.user_id,
            model=model,
            system_prompt=system,
            messages=messages,
            response=response,
            latency_ms=latency,
        )

        # Attach request_id to response for correlation
        response._audit_request_id = request_id
        return response

Storage Backends

File-Based (Development)

import os

class FileAuditStorage:
    """File-based audit storage for development.

    Entries are appended as JSON Lines to one file per day, named
    ``audit_YYYY-MM-DD.jsonl`` inside ``directory``.
    """

    def __init__(self, directory: str = "./audit_logs"):
        """Remember the target directory, creating it if needed."""
        self.directory = directory
        os.makedirs(directory, exist_ok=True)

    @staticmethod
    def _utc_date(timestamp: str) -> str:
        """Return the UTC calendar date (``YYYY-MM-DD``) for *timestamp*.

        Falls back to the current UTC date when *timestamp* is not a
        parseable ISO-8601 string, so a write never fails on naming alone.
        """
        try:
            parsed = datetime.fromisoformat(timestamp)
        except (TypeError, ValueError):
            parsed = datetime.now(timezone.utc)
        if parsed.tzinfo is not None:
            parsed = parsed.astimezone(timezone.utc)
        return parsed.strftime("%Y-%m-%d")

    def write(self, entry: "AuditEntry"):
        """Append *entry* to the daily file matching its own timestamp.

        The file date is taken from ``entry.timestamp`` in UTC rather than
        from the machine's local clock, so the file name and the records in
        it agree.
        """
        date_str = self._utc_date(entry.timestamp)
        filepath = os.path.join(self.directory, f"audit_{date_str}.jsonl")

        with open(filepath, "a", encoding="utf-8") as f:
            f.write(json.dumps(asdict(entry)) + "\n")

    def query(self, start_date: str, end_date: str,
              user_id: "str | None" = None) -> list[dict]:
        """Query audit entries by date range and optional filters.

        Args:
            start_date: Inclusive lower bound, as an ISO-8601 date or
                timestamp prefix (e.g. ``"2024-01-31"``).
            end_date: Inclusive upper bound in the same form. A date-only
                bound covers that whole day.
            user_id: When given, only entries for this user are returned.

        Returns:
            Matching entries as dicts, in file-name then line order.

        Comparison is textual, so bounds and stored timestamps are expected
        to share one format and offset (entries are written in UTC).
        """
        results = []
        for filename in sorted(os.listdir(self.directory)):
            if not filename.endswith(".jsonl"):
                continue
            filepath = os.path.join(self.directory, filename)
            with open(filepath, encoding="utf-8") as f:
                for line in f:
                    if not line.strip():
                        continue  # tolerate blank lines in the file
                    entry = json.loads(line)
                    stamp = entry["timestamp"]
                    # Compare only as many characters as the bound supplies,
                    # so "2024-01-31" includes every time on that day.
                    if stamp[:len(start_date)] < start_date:
                        continue
                    if stamp[:len(end_date)] > end_date:
                        continue
                    if user_id is not None and entry["user_id"] != user_id:
                        continue
                    results.append(entry)
        return results

Database-Based (Production)

class PostgresAuditStorage:
    """PostgreSQL audit storage for production.

    Holds a single connection opened at construction time and writes one
    row per audit entry into ``llm_audit_log``.
    """

    # Idempotent schema bootstrap: table plus the lookup indexes.
    CREATE_TABLE = """
    CREATE TABLE IF NOT EXISTS llm_audit_log (
        request_id UUID PRIMARY KEY,
        timestamp TIMESTAMPTZ NOT NULL,
        user_id VARCHAR(255) NOT NULL,
        model VARCHAR(100) NOT NULL,
        action VARCHAR(50) NOT NULL,
        system_prompt_hash VARCHAR(16),
        input_tokens INTEGER NOT NULL,
        output_tokens INTEGER NOT NULL,
        thinking_tokens INTEGER NOT NULL DEFAULT 0,
        latency_ms INTEGER NOT NULL,
        cost_usd DECIMAL(10, 6) NOT NULL,
        status VARCHAR(20) NOT NULL,
        error_message TEXT,
        sanitization_count INTEGER DEFAULT 0,
        input_hash VARCHAR(16),
        output_hash VARCHAR(16),
        metadata JSONB,
        created_at TIMESTAMPTZ DEFAULT NOW()
    );

    CREATE INDEX IF NOT EXISTS idx_audit_timestamp
        ON llm_audit_log (timestamp);
    CREATE INDEX IF NOT EXISTS idx_audit_user
        ON llm_audit_log (user_id);
    CREATE INDEX IF NOT EXISTS idx_audit_status
        ON llm_audit_log (status);
    CREATE INDEX IF NOT EXISTS idx_audit_cost
        ON llm_audit_log (cost_usd);
    """

    def __init__(self, connection_string: str):
        """Connect to the database and make sure the schema exists."""
        import psycopg2
        self.conn = psycopg2.connect(connection_string)
        self._init_table()

    def _execute(self, sql: str, params=None):
        """Run one statement in its own transaction.

        Commits on success. On any failure the transaction is rolled back
        before re-raising, so the shared connection is not left in an
        aborted state that would reject every later statement.
        """
        try:
            with self.conn.cursor() as cur:
                cur.execute(sql, params)
            self.conn.commit()
        except Exception:
            self.conn.rollback()
            raise

    def _init_table(self):
        """Create the audit table and its indexes if they are missing."""
        self._execute(self.CREATE_TABLE)

    def write(self, entry: "AuditEntry"):
        """Insert *entry* as one row and commit.

        Values are passed as query parameters, never interpolated into the
        SQL text. ``metadata`` is stored as JSON; only ``None`` becomes
        NULL, so an empty dict is preserved as ``{}``.

        Raises:
            Exception: Whatever the driver raises; the transaction is
                rolled back first.
        """
        metadata_json = (
            json.dumps(entry.metadata) if entry.metadata is not None else None
        )
        self._execute("""
                INSERT INTO llm_audit_log
                (request_id, timestamp, user_id, model, action,
                 system_prompt_hash, input_tokens, output_tokens,
                 thinking_tokens, latency_ms, cost_usd, status,
                 error_message, sanitization_count, input_hash,
                 output_hash, metadata)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
                        %s, %s, %s, %s, %s, %s, %s)
            """, (
                entry.request_id, entry.timestamp, entry.user_id,
                entry.model, entry.action, entry.system_prompt_hash,
                entry.input_tokens, entry.output_tokens,
                entry.thinking_tokens, entry.latency_ms,
                entry.cost_usd, entry.status, entry.error_message,
                entry.sanitization_count, entry.input_hash,
                entry.output_hash,
                metadata_json,
            ))

Retention Policies

# Retention periods per compliance profile. Every profile uses the same four
# record categories; values are human-readable descriptions, not durations
# that code can parse.
RETENTION_POLICIES = dict(
    standard=dict(
        audit_metadata="2 years",     # who, when, token counts
        content_hashes="1 year",      # correlation hashes
        full_content="90 days",       # encrypted full content, if stored
        error_logs="1 year",          # error details
    ),
    hipaa=dict(
        audit_metadata="6 years",     # HIPAA requires 6-year retention
        content_hashes="6 years",
        full_content="Never stored",  # PHI should not be in logs
        error_logs="6 years",
    ),
    gdpr=dict(
        audit_metadata="As needed",   # retention period must be justified
        content_hashes="As needed",
        full_content="Minimize",      # data minimization principle
        error_logs="As needed",
    ),
)

Audit Analysis Queries

-- Daily cost by user
-- One row per (user, calendar day) over the last 30 days: request count,
-- summed cost and mean latency, most expensive rows first.
SELECT user_id,
       DATE(timestamp) as day,
       COUNT(*) as requests,
       SUM(cost_usd) as total_cost,
       AVG(latency_ms) as avg_latency
FROM llm_audit_log
WHERE timestamp >= NOW() - INTERVAL '30 days'
GROUP BY user_id, DATE(timestamp)
ORDER BY total_cost DESC;

-- Anomaly detection: unusually high token usage
-- Lists requests from the last 24 hours whose input_tokens exceed the
-- 7-day mean plus three standard deviations.
-- NOTE(review): the baseline subquery has no status filter, so error rows
-- (logged with input_tokens = 0) are part of the mean/stddev -- confirm
-- that is intended.
SELECT request_id, user_id, input_tokens, output_tokens, cost_usd
FROM llm_audit_log
WHERE input_tokens > (
    SELECT AVG(input_tokens) + 3 * STDDEV(input_tokens)
    FROM llm_audit_log
    WHERE timestamp >= NOW() - INTERVAL '7 days'
)
AND timestamp >= NOW() - INTERVAL '24 hours';

-- Error rate by model
-- Per model over the last 24 hours: error count, total requests, and the
-- error percentage rounded to two decimals. Rows with status 'blocked'
-- count toward the total but not toward errors.
SELECT model,
       COUNT(*) FILTER (WHERE status = 'error') as errors,
       COUNT(*) as total,
       ROUND(100.0 * COUNT(*) FILTER (WHERE status = 'error')
             / COUNT(*), 2) as error_rate
FROM llm_audit_log
WHERE timestamp >= NOW() - INTERVAL '24 hours'
GROUP BY model;

A solid audit trail is the foundation for every compliance framework. In the next lesson, you will learn how to map your LLM deployment to specific compliance requirements — GDPR, SOC 2, and HIPAA.