Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion backend/agents/create_agent_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ def _build_external_agent_config(agent: dict, agent_url: str) -> ExternalA2AAgen
transport_type=agent.get("transport_type", "http-streaming"),
protocol_version=agent.get("protocol_version", "1.0"),
protocol_type=agent.get("protocol_type", PROTOCOL_JSONRPC),
timeout=300.0,
timeout=float(agent.get("timeout") or 300.0),
custom_headers=agent.get("custom_headers") or None,
raw_card=agent.get("raw_card"),
)

Expand Down
65 changes: 65 additions & 0 deletions backend/apps/a2a_client_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@
)


class UpdateAgentSettingsRequest(BaseModel):
"""Request to update user-configurable settings for an external A2A agent."""
custom_headers: Optional[dict] = Field(
default=None,
description="Custom HTTP headers as JSON object to send with agent requests"
)
timeout: Optional[float] = Field(
default=None,
description="Request timeout in seconds (default: 300)"
)


class TestNacosConnectionRequest(BaseModel):
"""Request to test Nacos connectivity without saving the config."""
nacos_addr: str = Field(description="Nacos server address (e.g., http://nacos-server:8848)")
Expand Down Expand Up @@ -328,6 +340,59 @@
)


@router.put("/agents/{external_agent_id}/settings")
async def update_agent_settings(
external_agent_id: int,
request: UpdateAgentSettingsRequest,
authorization: Annotated[Optional[str], Header()] = None,
http_request: Request = None
):
"""Update user-configurable settings for an external A2A agent.

Updates custom HTTP headers and/or request timeout.
These settings are preserved across agent card refreshes.

Args:
external_agent_id: The external agent database ID.
request: Request containing the settings to update.
"""
try:
_, tenant_id, _ = get_current_user_info(authorization, http_request)

result = a2a_client_service.update_agent_settings(
external_agent_id=external_agent_id,
tenant_id=tenant_id,
custom_headers=request.custom_headers,
timeout=request.timeout,
)

if not result:
raise HTTPException(
status_code=HTTPStatus.NOT_FOUND,
detail=f"Agent {external_agent_id} not found"
)

return JSONResponse(
status_code=HTTPStatus.OK,
content={"status": "success", "data": result}
)

except HTTPException:
raise
except ValueError as e:
logger.error(f"Invalid settings: {e}")

Check failure on line 383 in backend/apps/a2a_client_app.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use "logging.exception()" instead.

See more on https://sonarcloud.io/project/issues?id=ModelEngine-Group_nexent&issues=AZ7Tgkt4HrNUCHtbhlVb&open=AZ7Tgkt4HrNUCHtbhlVb&pullRequest=3247
raise HTTPException(
status_code=HTTPStatus.BAD_REQUEST,
detail=str(e)
)
except Exception as e:
logger.error(f"Update agent settings failed: {e}", exc_info=True)

Check failure on line 389 in backend/apps/a2a_client_app.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use "logging.exception()" instead.

See more on https://sonarcloud.io/project/issues?id=ModelEngine-Group_nexent&issues=AZ7Tgkt4HrNUCHtbhlVc&open=AZ7Tgkt4HrNUCHtbhlVc&pullRequest=3247
raise HTTPException(
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
detail="Failed to update agent settings"
)


# =============================================================================
# External Agent Relations (Sub-agent)
# =============================================================================
Expand Down
74 changes: 73 additions & 1 deletion backend/database/a2a_agent_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ def create_external_agent_from_url(
"supported_interfaces": agent.supported_interfaces,
"source_type": agent.source_type,
"base_url": agent.base_url,
"custom_headers": agent.custom_headers,
"timeout": agent.timeout,
"is_available": agent.is_available,
"cached_at": agent.cached_at.isoformat() if agent.cached_at else None,
"cache_expires_at": agent.cache_expires_at.isoformat() if agent.cache_expires_at else None,
Expand Down Expand Up @@ -351,6 +353,8 @@ def create_external_agent_from_nacos(
"supported_interfaces": agent.supported_interfaces,
"source_type": agent.source_type,
"base_url": agent.base_url,
"custom_headers": agent.custom_headers,
"timeout": agent.timeout,
"is_available": agent.is_available,
"cached_at": agent.cached_at.isoformat() if agent.cached_at else None,
"cache_expires_at": agent.cache_expires_at.isoformat() if agent.cache_expires_at else None,
Expand Down Expand Up @@ -392,6 +396,8 @@ def get_external_agent_by_id(external_agent_id: int, tenant_id: str) -> Optional
"nacos_config_id": agent.nacos_config_id,
"nacos_agent_name": agent.nacos_agent_name,
"raw_card": agent.raw_card,
"custom_headers": agent.custom_headers,
"timeout": agent.timeout,
"is_available": agent.is_available,
"last_check_at": agent.last_check_at.isoformat() if agent.last_check_at else None,
"last_check_result": agent.last_check_result,
Expand Down Expand Up @@ -447,6 +453,8 @@ def list_external_agents(
"source_type": agent.source_type,
"source_url": agent.source_url,
"base_url": agent.base_url,
"custom_headers": agent.custom_headers,
"timeout": agent.timeout,
"is_available": agent.is_available,
"last_check_result": agent.last_check_result,
"create_time": agent.create_time.isoformat() if agent.create_time else None,
Expand Down Expand Up @@ -558,7 +566,7 @@ def update_external_agent_protocol(
if interface:
agent.agent_url = interface.get("url", agent.agent_url)

agent.updated_time = datetime.now(timezone.utc)
agent.update_time = datetime.now(timezone.utc)

return {
"id": agent.id,
Expand All @@ -574,6 +582,68 @@ def update_external_agent_protocol(
"nacos_config_id": agent.nacos_config_id,
"nacos_agent_name": agent.nacos_agent_name,
"raw_card": agent.raw_card,
"custom_headers": agent.custom_headers,
"timeout": agent.timeout,
"is_available": agent.is_available,
"last_check_at": agent.last_check_at.isoformat() if agent.last_check_at else None,
"last_check_result": agent.last_check_result,
"cached_at": agent.cached_at.isoformat() if agent.cached_at else None,
"cache_expires_at": agent.cache_expires_at.isoformat() if agent.cache_expires_at else None,
"create_time": agent.create_time.isoformat() if agent.create_time else None,
"update_time": agent.update_time.isoformat() if agent.update_time else None,
}


def update_external_agent_settings(
external_agent_id: int,
tenant_id: str,
custom_headers: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
) -> Optional[Dict[str, Any]]:
"""Update user-configurable settings for an external agent.

Args:
external_agent_id: The external agent database ID.
tenant_id: Tenant ID for isolation.
custom_headers: Custom HTTP headers to send with requests.
timeout: Request timeout in seconds.

Returns:
Updated agent information dict or None if not found.
"""
with _get_db_session() as session:
agent = session.query(A2AExternalAgent).filter(
A2AExternalAgent.id == external_agent_id,
A2AExternalAgent.tenant_id == tenant_id,
A2AExternalAgent.delete_flag != 'Y'
).first()

if not agent:
return None

if custom_headers is not None:
agent.custom_headers = custom_headers
if timeout is not None:
agent.timeout = timeout

agent.update_time = datetime.now(timezone.utc)

return {
"id": agent.id,
"name": agent.name,
"description": agent.description,
"version": agent.version,
"agent_url": agent.agent_url,
"protocol_type": agent.protocol_type,
"streaming": agent.streaming,
"supported_interfaces": agent.supported_interfaces,
"source_type": agent.source_type,
"source_url": agent.source_url,
"nacos_config_id": agent.nacos_config_id,
"nacos_agent_name": agent.nacos_agent_name,
"raw_card": agent.raw_card,
"custom_headers": agent.custom_headers,
"timeout": agent.timeout,
"is_available": agent.is_available,
"last_check_at": agent.last_check_at.isoformat() if agent.last_check_at else None,
"last_check_result": agent.last_check_result,
Expand Down Expand Up @@ -855,6 +925,8 @@ def query_external_sub_agents(
"streaming": agent.streaming,
"supported_interfaces": agent.supported_interfaces,
"raw_card": agent.raw_card,
"custom_headers": agent.custom_headers,
"timeout": agent.timeout,
"is_enabled": relation.is_enabled,
}
for relation, agent in results
Expand Down
4 changes: 4 additions & 0 deletions backend/database/db_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,10 @@ class A2AExternalAgent(TableBase):
# Full original Agent Card
raw_card = Column(JSON, doc="Full original Agent Card JSON from discovery")

# User-configurable settings (not overridden on card refresh)
custom_headers = Column(JSON, doc="Custom HTTP headers as JSON object for A2A agent requests")
timeout = Column(Float, default=300.0, doc="Request timeout in seconds for calling this agent")

# Cache management
cached_at = Column(TIMESTAMP(timezone=False), doc="Timestamp when Agent Card was cached")
cache_expires_at = Column(TIMESTAMP(timezone=False), doc="Timestamp when cache expires")
Expand Down
67 changes: 39 additions & 28 deletions backend/services/a2a_client_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,36 @@ def update_agent_protocol(

return result

def update_agent_settings(
self,
external_agent_id: int,
tenant_id: str,
custom_headers: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
) -> Optional[Dict[str, Any]]:
"""Update user-configurable settings for an external agent.

Args:
external_agent_id: External agent database ID.
tenant_id: Tenant ID for isolation.
custom_headers: Custom HTTP headers to send with requests.
timeout: Request timeout in seconds.

Returns:
Updated agent information dict or None if not found.
"""
result = a2a_agent_db.update_external_agent_settings(
external_agent_id=external_agent_id,
tenant_id=tenant_id,
custom_headers=custom_headers,
timeout=timeout,
)

if result:
logger.info(f"Updated agent {external_agent_id} settings: custom_headers={custom_headers is not None}, timeout={timeout}")

return result

async def refresh_agent_card(
self,
external_agent_id: int,
Expand Down Expand Up @@ -532,36 +562,15 @@ async def refresh_agent_card(
new_description = card.get("description")
new_supported_interfaces = card.get("supportedInterfaces", [])

# Extract new protocol type from the card
new_protocol_type = _extract_protocol_type(new_supported_interfaces)
current_protocol_type = agent.get("protocol_type")

# Determine if we need to update agent_url and protocol_type
# Determine if we need to update agent_url
# Update agent_url if it changed in the remote card
update_agent_url = new_url is not None and new_url != agent_url

# Update protocol_type if it changed in the remote card
update_protocol_type = new_protocol_type != current_protocol_type
# NOTE: protocol_type is a user-configured setting and must NOT be
# overridden when refreshing the agent card. Only card metadata
# (name, description, supported_interfaces) is updated here.

# When protocol_type changes, we need to find the corresponding interface URL
if update_protocol_type:
logger.info(
f"Protocol type changed for agent {external_agent_id}: "
f"{current_protocol_type} -> {new_protocol_type}"
)
# The database function will handle finding the correct interface URL
result = a2a_agent_db.refresh_external_agent_cache(
external_agent_id=external_agent_id,
tenant_id=tenant_id,
user_id=user_id,
new_raw_card=card,
new_agent_url=new_url if update_agent_url else None,
new_name=new_name,
new_description=new_description,
new_supported_interfaces=new_supported_interfaces,
new_protocol_type=new_protocol_type
)
elif update_agent_url:
if update_agent_url:
# Only agent_url changed
logger.info(
f"Agent URL changed for agent {external_agent_id}: "
Expand Down Expand Up @@ -707,7 +716,8 @@ async def call_agent(

logger.info(f"Calling external A2A agent {external_agent_id}: url={endpoint_url}, protocol={protocol_type}, payload={payload}")

headers = build_a2a_headers()
custom_headers = agent.get("custom_headers") or {}
headers = build_a2a_headers(custom_headers=custom_headers)
async with A2AHttpClient() as client:
response = await client.post_json(endpoint_url, payload, headers)

Expand Down Expand Up @@ -776,7 +786,8 @@ async def call_agent_streaming(

logger.info(f"Calling external A2A agent {external_agent_id} (streaming): url={endpoint_url}, protocol={protocol_type}, payload={payload}")

headers = build_a2a_headers(api_key)
custom_headers = agent.get("custom_headers") or {}
headers = build_a2a_headers(api_key, custom_headers=custom_headers)

try:
async with A2AHttpClient() as client:
Expand Down
4 changes: 3 additions & 1 deletion backend/utils/a2a_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ async def post_stream(
raise


def build_a2a_headers(api_key: Optional[str] = None) -> Dict[str, str]:
def build_a2a_headers(api_key: Optional[str] = None, custom_headers: Optional[Dict[str, str]] = None) -> Dict[str, str]:
"""Build HTTP headers for A2A requests."""
headers = {
"Content-Type": CONTENT_TYPE_JSON,
Expand All @@ -285,4 +285,6 @@ def build_a2a_headers(api_key: Optional[str] = None) -> Dict[str, str]:
}
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
if custom_headers:
headers.update(custom_headers)
return headers
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
-- Migration: Add custom_headers and timeout columns to ag_a2a_external_agent_t
-- Date: 2026-06-17
-- Description: Support custom HTTP headers and configurable timeout per A2A external agent.
-- - custom_headers: user-defined HTTP headers sent with every request to this agent
-- - timeout: per-agent request timeout in seconds (default 300)

SET search_path TO nexent;

BEGIN;

DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_schema = 'nexent'
AND table_name = 'ag_a2a_external_agent_t'
AND column_name = 'custom_headers'
) THEN
ALTER TABLE nexent.ag_a2a_external_agent_t
ADD COLUMN custom_headers JSON DEFAULT NULL;

COMMENT ON COLUMN nexent.ag_a2a_external_agent_t.custom_headers
IS 'Custom HTTP headers as JSON object for A2A agent requests';
END IF;
END $$;

DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_schema = 'nexent'
AND table_name = 'ag_a2a_external_agent_t'
AND column_name = 'timeout'
) THEN
ALTER TABLE nexent.ag_a2a_external_agent_t
ADD COLUMN timeout DOUBLE PRECISION DEFAULT 300.0;

COMMENT ON COLUMN nexent.ag_a2a_external_agent_t.timeout
IS 'Request timeout in seconds for calling this agent (default 300)';
END IF;
END $$;

COMMIT;
Loading