Skip to content

context

TaskContext - Pure task state management.

This module provides TaskContext, which manages task state without any server/session dependencies. It can be used standalone for distributed workers or wrapped by ServerTaskContext for full server integration.

TaskContext

Pure task state management - no session dependencies.

This class handles:

- Task state (status, result)
- Cancellation tracking
- Store interactions

For server-integrated features (elicit, create_message, notifications), use ServerTaskContext from mcp.server.experimental.

Example (distributed worker):

    async def worker_job(task_id: str):
        store = RedisTaskStore(redis_url)
        task = await store.get_task(task_id)
        ctx = TaskContext(task=task, store=store)

        await ctx.update_status("Working...")
        result = await do_work()
        await ctx.complete(result)
Source code in src/mcp/shared/experimental/tasks/context.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
class TaskContext:
    """Task state holder that depends only on a task and a task store.

    Responsibilities:
    - Tracking task state (status, result)
    - Recording cancellation requests
    - Talking to the task store

    No session object is involved. Features that need server integration
    (elicit, create_message, notifications) are provided by
    ServerTaskContext from mcp.server.experimental.

    Example (distributed worker):
        async def worker_job(task_id: str):
            store = RedisTaskStore(redis_url)
            task = await store.get_task(task_id)
            ctx = TaskContext(task=task, store=store)

            await ctx.update_status("Working...")
            result = await do_work()
            await ctx.complete(result)
    """

    def __init__(self, task: Task, store: TaskStore):
        # A new context starts with no cancellation request recorded.
        self._cancelled = False
        self._store = store
        self._task = task

    @property
    def task_id(self) -> str:
        """Identifier of the wrapped task (read from the current task object)."""
        current = self._task
        return current.task_id

    @property
    def task(self) -> Task:
        """The task object most recently held by this context."""
        return self._task

    @property
    def is_cancelled(self) -> bool:
        """True once request_cancellation() has been called on this context."""
        return self._cancelled

    def request_cancellation(self) -> None:
        """Record that cancellation of this task was requested.

        Only the local flag exposed through is_cancelled is set; nothing is
        written to the store here. Running work is expected to poll the
        flag periodically and exit gracefully once it is set.
        """
        self._cancelled = True

    async def update_status(self, message: str) -> None:
        """Write a new status message for the task to the store.

        The task object returned by the store replaces the one held by
        this context.

        Args:
            message: The new status message
        """
        refreshed = await self._store.update_task(self.task_id, status_message=message)
        self._task = refreshed

    async def complete(self, result: Result) -> None:
        """Store the result, then mark the task as completed.

        The result is saved first; afterwards the status is set to
        TASK_STATUS_COMPLETED and the task object returned by the store
        replaces the one held by this context.

        Args:
            result: The task result
        """
        await self._store.store_result(self.task_id, result)
        finished = await self._store.update_task(self.task_id, status=TASK_STATUS_COMPLETED)
        self._task = finished

    async def fail(self, error: str) -> None:
        """Mark the task as failed, using the error text as status message.

        The task object returned by the store replaces the one held by
        this context.

        Args:
            error: The error message
        """
        failed = await self._store.update_task(
            self.task_id,
            status=TASK_STATUS_FAILED,
            status_message=error,
        )
        self._task = failed

task_id property

task_id: str

The task identifier.

task property

task: Task

The current task state.

is_cancelled property

is_cancelled: bool

Whether cancellation has been requested.

request_cancellation

request_cancellation() -> None

Request cancellation of this task.

This sets is_cancelled=True. Task work should check this periodically and exit gracefully if set.

Source code in src/mcp/shared/experimental/tasks/context.py
54
55
56
57
58
59
60
def request_cancellation(self) -> None:
    """Record that cancellation of this task was requested.

    Sets the flag behind is_cancelled to True. Running work is expected
    to poll that flag periodically and exit gracefully once it is set.
    """
    self._cancelled = True

update_status async

update_status(message: str) -> None

Update the task's status message.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| message | str | The new status message | required |
Source code in src/mcp/shared/experimental/tasks/context.py
62
63
64
65
66
67
68
69
70
71
async def update_status(self, message: str) -> None:
    """Write a new status message for the task to the store.

    The task object returned by the store replaces the one held on self.

    Args:
        message: The new status message
    """
    refreshed = await self._store.update_task(self.task_id, status_message=message)
    self._task = refreshed

complete async

complete(result: Result) -> None

Mark the task as completed with the given result.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| result | Result | The task result | required |
Source code in src/mcp/shared/experimental/tasks/context.py
73
74
75
76
77
78
79
80
81
82
83
async def complete(self, result: Result) -> None:
    """Store the result, then mark the task as completed.

    The result is saved first; afterwards the status is set to
    TASK_STATUS_COMPLETED and the task object returned by the store
    replaces the one held on self.

    Args:
        result: The task result
    """
    await self._store.store_result(self.task_id, result)
    finished = await self._store.update_task(self.task_id, status=TASK_STATUS_COMPLETED)
    self._task = finished

fail async

fail(error: str) -> None

Mark the task as failed with an error message.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| error | str | The error message | required |
Source code in src/mcp/shared/experimental/tasks/context.py
85
86
87
88
89
90
91
92
93
94
95
async def fail(self, error: str) -> None:
    """Mark the task as failed, using the error text as status message.

    The task object returned by the store replaces the one held on self.

    Args:
        error: The error message
    """
    failed = await self._store.update_task(
        self.task_id,
        status=TASK_STATUS_FAILED,
        status_message=error,
    )
    self._task = failed