Skip to content

revoke

RevocationRequest

Bases: BaseModel

Form parameters of an OAuth 2.0 token revocation request. See https://datatracker.ietf.org/doc/html/rfc7009#section-2.1

Source code in src/mcp/server/auth/handlers/revoke.py
17
18
19
20
21
22
23
class RevocationRequest(BaseModel):
    """Form parameters of an OAuth 2.0 token revocation request.

    See https://datatracker.ietf.org/doc/html/rfc7009#section-2.1
    """

    # The token the client wants to have revoked.
    token: str
    # Optional hint about which kind of token was submitted.
    token_type_hint: Literal["access_token", "refresh_token"] | None = None
    client_id: str
    # Defaults to None so that a request which omits the secret still
    # validates instead of being rejected as a malformed request; without a
    # default the field is required even though its type allows None.
    client_secret: str | None = None

RevocationHandler dataclass

Source code in src/mcp/server/auth/handlers/revoke.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
@dataclass
class RevocationHandler:
    """Request handler for the OAuth 2.0 Token Revocation endpoint."""

    # Used to look up access/refresh tokens and to revoke them.
    provider: OAuthAuthorizationServerProvider[Any, Any, Any]
    # Used to authenticate the client making the revocation request.
    client_authenticator: ClientAuthenticator

    async def handle(self, request: Request) -> Response:
        """Process a token revocation request.

        Responds with 401 ``unauthorized_client`` when client authentication
        fails and with 400 ``invalid_request`` when the form body does not
        validate. In every other case an empty 200 response is returned; the
        token is revoked only if it was found and belongs to the
        authenticated client.
        """
        try:
            client = await self.client_authenticator.authenticate_request(request)
        except AuthenticationError as auth_error:  # pragma: no cover
            auth_failure = RevocationErrorResponse(
                error="unauthorized_client",
                error_description=auth_error.message,
            )
            return PydanticJSONResponse(status_code=401, content=auth_failure)

        try:
            raw_form = await request.form()
            revocation_request = RevocationRequest.model_validate(dict(raw_form))
        except ValidationError as validation_error:
            bad_request = RevocationErrorResponse(
                error="invalid_request",
                error_description=stringify_pydantic_error(validation_error),
            )
            return PydanticJSONResponse(status_code=400, content=bad_request)

        # Access tokens are looked up first unless the client hinted that it
        # is sending a refresh token, in which case the order is flipped.
        lookups = [
            self.provider.load_access_token,
            partial(self.provider.load_refresh_token, client),
        ]
        if revocation_request.token_type_hint == "refresh_token":  # pragma: no cover
            lookups.reverse()

        submitted = revocation_request.token
        found: None | AccessToken | RefreshToken = None
        for lookup in lookups:
            found = await lookup(submitted)
            if found is not None:
                break

        # An unknown token is not an error: per the RFC we still answer 200.
        if found:
            if found.client_id == client.client_id:
                # The provider is not expected to perform validation here
                # that could result in an error.
                await self.provider.revoke_token(found)

        # Successful, empty response that must not be cached.
        no_cache_headers = {
            "Cache-Control": "no-store",
            "Pragma": "no-cache",
        }
        return Response(status_code=200, headers=no_cache_headers)

handle async

handle(request: Request) -> Response

Handler for the OAuth 2.0 Token Revocation endpoint.

Source code in src/mcp/server/auth/handlers/revoke.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
async def handle(self, request: Request) -> Response:
    """Process a token revocation request.

    Responds with 401 ``unauthorized_client`` when client authentication
    fails and with 400 ``invalid_request`` when the form body does not
    validate. In every other case an empty 200 response is returned; the
    token is revoked only if it was found and belongs to the authenticated
    client.
    """
    try:
        client = await self.client_authenticator.authenticate_request(request)
    except AuthenticationError as auth_error:  # pragma: no cover
        auth_failure = RevocationErrorResponse(
            error="unauthorized_client",
            error_description=auth_error.message,
        )
        return PydanticJSONResponse(status_code=401, content=auth_failure)

    try:
        raw_form = await request.form()
        revocation_request = RevocationRequest.model_validate(dict(raw_form))
    except ValidationError as validation_error:
        bad_request = RevocationErrorResponse(
            error="invalid_request",
            error_description=stringify_pydantic_error(validation_error),
        )
        return PydanticJSONResponse(status_code=400, content=bad_request)

    # Access tokens are looked up first unless the client hinted that it is
    # sending a refresh token, in which case the order is flipped.
    lookups = [
        self.provider.load_access_token,
        partial(self.provider.load_refresh_token, client),
    ]
    if revocation_request.token_type_hint == "refresh_token":  # pragma: no cover
        lookups.reverse()

    submitted = revocation_request.token
    found: None | AccessToken | RefreshToken = None
    for lookup in lookups:
        found = await lookup(submitted)
        if found is not None:
            break

    # An unknown token is not an error: per the RFC we still answer 200.
    if found:
        if found.client_id == client.client_id:
            # The provider is not expected to perform validation here that
            # could result in an error.
            await self.provider.revoke_token(found)

    # Successful, empty response that must not be cached.
    no_cache_headers = {
        "Cache-Control": "no-store",
        "Pragma": "no-cache",
    }
    return Response(status_code=200, headers=no_cache_headers)