Skip to content

session

ProgressFnT

Bases: Protocol

Protocol for progress notification callbacks.

Source code in src/mcp/shared/session.py
52
53
54
55
56
57
class ProgressFnT(Protocol):
    """Protocol for progress notification callbacks."""

    async def __call__(
        self, progress: float, total: float | None, message: str | None
    ) -> None: ...  # pragma: no branch

RequestResponder

Bases: Generic[ReceiveRequestT, SendResultT]

Handles responding to MCP requests and manages request lifecycle.

This class MUST be used as a context manager to ensure proper cleanup and cancellation handling:

Example
with request_responder as resp:
    await resp.respond(result)

The context manager ensures:

1. Proper cancellation scope setup and cleanup
2. Request completion tracking
3. Cleanup of in-flight requests

Source code in src/mcp/shared/session.py
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
class RequestResponder(Generic[ReceiveRequestT, SendResultT]):
    """Handles responding to MCP requests and manages request lifecycle.

    This class MUST be used as a context manager to ensure proper cleanup and
    cancellation handling:

    Example:
        ```python
        with request_responder as resp:
            await resp.respond(result)
        ```

    The context manager ensures:
    1. Proper cancellation scope setup and cleanup
    2. Request completion tracking
    3. Cleanup of in-flight requests
    """

    def __init__(
        self,
        request_id: RequestId,
        request_meta: RequestParamsMeta | None,
        request: ReceiveRequestT,
        session: BaseSession[SendRequestT, SendNotificationT, SendResultT, ReceiveRequestT, ReceiveNotificationT],
        on_complete: Callable[[RequestResponder[ReceiveRequestT, SendResultT]], Any],
        message_metadata: MessageMetadata = None,
        context: contextvars.Context | None = None,
    ) -> None:
        """Store the request details and initialise lifecycle bookkeeping.

        Args:
            request_id: ID of the request this responder answers.
            request_meta: Metadata taken from the request, if any.
            request: The incoming request object.
            session: Session whose ``_send_response`` is used to reply.
            on_complete: Called with this responder from ``__exit__`` when the
                request has been marked completed.
            message_metadata: Metadata attached to the incoming message.
            context: Optional ``contextvars.Context`` stored on the responder.
        """
        self.request_id = request_id
        self.request_meta = request_meta
        self.request = request
        self.message_metadata = message_metadata
        self.context = context
        self._session = session
        self._completed = False
        # Placeholder scope so `cancelled` works before `__enter__`; a fresh
        # scope replaces it when the context manager is entered.
        self._cancel_scope = anyio.CancelScope()
        self._on_complete = on_complete
        self._entered = False  # Track if we're in a context manager

    def __enter__(self) -> RequestResponder[ReceiveRequestT, SendResultT]:
        """Enter the context manager, enabling request cancellation tracking."""
        self._entered = True
        self._cancel_scope = anyio.CancelScope()
        self._cancel_scope.__enter__()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Exit the context manager, performing cleanup and notifying completion.

        ``on_complete`` is only invoked when the request was marked completed
        (responded to or cancelled). The cancel scope is always exited, even
        if ``on_complete`` raises.
        """
        try:
            if self._completed:
                self._on_complete(self)
        finally:
            self._entered = False
            if not self._cancel_scope:  # pragma: no cover
                raise RuntimeError("No active cancel scope")
            # NOTE(review): the return value of the scope's __exit__ is
            # discarded here - confirm that this is intended.
            self._cancel_scope.__exit__(exc_type, exc_val, exc_tb)

    async def respond(self, response: SendResultT | ErrorData) -> None:
        """Send a response for this request.

        Must be called within a context manager block. If the request has
        been cancelled, nothing is sent.

        Raises:
            RuntimeError: If not used within a context manager
            AssertionError: If request was already responded to
        """
        if not self._entered:  # pragma: no cover
            raise RuntimeError("RequestResponder must be used as a context manager")
        assert not self._completed, "Request already responded to"

        if not self.cancelled:  # pragma: no branch
            # Mark completed before awaiting so a concurrent cancel() sees
            # that a reply is already on its way.
            self._completed = True

            await self._session._send_response(  # type: ignore[reportPrivateUsage]
                request_id=self.request_id, response=response
            )

    async def cancel(self) -> None:
        """Cancel this request and mark it as completed.

        Cancels the responder's cancel scope and sends an error response
        indicating cancellation. If the request is already completed (a
        response is being or has been sent, or it was cancelled before), this
        is a no-op so that at most one response is sent per request ID.

        Raises:
            RuntimeError: If not used within a context manager
        """
        if not self._entered:  # pragma: no cover
            raise RuntimeError("RequestResponder must be used as a context manager")
        if not self._cancel_scope:  # pragma: no cover
            raise RuntimeError("No active cancel scope")
        if self._completed:  # pragma: no cover
            # A reply for this request ID already exists; cancelling the scope
            # now could interrupt an in-progress send, and sending the
            # cancellation error would produce a duplicate response.
            return

        self._cancel_scope.cancel()
        self._completed = True  # Mark as completed so it's removed from in_flight
        # Send an error response to indicate cancellation
        await self._session._send_response(  # type: ignore[reportPrivateUsage]
            request_id=self.request_id,
            response=ErrorData(code=0, message="Request cancelled"),
        )

    @property
    def in_flight(self) -> bool:  # pragma: no cover
        """Whether the request is neither completed nor cancelled."""
        return not self._completed and not self.cancelled

    @property
    def cancelled(self) -> bool:
        """Whether cancellation has been requested on the current cancel scope."""
        return self._cancel_scope.cancel_called

__enter__

__enter__() -> (
    RequestResponder[ReceiveRequestT, SendResultT]
)

Enter the context manager, enabling request cancellation tracking.

Source code in src/mcp/shared/session.py
 99
100
101
102
103
104
def __enter__(self) -> RequestResponder[ReceiveRequestT, SendResultT]:
    """Start tracking this request: flag the responder as entered and open a fresh cancel scope."""
    self._entered = True
    # Bind the new scope to a local and to the instance in one step, then
    # enter it so that a later cancel() can act on it.
    scope = self._cancel_scope = anyio.CancelScope()
    scope.__enter__()
    return self

__exit__

__exit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit the context manager, performing cleanup and notifying completion.

Source code in src/mcp/shared/session.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Leave the context manager: report completion, then close the cancel scope.

    The completion callback runs only if the request was marked completed;
    the cancel scope is exited regardless of whether that callback raises.
    """
    try:
        if self._completed:
            self._on_complete(self)
    finally:
        self._entered = False
        scope = self._cancel_scope
        if not scope:  # pragma: no cover
            raise RuntimeError("No active cancel scope")
        scope.__exit__(exc_type, exc_val, exc_tb)

respond async

respond(response: SendResultT | ErrorData) -> None

Send a response for this request.

Must be called within a context manager block.

Raises:

| Type | Description |
| --- | --- |
| RuntimeError | If not used within a context manager |
| AssertionError | If request was already responded to |

Source code in src/mcp/shared/session.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
async def respond(self, response: SendResultT | ErrorData) -> None:
    """Deliver ``response`` as the reply to this request.

    Only valid while the responder's ``with`` block is active. When the
    request has been cancelled, nothing is sent and the request is not
    marked as completed by this call.

    Raises:
        RuntimeError: If not used within a context manager
        AssertionError: If request was already responded to
    """
    if not self._entered:  # pragma: no cover
        raise RuntimeError("RequestResponder must be used as a context manager")
    assert not self._completed, "Request already responded to"

    if self.cancelled:  # pragma: no cover
        return

    # Flag completion first, then hand the payload to the session.
    self._completed = True
    session = self._session
    await session._send_response(  # type: ignore[reportPrivateUsage]
        request_id=self.request_id, response=response
    )

cancel async

cancel() -> None

Cancel this request and mark it as completed.

Source code in src/mcp/shared/session.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
async def cancel(self) -> None:
    """Cancel this request and mark it as completed.

    Cancels the responder's cancel scope and sends an error response
    indicating cancellation. If the request is already completed (a response
    is being or has been sent, or it was cancelled before), this is a no-op
    so that at most one response is sent per request ID.

    Raises:
        RuntimeError: If not used within a context manager
    """
    if not self._entered:  # pragma: no cover
        raise RuntimeError("RequestResponder must be used as a context manager")
    if not self._cancel_scope:  # pragma: no cover
        raise RuntimeError("No active cancel scope")
    if self._completed:  # pragma: no cover
        # A reply for this request ID already exists; cancelling the scope now
        # could interrupt an in-progress send, and sending the cancellation
        # error would produce a duplicate response.
        return

    self._cancel_scope.cancel()
    self._completed = True  # Mark as completed so it's removed from in_flight
    # Send an error response to indicate cancellation
    await self._session._send_response(  # type: ignore[reportPrivateUsage]
        request_id=self.request_id,
        response=ErrorData(code=0, message="Request cancelled"),
    )

BaseSession

Bases: Generic[SendRequestT, SendNotificationT, SendResultT, ReceiveRequestT, ReceiveNotificationT]

Implements an MCP "session" on top of read/write streams, including features like request/response linking, notifications, and progress.

This class is an async context manager that automatically starts processing messages when entered.

Source code in src/mcp/shared/session.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
class BaseSession(
    Generic[
        SendRequestT,
        SendNotificationT,
        SendResultT,
        ReceiveRequestT,
        ReceiveNotificationT,
    ],
):
    """Implements an MCP "session" on top of read/write streams, including features
    like request/response linking, notifications, and progress.

    This class is an async context manager that automatically starts processing
    messages when entered.
    """

    # Send side of the per-request stream that delivers the matching response.
    _response_streams: dict[RequestId, MemoryObjectSendStream[JSONRPCResponse | JSONRPCError]]
    # Next ID to assign to an outgoing request.
    _request_id: int
    # Incoming requests that have been received but not yet completed.
    _in_flight: dict[RequestId, RequestResponder[ReceiveRequestT, SendResultT]]
    # Progress callbacks keyed by the progress token (the outgoing request ID).
    _progress_callbacks: dict[RequestId, ProgressFnT]
    # Routers consulted, in order, before the default response streams.
    _response_routers: list[ResponseRouter]

    def __init__(
        self,
        read_stream: ReadStream[SessionMessage | Exception],
        write_stream: WriteStream[SessionMessage],
        # If none, reading will never time out
        read_timeout_seconds: float | None = None,
    ) -> None:
        """Create a session over the given streams.

        Args:
            read_stream: Stream of incoming session messages (or exceptions).
            write_stream: Stream that outgoing session messages are sent on.
            read_timeout_seconds: Default time to wait for a response to a
                request; ``None`` means reading never times out.
        """
        self._read_stream = read_stream
        self._write_stream = write_stream
        self._response_streams = {}
        self._request_id = 0
        self._session_read_timeout_seconds = read_timeout_seconds
        self._in_flight = {}
        self._progress_callbacks = {}
        self._response_routers = []
        self._exit_stack = AsyncExitStack()

    def add_response_router(self, router: ResponseRouter) -> None:
        """Register a response router to handle responses for non-standard requests.

        Response routers are checked in order before falling back to the default
        response stream mechanism. This is used by TaskResultHandler to route
        responses for queued task requests back to their resolvers.

        !!! warning
            This is an experimental API that may change without notice.

        Args:
            router: A ResponseRouter implementation
        """
        self._response_routers.append(router)

    async def __aenter__(self) -> Self:
        """Enter a task group and start the receive loop in it."""
        self._task_group = anyio.create_task_group()
        await self._task_group.__aenter__()
        self._task_group.start_soon(self._receive_loop)
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        """Close the exit stack, cancel the task group and exit it."""
        await self._exit_stack.aclose()
        # Using BaseSession as a context manager should not block on exit (this
        # would be very surprising behavior), so make sure to cancel the tasks
        # in the task group.
        self._task_group.cancel_scope.cancel()
        return await self._task_group.__aexit__(exc_type, exc_val, exc_tb)

    async def send_request(
        self,
        request: SendRequestT,
        result_type: type[ReceiveResultT],
        request_read_timeout_seconds: float | None = None,
        metadata: MessageMetadata = None,
        progress_callback: ProgressFnT | None = None,
    ) -> ReceiveResultT:
        """Sends a request and waits for a response.

        Raises an MCPError if the response contains an error. If a request read timeout is provided, it will take
        precedence over the session read timeout.

        Do not use this method to emit notifications! Use send_notification() instead.

        Args:
            request: The request to send.
            result_type: Model type the successful result is validated into.
            request_read_timeout_seconds: Per-request timeout; falls back to
                the session read timeout when not set.
            metadata: Metadata attached to the outgoing session message.
            progress_callback: If given, the request ID is sent as the
                progress token and the callback is invoked for matching
                progress notifications.

        Returns:
            The response result validated as ``result_type``.

        Raises:
            MCPError: If the response is an error or the wait times out.
        """
        request_id = self._request_id
        self._request_id = request_id + 1

        response_stream, response_stream_reader = anyio.create_memory_object_stream[JSONRPCResponse | JSONRPCError](1)
        self._response_streams[request_id] = response_stream

        # Everything after the stream is registered runs inside the try so that
        # a failure while preparing the request (e.g. during serialization)
        # still unregisters and closes the streams in the finally block.
        try:
            # Set up progress token if progress callback is provided
            request_data = request.model_dump(by_alias=True, mode="json", exclude_none=True)
            if progress_callback is not None:
                # Use request_id as progress token
                if "params" not in request_data:  # pragma: lax no cover
                    request_data["params"] = {}
                if "_meta" not in request_data["params"]:  # pragma: lax no cover
                    request_data["params"]["_meta"] = {}
                request_data["params"]["_meta"]["progressToken"] = request_id
                # Store the callback for this request
                self._progress_callbacks[request_id] = progress_callback

            target = request_data.get("params", {}).get("name")
            span_name = f"MCP send {request.method} {target}" if target else f"MCP send {request.method}"

            with otel_span(
                span_name,
                kind=SpanKind.CLIENT,
                attributes={"mcp.method.name": request.method, "jsonrpc.request.id": request_id},
            ):
                # Inject W3C trace context into _meta (SEP-414).
                meta: dict[str, Any] = request_data.setdefault("params", {}).setdefault("_meta", {})
                inject_trace_context(meta)

                jsonrpc_request = JSONRPCRequest(jsonrpc="2.0", id=request_id, **request_data)
                await self._write_stream.send(SessionMessage(message=jsonrpc_request, metadata=metadata))

                # request read timeout takes precedence over session read timeout
                # NOTE: because of `or`, a per-request timeout of 0 also falls
                # back to the session read timeout.
                timeout = request_read_timeout_seconds or self._session_read_timeout_seconds

                try:
                    with anyio.fail_after(timeout):
                        response_or_error = await response_stream_reader.receive()
                except TimeoutError:
                    class_name = request.__class__.__name__
                    message = f"Timed out while waiting for response to {class_name}. Waited {timeout} seconds."
                    raise MCPError(code=REQUEST_TIMEOUT, message=message)

                if isinstance(response_or_error, JSONRPCError):
                    raise MCPError.from_jsonrpc_error(response_or_error)
                else:
                    return result_type.model_validate(response_or_error.result, by_name=False)

        finally:
            self._response_streams.pop(request_id, None)
            self._progress_callbacks.pop(request_id, None)
            await response_stream.aclose()
            await response_stream_reader.aclose()

    async def send_notification(
        self,
        notification: SendNotificationT,
        related_request_id: RequestId | None = None,
    ) -> None:
        """Emits a notification, which is a one-way message that does not expect a response.

        Args:
            notification: The notification to send.
            related_request_id: ID of the request that triggered this
                notification, if any; passed along as message metadata.
        """
        # Some transport implementations may need the related_request_id
        # to attribute the notifications to the request that triggered them.
        jsonrpc_notification = JSONRPCNotification(
            jsonrpc="2.0",
            **notification.model_dump(by_alias=True, mode="json", exclude_none=True),
        )
        # Compare against None explicitly: 0 is a valid request ID (it is the
        # first ID send_request assigns) and must not drop the metadata.
        session_message = SessionMessage(
            message=jsonrpc_notification,
            metadata=(
                ServerMessageMetadata(related_request_id=related_request_id)
                if related_request_id is not None
                else None
            ),
        )
        await self._write_stream.send(session_message)

    async def _send_response(self, request_id: RequestId, response: SendResultT | ErrorData) -> None:
        """Send a result or an error for ``request_id`` on the write stream.

        An ``ErrorData`` is wrapped in a ``JSONRPCError``; anything else is
        dumped and wrapped in a ``JSONRPCResponse``.
        """
        if isinstance(response, ErrorData):
            jsonrpc_error = JSONRPCError(jsonrpc="2.0", id=request_id, error=response)
            session_message = SessionMessage(message=jsonrpc_error)
            await self._write_stream.send(session_message)
        else:
            jsonrpc_response = JSONRPCResponse(
                jsonrpc="2.0",
                id=request_id,
                result=response.model_dump(by_alias=True, mode="json", exclude_none=True),
            )
            session_message = SessionMessage(message=jsonrpc_response)
            await self._write_stream.send(session_message)

    @property
    def _receive_request_adapter(self) -> TypeAdapter[ReceiveRequestT]:
        """Each subclass must provide its own request adapter."""
        raise NotImplementedError

    @property
    def _receive_notification_adapter(self) -> TypeAdapter[ReceiveNotificationT]:
        """Each subclass must provide its own notification adapter."""
        raise NotImplementedError

    async def _receive_loop(self) -> None:
        """Read messages until the read stream ends and dispatch each one.

        Requests are validated and wrapped in a ``RequestResponder``,
        notifications are validated and dispatched (with special handling for
        cancellation and progress), and everything else is treated as a
        response. When the loop ends, every still-pending outgoing request is
        failed with a "Connection closed" error.
        """
        async with self._read_stream, self._write_stream:
            try:

                async def _handle_session_message(message: SessionMessage) -> None:
                    sender_context: contextvars.Context | None = getattr(self._read_stream, "last_context", None)
                    if isinstance(message.message, JSONRPCRequest):
                        try:
                            validated_request = self._receive_request_adapter.validate_python(
                                message.message.model_dump(by_alias=True, mode="json", exclude_none=True),
                                by_name=False,
                            )
                            responder = RequestResponder(
                                request_id=message.message.id,
                                request_meta=validated_request.params.meta if validated_request.params else None,
                                request=validated_request,
                                session=self,
                                on_complete=lambda r: self._in_flight.pop(r.request_id, None),
                                message_metadata=message.metadata,
                                context=sender_context,
                            )
                            self._in_flight[responder.request_id] = responder
                            await self._received_request(responder)

                            # Only forward requests the subclass hook did not
                            # already answer.
                            if not responder._completed:  # type: ignore[reportPrivateUsage]
                                await self._handle_incoming(responder)
                        except Exception:
                            # For request validation errors, send a proper JSON-RPC error
                            # response instead of crashing the server
                            logging.warning("Failed to validate request", exc_info=True)
                            logging.debug(f"Message that failed validation: {message.message}")
                            error_response = JSONRPCError(
                                jsonrpc="2.0",
                                id=message.message.id,
                                error=ErrorData(code=INVALID_PARAMS, message="Invalid request parameters", data=""),
                            )
                            session_message = SessionMessage(message=error_response)
                            await self._write_stream.send(session_message)

                    elif isinstance(message.message, JSONRPCNotification):
                        try:
                            notification = self._receive_notification_adapter.validate_python(
                                message.message.model_dump(by_alias=True, mode="json", exclude_none=True),
                                by_name=False,
                            )
                            # Handle cancellation notifications
                            if isinstance(notification, CancelledNotification):
                                cancelled_id = notification.params.request_id
                                if cancelled_id in self._in_flight:  # pragma: no branch
                                    await self._in_flight[cancelled_id].cancel()
                            else:
                                # Handle progress notifications callback
                                if isinstance(notification, ProgressNotification):
                                    progress_token = notification.params.progress_token
                                    # If there is a progress callback for this token,
                                    # call it with the progress information
                                    if progress_token in self._progress_callbacks:
                                        callback = self._progress_callbacks[progress_token]
                                        try:
                                            await callback(
                                                notification.params.progress,
                                                notification.params.total,
                                                notification.params.message,
                                            )
                                        except Exception:
                                            logging.exception("Progress callback raised an exception")
                                await self._received_notification(notification)
                                await self._handle_incoming(notification)
                        except Exception:
                            # For other validation errors, log and continue
                            logging.warning(  # pragma: no cover
                                f"Failed to validate notification:. Message was: {message.message}",
                                exc_info=True,
                            )
                    else:  # Response or error
                        await self._handle_response(message)

                async for message in self._read_stream:
                    if isinstance(message, Exception):
                        await self._handle_incoming(message)
                        continue

                    await _handle_session_message(message)

            except anyio.ClosedResourceError:
                # This is expected when the client disconnects abruptly.
                # Without this handler, the exception would propagate up and
                # crash the server's task group.
                logging.debug("Read stream closed by client")
            except Exception as e:
                # Other exceptions are not expected and should be logged. We purposefully
                # catch all exceptions here to avoid crashing the server.
                logging.exception(f"Unhandled exception in receive loop: {e}")  # pragma: no cover
            finally:
                # after the read stream is closed, we need to send errors
                # to any pending requests
                # Snapshot: stream.send() wakes the waiter, whose finally pops
                # from _response_streams before the next __next__() call.
                for pending_id, stream in list(self._response_streams.items()):
                    error = ErrorData(code=CONNECTION_CLOSED, message="Connection closed")
                    try:
                        await stream.send(JSONRPCError(jsonrpc="2.0", id=pending_id, error=error))
                        await stream.aclose()
                    except Exception:  # pragma: no cover
                        # Stream might already be closed
                        pass
                self._response_streams.clear()

    def _normalize_request_id(self, response_id: RequestId) -> RequestId:
        """Normalize a response ID to match how request IDs are stored.

        Since the client always sends integer IDs, we normalize string IDs
        to integers when possible. This matches the TypeScript SDK approach:
        https://github.com/modelcontextprotocol/typescript-sdk/blob/a606fb17909ea454e83aab14c73f14ea45c04448/src/shared/protocol.ts#L861

        Args:
            response_id: The response ID from the incoming message.

        Returns:
            The normalized ID (int if possible, otherwise original value).
        """
        if isinstance(response_id, str):
            try:
                return int(response_id)
            except ValueError:
                logging.warning(f"Response ID {response_id!r} cannot be normalized to match pending requests")
        return response_id

    async def _handle_response(self, message: SessionMessage) -> None:
        """Handle an incoming response or error message.

        Checks response routers first (e.g., for task-related responses),
        then falls back to the normal response stream mechanism.
        """
        # This check is always true at runtime: the caller (_receive_loop) only invokes
        # this method in the else branch after checking for JSONRPCRequest and
        # JSONRPCNotification. However, the type checker can't infer this from the
        # method signature, so we need this guard for type narrowing.
        if not isinstance(message.message, JSONRPCResponse | JSONRPCError):
            return  # pragma: no cover

        if message.message.id is None:
            # Narrows to JSONRPCError since JSONRPCResponse.id is always RequestId
            error = message.message.error
            logging.warning(f"Received error with null ID: {error.message}")
            await self._handle_incoming(MCPError(error.code, error.message, error.data))
            return
        # Normalize response ID to handle type mismatches (e.g., "0" vs 0)
        response_id = self._normalize_request_id(message.message.id)

        # First, check response routers (e.g., TaskResultHandler)
        if isinstance(message.message, JSONRPCError):
            # Route error to routers
            for router in self._response_routers:
                if router.route_error(response_id, message.message.error):
                    return  # Handled
        else:
            # Route success response to routers
            response_data: dict[str, Any] = message.message.result or {}
            for router in self._response_routers:
                if router.route_response(response_id, response_data):
                    return  # Handled

        # Fall back to normal response streams
        stream = self._response_streams.pop(response_id, None)
        if stream:
            await stream.send(message.message)
        else:
            await self._handle_incoming(RuntimeError(f"Received response with an unknown request ID: {message}"))

    async def _received_request(self, responder: RequestResponder[ReceiveRequestT, SendResultT]) -> None:
        """Can be overridden by subclasses to handle a request without needing to
        listen on the message stream.

        If the request is responded to within this method, it will not be
        forwarded on to the message stream.
        """

    async def _received_notification(self, notification: ReceiveNotificationT) -> None:
        """Can be overridden by subclasses to handle a notification without needing
        to listen on the message stream.
        """

    async def send_progress_notification(
        self,
        progress_token: ProgressToken,
        progress: float,
        total: float | None = None,
        message: str | None = None,
    ) -> None:
        """Sends a progress notification for a request that is currently being processed.

        The base implementation does nothing.
        """

    async def _handle_incoming(
        self, req: RequestResponder[ReceiveRequestT, SendResultT] | ReceiveNotificationT | Exception
    ) -> None:
        """A generic handler for incoming messages. Overridden by subclasses."""

add_response_router

add_response_router(router: ResponseRouter) -> None

Register a response router to handle responses for non-standard requests.

Response routers are checked in order before falling back to the default response stream mechanism. This is used by TaskResultHandler to route responses for queued task requests back to their resolvers.

Warning

This is an experimental API that may change without notice.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| router | ResponseRouter | A ResponseRouter implementation | required |

Source code in src/mcp/shared/session.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
def add_response_router(self, router: ResponseRouter) -> None:
    """Add ``router`` to the end of the session's list of response routers.

    Incoming responses are offered to the registered routers in registration
    order before the default response stream mechanism is used. This is used
    by TaskResultHandler to route responses for queued task requests back to
    their resolvers.

    !!! warning
        This is an experimental API that may change without notice.

    Args:
        router: A ResponseRouter implementation
    """
    routers = self._response_routers
    routers.append(router)

send_request async

send_request(
    request: SendRequestT,
    result_type: type[ReceiveResultT],
    request_read_timeout_seconds: float | None = None,
    metadata: MessageMetadata = None,
    progress_callback: ProgressFnT | None = None,
) -> ReceiveResultT

Sends a request and waits for a response.

Raises an MCPError if the response contains an error. If a request read timeout is provided, it will take precedence over the session read timeout.

Do not use this method to emit notifications! Use send_notification() instead.

Source code in src/mcp/shared/session.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
async def send_request(
    self,
    request: SendRequestT,
    result_type: type[ReceiveResultT],
    request_read_timeout_seconds: float | None = None,
    metadata: MessageMetadata = None,
    progress_callback: ProgressFnT | None = None,
) -> ReceiveResultT:
    """Sends a request and waits for a response.

    Raises an MCPError if the response contains an error. If a request read timeout is provided, it will take
    precedence over the session read timeout.

    Do not use this method to emit notifications! Use send_notification() instead.

    Args:
        request: The request model to send. It is serialized with ``model_dump`` (by alias, JSON mode,
            ``None`` values excluded).
        result_type: Model class used to validate the ``result`` payload of a successful response.
        request_read_timeout_seconds: Per-request limit on waiting for the response. A falsy value
            (``None`` or ``0``) falls back to the session read timeout.
        metadata: Metadata attached to the outgoing ``SessionMessage``.
        progress_callback: Optional progress callback. When given, the request ID is sent as the
            ``progressToken`` and the callback stays registered under that ID until this call finishes.

    Returns:
        The response ``result`` validated as ``result_type``.

    Raises:
        MCPError: If the peer replies with a JSON-RPC error, or (with code ``REQUEST_TIMEOUT``) if no
            response arrives within the effective timeout.
    """
    request_id = self._request_id
    self._request_id = request_id + 1

    # Serialize before allocating and registering the response stream: if serialization raises, nothing
    # has been registered yet, so no stale entry or unclosed stream is left behind.
    request_data = request.model_dump(by_alias=True, mode="json", exclude_none=True)

    response_stream, response_stream_reader = anyio.create_memory_object_stream[JSONRPCResponse | JSONRPCError](1)
    self._response_streams[request_id] = response_stream

    # Everything after registration runs under try/finally so the stream and callback are always released.
    try:
        # Set up progress token if progress callback is provided
        if progress_callback is not None:
            # Use request_id as progress token
            if "params" not in request_data:  # pragma: lax no cover
                request_data["params"] = {}
            if "_meta" not in request_data["params"]:  # pragma: lax no cover
                request_data["params"]["_meta"] = {}
            request_data["params"]["_meta"]["progressToken"] = request_id
            # Store the callback for this request
            self._progress_callbacks[request_id] = progress_callback

        # Include the "name" param (when present) in the span name for readability.
        target = request_data.get("params", {}).get("name")
        span_name = f"MCP send {request.method} {target}" if target else f"MCP send {request.method}"

        with otel_span(
            span_name,
            kind=SpanKind.CLIENT,
            attributes={"mcp.method.name": request.method, "jsonrpc.request.id": request_id},
        ):
            # Inject W3C trace context into _meta (SEP-414).
            meta: dict[str, Any] = request_data.setdefault("params", {}).setdefault("_meta", {})
            inject_trace_context(meta)

            jsonrpc_request = JSONRPCRequest(jsonrpc="2.0", id=request_id, **request_data)
            await self._write_stream.send(SessionMessage(message=jsonrpc_request, metadata=metadata))

            # request read timeout takes precedence over session read timeout
            # NOTE: `or` treats a request timeout of 0 the same as None (falls back to the session value).
            timeout = request_read_timeout_seconds or self._session_read_timeout_seconds

            try:
                with anyio.fail_after(timeout):
                    response_or_error = await response_stream_reader.receive()
            except TimeoutError:
                class_name = request.__class__.__name__
                message = f"Timed out while waiting for response to {class_name}. Waited {timeout} seconds."
                raise MCPError(code=REQUEST_TIMEOUT, message=message)

            if isinstance(response_or_error, JSONRPCError):
                raise MCPError.from_jsonrpc_error(response_or_error)
            else:
                return result_type.model_validate(response_or_error.result, by_name=False)

    finally:
        # Unregister first so no late response or progress update is routed to a closed stream.
        self._response_streams.pop(request_id, None)
        self._progress_callbacks.pop(request_id, None)
        await response_stream.aclose()
        await response_stream_reader.aclose()

send_notification async

send_notification(
    notification: SendNotificationT,
    related_request_id: RequestId | None = None,
) -> None

Emits a notification, which is a one-way message that does not expect a response.

Source code in src/mcp/shared/session.py
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
async def send_notification(
    self,
    notification: SendNotificationT,
    related_request_id: RequestId | None = None,
) -> None:
    """Emits a notification, which is a one-way message that does not expect a response.

    Args:
        notification: The notification model to send. It is serialized with ``model_dump`` (by alias,
            JSON mode, ``None`` values excluded) and wrapped in a JSON-RPC 2.0 notification.
        related_request_id: ID of the request that triggered this notification, if any. Whenever it is
            not ``None`` (including falsy IDs such as ``0``), it is attached to the outgoing message
            as ``ServerMessageMetadata``; otherwise the message carries no metadata.
    """
    # Some transport implementations may need to set the related_request_id
    # to attribute the notifications to the request that triggered them.
    jsonrpc_notification = JSONRPCNotification(
        jsonrpc="2.0",
        **notification.model_dump(by_alias=True, mode="json", exclude_none=True),
    )
    # Compare against None explicitly: a falsy ID (e.g. 0) is still a request ID, and a truthiness
    # check would silently drop the association for it.
    message_metadata = (
        ServerMessageMetadata(related_request_id=related_request_id) if related_request_id is not None else None
    )
    session_message = SessionMessage(
        message=jsonrpc_notification,
        metadata=message_metadata,
    )
    await self._write_stream.send(session_message)

send_progress_notification async

send_progress_notification(
    progress_token: ProgressToken,
    progress: float,
    total: float | None = None,
    message: str | None = None,
) -> None

Sends a progress notification for a request that is currently being processed.

Source code in src/mcp/shared/session.py
534
535
536
537
538
539
540
541
async def send_progress_notification(
    self,
    progress_token: ProgressToken,
    progress: float,
    total: float | None = None,
    message: str | None = None,
) -> None:
    """Sends a progress notification for a request that is currently being processed.

    As shown here, the method body consists only of this docstring, so this
    implementation itself sends nothing. NOTE(review): presumably concrete
    session classes override it to emit the notification; confirm against
    the subclasses.

    Args:
        progress_token: Token of the request the progress update refers to.
        progress: Current progress value.
        total: Total expected value, if known.
        message: Optional message accompanying the update.
    """