Audit logging je kritický pro compliance, debugging a security monitoring.
Proč audit logging?
- Compliance - SOC 2, GDPR, HIPAA vyžadují audit trail
- Security - Detekce anomálií a breach investigation
- Debugging - Pochopení, co se stalo
- Cost tracking - Sledování usage per user/team
Základní audit log struktura
import json
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional
@dataclass
class AuditLogEntry:
    """A single audit record for one API call.

    The record carries only hashes of the prompt and response, never the
    content itself.
    """

    # Required fields describing the call and its outcome.
    timestamp: datetime
    user_id: str
    action: str
    model: str
    input_tokens: int
    output_tokens: int
    cost: float
    duration_ms: float
    status: str  # success, error, rate_limited
    error_message: Optional[str] = None

    # Optional request metadata.
    session_id: Optional[str] = None
    request_id: Optional[str] = None
    ip_address: Optional[str] = None
    user_agent: Optional[str] = None

    # Content hashes (not content itself!)
    prompt_hash: Optional[str] = None
    response_hash: Optional[str] = None

    def to_json(self) -> str:
        """Return the entry as a JSON object string.

        All fields are emitted in declaration order; ``timestamp`` is
        written in ISO 8601 form because ``datetime`` is not JSON
        serializable.
        """
        payload = dict(vars(self))
        # Overwriting an existing key keeps its original position.
        payload['timestamp'] = self.timestamp.isoformat()
        return json.dumps(payload)
Audit Logger Implementation
import hashlib
import logging
from anthropic import Anthropic
class AuditedClaudeClient:
    """Anthropic client wrapper that writes one audit entry per API call.

    Each ``messages_create`` call logs an ``AuditLogEntry`` as a JSON line
    through the ``claude_audit`` logger, which writes to
    ``claude_audit.log``. Prompt and response text are never logged; only
    truncated SHA-256 digests of them are.
    """

    # (input, output) price per one million tokens, keyed by model-name
    # prefix so that dated ids such as "claude-opus-4-5-<date>" resolve to
    # their family instead of silently falling back to the default rate.
    # NOTE(review): model names and prices are kept exactly as originally
    # written and have not been checked against current pricing -- confirm
    # before using the computed cost for billing.
    _RATES_PER_MTOK = {
        'claude-opus-4-5': (15, 75),
        'claude-sonnet-4-5': (3, 15),
        'claude-haiku-3-5': (0.25, 1.25),
    }
    # Rate used when the model is missing or matches no known prefix.
    _DEFAULT_RATE = (3, 15)

    def __init__(self, user_id: str):
        """Create a client whose calls are attributed to ``user_id``."""
        self.client = Anthropic()
        self.user_id = user_id
        self.logger = logging.getLogger('claude_audit')

        # logging.getLogger returns the same logger object for the same
        # name, so the handler must be attached only once; otherwise every
        # additional client instance would duplicate each audit line.
        if not self.logger.handlers:
            handler = logging.FileHandler('claude_audit.log')
            handler.setFormatter(logging.Formatter('%(message)s'))
            self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def _hash_content(self, content: str) -> str:
        """Hash content for audit without storing actual content.

        Returns the first 16 hex characters of the SHA-256 digest.
        """
        return hashlib.sha256(content.encode()).hexdigest()[:16]

    def messages_create(self, **kwargs):
        """Call ``client.messages.create(**kwargs)`` and audit the outcome.

        A ``success`` entry with token usage, cost and a response hash is
        logged when the call returns; an ``error`` entry with the exception
        text is logged when it raises, and the exception is re-raised
        unchanged.

        Returns:
            The response object returned by the underlying client.
        """
        # NOTE(review): timestamps are naive local time; consider
        # timezone-aware UTC if logs are correlated across hosts.
        start_time = datetime.now()
        request_id = str(uuid.uuid4())

        # Hash the prompt up front so the error path can log it too.
        prompt_hash = self._hash_content(str(kwargs.get('messages', [])))

        # Fields shared by the success and the error entry.
        common = dict(
            timestamp=start_time,
            user_id=self.user_id,
            action='messages.create',
            model=kwargs.get('model', 'unknown'),
            request_id=request_id,
            prompt_hash=prompt_hash,
        )

        try:
            response = self.client.messages.create(**kwargs)
        except Exception as e:
            entry = AuditLogEntry(
                **common,
                input_tokens=0,
                output_tokens=0,
                cost=0.0,
                duration_ms=self._elapsed_ms(start_time),
                status='error',
                error_message=str(e),
            )
            self.logger.info(entry.to_json())
            raise

        duration = self._elapsed_ms(start_time)
        input_tokens = response.usage.input_tokens
        output_tokens = response.usage.output_tokens
        cost = self._calculate_cost(
            kwargs.get('model'), input_tokens, output_tokens
        )

        entry = AuditLogEntry(
            **common,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost=cost,
            duration_ms=duration,
            status='success',
            response_hash=self._hash_content(str(response.content)),
        )
        self.logger.info(entry.to_json())
        return response

    def _elapsed_ms(self, start_time: datetime) -> float:
        """Return the milliseconds elapsed since ``start_time``."""
        return (datetime.now() - start_time).total_seconds() * 1000

    def _calculate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
        """Return the cost of a call from its token counts.

        Args:
            model: Model name; may be ``None`` or unknown, in which case
                the default rate is applied.
            input_tokens: Number of input tokens.
            output_tokens: Number of output tokens.
        """
        input_rate, output_rate = self._DEFAULT_RATE
        if model:
            for prefix, rate in self._RATES_PER_MTOK.items():
                if model.startswith(prefix):
                    input_rate, output_rate = rate
                    break
        # Rates are per one million tokens.
        return (input_tokens * input_rate + output_tokens * output_rate) / 1_000_000
Structured Logging pro SIEM
import structlog
# Configure structured logging. The processors are listed in the order
# timestamp first, JSON rendering last, so the rendered event includes
# the timestamp.
structlog.configure(
    processors=[
        # Adds an ISO-formatted timestamp to each event.
        structlog.processors.TimeStamper(fmt="iso"),
        # Renders the event as a JSON string.
        structlog.processors.JSONRenderer()
    ]
)
# Module-level logger used by log_claude_request.
logger = structlog.get_logger("claude_audit")
def log_claude_request(
    user_id: str,
    action: str,
    model: str,
    tokens: dict,
    status: str,
    **metadata
):
    """Emit one ``claude_api_call`` event through the module logger.

    Args:
        user_id: Identifier of the calling user.
        action: Name of the API action performed.
        model: Model name used for the call.
        tokens: Mapping read for the ``'input'`` and ``'output'`` counts;
            a missing key is logged as 0.
        status: Outcome of the call.
        **metadata: Extra key/value pairs added to the event as-is.
    """
    token_fields = {
        'input_tokens': tokens.get('input', 0),
        'output_tokens': tokens.get('output', 0),
    }
    # Keyword order is kept so the event's keys appear in the same order.
    logger.info(
        "claude_api_call",
        user_id=user_id,
        action=action,
        model=model,
        **token_fields,
        status=status,
        **metadata
    )
Output (ready for Splunk, Datadog, etc.):
{
"event": "claude_api_call",
"timestamp": "2025-01-15T10:30:00Z",
"user_id": "user_123",
"action": "messages.create",
"model": "claude-opus-4-5",
"input_tokens": 1500,
"output_tokens": 800,
"status": "success"
}
Compliance queries
GDPR - Right to access
-- All API calls for one specific user, newest first.
-- 'user_123' is a placeholder; in application code pass the user id as a
-- bound query parameter instead of building it into the SQL text.
SELECT * FROM claude_audit_logs
WHERE user_id = 'user_123'
ORDER BY timestamp DESC;
Usage reporting
-- Monthly usage per team (example: January 2025).
-- The half-open range [month start, next month start) limits the report
-- to exactly one calendar month; with only a lower bound the totals would
-- keep growing with every later month.
-- NOTE(review): assumes claude_audit_logs has a team_id column, which is
-- not one of the AuditLogEntry fields -- confirm against the table schema.
SELECT
    team_id,
    SUM(input_tokens) AS total_input,
    SUM(output_tokens) AS total_output,
    SUM(cost) AS total_cost,
    COUNT(*) AS api_calls
FROM claude_audit_logs
WHERE timestamp >= '2025-01-01'
  AND timestamp < '2025-02-01'
GROUP BY team_id;
Anomaly detection
-- Unusual usage patterns: users whose total cost on a single day exceeds
-- the alert threshold, most expensive first.
-- HAVING repeats the aggregate expression instead of the daily_cost alias
-- because several databases (e.g. PostgreSQL, SQL Server) do not allow
-- select-list aliases in HAVING.
SELECT
    user_id,
    DATE(timestamp) AS day,
    COUNT(*) AS calls,
    SUM(cost) AS daily_cost
FROM claude_audit_logs
GROUP BY user_id, DATE(timestamp)
HAVING SUM(cost) > 100 -- Alert threshold
ORDER BY daily_cost DESC;
Retention policies
# Retention policy implementation
def cleanup_old_logs(retention_days: int = 90):
    """Archive and then delete audit logs older than ``retention_days``.

    Args:
        retention_days: Age in days beyond which logs are removed.

    Raises:
        ValueError: If ``retention_days`` is negative. A negative value
            would place the cutoff in the future and remove every log.
    """
    if retention_days < 0:
        raise ValueError(
            f"retention_days must be non-negative, got {retention_days}"
        )

    cutoff = datetime.now() - timedelta(days=retention_days)

    # Archive before delete (for compliance). Because archiving runs
    # first, an exception raised there prevents the delete from running.
    archive_logs(before=cutoff)
    delete_logs(before=cutoff)

    # Uses the module-level `logger`; the previously referenced name `log`
    # is not defined in this file.
    logger.info(f"Cleaned up logs older than {retention_days} days")
Audit logging je základ enterprise compliance.