Skip to content

session_group

SessionGroup concurrently manages multiple MCP session connections.

Tools, resources, and prompts are aggregated across servers. Servers may be connected to or disconnected from at any point after initialization.

This abstraction can handle naming collisions using a custom user-provided hook.

SseServerParameters

Bases: BaseModel

Parameters for initializing an sse_client.

Source code in src/mcp/client/session_group.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class SseServerParameters(BaseModel):
    """Parameters for initializing an sse_client."""

    # The endpoint URL.
    url: str

    # Optional headers to include in requests.
    headers: dict[str, Any] | None = None

    # HTTP timeout for regular operations (in seconds).
    timeout: float = 5.0

    # Timeout for SSE read operations (in seconds).
    sse_read_timeout: float = 300.0

StreamableHttpParameters

Bases: BaseModel

Parameters for initializing a streamable_http_client.

Source code in src/mcp/client/session_group.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class StreamableHttpParameters(BaseModel):
    """Parameters for initializing a streamable_http_client."""

    # The endpoint URL.
    url: str

    # Optional headers to include in requests.
    headers: dict[str, Any] | None = None

    # HTTP timeout for regular operations (in seconds).
    timeout: float = 30.0

    # Timeout for SSE read operations (in seconds).
    sse_read_timeout: float = 300.0

    # Close the client session when the transport closes.
    terminate_on_close: bool = True

ClientSessionParameters dataclass

Parameters for establishing a client session to an MCP server.

Source code in src/mcp/client/session_group.py
72
73
74
75
76
77
78
79
80
81
82
@dataclass
class ClientSessionParameters:
    """Parameters for establishing a client session to an MCP server."""

    read_timeout_seconds: float | None = None
    sampling_callback: SamplingFnT | None = None
    elicitation_callback: ElicitationFnT | None = None
    list_roots_callback: ListRootsFnT | None = None
    logging_callback: LoggingFnT | None = None
    message_handler: MessageHandlerFnT | None = None
    client_info: types.Implementation | None = None

ClientSessionGroup

Client for managing connections to multiple MCP servers.

This class is responsible for encapsulating management of server connections. It aggregates tools, resources, and prompts from all connected servers.

For auxiliary handlers, such as resource subscription, this is delegated to the client and can be accessed via the session.

Example
name_fn = lambda name, server_info: f"{(server_info.name)}_{name}"
async with ClientSessionGroup(component_name_hook=name_fn) as group:
    for server_param in server_params:
        await group.connect_to_server(server_param)
    ...
Source code in src/mcp/client/session_group.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
class ClientSessionGroup:
    """Client for managing connections to multiple MCP servers.

    This class is responsible for encapsulating management of server connections.
    It aggregates tools, resources, and prompts from all connected servers.

    For auxiliary handlers, such as resource subscription, this is delegated to
    the client and can be accessed via the session.

    Example:
        ```python
        name_fn = lambda name, server_info: f"{(server_info.name)}_{name}"
        async with ClientSessionGroup(component_name_hook=name_fn) as group:
            for server_param in server_params:
                await group.connect_to_server(server_param)
            ...
        ```
    """

    class _ComponentNames(BaseModel):
        """Used for reverse index to find components."""

        prompts: set[str] = Field(default_factory=set)
        resources: set[str] = Field(default_factory=set)
        tools: set[str] = Field(default_factory=set)

    # Standard MCP components.
    _prompts: dict[str, types.Prompt]
    _resources: dict[str, types.Resource]
    _tools: dict[str, types.Tool]

    # Client-server connection management.
    _sessions: dict[mcp.ClientSession, _ComponentNames]
    _tool_to_session: dict[str, mcp.ClientSession]
    _exit_stack: contextlib.AsyncExitStack
    _session_exit_stacks: dict[mcp.ClientSession, contextlib.AsyncExitStack]

    # Optional fn consuming (component_name, server_info) for custom names.
    # This is to provide a means to mitigate naming conflicts across servers.
    # Example: (tool_name, server_info) => "{result.server_info.name}.{tool_name}"
    _ComponentNameHook: TypeAlias = Callable[[str, types.Implementation], str]
    _component_name_hook: _ComponentNameHook | None

    def __init__(
        self,
        exit_stack: contextlib.AsyncExitStack | None = None,
        component_name_hook: _ComponentNameHook | None = None,
    ) -> None:
        """Initializes the MCP client."""

        self._tools = {}
        self._resources = {}
        self._prompts = {}

        self._sessions = {}
        self._tool_to_session = {}
        if exit_stack is None:
            self._exit_stack = contextlib.AsyncExitStack()
            self._owns_exit_stack = True
        else:
            self._exit_stack = exit_stack
            self._owns_exit_stack = False
        self._session_exit_stacks = {}
        self._component_name_hook = component_name_hook

    async def __aenter__(self) -> Self:  # pragma: no cover
        # Enter the exit stack only if we created it ourselves
        if self._owns_exit_stack:
            await self._exit_stack.__aenter__()
        return self

    async def __aexit__(
        self,
        _exc_type: type[BaseException] | None,
        _exc_val: BaseException | None,
        _exc_tb: TracebackType | None,
    ) -> bool | None:  # pragma: no cover
        """Closes session exit stacks and main exit stack upon completion."""

        # Only close the main exit stack if we created it
        if self._owns_exit_stack:
            await self._exit_stack.aclose()

        # Concurrently close session stacks.
        async with anyio.create_task_group() as tg:
            for exit_stack in self._session_exit_stacks.values():
                tg.start_soon(exit_stack.aclose)

    @property
    def sessions(self) -> list[mcp.ClientSession]:
        """Returns the list of sessions being managed."""
        return list(self._sessions.keys())  # pragma: no cover

    @property
    def prompts(self) -> dict[str, types.Prompt]:
        """Returns the prompts as a dictionary of names to prompts."""
        return self._prompts

    @property
    def resources(self) -> dict[str, types.Resource]:
        """Returns the resources as a dictionary of names to resources."""
        return self._resources

    @property
    def tools(self) -> dict[str, types.Tool]:
        """Returns the tools as a dictionary of names to tools."""
        return self._tools

    async def call_tool(
        self,
        name: str,
        arguments: dict[str, Any] | None = None,
        read_timeout_seconds: float | None = None,
        progress_callback: ProgressFnT | None = None,
        *,
        meta: types.RequestParamsMeta | None = None,
    ) -> types.CallToolResult:
        """Executes a tool given its name and arguments."""
        session = self._tool_to_session[name]
        session_tool_name = self.tools[name].name
        return await session.call_tool(
            session_tool_name,
            arguments=arguments,
            read_timeout_seconds=read_timeout_seconds,
            progress_callback=progress_callback,
            meta=meta,
        )

    async def disconnect_from_server(self, session: mcp.ClientSession) -> None:
        """Disconnects from a single MCP server."""

        session_known_for_components = session in self._sessions
        session_known_for_stack = session in self._session_exit_stacks

        if not session_known_for_components and not session_known_for_stack:
            raise MCPError(
                code=types.INVALID_PARAMS,
                message="Provided session is not managed or already disconnected.",
            )

        if session_known_for_components:  # pragma: no branch
            component_names = self._sessions.pop(session)  # Pop from _sessions tracking

            # Remove prompts associated with the session.
            for name in component_names.prompts:
                if name in self._prompts:  # pragma: no branch
                    del self._prompts[name]
            # Remove resources associated with the session.
            for name in component_names.resources:
                if name in self._resources:  # pragma: no branch
                    del self._resources[name]
            # Remove tools associated with the session.
            for name in component_names.tools:
                if name in self._tools:  # pragma: no branch
                    del self._tools[name]
                if name in self._tool_to_session:  # pragma: no branch
                    del self._tool_to_session[name]

        # Clean up the session's resources via its dedicated exit stack
        if session_known_for_stack:
            session_stack_to_close = self._session_exit_stacks.pop(session)  # pragma: no cover
            await session_stack_to_close.aclose()  # pragma: no cover

    async def connect_with_session(
        self, server_info: types.Implementation, session: mcp.ClientSession
    ) -> mcp.ClientSession:
        """Connects to a single MCP server."""
        await self._aggregate_components(server_info, session)
        return session

    async def connect_to_server(
        self,
        server_params: ServerParameters,
        session_params: ClientSessionParameters | None = None,
    ) -> mcp.ClientSession:
        """Connects to a single MCP server."""
        server_info, session = await self._establish_session(server_params, session_params or ClientSessionParameters())
        return await self.connect_with_session(server_info, session)

    async def _establish_session(
        self,
        server_params: ServerParameters,
        session_params: ClientSessionParameters,
    ) -> tuple[types.Implementation, mcp.ClientSession]:
        """Establish a client session to an MCP server."""

        session_stack = contextlib.AsyncExitStack()
        try:
            # Create read and write streams that facilitate io with the server.
            if isinstance(server_params, StdioServerParameters):
                client = mcp.stdio_client(server_params)
                read, write = await session_stack.enter_async_context(client)
            elif isinstance(server_params, SseServerParameters):
                client = sse_client(
                    url=server_params.url,
                    headers=server_params.headers,
                    timeout=server_params.timeout,
                    sse_read_timeout=server_params.sse_read_timeout,
                )
                read, write = await session_stack.enter_async_context(client)
            else:
                httpx_client = create_mcp_http_client(
                    headers=server_params.headers,
                    timeout=httpx.Timeout(
                        server_params.timeout,
                        read=server_params.sse_read_timeout,
                    ),
                )
                await session_stack.enter_async_context(httpx_client)

                client = streamable_http_client(
                    url=server_params.url,
                    http_client=httpx_client,
                    terminate_on_close=server_params.terminate_on_close,
                )
                read, write = await session_stack.enter_async_context(client)

            session = await session_stack.enter_async_context(
                mcp.ClientSession(
                    read,
                    write,
                    read_timeout_seconds=session_params.read_timeout_seconds,
                    sampling_callback=session_params.sampling_callback,
                    elicitation_callback=session_params.elicitation_callback,
                    list_roots_callback=session_params.list_roots_callback,
                    logging_callback=session_params.logging_callback,
                    message_handler=session_params.message_handler,
                    client_info=session_params.client_info,
                )
            )

            result = await session.initialize()

            # Session successfully initialized.
            # Store its stack and register the stack with the main group stack.
            self._session_exit_stacks[session] = session_stack
            # session_stack itself becomes a resource managed by the
            # main _exit_stack.
            await self._exit_stack.enter_async_context(session_stack)

            return result.server_info, session
        except Exception:  # pragma: no cover
            # If anything during this setup fails, ensure the session-specific
            # stack is closed.
            await session_stack.aclose()
            raise

    async def _aggregate_components(self, server_info: types.Implementation, session: mcp.ClientSession) -> None:
        """Aggregates prompts, resources, and tools from a given session."""

        # Create a reverse index so we can find all prompts, resources, and
        # tools belonging to this session. Used for removing components from
        # the session group via self.disconnect_from_server.
        component_names = self._ComponentNames()

        # Temporary components dicts. We do not want to modify the aggregate
        # lists in case of an intermediate failure.
        prompts_temp: dict[str, types.Prompt] = {}
        resources_temp: dict[str, types.Resource] = {}
        tools_temp: dict[str, types.Tool] = {}
        tool_to_session_temp: dict[str, mcp.ClientSession] = {}

        # Query the server for its prompts and aggregate to list.
        try:
            prompts = (await session.list_prompts()).prompts
            for prompt in prompts:
                name = self._component_name(prompt.name, server_info)
                prompts_temp[name] = prompt
                component_names.prompts.add(name)
        except MCPError as err:  # pragma: no cover
            logging.warning(f"Could not fetch prompts: {err}")

        # Query the server for its resources and aggregate to list.
        try:
            resources = (await session.list_resources()).resources
            for resource in resources:
                name = self._component_name(resource.name, server_info)
                resources_temp[name] = resource
                component_names.resources.add(name)
        except MCPError as err:  # pragma: no cover
            logging.warning(f"Could not fetch resources: {err}")

        # Query the server for its tools and aggregate to list.
        try:
            tools = (await session.list_tools()).tools
            for tool in tools:
                name = self._component_name(tool.name, server_info)
                tools_temp[name] = tool
                tool_to_session_temp[name] = session
                component_names.tools.add(name)
        except MCPError as err:  # pragma: no cover
            logging.warning(f"Could not fetch tools: {err}")

        # Clean up exit stack for session if we couldn't retrieve anything
        # from the server.
        if not any((prompts_temp, resources_temp, tools_temp)):
            del self._session_exit_stacks[session]  # pragma: no cover

        # Check for duplicates.
        matching_prompts = prompts_temp.keys() & self._prompts.keys()
        if matching_prompts:
            raise MCPError(  # pragma: no cover
                code=types.INVALID_PARAMS,
                message=f"{matching_prompts} already exist in group prompts.",
            )
        matching_resources = resources_temp.keys() & self._resources.keys()
        if matching_resources:
            raise MCPError(  # pragma: no cover
                code=types.INVALID_PARAMS,
                message=f"{matching_resources} already exist in group resources.",
            )
        matching_tools = tools_temp.keys() & self._tools.keys()
        if matching_tools:
            raise MCPError(code=types.INVALID_PARAMS, message=f"{matching_tools} already exist in group tools.")

        # Aggregate components.
        self._sessions[session] = component_names
        self._prompts.update(prompts_temp)
        self._resources.update(resources_temp)
        self._tools.update(tools_temp)
        self._tool_to_session.update(tool_to_session_temp)

    def _component_name(self, name: str, server_info: types.Implementation) -> str:
        if self._component_name_hook:
            return self._component_name_hook(name, server_info)
        return name

__init__

__init__(
    exit_stack: AsyncExitStack | None = None,
    component_name_hook: _ComponentNameHook | None = None,
) -> None

Initializes the MCP client.

Source code in src/mcp/client/session_group.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def __init__(
    self,
    exit_stack: contextlib.AsyncExitStack | None = None,
    component_name_hook: _ComponentNameHook | None = None,
) -> None:
    """Initializes the MCP client."""

    self._tools = {}
    self._resources = {}
    self._prompts = {}

    self._sessions = {}
    self._tool_to_session = {}
    if exit_stack is None:
        self._exit_stack = contextlib.AsyncExitStack()
        self._owns_exit_stack = True
    else:
        self._exit_stack = exit_stack
        self._owns_exit_stack = False
    self._session_exit_stacks = {}
    self._component_name_hook = component_name_hook

__aexit__ async

__aexit__(
    _exc_type: type[BaseException] | None,
    _exc_val: BaseException | None,
    _exc_tb: TracebackType | None,
) -> bool | None

Closes session exit stacks and main exit stack upon completion.

Source code in src/mcp/client/session_group.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
async def __aexit__(
    self,
    _exc_type: type[BaseException] | None,
    _exc_val: BaseException | None,
    _exc_tb: TracebackType | None,
) -> bool | None:  # pragma: no cover
    """Closes session exit stacks and main exit stack upon completion."""

    # Only close the main exit stack if we created it
    if self._owns_exit_stack:
        await self._exit_stack.aclose()

    # Concurrently close session stacks.
    async with anyio.create_task_group() as tg:
        for exit_stack in self._session_exit_stacks.values():
            tg.start_soon(exit_stack.aclose)

sessions property

sessions: list[ClientSession]

Returns the list of sessions being managed.

prompts property

prompts: dict[str, Prompt]

Returns the prompts as a dictionary of names to prompts.

resources property

resources: dict[str, Resource]

Returns the resources as a dictionary of names to resources.

tools property

tools: dict[str, Tool]

Returns the tools as a dictionary of names to tools.

call_tool async

call_tool(
    name: str,
    arguments: dict[str, Any] | None = None,
    read_timeout_seconds: float | None = None,
    progress_callback: ProgressFnT | None = None,
    *,
    meta: RequestParamsMeta | None = None
) -> CallToolResult

Executes a tool given its name and arguments.

Source code in src/mcp/client/session_group.py
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
async def call_tool(
    self,
    name: str,
    arguments: dict[str, Any] | None = None,
    read_timeout_seconds: float | None = None,
    progress_callback: ProgressFnT | None = None,
    *,
    meta: types.RequestParamsMeta | None = None,
) -> types.CallToolResult:
    """Executes a tool given its name and arguments."""
    session = self._tool_to_session[name]
    session_tool_name = self.tools[name].name
    return await session.call_tool(
        session_tool_name,
        arguments=arguments,
        read_timeout_seconds=read_timeout_seconds,
        progress_callback=progress_callback,
        meta=meta,
    )

disconnect_from_server async

disconnect_from_server(session: ClientSession) -> None

Disconnects from a single MCP server.

Source code in src/mcp/client/session_group.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
async def disconnect_from_server(self, session: mcp.ClientSession) -> None:
    """Disconnects from a single MCP server."""

    session_known_for_components = session in self._sessions
    session_known_for_stack = session in self._session_exit_stacks

    if not session_known_for_components and not session_known_for_stack:
        raise MCPError(
            code=types.INVALID_PARAMS,
            message="Provided session is not managed or already disconnected.",
        )

    if session_known_for_components:  # pragma: no branch
        component_names = self._sessions.pop(session)  # Pop from _sessions tracking

        # Remove prompts associated with the session.
        for name in component_names.prompts:
            if name in self._prompts:  # pragma: no branch
                del self._prompts[name]
        # Remove resources associated with the session.
        for name in component_names.resources:
            if name in self._resources:  # pragma: no branch
                del self._resources[name]
        # Remove tools associated with the session.
        for name in component_names.tools:
            if name in self._tools:  # pragma: no branch
                del self._tools[name]
            if name in self._tool_to_session:  # pragma: no branch
                del self._tool_to_session[name]

    # Clean up the session's resources via its dedicated exit stack
    if session_known_for_stack:
        session_stack_to_close = self._session_exit_stacks.pop(session)  # pragma: no cover
        await session_stack_to_close.aclose()  # pragma: no cover

connect_with_session async

connect_with_session(
    server_info: Implementation, session: ClientSession
) -> ClientSession

Connects to a single MCP server.

Source code in src/mcp/client/session_group.py
248
249
250
251
252
253
async def connect_with_session(
    self, server_info: types.Implementation, session: mcp.ClientSession
) -> mcp.ClientSession:
    """Connects to a single MCP server."""
    await self._aggregate_components(server_info, session)
    return session

connect_to_server async

connect_to_server(
    server_params: ServerParameters,
    session_params: ClientSessionParameters | None = None,
) -> ClientSession

Connects to a single MCP server.

Source code in src/mcp/client/session_group.py
255
256
257
258
259
260
261
262
async def connect_to_server(
    self,
    server_params: ServerParameters,
    session_params: ClientSessionParameters | None = None,
) -> mcp.ClientSession:
    """Connects to a single MCP server."""
    server_info, session = await self._establish_session(server_params, session_params or ClientSessionParameters())
    return await self.connect_with_session(server_info, session)