Szczegółowa analiza funkcji protokołu MCP
February 8, 2026 · View on GitHub
Ten przewodnik omawia zaawansowane funkcje protokołu MCP, które wykraczają poza podstawową obsługę narzędzi i zasobów. Zrozumienie tych funkcji pomaga tworzyć bardziej niezawodne, przyjazne dla użytkownika i gotowe do produkcji serwery MCP.
Omówione funkcje
- Powiadomienia o postępie - raportowanie postępu dla operacji długotrwałych
- Anulowanie żądań - pozwalanie klientom na anulowanie trwających żądań
- Szablony zasobów - dynamiczne URI zasobów z parametrami
- Zdarzenia cyklu życia serwera - właściwa inicjalizacja i zamykanie
- Kontrola logowania - konfigurowanie logowania po stronie serwera
- Wzorce obsługi błędów - spójne odpowiedzi błędów
1. Powiadomienia o postępie
Dla operacji, które wymagają czasu (przetwarzanie danych, pobieranie plików, wywołania API), powiadomienia o postępie informują użytkowników o stanie.
Jak to działa
sequenceDiagram
participant Client
participant Server
Client->>Server: tools/call (długa operacja)
Server-->>Client: powiadomienie: postęp 10%
Server-->>Client: powiadomienie: postęp 50%
Server-->>Client: powiadomienie: postęp 90%
Server->>Client: wynik (zakończono)
Implementacja w Pythonie
from mcp.server import Server, NotificationOptions
from mcp.types import ProgressNotification
import asyncio
app = Server("progress-server")
@app.tool()
async def process_large_file(file_path: str, ctx) -> str:
"""Process a large file with progress updates."""
# Pobierz rozmiar pliku do obliczania postępu
file_size = os.path.getsize(file_path)
processed = 0
with open(file_path, 'rb') as f:
while chunk := f.read(8192):
# Przetwórz fragment
await process_chunk(chunk)
processed += len(chunk)
# Wyślij powiadomienie o postępie
progress = (processed / file_size) * 100
await ctx.send_notification(
ProgressNotification(
progressToken=ctx.request_id,
progress=progress,
total=100,
message=f"Processing: {progress:.1f}%"
)
)
return f"Processed {file_size} bytes"
@app.tool()
async def batch_operation(items: list[str], ctx) -> str:
"""Process multiple items with progress."""
results = []
total = len(items)
for i, item in enumerate(items):
result = await process_item(item)
results.append(result)
# Zgłoś postęp po każdym elemencie
await ctx.send_notification(
ProgressNotification(
progressToken=ctx.request_id,
progress=i + 1,
total=total,
message=f"Processed {i + 1}/{total}: {item}"
)
)
return f"Completed {total} items"
Implementacja w TypeScript
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
server.setRequestHandler(CallToolSchema, async (request, extra) => {
const { name, arguments: args } = request.params;
if (name === "process_data") {
const items = args.items as string[];
const results = [];
for (let i = 0; i < items.length; i++) {
const result = await processItem(items[i]);
results.push(result);
// Wyślij powiadomienie o postępie
await extra.sendNotification({
method: "notifications/progress",
params: {
progressToken: request.id,
progress: i + 1,
total: items.length,
message: `Processing item ${i + 1}/${items.length}`
}
});
}
return { content: [{ type: "text", text: JSON.stringify(results) }] };
}
});
Obsługa po stronie klienta (Python)
async def handle_progress(notification):
"""Handle progress notifications from server."""
params = notification.params
print(f"Progress: {params.progress}/{params.total} - {params.message}")
# Rejestruj obsługę
session.on_notification("notifications/progress", handle_progress)
# Wywołaj narzędzie (aktualizacje postępu będą przychodzić przez obsługę)
result = await session.call_tool("process_large_file", {"file_path": "/data/large.csv"})
2. Anulowanie żądań
Pozwalanie klientom na anulowanie żądań, które nie są już potrzebne lub zajmują zbyt dużo czasu.
Implementacja w Pythonie
from mcp.server import Server
from mcp.types import CancelledError
import asyncio
app = Server("cancellable-server")
@app.tool()
async def long_running_search(query: str, ctx) -> str:
"""Search that can be cancelled."""
results = []
try:
for page in range(100): # Przeszukaj wiele stron
# Sprawdź, czy anulowanie zostało zażądane
if ctx.is_cancelled:
raise CancelledError("Search cancelled by user")
# Symuluj wyszukiwanie na stronie
page_results = await search_page(query, page)
results.extend(page_results)
# Krótkie opóźnienie pozwala na sprawdzanie anulowania
await asyncio.sleep(0.1)
except CancelledError:
# Zwróć wyniki częściowe
return f"Cancelled. Found {len(results)} results before cancellation."
return f"Found {len(results)} total results"
@app.tool()
async def download_file(url: str, ctx) -> str:
"""Download with cancellation support."""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
total_size = int(response.headers.get('content-length', 0))
downloaded = 0
chunks = []
async for chunk in response.content.iter_chunked(8192):
if ctx.is_cancelled:
return f"Download cancelled at {downloaded}/{total_size} bytes"
chunks.append(chunk)
downloaded += len(chunk)
return f"Downloaded {downloaded} bytes"
Implementacja kontekstu anulowania
class CancellableContext:
"""Context object that tracks cancellation state."""
def __init__(self, request_id: str):
self.request_id = request_id
self._cancelled = asyncio.Event()
self._cancel_reason = None
@property
def is_cancelled(self) -> bool:
return self._cancelled.is_set()
def cancel(self, reason: str = "Cancelled"):
self._cancel_reason = reason
self._cancelled.set()
async def check_cancelled(self):
"""Raise if cancelled, otherwise continue."""
if self.is_cancelled:
raise CancelledError(self._cancel_reason)
async def sleep_or_cancel(self, seconds: float):
"""Sleep that can be interrupted by cancellation."""
try:
await asyncio.wait_for(
self._cancelled.wait(),
timeout=seconds
)
raise CancelledError(self._cancel_reason)
except asyncio.TimeoutError:
pass # Normalny limit czasu, kontynuuj
Anulowanie po stronie klienta
import asyncio
async def search_with_timeout(session, query, timeout=30):
"""Search with automatic cancellation on timeout."""
task = asyncio.create_task(
session.call_tool("long_running_search", {"query": query})
)
try:
result = await asyncio.wait_for(task, timeout=timeout)
return result
except asyncio.TimeoutError:
# Żądanie anulowania
await session.send_notification({
"method": "notifications/cancelled",
"params": {"requestId": task.request_id, "reason": "Timeout"}
})
return "Search timed out"
3. Szablony zasobów
Szablony zasobów umożliwiają dynamiczne tworzenie URI z parametrami, przydatne w API i bazach danych.
Definiowanie szablonów
from mcp.server import Server
from mcp.types import ResourceTemplate
app = Server("template-server")
@app.list_resource_templates()
async def list_templates() -> list[ResourceTemplate]:
"""Return available resource templates."""
return [
ResourceTemplate(
uriTemplate="db://users/{user_id}",
name="User Profile",
description="Fetch user profile by ID",
mimeType="application/json"
),
ResourceTemplate(
uriTemplate="api://weather/{city}/{date}",
name="Weather Data",
description="Historical weather for city and date",
mimeType="application/json"
),
ResourceTemplate(
uriTemplate="file://{path}",
name="File Content",
description="Read file at given path",
mimeType="text/plain"
)
]
@app.read_resource()
async def read_resource(uri: str) -> str:
"""Read resource, expanding template parameters."""
# Analizuj URI, aby wyodrębnić parametry
if uri.startswith("db://users/"):
user_id = uri.split("/")[-1]
return await fetch_user(user_id)
elif uri.startswith("api://weather/"):
parts = uri.replace("api://weather/", "").split("/")
city, date = parts[0], parts[1]
return await fetch_weather(city, date)
elif uri.startswith("file://"):
path = uri.replace("file://", "")
return await read_file(path)
raise ValueError(f"Unknown resource URI: {uri}")
Implementacja w TypeScript
server.setRequestHandler(ListResourceTemplatesSchema, async () => {
return {
resourceTemplates: [
{
uriTemplate: "github://repos/{owner}/{repo}/issues/{issue_number}",
name: "GitHub Issue",
description: "Fetch a specific GitHub issue",
mimeType: "application/json"
},
{
uriTemplate: "db://tables/{table}/rows/{id}",
name: "Database Row",
description: "Fetch a row from a database table",
mimeType: "application/json"
}
]
};
});
server.setRequestHandler(ReadResourceSchema, async (request) => {
const uri = request.params.uri;
// Parsuj URI problemu GitHub
const githubMatch = uri.match(/^github:\/\/repos\/([^/]+)\/([^/]+)\/issues\/(\d+)$/);
if (githubMatch) {
const [_, owner, repo, issueNumber] = githubMatch;
const issue = await fetchGitHubIssue(owner, repo, parseInt(issueNumber));
return {
contents: [{
uri,
mimeType: "application/json",
text: JSON.stringify(issue, null, 2)
}]
};
}
throw new Error(`Unknown resource URI: ${uri}`);
});
4. Zdarzenia cyklu życia serwera
Właściwe zarządzanie inicjalizacją i zamykaniem zapewnia czyste zarządzanie zasobami.
Zarządzanie cyklem życia w Pythonie
from mcp.server import Server
from contextlib import asynccontextmanager
app = Server("lifecycle-server")
# Wspólny stan
db_connection = None
cache = None
@asynccontextmanager
async def lifespan(server: Server):
"""Manage server lifecycle."""
global db_connection, cache
# Uruchomienie
print("🚀 Server starting...")
db_connection = await create_database_connection()
cache = await create_cache_client()
print("✅ Resources initialized")
yield # Serwer działa tutaj
# Zamknięcie
print("🛑 Server shutting down...")
await db_connection.close()
await cache.close()
print("✅ Resources cleaned up")
app = Server("lifecycle-server", lifespan=lifespan)
@app.tool()
async def query_database(sql: str) -> str:
"""Use the shared database connection."""
result = await db_connection.execute(sql)
return str(result)
Cykl życia w TypeScript
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
class ManagedServer {
private server: Server;
private dbConnection: DatabaseConnection | null = null;
constructor() {
this.server = new Server({
name: "lifecycle-server",
version: "1.0.0"
});
this.setupHandlers();
}
async start() {
// Inicjalizuj zasoby
console.log("🚀 Server starting...");
this.dbConnection = await createDatabaseConnection();
console.log("✅ Database connected");
// Uruchom serwer
await this.server.connect(transport);
}
async stop() {
// Zwolnij zasoby
console.log("🛑 Server shutting down...");
if (this.dbConnection) {
await this.dbConnection.close();
}
await this.server.close();
console.log("✅ Cleanup complete");
}
private setupHandlers() {
this.server.setRequestHandler(CallToolSchema, async (request) => {
// Bezpiecznie używaj this.dbConnection
// ...
});
}
}
// Użycie z łagodnym zamknięciem
const server = new ManagedServer();
process.on('SIGINT', async () => {
await server.stop();
process.exit(0);
});
await server.start();
5. Kontrola logowania
MCP obsługuje poziomy logowania po stronie serwera, które klienci mogą kontrolować.
Implementacja poziomów logowania
from mcp.server import Server
from mcp.types import LoggingLevel
import logging
app = Server("logging-server")
# Mapuj poziomy MCP na poziomy logowania Pythona
LEVEL_MAP = {
LoggingLevel.DEBUG: logging.DEBUG,
LoggingLevel.INFO: logging.INFO,
LoggingLevel.WARNING: logging.WARNING,
LoggingLevel.ERROR: logging.ERROR,
}
logger = logging.getLogger("mcp-server")
@app.set_logging_level()
async def set_logging_level(level: LoggingLevel) -> None:
"""Handle client request to change logging level."""
python_level = LEVEL_MAP.get(level, logging.INFO)
logger.setLevel(python_level)
logger.info(f"Logging level set to {level}")
@app.tool()
async def debug_operation(data: str) -> str:
"""Tool with various logging levels."""
logger.debug(f"Processing data: {data}")
try:
result = process(data)
logger.info(f"Successfully processed: {result}")
return result
except Exception as e:
logger.error(f"Processing failed: {e}")
raise
Wysyłanie komunikatów logowania do klienta
@app.tool()
async def complex_operation(input: str, ctx) -> str:
"""Operation that logs to client."""
# Wyślij powiadomienie o logu do klienta
await ctx.send_log(
level="info",
message=f"Starting complex operation with input: {input}"
)
# Wykonaj pracę...
result = await do_work(input)
await ctx.send_log(
level="debug",
message=f"Operation complete, result size: {len(result)}"
)
return result
6. Wzorce obsługi błędów
Spójna obsługa błędów poprawia debugowanie i doświadczenie użytkownika.
Kody błędów MCP
from mcp.types import McpError, ErrorCode
class ToolError(McpError):
"""Base class for tool errors."""
pass
class ValidationError(ToolError):
"""Invalid input parameters."""
def __init__(self, message: str):
super().__init__(ErrorCode.INVALID_PARAMS, message)
class NotFoundError(ToolError):
"""Requested resource not found."""
def __init__(self, resource: str):
super().__init__(ErrorCode.INVALID_REQUEST, f"Not found: {resource}")
class PermissionError(ToolError):
"""Access denied."""
def __init__(self, action: str):
super().__init__(ErrorCode.INVALID_REQUEST, f"Permission denied: {action}")
class InternalError(ToolError):
"""Internal server error."""
def __init__(self, message: str):
super().__init__(ErrorCode.INTERNAL_ERROR, message)
Strukturalne odpowiedzi błędów
@app.tool()
async def safe_operation(input: str) -> str:
"""Tool with comprehensive error handling."""
# Waliduj dane wejściowe
if not input:
raise ValidationError("Input cannot be empty")
if len(input) > 10000:
raise ValidationError(f"Input too large: {len(input)} chars (max 10000)")
try:
# Sprawdź uprawnienia
if not await check_permission(input):
raise PermissionError(f"read {input}")
# Wykonaj operację
result = await perform_operation(input)
if result is None:
raise NotFoundError(input)
return result
except ConnectionError as e:
raise InternalError(f"Database connection failed: {e}")
except TimeoutError as e:
raise InternalError(f"Operation timed out: {e}")
except Exception as e:
# Zaloguj nieoczekiwane błędy
logger.exception(f"Unexpected error in safe_operation")
raise InternalError(f"Unexpected error: {type(e).__name__}")
Obsługa błędów w TypeScript
import { McpError, ErrorCode } from "@modelcontextprotocol/sdk/types.js";
function validateInput(data: unknown): asserts data is ValidInput {
if (typeof data !== "object" || data === null) {
throw new McpError(
ErrorCode.InvalidParams,
"Input must be an object"
);
}
// Więcej walidacji...
}
server.setRequestHandler(CallToolSchema, async (request) => {
try {
validateInput(request.params.arguments);
const result = await performOperation(request.params.arguments);
return {
content: [{ type: "text", text: JSON.stringify(result) }]
};
} catch (error) {
if (error instanceof McpError) {
throw error; // Już błąd MCP
}
// Konwertuj inne błędy
if (error instanceof NotFoundError) {
throw new McpError(ErrorCode.InvalidRequest, error.message);
}
// Nieznany błąd
console.error("Unexpected error:", error);
throw new McpError(
ErrorCode.InternalError,
"An unexpected error occurred"
);
}
});
Funkcje eksperymentalne (MCP 2025-11-25)
Te funkcje są oznaczone jako eksperymentalne w specyfikacji:
Zadania (operacje długotrwałe)
# Zadania umożliwiają śledzenie długotrwałych operacji ze stanem
@app.task()
async def training_task(model_id: str, data_path: str, ctx) -> str:
"""Long-running ML training task."""
# Zgłoś rozpoczęcie zadania
await ctx.report_status("running", "Initializing training...")
# Pętla treningowa
for epoch in range(100):
await train_epoch(model_id, data_path, epoch)
await ctx.report_status(
"running",
f"Training epoch {epoch + 1}/100",
progress=epoch + 1,
total=100
)
await ctx.report_status("completed", "Training finished")
return f"Model {model_id} trained successfully"
Adnotacje narzędzi
# Adnotacje dostarczają metadane o zachowaniu narzędzia
@app.tool(
annotations={
"destructive": False, # Nie modyfikuje danych
"idempotent": True, # Bezpieczne do ponowienia
"timeout_seconds": 30, # Oczekiwany maksymalny czas trwania
"requires_approval": False # Nie wymaga zgody użytkownika
}
)
async def safe_query(query: str) -> str:
"""A read-only database query tool."""
return await execute_read_query(query)
Co dalej
Dodatkowe zasoby
Zastrzeżenie:
Niniejszy dokument został przetłumaczony za pomocą usługi tłumaczenia AI Co-op Translator. Mimo że dokładamy starań, aby tłumaczenie było jak najbardziej precyzyjne, prosimy mieć na uwadze, że automatyczne tłumaczenia mogą zawierać błędy lub nieścisłości. Oryginalny dokument w języku źródłowym należy traktować jako źródło ostateczne. W przypadku informacji krytycznych zalecane jest skorzystanie z profesjonalnego, ludzkiego tłumaczenia. Nie ponosimy odpowiedzialności za jakiekolwiek nieporozumienia lub błędne interpretacje wynikające z użycia tego tłumaczenia.