Skip to content

resolver

Resolver - An anyio-compatible future-like object for async result passing.

This provides a simple way to pass a result (or exception) from one coroutine to another without depending on asyncio.Future.

Resolver

Bases: Generic[T]

A simple resolver for passing results between coroutines.

Unlike asyncio.Future, this works with any anyio-compatible async backend.

Usage

resolver: Resolver[str] = Resolver()

In one coroutine:

resolver.set_result("hello")

In another coroutine:

result = await resolver.wait() # returns "hello"

Source code in src/mcp/shared/experimental/tasks/resolver.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class Resolver(Generic[T]):
    """A simple resolver for passing results between coroutines.

    Unlike asyncio.Future, this works with any anyio-compatible async backend.

    Usage:
        resolver: Resolver[str] = Resolver()

        # In one coroutine:
        resolver.set_result("hello")

        # In another coroutine:
        result = await resolver.wait()  # returns "hello"
    """

    def __init__(self) -> None:
        self._event = anyio.Event()
        self._value: T | None = None
        self._exception: BaseException | None = None

    def set_result(self, value: T) -> None:
        """Set the result value and wake up waiters."""
        if self._event.is_set():
            raise RuntimeError("Resolver already completed")
        self._value = value
        self._event.set()

    def set_exception(self, exc: BaseException) -> None:
        """Set an exception and wake up waiters."""
        if self._event.is_set():
            raise RuntimeError("Resolver already completed")
        self._exception = exc
        self._event.set()

    async def wait(self) -> T:
        """Wait for the result and return it, or raise the exception."""
        await self._event.wait()
        if self._exception is not None:
            raise self._exception
        # If we reach here, set_result() was called, so _value is set
        return cast(T, self._value)

    def done(self) -> bool:
        """Return True if the resolver has been completed."""
        return self._event.is_set()

set_result

set_result(value: T) -> None

Set the result value and wake up waiters.

Source code in src/mcp/shared/experimental/tasks/resolver.py
34
35
36
37
38
39
def set_result(self, value: T) -> None:
    """Set the result value and wake up waiters."""
    if self._event.is_set():
        raise RuntimeError("Resolver already completed")
    self._value = value
    self._event.set()

set_exception

set_exception(exc: BaseException) -> None

Set an exception and wake up waiters.

Source code in src/mcp/shared/experimental/tasks/resolver.py
41
42
43
44
45
46
def set_exception(self, exc: BaseException) -> None:
    """Set an exception and wake up waiters."""
    if self._event.is_set():
        raise RuntimeError("Resolver already completed")
    self._exception = exc
    self._event.set()

wait async

wait() -> T

Wait for the result and return it, or raise the exception.

Source code in src/mcp/shared/experimental/tasks/resolver.py
48
49
50
51
52
53
54
async def wait(self) -> T:
    """Wait for the result and return it, or raise the exception."""
    await self._event.wait()
    if self._exception is not None:
        raise self._exception
    # If we reach here, set_result() was called, so _value is set
    return cast(T, self._value)

done

done() -> bool

Return True if the resolver has been completed.

Source code in src/mcp/shared/experimental/tasks/resolver.py
56
57
58
def done(self) -> bool:
    """Return True if the resolver has been completed."""
    return self._event.is_set()