Skip to content
Merged

Dev #84

Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion plugins/communication_protocols/http/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "utcp-http"
version = "1.1.1"
version = "1.1.2"
authors = [
{ name = "UTCP Contributors" },
]
Expand Down
83 changes: 83 additions & 0 deletions plugins/communication_protocols/http/src/utcp_http/_security.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""URL validation shared by every HTTP-based communication protocol.

Centralised so all three HTTP protocols (http, streamable_http, sse) enforce
the same trust boundary at every network edge — manual discovery AND tool
invocation. Issue #83 (CVE-class SSRF) was caused by the runtime invocation
path forgetting the discovery-time check, so this module also provides an
explicit ``ensure_secure_url`` to call before every aiohttp request.
"""

from __future__ import annotations

from ipaddress import ip_address
from typing import Optional
from urllib.parse import urlparse

# Hostnames considered safe to talk to over plain HTTP.
_LOOPBACK_HOSTNAMES = frozenset({"localhost", "127.0.0.1", "::1", "[::1]"})


def is_secure_url(url: str) -> bool:
"""Return True if ``url`` is safe to fetch from a UTCP HTTP protocol.

Allowed:
- Any ``https://`` URL.
- ``http://`` URLs whose host is exactly ``localhost``, ``127.0.0.1``,
or ``::1``.

Disallowed:
- Plain ``http://`` to any other host (MITM exposure).
- URLs whose hostname *starts* with ``localhost`` / ``127.0.0.1`` but
isn't actually loopback (e.g. ``http://localhost.evil.com``,
``http://127.0.0.1.attacker.example``). The earlier ``startswith``
check let these through.
- Anything without a scheme/host (file://, gopher://, javascript:, ...).
"""
if not isinstance(url, str) or not url:
return False

try:
parsed = urlparse(url)
except ValueError:
return False

scheme = (parsed.scheme or "").lower()
if scheme not in {"http", "https"}:
return False

host = (parsed.hostname or "").lower()
if not host:
return False

if scheme == "https":
return True

# http:// is only allowed for loopback.
if host in _LOOPBACK_HOSTNAMES:
return True

# Catch any other literal loopback IP that urlparse normalised
# (e.g. ``http://127.000.000.001``).
try:
return ip_address(host).is_loopback
except ValueError:
return False


def ensure_secure_url(url: str, *, context: Optional[str] = None) -> None:
"""Raise ``ValueError`` if ``url`` is not safe to fetch.

``context`` is a short label (``"manual discovery"``, ``"tool invocation"``,
etc.) included in the error so log readers can tell which trust boundary
was breached.
"""
if is_secure_url(url):
return

where = f" during {context}" if context else ""
raise ValueError(
f"Security error{where}: URL must use HTTPS or be a literal loopback "
f"address (localhost / 127.0.0.1 / ::1). Got: {url!r}. "
"Plain HTTP to any other host is rejected to prevent MITM attacks "
"and SSRF into internal services."
)
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from utcp_http.http_call_template import HttpCallTemplate
from aiohttp import ClientSession, BasicAuth as AiohttpBasicAuth
from utcp_http.openapi_converter import OpenApiConverter
from utcp_http._security import ensure_secure_url
import logging

logging.basicConfig(
Expand Down Expand Up @@ -123,14 +124,10 @@ async def register_manual(self, caller, manual_call_template: CallTemplate) -> R

try:
url = manual_call_template.url

# Security check: Enforce HTTPS or localhost to prevent MITM attacks
if not (url.startswith("https://") or url.startswith("http://localhost") or url.startswith("http://127.0.0.1")):
raise ValueError(
f"Security error: URL must use HTTPS or start with 'http://localhost' or 'http://127.0.0.1'. Got: {url}. "
"Non-secure URLs are vulnerable to man-in-the-middle attacks."
)


# Security check: only HTTPS or loopback HTTP allowed for manual discovery.
ensure_secure_url(url, context="manual discovery")

logger.info(f"Discovering tools from '{manual_call_template.name}' (HTTP) at {url}")

# Use the call template's configuration (headers, auth, HTTP method, etc.)
Expand Down Expand Up @@ -274,7 +271,15 @@ async def call_tool(self, caller, tool_name: str, tool_args: Dict[str, Any], too

# Build the URL with path parameters substituted
url = self._build_url_with_path_params(tool_call_template.url, remaining_args)


# Security check: re-validate the resolved URL before each invocation.
# An attacker-controlled OpenAPI spec discovered over a legitimate HTTPS
# URL can declare ``servers[0].url`` pointing at internal services
# (e.g. http://169.254.169.254 for cloud metadata, http://127.0.0.1:9200
# for an unauthenticated Elasticsearch). Without this re-check, tool
# invocation is a blind SSRF primitive — see GHSA / issue #83.
ensure_secure_url(url, context="tool invocation")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: The SSRF check allows loopback addresses (http://127.0.0.1:*), which contradicts the threat model documented in its own comment. An attacker-controlled OpenAPI spec fetched over a legitimate HTTPS endpoint can set servers[0].url to http://127.0.0.1:9200 (or any other local service), and ensure_secure_url will pass it through because 127.0.0.1 is in the loopback allowlist.

Consider using a stricter variant for the tool-invocation context that only allows HTTPS (no loopback HTTP), or at least remove the misleading comment about blocking http://127.0.0.1:9200.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At plugins/communication_protocols/http/src/utcp_http/http_communication_protocol.py, line 281:

<comment>The SSRF check allows loopback addresses (`http://127.0.0.1:*`), which contradicts the threat model documented in its own comment. An attacker-controlled OpenAPI spec fetched over a legitimate HTTPS endpoint can set `servers[0].url` to `http://127.0.0.1:9200` (or any other local service), and `ensure_secure_url` will pass it through because `127.0.0.1` is in the loopback allowlist.

Consider using a stricter variant for the tool-invocation context that only allows HTTPS (no loopback HTTP), or at least remove the misleading comment about blocking `http://127.0.0.1:9200`.</comment>

<file context>
@@ -274,7 +271,15 @@ async def call_tool(self, caller, tool_name: str, tool_args: Dict[str, Any], too
+        # (e.g. http://169.254.169.254 for cloud metadata, http://127.0.0.1:9200
+        # for an unauthenticated Elasticsearch). Without this re-check, tool
+        # invocation is a blind SSRF primitive — see GHSA / issue #83.
+        ensure_secure_url(url, context="tool invocation")
+
         # The rest of the arguments are query parameters
</file context>


# The rest of the arguments are query parameters
query_params = remaining_args

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from utcp.data.auth_implementations.oauth2_auth import OAuth2Auth
from utcp_http.sse_call_template import SseCallTemplate
from aiohttp import ClientSession, BasicAuth as AiohttpBasicAuth
from utcp_http._security import ensure_secure_url
import traceback
import logging

Expand Down Expand Up @@ -77,14 +78,10 @@ async def register_manual(self, caller, manual_call_template: CallTemplate) -> R

try:
url = manual_call_template.url

# Security check: Enforce HTTPS or localhost to prevent MITM attacks
if not (url.startswith("https://") or url.startswith("http://localhost") or url.startswith("http://127.0.0.1")):
raise ValueError(
f"Security error: URL must use HTTPS or start with 'http://localhost' or 'http://127.0.0.1'. Got: {url}. "
"Non-secure URLs are vulnerable to man-in-the-middle attacks."
)


# Security check: only HTTPS or loopback HTTP allowed for manual discovery.
ensure_secure_url(url, context="manual discovery")

logger.info(f"Discovering tools from '{manual_call_template.name}' (SSE) at {url}")

# Use the provider's configuration (headers, auth, etc.)
Expand Down Expand Up @@ -188,7 +185,12 @@ async def call_tool_streaming(self, caller, tool_name: str, tool_args: Dict[str,

# Build the URL with path parameters substituted
url = self._build_url_with_path_params(tool_call_template.url, remaining_args)


# Security check: re-validate the resolved URL before each invocation.
# Defends against SSRF via attacker-controlled OpenAPI specs that point
# ``servers[0].url`` at internal services. See issue #83.
ensure_secure_url(url, context="tool invocation")

# The rest of the arguments are query parameters
query_params = remaining_args

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from utcp.data.auth_implementations import OAuth2Auth
from utcp_http.streamable_http_call_template import StreamableHttpCallTemplate
from aiohttp import ClientSession, BasicAuth as AiohttpBasicAuth, ClientResponse
from utcp_http._security import ensure_secure_url
import logging

logging.basicConfig(
Expand Down Expand Up @@ -78,14 +79,11 @@ async def register_manual(self, caller, manual_call_template: CallTemplate) -> R
raise ValueError("StreamableHttpCommunicationProtocol can only be used with StreamableHttpCallTemplate")

url = manual_call_template.url

# Security check: Enforce HTTPS or localhost to prevent MITM attacks
if not (url.startswith("https://") or url.startswith("http://localhost") or url.startswith("http://127.0.0.1")):
raise ValueError(
f"Security error: URL must use HTTPS or start with 'http://localhost' or 'http://127.0.0.1'. Got: {url}. "
"Non-secure URLs are vulnerable to man-in-the-middle attacks."
)


# Security check: only HTTPS or loopback HTTP allowed for manual discovery.
ensure_secure_url(url, context="manual discovery")


logger.info(f"Discovering tools from '{manual_call_template.name}' (HTTP Stream) at {url}")

try:
Expand Down Expand Up @@ -216,7 +214,12 @@ async def call_tool_streaming(self, caller, tool_name: str, tool_args: Dict[str,

# Build the URL with path parameters substituted
url = self._build_url_with_path_params(tool_call_template.url, remaining_args)


# Security check: re-validate the resolved URL before each invocation.
# Defends against SSRF via attacker-controlled OpenAPI specs that point
# ``servers[0].url`` at internal services. See issue #83.
ensure_secure_url(url, context="tool invocation")

# The rest of the arguments are query parameters
query_params = remaining_args

Expand Down
67 changes: 67 additions & 0 deletions plugins/communication_protocols/http/tests/test_security.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""Tests for the URL trust-boundary helper used by every HTTP-based protocol.

The helper backs the fix for issue #83 (SSRF via attacker-controlled
``servers[0].url`` in OpenAPI specs). These cases pin the exact accept/reject
decisions so the bypass never silently regresses.
"""

import pytest

from utcp_http._security import ensure_secure_url, is_secure_url


@pytest.mark.parametrize(
"url",
[
"https://example.com/openapi.json",
"HTTPS://example.com/openapi.json",
"https://example.com:8443/v1/tool",
"http://localhost/openapi.json",
"http://localhost:8080/v1/tool",
"http://127.0.0.1:9090/sensitive",
"http://[::1]:9090/sensitive",
],
)
def test_secure_urls_accepted(url: str) -> None:
assert is_secure_url(url) is True


@pytest.mark.parametrize(
"url",
[
# Plain http to non-loopback host (the historical SSRF target).
"http://169.254.169.254/latest/meta-data/",
"http://internal.service.local/secret",
"http://10.0.0.5/admin",
"http://example.com/openapi.json",
# The localhost.evil.com / 127.0.0.1.attacker.example bypass that the
# original `startswith` check let through.
"http://localhost.evil.com/path",
"http://127.0.0.1.attacker.example/path",
# Non-http schemes must always be rejected.
"file:///etc/passwd",
"ftp://example.com/x",
"javascript:alert(1)",
# Garbage.
"",
"not-a-url",
],
)
def test_insecure_urls_rejected(url: str) -> None:
assert is_secure_url(url) is False


def test_ensure_secure_url_raises_with_context() -> None:
with pytest.raises(ValueError) as exc:
ensure_secure_url(
"http://169.254.169.254/latest/meta-data/",
context="tool invocation",
)
msg = str(exc.value)
assert "tool invocation" in msg
assert "169.254.169.254" in msg


def test_ensure_secure_url_passes_silently_for_valid_url() -> None:
# Should not raise.
ensure_secure_url("https://example.com/v1/tool", context="manual discovery")
Loading