This lesson walks through building a production-grade MCP server — not a toy demo, but something you would deploy behind a real application. You will build a database query server that exposes PostgreSQL as a set of safe, auditable tools.
Project Setup
Python
# Create project
mkdir mcp-db-server && cd mcp-db-server
python -m venv .venv && source .venv/bin/activate
# Install dependencies
pip install mcp[server] asyncpg pydantic
TypeScript
mkdir mcp-db-server && cd mcp-db-server
npm init -y
npm install @modelcontextprotocol/sdk zod pg
npm install -D typescript @types/pg tsx
npx tsc --init --target es2022 --module nodenext --moduleResolution nodenext
Python MCP Server — Complete Example
#!/usr/bin/env python3
"""Production MCP server for PostgreSQL database queries."""
import asyncio
import logging
import re
from contextlib import asynccontextmanager
import asyncpg
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import TextContent, Tool
from pydantic import BaseModel, field_validator
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp-db")
# --- Input Validation ---
DANGEROUS_KEYWORDS = re.compile(
r"\b(DROP|DELETE|TRUNCATE|ALTER|INSERT|UPDATE|CREATE|GRANT|REVOKE)\b",
re.IGNORECASE,
)
class QueryInput(BaseModel):
"""Validated input for SQL queries."""
sql: str
limit: int = 100
@field_validator("sql")
@classmethod
def validate_sql(cls, v: str) -> str:
if DANGEROUS_KEYWORDS.search(v):
raise ValueError("Only SELECT queries are allowed")
if not v.strip().upper().startswith("SELECT"):
raise ValueError("Query must start with SELECT")
if ";" in v and v.strip().rindex(";") < len(v.strip()) - 1:
raise ValueError("Multiple statements are not allowed")
return v.strip()
@field_validator("limit")
@classmethod
def validate_limit(cls, v: int) -> int:
if v < 1 or v > 1000:
raise ValueError("Limit must be between 1 and 1000")
return v
# --- Server Definition ---
class DatabaseMCPServer:
def __init__(self, dsn: str):
self.dsn = dsn
self.pool: asyncpg.Pool | None = None
self.server = Server("database-query-server")
self._register_handlers()
def _register_handlers(self):
@self.server.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="query",
description="Execute a read-only SQL SELECT query",
inputSchema={
"type": "object",
"properties": {
"sql": {
"type": "string",
"description": "SQL SELECT query to execute"
},
"limit": {
"type": "integer",
"description": "Max rows (1-1000, default 100)",
"default": 100,
"minimum": 1,
"maximum": 1000
}
},
"required": ["sql"]
}
),
Tool(
name="list_tables",
description="List all tables in the database with row counts",
inputSchema={
"type": "object",
"properties": {},
}
),
Tool(
name="describe_table",
description="Show column names, types, and constraints for a table",
inputSchema={
"type": "object",
"properties": {
"table_name": {
"type": "string",
"description": "Name of the table to describe",
"pattern": "^[a-zA-Z_][a-zA-Z0-9_]*$"
}
},
"required": ["table_name"]
}
),
]
@self.server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
try:
if name == "query":
return await self._handle_query(arguments)
elif name == "list_tables":
return await self._handle_list_tables()
elif name == "describe_table":
return await self._handle_describe_table(arguments)
else:
return [TextContent(type="text", text=f"Unknown tool: {name}")]
except Exception as e:
logger.error(f"Tool '{name}' failed: {e}", exc_info=True)
return [TextContent(
type="text",
text=f"Error: {type(e).__name__}: {str(e)}"
)]
async def _handle_query(self, arguments: dict) -> list[TextContent]:
validated = QueryInput(**arguments)
sql_with_limit = f"SELECT * FROM ({validated.sql}) AS q LIMIT {validated.limit}"
async with self.pool.acquire() as conn:
rows = await conn.fetch(sql_with_limit)
if not rows:
return [TextContent(type="text", text="Query returned 0 rows.")]
columns = list(rows[0].keys())
result_lines = ["\t".join(columns)]
for row in rows:
result_lines.append("\t".join(str(v) for v in row.values()))
return [TextContent(
type="text",
text=f"Rows: {len(rows)}\n\n" + "\n".join(result_lines)
)]
async def _handle_list_tables(self) -> list[TextContent]:
sql = """
SELECT schemaname, tablename,
pg_stat_get_live_tuples(c.oid) AS row_count
FROM pg_tables t
JOIN pg_class c ON c.relname = t.tablename
WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
ORDER BY schemaname, tablename
"""
async with self.pool.acquire() as conn:
rows = await conn.fetch(sql)
lines = [f"{r['schemaname']}.{r['tablename']} ({r['row_count']} rows)" for r in rows]
return [TextContent(type="text", text="\n".join(lines) or "No tables found.")]
async def _handle_describe_table(self, arguments: dict) -> list[TextContent]:
table_name = arguments["table_name"]
if not re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", table_name):
return [TextContent(type="text", text="Invalid table name.")]
sql = """
SELECT column_name, data_type, is_nullable, column_default
FROM information_schema.columns
WHERE table_name = $1
ORDER BY ordinal_position
"""
async with self.pool.acquire() as conn:
rows = await conn.fetch(sql, table_name)
if not rows:
return [TextContent(type="text", text=f"Table '{table_name}' not found.")]
lines = [f"Table: {table_name}\n"]
for r in rows:
nullable = "NULL" if r["is_nullable"] == "YES" else "NOT NULL"
default = f" DEFAULT {r['column_default']}" if r["column_default"] else ""
lines.append(f" {r['column_name']}: {r['data_type']} {nullable}{default}")
return [TextContent(type="text", text="\n".join(lines))]
@asynccontextmanager
async def run_context(self):
self.pool = await asyncpg.create_pool(self.dsn, min_size=2, max_size=10)
try:
yield
finally:
await self.pool.close()
async def run(self):
async with self.run_context():
async with stdio_server() as (read, write):
await self.server.run(read, write, self.server.create_initialization_options())
# --- Entry Point ---
if __name__ == "__main__":
import os
dsn = os.environ.get("DATABASE_URL", "postgresql://localhost:5432/myapp")
server = DatabaseMCPServer(dsn)
asyncio.run(server.run())
TypeScript MCP Server — Complete Example
#!/usr/bin/env npx tsx
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
CallToolRequestSchema,
ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import { Pool } from "pg";
import { z } from "zod";
// --- Input Validation ---
const DANGEROUS_SQL = /\b(DROP|DELETE|TRUNCATE|ALTER|INSERT|UPDATE|CREATE|GRANT|REVOKE)\b/i;
const QueryInputSchema = z.object({
sql: z
.string()
.refine((s) => s.trim().toUpperCase().startsWith("SELECT"), {
message: "Query must start with SELECT",
})
.refine((s) => !DANGEROUS_SQL.test(s), {
message: "Only SELECT queries are allowed",
}),
limit: z.number().int().min(1).max(1000).default(100),
});
const TableNameSchema = z.object({
table_name: z.string().regex(/^[a-zA-Z_][a-zA-Z0-9_]*$/),
});
// --- Server ---
const pool = new Pool({
connectionString: process.env.DATABASE_URL ?? "postgresql://localhost:5432/myapp",
max: 10,
});
const server = new Server(
{ name: "database-query-server", version: "1.0.0" },
{ capabilities: { tools: {} } }
);
// List available tools
server.setRequestHandler(ListToolsRequestSchema, async () => ({
tools: [
{
name: "query",
description: "Execute a read-only SQL SELECT query",
inputSchema: {
type: "object" as const,
properties: {
sql: { type: "string", description: "SQL SELECT query to execute" },
limit: {
type: "integer",
description: "Max rows (1-1000, default 100)",
default: 100,
minimum: 1,
maximum: 1000,
},
},
required: ["sql"],
},
},
{
name: "list_tables",
description: "List all tables in the database with row counts",
inputSchema: { type: "object" as const, properties: {} },
},
{
name: "describe_table",
description: "Show column names, types, and constraints for a table",
inputSchema: {
type: "object" as const,
properties: {
table_name: {
type: "string",
description: "Name of the table",
pattern: "^[a-zA-Z_][a-zA-Z0-9_]*$",
},
},
required: ["table_name"],
},
},
],
}));
// Handle tool calls
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
try {
switch (name) {
case "query": {
const { sql, limit } = QueryInputSchema.parse(args);
const wrappedSql = `SELECT * FROM (${sql}) AS q LIMIT ${limit}`;
const result = await pool.query(wrappedSql);
if (result.rows.length === 0) {
return { content: [{ type: "text", text: "Query returned 0 rows." }] };
}
const columns = result.fields.map((f) => f.name);
const lines = [columns.join("\t")];
for (const row of result.rows) {
lines.push(columns.map((c) => String(row[c] ?? "NULL")).join("\t"));
}
return {
content: [{ type: "text", text: `Rows: ${result.rows.length}\n\n${lines.join("\n")}` }],
};
}
case "list_tables": {
const result = await pool.query(`
SELECT schemaname, tablename,
pg_stat_get_live_tuples(c.oid) AS row_count
FROM pg_tables t
JOIN pg_class c ON c.relname = t.tablename
WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
ORDER BY schemaname, tablename
`);
const lines = result.rows.map(
(r) => `${r.schemaname}.${r.tablename} (${r.row_count} rows)`
);
return { content: [{ type: "text", text: lines.join("\n") || "No tables found." }] };
}
case "describe_table": {
const { table_name } = TableNameSchema.parse(args);
const result = await pool.query(
`SELECT column_name, data_type, is_nullable, column_default
FROM information_schema.columns
WHERE table_name = $1
ORDER BY ordinal_position`,
[table_name]
);
if (result.rows.length === 0) {
return { content: [{ type: "text", text: `Table '${table_name}' not found.` }] };
}
const lines = [`Table: ${table_name}\n`];
for (const r of result.rows) {
const nullable = r.is_nullable === "YES" ? "NULL" : "NOT NULL";
const def = r.column_default ? ` DEFAULT ${r.column_default}` : "";
lines.push(` ${r.column_name}: ${r.data_type} ${nullable}${def}`);
}
return { content: [{ type: "text", text: lines.join("\n") }] };
}
default:
return { content: [{ type: "text", text: `Unknown tool: ${name}` }], isError: true };
}
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
console.error(`Tool '${name}' failed:`, error);
return { content: [{ type: "text", text: `Error: ${message}` }], isError: true };
}
});
// --- Entry Point ---
async function main() {
const transport = new StdioServerTransport();
await server.connect(transport);
console.error("MCP Database Server running on stdio");
}
main().catch((err) => {
console.error("Fatal error:", err);
process.exit(1);
});
Error Handling Patterns
Every production MCP server must handle three error categories:
graph TD
E[Error] --> V[Validation Error<br/>Bad input from model]
E --> R[Runtime Error<br/>External system failure]
E --> I[Internal Error<br/>Server bug]
V -->|Return| V1["isError: true + helpful message<br/>Model will self-correct"]
R -->|Return| R1["isError: true + context<br/>Model can retry or inform user"]
I -->|Log + Return| I1["Generic error message<br/>Details in server logs only"]
Key principle: Never expose stack traces or internal details to the model. Return structured error messages that help the model recover.
@self.server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
try:
validated = InputSchema(**arguments)
except ValidationError as e:
# Validation error — help the model fix its input
errors = "; ".join(err["msg"] for err in e.errors())
return [TextContent(type="text", text=f"Invalid input: {errors}")]
try:
result = await execute_tool(validated)
return [TextContent(type="text", text=result)]
except asyncpg.PostgresError as e:
# Runtime error — the query was valid but failed
return [TextContent(type="text", text=f"Database error: {e.message}")]
except Exception as e:
# Internal error — log details, return generic message
logger.exception(f"Unexpected error in tool '{name}'")
return [TextContent(type="text", text="Internal server error. Check logs.")]
Testing Your MCP Server
Use the MCP Inspector to test without connecting to Claude:
# Install inspector
npm install -g @modelcontextprotocol/inspector
# Test your Python server
mcp-inspector python server.py
# Test your TypeScript server
mcp-inspector npx tsx server.ts
The inspector provides a web UI where you can:
- See all registered tools, resources, and prompts
- Call tools with custom arguments
- View raw JSON-RPC messages
- Validate response formats
Claude Desktop Configuration
To use your server with Claude Desktop, add it to the configuration file:
{
"mcpServers": {
"database": {
"command": "python",
"args": ["/path/to/server.py"],
"env": {
"DATABASE_URL": "postgresql://user:pass@localhost:5432/myapp"
}
}
}
}
Production Checklist
Before deploying an MCP server to production:
| Check | Why |
|---|---|
| All inputs validated | Models hallucinate parameters — validation catches them |
| Dangerous operations blocked | Prevent DROP TABLE, file deletion, etc. |
| Timeouts on all external calls | One slow query should not block the server |
| Structured error responses | Help the model self-correct |
| Logging on every tool call | Audit trail for debugging and compliance |
| Connection pooling | Handle concurrent tool calls efficiently |
| Graceful shutdown | Clean up connections on SIGTERM |
| Health check endpoint | For monitoring in production |
In the next lesson, you will explore the official MCP servers and learn to chain multiple servers together for complex workflows.