Skip to content

memory

In-memory transports

create_client_server_memory_streams async

create_client_server_memory_streams() -> (
    AsyncGenerator[
        tuple[MessageStream, MessageStream], None
    ]
)

Creates a pair of bidirectional memory streams for client-server communication.

Yields:

Type Description
AsyncGenerator[tuple[MessageStream, MessageStream], None]

A tuple of (client_streams, server_streams) where each is a tuple of

AsyncGenerator[tuple[MessageStream, MessageStream], None]

(read_stream, write_stream)

Source code in src/mcp/shared/memory.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@asynccontextmanager
async def create_client_server_memory_streams() -> AsyncGenerator[tuple[MessageStream, MessageStream], None]:
    """Creates a pair of bidirectional memory streams for client-server communication.

    Yields:
        A tuple of (client_streams, server_streams) where each is a tuple of
        (read_stream, write_stream)
    """
    # Create streams for both directions
    server_to_client_send, server_to_client_receive = create_context_streams[SessionMessage | Exception](1)
    client_to_server_send, client_to_server_receive = create_context_streams[SessionMessage | Exception](1)

    client_streams = (server_to_client_receive, client_to_server_send)
    server_streams = (client_to_server_receive, server_to_client_send)

    async with server_to_client_receive, client_to_server_send, client_to_server_receive, server_to_client_send:
        yield client_streams, server_streams