Lesson 42 of 46 ~20 min

MCP Security Best Practices

Secure your MCP servers for enterprise deployment — path restrictions, sandboxing, audit logging, token scoping, and input sanitization.

An MCP server is a bridge between an AI model and your production systems. Every tool call is an action that could read sensitive data, modify state, or expose infrastructure. This lesson covers the security practices required to deploy MCP servers that pass enterprise security reviews.

The MCP Threat Model

graph TD
    subgraph "Threats"
        T1[Prompt Injection<br/>Malicious input via user or context]
        T2[Parameter Hallucination<br/>Model invents dangerous arguments]
        T3[Privilege Escalation<br/>Tool used beyond intended scope]
        T4[Data Exfiltration<br/>Sensitive data leaked via model output]
        T5[Denial of Service<br/>Resource exhaustion via tool abuse]
    end
    subgraph "Mitigations"
        M1[Input Validation]
        M2[Least-Privilege Access]
        M3[Audit Logging]
        M4[Rate Limiting]
        M5[Output Filtering]
    end
    T1 --> M1
    T2 --> M1
    T3 --> M2
    T4 --> M5
    T5 --> M4

1. Input Validation — The First Line of Defense

Every tool argument must be validated before execution. Models hallucinate parameters — treat every input as untrusted.

import re
from pathlib import Path
from pydantic import BaseModel, field_validator

class FileReadInput(BaseModel):
    """Validated file read input with path restrictions."""
    path: str
    encoding: str = "utf-8"

    @field_validator("path")
    @classmethod
    def validate_path(cls, v: str) -> str:
        resolved = Path(v).resolve()

        # Allowed directories — configure per deployment
        allowed_roots = [
            Path("/app/data"),
            Path("/app/config"),
        ]

        if not any(resolved.is_relative_to(root) for root in allowed_roots):
            raise ValueError(f"Access denied: path outside allowed directories")

        # Block sensitive patterns in any path component, so files inside
        # a directory such as .git/ are rejected as well
        blocked = [".env", ".git", "credentials", "secrets", "id_rsa", ".pem"]
        if any(b in part.lower() for part in resolved.parts for b in blocked):
            raise ValueError("Access denied: sensitive file pattern")

        return str(resolved)

    @field_validator("encoding")
    @classmethod
    def validate_encoding(cls, v: str) -> str:
        allowed = {"utf-8", "ascii", "latin-1"}
        if v.lower() not in allowed:
            raise ValueError(f"Encoding must be one of: {allowed}")
        return v.lower()


class SQLQueryInput(BaseModel):
    """Validated SQL input: a single SELECT statement, with common injection patterns rejected."""
    sql: str

    @field_validator("sql")
    @classmethod
    def validate_sql(cls, v: str) -> str:
        normalized = v.strip()

        # Must start with SELECT
        if not normalized.upper().startswith("SELECT"):
            raise ValueError("Only SELECT queries are allowed")

        # Block write operations
        write_ops = re.compile(
            r"\b(INSERT|UPDATE|DELETE|DROP|CREATE|ALTER|TRUNCATE|GRANT|REVOKE|EXEC)\b",
            re.IGNORECASE
        )
        if write_ops.search(normalized):
            raise ValueError("Write operations are not permitted")

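        # Block multiple statements: allow one optional trailing semicolon,
        # reject a semicolon anywhere else (including inside string literals)
        if ";" in normalized.removesuffix(";"):
            raise ValueError("Multiple statements are not allowed")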

        # Block common injection patterns
        injection_patterns = [
            r"--",             # SQL line comments, with or without trailing text
            r"/\*",            # Block comments
            r"UNION\s+SELECT", # UNION injection
            r"INTO\s+OUTFILE", # File writes
            r"LOAD_FILE",      # File reads
        ]
        for pattern in injection_patterns:
            if re.search(pattern, normalized, re.IGNORECASE):
                raise ValueError(f"Blocked: potentially dangerous SQL pattern")

        return normalized
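
The path check in FileReadInput depends on two details: resolve() collapses ".." segments before the comparison, and is_relative_to() compares whole path components rather than string prefixes. The following is a minimal standard-library sketch of that behavior, assuming a POSIX filesystem; ALLOWED_ROOTS and is_allowed are illustrative names, not part of any MCP SDK.

```python
from pathlib import Path

# Hypothetical allowlist, mirroring the roots used in FileReadInput
ALLOWED_ROOTS = [Path("/app/data"), Path("/app/config")]

def is_allowed(candidate: str) -> bool:
    """Return True only if the normalized path stays under an allowed root."""
    # resolve() collapses ".." segments and follows symlinks before the check
    resolved = Path(candidate).resolve()
    return any(resolved.is_relative_to(root.resolve()) for root in ALLOWED_ROOTS)

for attempt in [
    "/app/data/reports/q1.csv",    # inside an allowed root
    "/app/data/../../etc/passwd",  # ".." escapes the root
    "/app/database/dump.sql",      # shares a string prefix, but is not a child
]:
    print(f"{attempt!r:40} allowed={is_allowed(attempt)}")
```

The third case is the reason to avoid str.startswith for this check: "/app/database" starts with "/app/data" as a string but is not inside that directory.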

2. Least-Privilege Access

Every MCP server should operate with the minimum permissions required.

Database Servers

-- Create a dedicated read-only role for MCP
CREATE ROLE mcp_readonly LOGIN PASSWORD 'strong_random_password';

-- Grant SELECT only on specific schemas
GRANT USAGE ON SCHEMA public TO mcp_readonly;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO mcp_readonly;

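-- Extend read-only access to tables created later. This role is never granted
-- write privileges, so INSERT/UPDATE/DELETE remain denied by default
ALTER DEFAULT PRIVILEGES IN SCHEMA public
    GRANT SELECT ON TABLES TO mcp_readonly;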

-- Block access to sensitive tables
REVOKE SELECT ON users_credentials FROM mcp_readonly;
REVOKE SELECT ON api_keys FROM mcp_readonly;
REVOKE SELECT ON audit_log FROM mcp_readonly;

Filesystem Servers

from pathlib import Path

class SecureFilesystemServer:
    def __init__(self, allowed_roots: list[str], read_only: bool = True):
        self.allowed_roots = [Path(r).resolve() for r in allowed_roots]
        self.read_only = read_only
        self.blocked_extensions = {
            ".env", ".pem", ".key", ".p12", ".pfx",
            ".credentials", ".secret", ".token"
        }

    def validate_access(self, path: str, write: bool = False) -> Path:
        # resolve() follows symlinks and collapses "..", so the root check
        # below is applied to the real target and covers symlink traversal.
        # (Calling is_symlink() on an already-resolved path is always False.)
        resolved = Path(path).resolve()

        # Check allowed roots
        if not any(resolved.is_relative_to(root) for root in self.allowed_roots):
            raise PermissionError("Path outside allowed directories")

        # Block sensitive extensions
        if resolved.suffix.lower() in self.blocked_extensions:
            raise PermissionError(f"Access to {resolved.suffix} files is blocked")

        # Enforce read-only mode
        if write and self.read_only:
            raise PermissionError("Server is in read-only mode")

        return resolved
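
Because Path.resolve() follows symbolic links, a root check made on the resolved path is applied to a link's real target rather than to the link's own location. The following is a small sketch that demonstrates this in a temporary directory, assuming a platform where creating symlinks is permitted; is_within is an illustrative helper, not part of the class above.

```python
import os
import tempfile
from pathlib import Path

def is_within(root: Path, candidate: Path) -> bool:
    """Check the real location of candidate against root."""
    # resolve() follows symlinks, so the comparison uses the link's target
    return candidate.resolve().is_relative_to(root.resolve())

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp) / "allowed"
    outside = Path(tmp) / "outside"
    root.mkdir()
    outside.mkdir()

    secret = outside / "secret.txt"
    secret.write_text("not for the model")
    regular = root / "notes.txt"
    regular.write_text("fine to read")

    # A link that lives inside the allowed root but points outside it
    link = root / "shortcut.txt"
    os.symlink(secret, link)

    regular_ok = is_within(root, regular)
    link_ok = is_within(root, link)

print(f"regular file allowed={regular_ok}, symlink allowed={link_ok}")
```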

API Token Scoping

| Service | Minimum Scope | Do NOT Grant |
|---|---|---|
| GitHub | repo:read, issues:read | admin:org, delete_repo |
| Slack | channels:read, chat:write (specific channels) | admin, users:read |
| Sentry | project:read, event:read | project:admin, org:write |
| Google Maps | API key restricted to specific APIs | Unrestricted API key |
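
A scope policy like this can also be checked at server startup. The sketch below compares the scopes a token reports against an allowlist and fails on anything extra. It assumes the granted scopes have already been obtained from the provider, which is provider-specific and not shown; ALLOWED_SCOPES, excess_scopes, and assert_minimal are hypothetical names, and the scope strings are copied from the table above.

```python
# Hypothetical allowlists; scope names are taken from the table above
ALLOWED_SCOPES = {
    "github": {"repo:read", "issues:read"},
    "sentry": {"project:read", "event:read"},
}

def excess_scopes(service: str, granted: set[str]) -> set[str]:
    """Return the scopes a token holds beyond the allowlist for its service."""
    allowed = ALLOWED_SCOPES.get(service, set())  # unknown service: allow nothing
    return granted - allowed

def assert_minimal(service: str, granted: set[str]) -> None:
    """Raise if the token carries any scope outside the allowlist."""
    extra = excess_scopes(service, granted)
    if extra:
        raise PermissionError(f"{service} token is over-scoped: {sorted(extra)}")

# A correctly scoped token passes silently
assert_minimal("github", {"repo:read"})

# An over-scoped token is rejected before the server starts serving tools
try:
    assert_minimal("github", {"repo:read", "admin:org"})
    outcome = "accepted"
except PermissionError as exc:
    outcome = str(exc)

print(outcome)
```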

3. Audit Logging

Log every tool call with enough context to reconstruct what happened and why.

import json
import time
import logging
from dataclasses import dataclass, asdict
from datetime import datetime, timezone

audit_logger = logging.getLogger("mcp.audit")
audit_logger.setLevel(logging.INFO)

handler = logging.FileHandler("/var/log/mcp/audit.jsonl")
handler.setFormatter(logging.Formatter("%(message)s"))
audit_logger.addHandler(handler)

@dataclass
class AuditEntry:
    timestamp: str
    tool_name: str
    arguments: dict
    result_summary: str
    duration_ms: float
    success: bool
    error: str | None = None

def audited_tool_call(func):
    """Decorator that logs every tool invocation."""
    async def wrapper(name: str, arguments: dict):
        start = time.monotonic()
        entry = AuditEntry(
            timestamp=datetime.now(timezone.utc).isoformat(),
            tool_name=name,
            arguments=_redact_sensitive(arguments),
            result_summary="",
            duration_ms=0,
            success=False,
        )
        try:
            result = await func(name, arguments)
            entry.success = True
            entry.result_summary = _summarize_result(result)
            return result
        except Exception as e:
            entry.error = f"{type(e).__name__}: {str(e)}"
            raise
        finally:
            entry.duration_ms = (time.monotonic() - start) * 1000
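            # default=str keeps the audit write from failing (and masking the
            # original exception) when an argument is not JSON-serializable
            audit_logger.info(json.dumps(asdict(entry), default=str))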
    return wrapper

def _redact_sensitive(args: dict) -> dict:
    """Remove sensitive values from logged arguments."""
    sensitive_keys = {"password", "token", "secret", "api_key", "credentials"}
    return {
        k: "***REDACTED***" if k.lower() in sensitive_keys else v
        for k, v in args.items()
    }

def _summarize_result(result) -> str:
    """Create a short summary without logging full data."""
    text = str(result)
    if len(text) > 200:
        return text[:200] + f"... ({len(text)} chars total)"
    return text

Example audit log entry:

{
  "timestamp": "2026-02-10T14:32:01.234Z",
  "tool_name": "query",
  "arguments": {"sql": "SELECT count(*) FROM orders WHERE status = 'pending'"},
  "result_summary": "Rows: 1\n\ncount\n42",
  "duration_ms": 23.4,
  "success": true,
  "error": null
}
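
The _redact_sensitive helper above inspects only top-level keys, so a token nested inside another dict or a list would still reach the log. The following is one possible recursive variant; redact_nested is a hypothetical name, and the key list is copied from the helper above.

```python
SENSITIVE_KEYS = {"password", "token", "secret", "api_key", "credentials"}

def redact_nested(value):
    """Recursively redact sensitive keys in dicts, including dicts inside lists."""
    if isinstance(value, dict):
        return {
            k: "***REDACTED***" if str(k).lower() in SENSITIVE_KEYS
            else redact_nested(v)
            for k, v in value.items()
        }
    if isinstance(value, (list, tuple)):
        # Tuples are returned as lists, which is how JSON would encode them anyway
        return [redact_nested(item) for item in value]
    return value

args = {
    "url": "https://example.com/hook",
    "auth": {"token": "abc123", "user": "svc-mcp"},
    "headers": [{"api_key": "k-1"}, {"accept": "json"}],
}
redacted = redact_nested(args)
print(redacted)
```

The function builds new containers and leaves the original arguments untouched, so the tool itself still receives the real values.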

4. Rate Limiting and Resource Protection

Prevent a runaway model conversation from exhausting your infrastructure.

import asyncio
from collections import defaultdict
from time import monotonic

class RateLimiter:
    """Per-tool rate limiting with sliding window."""

    def __init__(self, default_rpm: int = 60, default_max_concurrent: int = 5):
        self.default_rpm = default_rpm
        self.default_max_concurrent = default_max_concurrent
        self.call_times: dict[str, list[float]] = defaultdict(list)
        self.semaphores: dict[str, asyncio.Semaphore] = {}
        self.tool_limits: dict[str, int] = {}

    def configure(self, tool_name: str, rpm: int, max_concurrent: int = 5):
        self.tool_limits[tool_name] = rpm
        self.semaphores[tool_name] = asyncio.Semaphore(max_concurrent)

    async def acquire(self, tool_name: str):
        rpm_limit = self.tool_limits.get(tool_name, self.default_rpm)
        now = monotonic()

        # Clean old entries
        self.call_times[tool_name] = [
            t for t in self.call_times[tool_name] if now - t < 60
        ]

        if len(self.call_times[tool_name]) >= rpm_limit:
            raise RuntimeError(
                f"Rate limit exceeded for '{tool_name}': {rpm_limit} calls/minute"
            )

        self.call_times[tool_name].append(now)

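        # Reuse one semaphore per tool. Creating a fresh semaphore on every
        # call would never limit concurrency for unconfigured tools.
        sem = self.semaphores.setdefault(
            tool_name,
            asyncio.Semaphore(self.default_max_concurrent)
        )
        await sem.acquire()
        # The caller must call sem.release() when the tool call finishes
        return sem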

# Usage
limiter = RateLimiter()
limiter.configure("query", rpm=30, max_concurrent=3)
limiter.configure("write_file", rpm=10, max_concurrent=1)
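
The sliding-window check inside acquire can be exercised on its own. The sketch below isolates that logic in a small class that takes the current time as an argument, so its behavior can be traced with simulated timestamps instead of a real clock; SlidingWindow is an illustrative name, not part of the RateLimiter above.

```python
from collections import deque

class SlidingWindow:
    """Allow at most `limit` calls within any `window` seconds."""

    def __init__(self, limit: int, window: float = 60.0):
        self.limit = limit
        self.window = window
        self.calls: deque[float] = deque()

    def allow(self, now: float) -> bool:
        # Drop timestamps that have aged out of the window
        while self.calls and now - self.calls[0] >= self.window:
            self.calls.popleft()
        if len(self.calls) >= self.limit:
            return False
        self.calls.append(now)
        return True

# Simulated timestamps (seconds) for a repeatable trace
window = SlidingWindow(limit=3, window=60.0)
trace = [(t, window.allow(t)) for t in (0, 10, 20, 30, 61, 62)]
for t, allowed in trace:
    print(f"t={t:>3}s allowed={allowed}")
```

The call at 30s is rejected because three calls already fall inside the window. The call at 61s succeeds because the call from 0s has expired, and the call at 62s is rejected again.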

5. Output Filtering

Prevent sensitive data from leaking through model responses.

import re

class OutputFilter:
    """Filter sensitive patterns from tool results before returning to model."""

    PATTERNS = [
        (re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b"), "[EMAIL]"),
        (re.compile(r"\b\d{3}-\d{2}-\d{4}\b"), "[SSN]"),
        (re.compile(r"\b4[0-9]{12}(?:[0-9]{3})?\b"), "[CARD]"),
        (re.compile(r"\b5[1-5][0-9]{14}\b"), "[CARD]"),
        (re.compile(r"\b(?:sk-|ghp_|xoxb-|AIza)[A-Za-z0-9_-]{20,}\b"), "[API_KEY]"),
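        # Redact the whole key block, not just its header line
        (re.compile(r"-----BEGIN (?:[A-Z]+ )?PRIVATE KEY-----.*?-----END (?:[A-Z]+ )?PRIVATE KEY-----", re.DOTALL), "[PRIVATE_KEY]"),
        # Fallback for truncated output where the END marker is missing
        (re.compile(r"-----BEGIN (?:[A-Z]+ )?PRIVATE KEY-----"), "[PRIVATE_KEY]"),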
    ]

    @classmethod
    def filter(cls, text: str) -> str:
        for pattern, replacement in cls.PATTERNS:
            text = pattern.sub(replacement, text)
        return text

Apply the filter to every tool result before returning to the client:

@self.server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    result = await execute_tool(name, arguments)
    filtered = OutputFilter.filter(result)
    return [TextContent(type="text", text=filtered)]
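
Errors are an output channel as well: an exception message or traceback can carry file paths, SQL, or credentials back to the model. The following is a minimal sketch of one way to handle this, keeping the details in a server-side log and returning only a generic message with a reference ID. The helper name safe_error_message and the logger name are assumptions for illustration.

```python
import logging
import uuid

logger = logging.getLogger("mcp.errors")

def safe_error_message(exc: Exception) -> str:
    """Log full details server-side and return a generic message for the model."""
    error_id = uuid.uuid4().hex[:8]
    # exc_info attaches the traceback to the server-side log record only
    logger.error("tool failure %s", error_id, exc_info=exc)
    return f"Tool call failed (reference {error_id})."

try:
    open("/nonexistent/secret/path.txt")
except OSError as exc:
    message = safe_error_message(exc)

print(message)
```

The reference ID lets an operator find the full traceback in the log without the path or the exception text ever appearing in the tool result.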

Enterprise Security Checklist

Use this checklist before submitting an MCP server for security review:

| Category | Requirement | Status |
|---|---|---|
| Input | All tool arguments validated with strict schemas | |
| Input | Path traversal attacks blocked (symlinks, ../) | |
| Input | SQL injection patterns detected and rejected | |
| Access | Least-privilege credentials for all external services | |
| Access | API tokens scoped to minimum required permissions | |
| Access | Filesystem access restricted to explicit allowlists | |
| Logging | Every tool call logged with timestamp, args, result | |
| Logging | Sensitive values redacted in logs | |
| Logging | Log retention and rotation configured | |
| Limits | Per-tool rate limiting enforced | |
| Limits | Maximum result size bounded | |
| Limits | Timeouts on all external calls | |
| Output | PII and secrets filtered from results | |
| Output | Stack traces never exposed to model | |
| Infra | Server runs in sandboxed environment (container, VM) | |
| Infra | Network egress restricted to required endpoints | |
| Infra | Dependencies audited and pinned | |
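
Two of the Limits items, timeouts and bounded result size, are not covered by the earlier examples. The following is a minimal sketch using asyncio.wait_for; bounded_call and both constants are hypothetical names, with values chosen only for illustration.

```python
import asyncio

MAX_RESULT_CHARS = 10_000   # hypothetical cap; tune per tool
CALL_TIMEOUT_S = 5.0        # hypothetical timeout; tune per backend

async def bounded_call(coro, timeout: float = CALL_TIMEOUT_S,
                       max_chars: int = MAX_RESULT_CHARS) -> str:
    """Await a tool coroutine with a deadline and cap the size of its result."""
    try:
        result = await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        return f"Tool call timed out after {timeout}s"
    text = str(result)
    if len(text) > max_chars:
        return text[:max_chars] + f"... [truncated, {len(text)} chars total]"
    return text

async def _demo():
    async def slow():
        await asyncio.sleep(1.0)
        return "never returned"

    async def chatty():
        return "x" * 50

    timed_out = await bounded_call(slow(), timeout=0.05)
    truncated = await bounded_call(chatty(), max_chars=10)
    return timed_out, truncated

timed_out, truncated = asyncio.run(_demo())
print(timed_out)
print(truncated)
```

wait_for cancels the underlying coroutine when the deadline passes, so a stuck backend call does not keep holding a concurrency slot.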

Sandboxing with Containers

For maximum isolation, run MCP servers in containers with restricted capabilities:

FROM python:3.12-slim
WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY server.py .

# Run as non-root
RUN useradd -r -s /bin/false mcpuser
USER mcpuser

# No network access needed for filesystem server
# Use --network=none when running
CMD ["python", "server.py"]

Run the container with minimal permissions. The data directory is mounted read-only at the same path the server's allowlist expects:

docker run \
  --network=none \
  --read-only \
  --tmpfs /tmp:size=100M \
  --cap-drop=ALL \
  --security-opt=no-new-privileges \
  -v /app/data:/app/data:ro \
  mcp-filesystem-server

Security Architecture for Multi-Server Deployments

graph TB
    U[User / Claude Desktop] --> GW[MCP Gateway<br/>Auth + Rate Limiting + Audit]
    GW --> S1[Filesystem Server<br/>Read-only, sandboxed]
    GW --> S2[Database Server<br/>SELECT-only role]
    GW --> S3[GitHub Server<br/>Scoped token]
    GW --> S4[Slack Server<br/>Bot token, 2 channels]
    S1 --> FS["/app/data<br/>/app/config"]
    S2 --> DB[(PostgreSQL<br/>mcp_readonly role)]
    S3 --> GH[GitHub API<br/>repo:read only]
    S4 --> SL[Slack API<br/>#incidents, #deployments]

The gateway pattern centralizes authentication, rate limiting, and audit logging — individual servers do not need to reimplement these concerns.
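
A rough sketch of the gateway idea follows, assuming an in-process dispatcher with hypothetical names throughout (Gateway, register, call). A real gateway would sit on the network between client and servers, and the per-key call counter here is only a stand-in for proper rate limiting.

```python
import asyncio
from typing import Awaitable, Callable

Handler = Callable[[dict], Awaitable[str]]

class Gateway:
    """Hypothetical gateway: one place for auth, per-caller quotas, and audit."""

    def __init__(self, api_keys: set[str], max_calls_per_key: int):
        self.api_keys = api_keys
        self.max_calls = max_calls_per_key
        self.calls: dict[str, int] = {}
        self.routes: dict[str, Handler] = {}
        self.audit: list[dict] = []   # stand-in for a real audit sink

    def register(self, tool_name: str, handler: Handler) -> None:
        self.routes[tool_name] = handler

    async def call(self, api_key: str, tool_name: str, arguments: dict) -> str:
        record = {"tool": tool_name, "ok": False}
        self.audit.append(record)     # every attempt is recorded, even denials
        if api_key not in self.api_keys:
            raise PermissionError("unknown caller")
        if tool_name not in self.routes:
            raise LookupError(f"no server registered for '{tool_name}'")
        used = self.calls.get(api_key, 0)
        if used >= self.max_calls:
            raise RuntimeError("call quota exceeded")
        self.calls[api_key] = used + 1
        result = await self.routes[tool_name](arguments)
        record["ok"] = True
        return result

async def _demo():
    gw = Gateway(api_keys={"key-1"}, max_calls_per_key=1)

    async def read_file(arguments: dict) -> str:
        return f"contents of {arguments['path']}"

    gw.register("read_file", read_file)
    first = await gw.call("key-1", "read_file", {"path": "/app/data/a.txt"})
    try:
        await gw.call("key-1", "read_file", {"path": "/app/data/b.txt"})
        second = "allowed"
    except RuntimeError as exc:
        second = str(exc)
    return first, second, gw.audit

first, second, audit = asyncio.run(_demo())
print(first, "|", second, "|", audit)
```

The handlers behind the gateway contain no auth or quota code of their own, which is the property the pattern is meant to provide. Per-server controls such as path allowlists and read-only roles still belong in the servers themselves.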

This completes the MCP & Tool Ecosystem module. You now have the knowledge to build, deploy, and secure MCP servers for production use with Opus 4.6.