direct_dispatcher

In-memory Dispatcher that wires two peers together with no transport.

DirectDispatcher is the simplest possible Dispatcher implementation: a request on one side directly invokes the other side's on_request. There is no serialization, no JSON-RPC framing, and no streams. It exists to:

  • prove the Dispatcher Protocol is implementable without JSON-RPC
  • provide a fast substrate for testing the layers above the dispatcher (ServerRunner, Context, Connection) without wire-level moving parts
  • embed a server in-process when the JSON-RPC overhead is unnecessary

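Unlike JSONRPCDispatcher, a handler failure is never serialized into an ErrorData response; it is raised in the caller's task as an MCPError. An MCPError from the handler passes through unchanged, a ValidationError becomes INVALID_PARAMS, and any other exception becomes INTERNAL_ERROR.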

DirectDispatcher

A Dispatcher that calls a peer's handlers directly, in-process.

Two instances are wired together with create_direct_dispatcher_pair; each holds a reference to the other. send_raw_request on one awaits the peer's on_request. run parks until close is called.
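
The wiring described above can be pictured with a toy model. This is a minimal sketch built on asyncio, not the library's implementation: `ToyPeer`, `make_toy_pair`, and `send` are hypothetical names invented for illustration. It shows only the core idea that a request on one side awaits the other side's handler and that the params object is passed through without any serialization.

```python
import asyncio
from typing import Any, Awaitable, Callable, Mapping, Optional, Tuple

# Hypothetical names for illustration only; none of these are part of mcp.
Handler = Callable[[str, Optional[Mapping[str, Any]]], Awaitable[dict]]


class ToyPeer:
    """One side of an in-process pair; holds a reference to the other side."""

    def __init__(self) -> None:
        self.peer: Optional["ToyPeer"] = None
        self.on_request: Optional[Handler] = None

    async def send(self, method: str, params: Optional[Mapping[str, Any]]) -> dict:
        # No serialization: params is handed to the peer's handler as-is.
        assert self.peer is not None and self.peer.on_request is not None
        return await self.peer.on_request(method, params)


def make_toy_pair() -> Tuple[ToyPeer, ToyPeer]:
    a, b = ToyPeer(), ToyPeer()
    a.peer, b.peer = b, a  # symmetric wiring: each side references the other
    return a, b


async def demo() -> dict:
    client, server = make_toy_pair()
    payload = {"name": "x"}

    async def handler(method, params):
        # Identity check: the handler sees the very object the caller passed.
        return {"method": method, "same_object": params is payload}

    server.on_request = handler
    return await client.send("tools/call", payload)


result = asyncio.run(demo())
```

Because the wiring is symmetric, swapping which side registers the handler and which side sends would work the same way in this toy model.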

Lifecycle mirrors JSONRPCDispatcher: send_raw_request requires run() to have started, and once a side has closed - via close() or run() ending - send_raw_request raises MCPError (CONNECTION_CLOSED) and inbound requests fail the peer's call the same way instead of invoking the handler. Notifications are fire-and-forget in both directions: after close they are silently dropped.
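
The send-side lifecycle rules can be summarized as a small state check. The sketch below is an assumption-laden toy: `ToySide` and `ToyClosed` are hypothetical stand-ins (the latter for `MCPError` with `CONNECTION_CLOSED`). It mirrors only the ordering visible in the source listing, where the closed check comes before the not-started check.

```python
class ToyClosed(Exception):
    """Hypothetical stand-in for MCPError(CONNECTION_CLOSED)."""


class ToySide:
    """Hypothetical model of one side's send-time state checks."""

    def __init__(self) -> None:
        self.running = False
        self.closed = False

    def start(self) -> None:
        self.running = True

    def close(self) -> None:
        self.running = False
        self.closed = True

    def check_can_send(self) -> None:
        # Closed is checked first, so a side that was closed without ever
        # running reports "closed" rather than "not started".
        if self.closed:
            raise ToyClosed("Connection closed")
        if not self.running:
            raise RuntimeError("send called before run()")


def outcome(side: ToySide) -> str:
    try:
        side.check_can_send()
        return "ok"
    except ToyClosed:
        return "closed"
    except RuntimeError:
        return "not started"


side = ToySide()
outcomes = [outcome(side)]   # before start
side.start()
outcomes.append(outcome(side))  # while running
side.close()
outcomes.append(outcome(side))  # after close

never_started = ToySide()
never_started.close()
closed_early = outcome(never_started)
```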

Source code in src/mcp/shared/direct_dispatcher.py
class DirectDispatcher:
    """A `Dispatcher` that calls a peer's handlers directly, in-process.

    Two instances are wired together with `create_direct_dispatcher_pair`; each
    holds a reference to the other. `send_raw_request` on one awaits the peer's
    `on_request`. `run` parks until `close` is called.

    Lifecycle mirrors `JSONRPCDispatcher`: `send_raw_request` requires `run()`
    to have started, and once a side has closed - via `close()` or `run()`
    ending - `send_raw_request` raises `MCPError` (`CONNECTION_CLOSED`) and
    inbound requests fail the peer's call the same way instead of invoking the
    handler. Notifications are fire-and-forget in both directions: after close
    they are silently dropped.
    """

    def __init__(self, transport_ctx: TransportContext):
        self._transport_ctx = transport_ctx
        self._peer: DirectDispatcher | None = None
        self._on_request: OnRequest | None = None
        self._on_notify: OnNotify | None = None
        self._next_id = 0
        self._ready = anyio.Event()
        self._close_event = anyio.Event()
        self._running = False
        self._closed = False

    def connect_to(self, peer: DirectDispatcher) -> None:
        self._peer = peer

    async def send_raw_request(
        self,
        method: str,
        params: Mapping[str, Any] | None,
        opts: CallOptions | None = None,
    ) -> dict[str, Any]:
        """Send a request by invoking the peer's `on_request` directly.

        Raises:
            MCPError: The peer's handler raised; `REQUEST_TIMEOUT` if
                `opts["timeout"]` elapsed; `CONNECTION_CLOSED` if either
                side has closed.
            RuntimeError: Called before `run()`.
        """
        if self._peer is None:
            raise RuntimeError("DirectDispatcher has no peer; use create_direct_dispatcher_pair()")
        # Post-close sends get the same CONNECTION_CLOSED contract as JSONRPCDispatcher.
        if self._closed:
            raise MCPError(code=CONNECTION_CLOSED, message="Connection closed")
        if not self._running:
            raise RuntimeError("DirectDispatcher.send_raw_request called before run()")
        return await self._peer._dispatch_request(method, params, opts)

    async def notify(self, method: str, params: Mapping[str, Any] | None) -> None:
        """Send a notification by invoking the peer's `on_notify` directly.

        Fire-and-forget: usable before `run()` (delivery waits for the peer to
        start), and after close it is silently dropped, matching
        `JSONRPCDispatcher.notify`.
        """
        if self._peer is None:
            raise RuntimeError("DirectDispatcher has no peer; use create_direct_dispatcher_pair()")
        if self._closed:
            logger.debug("dropped notification %r on closed DirectDispatcher", method)
            return
        await self._peer._dispatch_notify(method, params)

    async def run(
        self,
        on_request: OnRequest,
        on_notify: OnNotify,
        *,
        task_status: anyio.abc.TaskStatus[None] = anyio.TASK_STATUS_IGNORED,
    ) -> None:
        """Mark this side ready and park until `close()` is called.

        Single-shot, like `JSONRPCDispatcher.run`: once it returns the
        dispatcher stays closed and cannot be restarted.
        """
        try:
            self._on_request = on_request
            self._on_notify = on_notify
            self._running = True
            self._ready.set()
            task_status.started()
            await self._close_event.wait()
        finally:
            self._running = False
            self._closed = True
            # run() may end via cancellation without close() ever being
            # called; setting the event wakes `_wait_ready` waiters so they
            # observe the closed state instead of parking forever.
            self._close_event.set()

    def close(self) -> None:
        self._closed = True
        self._close_event.set()

    def _make_context(
        self, on_progress: ProgressFnT | None = None, request_id: RequestId | None = None
    ) -> _DirectDispatchContext:
        assert self._peer is not None
        peer = self._peer
        return _DirectDispatchContext(
            transport=self._transport_ctx,
            _back_request=lambda m, p, o: peer._dispatch_request(m, p, o),
            _back_notify=lambda m, p: peer._dispatch_notify(m, p),
            request_id=request_id,
            _on_progress=on_progress,
        )

    async def _wait_ready(self) -> None:
        """Park until `run()` has started, waking early if this side closes.

        Raises:
            MCPError: `CONNECTION_CLOSED` if this side has closed.
        """
        if not self._ready.is_set() and not self._close_event.is_set():
            async with anyio.create_task_group() as tg:

                async def wake_on(event: anyio.Event) -> None:
                    await event.wait()
                    tg.cancel_scope.cancel()

                tg.start_soon(wake_on, self._ready)
                tg.start_soon(wake_on, self._close_event)
        if self._closed:
            raise MCPError(code=CONNECTION_CLOSED, message="Connection closed")

    async def _dispatch_request(
        self,
        method: str,
        params: Mapping[str, Any] | None,
        opts: CallOptions | None,
    ) -> dict[str, Any]:
        opts = opts or {}
        try:
            with anyio.fail_after(opts.get("timeout")):
                # Inside the timeout scope, so a configured timeout also bounds
                # waiting on a peer whose run() has not started yet.
                await self._wait_ready()
                assert self._on_request is not None
                # Synthesize an id: the DispatchContext contract reserves None for notifications.
                self._next_id += 1
                dctx = self._make_context(on_progress=opts.get("on_progress"), request_id=self._next_id)
                try:
                    return await self._on_request(dctx, method, params)
                except MCPError:
                    raise
                except ValidationError as e:
                    # Same shape JSONRPCDispatcher writes, so runner-over-direct
                    # tests see what runner-over-JSONRPC would.
                    raise MCPError(code=INVALID_PARAMS, message="Invalid request parameters", data="") from e
                except Exception as e:
                    raise MCPError(code=INTERNAL_ERROR, message=str(e)) from e
        except TimeoutError:
            raise MCPError(
                code=REQUEST_TIMEOUT,
                message=f"Timed out after {opts.get('timeout')}s waiting for {method!r}",
            ) from None

    async def _dispatch_notify(self, method: str, params: Mapping[str, Any] | None) -> None:
        try:
            await self._wait_ready()
        except MCPError:
            # Notifications are fire-and-forget: a notify to a closed peer is
            # dropped, not raised back into the sender's call.
            logger.debug("dropped notification %r to closed DirectDispatcher", method)
            return
        assert self._on_notify is not None
        dctx = self._make_context()
        await self._on_notify(dctx, method, params)
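
The `_wait_ready` helper in the listing above waits for whichever of two events fires first, using an anyio task group that cancels itself once either waiter wakes. The sketch below illustrates the same idea with a different technique, `asyncio.wait(..., return_when=FIRST_COMPLETED)`, so that it depends only on the standard library. `wait_ready_or_closed` is a hypothetical name, and returning a string instead of raising is a simplification.

```python
import asyncio


async def wait_ready_or_closed(ready: asyncio.Event, closed: asyncio.Event) -> str:
    """Return once either event is set and report which state was observed."""
    if not ready.is_set() and not closed.is_set():
        waiters = [
            asyncio.create_task(ready.wait()),
            asyncio.create_task(closed.wait()),
        ]
        _done, pending = await asyncio.wait(waiters, return_when=asyncio.FIRST_COMPLETED)
        for task in pending:
            task.cancel()  # the losing waiter is no longer needed
    # Closed takes priority, matching the check that follows the wait in the listing.
    return "closed" if closed.is_set() else "ready"


async def demo():
    ready, closed = asyncio.Event(), asyncio.Event()
    waiter = asyncio.create_task(wait_ready_or_closed(ready, closed))
    await asyncio.sleep(0)  # let the waiter park
    closed.set()            # close before the side ever becomes ready
    first = await waiter

    ready2, closed2 = asyncio.Event(), asyncio.Event()
    ready2.set()            # already ready: no waiting needed
    second = await wait_ready_or_closed(ready2, closed2)
    return first, second


states = asyncio.run(demo())
```

Waking on the close event as well as the ready event is what keeps a caller from parking forever on a peer that shuts down before it starts.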

send_raw_request async

send_raw_request(
    method: str,
    params: Mapping[str, Any] | None,
    opts: CallOptions | None = None,
) -> dict[str, Any]

Send a request by invoking the peer's on_request directly.

Raises:

  • MCPError: The peer's handler raised; REQUEST_TIMEOUT if opts["timeout"] elapsed; CONNECTION_CLOSED if either side has closed.
  • RuntimeError: Called before run().

Source code in src/mcp/shared/direct_dispatcher.py
async def send_raw_request(
    self,
    method: str,
    params: Mapping[str, Any] | None,
    opts: CallOptions | None = None,
) -> dict[str, Any]:
    """Send a request by invoking the peer's `on_request` directly.

    Raises:
        MCPError: The peer's handler raised; `REQUEST_TIMEOUT` if
            `opts["timeout"]` elapsed; `CONNECTION_CLOSED` if either
            side has closed.
        RuntimeError: Called before `run()`.
    """
    if self._peer is None:
        raise RuntimeError("DirectDispatcher has no peer; use create_direct_dispatcher_pair()")
    # Post-close sends get the same CONNECTION_CLOSED contract as JSONRPCDispatcher.
    if self._closed:
        raise MCPError(code=CONNECTION_CLOSED, message="Connection closed")
    if not self._running:
        raise RuntimeError("DirectDispatcher.send_raw_request called before run()")
    return await self._peer._dispatch_request(method, params, opts)
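
The REQUEST_TIMEOUT behaviour documented for send_raw_request amounts to bounding the handler call and translating the timeout into a domain error. The following is a minimal sketch of that pattern using `asyncio.wait_for` rather than anyio's `fail_after`. `ToyTimeoutError` and `call_with_timeout` are hypothetical names, with the exception standing in for `MCPError` carrying `REQUEST_TIMEOUT`.

```python
import asyncio


class ToyTimeoutError(Exception):
    """Hypothetical stand-in for MCPError(REQUEST_TIMEOUT)."""


async def call_with_timeout(handler, method, timeout=None):
    try:
        # timeout=None means "no deadline", as with an absent timeout option.
        return await asyncio.wait_for(handler(method), timeout)
    except asyncio.TimeoutError:
        # Translate the low-level timeout into the domain error and hide the
        # original traceback, as the listing does with `from None`.
        raise ToyTimeoutError(
            f"Timed out after {timeout}s waiting for {method!r}"
        ) from None


async def fast(method):
    return {"ok": True}


async def slow(method):
    await asyncio.sleep(1)
    return {"ok": True}


async def demo():
    fast_result = await call_with_timeout(fast, "ping", timeout=0.5)
    try:
        await call_with_timeout(slow, "tools/call", timeout=0.01)
    except ToyTimeoutError as exc:
        return fast_result, str(exc)
    return fast_result, None


fast_result, message = asyncio.run(demo())
```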

notify async

notify(
    method: str, params: Mapping[str, Any] | None
) -> None

Send a notification by invoking the peer's on_notify directly.

Fire-and-forget: usable before run() (delivery waits for the peer to start), and after close it is silently dropped, matching JSONRPCDispatcher.notify.
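
Both halves of that contract can be seen in a toy model: a notification sent early waits for the receiving side to start, and one sent after close returns normally without being delivered. `ToyReceiver` and `ToySender` are hypothetical classes written for this sketch; they are not the library's types.

```python
import asyncio


class ToyReceiver:
    """Hypothetical receiving side: delivery waits until it is ready."""

    def __init__(self) -> None:
        self.ready = asyncio.Event()
        self.received = []

    async def deliver(self, method: str) -> None:
        await self.ready.wait()
        self.received.append(method)


class ToySender:
    """Hypothetical sending side with a closed flag."""

    def __init__(self, peer: ToyReceiver) -> None:
        self.peer = peer
        self.closed = False

    async def notify(self, method: str) -> None:
        if self.closed:
            return  # dropped silently: no exception reaches the caller
        await self.peer.deliver(method)


async def demo():
    receiver = ToyReceiver()
    sender = ToySender(receiver)

    early = asyncio.create_task(sender.notify("early"))  # receiver not ready yet
    await asyncio.sleep(0)
    parked = receiver.received == []  # nothing delivered while parked

    receiver.ready.set()
    await early

    sender.closed = True
    await sender.notify("late")  # returns normally, nothing delivered
    return parked, receiver.received


parked, received = asyncio.run(demo())
```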

Source code in src/mcp/shared/direct_dispatcher.py
async def notify(self, method: str, params: Mapping[str, Any] | None) -> None:
    """Send a notification by invoking the peer's `on_notify` directly.

    Fire-and-forget: usable before `run()` (delivery waits for the peer to
    start), and after close it is silently dropped, matching
    `JSONRPCDispatcher.notify`.
    """
    if self._peer is None:
        raise RuntimeError("DirectDispatcher has no peer; use create_direct_dispatcher_pair()")
    if self._closed:
        logger.debug("dropped notification %r on closed DirectDispatcher", method)
        return
    await self._peer._dispatch_notify(method, params)

run async

run(
    on_request: OnRequest,
    on_notify: OnNotify,
    *,
    task_status: TaskStatus[None] = TASK_STATUS_IGNORED
) -> None

Mark this side ready and park until close() is called.

Single-shot, like JSONRPCDispatcher.run: once it returns the dispatcher stays closed and cannot be restarted.
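
The park-until-close shape, including the case where run ends through cancellation rather than close(), can be sketched with plain asyncio. `ToyRunner` is a hypothetical class that mirrors only the try/finally structure of the source listing; the task_status handshake and the handler arguments are omitted.

```python
import asyncio


class ToyRunner:
    """Hypothetical single-shot runner that parks until closed."""

    def __init__(self) -> None:
        self.close_event = asyncio.Event()
        self.running = False
        self.closed = False

    async def run(self) -> None:
        try:
            self.running = True
            await self.close_event.wait()  # park here until close()
        finally:
            # Reached on close() and on cancellation alike, so the closed
            # state is recorded even if close() is never called.
            self.running = False
            self.closed = True
            self.close_event.set()

    def close(self) -> None:
        self.closed = True
        self.close_event.set()


async def demo():
    a = ToyRunner()
    task_a = asyncio.create_task(a.run())
    await asyncio.sleep(0)
    a.close()
    await task_a  # run() returns normally after close()

    b = ToyRunner()
    task_b = asyncio.create_task(b.run())
    await asyncio.sleep(0)
    task_b.cancel()  # close() is never called on this one
    try:
        await task_b
    except asyncio.CancelledError:
        pass
    return (a.closed, a.running), (b.closed, b.running, b.close_event.is_set())


state_a, state_b = asyncio.run(demo())
```

Setting the close event in the finally block is the detail that lets anything waiting on that event observe the shutdown after a cancellation.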

Source code in src/mcp/shared/direct_dispatcher.py
async def run(
    self,
    on_request: OnRequest,
    on_notify: OnNotify,
    *,
    task_status: anyio.abc.TaskStatus[None] = anyio.TASK_STATUS_IGNORED,
) -> None:
    """Mark this side ready and park until `close()` is called.

    Single-shot, like `JSONRPCDispatcher.run`: once it returns the
    dispatcher stays closed and cannot be restarted.
    """
    try:
        self._on_request = on_request
        self._on_notify = on_notify
        self._running = True
        self._ready.set()
        task_status.started()
        await self._close_event.wait()
    finally:
        self._running = False
        self._closed = True
        # run() may end via cancellation without close() ever being
        # called; setting the event wakes `_wait_ready` waiters so they
        # observe the closed state instead of parking forever.
        self._close_event.set()

create_direct_dispatcher_pair

create_direct_dispatcher_pair(
    *,
    can_send_request: bool = True,
    headers: Mapping[str, str] | None = None
) -> tuple[DirectDispatcher, DirectDispatcher]

Create two DirectDispatcher instances wired to each other.

Parameters:

  • can_send_request (bool, default True): Sets TransportContext.can_send_request on both sides. Pass False to simulate a transport with no back-channel.
  • headers (Mapping[str, str] | None, default None): Sets TransportContext.headers on both sides.

Returns:

  • tuple[DirectDispatcher, DirectDispatcher]: A (client, server) pair. The wiring is symmetric, so the roles are conventional only.

Source code in src/mcp/shared/direct_dispatcher.py
def create_direct_dispatcher_pair(
    *,
    can_send_request: bool = True,
    headers: Mapping[str, str] | None = None,
) -> tuple[DirectDispatcher, DirectDispatcher]:
    """Create two `DirectDispatcher` instances wired to each other.

    Args:
        can_send_request: Sets `TransportContext.can_send_request` on both
            sides. Pass `False` to simulate a transport with no back-channel.
        headers: Sets `TransportContext.headers` on both sides.

    Returns:
        A `(client, server)` pair. The wiring is symmetric, so the roles
        are conventional only.
    """
    ctx = TransportContext(kind=DIRECT_TRANSPORT_KIND, can_send_request=can_send_request, headers=headers)
    client = DirectDispatcher(ctx)
    server = DirectDispatcher(ctx)
    client.connect_to(server)
    server.connect_to(client)
    return client, server