Skip to content

helpers

Helper functions for pure task management.

These helpers work with pure TaskContext and don't require server dependencies. For server-integrated task helpers, use mcp.server.experimental.

is_terminal

is_terminal(status: TaskStatus) -> bool

Check if a task status represents a terminal state.

Terminal states are those where the task has finished and will not change.

Parameters:

Name Type Description Default
status TaskStatus

The task status to check

required

Returns:

Type Description
bool

True if the status is terminal (completed, failed, or cancelled)

Source code in src/mcp/shared/experimental/tasks/helpers.py
36
37
38
39
40
41
42
43
44
45
46
47
def is_terminal(status: TaskStatus) -> bool:
    """Check if a task status represents a terminal state.

    Terminal states are those where the task has finished and will not change.

    Args:
        status: The task status to check

    Returns:
        True if the status is terminal (completed, failed, or cancelled)
    """
    return status in (TASK_STATUS_COMPLETED, TASK_STATUS_FAILED, TASK_STATUS_CANCELLED)

cancel_task async

cancel_task(
    store: TaskStore, task_id: str
) -> CancelTaskResult

Cancel a task with spec-compliant validation.

Per spec: "Receivers MUST reject cancellation of terminal status tasks with -32602 (Invalid params)"

This helper validates that the task exists and is not in a terminal state before setting it to "cancelled".

Parameters:

Name Type Description Default
store TaskStore

The task store

required
task_id str

The task identifier to cancel

required

Returns:

Type Description
CancelTaskResult

CancelTaskResult with the cancelled task state

Raises:

Type Description
MCPError

With INVALID_PARAMS (-32602) if: - Task does not exist - Task is already in a terminal state (completed, failed, cancelled)

Example
async def handle_cancel(ctx, params: CancelTaskRequestParams) -> CancelTaskResult:
    return await cancel_task(store, params.task_id)
Source code in src/mcp/shared/experimental/tasks/helpers.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
async def cancel_task(
    store: TaskStore,
    task_id: str,
) -> CancelTaskResult:
    """Cancel a task with spec-compliant validation.

    Per spec: "Receivers MUST reject cancellation of terminal status tasks
    with -32602 (Invalid params)"

    This helper validates that the task exists and is not in a terminal state
    before setting it to "cancelled".

    Args:
        store: The task store
        task_id: The task identifier to cancel

    Returns:
        CancelTaskResult with the cancelled task state

    Raises:
        MCPError: With INVALID_PARAMS (-32602) if:
            - Task does not exist
            - Task is already in a terminal state (completed, failed, cancelled)

    Example:
        ```python
        async def handle_cancel(ctx, params: CancelTaskRequestParams) -> CancelTaskResult:
            return await cancel_task(store, params.task_id)
        ```
    """
    task = await store.get_task(task_id)
    if task is None:
        raise MCPError(code=INVALID_PARAMS, message=f"Task not found: {task_id}")

    if is_terminal(task.status):
        raise MCPError(code=INVALID_PARAMS, message=f"Cannot cancel task in terminal state '{task.status}'")

    # Update task to cancelled status
    cancelled_task = await store.update_task(task_id, status=TASK_STATUS_CANCELLED)
    return CancelTaskResult(**cancelled_task.model_dump())

generate_task_id

generate_task_id() -> str

Generate a unique task ID.

Source code in src/mcp/shared/experimental/tasks/helpers.py
92
93
94
def generate_task_id() -> str:
    """Generate a unique task ID."""
    return str(uuid4())

create_task_state

create_task_state(
    metadata: TaskMetadata, task_id: str | None = None
) -> Task

Create a Task object with initial state.

This is a helper for TaskStore implementations.

Parameters:

Name Type Description Default
metadata TaskMetadata

Task metadata

required
task_id str | None

Optional task ID (generated if not provided)

None

Returns:

Type Description
Task

A new Task in "working" status

Source code in src/mcp/shared/experimental/tasks/helpers.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def create_task_state(
    metadata: TaskMetadata,
    task_id: str | None = None,
) -> Task:
    """Create a Task object with initial state.

    This is a helper for TaskStore implementations.

    Args:
        metadata: Task metadata
        task_id: Optional task ID (generated if not provided)

    Returns:
        A new Task in "working" status
    """
    now = datetime.now(timezone.utc)
    return Task(
        task_id=task_id or generate_task_id(),
        status=TASK_STATUS_WORKING,
        created_at=now,
        last_updated_at=now,
        ttl=metadata.ttl,
        poll_interval=500,  # Default 500ms poll interval
    )

task_execution async

task_execution(
    task_id: str, store: TaskStore
) -> AsyncIterator[TaskContext]

Context manager for safe task execution (pure, no server dependencies).

Loads a task from the store and provides a TaskContext for the work. If an unhandled exception occurs, the task is automatically marked as failed and the exception is suppressed (since the failure is captured in task state).

This is useful for distributed workers that don't have a server session.

Parameters:

Name Type Description Default
task_id str

The task identifier to execute

required
store TaskStore

The task store (must be accessible by the worker)

required

Yields:

Type Description
AsyncIterator[TaskContext]

TaskContext for updating status and completing/failing the task

Raises:

Type Description
ValueError

If the task is not found in the store

Example (distributed worker): async def worker_process(task_id: str): store = RedisTaskStore(redis_url) async with task_execution(task_id, store) as ctx: await ctx.update_status("Working...") result = await do_work() await ctx.complete(result)

Source code in src/mcp/shared/experimental/tasks/helpers.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
@asynccontextmanager
async def task_execution(
    task_id: str,
    store: TaskStore,
) -> AsyncIterator[TaskContext]:
    """Context manager for safe task execution (pure, no server dependencies).

    Loads a task from the store and provides a TaskContext for the work.
    If an unhandled exception occurs, the task is automatically marked as failed
    and the exception is suppressed (since the failure is captured in task state).

    This is useful for distributed workers that don't have a server session.

    Args:
        task_id: The task identifier to execute
        store: The task store (must be accessible by the worker)

    Yields:
        TaskContext for updating status and completing/failing the task

    Raises:
        ValueError: If the task is not found in the store

    Example (distributed worker):
        async def worker_process(task_id: str):
            store = RedisTaskStore(redis_url)
            async with task_execution(task_id, store) as ctx:
                await ctx.update_status("Working...")
                result = await do_work()
                await ctx.complete(result)
    """
    task = await store.get_task(task_id)
    if task is None:
        raise ValueError(f"Task {task_id} not found")

    ctx = TaskContext(task, store)
    try:
        yield ctx
    except Exception as e:
        # Auto-fail the task if an exception occurs and task isn't already terminal
        # Exception is suppressed since failure is captured in task state
        if not is_terminal(ctx.task.status):
            await ctx.fail(str(e))