diff --git a/backend/agents/create_agent_info.py b/backend/agents/create_agent_info.py index 50df7eb99..b5af69600 100644 --- a/backend/agents/create_agent_info.py +++ b/backend/agents/create_agent_info.py @@ -131,7 +131,8 @@ def _build_external_agent_config(agent: dict, agent_url: str) -> ExternalA2AAgen transport_type=agent.get("transport_type", "http-streaming"), protocol_version=agent.get("protocol_version", "1.0"), protocol_type=agent.get("protocol_type", PROTOCOL_JSONRPC), - timeout=300.0, + timeout=float(agent.get("timeout") or 300.0), + custom_headers=agent.get("custom_headers") or None, raw_card=agent.get("raw_card"), ) diff --git a/backend/apps/a2a_client_app.py b/backend/apps/a2a_client_app.py index ea149ac31..20dc84bf2 100644 --- a/backend/apps/a2a_client_app.py +++ b/backend/apps/a2a_client_app.py @@ -46,6 +46,18 @@ class UpdateAgentProtocolRequest(BaseModel): ) +class UpdateAgentSettingsRequest(BaseModel): + """Request to update user-configurable settings for an external A2A agent.""" + custom_headers: Optional[dict] = Field( + default=None, + description="Custom HTTP headers as JSON object to send with agent requests" + ) + timeout: Optional[float] = Field( + default=None, + description="Request timeout in seconds (default: 300)" + ) + + class TestNacosConnectionRequest(BaseModel): """Request to test Nacos connectivity without saving the config.""" nacos_addr: str = Field(description="Nacos server address (e.g., http://nacos-server:8848)") @@ -328,6 +340,59 @@ async def update_agent_protocol( ) +@router.put("/agents/{external_agent_id}/settings") +async def update_agent_settings( + external_agent_id: int, + request: UpdateAgentSettingsRequest, + authorization: Annotated[Optional[str], Header()] = None, + http_request: Request = None +): + """Update user-configurable settings for an external A2A agent. + + Updates custom HTTP headers and/or request timeout. + These settings are preserved across agent card refreshes. + + Args: + external_agent_id: The external agent database ID. + request: Request containing the settings to update. + """ + try: + _, tenant_id, _ = get_current_user_info(authorization, http_request) + + result = a2a_client_service.update_agent_settings( + external_agent_id=external_agent_id, + tenant_id=tenant_id, + custom_headers=request.custom_headers, + timeout=request.timeout, + ) + + if not result: + raise HTTPException( + status_code=HTTPStatus.NOT_FOUND, + detail=f"Agent {external_agent_id} not found" + ) + + return JSONResponse( + status_code=HTTPStatus.OK, + content={"status": "success", "data": result} + ) + + except HTTPException: + raise + except ValueError as e: + logger.error(f"Invalid settings: {e}") + raise HTTPException( + status_code=HTTPStatus.BAD_REQUEST, + detail=str(e) + ) + except Exception as e: + logger.error(f"Update agent settings failed: {e}", exc_info=True) + raise HTTPException( + status_code=HTTPStatus.INTERNAL_SERVER_ERROR, + detail="Failed to update agent settings" + ) + + # ============================================================================= # External Agent Relations (Sub-agent) # ============================================================================= diff --git a/backend/database/a2a_agent_db.py b/backend/database/a2a_agent_db.py index c1d998272..793a74230 100644 --- a/backend/database/a2a_agent_db.py +++ b/backend/database/a2a_agent_db.py @@ -244,6 +244,8 @@ def create_external_agent_from_url( "supported_interfaces": agent.supported_interfaces, "source_type": agent.source_type, "base_url": agent.base_url, + "custom_headers": agent.custom_headers, + "timeout": agent.timeout, "is_available": agent.is_available, "cached_at": agent.cached_at.isoformat() if agent.cached_at else None, "cache_expires_at": agent.cache_expires_at.isoformat() if agent.cache_expires_at else None, @@ -351,6 +353,8 @@ def create_external_agent_from_nacos( "supported_interfaces": agent.supported_interfaces, "source_type": agent.source_type, "base_url": agent.base_url, + "custom_headers": agent.custom_headers, + "timeout": agent.timeout, "is_available": agent.is_available, "cached_at": agent.cached_at.isoformat() if agent.cached_at else None, "cache_expires_at": agent.cache_expires_at.isoformat() if agent.cache_expires_at else None, @@ -392,6 +396,8 @@ def get_external_agent_by_id(external_agent_id: int, tenant_id: str) -> Optional "nacos_config_id": agent.nacos_config_id, "nacos_agent_name": agent.nacos_agent_name, "raw_card": agent.raw_card, + "custom_headers": agent.custom_headers, + "timeout": agent.timeout, "is_available": agent.is_available, "last_check_at": agent.last_check_at.isoformat() if agent.last_check_at else None, "last_check_result": agent.last_check_result, @@ -447,6 +453,8 @@ def list_external_agents( "source_type": agent.source_type, "source_url": agent.source_url, "base_url": agent.base_url, + "custom_headers": agent.custom_headers, + "timeout": agent.timeout, "is_available": agent.is_available, "last_check_result": agent.last_check_result, "create_time": agent.create_time.isoformat() if agent.create_time else None, @@ -558,7 +566,7 @@ def update_external_agent_protocol( if interface: agent.agent_url = interface.get("url", agent.agent_url) - agent.updated_time = datetime.now(timezone.utc) + agent.update_time = datetime.now(timezone.utc) return { "id": agent.id, @@ -574,6 +582,68 @@ def update_external_agent_protocol( "nacos_config_id": agent.nacos_config_id, "nacos_agent_name": agent.nacos_agent_name, "raw_card": agent.raw_card, + "custom_headers": agent.custom_headers, + "timeout": agent.timeout, + "is_available": agent.is_available, + "last_check_at": agent.last_check_at.isoformat() if agent.last_check_at else None, + "last_check_result": agent.last_check_result, + "cached_at": agent.cached_at.isoformat() if agent.cached_at else None, + "cache_expires_at": agent.cache_expires_at.isoformat() if agent.cache_expires_at else None, + "create_time": agent.create_time.isoformat() if agent.create_time else None, + "update_time": agent.update_time.isoformat() if agent.update_time else None, + } + + +def update_external_agent_settings( + external_agent_id: int, + tenant_id: str, + custom_headers: Optional[Dict[str, Any]] = None, + timeout: Optional[float] = None, +) -> Optional[Dict[str, Any]]: + """Update user-configurable settings for an external agent. + + Args: + external_agent_id: The external agent database ID. + tenant_id: Tenant ID for isolation. + custom_headers: Custom HTTP headers to send with requests. + timeout: Request timeout in seconds. + + Returns: + Updated agent information dict or None if not found. + """ + with _get_db_session() as session: + agent = session.query(A2AExternalAgent).filter( + A2AExternalAgent.id == external_agent_id, + A2AExternalAgent.tenant_id == tenant_id, + A2AExternalAgent.delete_flag != 'Y' + ).first() + + if not agent: + return None + + if custom_headers is not None: + agent.custom_headers = custom_headers + if timeout is not None: + agent.timeout = timeout + + agent.update_time = datetime.now(timezone.utc) + + return { + "id": agent.id, + "name": agent.name, + "description": agent.description, + "version": agent.version, + "agent_url": agent.agent_url, + "protocol_type": agent.protocol_type, + "streaming": agent.streaming, + "supported_interfaces": agent.supported_interfaces, + "source_type": agent.source_type, + "source_url": agent.source_url, + "nacos_config_id": agent.nacos_config_id, + "nacos_agent_name": agent.nacos_agent_name, + "raw_card": agent.raw_card, + "custom_headers": agent.custom_headers, + "timeout": agent.timeout, "is_available": agent.is_available, "last_check_at": agent.last_check_at.isoformat() if agent.last_check_at else None, "last_check_result": agent.last_check_result, @@ -855,6 +925,8 @@ def query_external_sub_agents( "streaming": agent.streaming, "supported_interfaces": agent.supported_interfaces, "raw_card": agent.raw_card, + "custom_headers": agent.custom_headers, + "timeout": agent.timeout, "is_enabled": relation.is_enabled, } for relation, agent in results diff --git a/backend/database/db_models.py b/backend/database/db_models.py index b779266c9..1e0246273 100644 --- a/backend/database/db_models.py +++ b/backend/database/db_models.py @@ -880,6 +880,10 @@ class A2AExternalAgent(TableBase): # Full original Agent Card raw_card = Column(JSON, doc="Full original Agent Card JSON from discovery") + # User-configurable settings (not overridden on card refresh) + custom_headers = Column(JSON, doc="Custom HTTP headers as JSON object for A2A agent requests") + timeout = Column(Float, default=300.0, doc="Request timeout in seconds for calling this agent") + # Cache management cached_at = Column(TIMESTAMP(timezone=False), doc="Timestamp when Agent Card was cached") cache_expires_at = Column(TIMESTAMP(timezone=False), doc="Timestamp when cache expires") diff --git a/backend/services/a2a_client_service.py b/backend/services/a2a_client_service.py index e4e81fec5..e16129a0e 100644 --- a/backend/services/a2a_client_service.py +++ b/backend/services/a2a_client_service.py @@ -453,6 +453,36 @@ def update_agent_protocol( return result + def update_agent_settings( + self, + external_agent_id: int, + tenant_id: str, + custom_headers: Optional[Dict[str, Any]] = None, + timeout: Optional[float] = None, + ) -> Optional[Dict[str, Any]]: + """Update user-configurable settings for an external agent. + + Args: + external_agent_id: External agent database ID. + tenant_id: Tenant ID for isolation. + custom_headers: Custom HTTP headers to send with requests. + timeout: Request timeout in seconds. + + Returns: + Updated agent information dict or None if not found. + """ + result = a2a_agent_db.update_external_agent_settings( + external_agent_id=external_agent_id, + tenant_id=tenant_id, + custom_headers=custom_headers, + timeout=timeout, + ) + + if result: + logger.info(f"Updated agent {external_agent_id} settings: custom_headers={custom_headers is not None}, timeout={timeout}") + + return result + async def refresh_agent_card( self, external_agent_id: int, @@ -532,36 +562,15 @@ async def refresh_agent_card( new_description = card.get("description") new_supported_interfaces = card.get("supportedInterfaces", []) - # Extract new protocol type from the card - new_protocol_type = _extract_protocol_type(new_supported_interfaces) - current_protocol_type = agent.get("protocol_type") - - # Determine if we need to update agent_url and protocol_type + # Determine if we need to update agent_url # Update agent_url if it changed in the remote card update_agent_url = new_url is not None and new_url != agent_url - # Update protocol_type if it changed in the remote card - update_protocol_type = new_protocol_type != current_protocol_type + # NOTE: protocol_type is a user-configured setting and must NOT be + # overridden when refreshing the agent card. Only card metadata + # (name, description, supported_interfaces) is updated here. - # When protocol_type changes, we need to find the corresponding interface URL - if update_protocol_type: - logger.info( - f"Protocol type changed for agent {external_agent_id}: " - f"{current_protocol_type} -> {new_protocol_type}" - ) - # The database function will handle finding the correct interface URL - result = a2a_agent_db.refresh_external_agent_cache( - external_agent_id=external_agent_id, - tenant_id=tenant_id, - user_id=user_id, - new_raw_card=card, - new_agent_url=new_url if update_agent_url else None, - new_name=new_name, - new_description=new_description, - new_supported_interfaces=new_supported_interfaces, - new_protocol_type=new_protocol_type - ) - elif update_agent_url: + if update_agent_url: # Only agent_url changed logger.info( f"Agent URL changed for agent {external_agent_id}: " @@ -707,7 +716,8 @@ async def call_agent( logger.info(f"Calling external A2A agent {external_agent_id}: url={endpoint_url}, protocol={protocol_type}, payload={payload}") - headers = build_a2a_headers() + custom_headers = agent.get("custom_headers") or {} + headers = build_a2a_headers(custom_headers=custom_headers) async with A2AHttpClient() as client: response = await client.post_json(endpoint_url, payload, headers) @@ -776,7 +786,8 @@ async def call_agent_streaming( logger.info(f"Calling external A2A agent {external_agent_id} (streaming): url={endpoint_url}, protocol={protocol_type}, payload={payload}") - headers = build_a2a_headers(api_key) + custom_headers = agent.get("custom_headers") or {} + headers = build_a2a_headers(api_key, custom_headers=custom_headers) try: async with A2AHttpClient() as client: diff --git a/backend/utils/a2a_http_client.py b/backend/utils/a2a_http_client.py index 8b7c55d9f..5b6d78a80 100644 --- a/backend/utils/a2a_http_client.py +++ b/backend/utils/a2a_http_client.py @@ -276,7 +276,7 @@ async def post_stream( raise -def build_a2a_headers(api_key: Optional[str] = None) -> Dict[str, str]: +def build_a2a_headers(api_key: Optional[str] = None, custom_headers: Optional[Dict[str, str]] = None) -> Dict[str, str]: """Build HTTP headers for A2A requests.""" headers = { "Content-Type": CONTENT_TYPE_JSON, @@ -285,4 +285,6 @@ def build_a2a_headers(api_key: Optional[str] = None) -> Dict[str, str]: } if api_key: headers["Authorization"] = f"Bearer {api_key}" + if custom_headers: + headers.update(custom_headers) return headers diff --git a/docker/sql/v2.2.1_add_custom_headers_timeout_to_external_agent.sql b/docker/sql/v2.2.1_add_custom_headers_timeout_to_external_agent.sql new file mode 100644 index 000000000..4404727c1 --- /dev/null +++ b/docker/sql/v2.2.1_add_custom_headers_timeout_to_external_agent.sql @@ -0,0 +1,43 @@ +-- Migration: Add custom_headers and timeout columns to ag_a2a_external_agent_t +-- Date: 2026-06-17 +-- Description: Support custom HTTP headers and configurable timeout per A2A external agent. +-- - custom_headers: user-defined HTTP headers sent with every request to this agent +-- - timeout: per-agent request timeout in seconds (default 300) + +SET search_path TO nexent; + +BEGIN; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'nexent' + AND table_name = 'ag_a2a_external_agent_t' + AND column_name = 'custom_headers' + ) THEN + ALTER TABLE nexent.ag_a2a_external_agent_t + ADD COLUMN custom_headers JSON DEFAULT NULL; + + COMMENT ON COLUMN nexent.ag_a2a_external_agent_t.custom_headers + IS 'Custom HTTP headers as JSON object for A2A agent requests'; + END IF; +END $$; + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'nexent' + AND table_name = 'ag_a2a_external_agent_t' + AND column_name = 'timeout' + ) THEN + ALTER TABLE nexent.ag_a2a_external_agent_t + ADD COLUMN timeout DOUBLE PRECISION DEFAULT 300.0; + + COMMENT ON COLUMN nexent.ag_a2a_external_agent_t.timeout + IS 'Request timeout in seconds for calling this agent (default 300)'; + END IF; +END $$; + +COMMIT; diff --git a/frontend/app/[locale]/agents/components/a2a/A2AAgentDiscoveryModal.tsx b/frontend/app/[locale]/agents/components/a2a/A2AAgentDiscoveryModal.tsx index bc9260a29..ee9ef5bd3 100644 --- a/frontend/app/[locale]/agents/components/a2a/A2AAgentDiscoveryModal.tsx +++ b/frontend/app/[locale]/agents/components/a2a/A2AAgentDiscoveryModal.tsx @@ -18,6 +18,8 @@ import { Select, Popover, Radio, + InputNumber, + Divider, } from "antd"; import { Globe, @@ -67,10 +69,16 @@ interface A2AAgentDiscoveryModalProps { localAgentId?: number; } +// Constants for agent settings +const DEFAULT_PROTOCOL = "JSONRPC"; +const DEFAULT_TIMEOUT_SECONDS = 300; +const MIN_TIMEOUT_SECONDS = 1; +const MAX_TIMEOUT_SECONDS = 3600; + // Helper function to extract available protocols from supported interfaces function extractAvailableProtocols(supportedInterfaces?: Record[]): string[] { if (!supportedInterfaces || supportedInterfaces.length === 0) { - return ["JSONRPC"]; // Default protocol + return [DEFAULT_PROTOCOL]; } const protocols = new Set(); @@ -82,49 +90,98 @@ function extractAvailableProtocols(supportedInterfaces?: Record[]): } } - return protocols.size > 0 ? Array.from(protocols) : ["JSONRPC"]; + return protocols.size > 0 ? Array.from(protocols) : [DEFAULT_PROTOCOL]; } -// Agent Protocol Setting Popover Component -interface AgentProtocolSettingProps { +// Agent Settings Popover Component (protocol, custom headers, timeout) +interface AgentSettingsProps { agent: A2AExternalAgent; onProtocolChange: (agentId: string, protocolType: string) => void; + onSettingsChange: (agentId: string, settings: { custom_headers?: Record | null; timeout?: number | null }) => void; } -function AgentProtocolSetting({ agent, onProtocolChange }: Readonly) { +function AgentSettings({ agent, onProtocolChange, onSettingsChange }: Readonly) { const { t } = useTranslation("common"); const [open, setOpen] = useState(false); const [selectedProtocol, setSelectedProtocol] = useState( - (agent as any).protocol_type || "JSONRPC" + agent.protocol_type || DEFAULT_PROTOCOL ); + const [timeout, setTimeout] = useState(agent.timeout ?? DEFAULT_TIMEOUT_SECONDS); + // custom_headers stored as key-value pairs for editing + const [headerPairs, setHeaderPairs] = useState<{ key: string; value: string }[]>(() => { + const h = agent.custom_headers || {}; + return Object.entries(h).map(([key, value]) => ({ key, value })); + }); const [saving, setSaving] = useState(false); const availableProtocols = extractAvailableProtocols(agent.supported_interfaces); + // Sync local state when the agent prop changes (e.g. after list reload) + useEffect(() => { + setSelectedProtocol(agent.protocol_type || DEFAULT_PROTOCOL); + }, [agent.protocol_type]); + + useEffect(() => { + setTimeout(agent.timeout ?? DEFAULT_TIMEOUT_SECONDS); + }, [agent.timeout]); + useEffect(() => { - setSelectedProtocol((agent as any).protocol_type || "JSONRPC"); - }, [(agent as any).protocol_type]); + const h = agent.custom_headers || {}; + setHeaderPairs(Object.entries(h).map(([key, value]) => ({ key, value }))); + }, [agent.custom_headers]); - const handleSave = () => { + const addHeaderPair = () => { + setHeaderPairs([...headerPairs, { key: "", value: "" }]); + }; + + const removeHeaderPair = (index: number) => { + setHeaderPairs(headerPairs.filter((_, i) => i !== index)); + }; + + const updateHeaderPair = (index: number, field: "key" | "value", val: string) => { + const updated = [...headerPairs]; + updated[index] = { ...updated[index], [field]: val }; + setHeaderPairs(updated); + }; + + const handleSave = async () => { setSaving(true); - onProtocolChange(String(agent.id), selectedProtocol); - setSaving(false); - setOpen(false); + try { + // Save protocol if changed + if (selectedProtocol !== (agent.protocol_type || DEFAULT_PROTOCOL)) { + onProtocolChange(String(agent.id), selectedProtocol); + } + + // Build custom_headers object from pairs (skip empty keys) + const custom_headers: Record = {}; + for (const { key, value } of headerPairs) { + if (key.trim()) { + custom_headers[key.trim()] = value; + } + } + + onSettingsChange(String(agent.id), { + custom_headers: Object.keys(custom_headers).length > 0 ? custom_headers : null, + timeout: timeout, + }); + } finally { + setSaving(false); + setOpen(false); + } }; return ( -
- - {t("a2a.protocol.selectProtocol")} - +
+ {/* Protocol */} +
+ {t("a2a.protocol.selectProtocol")}
setSelectedProtocol(e.target.value)} - style={{ display: "flex", flexDirection: "column", gap: 8 }} + style={{ display: "flex", flexDirection: "column", gap: 6, marginBottom: 16 }} > {PROTOCOL_TYPES.map((protocol) => { const isAvailable = availableProtocols.includes(protocol.value); @@ -138,16 +195,67 @@ function AgentProtocolSetting({ agent, onProtocolChange }: Readonly {protocol.label} {!isAvailable && ( - - N/A - + N/A )} ); })} -
+ + + + {/* Timeout */} +
+ {t("a2a.settings.timeout")} +
+ setTimeout(v ?? DEFAULT_TIMEOUT_SECONDS)} + addonAfter={t("a2a.settings.seconds")} + style={{ width: "100%", marginBottom: 16 }} + /> + + + + {/* Custom Headers */} +
+ {t("a2a.settings.customHeaders")} + +
+
+ {headerPairs.map((pair, index) => ( +
+ updateHeaderPair(index, "key", e.target.value)} + size="small" + style={{ flex: 1 }} + /> + updateHeaderPair(index, "value", e.target.value)} + size="small" + style={{ flex: 1 }} + /> +
+ ))} +
+ +