helpers
Helper functions for pure task management.
These helpers work with pure TaskContext and don't require server dependencies. For server-integrated task helpers, use mcp.server.experimental.
is_terminal
is_terminal(status: TaskStatus) -> bool
Check if a task status represents a terminal state.
Terminal states are those where the task has finished and will not change.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| status | TaskStatus | The task status to check | required |
Returns:
| Type | Description |
|---|---|
| bool | True if the status is terminal (completed, failed, or cancelled) |
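
The check described above amounts to a set-membership test. The following is a minimal standalone sketch, not the library's source: `TaskStatus` here is a stand-in `Literal` built only from the status names mentioned on this page, and `is_terminal_sketch` and `should_keep_polling` are hypothetical names chosen for illustration.

```python
from typing import Literal

# Stand-in for the SDK's TaskStatus; only the statuses named on this page are listed.
TaskStatus = Literal["working", "completed", "failed", "cancelled"]

# The three terminal states listed in the Returns table.
TERMINAL_STATUSES = frozenset({"completed", "failed", "cancelled"})


def is_terminal_sketch(status: TaskStatus) -> bool:
    """Return True when the task has finished and its status will not change."""
    return status in TERMINAL_STATUSES


def should_keep_polling(status: TaskStatus) -> bool:
    """Hypothetical caller: a poller stops once the task reaches a terminal state."""
    return not is_terminal_sketch(status)
```

A polling loop would typically call a check like this after each status fetch and exit as soon as it returns True.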
Source code in src/mcp/shared/experimental/tasks/helpers.py
cancel_task
async
cancel_task(
store: TaskStore, task_id: str
) -> CancelTaskResult
Cancel a task with spec-compliant validation.
Per spec: "Receivers MUST reject cancellation of terminal status tasks with -32602 (Invalid params)"
This helper validates that the task exists and is not in a terminal state before setting it to "cancelled".
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| store | TaskStore | The task store | required |
| task_id | str | The task identifier to cancel | required |
Returns:
| Type | Description |
|---|---|
| CancelTaskResult | CancelTaskResult with the cancelled task state |
Raises:
| Type | Description |
|---|---|
| MCPError | With INVALID_PARAMS (-32602) if the task does not exist or is already in a terminal state (completed, failed, or cancelled) |
Example

    async def handle_cancel(ctx, params: CancelTaskRequestParams) -> CancelTaskResult:
        return await cancel_task(store, params.task_id)
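
To make the validation order concrete, here is a rough standalone sketch of the two checks described above (task exists, task not terminal) followed by the status change. It does not import the SDK: `SketchError`, `SketchTask`, `InMemoryStore`, and `cancel_task_sketch` are all hypothetical stand-ins, and the real `TaskStore` interface and `MCPError` may differ.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass

INVALID_PARAMS = -32602  # JSON-RPC "Invalid params" code quoted from the spec above
TERMINAL_STATUSES = frozenset({"completed", "failed", "cancelled"})


class SketchError(Exception):
    """Hypothetical stand-in for MCPError: an exception carrying an error code."""

    def __init__(self, code: int, message: str) -> None:
        super().__init__(message)
        self.code = code


@dataclass
class SketchTask:
    task_id: str
    status: str = "working"


class InMemoryStore:
    """Hypothetical stand-in for a TaskStore, backed by a dict."""

    def __init__(self) -> None:
        self._tasks: dict[str, SketchTask] = {}

    async def add(self, task: SketchTask) -> None:
        self._tasks[task.task_id] = task

    async def get(self, task_id: str) -> SketchTask | None:
        return self._tasks.get(task_id)

    async def set_status(self, task_id: str, status: str) -> SketchTask:
        task = self._tasks[task_id]
        task.status = status
        return task


async def cancel_task_sketch(store: InMemoryStore, task_id: str) -> SketchTask:
    task = await store.get(task_id)
    if task is None:
        # Unknown task: rejected with Invalid params.
        raise SketchError(INVALID_PARAMS, f"Task not found: {task_id}")
    if task.status in TERMINAL_STATUSES:
        # Terminal tasks must not be cancelled, per the spec sentence quoted above.
        raise SketchError(INVALID_PARAMS, f"Task already terminal: {task.status}")
    return await store.set_status(task_id, "cancelled")
```

Because "cancelled" is itself terminal, cancelling the same task twice fails on the second call in this sketch, which matches the rule that terminal tasks are rejected.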
Source code in src/mcp/shared/experimental/tasks/helpers.py
generate_task_id
generate_task_id() -> str
Generate a unique task ID.
Source code in src/mcp/shared/experimental/tasks/helpers.py
create_task_state
create_task_state(
metadata: TaskMetadata, task_id: str | None = None
) -> Task
Create a Task object with initial state.
This is a helper for TaskStore implementations.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| metadata | TaskMetadata | Task metadata | required |
| task_id | str \| None | Optional task ID (generated if not provided) | None |
Returns:
| Type | Description |
|---|---|
| Task | A new Task in "working" status |
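
As an illustration of how a TaskStore implementation might build initial state, the sketch below mirrors the behaviour described above: use the supplied ID or generate one, and start in "working". Everything here is hypothetical: `SketchMetadata`, `SketchTask`, their fields (`ttl`, `created_at`), and the use of random UUID strings for IDs are assumptions, since this page does not state the real field names or ID format.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone
from uuid import uuid4


@dataclass
class SketchMetadata:
    """Hypothetical stand-in for TaskMetadata; the ttl field is an assumption."""

    ttl: int | None = None


@dataclass
class SketchTask:
    """Hypothetical stand-in for Task; field names are assumptions."""

    task_id: str
    status: str
    created_at: datetime
    ttl: int | None


def generate_task_id_sketch() -> str:
    # Assumption: a random UUID string is unique enough for a task ID.
    return str(uuid4())


def create_task_state_sketch(
    metadata: SketchMetadata, task_id: str | None = None
) -> SketchTask:
    return SketchTask(
        # Generate an ID only when the caller did not supply one.
        task_id=task_id if task_id is not None else generate_task_id_sketch(),
        status="working",  # every new task starts in "working"
        created_at=datetime.now(timezone.utc),
        ttl=metadata.ttl,
    )
```

Accepting an optional `task_id` lets a store reuse an ID assigned elsewhere (for example by a queue) while still working with no ID at all.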
Source code in src/mcp/shared/experimental/tasks/helpers.py
task_execution
async
task_execution(
task_id: str, store: TaskStore
) -> AsyncIterator[TaskContext]
Context manager for safe task execution (pure, no server dependencies).
Loads a task from the store and provides a TaskContext for the work. If an unhandled exception occurs, the task is automatically marked as failed and the exception is suppressed (since the failure is captured in task state).
This is useful for distributed workers that don't have a server session.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| task_id | str | The task identifier to execute | required |
| store | TaskStore | The task store (must be accessible by the worker) | required |
Yields:
| Type | Description |
|---|---|
| AsyncIterator[TaskContext] | TaskContext for updating status and completing/failing the task |
Raises:
| Type | Description |
|---|---|
| ValueError | If the task is not found in the store |
Example (distributed worker):

    async def worker_process(task_id: str):
        store = RedisTaskStore(redis_url)
        async with task_execution(task_id, store) as ctx:
            await ctx.update_status("Working...")
            result = await do_work()
            await ctx.complete(result)
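
The suppress-and-mark-failed behaviour described above can be sketched with `contextlib.asynccontextmanager`. This is a simplified illustration under stated assumptions, not the library's implementation: `SketchStore`, `SketchContext`, and `task_execution_sketch` are hypothetical, and the store is a plain dict rather than a real TaskStore.

```python
from __future__ import annotations

import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator

TERMINAL_STATUSES = frozenset({"completed", "failed", "cancelled"})


class SketchStore:
    """Hypothetical store: maps task_id to a small state dict."""

    def __init__(self) -> None:
        self.tasks: dict[str, dict[str, Any]] = {}


class SketchContext:
    """Hypothetical stand-in for TaskContext."""

    def __init__(self, store: SketchStore, task_id: str) -> None:
        self._state = store.tasks[task_id]

    async def update_status(self, message: str) -> None:
        self._state["message"] = message

    async def complete(self, result: Any) -> None:
        self._state.update(status="completed", result=result)

    async def fail(self, error: str) -> None:
        self._state.update(status="failed", message=error)


@asynccontextmanager
async def task_execution_sketch(
    task_id: str, store: SketchStore
) -> AsyncIterator[SketchContext]:
    if task_id not in store.tasks:
        raise ValueError(f"Task {task_id} not found")
    ctx = SketchContext(store, task_id)
    try:
        yield ctx
    except Exception as exc:
        # Not re-raising here suppresses the exception for the caller;
        # the failure is recorded in task state instead.
        if store.tasks[task_id]["status"] not in TERMINAL_STATUSES:
            await ctx.fail(str(exc))
```

One consequence of suppression is that code after the `async with` block still runs when the work fails, so a worker that needs to react to failure should inspect the stored task status rather than rely on an exception.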
Source code in src/mcp/shared/experimental/tasks/helpers.py