Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,25 @@ jobs:
id: changes
run: |
echo "Checking if agentrun directory has changes..."
# 获取最近两次提交之间的差异;如果没有父提交,则将所有跟踪文件视为已更改
if git rev-parse HEAD^ >/dev/null 2>&1; then
# 与默认分支(main)的分叉点比较,取整个分支引入的全部改动,
# 而非仅最近一次提交(避免多 commit 分支漏检早先提交的改动)。
# CI 以 fetch-depth: 0 检出,origin/main 可用;优先用它,退回本地 main。
BASE_REF=""
for ref in origin/main main; do
if git rev-parse --verify "$ref" >/dev/null 2>&1; then
BASE_REF="$ref"; break
fi
done
MERGE_BASE=""
if [ -n "$BASE_REF" ]; then
MERGE_BASE=$(git merge-base "$BASE_REF" HEAD 2>/dev/null || echo "")
fi
if [ -n "$MERGE_BASE" ] && [ "$MERGE_BASE" != "$(git rev-parse HEAD)" ]; then
echo "Diffing against base ($BASE_REF, merge-base ${MERGE_BASE})"
git diff --name-only "$MERGE_BASE" HEAD > changed_files.txt
elif git rev-parse HEAD^ >/dev/null 2>&1; then
# 回退(如直接 push 到 main,与 base 无分叉):比较最近一次提交
echo "No divergence from base; falling back to HEAD^..HEAD"
git diff --name-only HEAD^ HEAD > changed_files.txt
else
echo "No parent commit; treating all tracked files as changed."
Expand Down
58 changes: 58 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -471,3 +471,61 @@ A: 建议:
3. 执行相关模块的 ut 测试,确保可以正确执行
4. 进行修改内容的总结汇报
5. 根据汇报内容进行检查,重新检查底层 SDK 和 AgentRun SDK 的定义

## 凭证注入与 STS 静默刷新约定(强制)

STS 临时凭证(ak/sk/security_token)会过期。部署在函数计算(FC)时,最新轮转
后的 STS 通过**每次请求的 HTTP 头**下发,而非进程级环境变量。为让所有 client 在
凭证过期后静默刷新,本仓库采用统一机制:

1. **请求级 overlay**:`agentrun/server/sts_middleware.py` 解析 FC 头
(默认 `x-fc-access-key-id` / `x-fc-access-key-secret` / `x-fc-security-token`,
可经构造参数或 `AGENTRUN_STS_HEADER_*` 环境变量覆盖),写入
`agentrun/utils/credential_context.py` 的 `contextvars` overlay。中间件本身
只是 `use_sts_from_headers` 的薄封装(加 FC 门控),二者共用同一套解析逻辑。

**非 agentrun server 场景**(自有 FastAPI / Flask / Django、或非 HTTP 任务):
中间件不会运行,需用户手动注入。SDK 顶层导出两个上下文管理器:
- `agentrun.use_sts_credentials(ak, sk, sts)` —— 显式传值;
- `agentrun.use_sts_from_headers(headers)` —— 从任意请求头映射解析(同 `x-fc-*`)。

```python
from agentrun import use_sts_from_headers
with use_sts_from_headers(request.headers):
... # 块内所有 SDK 调用使用最新 STS,退出自动复位
```

2. **Config 懒解析**:`Config` 的三个凭证 getter 按
**显式传入 > 请求级 overlay(仅当三者均未显式传入)> 环境变量** 解析。
切勿在 `Config.__init__` 里把凭证快照成固定字符串。

3. **client 一律用 credential provider,禁止传静态 ak/sk/sts**:

- **alibabacloud OpenAPI**(控制面 / Bailian / GPDB / Devs 等):
构造 `open_api_util_models.Config` 时传
`credential=build_openapi_credential(cfg)`
(见 `agentrun/utils/credential_providers.py`),**不要**再传
`access_key_id` / `access_key_secret` / `security_token`。
注意 tea_openapi 的优先级是「静态 ak/sk 优先于 credential」,传了静态值
会让 provider 失效。

- **TableStore `OTSClient` / `AsyncOTSClient`**:
构造时传 `credentials_provider=build_ots_credentials_provider(cfg)`
(见 `agentrun/conversation_service/utils.py`),**不要**再传
`access_key_id` / `access_key_secret` / `sts_token`。

原因:直接传静态凭证会在 client 构造时把凭证冻结,长生命周期 client(如
server 启动时仅创建一次的 OTSClient)在 STS 过期后所有请求都会失败。
provider 会在**每次请求**被底层 SDK 调用,从而拿到最新 STS。

4. **数据面手写签名**(`agentrun/utils/data_api.py` 的 RAM 签名)无需 provider:
它本就每次请求调用 `cfg.get_*()`,已随 Config 懒解析自动刷新。

5. **自定义 httpx 签名器**(如 `_AgentrunRamAuth`,用于 MCP SSE / OpenAPI 工具)
必须**持有 `Config`**、在 `auth_flow` 内调用 `cfg.get_*()` 取证,
**不要**在 `__init__` 把 ak/sk/sts 快照成字段。否则长连接(SSE 一次建连、
多请求复用)会冻结建连时的 STS。

新增任何与阿里云 / TableStore 交互的 client 时,必须遵循第 3 条;新增单测应覆盖
「overlay 生效」与「显式凭证不被 overlay 覆盖」两种情况
(参考 `tests/unittests/test_sts_refresh.py`)。
9 changes: 9 additions & 0 deletions agentrun/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@
# ToolSet
from agentrun.toolset import ToolSet, ToolSetClient
from agentrun.utils.config import Config
from agentrun.utils.credential_context import (
StsCredential,
use_sts_credentials,
use_sts_from_headers,
)
from agentrun.utils.exception import (
ResourceAlreadyExistError,
ResourceNotExistError,
Expand Down Expand Up @@ -335,6 +340,10 @@
"ResourceNotExistError",
"ResourceAlreadyExistError",
"Config",
######## STS 凭证刷新(非 server 场景手动注入) ########
"StsCredential",
"use_sts_credentials",
"use_sts_from_headers",
]

# Memory Collection 模块的所有导出(延迟加载)
Expand Down
10 changes: 3 additions & 7 deletions agentrun/conversation_service/__session_store_async_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,7 @@ async def from_memory_collection_async(
"vector_store_config.instance_name 为空。"
)

# 3. 获取凭证
# 3. 校验凭证存在(fail-fast;运行时由 CredentialsProvider 动态取证)
effective_config = config if isinstance(config, Config) else Config()
access_key_id = effective_config.get_access_key_id()
access_key_secret = effective_config.get_access_key_secret()
Expand All @@ -891,17 +891,13 @@ async def from_memory_collection_async(
"AGENTRUN_ACCESS_KEY_ID / AGENTRUN_ACCESS_KEY_SECRET。"
)

security_token = effective_config.get_security_token()
sts_token = security_token if security_token else None

# 4. 构建 OTSClient + AsyncOTSClient 和 OTSBackend
# 使用 utils.build_ots_clients 避免 codegen 替换 AsyncOTSClient
# 传入 config:OTS 经 CredentialsProvider 每次请求动态取最新 STS。
ots_client, async_ots_client = build_ots_clients(
endpoint,
access_key_id,
access_key_secret,
instance_name,
sts_token=sts_token,
config=effective_config,
)

backend = OTSBackend(
Expand Down
20 changes: 6 additions & 14 deletions agentrun/conversation_service/session_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1645,7 +1645,7 @@ async def from_memory_collection_async(
"vector_store_config.instance_name 为空。"
)

# 3. 获取凭证
# 3. 校验凭证存在(fail-fast;运行时由 CredentialsProvider 动态取证)
effective_config = config if isinstance(config, Config) else Config()
access_key_id = effective_config.get_access_key_id()
access_key_secret = effective_config.get_access_key_secret()
Expand All @@ -1656,17 +1656,13 @@ async def from_memory_collection_async(
"AGENTRUN_ACCESS_KEY_ID / AGENTRUN_ACCESS_KEY_SECRET。"
)

security_token = effective_config.get_security_token()
sts_token = security_token if security_token else None

# 4. 构建 OTSClient + AsyncOTSClient 和 OTSBackend
# 使用 utils.build_ots_clients 避免 codegen 替换 AsyncOTSClient
# 传入 config:OTS 经 CredentialsProvider 每次请求动态取最新 STS。
ots_client, async_ots_client = build_ots_clients(
endpoint,
access_key_id,
access_key_secret,
instance_name,
sts_token=sts_token,
config=effective_config,
)

backend = OTSBackend(
Expand Down Expand Up @@ -1749,7 +1745,7 @@ def from_memory_collection(
"vector_store_config.instance_name 为空。"
)

# 3. 获取凭证
# 3. 校验凭证存在(fail-fast;运行时由 CredentialsProvider 动态取证)
effective_config = config if isinstance(config, Config) else Config()
access_key_id = effective_config.get_access_key_id()
access_key_secret = effective_config.get_access_key_secret()
Expand All @@ -1760,17 +1756,13 @@ def from_memory_collection(
"AGENTRUN_ACCESS_KEY_ID / AGENTRUN_ACCESS_KEY_SECRET。"
)

security_token = effective_config.get_security_token()
sts_token = security_token if security_token else None

# 4. 构建 OTSClient + OTSClient 和 OTSBackend
# 使用 utils.build_ots_clients 避免 codegen 替换 OTSClient
# 传入 config:OTS 经 CredentialsProvider 每次请求动态取最新 STS。
ots_client, async_ots_client = build_ots_clients(
endpoint,
access_key_id,
access_key_secret,
instance_name,
sts_token=sts_token,
config=effective_config,
)

backend = OTSBackend(
Expand Down
60 changes: 49 additions & 11 deletions agentrun/conversation_service/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
if TYPE_CHECKING:
from tablestore import AsyncOTSClient # type: ignore[import-untyped]
from tablestore import OTSClient
from tablestore.credentials import CredentialsProvider

from agentrun.utils.config import Config

# OTS 单个属性列值上限为 2MB,留 0.5MB 余量(按字符数计)
MAX_COLUMN_SIZE: int = 1_500_000 # 1.5M 字符
Expand Down Expand Up @@ -103,38 +106,73 @@ def from_chunks(chunks: list[str]) -> str:
return "".join(chunks)


def build_ots_credentials_provider(config: "Config") -> "CredentialsProvider":
"""构建 TableStore CredentialsProvider,每次请求从 Config 实时取最新 STS。

TableStore client 在**每个请求**调用 ``credentials_provider.get_credentials()``
(见 tablestore client 的 ``_request_helper``),因此长生命周期的 OTSClient
也能在每次操作时拿到请求级 overlay 注入的最新 STS(再回退环境变量)。

Args:
config: agentrun Config 对象,凭证经其 getter 解析(overlay 优先)。

Returns:
TableStore ``CredentialsProvider`` 实例。
"""
from tablestore.credentials import Credentials, CredentialsProvider

class _AgentrunOtsCredentialsProvider(CredentialsProvider):
def __init__(self, cfg: "Config") -> None:
self._cfg = cfg

def get_credentials(self) -> Credentials:
cfg = self._cfg
return Credentials(
access_key_id=cfg.get_access_key_id(),
access_key_secret=cfg.get_access_key_secret(),
security_token=cfg.get_security_token() or None,
)

return _AgentrunOtsCredentialsProvider(config)


def build_ots_clients(
endpoint: str,
access_key_id: str,
access_key_secret: str,
instance_name: str,
*,
sts_token: str | None = None,
config: "Config",
) -> tuple[OTSClient, AsyncOTSClient]:
"""构建 OTSClient 和 AsyncOTSClient 实例。

独立于 codegen 模板,避免 AsyncOTSClient 被替换为 OTSClient。

凭证统一通过 :func:`build_ots_credentials_provider` 注入:client 每次请求
动态从 ``config`` 取最新 STS(请求级 overlay 优先),STS 过期可静默刷新。
遵循 AGENTS.md 约定——不接受静态 ak/sk/sts。

Args:
endpoint: OTS endpoint。
instance_name: OTS 实例名。
config: agentrun Config 对象,凭证经其 getter 动态解析。

Returns:
(ots_client, async_ots_client) 二元组。
"""
from tablestore import AsyncOTSClient # type: ignore[import-untyped]
from tablestore import OTSClient, WriteRetryPolicy

# 同一个 provider 可被 sync / async client 共享:无状态,按请求读 overlay。
provider = build_ots_credentials_provider(config)
ots_client = OTSClient(
endpoint,
access_key_id,
access_key_secret,
instance_name,
sts_token=sts_token,
instance_name=instance_name,
credentials_provider=provider,
retry_policy=WriteRetryPolicy(),
)
async_ots_client = AsyncOTSClient(
endpoint,
access_key_id,
access_key_secret,
instance_name,
sts_token=sts_token,
instance_name=instance_name,
credentials_provider=provider,
retry_policy=WriteRetryPolicy(),
)
return ots_client, async_ots_client
36 changes: 15 additions & 21 deletions agentrun/memory_collection/memory_conversation.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,19 +189,18 @@ async def _init_memory_store(self):
ots_config = await self._get_ots_config_from_memory_collection()

# 创建 AsyncOTSClient
# 支持使用 STS 临时凭证访问 TableStore
client_kwargs = {
"end_point": ots_config["endpoint"],
"access_key_id": ots_config["access_key_id"],
"access_key_secret": ots_config["access_key_secret"],
"instance_name": ots_config["instance_name"],
}

# 如果提供了 security_token,则添加到参数中(支持 STS 临时凭证)
if ots_config.get("security_token"):
client_kwargs["sts_token"] = ots_config["security_token"]
# 通过 CredentialsProvider 注入凭证:长生命周期的 client(server 启动时
# 仅创建一次)也能在每次请求动态取到请求级 overlay 注入的最新 STS
# (再回退环境变量),无需重建连接。
from agentrun.conversation_service.utils import (
build_ots_credentials_provider,
)

self._ots_client = tablestore.AsyncOTSClient(**client_kwargs)
self._ots_client = tablestore.AsyncOTSClient(
end_point=ots_config["endpoint"],
instance_name=ots_config["instance_name"],
credentials_provider=build_ots_credentials_provider(self.config),
)

# 配置会话表的二级索引元数据字段
# agent_id 字段用于标识会话所属的 Agent
Expand Down Expand Up @@ -259,10 +258,10 @@ async def _get_ots_config_from_memory_collection(self) -> Dict[str, Any]:
Returns:
Dict[str, Any]: OTS 配置字典,包含:
- endpoint: OTS endpoint
- access_key_id: 访问密钥 ID
- access_key_secret: 访问密钥 Secret
- security_token: STS 安全令牌(可选,用于临时凭证)
- instance_name: OTS 实例名称

凭证(ak/sk/sts)不在此返回:OTSClient 通过 CredentialsProvider
每次请求动态从 Config 取最新 STS,无需在此快照。
"""
from agentrun.memory_collection import MemoryCollection

Expand Down Expand Up @@ -307,15 +306,10 @@ async def _get_ots_config_from_memory_collection(self) -> Dict[str, Any]:
f" {original_endpoint} -> {endpoint}"
)

# 构建 OTS 配置
# 构建 OTS 配置(仅连接信息;凭证由 CredentialsProvider 动态注入)
ots_config = {
"endpoint": endpoint,
"instance_name": vs_config.instance_name or "",
"access_key_id": self.config.get_access_key_id(),
"access_key_secret": self.config.get_access_key_secret(),
"security_token": (
self.config.get_security_token()
), # 支持 STS 临时凭证
}

return ots_config
Expand Down
Loading
Loading