Skip to content

store

TaskStore - Abstract interface for task state storage.

TaskStore

Bases: ABC

Abstract interface for task state storage.

This is a pure storage interface - it doesn't manage execution. Implementations can use in-memory storage, databases, Redis, etc.

All methods are async to support various backends.

Source code in src/mcp/shared/experimental/tasks/store.py
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
class TaskStore(ABC):
    """Storage contract for task state.

    A store only persists and retrieves task state; it does not manage
    execution. Concrete implementations are free to keep the data in memory,
    in a database, in Redis, etc.

    Every method is async so that various backends can be supported.
    """

    @abstractmethod
    async def create_task(self, metadata: TaskMetadata, task_id: str | None = None) -> Task:
        """Register a new task in the store.

        Args:
            metadata: Metadata for the task (ttl, etc.).
            task_id: Identifier to assign to the task. When None, the
                implementation should generate one.

        Returns:
            The newly created Task, with status="working".

        Raises:
            ValueError: If a task with ``task_id`` already exists.
        """

    @abstractmethod
    async def get_task(self, task_id: str) -> Task | None:
        """Look up a task by its identifier.

        Args:
            task_id: Identifier of the task to fetch.

        Returns:
            The matching Task, or None when no such task is found.
        """

    @abstractmethod
    async def update_task(
        self,
        task_id: str,
        status: TaskStatus | None = None,
        status_message: str | None = None,
    ) -> Task:
        """Change the status and/or status message of a task.

        Args:
            task_id: Identifier of the task to update.
            status: The new status, if it is changing.
            status_message: The new status message, if it is changing.

        Returns:
            The Task after the update.

        Raises:
            ValueError: If the task is not found, or if the update attempts
                to transition from a terminal status (completed, failed,
                cancelled). Per spec, terminal states MUST NOT transition
                to any other status.
        """

    @abstractmethod
    async def store_result(self, task_id: str, result: Result) -> None:
        """Persist the result of a task.

        Args:
            task_id: Identifier of the task the result belongs to.
            result: The result to persist.

        Raises:
            ValueError: If the task is not found.
        """

    @abstractmethod
    async def get_result(self, task_id: str) -> Result | None:
        """Fetch the result stored for a task.

        Args:
            task_id: Identifier of the task.

        Returns:
            The stored Result, or None when no result is available.
        """

    @abstractmethod
    async def list_tasks(self, cursor: str | None = None) -> tuple[list[Task], str | None]:
        """Return a page of tasks.

        Args:
            cursor: Optional pagination cursor.

        Returns:
            A ``(tasks, next_cursor)`` tuple. ``next_cursor`` is None when
            there are no more pages.
        """

    @abstractmethod
    async def delete_task(self, task_id: str) -> bool:
        """Remove a task from the store.

        Args:
            task_id: Identifier of the task to delete.

        Returns:
            True if the task was deleted, False if it was not found.
        """

    @abstractmethod
    async def wait_for_update(self, task_id: str) -> None:
        """Block until the status of a task changes.

        The wait ends when one of the following happens:
        1. The task status changes
        2. The wait is cancelled

        tasks/result uses this to wait for task completion or status changes.

        Args:
            task_id: Identifier of the task to wait on.

        Raises:
            ValueError: If the task is not found.
        """

    @abstractmethod
    async def notify_update(self, task_id: str) -> None:
        """Announce that a task has been updated.

        Any coroutines waiting in wait_for_update() are woken up.

        Args:
            task_id: Identifier of the updated task.
        """

create_task abstractmethod async

create_task(
    metadata: TaskMetadata, task_id: str | None = None
) -> Task

Create a new task.

Parameters:

Name Type Description Default
metadata TaskMetadata

Task metadata (ttl, etc.)

required
task_id str | None

Optional task ID. If None, implementation should generate one.

None

Returns:

Type Description
Task

The created Task with status="working"

Raises:

Type Description
ValueError

If task_id already exists

Source code in src/mcp/shared/experimental/tasks/store.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@abstractmethod
async def create_task(self, metadata: TaskMetadata, task_id: str | None = None) -> Task:
    """Register a new task in the store.

    Args:
        metadata: Metadata for the task (ttl, etc.).
        task_id: Identifier to assign to the task. When None, the
            implementation should generate one.

    Returns:
        The newly created Task, with status="working".

    Raises:
        ValueError: If a task with ``task_id`` already exists.
    """

get_task abstractmethod async

get_task(task_id: str) -> Task | None

Get a task by ID.

Parameters:

Name Type Description Default
task_id str

The task identifier

required

Returns:

Type Description
Task | None

The Task, or None if not found

Source code in src/mcp/shared/experimental/tasks/store.py
36
37
38
39
40
41
42
43
44
45
@abstractmethod
async def get_task(self, task_id: str) -> Task | None:
    """Look up a task by its identifier.

    Args:
        task_id: Identifier of the task to fetch.

    Returns:
        The matching Task, or None when no such task is found.
    """

update_task abstractmethod async

update_task(
    task_id: str,
    status: TaskStatus | None = None,
    status_message: str | None = None,
) -> Task

Update a task's status and/or message.

Parameters:

Name Type Description Default
task_id str

The task identifier

required
status TaskStatus | None

New status (if changing)

None
status_message str | None

New status message (if changing)

None

Returns:

Type Description
Task

The updated Task

Raises:

Type Description
ValueError

If task not found

ValueError

If attempting to transition from a terminal status (completed, failed, cancelled). Per spec, terminal states MUST NOT transition to any other status.

Source code in src/mcp/shared/experimental/tasks/store.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@abstractmethod
async def update_task(
    self,
    task_id: str,
    status: TaskStatus | None = None,
    status_message: str | None = None,
) -> Task:
    """Change the status and/or status message of a task.

    Args:
        task_id: Identifier of the task to update.
        status: The new status, if it is changing.
        status_message: The new status message, if it is changing.

    Returns:
        The Task after the update.

    Raises:
        ValueError: If the task is not found, or if the update attempts
            to transition from a terminal status (completed, failed,
            cancelled). Per spec, terminal states MUST NOT transition
            to any other status.
    """

store_result abstractmethod async

store_result(task_id: str, result: Result) -> None

Store the result for a task.

Parameters:

Name Type Description Default
task_id str

The task identifier

required
result Result

The result to store

required

Raises:

Type Description
ValueError

If task not found

Source code in src/mcp/shared/experimental/tasks/store.py
71
72
73
74
75
76
77
78
79
80
81
@abstractmethod
async def store_result(self, task_id: str, result: Result) -> None:
    """Persist the result of a task.

    Args:
        task_id: Identifier of the task the result belongs to.
        result: The result to persist.

    Raises:
        ValueError: If the task is not found.
    """

get_result abstractmethod async

get_result(task_id: str) -> Result | None

Get the stored result for a task.

Parameters:

Name Type Description Default
task_id str

The task identifier

required

Returns:

Type Description
Result | None

The stored Result, or None if not available

Source code in src/mcp/shared/experimental/tasks/store.py
83
84
85
86
87
88
89
90
91
92
@abstractmethod
async def get_result(self, task_id: str) -> Result | None:
    """Fetch the result stored for a task.

    Args:
        task_id: Identifier of the task.

    Returns:
        The stored Result, or None when no result is available.
    """

list_tasks abstractmethod async

list_tasks(
    cursor: str | None = None,
) -> tuple[list[Task], str | None]

List tasks with pagination.

Parameters:

Name Type Description Default
cursor str | None

Optional cursor for pagination

None

Returns:

Type Description
tuple[list[Task], str | None]

Tuple of (tasks, next_cursor). next_cursor is None if no more pages.

Source code in src/mcp/shared/experimental/tasks/store.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
@abstractmethod
async def list_tasks(self, cursor: str | None = None) -> tuple[list[Task], str | None]:
    """Return a page of tasks.

    Args:
        cursor: Optional pagination cursor.

    Returns:
        A ``(tasks, next_cursor)`` tuple. ``next_cursor`` is None when
        there are no more pages.
    """

delete_task abstractmethod async

delete_task(task_id: str) -> bool

Delete a task.

Parameters:

Name Type Description Default
task_id str

The task identifier

required

Returns:

Type Description
bool

True if deleted, False if not found

Source code in src/mcp/shared/experimental/tasks/store.py
108
109
110
111
112
113
114
115
116
117
@abstractmethod
async def delete_task(self, task_id: str) -> bool:
    """Remove a task from the store.

    Args:
        task_id: Identifier of the task to delete.

    Returns:
        True if the task was deleted, False if it was not found.
    """

wait_for_update abstractmethod async

wait_for_update(task_id: str) -> None

Wait until the task status changes.

This blocks until either (1) the task status changes, or (2) the wait is cancelled.

Used by tasks/result to wait for task completion or status changes.

Parameters:

Name Type Description Default
task_id str

The task identifier

required

Raises:

Type Description
ValueError

If task not found

Source code in src/mcp/shared/experimental/tasks/store.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
@abstractmethod
async def wait_for_update(self, task_id: str) -> None:
    """Block until the status of a task changes.

    The wait ends when one of the following happens:
    1. The task status changes
    2. The wait is cancelled

    tasks/result uses this to wait for task completion or status changes.

    Args:
        task_id: Identifier of the task to wait on.

    Raises:
        ValueError: If the task is not found.
    """

notify_update abstractmethod async

notify_update(task_id: str) -> None

Signal that a task has been updated.

This wakes up any coroutines waiting in wait_for_update().

Parameters:

Name Type Description Default
task_id str

The task identifier

required
Source code in src/mcp/shared/experimental/tasks/store.py
136
137
138
139
140
141
142
143
144
@abstractmethod
async def notify_update(self, task_id: str) -> None:
    """Announce that a task has been updated.

    Any coroutines waiting in wait_for_update() are woken up.

    Args:
        task_id: Identifier of the updated task.
    """