diff --git a/astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py b/astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py index c693c546e8..69c93a4f18 100644 --- a/astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py +++ b/astrbot/core/platform/sources/qqofficial/qqofficial_platform_adapter.py @@ -523,6 +523,44 @@ def replace_face(match): # Match face tags: return re.sub(r"]*>", replace_face, content) + @staticmethod + def _sanitize_command_interaction_tags(content: str) -> str: + """Replace QQ Official command interaction tags with user text.""" + import re + + tag_pattern = re.compile( + r"<\s*qqbot-cmd-[\w:-]+\b(?P[^<>]*)/?>", + re.IGNORECASE, + ) + text_attr_pattern = re.compile( + r"""\btext\s*=\s*(?P["'])(?P.*?)(?P=quote)""", + re.IGNORECASE | re.DOTALL, + ) + + def replace_tag(match: re.Match[str]) -> str: + attrs = match.group("attrs") or "" + text_match = text_attr_pattern.search(attrs) + if not text_match: + return "" + return text_match.group("value") + + return tag_pattern.sub(replace_tag, content).strip() + + @staticmethod + def _normalize_message_content( + content: str | None, + mention_id: str | None = None, + ) -> str: + raw_content = content or "" + if mention_id is not None: + raw_content = raw_content.replace( + "<@!" + mention_id + ">", + "", + ) + return QQOfficialPlatformAdapter._sanitize_command_interaction_tags( + QQOfficialPlatformAdapter._parse_face_message(raw_content.strip()) + ) + @staticmethod async def _parse_from_qqofficial( message: botpy.message.Message @@ -549,8 +587,8 @@ async def _parse_from_qqofficial( else: abm.sender = MessageMember(message.author.user_openid, "") # Parse face messages to readable text - abm.message_str = QQOfficialPlatformAdapter._parse_face_message( - message.content.strip() + abm.message_str = QQOfficialPlatformAdapter._normalize_message_content( + message.content ) abm.self_id = "unknown_selfid" msg.append(At(qq="qq_official")) @@ -569,11 +607,9 @@ async def _parse_from_qqofficial( else: abm.self_id = "" - plain_content = QQOfficialPlatformAdapter._parse_face_message( - message.content.replace( - "<@!" + str(abm.self_id) + ">", - "", - ).strip() + plain_content = QQOfficialPlatformAdapter._normalize_message_content( + message.content, + mention_id=str(abm.self_id), ) await QQOfficialPlatformAdapter._append_attachments( diff --git a/tests/test_qqofficial_platform_adapter.py b/tests/test_qqofficial_platform_adapter.py new file mode 100644 index 0000000000..b1c5c52178 --- /dev/null +++ b/tests/test_qqofficial_platform_adapter.py @@ -0,0 +1,171 @@ +from __future__ import annotations + +import pytest + +from astrbot.api.message_components import Plain +from astrbot.api.platform import MessageType +from astrbot.core.platform.sources.qqofficial import qqofficial_platform_adapter +from astrbot.core.platform.sources.qqofficial.qqofficial_platform_adapter import ( + QQOfficialPlatformAdapter, +) + + +class FakeAuthor: + member_openid = "member-openid" + user_openid = "user-openid" + id = "author-id" + username = "author-name" + + +class FakeMention: + id = "bot-id" + + +class FakeGroupMessage: + def __init__(self, content: str | None) -> None: + self.id = "message-id" + self.content = content + self.author = FakeAuthor() + self.group_openid = "group-openid" + self.attachments = [] + + +class FakeChannelMessage: + def __init__(self, content: str | None) -> None: + self.id = "guild-message-id" + self.content = content + self.author = FakeAuthor() + self.mentions = [FakeMention()] + self.attachments = [] + self.channel_id = "channel-id" + + +def _plain_texts(message_components: list[object]) -> list[str]: + return [ + component.text + for component in message_components + if isinstance(component, Plain) + ] + + +def test_sanitize_command_interaction_tags_keeps_plain_text() -> None: + assert ( + QQOfficialPlatformAdapter._sanitize_command_interaction_tags("hello world") + == "hello world" + ) + + +def test_sanitize_command_interaction_tags_extracts_double_quoted_text() -> None: + content = ( + 'hello now' + ) + + assert ( + QQOfficialPlatformAdapter._sanitize_command_interaction_tags(content) + == "hello /quick-map now" + ) + + +def test_sanitize_command_interaction_tags_extracts_single_quoted_text() -> None: + content = "run " + + assert ( + QQOfficialPlatformAdapter._sanitize_command_interaction_tags(content) + == "run /start" + ) + + +def test_sanitize_command_interaction_tags_removes_tag_without_text() -> None: + content = 'hello world' + + assert ( + QQOfficialPlatformAdapter._sanitize_command_interaction_tags(content) + == "hello world" + ) + + +def test_sanitize_command_interaction_tags_leaves_non_target_tag_unchanged() -> None: + content = '' + + assert ( + QQOfficialPlatformAdapter._sanitize_command_interaction_tags(content) == content + ) + + +def test_normalize_message_content_handles_none() -> None: + assert QQOfficialPlatformAdapter._normalize_message_content(None) == "" + + +def test_normalize_message_content_removes_bot_mention_and_sanitizes_tags() -> None: + content = '<@!bot-id> ' + + assert ( + QQOfficialPlatformAdapter._normalize_message_content( + content, + mention_id="bot-id", + ) + == "/quick-map" + ) + + +@pytest.mark.asyncio +async def test_parse_from_qqofficial_sanitizes_message_str_and_plain_component( + monkeypatch, +) -> None: + monkeypatch.setattr( + qqofficial_platform_adapter.botpy.message, + "GroupMessage", + FakeGroupMessage, + ) + raw_message = FakeGroupMessage( + 'hello ' + ) + + message = await QQOfficialPlatformAdapter._parse_from_qqofficial( + raw_message, + MessageType.GROUP_MESSAGE, + ) + + assert message.message_str == "hello /quick-map" + assert _plain_texts(message.message) == ["hello /quick-map"] + + +@pytest.mark.asyncio +async def test_parse_from_qqofficial_handles_none_group_content( + monkeypatch, +) -> None: + monkeypatch.setattr( + qqofficial_platform_adapter.botpy.message, + "GroupMessage", + FakeGroupMessage, + ) + raw_message = FakeGroupMessage(None) + + message = await QQOfficialPlatformAdapter._parse_from_qqofficial( + raw_message, + MessageType.GROUP_MESSAGE, + ) + + assert message.message_str == "" + assert _plain_texts(message.message) == [""] + + +@pytest.mark.asyncio +async def test_parse_from_qqofficial_handles_none_channel_content( + monkeypatch, +) -> None: + monkeypatch.setattr( + qqofficial_platform_adapter.botpy.message, + "Message", + FakeChannelMessage, + ) + raw_message = FakeChannelMessage(None) + + message = await QQOfficialPlatformAdapter._parse_from_qqofficial( + raw_message, + MessageType.GROUP_MESSAGE, + ) + + assert message.message_str == "" + assert _plain_texts(message.message) == [""]