Skip to content

message_queue

TaskMessageQueue - FIFO queue for task-related messages.

This implements the core message queue pattern from the MCP Tasks spec. When a handler needs to send a request (like elicitation) during a task-augmented request, the message is enqueued instead of sent directly. Messages are delivered to the client only through the tasks/result endpoint.

This pattern enables: 1. Decoupling request handling from message delivery 2. Proper bidirectional communication via the tasks/result stream 3. Automatic status management (working <-> input_required)

QueuedMessage dataclass

A message queued for delivery via tasks/result.

Messages are stored with their type and a resolver for requests that expect responses.

Source code in src/mcp/shared/experimental/tasks/message_queue.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@dataclass
class QueuedMessage:
    """A message queued for delivery via tasks/result.

    Messages are stored with their type and a resolver for requests
    that expect responses.
    """

    type: Literal["request", "notification"]
    """Whether this is a request (expects response) or notification (one-way)."""

    message: JSONRPCRequest | JSONRPCNotification
    """The JSON-RPC message to send."""

    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    """When the message was enqueued."""

    resolver: Resolver[dict[str, Any]] | None = None
    """Resolver to set when response arrives (only for requests)."""

    original_request_id: RequestId | None = None
    """The original request ID used internally, for routing responses back."""

type instance-attribute

type: Literal['request', 'notification']

Whether this is a request (expects response) or notification (one-way).

message instance-attribute

The JSON-RPC message to send.

timestamp class-attribute instance-attribute

timestamp: datetime = field(
    default_factory=lambda: now(utc)
)

When the message was enqueued.

resolver class-attribute instance-attribute

resolver: Resolver[dict[str, Any]] | None = None

Resolver to set when response arrives (only for requests).

original_request_id class-attribute instance-attribute

original_request_id: RequestId | None = None

The original request ID used internally, for routing responses back.

TaskMessageQueue

Bases: ABC

Abstract interface for task message queuing.

This is a FIFO queue that stores messages to be delivered via tasks/result. When a task-augmented handler calls elicit() or sends a notification, the message is enqueued here instead of being sent directly to the client.

The tasks/result handler then dequeues and sends these messages through the transport, with relatedRequestId set to the tasks/result request ID so responses are routed correctly.

Implementations can use in-memory storage, Redis, etc.

Source code in src/mcp/shared/experimental/tasks/message_queue.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
class TaskMessageQueue(ABC):
    """Abstract interface for task message queuing.

    This is a FIFO queue that stores messages to be delivered via `tasks/result`.
    When a task-augmented handler calls elicit() or sends a notification, the
    message is enqueued here instead of being sent directly to the client.

    The `tasks/result` handler then dequeues and sends these messages through
    the transport, with `relatedRequestId` set to the tasks/result request ID
    so responses are routed correctly.

    Implementations can use in-memory storage, Redis, etc.
    """

    @abstractmethod
    async def enqueue(self, task_id: str, message: QueuedMessage) -> None:
        """Add a message to the queue for a task.

        Args:
            task_id: The task identifier
            message: The message to enqueue
        """

    @abstractmethod
    async def dequeue(self, task_id: str) -> QueuedMessage | None:
        """Remove and return the next message from the queue.

        Args:
            task_id: The task identifier

        Returns:
            The next message, or None if queue is empty
        """

    @abstractmethod
    async def peek(self, task_id: str) -> QueuedMessage | None:
        """Return the next message without removing it.

        Args:
            task_id: The task identifier

        Returns:
            The next message, or None if queue is empty
        """

    @abstractmethod
    async def is_empty(self, task_id: str) -> bool:
        """Check if the queue is empty for a task.

        Args:
            task_id: The task identifier

        Returns:
            True if no messages are queued
        """

    @abstractmethod
    async def clear(self, task_id: str) -> list[QueuedMessage]:
        """Remove and return all messages from the queue.

        This is useful for cleanup when a task is cancelled or completed.

        Args:
            task_id: The task identifier

        Returns:
            All queued messages (may be empty)
        """

    @abstractmethod
    async def wait_for_message(self, task_id: str) -> None:
        """Wait until a message is available in the queue.

        This blocks until either:
        1. A message is enqueued for this task
        2. The wait is cancelled

        Args:
            task_id: The task identifier
        """

    @abstractmethod
    async def notify_message_available(self, task_id: str) -> None:
        """Signal that a message is available for a task.

        This wakes up any coroutines waiting in wait_for_message().

        Args:
            task_id: The task identifier
        """

enqueue abstractmethod async

enqueue(task_id: str, message: QueuedMessage) -> None

Add a message to the queue for a task.

Parameters:

Name Type Description Default
task_id str

The task identifier

required
message QueuedMessage

The message to enqueue

required
Source code in src/mcp/shared/experimental/tasks/message_queue.py
64
65
66
67
68
69
70
71
@abstractmethod
async def enqueue(self, task_id: str, message: QueuedMessage) -> None:
    """Add a message to the queue for a task.

    Args:
        task_id: The task identifier
        message: The message to enqueue
    """

dequeue abstractmethod async

dequeue(task_id: str) -> QueuedMessage | None

Remove and return the next message from the queue.

Parameters:

Name Type Description Default
task_id str

The task identifier

required

Returns:

Type Description
QueuedMessage | None

The next message, or None if queue is empty

Source code in src/mcp/shared/experimental/tasks/message_queue.py
73
74
75
76
77
78
79
80
81
82
@abstractmethod
async def dequeue(self, task_id: str) -> QueuedMessage | None:
    """Remove and return the next message from the queue.

    Args:
        task_id: The task identifier

    Returns:
        The next message, or None if queue is empty
    """

peek abstractmethod async

peek(task_id: str) -> QueuedMessage | None

Return the next message without removing it.

Parameters:

Name Type Description Default
task_id str

The task identifier

required

Returns:

Type Description
QueuedMessage | None

The next message, or None if queue is empty

Source code in src/mcp/shared/experimental/tasks/message_queue.py
84
85
86
87
88
89
90
91
92
93
@abstractmethod
async def peek(self, task_id: str) -> QueuedMessage | None:
    """Return the next message without removing it.

    Args:
        task_id: The task identifier

    Returns:
        The next message, or None if queue is empty
    """

is_empty abstractmethod async

is_empty(task_id: str) -> bool

Check if the queue is empty for a task.

Parameters:

Name Type Description Default
task_id str

The task identifier

required

Returns:

Type Description
bool

True if no messages are queued

Source code in src/mcp/shared/experimental/tasks/message_queue.py
 95
 96
 97
 98
 99
100
101
102
103
104
@abstractmethod
async def is_empty(self, task_id: str) -> bool:
    """Check if the queue is empty for a task.

    Args:
        task_id: The task identifier

    Returns:
        True if no messages are queued
    """

clear abstractmethod async

clear(task_id: str) -> list[QueuedMessage]

Remove and return all messages from the queue.

This is useful for cleanup when a task is cancelled or completed.

Parameters:

Name Type Description Default
task_id str

The task identifier

required

Returns:

Type Description
list[QueuedMessage]

All queued messages (may be empty)

Source code in src/mcp/shared/experimental/tasks/message_queue.py
106
107
108
109
110
111
112
113
114
115
116
117
@abstractmethod
async def clear(self, task_id: str) -> list[QueuedMessage]:
    """Remove and return all messages from the queue.

    This is useful for cleanup when a task is cancelled or completed.

    Args:
        task_id: The task identifier

    Returns:
        All queued messages (may be empty)
    """

wait_for_message abstractmethod async

wait_for_message(task_id: str) -> None

Wait until a message is available in the queue.

This blocks until either: 1. A message is enqueued for this task 2. The wait is cancelled

Parameters:

Name Type Description Default
task_id str

The task identifier

required
Source code in src/mcp/shared/experimental/tasks/message_queue.py
119
120
121
122
123
124
125
126
127
128
129
@abstractmethod
async def wait_for_message(self, task_id: str) -> None:
    """Wait until a message is available in the queue.

    This blocks until either:
    1. A message is enqueued for this task
    2. The wait is cancelled

    Args:
        task_id: The task identifier
    """

notify_message_available abstractmethod async

notify_message_available(task_id: str) -> None

Signal that a message is available for a task.

This wakes up any coroutines waiting in wait_for_message().

Parameters:

Name Type Description Default
task_id str

The task identifier

required
Source code in src/mcp/shared/experimental/tasks/message_queue.py
131
132
133
134
135
136
137
138
139
@abstractmethod
async def notify_message_available(self, task_id: str) -> None:
    """Signal that a message is available for a task.

    This wakes up any coroutines waiting in wait_for_message().

    Args:
        task_id: The task identifier
    """

InMemoryTaskMessageQueue

Bases: TaskMessageQueue

In-memory implementation of TaskMessageQueue.

This is suitable for single-process servers. For distributed systems, implement TaskMessageQueue with Redis, RabbitMQ, etc.

Features: - FIFO ordering per task - Async wait for message availability - Thread-safe for single-process async use

Source code in src/mcp/shared/experimental/tasks/message_queue.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
class InMemoryTaskMessageQueue(TaskMessageQueue):
    """In-memory implementation of TaskMessageQueue.

    This is suitable for single-process servers. For distributed systems,
    implement TaskMessageQueue with Redis, RabbitMQ, etc.

    Features:
    - FIFO ordering per task
    - Async wait for message availability
    - Thread-safe for single-process async use
    """

    def __init__(self) -> None:
        self._queues: dict[str, deque[QueuedMessage]] = {}
        self._events: dict[str, anyio.Event] = {}

    def _get_queue(self, task_id: str) -> deque[QueuedMessage]:
        """Get or create the queue for a task."""
        if task_id not in self._queues:
            self._queues[task_id] = deque()
        return self._queues[task_id]

    async def enqueue(self, task_id: str, message: QueuedMessage) -> None:
        """Add a message to the queue."""
        queue = self._get_queue(task_id)
        queue.append(message)
        # Signal that a message is available
        await self.notify_message_available(task_id)

    async def dequeue(self, task_id: str) -> QueuedMessage | None:
        """Remove and return the next message."""
        queue = self._get_queue(task_id)
        if not queue:
            return None
        return queue.popleft()

    async def peek(self, task_id: str) -> QueuedMessage | None:
        """Return the next message without removing it."""
        queue = self._get_queue(task_id)
        if not queue:
            return None
        return queue[0]

    async def is_empty(self, task_id: str) -> bool:
        """Check if the queue is empty."""
        queue = self._get_queue(task_id)
        return len(queue) == 0

    async def clear(self, task_id: str) -> list[QueuedMessage]:
        """Remove and return all messages."""
        queue = self._get_queue(task_id)
        messages = list(queue)
        queue.clear()
        return messages

    async def wait_for_message(self, task_id: str) -> None:
        """Wait until a message is available."""
        # Check if there are already messages
        if not await self.is_empty(task_id):
            return

        # Create a fresh event for waiting (anyio.Event can't be cleared)
        self._events[task_id] = anyio.Event()
        event = self._events[task_id]

        # Double-check after creating event (avoid race condition)
        if not await self.is_empty(task_id):
            return

        # Wait for a new message
        await event.wait()

    async def notify_message_available(self, task_id: str) -> None:
        """Signal that a message is available."""
        if task_id in self._events:
            self._events[task_id].set()

    def cleanup(self, task_id: str | None = None) -> None:
        """Clean up queues and events.

        Args:
            task_id: If provided, clean up only this task. Otherwise clean up all.
        """
        if task_id is not None:
            self._queues.pop(task_id, None)
            self._events.pop(task_id, None)
        else:
            self._queues.clear()
            self._events.clear()

enqueue async

enqueue(task_id: str, message: QueuedMessage) -> None

Add a message to the queue.

Source code in src/mcp/shared/experimental/tasks/message_queue.py
164
165
166
167
168
169
async def enqueue(self, task_id: str, message: QueuedMessage) -> None:
    """Add a message to the queue."""
    queue = self._get_queue(task_id)
    queue.append(message)
    # Signal that a message is available
    await self.notify_message_available(task_id)

dequeue async

dequeue(task_id: str) -> QueuedMessage | None

Remove and return the next message.

Source code in src/mcp/shared/experimental/tasks/message_queue.py
171
172
173
174
175
176
async def dequeue(self, task_id: str) -> QueuedMessage | None:
    """Remove and return the next message."""
    queue = self._get_queue(task_id)
    if not queue:
        return None
    return queue.popleft()

peek async

peek(task_id: str) -> QueuedMessage | None

Return the next message without removing it.

Source code in src/mcp/shared/experimental/tasks/message_queue.py
178
179
180
181
182
183
async def peek(self, task_id: str) -> QueuedMessage | None:
    """Return the next message without removing it."""
    queue = self._get_queue(task_id)
    if not queue:
        return None
    return queue[0]

is_empty async

is_empty(task_id: str) -> bool

Check if the queue is empty.

Source code in src/mcp/shared/experimental/tasks/message_queue.py
185
186
187
188
async def is_empty(self, task_id: str) -> bool:
    """Check if the queue is empty."""
    queue = self._get_queue(task_id)
    return len(queue) == 0

clear async

clear(task_id: str) -> list[QueuedMessage]

Remove and return all messages.

Source code in src/mcp/shared/experimental/tasks/message_queue.py
190
191
192
193
194
195
async def clear(self, task_id: str) -> list[QueuedMessage]:
    """Remove and return all messages."""
    queue = self._get_queue(task_id)
    messages = list(queue)
    queue.clear()
    return messages

wait_for_message async

wait_for_message(task_id: str) -> None

Wait until a message is available.

Source code in src/mcp/shared/experimental/tasks/message_queue.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
async def wait_for_message(self, task_id: str) -> None:
    """Wait until a message is available."""
    # Check if there are already messages
    if not await self.is_empty(task_id):
        return

    # Create a fresh event for waiting (anyio.Event can't be cleared)
    self._events[task_id] = anyio.Event()
    event = self._events[task_id]

    # Double-check after creating event (avoid race condition)
    if not await self.is_empty(task_id):
        return

    # Wait for a new message
    await event.wait()

notify_message_available async

notify_message_available(task_id: str) -> None

Signal that a message is available.

Source code in src/mcp/shared/experimental/tasks/message_queue.py
214
215
216
217
async def notify_message_available(self, task_id: str) -> None:
    """Signal that a message is available."""
    if task_id in self._events:
        self._events[task_id].set()

cleanup

cleanup(task_id: str | None = None) -> None

Clean up queues and events.

Parameters:

Name Type Description Default
task_id str | None

If provided, clean up only this task. Otherwise clean up all.

None
Source code in src/mcp/shared/experimental/tasks/message_queue.py
219
220
221
222
223
224
225
226
227
228
229
230
def cleanup(self, task_id: str | None = None) -> None:
    """Clean up queues and events.

    Args:
        task_id: If provided, clean up only this task. Otherwise clean up all.
    """
    if task_id is not None:
        self._queues.pop(task_id, None)
        self._events.pop(task_id, None)
    else:
        self._queues.clear()
        self._events.clear()