Lesson 21 of 46 ~30 min
Course progress
0%

Building Production-Grade MCP Servers

Build a complete MCP server in Python and TypeScript — server decorators, tool schemas, error handling, input validation, and a working database query server.

This lesson walks through building a production-grade MCP server — not a toy demo, but something you would deploy behind a real application. You will build a database query server that exposes PostgreSQL as a set of safe, auditable tools.

Project Setup

Python

# Create project
mkdir mcp-db-server && cd mcp-db-server
python -m venv .venv && source .venv/bin/activate

# Install dependencies
pip install mcp[server] asyncpg pydantic

TypeScript

mkdir mcp-db-server && cd mcp-db-server
npm init -y
npm install @modelcontextprotocol/sdk zod pg
npm install -D typescript @types/pg tsx
npx tsc --init --target es2022 --module nodenext --moduleResolution nodenext

Python MCP Server — Complete Example

#!/usr/bin/env python3
"""Production MCP server for PostgreSQL database queries."""
import asyncio
import logging
import re
from contextlib import asynccontextmanager

import asyncpg
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import TextContent, Tool
from pydantic import BaseModel, field_validator

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp-db")

# --- Input Validation ---

DANGEROUS_KEYWORDS = re.compile(
    r"\b(DROP|DELETE|TRUNCATE|ALTER|INSERT|UPDATE|CREATE|GRANT|REVOKE)\b",
    re.IGNORECASE,
)

class QueryInput(BaseModel):
    """Validated input for SQL queries."""
    sql: str
    limit: int = 100

    @field_validator("sql")
    @classmethod
    def validate_sql(cls, v: str) -> str:
        if DANGEROUS_KEYWORDS.search(v):
            raise ValueError("Only SELECT queries are allowed")
        if not v.strip().upper().startswith("SELECT"):
            raise ValueError("Query must start with SELECT")
        if ";" in v and v.strip().rindex(";") < len(v.strip()) - 1:
            raise ValueError("Multiple statements are not allowed")
        return v.strip()

    @field_validator("limit")
    @classmethod
    def validate_limit(cls, v: int) -> int:
        if v < 1 or v > 1000:
            raise ValueError("Limit must be between 1 and 1000")
        return v

# --- Server Definition ---

class DatabaseMCPServer:
    def __init__(self, dsn: str):
        self.dsn = dsn
        self.pool: asyncpg.Pool | None = None
        self.server = Server("database-query-server")
        self._register_handlers()

    def _register_handlers(self):
        @self.server.list_tools()
        async def list_tools() -> list[Tool]:
            return [
                Tool(
                    name="query",
                    description="Execute a read-only SQL SELECT query",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "sql": {
                                "type": "string",
                                "description": "SQL SELECT query to execute"
                            },
                            "limit": {
                                "type": "integer",
                                "description": "Max rows (1-1000, default 100)",
                                "default": 100,
                                "minimum": 1,
                                "maximum": 1000
                            }
                        },
                        "required": ["sql"]
                    }
                ),
                Tool(
                    name="list_tables",
                    description="List all tables in the database with row counts",
                    inputSchema={
                        "type": "object",
                        "properties": {},
                    }
                ),
                Tool(
                    name="describe_table",
                    description="Show column names, types, and constraints for a table",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "table_name": {
                                "type": "string",
                                "description": "Name of the table to describe",
                                "pattern": "^[a-zA-Z_][a-zA-Z0-9_]*$"
                            }
                        },
                        "required": ["table_name"]
                    }
                ),
            ]

        @self.server.call_tool()
        async def call_tool(name: str, arguments: dict) -> list[TextContent]:
            try:
                if name == "query":
                    return await self._handle_query(arguments)
                elif name == "list_tables":
                    return await self._handle_list_tables()
                elif name == "describe_table":
                    return await self._handle_describe_table(arguments)
                else:
                    return [TextContent(type="text", text=f"Unknown tool: {name}")]
            except Exception as e:
                logger.error(f"Tool '{name}' failed: {e}", exc_info=True)
                return [TextContent(
                    type="text",
                    text=f"Error: {type(e).__name__}: {str(e)}"
                )]

    async def _handle_query(self, arguments: dict) -> list[TextContent]:
        validated = QueryInput(**arguments)
        sql_with_limit = f"SELECT * FROM ({validated.sql}) AS q LIMIT {validated.limit}"

        async with self.pool.acquire() as conn:
            rows = await conn.fetch(sql_with_limit)

        if not rows:
            return [TextContent(type="text", text="Query returned 0 rows.")]

        columns = list(rows[0].keys())
        result_lines = ["\t".join(columns)]
        for row in rows:
            result_lines.append("\t".join(str(v) for v in row.values()))

        return [TextContent(
            type="text",
            text=f"Rows: {len(rows)}\n\n" + "\n".join(result_lines)
        )]

    async def _handle_list_tables(self) -> list[TextContent]:
        sql = """
            SELECT schemaname, tablename,
                   pg_stat_get_live_tuples(c.oid) AS row_count
            FROM pg_tables t
            JOIN pg_class c ON c.relname = t.tablename
            WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
            ORDER BY schemaname, tablename
        """
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(sql)

        lines = [f"{r['schemaname']}.{r['tablename']} ({r['row_count']} rows)" for r in rows]
        return [TextContent(type="text", text="\n".join(lines) or "No tables found.")]

    async def _handle_describe_table(self, arguments: dict) -> list[TextContent]:
        table_name = arguments["table_name"]
        if not re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", table_name):
            return [TextContent(type="text", text="Invalid table name.")]

        sql = """
            SELECT column_name, data_type, is_nullable, column_default
            FROM information_schema.columns
            WHERE table_name = $1
            ORDER BY ordinal_position
        """
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(sql, table_name)

        if not rows:
            return [TextContent(type="text", text=f"Table '{table_name}' not found.")]

        lines = [f"Table: {table_name}\n"]
        for r in rows:
            nullable = "NULL" if r["is_nullable"] == "YES" else "NOT NULL"
            default = f" DEFAULT {r['column_default']}" if r["column_default"] else ""
            lines.append(f"  {r['column_name']}: {r['data_type']} {nullable}{default}")

        return [TextContent(type="text", text="\n".join(lines))]

    @asynccontextmanager
    async def run_context(self):
        self.pool = await asyncpg.create_pool(self.dsn, min_size=2, max_size=10)
        try:
            yield
        finally:
            await self.pool.close()

    async def run(self):
        async with self.run_context():
            async with stdio_server() as (read, write):
                await self.server.run(read, write, self.server.create_initialization_options())

# --- Entry Point ---

if __name__ == "__main__":
    import os
    dsn = os.environ.get("DATABASE_URL", "postgresql://localhost:5432/myapp")
    server = DatabaseMCPServer(dsn)
    asyncio.run(server.run())

TypeScript MCP Server — Complete Example

#!/usr/bin/env npx tsx
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import { Pool } from "pg";
import { z } from "zod";

// --- Input Validation ---

const DANGEROUS_SQL = /\b(DROP|DELETE|TRUNCATE|ALTER|INSERT|UPDATE|CREATE|GRANT|REVOKE)\b/i;

const QueryInputSchema = z.object({
  sql: z
    .string()
    .refine((s) => s.trim().toUpperCase().startsWith("SELECT"), {
      message: "Query must start with SELECT",
    })
    .refine((s) => !DANGEROUS_SQL.test(s), {
      message: "Only SELECT queries are allowed",
    }),
  limit: z.number().int().min(1).max(1000).default(100),
});

const TableNameSchema = z.object({
  table_name: z.string().regex(/^[a-zA-Z_][a-zA-Z0-9_]*$/),
});

// --- Server ---

const pool = new Pool({
  connectionString: process.env.DATABASE_URL ?? "postgresql://localhost:5432/myapp",
  max: 10,
});

const server = new Server(
  { name: "database-query-server", version: "1.0.0" },
  { capabilities: { tools: {} } }
);

// List available tools
server.setRequestHandler(ListToolsRequestSchema, async () => ({
  tools: [
    {
      name: "query",
      description: "Execute a read-only SQL SELECT query",
      inputSchema: {
        type: "object" as const,
        properties: {
          sql: { type: "string", description: "SQL SELECT query to execute" },
          limit: {
            type: "integer",
            description: "Max rows (1-1000, default 100)",
            default: 100,
            minimum: 1,
            maximum: 1000,
          },
        },
        required: ["sql"],
      },
    },
    {
      name: "list_tables",
      description: "List all tables in the database with row counts",
      inputSchema: { type: "object" as const, properties: {} },
    },
    {
      name: "describe_table",
      description: "Show column names, types, and constraints for a table",
      inputSchema: {
        type: "object" as const,
        properties: {
          table_name: {
            type: "string",
            description: "Name of the table",
            pattern: "^[a-zA-Z_][a-zA-Z0-9_]*$",
          },
        },
        required: ["table_name"],
      },
    },
  ],
}));

// Handle tool calls
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  try {
    switch (name) {
      case "query": {
        const { sql, limit } = QueryInputSchema.parse(args);
        const wrappedSql = `SELECT * FROM (${sql}) AS q LIMIT ${limit}`;
        const result = await pool.query(wrappedSql);

        if (result.rows.length === 0) {
          return { content: [{ type: "text", text: "Query returned 0 rows." }] };
        }

        const columns = result.fields.map((f) => f.name);
        const lines = [columns.join("\t")];
        for (const row of result.rows) {
          lines.push(columns.map((c) => String(row[c] ?? "NULL")).join("\t"));
        }
        return {
          content: [{ type: "text", text: `Rows: ${result.rows.length}\n\n${lines.join("\n")}` }],
        };
      }

      case "list_tables": {
        const result = await pool.query(`
          SELECT schemaname, tablename,
                 pg_stat_get_live_tuples(c.oid) AS row_count
          FROM pg_tables t
          JOIN pg_class c ON c.relname = t.tablename
          WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
          ORDER BY schemaname, tablename
        `);
        const lines = result.rows.map(
          (r) => `${r.schemaname}.${r.tablename} (${r.row_count} rows)`
        );
        return { content: [{ type: "text", text: lines.join("\n") || "No tables found." }] };
      }

      case "describe_table": {
        const { table_name } = TableNameSchema.parse(args);
        const result = await pool.query(
          `SELECT column_name, data_type, is_nullable, column_default
           FROM information_schema.columns
           WHERE table_name = $1
           ORDER BY ordinal_position`,
          [table_name]
        );

        if (result.rows.length === 0) {
          return { content: [{ type: "text", text: `Table '${table_name}' not found.` }] };
        }

        const lines = [`Table: ${table_name}\n`];
        for (const r of result.rows) {
          const nullable = r.is_nullable === "YES" ? "NULL" : "NOT NULL";
          const def = r.column_default ? ` DEFAULT ${r.column_default}` : "";
          lines.push(`  ${r.column_name}: ${r.data_type} ${nullable}${def}`);
        }
        return { content: [{ type: "text", text: lines.join("\n") }] };
      }

      default:
        return { content: [{ type: "text", text: `Unknown tool: ${name}` }], isError: true };
    }
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    console.error(`Tool '${name}' failed:`, error);
    return { content: [{ type: "text", text: `Error: ${message}` }], isError: true };
  }
});

// --- Entry Point ---

async function main() {
  const transport = new StdioServerTransport();
  await server.connect(transport);
  console.error("MCP Database Server running on stdio");
}

main().catch((err) => {
  console.error("Fatal error:", err);
  process.exit(1);
});

Error Handling Patterns

Every production MCP server must handle three error categories:

graph TD
    E[Error] --> V[Validation Error<br/>Bad input from model]
    E --> R[Runtime Error<br/>External system failure]
    E --> I[Internal Error<br/>Server bug]
    V -->|Return| V1["isError: true + helpful message<br/>Model will self-correct"]
    R -->|Return| R1["isError: true + context<br/>Model can retry or inform user"]
    I -->|Log + Return| I1["Generic error message<br/>Details in server logs only"]

Key principle: Never expose stack traces or internal details to the model. Return structured error messages that help the model recover.

@self.server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    try:
        validated = InputSchema(**arguments)
    except ValidationError as e:
        # Validation error — help the model fix its input
        errors = "; ".join(err["msg"] for err in e.errors())
        return [TextContent(type="text", text=f"Invalid input: {errors}")]

    try:
        result = await execute_tool(validated)
        return [TextContent(type="text", text=result)]
    except asyncpg.PostgresError as e:
        # Runtime error — the query was valid but failed
        return [TextContent(type="text", text=f"Database error: {e.message}")]
    except Exception as e:
        # Internal error — log details, return generic message
        logger.exception(f"Unexpected error in tool '{name}'")
        return [TextContent(type="text", text="Internal server error. Check logs.")]

Testing Your MCP Server

Use the MCP Inspector to test without connecting to Claude:

# Install inspector
npm install -g @modelcontextprotocol/inspector

# Test your Python server
mcp-inspector python server.py

# Test your TypeScript server
mcp-inspector npx tsx server.ts

The inspector provides a web UI where you can:

  • See all registered tools, resources, and prompts
  • Call tools with custom arguments
  • View raw JSON-RPC messages
  • Validate response formats

Claude Desktop Configuration

To use your server with Claude Desktop, add it to the configuration file:

{
  "mcpServers": {
    "database": {
      "command": "python",
      "args": ["/path/to/server.py"],
      "env": {
        "DATABASE_URL": "postgresql://user:pass@localhost:5432/myapp"
      }
    }
  }
}

Production Checklist

Before deploying an MCP server to production:

CheckWhy
All inputs validatedModels hallucinate parameters — validation catches them
Dangerous operations blockedPrevent DROP TABLE, file deletion, etc.
Timeouts on all external callsOne slow query should not block the server
Structured error responsesHelp the model self-correct
Logging on every tool callAudit trail for debugging and compliance
Connection poolingHandle concurrent tool calls efficiently
Graceful shutdownClean up connections on SIGTERM
Health check endpointFor monitoring in production

In the next lesson, you will explore the official MCP servers and learn to chain multiple servers together for complex workflows.