Skip to content

streamable_http

StreamableHTTP Server Transport Module

This module implements an HTTP transport layer with Streamable HTTP.

The transport handles bidirectional communication using HTTP requests and responses, with streaming support for long-running operations.

EventMessage dataclass

A JSONRPCMessage with an optional event ID for stream resumability.

Source code in src/mcp/server/streamable_http.py
71
72
73
74
75
76
@dataclass
class EventMessage:
    """A JSONRPCMessage with an optional event ID for stream resumability."""

    message: JSONRPCMessage
    event_id: str | None = None

EventStore

Bases: ABC

Interface for resumability support via event storage.

Source code in src/mcp/server/streamable_http.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
class EventStore(ABC):
    """Interface for resumability support via event storage."""

    @abstractmethod
    async def store_event(self, stream_id: StreamId, message: JSONRPCMessage | None) -> EventId:
        """Stores an event for later retrieval.

        Args:
            stream_id: ID of the stream the event belongs to
            message: The JSON-RPC message to store, or None for priming events

        Returns:
            The generated event ID for the stored event.
        """
        pass  # pragma: no cover

    @abstractmethod
    async def replay_events_after(
        self,
        last_event_id: EventId,
        send_callback: EventCallback,
    ) -> StreamId | None:
        """Replays events that occurred after the specified event ID.

        Args:
            last_event_id: The ID of the last event the client received
            send_callback: A callback function to send events to the client

        Returns:
            The stream ID of the replayed events, or None if no events were found.
        """
        pass  # pragma: no cover

store_event abstractmethod async

store_event(
    stream_id: StreamId, message: JSONRPCMessage | None
) -> EventId

Stores an event for later retrieval.

Parameters:

Name Type Description Default
stream_id StreamId

ID of the stream the event belongs to

required
message JSONRPCMessage | None

The JSON-RPC message to store, or None for priming events

required

Returns:

Type Description
EventId

The generated event ID for the stored event.

Source code in src/mcp/server/streamable_http.py
85
86
87
88
89
90
91
92
93
94
95
96
@abstractmethod
async def store_event(self, stream_id: StreamId, message: JSONRPCMessage | None) -> EventId:
    """Stores an event for later retrieval.

    Args:
        stream_id: ID of the stream the event belongs to
        message: The JSON-RPC message to store, or None for priming events

    Returns:
        The generated event ID for the stored event.
    """
    pass  # pragma: no cover

replay_events_after abstractmethod async

replay_events_after(
    last_event_id: EventId, send_callback: EventCallback
) -> StreamId | None

Replays events that occurred after the specified event ID.

Parameters:

Name Type Description Default
last_event_id EventId

The ID of the last event the client received

required
send_callback EventCallback

A callback function to send events to the client

required

Returns:

Type Description
StreamId | None

The stream ID of the replayed events, or None if no events were found.

Source code in src/mcp/server/streamable_http.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
@abstractmethod
async def replay_events_after(
    self,
    last_event_id: EventId,
    send_callback: EventCallback,
) -> StreamId | None:
    """Replays events that occurred after the specified event ID.

    Args:
        last_event_id: The ID of the last event the client received
        send_callback: A callback function to send events to the client

    Returns:
        The stream ID of the replayed events, or None if no events were found.
    """
    pass  # pragma: no cover

StreamableHTTPServerTransport

HTTP server transport with event streaming support for MCP.

Handles JSON-RPC messages in HTTP POST requests with SSE streaming. Supports optional JSON responses and session management.

Source code in src/mcp/server/streamable_http.py
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 234
 235
 236
 237
 238
 239
 240
 241
 242
 243
 244
 245
 246
 247
 248
 249
 250
 251
 252
 253
 254
 255
 256
 257
 258
 259
 260
 261
 262
 263
 264
 265
 266
 267
 268
 269
 270
 271
 272
 273
 274
 275
 276
 277
 278
 279
 280
 281
 282
 283
 284
 285
 286
 287
 288
 289
 290
 291
 292
 293
 294
 295
 296
 297
 298
 299
 300
 301
 302
 303
 304
 305
 306
 307
 308
 309
 310
 311
 312
 313
 314
 315
 316
 317
 318
 319
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
class StreamableHTTPServerTransport:
    """HTTP server transport with event streaming support for MCP.

    Handles JSON-RPC messages in HTTP POST requests with SSE streaming.
    Supports optional JSON responses and session management.
    """

    # Server notification streams for POST requests as well as standalone SSE stream
    _read_stream_writer: ContextSendStream[SessionMessage | Exception] | None = None
    _read_stream: ContextReceiveStream[SessionMessage | Exception] | None = None
    _write_stream: ContextSendStream[SessionMessage] | None = None
    _write_stream_reader: ContextReceiveStream[SessionMessage] | None = None
    _security: TransportSecurityMiddleware

    def __init__(
        self,
        mcp_session_id: str | None,
        is_json_response_enabled: bool = False,
        event_store: EventStore | None = None,
        security_settings: TransportSecuritySettings | None = None,
        retry_interval: int | None = None,
    ) -> None:
        """Initialize a new StreamableHTTP server transport.

        Args:
            mcp_session_id: Optional session identifier for this connection.
                            Must contain only visible ASCII characters (0x21-0x7E).
            is_json_response_enabled: If True, return JSON responses for requests
                                    instead of SSE streams. Default is False.
            event_store: Event store for resumability support. If provided,
                        resumability will be enabled, allowing clients to
                        reconnect and resume messages.
            security_settings: Optional security settings for DNS rebinding protection.
            retry_interval: Retry interval in milliseconds to suggest to clients in SSE
                           retry field. When set, the server will send a retry field in
                           SSE priming events to control client reconnection timing for
                           polling behavior. Only used when event_store is provided.

        Raises:
            ValueError: If the session ID contains invalid characters.
        """
        if mcp_session_id is not None and not SESSION_ID_PATTERN.fullmatch(mcp_session_id):
            raise ValueError("Session ID must only contain visible ASCII characters (0x21-0x7E)")

        self.mcp_session_id = mcp_session_id
        self.is_json_response_enabled = is_json_response_enabled
        self._event_store = event_store
        self._security = TransportSecurityMiddleware(security_settings)
        self._retry_interval = retry_interval
        self._request_streams: dict[
            RequestId,
            tuple[
                MemoryObjectSendStream[EventMessage],
                MemoryObjectReceiveStream[EventMessage],
            ],
        ] = {}
        self._sse_stream_writers: dict[RequestId, MemoryObjectSendStream[dict[str, str]]] = {}
        self._terminated = False
        # Idle timeout cancel scope; managed by the session manager.
        self.idle_scope: anyio.CancelScope | None = None

    @property
    def is_terminated(self) -> bool:
        """Check if this transport has been explicitly terminated."""
        return self._terminated

    def close_sse_stream(self, request_id: RequestId) -> None:  # pragma: no cover
        """Close SSE connection for a specific request without terminating the stream.

        This method closes the HTTP connection for the specified request, triggering
        client reconnection. Events continue to be stored in the event store and will
        be replayed when the client reconnects with Last-Event-ID.

        Use this to implement polling behavior during long-running operations -
        the client will reconnect after the retry interval specified in the priming event.

        Args:
            request_id: The request ID whose SSE stream should be closed.

        Note:
            This is a no-op if there is no active stream for the request ID.
            Requires event_store to be configured for events to be stored during
            the disconnect.
        """
        writer = self._sse_stream_writers.pop(request_id, None)
        if writer:
            writer.close()

        # Also close and remove request streams
        if request_id in self._request_streams:
            send_stream, receive_stream = self._request_streams.pop(request_id)
            send_stream.close()
            receive_stream.close()

    def close_standalone_sse_stream(self) -> None:  # pragma: no cover
        """Close the standalone GET SSE stream, triggering client reconnection.

        This method closes the HTTP connection for the standalone GET stream used
        for unsolicited server-to-client notifications. The client SHOULD reconnect
        with Last-Event-ID to resume receiving notifications.

        Use this to implement polling behavior for the notification stream -
        the client will reconnect after the retry interval specified in the priming event.

        Note:
            This is a no-op if there is no active standalone SSE stream.
            Requires event_store to be configured for events to be stored during
            the disconnect.
            Currently, client reconnection for standalone GET streams is NOT
            implemented - this is a known gap (see test_standalone_get_stream_reconnection).
        """
        self.close_sse_stream(GET_STREAM_KEY)

    def _create_session_message(
        self,
        message: JSONRPCMessage,
        request: Request,
        request_id: RequestId,
        protocol_version: str,
    ) -> SessionMessage:
        """Create a session message with metadata including close_sse_stream callback.

        The close_sse_stream callbacks are only provided when the client supports
        resumability (protocol version >= 2025-11-25). Old clients can't resume if
        the stream is closed early because they didn't receive a priming event.
        """
        # Only provide close callbacks when client supports resumability
        if self._event_store and protocol_version >= "2025-11-25":

            async def close_stream_callback() -> None:  # pragma: no cover
                self.close_sse_stream(request_id)

            async def close_standalone_stream_callback() -> None:  # pragma: no cover
                self.close_standalone_sse_stream()

            metadata = ServerMessageMetadata(
                request_context=request,
                close_sse_stream=close_stream_callback,
                close_standalone_sse_stream=close_standalone_stream_callback,
            )
        else:
            metadata = ServerMessageMetadata(request_context=request)

        return SessionMessage(message, metadata=metadata)

    async def _maybe_send_priming_event(
        self,
        request_id: RequestId,
        sse_stream_writer: MemoryObjectSendStream[dict[str, Any]],
        protocol_version: str,
    ) -> None:
        """Send priming event for SSE resumability if event_store is configured.

        Only sends priming events to clients with protocol version >= 2025-11-25,
        which includes the fix for handling empty SSE data. Older clients would
        crash trying to parse empty data as JSON.
        """
        if not self._event_store:
            return
        # Priming events have empty data which older clients cannot handle.
        if protocol_version < "2025-11-25":
            return
        priming_event_id = await self._event_store.store_event(
            str(request_id),  # Convert RequestId to StreamId (str)
            None,  # Priming event has no payload
        )
        priming_event: dict[str, str | int] = {"id": priming_event_id, "data": ""}
        if self._retry_interval is not None:
            priming_event["retry"] = self._retry_interval
        await sse_stream_writer.send(priming_event)

    def _create_error_response(
        self,
        error_message: str,
        status_code: HTTPStatus,
        error_code: int = INVALID_REQUEST,
        headers: dict[str, str] | None = None,
    ) -> Response:
        """Create an error response with a simple string message."""
        response_headers = {"Content-Type": CONTENT_TYPE_JSON}
        if headers:  # pragma: no cover
            response_headers.update(headers)

        if self.mcp_session_id:
            response_headers[MCP_SESSION_ID_HEADER] = self.mcp_session_id

        # Return a properly formatted JSON error response
        error_response = JSONRPCError(
            jsonrpc="2.0",
            id=None,
            error=ErrorData(code=error_code, message=error_message),
        )

        return Response(
            error_response.model_dump_json(by_alias=True, exclude_unset=True),
            status_code=status_code,
            headers=response_headers,
        )

    def _create_json_response(
        self,
        response_message: JSONRPCMessage | None,
        status_code: HTTPStatus = HTTPStatus.OK,
        headers: dict[str, str] | None = None,
    ) -> Response:
        """Create a JSON response from a JSONRPCMessage."""
        response_headers = {"Content-Type": CONTENT_TYPE_JSON}
        if headers:  # pragma: lax no cover
            response_headers.update(headers)

        if self.mcp_session_id:  # pragma: lax no cover
            response_headers[MCP_SESSION_ID_HEADER] = self.mcp_session_id

        return Response(
            response_message.model_dump_json(by_alias=True, exclude_unset=True) if response_message else None,
            status_code=status_code,
            headers=response_headers,
        )

    def _get_session_id(self, request: Request) -> str | None:
        """Extract the session ID from request headers."""
        return request.headers.get(MCP_SESSION_ID_HEADER)

    def _create_event_data(self, event_message: EventMessage) -> dict[str, str]:
        """Create event data dictionary from an EventMessage."""
        event_data = {
            "event": "message",
            "data": event_message.message.model_dump_json(by_alias=True, exclude_unset=True),
        }

        # If an event ID was provided, include it
        if event_message.event_id:  # pragma: no cover
            event_data["id"] = event_message.event_id

        return event_data

    async def _clean_up_memory_streams(self, request_id: RequestId) -> None:
        """Clean up memory streams for a given request ID."""
        if request_id in self._request_streams:  # pragma: no branch
            try:
                # Close the request stream
                await self._request_streams[request_id][0].aclose()
                await self._request_streams[request_id][1].aclose()
            except Exception:  # pragma: no cover
                # During cleanup, we catch all exceptions since streams might be in various states
                logger.debug("Error closing memory streams - may already be closed")
            finally:
                # Remove the request stream from the mapping
                self._request_streams.pop(request_id, None)

    async def handle_request(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Application entry point that handles all HTTP requests."""
        request = Request(scope, receive)

        # Validate request headers for DNS rebinding protection
        is_post = request.method == "POST"
        error_response = await self._security.validate_request(request, is_post=is_post)
        if error_response:
            await error_response(scope, receive, send)
            return

        if self._terminated:  # pragma: no cover
            # If the session has been terminated, return 404 Not Found
            response = self._create_error_response(
                "Not Found: Session has been terminated",
                HTTPStatus.NOT_FOUND,
            )
            await response(scope, receive, send)
            return

        if request.method == "POST":
            await self._handle_post_request(scope, request, receive, send)
        elif request.method == "GET":
            await self._handle_get_request(request, send)
        elif request.method == "DELETE":
            await self._handle_delete_request(request, send)
        else:  # pragma: no cover
            await self._handle_unsupported_request(request, send)

    def _check_accept_headers(self, request: Request) -> tuple[bool, bool]:
        """Check if the request accepts the required media types.

        Supports wildcard media types per RFC 7231, section 5.3.2:
        - */* matches any media type
        - application/* matches any application/ subtype
        - text/* matches any text/ subtype
        """
        accept_header = request.headers.get("accept", "")
        accept_types = [media_type.strip().split(";")[0].strip().lower() for media_type in accept_header.split(",")]

        has_wildcard = "*/*" in accept_types
        has_json = has_wildcard or any(t in (CONTENT_TYPE_JSON, "application/*") for t in accept_types)
        has_sse = has_wildcard or any(t in (CONTENT_TYPE_SSE, "text/*") for t in accept_types)

        return has_json, has_sse

    def _check_content_type(self, request: Request) -> bool:
        """Check if the request has the correct Content-Type."""
        content_type = request.headers.get("content-type", "")
        content_type_parts = [part.strip() for part in content_type.split(";")[0].split(",")]

        return any(part == CONTENT_TYPE_JSON for part in content_type_parts)

    async def _validate_accept_header(self, request: Request, scope: Scope, send: Send) -> bool:
        """Validate Accept header based on response mode. Returns True if valid."""
        has_json, has_sse = self._check_accept_headers(request)
        if self.is_json_response_enabled:
            # For JSON-only responses, only require application/json
            if not has_json:  # pragma: lax no cover
                response = self._create_error_response(
                    "Not Acceptable: Client must accept application/json",
                    HTTPStatus.NOT_ACCEPTABLE,
                )
                await response(scope, request.receive, send)
                return False
        # For SSE responses, require both content types
        elif not (has_json and has_sse):
            response = self._create_error_response(
                "Not Acceptable: Client must accept both application/json and text/event-stream",
                HTTPStatus.NOT_ACCEPTABLE,
            )
            await response(scope, request.receive, send)
            return False
        return True

    async def _handle_post_request(self, scope: Scope, request: Request, receive: Receive, send: Send) -> None:
        """Handle POST requests containing JSON-RPC messages."""
        writer = self._read_stream_writer
        if writer is None:  # pragma: no cover
            raise ValueError("No read stream writer available. Ensure connect() is called first.")
        try:
            # Validate Accept header
            if not await self._validate_accept_header(request, scope, send):
                return

            # Validate Content-Type
            if not self._check_content_type(request):  # pragma: no cover
                response = self._create_error_response(
                    "Unsupported Media Type: Content-Type must be application/json",
                    HTTPStatus.UNSUPPORTED_MEDIA_TYPE,
                )
                await response(scope, receive, send)
                return

            # Parse the body - only read it once
            body = await request.body()

            try:
                raw_message = pydantic_core.from_json(body)
            except ValueError as e:
                response = self._create_error_response(f"Parse error: {str(e)}", HTTPStatus.BAD_REQUEST, PARSE_ERROR)
                await response(scope, receive, send)
                return

            try:
                message = jsonrpc_message_adapter.validate_python(raw_message, by_name=False)
            except ValidationError as e:  # pragma: no cover
                response = self._create_error_response(
                    f"Validation error: {str(e)}",
                    HTTPStatus.BAD_REQUEST,
                    INVALID_PARAMS,
                )
                await response(scope, receive, send)
                return

            # Check if this is an initialization request
            is_initialization_request = isinstance(message, JSONRPCRequest) and message.method == "initialize"

            if is_initialization_request:
                # Check if the server already has an established session
                if self.mcp_session_id:
                    # Check if request has a session ID
                    request_session_id = self._get_session_id(request)

                    # If request has a session ID but doesn't match, return 404
                    if request_session_id and request_session_id != self.mcp_session_id:  # pragma: no cover
                        response = self._create_error_response(
                            "Not Found: Invalid or expired session ID",
                            HTTPStatus.NOT_FOUND,
                        )
                        await response(scope, receive, send)
                        return
            elif not await self._validate_request_headers(request, send):  # pragma: no cover
                return

            # For notifications and responses only, return 202 Accepted
            if not isinstance(message, JSONRPCRequest):
                # Create response object and send it
                response = self._create_json_response(
                    None,
                    HTTPStatus.ACCEPTED,
                )
                await response(scope, receive, send)

                # Process the message after sending the response
                metadata = ServerMessageMetadata(request_context=request)
                session_message = SessionMessage(message, metadata=metadata)
                await writer.send(session_message)

                return

            # Extract protocol version for priming event decision.
            # For initialize requests, get from request params.
            # For other requests, get from header (already validated).
            protocol_version = (
                str(message.params.get("protocolVersion", DEFAULT_NEGOTIATED_VERSION))
                if is_initialization_request and message.params
                else request.headers.get(MCP_PROTOCOL_VERSION_HEADER, DEFAULT_NEGOTIATED_VERSION)
            )

            # Extract the request ID outside the try block for proper scope
            request_id = str(message.id)
            # Register this stream for the request ID
            self._request_streams[request_id] = anyio.create_memory_object_stream[EventMessage](0)
            request_stream_reader = self._request_streams[request_id][1]

            if self.is_json_response_enabled:
                # Process the message
                metadata = ServerMessageMetadata(request_context=request)
                session_message = SessionMessage(message, metadata=metadata)
                await writer.send(session_message)
                try:
                    # Process messages from the request-specific stream
                    # We need to collect all messages until we get a response
                    response_message = None

                    # Use similar approach to SSE writer for consistency
                    async for event_message in request_stream_reader:  # pragma: no branch
                        # If it's a response, this is what we're waiting for
                        if isinstance(event_message.message, JSONRPCResponse | JSONRPCError):
                            response_message = event_message.message
                            break
                        # For notifications and requests, keep waiting
                        else:  # pragma: no cover
                            logger.debug(f"received: {event_message.message.method}")

                    # At this point we should have a response
                    if response_message:
                        # Create JSON response
                        response = self._create_json_response(response_message)
                        await response(scope, receive, send)
                    else:  # pragma: no cover
                        # This shouldn't happen in normal operation
                        logger.error("No response message received before stream closed")
                        response = self._create_error_response(
                            "Error processing request: No response received",
                            HTTPStatus.INTERNAL_SERVER_ERROR,
                        )
                        await response(scope, receive, send)
                except Exception:  # pragma: no cover
                    logger.exception("Error processing JSON response")
                    response = self._create_error_response(
                        "Error processing request",
                        HTTPStatus.INTERNAL_SERVER_ERROR,
                        INTERNAL_ERROR,
                    )
                    await response(scope, receive, send)
                finally:
                    await self._clean_up_memory_streams(request_id)
            else:
                # Create SSE stream
                sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream[dict[str, str]](0)

                # Store writer reference so close_sse_stream() can close it
                self._sse_stream_writers[request_id] = sse_stream_writer

                async def sse_writer():  # pragma: lax no cover
                    # Get the request ID from the incoming request message
                    try:
                        async with sse_stream_writer, request_stream_reader:
                            # Send priming event for SSE resumability
                            await self._maybe_send_priming_event(request_id, sse_stream_writer, protocol_version)

                            # Process messages from the request-specific stream
                            async for event_message in request_stream_reader:
                                # Build the event data
                                event_data = self._create_event_data(event_message)
                                await sse_stream_writer.send(event_data)

                                # If response, remove from pending streams and close
                                if isinstance(event_message.message, JSONRPCResponse | JSONRPCError):
                                    break
                    except anyio.ClosedResourceError:
                        # Expected when close_sse_stream() is called
                        logger.debug("SSE stream closed by close_sse_stream()")
                    except Exception:
                        logger.exception("Error in SSE writer")
                    finally:
                        logger.debug("Closing SSE writer")
                        self._sse_stream_writers.pop(request_id, None)
                        await self._clean_up_memory_streams(request_id)

                # Create and start EventSourceResponse
                # SSE stream mode (original behavior)
                # Set up headers
                headers = {
                    "Cache-Control": "no-cache, no-transform",
                    "Connection": "keep-alive",
                    "Content-Type": CONTENT_TYPE_SSE,
                    **({MCP_SESSION_ID_HEADER: self.mcp_session_id} if self.mcp_session_id else {}),
                }
                response = EventSourceResponse(
                    content=sse_stream_reader,
                    data_sender_callable=sse_writer,
                    headers=headers,
                )

                # Start the SSE response (this will send headers immediately)
                try:
                    # First send the response to establish the SSE connection
                    async with anyio.create_task_group() as tg:
                        tg.start_soon(response, scope, receive, send)
                        # Then send the message to be processed by the server
                        session_message = self._create_session_message(message, request, request_id, protocol_version)
                        await writer.send(session_message)
                except Exception:  # pragma: no cover
                    logger.exception("SSE response error")
                    await sse_stream_writer.aclose()
                    await self._clean_up_memory_streams(request_id)
                finally:
                    await sse_stream_reader.aclose()

        except Exception as err:  # pragma: no cover
            logger.exception("Error handling POST request")
            response = self._create_error_response(
                f"Error handling POST request: {err}",
                HTTPStatus.INTERNAL_SERVER_ERROR,
                INTERNAL_ERROR,
            )
            await response(scope, receive, send)
            if writer:
                await writer.send(Exception(err))
            return

    async def _handle_get_request(self, request: Request, send: Send) -> None:
        """Handle GET request to establish SSE.

        This allows the server to communicate to the client without the client
        first sending data via HTTP POST. The server can send JSON-RPC requests
        and notifications on this stream.
        """
        writer = self._read_stream_writer
        if writer is None:  # pragma: no cover
            raise ValueError("No read stream writer available. Ensure connect() is called first.")

        # Validate Accept header - must include text/event-stream
        _, has_sse = self._check_accept_headers(request)

        if not has_sse:  # pragma: no cover
            response = self._create_error_response(
                "Not Acceptable: Client must accept text/event-stream",
                HTTPStatus.NOT_ACCEPTABLE,
            )
            await response(request.scope, request.receive, send)
            return

        if not await self._validate_request_headers(request, send):  # pragma: no cover
            return

        # Handle resumability: check for Last-Event-ID header
        if last_event_id := request.headers.get(LAST_EVENT_ID_HEADER):  # pragma: no cover
            await self._replay_events(last_event_id, request, send)
            return

        headers = {
            "Cache-Control": "no-cache, no-transform",
            "Connection": "keep-alive",
            "Content-Type": CONTENT_TYPE_SSE,
        }

        if self.mcp_session_id:
            headers[MCP_SESSION_ID_HEADER] = self.mcp_session_id

        # Check if we already have an active GET stream
        if GET_STREAM_KEY in self._request_streams:  # pragma: no cover
            response = self._create_error_response(
                "Conflict: Only one SSE stream is allowed per session",
                HTTPStatus.CONFLICT,
            )
            await response(request.scope, request.receive, send)
            return

        # Create SSE stream
        sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream[dict[str, str]](0)

        async def standalone_sse_writer():
            try:
                # Create a standalone message stream for server-initiated messages

                self._request_streams[GET_STREAM_KEY] = anyio.create_memory_object_stream[EventMessage](0)
                standalone_stream_reader = self._request_streams[GET_STREAM_KEY][1]

                async with sse_stream_writer, standalone_stream_reader:
                    # Process messages from the standalone stream
                    async for event_message in standalone_stream_reader:  # pragma: lax no cover
                        # For the standalone stream, we handle:
                        # - JSONRPCNotification (server sends notifications to client)
                        # - JSONRPCRequest (server sends requests to client)
                        # We should NOT receive JSONRPCResponse

                        # Send the message via SSE
                        event_data = self._create_event_data(event_message)
                        await sse_stream_writer.send(event_data)
            except Exception:  # pragma: no cover
                logger.exception("Error in standalone SSE writer")
            finally:
                logger.debug("Closing standalone SSE writer")
                await self._clean_up_memory_streams(GET_STREAM_KEY)

        # Create and start EventSourceResponse
        response = EventSourceResponse(
            content=sse_stream_reader,
            data_sender_callable=standalone_sse_writer,
            headers=headers,
        )

        try:
            # This will send headers immediately and establish the SSE connection
            await response(request.scope, request.receive, send)
        except Exception:  # pragma: lax no cover
            logger.exception("Error in standalone SSE response")
            await self._clean_up_memory_streams(GET_STREAM_KEY)
        finally:
            await sse_stream_writer.aclose()
            await sse_stream_reader.aclose()

    async def _handle_delete_request(self, request: Request, send: Send) -> None:
        """Handle DELETE requests for explicit session termination."""
        # Validate session ID
        if not self.mcp_session_id:  # pragma: no cover
            # If no session ID set, return Method Not Allowed
            response = self._create_error_response(
                "Method Not Allowed: Session termination not supported",
                HTTPStatus.METHOD_NOT_ALLOWED,
            )
            await response(request.scope, request.receive, send)
            return

        if not await self._validate_request_headers(request, send):  # pragma: no cover
            return

        await self.terminate()

        response = self._create_json_response(
            None,
            HTTPStatus.OK,
        )
        await response(request.scope, request.receive, send)

    async def terminate(self) -> None:
        """Terminate the current session, closing all streams.

        Once terminated, all requests with this session ID will receive 404 Not Found.
        """

        self._terminated = True
        logger.info(f"Terminating session: {self.mcp_session_id}")

        # We need a copy of the keys to avoid modification during iteration
        request_stream_keys = list(self._request_streams.keys())

        # Close all request streams asynchronously
        for key in request_stream_keys:  # pragma: lax no cover
            await self._clean_up_memory_streams(key)

        # Clear the request streams dictionary immediately
        self._request_streams.clear()
        try:
            if self._read_stream_writer is not None:  # pragma: no branch
                await self._read_stream_writer.aclose()
            if self._read_stream is not None:  # pragma: no branch
                await self._read_stream.aclose()
            if self._write_stream_reader is not None:  # pragma: no branch
                await self._write_stream_reader.aclose()
            if self._write_stream is not None:  # pragma: no branch
                await self._write_stream.aclose()
        except Exception as e:  # pragma: no cover
            # During cleanup, we catch all exceptions since streams might be in various states
            logger.debug(f"Error closing streams: {e}")

    async def _handle_unsupported_request(self, request: Request, send: Send) -> None:  # pragma: no cover
        """Handle unsupported HTTP methods."""
        headers = {
            "Content-Type": CONTENT_TYPE_JSON,
            "Allow": "GET, POST, DELETE",
        }
        if self.mcp_session_id:
            headers[MCP_SESSION_ID_HEADER] = self.mcp_session_id

        response = self._create_error_response(
            "Method Not Allowed",
            HTTPStatus.METHOD_NOT_ALLOWED,
            headers=headers,
        )
        await response(request.scope, request.receive, send)

    async def _validate_request_headers(self, request: Request, send: Send) -> bool:  # pragma: lax no cover
        if not await self._validate_session(request, send):
            return False
        if not await self._validate_protocol_version(request, send):
            return False
        return True

    async def _validate_session(self, request: Request, send: Send) -> bool:
        """Validate the session ID in the request."""
        if not self.mcp_session_id:  # pragma: no cover
            # If we're not using session IDs, return True
            return True

        # Get the session ID from the request headers
        request_session_id = self._get_session_id(request)

        # If no session ID provided but required, return error
        if not request_session_id:  # pragma: no cover
            response = self._create_error_response(
                "Bad Request: Missing session ID",
                HTTPStatus.BAD_REQUEST,
            )
            await response(request.scope, request.receive, send)
            return False

        # If session ID doesn't match, return error
        if request_session_id != self.mcp_session_id:  # pragma: no cover
            response = self._create_error_response(
                "Not Found: Invalid or expired session ID",
                HTTPStatus.NOT_FOUND,
            )
            await response(request.scope, request.receive, send)
            return False

        return True

    async def _validate_protocol_version(self, request: Request, send: Send) -> bool:
        """Validate the protocol version header in the request."""
        # Get the protocol version from the request headers
        protocol_version = request.headers.get(MCP_PROTOCOL_VERSION_HEADER)

        # If no protocol version provided, assume default version
        if protocol_version is None:  # pragma: no cover
            protocol_version = DEFAULT_NEGOTIATED_VERSION

        # Check if the protocol version is supported
        if protocol_version not in SUPPORTED_PROTOCOL_VERSIONS:  # pragma: no cover
            supported_versions = ", ".join(SUPPORTED_PROTOCOL_VERSIONS)
            response = self._create_error_response(
                f"Bad Request: Unsupported protocol version: {protocol_version}. "
                + f"Supported versions: {supported_versions}",
                HTTPStatus.BAD_REQUEST,
            )
            await response(request.scope, request.receive, send)
            return False

        return True

    async def _replay_events(self, last_event_id: str, request: Request, send: Send) -> None:  # pragma: no cover
        """Replays events that would have been sent after the specified event ID.

        Only used when resumability is enabled.
        """
        event_store = self._event_store
        if not event_store:
            return

        try:
            headers = {
                "Cache-Control": "no-cache, no-transform",
                "Connection": "keep-alive",
                "Content-Type": CONTENT_TYPE_SSE,
            }

            if self.mcp_session_id:
                headers[MCP_SESSION_ID_HEADER] = self.mcp_session_id

            # Get protocol version from header (already validated in _validate_protocol_version)
            replay_protocol_version = request.headers.get(MCP_PROTOCOL_VERSION_HEADER, DEFAULT_NEGOTIATED_VERSION)

            # Create SSE stream for replay
            sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream[dict[str, str]](0)

            async def replay_sender():
                try:
                    async with sse_stream_writer:
                        # Define an async callback for sending events
                        async def send_event(event_message: EventMessage) -> None:
                            event_data = self._create_event_data(event_message)
                            await sse_stream_writer.send(event_data)

                        # Replay past events and get the stream ID
                        stream_id = await event_store.replay_events_after(last_event_id, send_event)

                        # If stream ID not in mapping, create it
                        if stream_id and stream_id not in self._request_streams:
                            # Register SSE writer so close_sse_stream() can close it
                            self._sse_stream_writers[stream_id] = sse_stream_writer

                            # Send priming event for this new connection
                            await self._maybe_send_priming_event(stream_id, sse_stream_writer, replay_protocol_version)

                            # Create new request streams for this connection
                            self._request_streams[stream_id] = anyio.create_memory_object_stream[EventMessage](0)
                            msg_reader = self._request_streams[stream_id][1]

                            # Forward messages to SSE
                            async with msg_reader:
                                async for event_message in msg_reader:
                                    event_data = self._create_event_data(event_message)

                                    await sse_stream_writer.send(event_data)
                except anyio.ClosedResourceError:
                    # Expected when close_sse_stream() is called
                    logger.debug("Replay SSE stream closed by close_sse_stream()")
                except Exception:
                    logger.exception("Error in replay sender")

            # Create and start EventSourceResponse
            response = EventSourceResponse(
                content=sse_stream_reader,
                data_sender_callable=replay_sender,
                headers=headers,
            )

            try:
                await response(request.scope, request.receive, send)
            except Exception:
                logger.exception("Error in replay response")
            finally:
                await sse_stream_writer.aclose()
                await sse_stream_reader.aclose()

        except Exception:
            logger.exception("Error replaying events")
            response = self._create_error_response(
                "Error replaying events",
                HTTPStatus.INTERNAL_SERVER_ERROR,
                INTERNAL_ERROR,
            )
            await response(request.scope, request.receive, send)

    @asynccontextmanager
    async def connect(
        self,
    ) -> AsyncGenerator[
        tuple[
            ReadStream[SessionMessage | Exception],
            WriteStream[SessionMessage],
        ],
        None,
    ]:
        """Context manager that provides read and write streams for a connection.

        Yields:
            Tuple of (read_stream, write_stream) for bidirectional communication
        """

        # Create the memory streams for this connection

        read_stream_writer, read_stream = create_context_streams[SessionMessage | Exception](0)
        write_stream, write_stream_reader = create_context_streams[SessionMessage](0)

        # Store the streams
        self._read_stream_writer = read_stream_writer
        self._read_stream = read_stream
        self._write_stream_reader = write_stream_reader
        self._write_stream = write_stream

        # Start a task group for message routing
        async with anyio.create_task_group() as tg:
            # Create a message router that distributes messages to request streams
            async def message_router():
                try:
                    async for session_message in write_stream_reader:  # pragma: no branch
                        # Determine which request stream(s) should receive this message
                        message = session_message.message
                        target_request_id = None
                        # Check if this is a response with a known request id.
                        # Null-id errors (e.g., parse errors) fall through to
                        # the GET stream since they can't be correlated.
                        if isinstance(message, JSONRPCResponse | JSONRPCError) and message.id is not None:
                            target_request_id = str(message.id)
                        # Extract related_request_id from meta if it exists
                        elif (  # pragma: no cover
                            session_message.metadata is not None
                            and isinstance(
                                session_message.metadata,
                                ServerMessageMetadata,
                            )
                            and session_message.metadata.related_request_id is not None
                        ):
                            target_request_id = str(session_message.metadata.related_request_id)

                        request_stream_id = target_request_id if target_request_id is not None else GET_STREAM_KEY

                        # Store the event if we have an event store,
                        # regardless of whether a client is connected
                        # messages will be replayed on the re-connect
                        event_id = None
                        if self._event_store:  # pragma: lax no cover
                            event_id = await self._event_store.store_event(request_stream_id, message)
                            logger.debug(f"Stored {event_id} from {request_stream_id}")

                        if request_stream_id in self._request_streams:
                            try:
                                # Send both the message and the event ID
                                await self._request_streams[request_stream_id][0].send(EventMessage(message, event_id))
                            except (anyio.BrokenResourceError, anyio.ClosedResourceError):  # pragma: no cover
                                # Stream might be closed, remove from registry
                                self._request_streams.pop(request_stream_id, None)
                        else:  # pragma: no cover
                            logger.debug(
                                f"""Request stream {request_stream_id} not found
                                for message. Still processing message as the client
                                might reconnect and replay."""
                            )
                except anyio.ClosedResourceError:
                    if self._terminated:
                        logger.debug("Read stream closed by client")
                    else:
                        logger.exception("Unexpected closure of read stream in message router")
                except Exception:  # pragma: lax no cover
                    logger.exception("Error in message router")

            # Start the message router
            tg.start_soon(message_router)

            try:
                # Yield the streams for the caller to use
                yield read_stream, write_stream
            finally:
                for stream_id in list(self._request_streams.keys()):  # pragma: lax no cover
                    await self._clean_up_memory_streams(stream_id)
                self._request_streams.clear()

                # Clean up the read and write streams
                try:
                    await read_stream_writer.aclose()
                    await read_stream.aclose()
                    await write_stream_reader.aclose()
                    await write_stream.aclose()
                except Exception as e:  # pragma: no cover
                    # During cleanup, we catch all exceptions since streams might be in various states
                    logger.debug(f"Error closing streams: {e}")

__init__

__init__(
    mcp_session_id: str | None,
    is_json_response_enabled: bool = False,
    event_store: EventStore | None = None,
    security_settings: (
        TransportSecuritySettings | None
    ) = None,
    retry_interval: int | None = None,
) -> None

Initialize a new StreamableHTTP server transport.

Parameters:

Name Type Description Default
mcp_session_id str | None

Optional session identifier for this connection. Must contain only visible ASCII characters (0x21-0x7E).

required
is_json_response_enabled bool

If True, return JSON responses for requests instead of SSE streams. Default is False.

False
event_store EventStore | None

Event store for resumability support. If provided, resumability will be enabled, allowing clients to reconnect and resume messages.

None
security_settings TransportSecuritySettings | None

Optional security settings for DNS rebinding protection.

None
retry_interval int | None

Retry interval in milliseconds to suggest to clients in SSE retry field. When set, the server will send a retry field in SSE priming events to control client reconnection timing for polling behavior. Only used when event_store is provided.

None

Raises:

Type Description
ValueError

If the session ID contains invalid characters.

Source code in src/mcp/server/streamable_http.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def __init__(
    self,
    mcp_session_id: str | None,
    is_json_response_enabled: bool = False,
    event_store: EventStore | None = None,
    security_settings: TransportSecuritySettings | None = None,
    retry_interval: int | None = None,
) -> None:
    """Initialize a new StreamableHTTP server transport.

    Args:
        mcp_session_id: Optional session identifier for this connection.
                        Must contain only visible ASCII characters (0x21-0x7E).
        is_json_response_enabled: If True, return JSON responses for requests
                                instead of SSE streams. Default is False.
        event_store: Event store for resumability support. If provided,
                    resumability will be enabled, allowing clients to
                    reconnect and resume messages.
        security_settings: Optional security settings for DNS rebinding protection.
        retry_interval: Retry interval in milliseconds to suggest to clients in SSE
                       retry field. When set, the server will send a retry field in
                       SSE priming events to control client reconnection timing for
                       polling behavior. Only used when event_store is provided.

    Raises:
        ValueError: If the session ID contains invalid characters.
    """
    if mcp_session_id is not None and not SESSION_ID_PATTERN.fullmatch(mcp_session_id):
        raise ValueError("Session ID must only contain visible ASCII characters (0x21-0x7E)")

    self.mcp_session_id = mcp_session_id
    self.is_json_response_enabled = is_json_response_enabled
    self._event_store = event_store
    self._security = TransportSecurityMiddleware(security_settings)
    self._retry_interval = retry_interval
    self._request_streams: dict[
        RequestId,
        tuple[
            MemoryObjectSendStream[EventMessage],
            MemoryObjectReceiveStream[EventMessage],
        ],
    ] = {}
    self._sse_stream_writers: dict[RequestId, MemoryObjectSendStream[dict[str, str]]] = {}
    self._terminated = False
    # Idle timeout cancel scope; managed by the session manager.
    self.idle_scope: anyio.CancelScope | None = None

is_terminated property

is_terminated: bool

Check if this transport has been explicitly terminated.

close_sse_stream

close_sse_stream(request_id: RequestId) -> None

Close SSE connection for a specific request without terminating the stream.

This method closes the HTTP connection for the specified request, triggering client reconnection. Events continue to be stored in the event store and will be replayed when the client reconnects with Last-Event-ID.

Use this to implement polling behavior during long-running operations - the client will reconnect after the retry interval specified in the priming event.

Parameters:

Name Type Description Default
request_id RequestId

The request ID whose SSE stream should be closed.

required
Note

This is a no-op if there is no active stream for the request ID. Requires event_store to be configured for events to be stored during the disconnect.

Source code in src/mcp/server/streamable_http.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
def close_sse_stream(self, request_id: RequestId) -> None:  # pragma: no cover
    """Close SSE connection for a specific request without terminating the stream.

    This method closes the HTTP connection for the specified request, triggering
    client reconnection. Events continue to be stored in the event store and will
    be replayed when the client reconnects with Last-Event-ID.

    Use this to implement polling behavior during long-running operations -
    the client will reconnect after the retry interval specified in the priming event.

    Args:
        request_id: The request ID whose SSE stream should be closed.

    Note:
        This is a no-op if there is no active stream for the request ID.
        Requires event_store to be configured for events to be stored during
        the disconnect.
    """
    writer = self._sse_stream_writers.pop(request_id, None)
    if writer:
        writer.close()

    # Also close and remove request streams
    if request_id in self._request_streams:
        send_stream, receive_stream = self._request_streams.pop(request_id)
        send_stream.close()
        receive_stream.close()

close_standalone_sse_stream

close_standalone_sse_stream() -> None

Close the standalone GET SSE stream, triggering client reconnection.

This method closes the HTTP connection for the standalone GET stream used for unsolicited server-to-client notifications. The client SHOULD reconnect with Last-Event-ID to resume receiving notifications.

Use this to implement polling behavior for the notification stream - the client will reconnect after the retry interval specified in the priming event.

Note

This is a no-op if there is no active standalone SSE stream. Requires event_store to be configured for events to be stored during the disconnect. Currently, client reconnection for standalone GET streams is NOT implemented - this is a known gap (see test_standalone_get_stream_reconnection).

Source code in src/mcp/server/streamable_http.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
def close_standalone_sse_stream(self) -> None:  # pragma: no cover
    """Close the standalone GET SSE stream, triggering client reconnection.

    This method closes the HTTP connection for the standalone GET stream used
    for unsolicited server-to-client notifications. The client SHOULD reconnect
    with Last-Event-ID to resume receiving notifications.

    Use this to implement polling behavior for the notification stream -
    the client will reconnect after the retry interval specified in the priming event.

    Note:
        This is a no-op if there is no active standalone SSE stream.
        Requires event_store to be configured for events to be stored during
        the disconnect.
        Currently, client reconnection for standalone GET streams is NOT
        implemented - this is a known gap (see test_standalone_get_stream_reconnection).
    """
    self.close_sse_stream(GET_STREAM_KEY)

handle_request async

handle_request(
    scope: Scope, receive: Receive, send: Send
) -> None

Application entry point that handles all HTTP requests.

Source code in src/mcp/server/streamable_http.py
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
async def handle_request(self, scope: Scope, receive: Receive, send: Send) -> None:
    """Application entry point that handles all HTTP requests."""
    request = Request(scope, receive)

    # Validate request headers for DNS rebinding protection
    is_post = request.method == "POST"
    error_response = await self._security.validate_request(request, is_post=is_post)
    if error_response:
        await error_response(scope, receive, send)
        return

    if self._terminated:  # pragma: no cover
        # If the session has been terminated, return 404 Not Found
        response = self._create_error_response(
            "Not Found: Session has been terminated",
            HTTPStatus.NOT_FOUND,
        )
        await response(scope, receive, send)
        return

    if request.method == "POST":
        await self._handle_post_request(scope, request, receive, send)
    elif request.method == "GET":
        await self._handle_get_request(request, send)
    elif request.method == "DELETE":
        await self._handle_delete_request(request, send)
    else:  # pragma: no cover
        await self._handle_unsupported_request(request, send)

terminate async

terminate() -> None

Terminate the current session, closing all streams.

Once terminated, all requests with this session ID will receive 404 Not Found.

Source code in src/mcp/server/streamable_http.py
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
async def terminate(self) -> None:
    """Terminate the current session, closing all streams.

    Once terminated, all requests with this session ID will receive 404 Not Found.
    """

    self._terminated = True
    logger.info(f"Terminating session: {self.mcp_session_id}")

    # We need a copy of the keys to avoid modification during iteration
    request_stream_keys = list(self._request_streams.keys())

    # Close all request streams asynchronously
    for key in request_stream_keys:  # pragma: lax no cover
        await self._clean_up_memory_streams(key)

    # Clear the request streams dictionary immediately
    self._request_streams.clear()
    try:
        if self._read_stream_writer is not None:  # pragma: no branch
            await self._read_stream_writer.aclose()
        if self._read_stream is not None:  # pragma: no branch
            await self._read_stream.aclose()
        if self._write_stream_reader is not None:  # pragma: no branch
            await self._write_stream_reader.aclose()
        if self._write_stream is not None:  # pragma: no branch
            await self._write_stream.aclose()
    except Exception as e:  # pragma: no cover
        # During cleanup, we catch all exceptions since streams might be in various states
        logger.debug(f"Error closing streams: {e}")

connect async

connect() -> AsyncGenerator[
    tuple[
        ReadStream[SessionMessage | Exception],
        WriteStream[SessionMessage],
    ],
    None,
]

Context manager that provides read and write streams for a connection.

Yields:

Type Description
AsyncGenerator[tuple[ReadStream[SessionMessage | Exception], WriteStream[SessionMessage]], None]

Tuple of (read_stream, write_stream) for bidirectional communication

Source code in src/mcp/server/streamable_http.py
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
@asynccontextmanager
async def connect(
    self,
) -> AsyncGenerator[
    tuple[
        ReadStream[SessionMessage | Exception],
        WriteStream[SessionMessage],
    ],
    None,
]:
    """Context manager that provides read and write streams for a connection.

    Yields:
        Tuple of (read_stream, write_stream) for bidirectional communication
    """

    # Create the memory streams for this connection

    read_stream_writer, read_stream = create_context_streams[SessionMessage | Exception](0)
    write_stream, write_stream_reader = create_context_streams[SessionMessage](0)

    # Store the streams
    self._read_stream_writer = read_stream_writer
    self._read_stream = read_stream
    self._write_stream_reader = write_stream_reader
    self._write_stream = write_stream

    # Start a task group for message routing
    async with anyio.create_task_group() as tg:
        # Create a message router that distributes messages to request streams
        async def message_router():
            try:
                async for session_message in write_stream_reader:  # pragma: no branch
                    # Determine which request stream(s) should receive this message
                    message = session_message.message
                    target_request_id = None
                    # Check if this is a response with a known request id.
                    # Null-id errors (e.g., parse errors) fall through to
                    # the GET stream since they can't be correlated.
                    if isinstance(message, JSONRPCResponse | JSONRPCError) and message.id is not None:
                        target_request_id = str(message.id)
                    # Extract related_request_id from meta if it exists
                    elif (  # pragma: no cover
                        session_message.metadata is not None
                        and isinstance(
                            session_message.metadata,
                            ServerMessageMetadata,
                        )
                        and session_message.metadata.related_request_id is not None
                    ):
                        target_request_id = str(session_message.metadata.related_request_id)

                    request_stream_id = target_request_id if target_request_id is not None else GET_STREAM_KEY

                    # Store the event if we have an event store,
                    # regardless of whether a client is connected
                    # messages will be replayed on the re-connect
                    event_id = None
                    if self._event_store:  # pragma: lax no cover
                        event_id = await self._event_store.store_event(request_stream_id, message)
                        logger.debug(f"Stored {event_id} from {request_stream_id}")

                    if request_stream_id in self._request_streams:
                        try:
                            # Send both the message and the event ID
                            await self._request_streams[request_stream_id][0].send(EventMessage(message, event_id))
                        except (anyio.BrokenResourceError, anyio.ClosedResourceError):  # pragma: no cover
                            # Stream might be closed, remove from registry
                            self._request_streams.pop(request_stream_id, None)
                    else:  # pragma: no cover
                        logger.debug(
                            f"""Request stream {request_stream_id} not found
                            for message. Still processing message as the client
                            might reconnect and replay."""
                        )
            except anyio.ClosedResourceError:
                if self._terminated:
                    logger.debug("Read stream closed by client")
                else:
                    logger.exception("Unexpected closure of read stream in message router")
            except Exception:  # pragma: lax no cover
                logger.exception("Error in message router")

        # Start the message router
        tg.start_soon(message_router)

        try:
            # Yield the streams for the caller to use
            yield read_stream, write_stream
        finally:
            for stream_id in list(self._request_streams.keys()):  # pragma: lax no cover
                await self._clean_up_memory_streams(stream_id)
            self._request_streams.clear()

            # Clean up the read and write streams
            try:
                await read_stream_writer.aclose()
                await read_stream.aclose()
                await write_stream_reader.aclose()
                await write_stream.aclose()
            except Exception as e:  # pragma: no cover
                # During cleanup, we catch all exceptions since streams might be in various states
                logger.debug(f"Error closing streams: {e}")