Skip to content

bearer_auth

AuthenticatedUser

Bases: SimpleUser

User with authentication info.

Source code in src/mcp/server/auth/middleware/bearer_auth.py
13
14
15
16
17
18
19
class AuthenticatedUser(SimpleUser):
    """User with authentication info."""

    def __init__(self, auth_info: AccessToken):
        super().__init__(auth_info.client_id)
        self.access_token = auth_info
        self.scopes = auth_info.scopes

BearerAuthBackend

Bases: AuthenticationBackend

Authentication backend that validates Bearer tokens using a TokenVerifier.

Source code in src/mcp/server/auth/middleware/bearer_auth.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class BearerAuthBackend(AuthenticationBackend):
    """Authentication backend that validates Bearer tokens using a TokenVerifier."""

    def __init__(self, token_verifier: TokenVerifier):
        self.token_verifier = token_verifier

    async def authenticate(self, conn: HTTPConnection):
        auth_header = next(
            (conn.headers.get(key) for key in conn.headers if key.lower() == "authorization"),
            None,
        )
        if not auth_header or not auth_header.lower().startswith("bearer "):
            return None

        token = auth_header[7:]  # Remove "Bearer " prefix

        # Validate the token with the verifier
        auth_info = await self.token_verifier.verify_token(token)

        if not auth_info:
            return None

        if auth_info.expires_at and auth_info.expires_at < int(time.time()):
            return None

        return AuthCredentials(auth_info.scopes), AuthenticatedUser(auth_info)

RequireAuthMiddleware

Middleware that requires a valid Bearer token in the Authorization header.

This will validate the token with the auth provider and store the resulting auth info in the request state.

Source code in src/mcp/server/auth/middleware/bearer_auth.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
class RequireAuthMiddleware:
    """Middleware that requires a valid Bearer token in the Authorization header.

    This will validate the token with the auth provider and store the resulting
    auth info in the request state.
    """

    def __init__(
        self,
        app: Any,
        required_scopes: list[str],
        resource_metadata_url: AnyHttpUrl | None = None,
    ):
        """Initialize the middleware.

        Args:
            app: ASGI application
            required_scopes: List of scopes that the token must have
            resource_metadata_url: Optional protected resource metadata URL for WWW-Authenticate header
        """
        self.app = app
        self.required_scopes = required_scopes
        self.resource_metadata_url = resource_metadata_url

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        auth_user = scope.get("user")
        if not isinstance(auth_user, AuthenticatedUser):
            await self._send_auth_error(
                send, status_code=401, error="invalid_token", description="Authentication required"
            )
            return

        auth_credentials = scope.get("auth")

        for required_scope in self.required_scopes:
            # auth_credentials should always be provided; this is just paranoia
            if auth_credentials is None or required_scope not in auth_credentials.scopes:
                await self._send_auth_error(
                    send, status_code=403, error="insufficient_scope", description=f"Required scope: {required_scope}"
                )
                return

        await self.app(scope, receive, send)

    async def _send_auth_error(self, send: Send, status_code: int, error: str, description: str) -> None:
        """Send an authentication error response with WWW-Authenticate header."""
        # Build WWW-Authenticate header value
        www_auth_parts = [f'error="{error}"', f'error_description="{description}"']
        if self.resource_metadata_url:  # pragma: no cover
            www_auth_parts.append(f'resource_metadata="{self.resource_metadata_url}"')

        www_authenticate = f"Bearer {', '.join(www_auth_parts)}"

        # Send response
        body = {"error": error, "error_description": description}
        body_bytes = json.dumps(body).encode()

        await send(
            {
                "type": "http.response.start",
                "status": status_code,
                "headers": [
                    (b"content-type", b"application/json"),
                    (b"content-length", str(len(body_bytes)).encode()),
                    (b"www-authenticate", www_authenticate.encode()),
                ],
            }
        )

        await send(
            {
                "type": "http.response.body",
                "body": body_bytes,
            }
        )

__init__

__init__(
    app: Any,
    required_scopes: list[str],
    resource_metadata_url: AnyHttpUrl | None = None,
)

Initialize the middleware.

Parameters:

Name Type Description Default
app Any

ASGI application

required
required_scopes list[str]

List of scopes that the token must have

required
resource_metadata_url AnyHttpUrl | None

Optional protected resource metadata URL for WWW-Authenticate header

None
Source code in src/mcp/server/auth/middleware/bearer_auth.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def __init__(
    self,
    app: Any,
    required_scopes: list[str],
    resource_metadata_url: AnyHttpUrl | None = None,
):
    """Initialize the middleware.

    Args:
        app: ASGI application
        required_scopes: List of scopes that the token must have
        resource_metadata_url: Optional protected resource metadata URL for WWW-Authenticate header
    """
    self.app = app
    self.required_scopes = required_scopes
    self.resource_metadata_url = resource_metadata_url