Skip to content

client_auth

ClientAuthenticator

ClientAuthenticator is a callable which validates requests from a client application, used to verify /token calls.

If, during registration, the client requested to be issued a secret, the authenticator asserts that /token calls must be authenticated with that same secret.

NOTE: clients can opt for no authentication during registration, in which case this logic is skipped.

Source code in src/mcp/server/auth/middleware/client_auth.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
class ClientAuthenticator:
    """Validates requests from a client application, used to verify /token calls.

    The client record is fetched from the provider using the ``client_id`` form
    field. If that record carries a secret, the request must present the same
    secret, read from the location implied by the client's registered
    ``token_endpoint_auth_method`` (``client_secret_basic`` or
    ``client_secret_post``).

    NOTE: when the stored client record has no secret (clients can opt for no
    authentication during registration), the secret check is skipped.
    """

    def __init__(self, provider: OAuthAuthorizationServerProvider[Any, Any, Any]):
        """Initialize the authenticator.

        Args:
            provider: Provider to look up client information
        """
        self.provider = provider

    async def authenticate_request(self, request: Request) -> OAuthClientInformationFull:
        """Authenticate a client from an HTTP request.

        Extracts client credentials from the appropriate location based on the
        client's registered authentication method and validates them.

        Args:
            request: The HTTP request containing client credentials

        Returns:
            The authenticated client information

        Raises:
            AuthenticationError: If ``client_id`` is missing or unknown, the
                credentials are malformed, the secret is absent or wrong, or
                the stored secret has expired
        """
        form_data = await request.form()
        client_id = form_data.get("client_id")
        # form_data.get() can return an UploadFile or None; only a non-empty string is a usable ID
        if not isinstance(client_id, str) or not client_id:
            raise AuthenticationError("Missing client_id")

        client = await self.provider.get_client(client_id)
        if not client:
            raise AuthenticationError("Invalid client_id")  # pragma: no cover

        request_client_secret: str | None = None
        auth_header = request.headers.get("Authorization", "")

        if client.token_endpoint_auth_method == "client_secret_basic":
            # HTTP authentication scheme names are case-insensitive, so "basic" and "BASIC"
            # must be accepted as well as "Basic".
            if auth_header[:6].lower() != "basic ":
                raise AuthenticationError("Missing or invalid Basic authentication in Authorization header")

            # Only the decoding steps can raise here, so keep the try body limited to them.
            try:
                decoded = base64.b64decode(auth_header[6:]).decode("utf-8")
            except (ValueError, UnicodeDecodeError, binascii.Error) as exc:
                raise AuthenticationError("Invalid Basic authentication header") from exc

            # Split on the first colon only: the secret itself may contain colons.
            basic_client_id, separator, basic_client_secret = decoded.partition(":")
            if not separator:
                raise AuthenticationError("Invalid Basic authentication header")

            # URL-decode both parts per RFC 6749 Section 2.3.1
            basic_client_id = unquote(basic_client_id)
            request_client_secret = unquote(basic_client_secret)

            if basic_client_id != client_id:
                raise AuthenticationError("Client ID mismatch in Basic auth")

        elif client.token_endpoint_auth_method == "client_secret_post":
            raw_form_data = form_data.get("client_secret")
            # form_data.get() can return an UploadFile or None, so we need to check if it's a string
            if isinstance(raw_form_data, str):
                request_client_secret = raw_form_data

        elif client.token_endpoint_auth_method == "none":
            request_client_secret = None
        else:
            raise AuthenticationError(  # pragma: no cover
                f"Unsupported auth method: {client.token_endpoint_auth_method}"
            )

        # If client from the store expects a secret, validate that the request provides
        # that secret
        if client.client_secret:
            if not request_client_secret:
                raise AuthenticationError("Client secret is required")

            # hmac.compare_digest requires that both arguments are either bytes or a `str` containing
            # only ASCII characters. Since we do not control `request_client_secret`, we encode both
            # arguments to bytes.
            if not hmac.compare_digest(client.client_secret.encode(), request_client_secret.encode()):
                raise AuthenticationError("Invalid client_secret")

            # A falsy expiry value means no expiry check is applied.
            if client.client_secret_expires_at and client.client_secret_expires_at < int(time.time()):
                raise AuthenticationError("Client secret has expired")  # pragma: no cover

        return client

__init__

__init__(
    provider: OAuthAuthorizationServerProvider[
        Any, Any, Any
    ],
)

Initialize the authenticator.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| provider | OAuthAuthorizationServerProvider[Any, Any, Any] | Provider to look up client information | required |

Source code in src/mcp/server/auth/middleware/client_auth.py
31
32
33
34
35
36
37
def __init__(self, provider: OAuthAuthorizationServerProvider[Any, Any, Any]):
    """Store the provider that this authenticator queries for client records.

    Args:
        provider: Authorization server provider used to look up client information
    """
    self.provider = provider

authenticate_request async

authenticate_request(
    request: Request,
) -> OAuthClientInformationFull

Authenticate a client from an HTTP request.

Extracts client credentials from the appropriate location based on the client's registered authentication method and validates them.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| request | Request | The HTTP request containing client credentials | required |

Returns:

| Type | Description |
| --- | --- |
| OAuthClientInformationFull | The authenticated client information |

Raises:

| Type | Description |
| --- | --- |
| AuthenticationError | If authentication fails |

Source code in src/mcp/server/auth/middleware/client_auth.py
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
async def authenticate_request(self, request: Request) -> OAuthClientInformationFull:
    """Authenticate a client from an HTTP request.

    Extracts client credentials from the appropriate location based on the
    client's registered authentication method and validates them.

    Args:
        request: The HTTP request containing client credentials

    Returns:
        The authenticated client information

    Raises:
        AuthenticationError: If ``client_id`` is missing or unknown, the
            credentials are malformed, the secret is absent or wrong, or
            the stored secret has expired
    """
    form_data = await request.form()
    client_id = form_data.get("client_id")
    # form_data.get() can return an UploadFile or None; only a non-empty string is a usable ID
    if not isinstance(client_id, str) or not client_id:
        raise AuthenticationError("Missing client_id")

    client = await self.provider.get_client(client_id)
    if not client:
        raise AuthenticationError("Invalid client_id")  # pragma: no cover

    request_client_secret: str | None = None
    auth_header = request.headers.get("Authorization", "")

    if client.token_endpoint_auth_method == "client_secret_basic":
        # HTTP authentication scheme names are case-insensitive, so "basic" and "BASIC"
        # must be accepted as well as "Basic".
        if auth_header[:6].lower() != "basic ":
            raise AuthenticationError("Missing or invalid Basic authentication in Authorization header")

        # Only the decoding steps can raise here, so keep the try body limited to them.
        try:
            decoded = base64.b64decode(auth_header[6:]).decode("utf-8")
        except (ValueError, UnicodeDecodeError, binascii.Error) as exc:
            raise AuthenticationError("Invalid Basic authentication header") from exc

        # Split on the first colon only: the secret itself may contain colons.
        basic_client_id, separator, basic_client_secret = decoded.partition(":")
        if not separator:
            raise AuthenticationError("Invalid Basic authentication header")

        # URL-decode both parts per RFC 6749 Section 2.3.1
        basic_client_id = unquote(basic_client_id)
        request_client_secret = unquote(basic_client_secret)

        if basic_client_id != client_id:
            raise AuthenticationError("Client ID mismatch in Basic auth")

    elif client.token_endpoint_auth_method == "client_secret_post":
        raw_form_data = form_data.get("client_secret")
        # form_data.get() can return an UploadFile or None, so we need to check if it's a string
        if isinstance(raw_form_data, str):
            request_client_secret = raw_form_data

    elif client.token_endpoint_auth_method == "none":
        request_client_secret = None
    else:
        raise AuthenticationError(  # pragma: no cover
            f"Unsupported auth method: {client.token_endpoint_auth_method}"
        )

    # If client from the store expects a secret, validate that the request provides
    # that secret
    if client.client_secret:
        if not request_client_secret:
            raise AuthenticationError("Client secret is required")

        # hmac.compare_digest requires that both arguments are either bytes or a `str` containing
        # only ASCII characters. Since we do not control `request_client_secret`, we encode both
        # arguments to bytes.
        if not hmac.compare_digest(client.client_secret.encode(), request_client_secret.encode()):
            raise AuthenticationError("Invalid client_secret")

        # A falsy expiry value means no expiry check is applied.
        if client.client_secret_expires_at and client.client_secret_expires_at < int(time.time()):
            raise AuthenticationError("Client secret has expired")  # pragma: no cover

    return client