Skip to content

context

Context

Bases: BaseModel, Generic[LifespanContextT, RequestT]

Context object providing access to MCP capabilities.

This provides a cleaner interface to MCP's RequestContext functionality. It gets injected into tool and resource functions that request it via type hints.

To use context in a tool function, add a parameter with the Context type annotation:

@server.tool()
async def my_tool(x: int, ctx: Context) -> str:
    # `ctx` is picked up through its `Context` annotation; the parameter name is free.
    # Log messages to the client, one call per severity level
    await ctx.info(f"Processing {x}")
    await ctx.debug("Debug info")
    await ctx.warning("Warning message")
    await ctx.error("Error message")

    # Report progress: 50 is the current value, 100 the total
    await ctx.report_progress(50, 100)

    # Access resources by URI
    data = await ctx.read_resource("resource://data")

    # Get request info (both are exposed as properties on the context)
    request_id = ctx.request_id
    client_id = ctx.client_id

    return str(x)

The context parameter name can be anything as long as it's annotated with Context. The context is optional - tools that don't need it can omit the parameter.

Source code in src/mcp/server/mcpserver/context.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
class Context(BaseModel, Generic[LifespanContextT, RequestT]):
    """Context object providing access to MCP capabilities.

    This provides a cleaner interface to MCP's RequestContext functionality.
    It gets injected into tool and resource functions that request it via type hints.

    To use context in a tool function, add a parameter with the Context type annotation:

    ```python
    @server.tool()
    async def my_tool(x: int, ctx: Context) -> str:
        # Log messages to the client
        await ctx.info(f"Processing {x}")
        await ctx.debug("Debug info")
        await ctx.warning("Warning message")
        await ctx.error("Error message")

        # Report progress
        await ctx.report_progress(50, 100)

        # Access resources
        data = await ctx.read_resource("resource://data")

        # Get request info
        request_id = ctx.request_id
        client_id = ctx.client_id

        return str(x)
    ```

    The context parameter name can be anything as long as it's annotated with Context.
    The context is optional - tools that don't need it can omit the parameter.
    """

    # Both are optional at construction time; the `request_context` and
    # `mcp_server` properties raise ValueError while they are still None.
    _request_context: ServerRequestContext[LifespanContextT, RequestT] | None
    _mcp_server: MCPServer | None

    # TODO(maxisbey): Consider making request_context/mcp_server required, or refactor Context entirely.
    def __init__(
        self,
        *,
        request_context: ServerRequestContext[LifespanContextT, RequestT] | None = None,
        mcp_server: MCPServer | None = None,
        # TODO(Marcelo): We should drop this kwargs parameter.
        **kwargs: Any,
    ):
        """Create a context, optionally bound to a request and a server.

        Args:
            request_context: The underlying request context, if any.
            mcp_server: The owning MCPServer instance, if any.
            **kwargs: Forwarded unchanged to the base class initializer.
        """
        super().__init__(**kwargs)
        # Assigned after the base initializer has run; neither value is passed to it.
        self._request_context = request_context
        self._mcp_server = mcp_server

    @property
    def mcp_server(self) -> MCPServer:
        """Access to the MCPServer instance.

        Raises:
            ValueError: If no server was attached to this context.
        """
        if self._mcp_server is None:  # pragma: no cover
            raise ValueError("Context is not available outside of a request")
        return self._mcp_server  # pragma: no cover

    @property
    def request_context(self) -> ServerRequestContext[LifespanContextT, RequestT]:
        """Access to the underlying request context.

        Raises:
            ValueError: If no request context was attached to this context.
        """
        if self._request_context is None:  # pragma: no cover
            raise ValueError("Context is not available outside of a request")
        return self._request_context

    async def report_progress(self, progress: float, total: float | None = None, message: str | None = None) -> None:
        """Report progress for the current operation.

        Nothing is sent when the request metadata is empty or carries no
        ``progress_token``.

        Args:
            progress: Current progress value (e.g., 24)
            total: Optional total value (e.g., 100)
            message: Optional message (e.g., "Starting render...")
        """
        progress_token = self.request_context.meta.get("progress_token") if self.request_context.meta else None

        if progress_token is None:  # pragma: no cover
            return

        await self.request_context.session.send_progress_notification(
            progress_token=progress_token,
            progress=progress,
            total=total,
            message=message,
            related_request_id=self.request_id,
        )

    async def read_resource(self, uri: str | AnyUrl) -> Iterable[ReadResourceContents]:
        """Read a resource by URI.

        Args:
            uri: Resource URI to read

        Returns:
            The resource content as either text or bytes

        Raises:
            ValueError: If no server was attached to this context.
        """
        # Use the `mcp_server` property instead of `assert`: assertions are removed
        # under `python -O`, which would turn a missing server into an opaque
        # AttributeError on None. The property raises the same message as a ValueError.
        return await self.mcp_server.read_resource(uri, self)

    async def elicit(
        self,
        message: str,
        schema: type[ElicitSchemaModelT],
    ) -> ElicitationResult[ElicitSchemaModelT]:
        """Elicit information from the client/user.

        This method can be used to interactively ask for additional information from the
        client within a tool's execution. The client might display the message to the
        user and collect a response according to the provided schema. If the client
        is an agent, it might decide how to handle the elicitation -- either by asking
        the user or automatically generating a response.

        Args:
            message: Message to present to the user
            schema: A Pydantic model class defining the expected response structure.
                    According to the specification, only primitive types are allowed.

        Returns:
            An ElicitationResult containing the action taken and the data if accepted

        Note:
            Check the result.action to determine if the user accepted, declined, or cancelled.
            The result.data will only be populated if action is "accept" and validation succeeded.
        """

        return await elicit_with_validation(
            session=self.request_context.session,
            message=message,
            schema=schema,
            related_request_id=self.request_id,
        )

    async def elicit_url(
        self,
        message: str,
        url: str,
        elicitation_id: str,
    ) -> UrlElicitationResult:
        """Request URL mode elicitation from the client.

        This directs the user to an external URL for out-of-band interactions
        that must not pass through the MCP client. Use this for:
        - Collecting sensitive credentials (API keys, passwords)
        - OAuth authorization flows with third-party services
        - Payment and subscription flows
        - Any interaction where data should not pass through the LLM context

        The response indicates whether the user consented to navigate to the URL.
        The actual interaction happens out-of-band. When the elicitation completes,
        call `ctx.session.send_elicit_complete(elicitation_id)` to notify the client.

        Args:
            message: Human-readable explanation of why the interaction is needed
            url: The URL the user should navigate to
            elicitation_id: Unique identifier for tracking this elicitation

        Returns:
            UrlElicitationResult indicating accept, decline, or cancel
        """
        # Inside a method body this name resolves to the module-level `elicit_url`
        # helper, not to this method (class scope is not searched from methods).
        return await elicit_url(
            session=self.request_context.session,
            message=message,
            url=url,
            elicitation_id=elicitation_id,
            related_request_id=self.request_id,
        )

    async def log(
        self,
        level: LoggingLevel,
        data: Any,
        *,
        logger_name: str | None = None,
    ) -> None:
        """Send a log message to the client.

        Args:
            level: Log level (debug, info, notice, warning, error, critical,
                alert, emergency)
            data: The data to be logged. Any JSON serializable type is allowed
                (string, dict, list, number, bool, etc.) per the MCP specification.
            logger_name: Optional logger name
        """
        await self.request_context.session.send_log_message(
            level=level,
            data=data,
            logger=logger_name,
            related_request_id=self.request_id,
        )

    # TODO(maxisbey): see if this is needed otherwise remove
    @property
    def client_id(self) -> str | None:
        """Get the client ID if available.

        Note: this reads from the MCP request's `_meta` params, not the OAuth
        bearer token. For that, use `get_access_token().client_id`.
        """
        return self.request_context.meta.get("client_id") if self.request_context.meta else None  # pragma: no cover

    @property
    def request_id(self) -> str:
        """Get the unique ID for this request, converted to a string."""
        return str(self.request_context.request_id)

    @property
    def session(self):
        """Access to the underlying session for advanced usage."""
        return self.request_context.session

    async def close_sse_stream(self) -> None:
        """Close the SSE stream to trigger client reconnection.

        This method closes the HTTP connection for the current request, triggering
        client reconnection. Events continue to be stored in the event store and will
        be replayed when the client reconnects with Last-Event-ID.

        Use this to implement polling behavior during long-running operations -
        the client will reconnect after the retry interval specified in the priming event.

        Note:
            This is a no-op if not using StreamableHTTP transport with event_store.
            The callback is only available when event_store is configured.
        """
        # Reads the private attribute directly so that a missing request context
        # is a silent no-op rather than a ValueError from the property.
        if self._request_context and self._request_context.close_sse_stream:  # pragma: no cover
            await self._request_context.close_sse_stream()

    async def close_standalone_sse_stream(self) -> None:
        """Close the standalone GET SSE stream to trigger client reconnection.

        This method closes the HTTP connection for the standalone GET stream used
        for unsolicited server-to-client notifications. The client SHOULD reconnect
        with Last-Event-ID to resume receiving notifications.

        Note:
            This is a no-op if not using StreamableHTTP transport with event_store.
            Currently, client reconnection for standalone GET streams is NOT
            implemented - this is a known gap.
        """
        # Same silent no-op policy as `close_sse_stream` when nothing is attached.
        if self._request_context and self._request_context.close_standalone_sse_stream:  # pragma: no cover
            await self._request_context.close_standalone_sse_stream()

    # Convenience methods for common log levels
    async def debug(self, data: Any, *, logger_name: str | None = None) -> None:
        """Send a debug log message."""
        await self.log("debug", data, logger_name=logger_name)

    async def info(self, data: Any, *, logger_name: str | None = None) -> None:
        """Send an info log message."""
        await self.log("info", data, logger_name=logger_name)

    async def warning(self, data: Any, *, logger_name: str | None = None) -> None:
        """Send a warning log message."""
        await self.log("warning", data, logger_name=logger_name)

    async def error(self, data: Any, *, logger_name: str | None = None) -> None:
        """Send an error log message."""
        await self.log("error", data, logger_name=logger_name)

mcp_server property

mcp_server: MCPServer

Access to the MCPServer instance.

request_context property

request_context: ServerRequestContext[
    LifespanContextT, RequestT
]

Access to the underlying request context.

report_progress async

report_progress(
    progress: float,
    total: float | None = None,
    message: str | None = None,
) -> None

Report progress for the current operation.

Parameters:

Name Type Description Default
progress float

Current progress value (e.g., 24)

required
total float | None

Optional total value (e.g., 100)

None
message str | None

Optional message (e.g., "Starting render...")

None
Source code in src/mcp/server/mcpserver/context.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
async def report_progress(self, progress: float, total: float | None = None, message: str | None = None) -> None:
    """Report progress for the current operation.

    Args:
        progress: Current progress value (e.g., 24)
        total: Optional total value (e.g., 100)
        message: Optional message (e.g., "Starting render...")
    """
    progress_token = self.request_context.meta.get("progress_token") if self.request_context.meta else None

    if progress_token is None:  # pragma: no cover
        return

    await self.request_context.session.send_progress_notification(
        progress_token=progress_token,
        progress=progress,
        total=total,
        message=message,
        related_request_id=self.request_id,
    )

read_resource async

read_resource(
    uri: str | AnyUrl,
) -> Iterable[ReadResourceContents]

Read a resource by URI.

Parameters:

Name Type Description Default
uri str | AnyUrl

Resource URI to read

required

Returns:

Type Description
Iterable[ReadResourceContents]

The resource content as either text or bytes

Source code in src/mcp/server/mcpserver/context.py
108
109
110
111
112
113
114
115
116
117
118
async def read_resource(self, uri: str | AnyUrl) -> Iterable[ReadResourceContents]:
    """Read a resource by URI.

    Args:
        uri: Resource URI to read

    Returns:
        The resource content as either text or bytes

    Raises:
        ValueError: If no server was attached to this context.
    """
    # Use the `mcp_server` property instead of `assert`: assertions are removed
    # under `python -O`, which would turn a missing server into an opaque
    # AttributeError on None. The property raises the same message as a ValueError.
    return await self.mcp_server.read_resource(uri, self)

elicit async

elicit(
    message: str, schema: type[ElicitSchemaModelT]
) -> ElicitationResult[ElicitSchemaModelT]

Elicit information from the client/user.

This method can be used to interactively ask for additional information from the client within a tool's execution. The client might display the message to the user and collect a response according to the provided schema. If the client is an agent, it might decide how to handle the elicitation -- either by asking the user or automatically generating a response.

Parameters:

Name Type Description Default
message str

Message to present to the user

required
schema type[ElicitSchemaModelT]

A Pydantic model class defining the expected response structure. According to the specification, only primitive types are allowed.

required

Returns:

Type Description
ElicitationResult[ElicitSchemaModelT]

An ElicitationResult containing the action taken and the data if accepted

Note

Check the result.action to determine if the user accepted, declined, or cancelled. The result.data will only be populated if action is "accept" and validation succeeded.

Source code in src/mcp/server/mcpserver/context.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
async def elicit(
    self,
    message: str,
    schema: type[ElicitSchemaModelT],
) -> ElicitationResult[ElicitSchemaModelT]:
    """Ask the client/user for additional information during a tool call.

    The client might display the message to the user and collect a response
    according to the provided schema. If the client is an agent, it might decide
    how to handle the elicitation -- either by asking the user or automatically
    generating a response.

    Args:
        message: Message to present to the user
        schema: A Pydantic model class defining the expected response structure.
                According to the specification, only primitive types are allowed.

    Returns:
        An ElicitationResult containing the action taken and the data if accepted

    Note:
        Check the result.action to determine if the user accepted, declined, or cancelled.
        The result.data will only be populated if action is "accept" and validation succeeded.
    """
    # The request is tied to the current request via its ID.
    outcome = await elicit_with_validation(
        session=self.request_context.session,
        message=message,
        schema=schema,
        related_request_id=self.request_id,
    )
    return outcome

elicit_url async

elicit_url(
    message: str, url: str, elicitation_id: str
) -> UrlElicitationResult

Request URL mode elicitation from the client.

This directs the user to an external URL for out-of-band interactions that must not pass through the MCP client. Use this for:

- Collecting sensitive credentials (API keys, passwords)
- OAuth authorization flows with third-party services
- Payment and subscription flows
- Any interaction where data should not pass through the LLM context

The response indicates whether the user consented to navigate to the URL. The actual interaction happens out-of-band. When the elicitation completes, call ctx.session.send_elicit_complete(elicitation_id) to notify the client.

Parameters:

Name Type Description Default
message str

Human-readable explanation of why the interaction is needed

required
url str

The URL the user should navigate to

required
elicitation_id str

Unique identifier for tracking this elicitation

required

Returns:

Type Description
UrlElicitationResult

UrlElicitationResult indicating accept, decline, or cancel

Source code in src/mcp/server/mcpserver/context.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
async def elicit_url(
    self,
    message: str,
    url: str,
    elicitation_id: str,
) -> UrlElicitationResult:
    """Ask the client to send the user to an external URL (URL mode elicitation).

    Intended for out-of-band interactions that must not pass through the MCP
    client, such as:
    - Collecting sensitive credentials (API keys, passwords)
    - OAuth authorization flows with third-party services
    - Payment and subscription flows
    - Any interaction where data should not pass through the LLM context

    The response indicates whether the user consented to navigate to the URL.
    The actual interaction happens out-of-band. When the elicitation completes,
    call `ctx.session.send_elicit_complete(elicitation_id)` to notify the client.

    Args:
        message: Human-readable explanation of why the interaction is needed
        url: The URL the user should navigate to
        elicitation_id: Unique identifier for tracking this elicitation

    Returns:
        UrlElicitationResult indicating accept, decline, or cancel
    """
    # When this is defined as a method, the name below resolves to the
    # module-level `elicit_url` helper rather than to the method itself.
    outcome = await elicit_url(
        session=self.request_context.session,
        message=message,
        url=url,
        elicitation_id=elicitation_id,
        related_request_id=self.request_id,
    )
    return outcome

log async

log(
    level: LoggingLevel,
    data: Any,
    *,
    logger_name: str | None = None
) -> None

Send a log message to the client.

Parameters:

Name Type Description Default
level LoggingLevel

Log level (debug, info, notice, warning, error, critical, alert, emergency)

required
data Any

The data to be logged. Any JSON serializable type is allowed (string, dict, list, number, bool, etc.) per the MCP specification.

required
logger_name str | None

Optional logger name

None
Source code in src/mcp/server/mcpserver/context.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
async def log(
    self,
    level: LoggingLevel,
    data: Any,
    *,
    logger_name: str | None = None,
) -> None:
    """Forward a log message to the client through the request's session.

    Args:
        level: Log level (debug, info, notice, warning, error, critical,
            alert, emergency)
        data: The data to be logged. Any JSON serializable type is allowed
            (string, dict, list, number, bool, etc.) per the MCP specification.
        logger_name: Optional logger name
    """
    session = self.request_context.session
    # `logger_name` is passed on under the session API's `logger` keyword.
    await session.send_log_message(
        level=level,
        data=data,
        logger=logger_name,
        related_request_id=self.request_id,
    )

client_id property

client_id: str | None

Get the client ID if available.

Note: this reads from the MCP request's _meta params, not the OAuth bearer token. For that, use get_access_token().client_id.

request_id property

request_id: str

Get the unique ID for this request.

session property

session

Access to the underlying session for advanced usage.

close_sse_stream async

close_sse_stream() -> None

Close the SSE stream to trigger client reconnection.

This method closes the HTTP connection for the current request, triggering client reconnection. Events continue to be stored in the event store and will be replayed when the client reconnects with Last-Event-ID.

Use this to implement polling behavior during long-running operations - the client will reconnect after the retry interval specified in the priming event.

Note

This is a no-op if not using StreamableHTTP transport with event_store. The callback is only available when event_store is configured.

Source code in src/mcp/server/mcpserver/context.py
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
async def close_sse_stream(self) -> None:
    """Close the current request's SSE stream so the client reconnects.

    Closing the HTTP connection for the current request triggers client
    reconnection. Events continue to be stored in the event store and will
    be replayed when the client reconnects with Last-Event-ID.

    Use this to implement polling behavior during long-running operations -
    the client will reconnect after the retry interval specified in the priming event.

    Note:
        This is a no-op if not using StreamableHTTP transport with event_store.
        The callback is only available when event_store is configured.
    """
    # Nothing happens when either the request context or its callback is unset.
    if (ctx := self._request_context) and (close := ctx.close_sse_stream):  # pragma: no cover
        await close()

close_standalone_sse_stream async

close_standalone_sse_stream() -> None

Close the standalone GET SSE stream to trigger client reconnection.

This method closes the HTTP connection for the standalone GET stream used for unsolicited server-to-client notifications. The client SHOULD reconnect with Last-Event-ID to resume receiving notifications.

Note

This is a no-op if not using StreamableHTTP transport with event_store. Currently, client reconnection for standalone GET streams is NOT implemented - this is a known gap.

Source code in src/mcp/server/mcpserver/context.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
async def close_standalone_sse_stream(self) -> None:
    """Close the standalone GET SSE stream so the client reconnects.

    This closes the HTTP connection for the standalone GET stream used
    for unsolicited server-to-client notifications. The client SHOULD reconnect
    with Last-Event-ID to resume receiving notifications.

    Note:
        This is a no-op if not using StreamableHTTP transport with event_store.
        Currently, client reconnection for standalone GET streams is NOT
        implemented - this is a known gap.
    """
    # Nothing happens when either the request context or its callback is unset.
    if (ctx := self._request_context) and (close := ctx.close_standalone_sse_stream):  # pragma: no cover
        await close()

debug async

debug(data: Any, *, logger_name: str | None = None) -> None

Send a debug log message.

Source code in src/mcp/server/mcpserver/context.py
264
265
266
async def debug(self, data: Any, *, logger_name: str | None = None) -> None:
    """Send a debug log message."""
    await self.log("debug", data, logger_name=logger_name)

info async

info(data: Any, *, logger_name: str | None = None) -> None

Send an info log message.

Source code in src/mcp/server/mcpserver/context.py
268
269
270
async def info(self, data: Any, *, logger_name: str | None = None) -> None:
    """Send an info log message."""
    await self.log("info", data, logger_name=logger_name)

warning async

warning(
    data: Any, *, logger_name: str | None = None
) -> None

Send a warning log message.

Source code in src/mcp/server/mcpserver/context.py
272
273
274
async def warning(self, data: Any, *, logger_name: str | None = None) -> None:
    """Send a warning log message."""
    await self.log("warning", data, logger_name=logger_name)

error async

error(data: Any, *, logger_name: str | None = None) -> None

Send an error log message.

Source code in src/mcp/server/mcpserver/context.py
276
277
278
async def error(self, data: Any, *, logger_name: str | None = None) -> None:
    """Send an error log message."""
    await self.log("error", data, logger_name=logger_name)