Skip to content

bearer_auth

AuthenticatedUser

Bases: SimpleUser

User with authentication info.

Source code in src/mcp/server/auth/middleware/bearer_auth.py
13
14
15
16
17
18
19
class AuthenticatedUser(SimpleUser):
    """User built from a verified access token.

    The token's client ID is handed to the `SimpleUser` initializer; the
    token itself and its scopes are kept on the instance as `access_token`
    and `scopes`.
    """

    def __init__(self, auth_info: AccessToken):
        # Initialize the base user from the token's client ID.
        super().__init__(auth_info.client_id)
        self.scopes = auth_info.scopes
        self.access_token = auth_info

authorization_context

authorization_context(
    user: AuthenticatedUser,
) -> AuthorizationContext

Identify the principal that `user` represents, so that transports can compare it against the principal that created a session. Components the token verifier does not supply are `None`, so the comparison degrades to the remaining components.

See `examples/servers/simple-auth/mcp_simple_auth/token_verifier.py` for a verifier that populates `subject` and `claims` from an introspection response.

Source code in src/mcp/server/auth/middleware/bearer_auth.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def authorization_context(user: AuthenticatedUser) -> AuthorizationContext:
    """Build the `AuthorizationContext` identifying the principal behind `user`.

    Transports compare the result against the principal that created a
    session. Components the token verifier does not supply are `None`, so the
    comparison degrades to the remaining components.

    The issuer is read from the `iss` entry of the token's claims and
    converted to `str`; when the token has no claims, or no `iss` claim, the
    issuer is `None`.

    See `examples/servers/simple-auth/mcp_simple_auth/token_verifier.py` for
    a verifier that populates `subject` and `claims` from an introspection
    response.
    """
    access_token = user.access_token
    claims = access_token.claims or {}
    raw_issuer = claims.get("iss")
    # Only stringify an issuer that is actually present; keep None as None.
    issuer = None if raw_issuer is None else str(raw_issuer)
    return AuthorizationContext(
        client_id=access_token.client_id,
        issuer=issuer,
        subject=access_token.subject,
    )

BearerAuthBackend

Bases: AuthenticationBackend

Authentication backend that validates Bearer tokens using a TokenVerifier.

Source code in src/mcp/server/auth/middleware/bearer_auth.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
class BearerAuthBackend(AuthenticationBackend):
    """Authentication backend that validates Bearer tokens using a TokenVerifier."""

    def __init__(self, token_verifier: TokenVerifier):
        """Store the verifier used to validate incoming bearer tokens.

        Args:
            token_verifier: Object whose `verify_token` coroutine is awaited
                with the raw token string.
        """
        self.token_verifier = token_verifier

    async def authenticate(self, conn: HTTPConnection):
        """Authenticate a connection from its `Authorization: Bearer ...` header.

        Args:
            conn: Connection whose headers are inspected.

        Returns:
            A `(AuthCredentials, AuthenticatedUser)` pair built from the
            verified token, or `None` when the header is absent, does not use
            the Bearer scheme, the verifier returns nothing for the token, or
            the token's `expires_at` lies in the past.
        """
        # Match the header name case-insensitively.
        auth_header = next(
            (conn.headers.get(key) for key in conn.headers if key.lower() == "authorization"),
            None,
        )
        # The scheme name is compared case-insensitively as well.
        if not auth_header or not auth_header.lower().startswith("bearer "):
            return None

        token = auth_header[7:]  # Remove "Bearer " prefix

        # Validate the token with the verifier
        auth_info = await self.token_verifier.verify_token(token)

        if not auth_info:
            return None

        # Compare against None explicitly: a truthiness test would treat an
        # `expires_at` of 0 as "no expiry" and let that token through.
        expires_at = auth_info.expires_at
        if expires_at is not None and expires_at < int(time.time()):
            return None

        return AuthCredentials(auth_info.scopes), AuthenticatedUser(auth_info)

RequireAuthMiddleware

Middleware that requires a valid Bearer token in the Authorization header.

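This middleware does not verify the token itself: it expects an earlier authentication step to have placed an `AuthenticatedUser` and its credentials in the ASGI scope. It rejects the request with 401 when no authenticated user is present and with 403 when a required scope is missing; otherwise it forwards the request to the wrapped application.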

Source code in src/mcp/server/auth/middleware/bearer_auth.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
class RequireAuthMiddleware:
    """ASGI middleware that only forwards requests from an authenticated user
    holding every required scope.

    The middleware reads the `user` and `auth` entries of the ASGI scope. When
    `user` is not an `AuthenticatedUser` it answers 401 `invalid_token`; when a
    required scope is absent from the credentials it answers 403
    `insufficient_scope`. Both error responses carry a `WWW-Authenticate`
    header and a JSON body.
    """

    def __init__(
        self,
        app: Any,
        required_scopes: list[str],
        resource_metadata_url: AnyHttpUrl | None = None,
    ):
        """Set up the middleware around an ASGI application.

        Args:
            app: ASGI application to call once the checks pass
            required_scopes: Scopes that the token must have
            resource_metadata_url: Optional protected resource metadata URL
                advertised in the WWW-Authenticate header
        """
        self.resource_metadata_url = resource_metadata_url
        self.required_scopes = required_scopes
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Check the scope's user and credentials, then forward or reject."""
        if not isinstance(scope.get("user"), AuthenticatedUser):
            await self._send_auth_error(
                send, status_code=401, error="invalid_token", description="Authentication required"
            )
            return

        credentials = scope.get("auth")

        for needed in self.required_scopes:
            # Credentials are expected to be present; the None check is purely defensive.
            if credentials is not None and needed in credentials.scopes:
                continue
            await self._send_auth_error(
                send, status_code=403, error="insufficient_scope", description=f"Required scope: {needed}"
            )
            return

        await self.app(scope, receive, send)

    async def _send_auth_error(self, send: Send, status_code: int, error: str, description: str) -> None:
        """Emit a JSON error response whose WWW-Authenticate header names the error."""
        # Assemble the Bearer challenge parameters.
        challenge_params = [f'error="{error}"', f'error_description="{description}"']
        if self.resource_metadata_url:
            challenge_params.append(f'resource_metadata="{self.resource_metadata_url}"')
        www_authenticate = "Bearer " + ", ".join(challenge_params)

        # The same error details are repeated in the JSON body.
        payload = json.dumps({"error": error, "error_description": description}).encode()
        response_headers = [
            (b"content-type", b"application/json"),
            (b"content-length", str(len(payload)).encode()),
            (b"www-authenticate", www_authenticate.encode()),
        ]

        await send({"type": "http.response.start", "status": status_code, "headers": response_headers})
        await send({"type": "http.response.body", "body": payload})

__init__

__init__(
    app: Any,
    required_scopes: list[str],
    resource_metadata_url: AnyHttpUrl | None = None,
)

Initialize the middleware.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `app` | `Any` | ASGI application | *required* |
| `required_scopes` | `list[str]` | List of scopes that the token must have | *required* |
| `resource_metadata_url` | `AnyHttpUrl \| None` | Optional protected resource metadata URL for WWW-Authenticate header | `None` |

Source code in src/mcp/server/auth/middleware/bearer_auth.py
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def __init__(
    self,
    app: Any,
    required_scopes: list[str],
    resource_metadata_url: AnyHttpUrl | None = None,
):
    """Set up the middleware around an ASGI application.

    Args:
        app: ASGI application to wrap
        required_scopes: Scopes that the token must have
        resource_metadata_url: Optional protected resource metadata URL
            advertised in the WWW-Authenticate header
    """
    self.resource_metadata_url = resource_metadata_url
    self.required_scopes = required_scopes
    self.app = app