Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions astrbot/core/provider/func_tool_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,19 @@ async def call(self, context: Any, **kwargs: Any) -> Any:
if call_override is not None and call_override is not FunctionTool.call:
return await self._wrapped.call(context, **kwargs)

run = getattr(self._wrapped, "run", None)
if run is not None:
event = context.context.event

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

In _check_tool_permission, the retrieval of event from context is safely guarded against AttributeError in case context or context.context is missing or not fully formed. However, here event = context.context.event is accessed directly, which can lead to an unhandled AttributeError if a mock or an unexpected context object is passed.

We should retrieve event safely using nested getattr calls to match the robust behavior of _check_tool_permission.

Suggested change
event = context.context.event
event = getattr(getattr(context, "context", None), "event", None)

result = run(event, **kwargs)
if _inspect.isasyncgen(result):
last: Any = None
async for item in result:
last = item
return last
if _inspect.isawaitable(result):
return await result
return result

return "error: tool has no callable handler"


Expand Down
21 changes: 21 additions & 0 deletions tests/unit/test_tool_permission.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,27 @@ async def call(self, context, **kwargs):
assert result == "from call()"


@pytest.mark.asyncio
async def test_guarded_tool_delegates_to_wrapped_run():
_clear_tool_permissions()
mgr = FunctionToolManager()

class RunnableTool(FunctionTool):
async def run(self, event, **kwargs):
return f"from run(): {event.get_sender_id()} {kwargs['value']}"

wrapped = RunnableTool(
name="has_run",
description="desc",
parameters={},
)
guarded = _PermissionGuardedTool(wrapped, mgr)
context = _make_context(sender_id="runner")

result = await guarded.call(context, value="ok")
assert result == "from run(): runner ok"


@pytest.mark.asyncio
async def test_guarded_tool_handles_async_generator_handler():
_clear_tool_permissions()
Expand Down
Loading