Skip to content

Index

OAuth2 Authentication implementation for HTTPX.

Implements authorization code flow with PKCE and automatic token refresh.

OAuthFlowError

Bases: Exception

Base exception for OAuth flow errors.

Source code in src/mcp/client/auth/exceptions.py
1
2
class OAuthFlowError(Exception):
    """Base exception for OAuth flow errors."""

OAuthRegistrationError

Bases: OAuthFlowError

Raised when client registration fails.

Source code in src/mcp/client/auth/exceptions.py
 9
10
class OAuthRegistrationError(OAuthFlowError):
    """Raised when client registration fails."""

OAuthTokenError

Bases: OAuthFlowError

Raised when token operations fail.

Source code in src/mcp/client/auth/exceptions.py
5
6
class OAuthTokenError(OAuthFlowError):
    """Raised when token operations fail."""

OAuthClientProvider

Bases: Auth

OAuth2 authentication for httpx.

Handles OAuth flow with automatic client registration and token storage.

Source code in src/mcp/client/auth/oauth2.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
class OAuthClientProvider(httpx.Auth):
    """OAuth2 authentication for httpx.

    Handles OAuth flow with automatic client registration and token storage.
    """

    requires_response_body = True

    def __init__(
        self,
        server_url: str,
        client_metadata: OAuthClientMetadata,
        storage: TokenStorage,
        redirect_handler: Callable[[str], Awaitable[None]] | None = None,
        callback_handler: Callable[[], Awaitable[tuple[str, str | None]]] | None = None,
        timeout: float = 300.0,
        client_metadata_url: str | None = None,
        validate_resource_url: Callable[[str, str | None], Awaitable[None]] | None = None,
    ):
        """Initialize OAuth2 authentication.

        Args:
            server_url: The MCP server URL.
            client_metadata: OAuth client metadata for registration.
            storage: Token storage implementation.
            redirect_handler: Handler for authorization redirects.
            callback_handler: Handler for authorization callbacks.
            timeout: Timeout for the OAuth flow.
            client_metadata_url: URL-based client ID. When provided and the server
                advertises client_id_metadata_document_supported=True, this URL will be
                used as the client_id instead of performing dynamic client registration.
                Must be a valid HTTPS URL with a non-root pathname.
            validate_resource_url: Optional callback to override resource URL validation.
                Called with (server_url, prm_resource) where prm_resource is the resource
                from Protected Resource Metadata (or None if not present). If not provided,
                default validation rejects mismatched resources per RFC 8707.

        Raises:
            ValueError: If client_metadata_url is provided but not a valid HTTPS URL
                with a non-root pathname.
        """
        # Validate client_metadata_url if provided
        if client_metadata_url is not None and not is_valid_client_metadata_url(client_metadata_url):
            raise ValueError(
                f"client_metadata_url must be a valid HTTPS URL with a non-root pathname, got: {client_metadata_url}"
            )

        self.context = OAuthContext(
            server_url=server_url,
            client_metadata=client_metadata,
            storage=storage,
            redirect_handler=redirect_handler,
            callback_handler=callback_handler,
            timeout=timeout,
            client_metadata_url=client_metadata_url,
        )
        self._validate_resource_url_callback = validate_resource_url
        self._initialized = False

    async def _handle_protected_resource_response(self, response: httpx.Response) -> bool:
        """Handle protected resource metadata discovery response.

        Per SEP-985, supports fallback when discovery fails at one URL.

        Returns:
            True if metadata was successfully discovered, False if we should try next URL
        """
        if response.status_code == 200:
            try:
                content = await response.aread()
                metadata = ProtectedResourceMetadata.model_validate_json(content)
                self.context.protected_resource_metadata = metadata
                if metadata.authorization_servers:  # pragma: no branch
                    self.context.auth_server_url = str(metadata.authorization_servers[0])
                return True

            except ValidationError:  # pragma: no cover
                # Invalid metadata - try next URL
                logger.warning(f"Invalid protected resource metadata at {response.request.url}")
                return False
        elif response.status_code == 404:  # pragma: no cover
            # Not found - try next URL in fallback chain
            logger.debug(f"Protected resource metadata not found at {response.request.url}, trying next URL")
            return False
        else:
            # Other error - fail immediately
            raise OAuthFlowError(
                f"Protected Resource Metadata request failed: {response.status_code}"
            )  # pragma: no cover

    async def _perform_authorization(self) -> httpx.Request:
        """Perform the authorization flow."""
        auth_code, code_verifier = await self._perform_authorization_code_grant()
        token_request = await self._exchange_token_authorization_code(auth_code, code_verifier)
        return token_request

    async def _perform_authorization_code_grant(self) -> tuple[str, str]:
        """Perform the authorization redirect and get auth code."""
        if self.context.client_metadata.redirect_uris is None:
            raise OAuthFlowError("No redirect URIs provided for authorization code grant")  # pragma: no cover
        if not self.context.redirect_handler:
            raise OAuthFlowError("No redirect handler provided for authorization code grant")  # pragma: no cover
        if not self.context.callback_handler:
            raise OAuthFlowError("No callback handler provided for authorization code grant")  # pragma: no cover

        if self.context.oauth_metadata and self.context.oauth_metadata.authorization_endpoint:
            auth_endpoint = str(self.context.oauth_metadata.authorization_endpoint)
        else:
            auth_base_url = self.context.get_authorization_base_url(self.context.server_url)
            auth_endpoint = urljoin(auth_base_url, "/authorize")

        if not self.context.client_info:
            raise OAuthFlowError("No client info available for authorization")  # pragma: no cover

        # Generate PKCE parameters
        pkce_params = PKCEParameters.generate()
        state = secrets.token_urlsafe(32)

        auth_params = {
            "response_type": "code",
            "client_id": self.context.client_info.client_id,
            "redirect_uri": str(self.context.client_metadata.redirect_uris[0]),
            "state": state,
            "code_challenge": pkce_params.code_challenge,
            "code_challenge_method": "S256",
        }

        # Only include resource param if conditions are met
        if self.context.should_include_resource_param(self.context.protocol_version):
            auth_params["resource"] = self.context.get_resource_url()  # RFC 8707

        if self.context.client_metadata.scope:  # pragma: no branch
            auth_params["scope"] = self.context.client_metadata.scope

            # OIDC requires prompt=consent when offline_access is requested
            # https://openid.net/specs/openid-connect-core-1_0.html#OfflineAccess
            if "offline_access" in self.context.client_metadata.scope.split():
                auth_params["prompt"] = "consent"

        authorization_url = f"{auth_endpoint}?{urlencode(auth_params)}"
        await self.context.redirect_handler(authorization_url)

        # Wait for callback
        auth_code, returned_state = await self.context.callback_handler()

        if returned_state is None or not secrets.compare_digest(returned_state, state):
            raise OAuthFlowError(f"State parameter mismatch: {returned_state} != {state}")  # pragma: no cover

        if not auth_code:
            raise OAuthFlowError("No authorization code received")  # pragma: no cover

        # Return auth code and code verifier for token exchange
        return auth_code, pkce_params.code_verifier

    def _get_token_endpoint(self) -> str:
        if self.context.oauth_metadata and self.context.oauth_metadata.token_endpoint:
            token_url = str(self.context.oauth_metadata.token_endpoint)
        else:
            auth_base_url = self.context.get_authorization_base_url(self.context.server_url)
            token_url = urljoin(auth_base_url, "/token")
        return token_url

    async def _exchange_token_authorization_code(
        self, auth_code: str, code_verifier: str, *, token_data: dict[str, Any] | None = {}
    ) -> httpx.Request:
        """Build token exchange request for authorization_code flow."""
        if self.context.client_metadata.redirect_uris is None:
            raise OAuthFlowError("No redirect URIs provided for authorization code grant")  # pragma: no cover
        if not self.context.client_info:
            raise OAuthFlowError("Missing client info")  # pragma: no cover

        token_url = self._get_token_endpoint()
        token_data = token_data or {}
        token_data.update(
            {
                "grant_type": "authorization_code",
                "code": auth_code,
                "redirect_uri": str(self.context.client_metadata.redirect_uris[0]),
                "client_id": self.context.client_info.client_id,
                "code_verifier": code_verifier,
            }
        )

        # Only include resource param if conditions are met
        if self.context.should_include_resource_param(self.context.protocol_version):
            token_data["resource"] = self.context.get_resource_url()  # RFC 8707

        # Prepare authentication based on preferred method
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        token_data, headers = self.context.prepare_token_auth(token_data, headers)

        return httpx.Request("POST", token_url, data=token_data, headers=headers)

    async def _handle_token_response(self, response: httpx.Response) -> None:
        """Handle token exchange response."""
        if response.status_code not in {200, 201}:
            body = await response.aread()  # pragma: no cover
            body_text = body.decode("utf-8")  # pragma: no cover
            raise OAuthTokenError(f"Token exchange failed ({response.status_code}): {body_text}")  # pragma: no cover

        # Parse and validate response with scope validation
        token_response = await handle_token_response_scopes(response)

        # Store tokens in context
        self.context.current_tokens = token_response
        self.context.update_token_expiry(token_response)
        await self.context.storage.set_tokens(token_response)

    async def _refresh_token(self) -> httpx.Request:
        """Build token refresh request."""
        if not self.context.current_tokens or not self.context.current_tokens.refresh_token:
            raise OAuthTokenError("No refresh token available")  # pragma: no cover

        if not self.context.client_info or not self.context.client_info.client_id:
            raise OAuthTokenError("No client info available")  # pragma: no cover

        if self.context.oauth_metadata and self.context.oauth_metadata.token_endpoint:
            token_url = str(self.context.oauth_metadata.token_endpoint)
        else:
            auth_base_url = self.context.get_authorization_base_url(self.context.server_url)
            token_url = urljoin(auth_base_url, "/token")

        refresh_data: dict[str, str] = {
            "grant_type": "refresh_token",
            "refresh_token": self.context.current_tokens.refresh_token,
            "client_id": self.context.client_info.client_id,
        }

        # Only include resource param if conditions are met
        if self.context.should_include_resource_param(self.context.protocol_version):
            refresh_data["resource"] = self.context.get_resource_url()  # RFC 8707

        # Prepare authentication based on preferred method
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        refresh_data, headers = self.context.prepare_token_auth(refresh_data, headers)

        return httpx.Request("POST", token_url, data=refresh_data, headers=headers)

    async def _handle_refresh_response(self, response: httpx.Response) -> bool:  # pragma: no cover
        """Handle token refresh response. Returns True if successful."""
        if response.status_code != 200:
            logger.warning(f"Token refresh failed: {response.status_code}")
            self.context.clear_tokens()
            return False

        try:
            content = await response.aread()
            token_response = OAuthToken.model_validate_json(content)

            self.context.current_tokens = token_response
            self.context.update_token_expiry(token_response)
            await self.context.storage.set_tokens(token_response)

            return True
        except ValidationError:
            logger.exception("Invalid refresh response")
            self.context.clear_tokens()
            return False

    async def _initialize(self) -> None:  # pragma: no cover
        """Load stored tokens and client info."""
        self.context.current_tokens = await self.context.storage.get_tokens()
        self.context.client_info = await self.context.storage.get_client_info()
        self._initialized = True

    def _add_auth_header(self, request: httpx.Request) -> None:
        """Add authorization header to request if we have valid tokens."""
        if self.context.current_tokens and self.context.current_tokens.access_token:  # pragma: no branch
            request.headers["Authorization"] = f"Bearer {self.context.current_tokens.access_token}"

    async def _handle_oauth_metadata_response(self, response: httpx.Response) -> None:
        content = await response.aread()
        metadata = OAuthMetadata.model_validate_json(content)
        self.context.oauth_metadata = metadata

    async def _validate_resource_match(self, prm: ProtectedResourceMetadata) -> None:
        """Validate that PRM resource matches the server URL per RFC 8707."""
        prm_resource = str(prm.resource) if prm.resource else None

        if self._validate_resource_url_callback is not None:
            await self._validate_resource_url_callback(self.context.server_url, prm_resource)
            return

        if not prm_resource:
            return  # pragma: no cover
        default_resource = resource_url_from_server_url(self.context.server_url)
        if not check_resource_allowed(requested_resource=default_resource, configured_resource=prm_resource):
            raise OAuthFlowError(f"Protected resource {prm_resource} does not match expected {default_resource}")

    async def async_auth_flow(self, request: httpx.Request) -> AsyncGenerator[httpx.Request, httpx.Response]:
        """HTTPX auth flow integration."""
        async with self.context.lock:
            if not self._initialized:
                await self._initialize()  # pragma: no cover

            # Capture protocol version from request headers
            self.context.protocol_version = request.headers.get(MCP_PROTOCOL_VERSION)

            if not self.context.is_token_valid() and self.context.can_refresh_token():
                # Try to refresh token
                refresh_request = await self._refresh_token()  # pragma: no cover
                refresh_response = yield refresh_request  # pragma: no cover

                if not await self._handle_refresh_response(refresh_response):  # pragma: no cover
                    # Refresh failed, need full re-authentication
                    self._initialized = False

            if self.context.is_token_valid():
                self._add_auth_header(request)

            response = yield request

            if response.status_code == 401:
                # Perform full OAuth flow
                try:
                    # OAuth flow must be inline due to generator constraints
                    www_auth_resource_metadata_url = extract_resource_metadata_from_www_auth(response)

                    # Step 1: Discover protected resource metadata (SEP-985 with fallback support)
                    prm_discovery_urls = build_protected_resource_metadata_discovery_urls(
                        www_auth_resource_metadata_url, self.context.server_url
                    )

                    for url in prm_discovery_urls:  # pragma: no branch
                        discovery_request = create_oauth_metadata_request(url)

                        discovery_response = yield discovery_request  # sending request

                        prm = await handle_protected_resource_response(discovery_response)
                        if prm:
                            # Validate PRM resource matches server URL (RFC 8707)
                            await self._validate_resource_match(prm)
                            self.context.protected_resource_metadata = prm

                            # todo: try all authorization_servers to find the OASM
                            assert (
                                len(prm.authorization_servers) > 0
                            )  # this is always true as authorization_servers has a min length of 1

                            self.context.auth_server_url = str(prm.authorization_servers[0])
                            break
                        else:
                            logger.debug(f"Protected resource metadata discovery failed: {url}")

                    asm_discovery_urls = build_oauth_authorization_server_metadata_discovery_urls(
                        self.context.auth_server_url, self.context.server_url
                    )

                    # Step 2: Discover OAuth Authorization Server Metadata (OASM) (with fallback for legacy servers)
                    for url in asm_discovery_urls:  # pragma: no branch
                        oauth_metadata_request = create_oauth_metadata_request(url)
                        oauth_metadata_response = yield oauth_metadata_request

                        ok, asm = await handle_auth_metadata_response(oauth_metadata_response)
                        if not ok:
                            break
                        if ok and asm:
                            self.context.oauth_metadata = asm
                            break
                        else:
                            logger.debug(f"OAuth metadata discovery failed: {url}")

                    # Step 3: Apply scope selection strategy
                    self.context.client_metadata.scope = get_client_metadata_scopes(
                        extract_scope_from_www_auth(response),
                        self.context.protected_resource_metadata,
                        self.context.oauth_metadata,
                        self.context.client_metadata.grant_types,
                    )

                    # Step 4: Register client or use URL-based client ID (CIMD)
                    if not self.context.client_info:
                        if should_use_client_metadata_url(
                            self.context.oauth_metadata, self.context.client_metadata_url
                        ):
                            # Use URL-based client ID (CIMD)
                            logger.debug(f"Using URL-based client ID (CIMD): {self.context.client_metadata_url}")
                            client_information = create_client_info_from_metadata_url(
                                self.context.client_metadata_url,  # type: ignore[arg-type]
                                redirect_uris=self.context.client_metadata.redirect_uris,
                            )
                            self.context.client_info = client_information
                            await self.context.storage.set_client_info(client_information)
                        else:
                            # Fallback to Dynamic Client Registration
                            registration_request = create_client_registration_request(
                                self.context.oauth_metadata,
                                self.context.client_metadata,
                                self.context.get_authorization_base_url(self.context.server_url),
                            )
                            registration_response = yield registration_request
                            client_information = await handle_registration_response(registration_response)
                            self.context.client_info = client_information
                            await self.context.storage.set_client_info(client_information)

                    # Step 5: Perform authorization and complete token exchange
                    token_response = yield await self._perform_authorization()
                    await self._handle_token_response(token_response)
                except Exception:  # pragma: no cover
                    logger.exception("OAuth flow error")
                    raise

                # Retry with new tokens
                self._add_auth_header(request)
                yield request
            elif response.status_code == 403:
                # Step 1: Extract error field from WWW-Authenticate header
                error = extract_field_from_www_auth(response, "error")

                # Step 2: Check if we need to step-up authorization
                if error == "insufficient_scope":  # pragma: no branch
                    try:
                        # Step 2a: Update the required scopes
                        self.context.client_metadata.scope = get_client_metadata_scopes(
                            extract_scope_from_www_auth(response),
                            self.context.protected_resource_metadata,
                            self.context.oauth_metadata,
                            self.context.client_metadata.grant_types,
                        )

                        # Step 2b: Perform (re-)authorization and token exchange
                        token_response = yield await self._perform_authorization()
                        await self._handle_token_response(token_response)
                    except Exception:  # pragma: no cover
                        logger.exception("OAuth flow error")
                        raise

                # Retry with new tokens
                self._add_auth_header(request)
                yield request

__init__

__init__(
    server_url: str,
    client_metadata: OAuthClientMetadata,
    storage: TokenStorage,
    redirect_handler: (
        Callable[[str], Awaitable[None]] | None
    ) = None,
    callback_handler: (
        Callable[[], Awaitable[tuple[str, str | None]]]
        | None
    ) = None,
    timeout: float = 300.0,
    client_metadata_url: str | None = None,
    validate_resource_url: (
        Callable[[str, str | None], Awaitable[None]] | None
    ) = None,
)

Initialize OAuth2 authentication.

Parameters:

Name Type Description Default
server_url str

The MCP server URL.

required
client_metadata OAuthClientMetadata

OAuth client metadata for registration.

required
storage TokenStorage

Token storage implementation.

required
redirect_handler Callable[[str], Awaitable[None]] | None

Handler for authorization redirects.

None
callback_handler Callable[[], Awaitable[tuple[str, str | None]]] | None

Handler for authorization callbacks.

None
timeout float

Timeout for the OAuth flow.

300.0
client_metadata_url str | None

URL-based client ID. When provided and the server advertises client_id_metadata_document_supported=True, this URL will be used as the client_id instead of performing dynamic client registration. Must be a valid HTTPS URL with a non-root pathname.

None
validate_resource_url Callable[[str, str | None], Awaitable[None]] | None

Optional callback to override resource URL validation. Called with (server_url, prm_resource) where prm_resource is the resource from Protected Resource Metadata (or None if not present). If not provided, default validation rejects mismatched resources per RFC 8707.

None

Raises:

Type Description
ValueError

If client_metadata_url is provided but not a valid HTTPS URL with a non-root pathname.

Source code in src/mcp/client/auth/oauth2.py
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
def __init__(
    self,
    server_url: str,
    client_metadata: OAuthClientMetadata,
    storage: TokenStorage,
    redirect_handler: Callable[[str], Awaitable[None]] | None = None,
    callback_handler: Callable[[], Awaitable[tuple[str, str | None]]] | None = None,
    timeout: float = 300.0,
    client_metadata_url: str | None = None,
    validate_resource_url: Callable[[str, str | None], Awaitable[None]] | None = None,
):
    """Initialize OAuth2 authentication.

    Args:
        server_url: The MCP server URL.
        client_metadata: OAuth client metadata for registration.
        storage: Token storage implementation.
        redirect_handler: Handler for authorization redirects.
        callback_handler: Handler for authorization callbacks.
        timeout: Timeout for the OAuth flow.
        client_metadata_url: URL-based client ID. When provided and the server
            advertises client_id_metadata_document_supported=True, this URL will be
            used as the client_id instead of performing dynamic client registration.
            Must be a valid HTTPS URL with a non-root pathname.
        validate_resource_url: Optional callback to override resource URL validation.
            Called with (server_url, prm_resource) where prm_resource is the resource
            from Protected Resource Metadata (or None if not present). If not provided,
            default validation rejects mismatched resources per RFC 8707.

    Raises:
        ValueError: If client_metadata_url is provided but not a valid HTTPS URL
            with a non-root pathname.
    """
    # Validate client_metadata_url if provided
    if client_metadata_url is not None and not is_valid_client_metadata_url(client_metadata_url):
        raise ValueError(
            f"client_metadata_url must be a valid HTTPS URL with a non-root pathname, got: {client_metadata_url}"
        )

    self.context = OAuthContext(
        server_url=server_url,
        client_metadata=client_metadata,
        storage=storage,
        redirect_handler=redirect_handler,
        callback_handler=callback_handler,
        timeout=timeout,
        client_metadata_url=client_metadata_url,
    )
    self._validate_resource_url_callback = validate_resource_url
    self._initialized = False

async_auth_flow async

async_auth_flow(
    request: Request,
) -> AsyncGenerator[Request, Response]

HTTPX auth flow integration.

Source code in src/mcp/client/auth/oauth2.py
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
async def async_auth_flow(self, request: httpx.Request) -> AsyncGenerator[httpx.Request, httpx.Response]:
    """HTTPX auth flow integration."""
    async with self.context.lock:
        if not self._initialized:
            await self._initialize()  # pragma: no cover

        # Capture protocol version from request headers
        self.context.protocol_version = request.headers.get(MCP_PROTOCOL_VERSION)

        if not self.context.is_token_valid() and self.context.can_refresh_token():
            # Try to refresh token
            refresh_request = await self._refresh_token()  # pragma: no cover
            refresh_response = yield refresh_request  # pragma: no cover

            if not await self._handle_refresh_response(refresh_response):  # pragma: no cover
                # Refresh failed, need full re-authentication
                self._initialized = False

        if self.context.is_token_valid():
            self._add_auth_header(request)

        response = yield request

        if response.status_code == 401:
            # Perform full OAuth flow
            try:
                # OAuth flow must be inline due to generator constraints
                www_auth_resource_metadata_url = extract_resource_metadata_from_www_auth(response)

                # Step 1: Discover protected resource metadata (SEP-985 with fallback support)
                prm_discovery_urls = build_protected_resource_metadata_discovery_urls(
                    www_auth_resource_metadata_url, self.context.server_url
                )

                for url in prm_discovery_urls:  # pragma: no branch
                    discovery_request = create_oauth_metadata_request(url)

                    discovery_response = yield discovery_request  # sending request

                    prm = await handle_protected_resource_response(discovery_response)
                    if prm:
                        # Validate PRM resource matches server URL (RFC 8707)
                        await self._validate_resource_match(prm)
                        self.context.protected_resource_metadata = prm

                        # todo: try all authorization_servers to find the OASM
                        assert (
                            len(prm.authorization_servers) > 0
                        )  # this is always true as authorization_servers has a min length of 1

                        self.context.auth_server_url = str(prm.authorization_servers[0])
                        break
                    else:
                        logger.debug(f"Protected resource metadata discovery failed: {url}")

                asm_discovery_urls = build_oauth_authorization_server_metadata_discovery_urls(
                    self.context.auth_server_url, self.context.server_url
                )

                # Step 2: Discover OAuth Authorization Server Metadata (OASM) (with fallback for legacy servers)
                for url in asm_discovery_urls:  # pragma: no branch
                    oauth_metadata_request = create_oauth_metadata_request(url)
                    oauth_metadata_response = yield oauth_metadata_request

                    ok, asm = await handle_auth_metadata_response(oauth_metadata_response)
                    if not ok:
                        break
                    if ok and asm:
                        self.context.oauth_metadata = asm
                        break
                    else:
                        logger.debug(f"OAuth metadata discovery failed: {url}")

                # Step 3: Apply scope selection strategy
                self.context.client_metadata.scope = get_client_metadata_scopes(
                    extract_scope_from_www_auth(response),
                    self.context.protected_resource_metadata,
                    self.context.oauth_metadata,
                    self.context.client_metadata.grant_types,
                )

                # Step 4: Register client or use URL-based client ID (CIMD)
                if not self.context.client_info:
                    if should_use_client_metadata_url(
                        self.context.oauth_metadata, self.context.client_metadata_url
                    ):
                        # Use URL-based client ID (CIMD)
                        logger.debug(f"Using URL-based client ID (CIMD): {self.context.client_metadata_url}")
                        client_information = create_client_info_from_metadata_url(
                            self.context.client_metadata_url,  # type: ignore[arg-type]
                            redirect_uris=self.context.client_metadata.redirect_uris,
                        )
                        self.context.client_info = client_information
                        await self.context.storage.set_client_info(client_information)
                    else:
                        # Fallback to Dynamic Client Registration
                        registration_request = create_client_registration_request(
                            self.context.oauth_metadata,
                            self.context.client_metadata,
                            self.context.get_authorization_base_url(self.context.server_url),
                        )
                        registration_response = yield registration_request
                        client_information = await handle_registration_response(registration_response)
                        self.context.client_info = client_information
                        await self.context.storage.set_client_info(client_information)

                # Step 5: Perform authorization and complete token exchange
                token_response = yield await self._perform_authorization()
                await self._handle_token_response(token_response)
            except Exception:  # pragma: no cover
                logger.exception("OAuth flow error")
                raise

            # Retry with new tokens
            self._add_auth_header(request)
            yield request
        elif response.status_code == 403:
            # Step 1: Extract error field from WWW-Authenticate header
            error = extract_field_from_www_auth(response, "error")

            # Step 2: Check if we need to step-up authorization
            if error == "insufficient_scope":  # pragma: no branch
                try:
                    # Step 2a: Update the required scopes
                    self.context.client_metadata.scope = get_client_metadata_scopes(
                        extract_scope_from_www_auth(response),
                        self.context.protected_resource_metadata,
                        self.context.oauth_metadata,
                        self.context.client_metadata.grant_types,
                    )

                    # Step 2b: Perform (re-)authorization and token exchange
                    token_response = yield await self._perform_authorization()
                    await self._handle_token_response(token_response)
                except Exception:  # pragma: no cover
                    logger.exception("OAuth flow error")
                    raise

            # Retry with new tokens
            self._add_auth_header(request)
            yield request

PKCEParameters

Bases: BaseModel

PKCE (Proof Key for Code Exchange) parameters.

Source code in src/mcp/client/auth/oauth2.py
56
57
58
59
60
61
62
63
64
65
66
67
68
class PKCEParameters(BaseModel):
    """PKCE (Proof Key for Code Exchange) parameters."""

    code_verifier: str = Field(..., min_length=43, max_length=128)
    code_challenge: str = Field(..., min_length=43, max_length=128)

    @classmethod
    def generate(cls) -> "PKCEParameters":
        """Generate new PKCE parameters."""
        code_verifier = "".join(secrets.choice(string.ascii_letters + string.digits + "-._~") for _ in range(128))
        digest = hashlib.sha256(code_verifier.encode()).digest()
        code_challenge = base64.urlsafe_b64encode(digest).decode().rstrip("=")
        return cls(code_verifier=code_verifier, code_challenge=code_challenge)

generate classmethod

generate() -> PKCEParameters

Generate new PKCE parameters.

Source code in src/mcp/client/auth/oauth2.py
62
63
64
65
66
67
68
@classmethod
def generate(cls) -> "PKCEParameters":
    """Generate new PKCE parameters."""
    code_verifier = "".join(secrets.choice(string.ascii_letters + string.digits + "-._~") for _ in range(128))
    digest = hashlib.sha256(code_verifier.encode()).digest()
    code_challenge = base64.urlsafe_b64encode(digest).decode().rstrip("=")
    return cls(code_verifier=code_verifier, code_challenge=code_challenge)

TokenStorage

Bases: Protocol

Protocol for token storage implementations.

Source code in src/mcp/client/auth/oauth2.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class TokenStorage(Protocol):
    """Protocol for token storage implementations."""

    async def get_tokens(self) -> OAuthToken | None:
        """Get stored tokens."""
        ...

    async def set_tokens(self, tokens: OAuthToken) -> None:
        """Store tokens."""
        ...

    async def get_client_info(self) -> OAuthClientInformationFull | None:
        """Get stored client information."""
        ...

    async def set_client_info(self, client_info: OAuthClientInformationFull) -> None:
        """Store client information."""
        ...

get_tokens async

get_tokens() -> OAuthToken | None

Get stored tokens.

Source code in src/mcp/client/auth/oauth2.py
74
75
76
async def get_tokens(self) -> OAuthToken | None:
    """Get stored tokens."""
    ...

set_tokens async

set_tokens(tokens: OAuthToken) -> None

Store tokens.

Source code in src/mcp/client/auth/oauth2.py
78
79
80
async def set_tokens(self, tokens: OAuthToken) -> None:
    """Store tokens."""
    ...

get_client_info async

get_client_info() -> OAuthClientInformationFull | None

Get stored client information.

Source code in src/mcp/client/auth/oauth2.py
82
83
84
async def get_client_info(self) -> OAuthClientInformationFull | None:
    """Get stored client information."""
    ...

set_client_info async

set_client_info(
    client_info: OAuthClientInformationFull,
) -> None

Store client information.

Source code in src/mcp/client/auth/oauth2.py
86
87
88
async def set_client_info(self, client_info: OAuthClientInformationFull) -> None:
    """Store client information."""
    ...