An MCP server is a bridge between an AI model and your production systems. Every tool call is an action that could read sensitive data, modify state, or expose infrastructure. This lesson covers the security practices required to deploy MCP servers that pass enterprise security reviews.
The MCP Threat Model
graph TD
subgraph "Threats"
T1[Prompt Injection<br/>Malicious input via user or context]
T2[Parameter Hallucination<br/>Model invents dangerous arguments]
T3[Privilege Escalation<br/>Tool used beyond intended scope]
T4[Data Exfiltration<br/>Sensitive data leaked via model output]
T5[Denial of Service<br/>Resource exhaustion via tool abuse]
end
subgraph "Mitigations"
M1[Input Validation]
M2[Least-Privilege Access]
M3[Audit Logging]
M4[Rate Limiting]
M5[Output Filtering]
end
T1 --> M1
T2 --> M1
T3 --> M2
T4 --> M5
T5 --> M4
1. Input Validation — The First Line of Defense
Every tool argument must be validated before execution. Models hallucinate parameters — treat every input as untrusted.
import re
from pathlib import Path
from pydantic import BaseModel, field_validator
class FileReadInput(BaseModel):
"""Validated file read input with path restrictions."""
path: str
encoding: str = "utf-8"
@field_validator("path")
@classmethod
def validate_path(cls, v: str) -> str:
resolved = Path(v).resolve()
# Allowed directories — configure per deployment
allowed_roots = [
Path("/app/data"),
Path("/app/config"),
]
if not any(resolved.is_relative_to(root) for root in allowed_roots):
raise ValueError(f"Access denied: path outside allowed directories")
# Block sensitive file patterns
blocked = [".env", ".git", "credentials", "secrets", "id_rsa", ".pem"]
if any(part in resolved.name.lower() for part in blocked):
raise ValueError(f"Access denied: sensitive file pattern")
return str(resolved)
@field_validator("encoding")
@classmethod
def validate_encoding(cls, v: str) -> str:
allowed = {"utf-8", "ascii", "latin-1"}
if v.lower() not in allowed:
raise ValueError(f"Encoding must be one of: {allowed}")
return v.lower()
class SQLQueryInput(BaseModel):
"""Validated SQL input — SELECT only, no injection vectors."""
sql: str
@field_validator("sql")
@classmethod
def validate_sql(cls, v: str) -> str:
normalized = v.strip()
# Must start with SELECT
if not normalized.upper().startswith("SELECT"):
raise ValueError("Only SELECT queries are allowed")
# Block write operations
write_ops = re.compile(
r"\b(INSERT|UPDATE|DELETE|DROP|CREATE|ALTER|TRUNCATE|GRANT|REVOKE|EXEC)\b",
re.IGNORECASE
)
if write_ops.search(normalized):
raise ValueError("Write operations are not permitted")
# Block multiple statements
if normalized.count(";") > 1:
raise ValueError("Multiple statements are not allowed")
# Block common injection patterns
injection_patterns = [
r"--\s", # SQL comments
r"/\*", # Block comments
r"UNION\s+SELECT", # UNION injection
r"INTO\s+OUTFILE", # File writes
r"LOAD_FILE", # File reads
]
for pattern in injection_patterns:
if re.search(pattern, normalized, re.IGNORECASE):
raise ValueError(f"Blocked: potentially dangerous SQL pattern")
return normalized
2. Least-Privilege Access
Every MCP server should operate with the minimum permissions required.
Database Servers
-- Create a dedicated read-only role for MCP
CREATE ROLE mcp_readonly LOGIN PASSWORD 'strong_random_password';
-- Grant SELECT only on specific schemas
GRANT USAGE ON SCHEMA public TO mcp_readonly;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO mcp_readonly;
-- Explicitly deny write access
ALTER DEFAULT PRIVILEGES IN SCHEMA public
GRANT SELECT ON TABLES TO mcp_readonly;
-- Block access to sensitive tables
REVOKE SELECT ON users_credentials FROM mcp_readonly;
REVOKE SELECT ON api_keys FROM mcp_readonly;
REVOKE SELECT ON audit_log FROM mcp_readonly;
Filesystem Servers
import os
import stat
class SecureFilesystemServer:
def __init__(self, allowed_roots: list[str], read_only: bool = True):
self.allowed_roots = [Path(r).resolve() for r in allowed_roots]
self.read_only = read_only
self.blocked_extensions = {
".env", ".pem", ".key", ".p12", ".pfx",
".credentials", ".secret", ".token"
}
def validate_access(self, path: str, write: bool = False) -> Path:
resolved = Path(path).resolve()
# Check allowed roots
if not any(resolved.is_relative_to(root) for root in self.allowed_roots):
raise PermissionError(f"Path outside allowed directories")
# Check symlink targets (prevent symlink traversal)
if resolved.is_symlink():
target = resolved.readlink().resolve()
if not any(target.is_relative_to(root) for root in self.allowed_roots):
raise PermissionError(f"Symlink target outside allowed directories")
# Block sensitive extensions
if resolved.suffix.lower() in self.blocked_extensions:
raise PermissionError(f"Access to {resolved.suffix} files is blocked")
# Enforce read-only mode
if write and self.read_only:
raise PermissionError("Server is in read-only mode")
return resolved
API Token Scoping
| Service | Minimum Scope | Do NOT Grant |
|---|---|---|
| GitHub | repo:read, issues:read | admin:org, delete_repo |
| Slack | channels:read, chat:write (specific channels) | admin, users:read |
| Sentry | project:read, event:read | project:admin, org:write |
| Google Maps | API key restricted to specific APIs | Unrestricted API key |
3. Audit Logging
Log every tool call with enough context to reconstruct what happened and why.
import json
import time
import logging
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
audit_logger = logging.getLogger("mcp.audit")
audit_logger.setLevel(logging.INFO)
handler = logging.FileHandler("/var/log/mcp/audit.jsonl")
handler.setFormatter(logging.Formatter("%(message)s"))
audit_logger.addHandler(handler)
@dataclass
class AuditEntry:
timestamp: str
tool_name: str
arguments: dict
result_summary: str
duration_ms: float
success: bool
error: str | None = None
def audited_tool_call(func):
"""Decorator that logs every tool invocation."""
async def wrapper(name: str, arguments: dict):
start = time.monotonic()
entry = AuditEntry(
timestamp=datetime.now(timezone.utc).isoformat(),
tool_name=name,
arguments=_redact_sensitive(arguments),
result_summary="",
duration_ms=0,
success=False,
)
try:
result = await func(name, arguments)
entry.success = True
entry.result_summary = _summarize_result(result)
return result
except Exception as e:
entry.error = f"{type(e).__name__}: {str(e)}"
raise
finally:
entry.duration_ms = (time.monotonic() - start) * 1000
audit_logger.info(json.dumps(asdict(entry)))
return wrapper
def _redact_sensitive(args: dict) -> dict:
"""Remove sensitive values from logged arguments."""
sensitive_keys = {"password", "token", "secret", "api_key", "credentials"}
return {
k: "***REDACTED***" if k.lower() in sensitive_keys else v
for k, v in args.items()
}
def _summarize_result(result) -> str:
"""Create a short summary without logging full data."""
text = str(result)
if len(text) > 200:
return text[:200] + f"... ({len(text)} chars total)"
return text
Example audit log entry:
{
"timestamp": "2026-02-10T14:32:01.234Z",
"tool_name": "query",
"arguments": {"sql": "SELECT count(*) FROM orders WHERE status = 'pending'"},
"result_summary": "Rows: 1\n\ncount\n42",
"duration_ms": 23.4,
"success": true,
"error": null
}
4. Rate Limiting and Resource Protection
Prevent a runaway model conversation from exhausting your infrastructure.
import asyncio
from collections import defaultdict
from time import monotonic
class RateLimiter:
"""Per-tool rate limiting with sliding window."""
def __init__(self, default_rpm: int = 60, default_max_concurrent: int = 5):
self.default_rpm = default_rpm
self.default_max_concurrent = default_max_concurrent
self.call_times: dict[str, list[float]] = defaultdict(list)
self.semaphores: dict[str, asyncio.Semaphore] = {}
self.tool_limits: dict[str, int] = {}
def configure(self, tool_name: str, rpm: int, max_concurrent: int = 5):
self.tool_limits[tool_name] = rpm
self.semaphores[tool_name] = asyncio.Semaphore(max_concurrent)
async def acquire(self, tool_name: str):
rpm_limit = self.tool_limits.get(tool_name, self.default_rpm)
now = monotonic()
# Clean old entries
self.call_times[tool_name] = [
t for t in self.call_times[tool_name] if now - t < 60
]
if len(self.call_times[tool_name]) >= rpm_limit:
raise RuntimeError(
f"Rate limit exceeded for '{tool_name}': {rpm_limit} calls/minute"
)
self.call_times[tool_name].append(now)
sem = self.semaphores.get(
tool_name,
asyncio.Semaphore(self.default_max_concurrent)
)
await sem.acquire()
return sem
# Usage
limiter = RateLimiter()
limiter.configure("query", rpm=30, max_concurrent=3)
limiter.configure("write_file", rpm=10, max_concurrent=1)
5. Output Filtering
Prevent sensitive data from leaking through model responses.
import re
class OutputFilter:
"""Filter sensitive patterns from tool results before returning to model."""
PATTERNS = [
(re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b"), "[EMAIL]"),
(re.compile(r"\b\d{3}-\d{2}-\d{4}\b"), "[SSN]"),
(re.compile(r"\b4[0-9]{12}(?:[0-9]{3})?\b"), "[CARD]"),
(re.compile(r"\b5[1-5][0-9]{14}\b"), "[CARD]"),
(re.compile(r"\b(?:sk-|ghp_|xoxb-|AIza)[A-Za-z0-9_-]{20,}\b"), "[API_KEY]"),
(re.compile(r"-----BEGIN (?:RSA )?PRIVATE KEY-----"), "[PRIVATE_KEY]"),
]
@classmethod
def filter(cls, text: str) -> str:
for pattern, replacement in cls.PATTERNS:
text = pattern.sub(replacement, text)
return text
Apply the filter to every tool result before returning to the client:
@self.server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
result = await execute_tool(name, arguments)
filtered = OutputFilter.filter(result)
return [TextContent(type="text", text=filtered)]
Enterprise Security Checklist
Use this checklist before submitting an MCP server for security review:
| Category | Requirement | Status |
|---|---|---|
| Input | All tool arguments validated with strict schemas | ☐ |
| Input | Path traversal attacks blocked (symlinks, ../) | ☐ |
| Input | SQL injection patterns detected and rejected | ☐ |
| Access | Least-privilege credentials for all external services | ☐ |
| Access | API tokens scoped to minimum required permissions | ☐ |
| Access | Filesystem access restricted to explicit allowlists | ☐ |
| Logging | Every tool call logged with timestamp, args, result | ☐ |
| Logging | Sensitive values redacted in logs | ☐ |
| Logging | Log retention and rotation configured | ☐ |
| Limits | Per-tool rate limiting enforced | ☐ |
| Limits | Maximum result size bounded | ☐ |
| Limits | Timeouts on all external calls | ☐ |
| Output | PII and secrets filtered from results | ☐ |
| Output | Stack traces never exposed to model | ☐ |
| Infra | Server runs in sandboxed environment (container, VM) | ☐ |
| Infra | Network egress restricted to required endpoints | ☐ |
| Infra | Dependencies audited and pinned | ☐ |
Sandboxing with Containers
For maximum isolation, run MCP servers in containers with restricted capabilities:
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY server.py .
# Run as non-root
RUN useradd -r -s /bin/false mcpuser
USER mcpuser
# No network access needed for filesystem server
# Use --network=none when running
CMD ["python", "server.py"]
# Run with minimal permissions
docker run \
--network=none \
--read-only \
--tmpfs /tmp:size=100M \
--cap-drop=ALL \
--security-opt=no-new-privileges \
-v /app/data:/data:ro \
mcp-filesystem-server
Security Architecture for Multi-Server Deployments
graph TB
U[User / Claude Desktop] --> GW[MCP Gateway<br/>Auth + Rate Limiting + Audit]
GW --> S1[Filesystem Server<br/>Read-only, sandboxed]
GW --> S2[Database Server<br/>SELECT-only role]
GW --> S3[GitHub Server<br/>Scoped token]
GW --> S4[Slack Server<br/>Bot token, 2 channels]
S1 --> FS["/app/data<br/>/app/config"]
S2 --> DB[(PostgreSQL<br/>mcp_readonly role)]
S3 --> GH[GitHub API<br/>repo:read only]
S4 --> SL[Slack API<br/>#incidents, #deployments]
The gateway pattern centralizes authentication, rate limiting, and audit logging — individual servers do not need to reimplement these concerns.
This completes the MCP & Tool Ecosystem module. You now have the knowledge to build, deploy, and secure MCP servers for production use with Opus 4.6.