Lesson 23 of 30 ~15 min
Course progress
0%

Audit Trail & Logging

Implementace audit trail pro compliance a debugging.

Audit logging je kritický pro compliance, debugging a security monitoring.

Proč audit logging?

  1. Compliance - SOC 2, GDPR, HIPAA vyžadují audit trail
  2. Security - Detekce anomálií a breach investigation
  3. Debugging - Pochopení, co se stalo
  4. Cost tracking - Sledování usage per user/team

Základní audit log struktura

from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import json

@dataclass
class AuditLogEntry:
    """One audit record describing a single model API call.

    Only hashes of the prompt and response are carried, so a record can be
    stored without retaining the content itself.
    """

    # --- Facts about the call ---
    timestamp: datetime
    user_id: str
    action: str
    model: str
    input_tokens: int
    output_tokens: int
    cost: float
    duration_ms: float
    status: str  # one of: success, error, rate_limited
    error_message: Optional[str] = None

    # --- Optional request metadata ---
    session_id: Optional[str] = None
    request_id: Optional[str] = None
    ip_address: Optional[str] = None
    user_agent: Optional[str] = None

    # --- Content digests (never the raw content) ---
    prompt_hash: Optional[str] = None
    response_hash: Optional[str] = None

    def to_json(self) -> str:
        """Return the entry as a JSON string with an ISO-8601 timestamp."""
        # Copy the instance attributes, then overwrite the datetime in place
        # so the key keeps its original (first) position in the output.
        payload = dict(vars(self))
        payload['timestamp'] = self.timestamp.isoformat()
        return json.dumps(payload)

Audit Logger Implementation

import hashlib
import logging
import time
import uuid

from anthropic import Anthropic

class AuditedClaudeClient:
    """Anthropic client wrapper that writes one JSON audit line per API call.

    Every call to :meth:`messages_create` is forwarded to
    ``Anthropic().messages.create`` and recorded as an ``AuditLogEntry`` in
    ``claude_audit.log``. Only truncated SHA-256 digests of the prompt and
    response are logged, never the content itself.
    """

    _LOGGER_NAME = 'claude_audit'
    _LOG_FILE = 'claude_audit.log'
    # Name given to the file handler so repeated instantiation can detect it.
    _HANDLER_NAME = 'claude_audit_file'

    # (input, output) price per one million tokens, keyed by model name.
    # NOTE(review): presumably USD list prices; verify against current pricing.
    _RATES = {
        'claude-opus-4-5': (15, 75),
        'claude-sonnet-4-5': (3, 15),
        'claude-haiku-3-5': (0.25, 1.25),
    }
    # Rates applied when the model is missing from ``_RATES``.
    _DEFAULT_RATES = (3, 15)

    def __init__(self, user_id: str):
        """Create a client whose calls are attributed to ``user_id``."""
        self.client = Anthropic()
        self.user_id = user_id
        self.logger = self._configure_logger()

    @classmethod
    def _configure_logger(cls) -> logging.Logger:
        """Return the shared audit logger, attaching its file handler once.

        ``logging.getLogger`` returns the same logger object for the same
        name, so adding a handler on every instantiation would write each
        audit line once per client ever created. The handler is therefore
        only added when no handler with our name is present yet.
        """
        audit_logger = logging.getLogger(cls._LOGGER_NAME)
        already_attached = any(
            h.name == cls._HANDLER_NAME for h in audit_logger.handlers
        )
        if not already_attached:
            handler = logging.FileHandler(cls._LOG_FILE)
            handler.set_name(cls._HANDLER_NAME)
            # Entries are already JSON; emit the bare message, one per line.
            handler.setFormatter(logging.Formatter('%(message)s'))
            audit_logger.addHandler(handler)
        audit_logger.setLevel(logging.INFO)
        return audit_logger

    def _hash_content(self, content: str) -> str:
        """Hash content for audit without storing actual content.

        Returns the first 16 hex characters of the SHA-256 digest.
        """
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    def messages_create(self, **kwargs):
        """Call ``messages.create`` and write an audit entry for the outcome.

        Args:
            **kwargs: Passed through unchanged to ``client.messages.create``.

        Returns:
            The response object returned by the underlying client.

        Raises:
            Exception: Any exception raised while calling the API or while
                recording the successful call is logged with status
                ``error`` and then re-raised unchanged.
        """
        start_time = datetime.now()
        # Monotonic clock for the duration: wall-clock differences can jump
        # (even go negative) when the system time is adjusted mid-call.
        timer_start = time.perf_counter()
        request_id = str(uuid.uuid4())

        # Extract prompt for hashing
        prompt = str(kwargs.get('messages', []))
        prompt_hash = self._hash_content(prompt)

        try:
            response = self.client.messages.create(**kwargs)

            # Calculate metrics
            duration = (time.perf_counter() - timer_start) * 1000
            input_tokens = response.usage.input_tokens
            output_tokens = response.usage.output_tokens
            cost = self._calculate_cost(kwargs.get('model'), input_tokens, output_tokens)

            # Response hash
            response_text = str(response.content)
            response_hash = self._hash_content(response_text)

            # Log success
            entry = AuditLogEntry(
                timestamp=start_time,
                user_id=self.user_id,
                action='messages.create',
                model=kwargs.get('model', 'unknown'),
                input_tokens=input_tokens,
                output_tokens=output_tokens,
                cost=cost,
                duration_ms=duration,
                status='success',
                request_id=request_id,
                prompt_hash=prompt_hash,
                response_hash=response_hash
            )
            self.logger.info(entry.to_json())

            return response

        except Exception as e:
            duration = (time.perf_counter() - timer_start) * 1000

            # No usage data is available on failure, so tokens and cost are 0.
            entry = AuditLogEntry(
                timestamp=start_time,
                user_id=self.user_id,
                action='messages.create',
                model=kwargs.get('model', 'unknown'),
                input_tokens=0,
                output_tokens=0,
                cost=0,
                duration_ms=duration,
                status='error',
                error_message=str(e),
                request_id=request_id,
                prompt_hash=prompt_hash
            )
            self.logger.info(entry.to_json())

            raise

    def _calculate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
        """Return the cost of a call from its token counts.

        Models missing from the rate table are priced at the default rates.
        """
        input_rate, output_rate = self._RATES.get(model, self._DEFAULT_RATES)
        return (input_tokens * input_rate + output_tokens * output_rate) / 1_000_000

Structured Logging pro SIEM

import structlog

# Structured logging setup: each event passes through these processors in
# order and leaves as a single JSON object.
_audit_processors = [
    structlog.processors.TimeStamper(fmt="iso"),  # adds the "timestamp" field
    structlog.processors.JSONRenderer(),  # renders the event as JSON
]
structlog.configure(processors=_audit_processors)

logger = structlog.get_logger("claude_audit")

def log_claude_request(
    user_id: str,
    action: str,
    model: str,
    tokens: dict,
    status: str,
    **metadata
):
    """Emit one structured ``claude_api_call`` event for an API request.

    Token counts are read from ``tokens['input']`` and ``tokens['output']``
    and default to 0 when a key is absent. Any extra keyword arguments are
    forwarded to the logger unchanged as additional event fields.
    """
    event_fields = {
        'user_id': user_id,
        'action': action,
        'model': model,
        'input_tokens': tokens.get('input', 0),
        'output_tokens': tokens.get('output', 0),
        'status': status,
    }
    logger.info("claude_api_call", **event_fields, **metadata)

Output (ready for Splunk, Datadog, etc.):

{
  "event": "claude_api_call",
  "timestamp": "2025-01-15T10:30:00Z",
  "user_id": "user_123",
  "action": "messages.create",
  "model": "claude-opus-4-5",
  "input_tokens": 1500,
  "output_tokens": 800,
  "status": "success"
}

Compliance queries

GDPR - Right to access

-- Every API call recorded for one specific user, newest first
SELECT *
FROM claude_audit_logs
WHERE user_id = 'user_123'
ORDER BY timestamp DESC;

Usage reporting

-- Monthly usage per team (January 2025 shown).
-- The half-open range [month start, next month start) limits the report to a
-- single month and still includes every timestamp on the month's last day.
SELECT
    team_id,
    SUM(input_tokens) AS total_input,
    SUM(output_tokens) AS total_output,
    SUM(cost) AS total_cost,
    COUNT(*) AS api_calls
FROM claude_audit_logs
WHERE timestamp >= '2025-01-01'
  AND timestamp < '2025-02-01'
GROUP BY team_id;

Anomaly detection

-- Unusual usage patterns: users whose spend on a single day exceeds the
-- alert threshold, most expensive first.
SELECT
    user_id,
    DATE(timestamp) AS day,
    COUNT(*) AS calls,
    SUM(cost) AS daily_cost
FROM claude_audit_logs
GROUP BY user_id, DATE(timestamp)
-- The aggregate is repeated here because SELECT aliases are not visible in
-- HAVING in standard SQL (PostgreSQL, for example, rejects the alias).
HAVING SUM(cost) > 100  -- Alert threshold
ORDER BY daily_cost DESC;

Retention policies

# Retention policy implementation
def cleanup_old_logs(retention_days: int = 90):
    """Archive, then delete, audit logs older than ``retention_days`` days.

    Args:
        retention_days: Age in days beyond which logs are removed. Must not
            be negative.

    Raises:
        ValueError: If ``retention_days`` is negative. A negative value would
            put the cutoff in the future and remove every log.
    """
    # Local import: the file-level import only brings in ``datetime``.
    from datetime import timedelta

    if retention_days < 0:
        raise ValueError(
            f"retention_days must not be negative, got {retention_days}"
        )

    cutoff = datetime.now() - timedelta(days=retention_days)

    # NOTE(review): archive_logs / delete_logs are not defined in this file;
    # presumably storage-specific helpers supplied elsewhere - confirm.

    # Archive before delete (for compliance)
    archive_logs(before=cutoff)

    # Delete
    delete_logs(before=cutoff)

    # Uses the module-level ``logger``; the original referenced an undefined
    # name ``log``.
    logger.info(f"Cleaned up logs older than {retention_days} days")

Audit logging je základ enterprise compliance.