Skip to content

transport_security

DNS rebinding protection for MCP server transports.

TransportSecuritySettings

Bases: BaseModel

Settings for MCP transport security features.

These settings help protect against DNS rebinding attacks by validating incoming request headers.

Source code in src/mcp/server/transport_security.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class TransportSecuritySettings(BaseModel):
    """Settings for MCP transport security features.

    These settings help protect against DNS rebinding attacks by validating incoming request headers.

    `TransportSecurityMiddleware` reads these fields to decide whether the Host and
    Origin headers of a request are checked and which header values are accepted.
    """

    # Defaults to True on this model. Note that TransportSecurityMiddleware, when
    # constructed without settings, builds its own instance with this flag set to False.
    enable_dns_rebinding_protection: bool = True
    """Enable DNS rebinding protection (recommended for production)."""

    # Entries are matched exactly, or as a port wildcard when they end in ":*"
    # (for example "localhost:*"). The default empty list matches no Host value,
    # and a request without a Host header is rejected while protection is enabled.
    allowed_hosts: list[str] = Field(default_factory=list)
    """List of allowed Host header values.

    Only applies when `enable_dns_rebinding_protection` is `True`.
    """

    # Matched the same way as allowed_hosts (exact entry or ":*" port wildcard).
    # A request that carries no Origin header is accepted regardless of this list.
    allowed_origins: list[str] = Field(default_factory=list)
    """List of allowed Origin header values.

    Only applies when `enable_dns_rebinding_protection` is `True`.
    """

enable_dns_rebinding_protection class-attribute instance-attribute

enable_dns_rebinding_protection: bool = True

Enable DNS rebinding protection (recommended for production).

allowed_hosts class-attribute instance-attribute

allowed_hosts: list[str] = Field(default_factory=list)

List of allowed Host header values.

Only applies when `enable_dns_rebinding_protection` is `True`.

allowed_origins class-attribute instance-attribute

allowed_origins: list[str] = Field(default_factory=list)

List of allowed Origin header values.

Only applies when `enable_dns_rebinding_protection` is `True`.

TransportSecurityMiddleware

Middleware to enforce DNS rebinding protection for MCP transport endpoints.

Source code in src/mcp/server/transport_security.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
class TransportSecurityMiddleware:
    """Middleware to enforce DNS rebinding protection for MCP transport endpoints.

    The Content-Type of POST requests is always checked. The Host and Origin
    headers are checked only when `settings.enable_dns_rebinding_protection`
    is true.
    """

    def __init__(self, settings: TransportSecuritySettings | None = None):
        """Store the settings to enforce.

        Args:
            settings: Security settings. When omitted, settings with DNS
                rebinding protection disabled are used.
        """
        # If not specified, disable DNS rebinding protection by default for backwards compatibility
        self.settings = settings or TransportSecuritySettings(enable_dns_rebinding_protection=False)

    @staticmethod
    def _matches_allowed(value: str, allowed_values: list[str]) -> bool:  # pragma: no cover
        """Return whether a header value matches one of the allowed entries.

        An entry matches when it equals `value` exactly, or when it ends in
        ":*" and `value` is the entry's base followed by ":" and a numeric port.

        Args:
            value: The non-empty header value to check.
            allowed_values: Exact values and/or ":*" wildcard-port patterns.

        Returns:
            True if any entry matches, otherwise False.
        """
        # Check exact match first
        if value in allowed_values:
            return True

        # Check wildcard port patterns
        for allowed in allowed_values:
            if not allowed.endswith(":*"):
                continue
            # Drop only the "*" so the prefix keeps its trailing colon
            prefix = allowed[:-1]
            if not value.startswith(prefix):
                continue
            # The wildcard stands for a port only: require a non-empty run of ASCII
            # digits so that trailing text such as "8080.evil.com" does not match.
            port = value[len(prefix) :]
            if port.isascii() and port.isdigit():
                return True

        return False

    def _validate_host(self, host: str | None) -> bool:  # pragma: no cover
        """Validate the Host header against allowed values.

        Returns False (and logs a warning) when the header is missing, empty,
        or matches no entry of `settings.allowed_hosts`.
        """
        if not host:
            logger.warning("Missing Host header in request")
            return False

        if self._matches_allowed(host, self.settings.allowed_hosts):
            return True

        logger.warning("Invalid Host header: %s", host)
        return False

    def _validate_origin(self, origin: str | None) -> bool:  # pragma: no cover
        """Validate the Origin header against allowed values.

        A missing or empty Origin is accepted. Otherwise the value must match
        an entry of `settings.allowed_origins`; a warning is logged if not.
        """
        # Origin can be absent for same-origin requests
        if not origin:
            return True

        if self._matches_allowed(origin, self.settings.allowed_origins):
            return True

        logger.warning("Invalid Origin header: %s", origin)
        return False

    def _validate_content_type(self, content_type: str | None) -> bool:
        """Validate the Content-Type header for POST requests.

        Returns True only when the media type is "application/json"
        (case-insensitive); parameters such as "; charset=utf-8" are ignored.
        """
        if content_type is None:
            return False
        # Compare the media type itself rather than a prefix, so look-alikes
        # such as "application/jsonp" are not accepted.
        media_type = content_type.partition(";")[0].strip().lower()
        return media_type == "application/json"

    async def validate_request(self, request: Request, is_post: bool = False) -> Response | None:
        """Validate request headers for DNS rebinding protection.

        Args:
            request: The incoming request whose headers are inspected.
            is_post: Whether the request is a POST; enables the Content-Type check.

        Returns:
            None if validation passes, or an error Response if validation fails:
            400 for an invalid Content-Type, 421 for an invalid Host and
            403 for an invalid Origin.
        """
        # Always validate Content-Type for POST requests
        if is_post:  # pragma: no branch
            content_type = request.headers.get("content-type")
            if not self._validate_content_type(content_type):
                return Response("Invalid Content-Type header", status_code=400)

        # Skip remaining validation if DNS rebinding protection is disabled
        if not self.settings.enable_dns_rebinding_protection:
            return None

        # Validate Host header  # pragma: no cover
        host = request.headers.get("host")  # pragma: no cover
        if not self._validate_host(host):  # pragma: no cover
            return Response("Invalid Host header", status_code=421)  # pragma: no cover

        # Validate Origin header  # pragma: no cover
        origin = request.headers.get("origin")  # pragma: no cover
        if not self._validate_origin(origin):  # pragma: no cover
            return Response("Invalid Origin header", status_code=403)  # pragma: no cover

        return None  # pragma: no cover

validate_request async

validate_request(
    request: Request, is_post: bool = False
) -> Response | None

Validate request headers for DNS rebinding protection.

Returns None if validation passes, or an error Response if validation fails.

Source code in src/mcp/server/transport_security.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
async def validate_request(self, request: Request, is_post: bool = False) -> Response | None:
    """Check a request's headers and report the first violation found.

    The Content-Type check applies to POST requests only and runs whether or
    not DNS rebinding protection is enabled. The Host and Origin checks run
    only when protection is enabled, in that order.

    Returns None when every applicable check passes; otherwise an error
    Response (400 for Content-Type, 421 for Host, 403 for Origin).
    """
    # POST requests must carry an acceptable Content-Type, independent of the protection flag
    if is_post and not self._validate_content_type(request.headers.get("content-type")):  # pragma: no branch
        return Response("Invalid Content-Type header", status_code=400)

    # Host is checked before Origin; both only when DNS rebinding protection is on
    if self.settings.enable_dns_rebinding_protection:  # pragma: no cover
        if not self._validate_host(request.headers.get("host")):
            return Response("Invalid Host header", status_code=421)

        if not self._validate_origin(request.headers.get("origin")):
            return Response("Invalid Origin header", status_code=403)

    return None