Skip to content

sse

sse_client async

sse_client(
    url: str,
    headers: dict[str, Any] | None = None,
    timeout: float = 5.0,
    sse_read_timeout: float = 300.0,
    httpx_client_factory: McpHttpClientFactory = create_mcp_http_client,
    auth: Auth | None = None,
    on_session_created: Callable[[str], None] | None = None,
)

Client transport for SSE.

sse_read_timeout determines how long (in seconds) the client will wait for a new event before disconnecting. All other HTTP operations are controlled by timeout.

Parameters:

Name Type Description Default
url str

The SSE endpoint URL.

required
headers dict[str, Any] | None

Optional headers to include in requests.

None
timeout float

HTTP timeout for regular operations (in seconds).

5.0
sse_read_timeout float

Timeout for SSE read operations (in seconds).

300.0
httpx_client_factory McpHttpClientFactory

Factory function for creating the HTTPX client.

create_mcp_http_client
auth Auth | None

Optional HTTPX authentication handler.

None
on_session_created Callable[[str], None] | None

Optional callback invoked with the session ID when received.

None
Source code in src/mcp/client/sse.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
@asynccontextmanager
async def sse_client(
    url: str,
    headers: dict[str, Any] | None = None,
    timeout: float = 5.0,
    sse_read_timeout: float = 300.0,
    httpx_client_factory: McpHttpClientFactory = create_mcp_http_client,
    auth: httpx.Auth | None = None,
    on_session_created: Callable[[str], None] | None = None,
):
    """Client transport for SSE.

    `sse_read_timeout` determines how long (in seconds) the client will wait for a new
    event before disconnecting. All other HTTP operations are controlled by `timeout`.

    Args:
        url: The SSE endpoint URL.
        headers: Optional headers to include in requests.
        timeout: HTTP timeout for regular operations (in seconds).
        sse_read_timeout: Timeout for SSE read operations (in seconds).
        httpx_client_factory: Factory function for creating the HTTPX client.
        auth: Optional HTTPX authentication handler.
        on_session_created: Optional callback invoked with the session ID when received.
    """
    logger.debug(f"Connecting to SSE endpoint: {remove_request_params(url)}")
    async with httpx_client_factory(
        headers=headers, auth=auth, timeout=httpx.Timeout(timeout, read=sse_read_timeout)
    ) as client:
        async with aconnect_sse(client, "GET", url) as event_source:
            event_source.response.raise_for_status()
            logger.debug("SSE connection established")

            read_stream_writer, read_stream = create_context_streams[SessionMessage | Exception](0)
            write_stream, write_stream_reader = create_context_streams[SessionMessage](0)

            async def sse_reader(task_status: TaskStatus[str] = anyio.TASK_STATUS_IGNORED):
                try:
                    async for sse in event_source.aiter_sse():  # pragma: no branch
                        logger.debug(f"Received SSE event: {sse.event}")
                        match sse.event:
                            case "endpoint":
                                endpoint_url = urljoin(url, sse.data)
                                logger.debug(f"Received endpoint URL: {endpoint_url}")

                                url_parsed = urlparse(url)
                                endpoint_parsed = urlparse(endpoint_url)
                                if (  # pragma: no cover
                                    url_parsed.netloc != endpoint_parsed.netloc
                                    or url_parsed.scheme != endpoint_parsed.scheme
                                ):
                                    error_msg = (  # pragma: no cover
                                        f"Endpoint origin does not match connection origin: {endpoint_url}"
                                    )
                                    logger.error(error_msg)  # pragma: no cover
                                    raise ValueError(error_msg)  # pragma: no cover

                                if on_session_created:
                                    session_id = _extract_session_id_from_endpoint(endpoint_url)
                                    if session_id:
                                        on_session_created(session_id)

                                task_status.started(endpoint_url)

                            case "message":
                                # Skip empty data (keep-alive pings)
                                if not sse.data:
                                    continue
                                try:
                                    message = types.jsonrpc_message_adapter.validate_json(sse.data, by_name=False)
                                    logger.debug(f"Received server message: {message}")
                                except Exception as exc:  # pragma: no cover
                                    logger.exception("Error parsing server message")  # pragma: no cover
                                    await read_stream_writer.send(exc)  # pragma: no cover
                                    continue  # pragma: no cover

                                session_message = SessionMessage(message)
                                await read_stream_writer.send(session_message)
                            case _:  # pragma: no cover
                                logger.warning(f"Unknown SSE event: {sse.event}")  # pragma: no cover
                except SSEError as sse_exc:  # pragma: lax no cover
                    logger.exception("Encountered SSE exception")
                    raise sse_exc
                except Exception as exc:  # pragma: lax no cover
                    logger.exception("Error in sse_reader")
                    await read_stream_writer.send(exc)
                finally:
                    await read_stream_writer.aclose()

            async def post_writer(endpoint_url: str):
                try:
                    async with write_stream_reader, write_stream:

                        async def _send_message(session_message: SessionMessage) -> None:
                            logger.debug(f"Sending client message: {session_message}")
                            response = await client.post(
                                endpoint_url,
                                json=session_message.message.model_dump(
                                    by_alias=True,
                                    mode="json",
                                    exclude_unset=True,
                                ),
                            )
                            response.raise_for_status()
                            logger.debug(f"Client message sent successfully: {response.status_code}")

                        async for session_message in write_stream_reader:
                            sender_ctx = write_stream_reader.last_context
                            if sender_ctx is not None:
                                async with anyio.create_task_group() as tg:
                                    sender_ctx.run(tg.start_soon, _send_message, session_message)
                            else:
                                await _send_message(session_message)  # pragma: no cover
                except Exception:  # pragma: lax no cover
                    logger.exception("Error in post_writer")

            # On Python 3.14, coverage.py reports a phantom branch arc on this
            # line (->yield) when nested two async-with levels deep. The branch
            # is the unreachable "did __aexit__ suppress?" arm for memory streams.
            async with (  # pragma: no branch
                read_stream_writer,
                read_stream,
                write_stream,
                write_stream_reader,
                anyio.create_task_group() as tg,
            ):
                endpoint_url = await tg.start(sse_reader)
                logger.debug(f"Starting post writer with endpoint URL: {endpoint_url}")
                tg.start_soon(post_writer, endpoint_url)

                yield read_stream, write_stream
                tg.cancel_scope.cancel()