diff --git a/.github/workflows/e2e-test-tc.yml b/.github/workflows/e2e-test-tc.yml new file mode 100644 index 00000000..14c1f660 --- /dev/null +++ b/.github/workflows/e2e-test-tc.yml @@ -0,0 +1,172 @@ +name: e2e-test-tc + +on: + workflow_dispatch: + workflow_call: + inputs: + from_build: + type: boolean + default: false + description: "build.yml から呼び出されたかどうか" + push: + branches: + - develop + - "feature/**" + paths: + - ".github/workflows/e2e-test-tc.yml" + - "tests/test_tc.py" + schedule: + # UTC の 01:00 は JST だと 10:00 。 + # 1-5 で 月曜日から金曜日 + - cron: "0 1 * * 1-5" + +env: + TEST_SIGNALING_URLS: ${{ secrets.TEST_SIGNALING_URLS }} + TEST_CHANNEL_ID_PREFIX: ${{ secrets.TEST_CHANNEL_ID_PREFIX }} + TEST_SECRET_KEY: ${{ secrets.TEST_SECRET_KEY }} + TEST_API_URL: ${{ secrets.TEST_API_URL }} + OPENH264_VERSION: 2.6.0 + +permissions: + contents: read + actions: read + +jobs: + e2e_test_tc: + # TC テストは Linux でのみ実行(tc は Linux カーネル機能) + # Python 3.14 かつ Ubuntu 24.04 固定 + runs-on: ubuntu-24.04 + timeout-minutes: 30 + steps: + - uses: actions/checkout@v5 + + # Tailscale のセットアップ + - uses: tailscale/github-action@v4 + with: + oauth-client-id: ${{ secrets.TS_OAUTH_CLIENT_ID }} + oauth-secret: ${{ secrets.TS_OAUTH_SECRET }} + tags: tag:ci + use-cache: "true" + hostname: gha-tc-${{ github.run_id }}-${{ github.run_number }} + version: latest + timeout: 2m + retry: 5 + + # Linux 向けの依存関係インストール + - run: | + sudo apt-get update + sudo apt-get -y install libx11-dev libdrm-dev libva-dev + + # UV のセットアップ + - uses: astral-sh/setup-uv@v7 + with: + enable-cache: false + # Python バージョンの設定(3.14 固定) + python-version: "3.14" + + # 依存関係のインストール (test と tc グループ) + - run: uv sync --no-install-project --group test --group tc + + # Wheel ファイルのダウンロード (build.yml から呼び出された場合: 呼び出し元の artifact を使用) + - if: inputs.from_build == true + uses: actions/download-artifact@v5 + with: + name: ubuntu-24.04_x86_64_python-3.14 + path: dist/ + + # Wheel ファイルのダウンロード (直接実行の場合: 同一ブランチの最新成功ビルド artifact または GitHub Release から取得) + - if: inputs.from_build != true + uses: ./.github/actions/download-whl + id: download-whl + with: + platform_name: ubuntu-24.04_x86_64 + python_version: "3.14" + github_token: ${{ secrets.GITHUB_TOKEN }} + + # wheel ファイル名を取得 + - name: Find wheel file + id: find-wheel + shell: bash + run: | + WHEEL_FILE=$(find dist -name "*.whl" | head -1) + if [ -z "$WHEEL_FILE" ]; then + echo "No wheel file found in dist/" + exit 1 + fi + echo "wheel_file=$WHEEL_FILE" >> $GITHUB_OUTPUT + + # wheel ファイルのインストール + - name: Install wheel package + run: | + echo "Installing wheel: ${{ steps.find-wheel.outputs.wheel_file }}" + + # wheel ファイルの内容を確認 + echo "Checking wheel contents:" + unzip -l "${{ steps.find-wheel.outputs.wheel_file }}" | grep -E "sora_sdk|METADATA" | head -20 + + # uv pip install を使う理由: + # uv add はワークスペース依存関係の管理用で、プロジェクト名 (sora-sdk) と + # 同じ名前の wheel ファイルを追加しようとすると自己参照エラーになる + # uv pip install は仮想環境に直接インストールするため、この制限を回避できる + uv pip install "${{ steps.find-wheel.outputs.wheel_file }}" + + # OpenH264 ライブラリのダウンロード + - uses: shiguredo/github-actions/.github/actions/download-openh264@main + id: openh264 + with: + openh264_version: ${{ env.OPENH264_VERSION }} + use-cache: true + + # OpenH264 パスを環境変数に設定 + - name: Set OpenH264 path + run: echo "OPENH264_PATH=${{ steps.openh264.outputs.openh264_path }}" >> $GITHUB_ENV + + # ネットワークインターフェースの確認 + - name: Check network interface + run: | + echo "=== Network interfaces ===" + ip link show + echo "" + echo "=== Default route ===" + ip route show default + echo "" + echo "=== Current tc qdisc settings ===" + tc qdisc show || true + + # TC E2E テストの実行(root 権限が必要) + - name: Run TC E2E tests + run: | + # UV_NO_SYNC=1 を指定する理由: + # uv run はデフォルトで実行前に pyproject.toml/uv.lock と環境を同期する + # この同期により、uv pip install でインストールした wheel が削除され、 + # 元のソースコードの sora-sdk がインストールされてしまう + # UV_NO_SYNC=1 で同期をスキップし、インストール済みの wheel を使用する + + # tc の設定には root 権限が必要なため sudo で実行 + # -E オプションで環境変数を継承 + # $(which uv) で uv のフルパスを指定(sudo 実行時に PATH が変わるため) + sudo -E $(which uv) run pytest tests/test_tc.py -v --durations=0 -s + env: + UV_NO_SYNC: 1 + TC: 1 + + # テスト後の tc 設定のクリーンアップ確認 + - name: Verify tc cleanup + if: always() + run: | + echo "=== TC qdisc settings after test ===" + tc qdisc show || true + + slack_notify_failed: + needs: [e2e_test_tc] + runs-on: ubuntu-24.04 + if: failure() && inputs.from_build != true + steps: + - name: Slack Notification + uses: rtCamp/action-slack-notify@v2 + env: + SLACK_CHANNEL: sora-python-sdk + SLACK_COLOR: danger + SLACK_TITLE: "TC E2E Test FAILED" + SLACK_ICON_EMOJI: ":japanese_ogre:" + SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK }} diff --git a/.github/workflows/e2e-test.yml b/.github/workflows/e2e-test.yml index b06a7e41..22069cc8 100644 --- a/.github/workflows/e2e-test.yml +++ b/.github/workflows/e2e-test.yml @@ -15,6 +15,7 @@ on: paths: - ".github/workflows/e2e-test.yml" - "tests/**" + - "!tests/**/test_tc*" schedule: # UTC の 01:00 は JST だと 10:00 。 # 1-5 で 月曜日から金曜日 @@ -242,7 +243,7 @@ jobs: # この同期により、uv pip install でインストールした wheel が削除され、 # 元のソースコードの sora-sdk がインストールされてしまう # UV_NO_SYNC=1 で同期をスキップし、インストール済みの wheel を使用する - uv run pytest ${{ matrix.platform.test_target || 'tests/' }} -v --durations=0 ${{ steps.pytest-args.outputs.pytest_extra_args }} + uv run pytest ${{ matrix.platform.test_target || 'tests/' }} -v --durations=0 ${{ steps.pytest-args.outputs.pytest_extra_args }} --ignore-glob=**/test_tc*.py -s env: UV_NO_SYNC: 1 diff --git a/AGENTS.md b/AGENTS.md index 375baee3..a6cf6c9e 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -24,3 +24,8 @@ - コミットメッセージは日本語で書くこと - コミットメッセージは命令形で書くこと - コミットメッセージは〜するという形で書くこと + +## Python について + +- Python のバージョンは 3.11 以上を前提とすること +- 末尾コメントを利用しないこと diff --git a/pyproject.toml b/pyproject.toml index a7a98fba..d83dafbc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,7 +30,16 @@ build-backend = "setuptools.build_meta" [dependency-groups] dev = ["nanobind==2.9.2"] -test = ["pytest", "pytest-repeat", "pytest-xdist", "numpy", "httpx", "pyjwt"] +test = [ + "pytest", + "pytest-timeout", + "pytest-repeat", + "pytest-xdist", + "numpy", + "httpx", + "pyjwt", +] +tc = ["pyroute2"] lint = ["ruff", "ty"] [tool.uv] diff --git a/tests/client.py b/tests/client.py index 856b5d80..df3e590c 100644 --- a/tests/client.py +++ b/tests/client.py @@ -327,6 +327,69 @@ def disconnect_message(self) -> Optional[dict[str, Any]]: def close_message(self) -> Optional[dict[str, Any]]: return self._close_message + @property + def turn_ports(self) -> dict[str, list[int]]: + """TURN URI (RFC 7065) からポート情報を抽出する""" + ports_map: dict[str, list[int]] = {"udp": [], "tcp": [], "tls": []} + if self._offer_message is None: + return ports_map + + config = self._offer_message.get("config") + if config is None: + return ports_map + + ice_servers = config.get("iceServers", []) + for server in ice_servers: + urls = server.get("urls", []) + if isinstance(urls, str): + urls = [urls] + for uri in urls: + if not (uri.startswith("turn:") or uri.startswith("turns:")): + continue + + # TURN URI を手動でパースする + # RFC 7065: turn:host:port?transport=udp + # RFC 7065: turns:host:port?transport=tcp + is_turns = uri.startswith("turns:") + scheme = "turns:" if is_turns else "turn:" + + # スキームを除去 + uri_without_scheme = uri[len(scheme):] + + # クエリパラメータを分離 + query = "" + if "?" in uri_without_scheme: + uri_without_scheme, query = uri_without_scheme.split("?", 1) + + # ホストとポートを分離 + if ":" not in uri_without_scheme: + continue + + # 最後のコロンでホストとポートを分割 + host_port = uri_without_scheme.rsplit(":", 1) + if len(host_port) != 2: + continue + + try: + port = int(host_port[1]) + except ValueError: + continue + + # turns: の場合は tls + if is_turns: + if port not in ports_map["tls"]: + ports_map["tls"].append(port) + # turn: の場合は query の transport を確認 + else: + transport = "udp" # デフォルトは udp + if query: + params = dict(param.split("=") for param in query.split("&") if "=" in param) + transport = params.get("transport", "udp") + if transport in ports_map and port not in ports_map[transport]: + ports_map[transport].append(port) + + return ports_map + @property def connected(self) -> bool: return self._connected.is_set() diff --git a/tests/test_tc.py b/tests/test_tc.py new file mode 100644 index 00000000..7735080a --- /dev/null +++ b/tests/test_tc.py @@ -0,0 +1,550 @@ +"""tc egress を使用した帯域制限のテスト。 + +このテストは pyroute2 を使用して Linux の tc (traffic control) により +ローカルインターフェースの egress (送信方向) に帯域制限を適用し、 +TURN 経由での接続に対する効果を検証する。 +""" + +import os +import sys +import time +from typing import Optional + +import pytest +from client import SoraClient, SoraRole + +# pyroute2 がインストールされていない場合はスキップ +pyroute2 = pytest.importorskip("pyroute2") + +# TC=1 環境変数、Linux 環境、ルート権限のいずれかが満たされない場合はスキップ +pytestmark = pytest.mark.skipif( + os.getenv("TC") != "1" or sys.platform != "linux" or os.geteuid() != 0, + reason="TC=1 環境変数、Linux 環境、ルート権限が必要", +) + +# テスト用の設定値 +INITIAL_BITRATE_KBPS = 1200 # 初期ビットレート (kbps) +BANDWIDTH_LIMIT_KBPS = 250 # 帯域制限値 (kbps) +MIN_BITRATE_BEFORE_LIMIT_KBPS = 500 # 制限前の最小ビットレート (kbps) +BANDWIDTH_OVERHEAD_FACTOR = 1.2 # 帯域制限の許容オーバーヘッド (20%) + + +def get_default_interface() -> str: + """デフォルトのネットワークインターフェース名を取得する。 + + Returns: + デフォルトルートで使用されているインターフェース名 + + Raises: + RuntimeError: デフォルトインターフェースが取得できない場合 + """ + with pyroute2.IPRoute() as ipr: + # デフォルトルートを取得(IPv4) + for route in ipr.get_routes(family=2): + # dst が存在しない場合がデフォルトルート + if not route.get_attr("RTA_DST"): + oif = route.get_attr("RTA_OIF") + if oif: + # インターフェース情報を取得 + links = ipr.get_links(oif) + if links: + ifname = links[0].get_attr("IFLA_IFNAME") + return ifname + + raise RuntimeError("デフォルトインターフェースが見つかりません") + + +class TCEgressManager: + """tc netem qdisc を使用して egress (送信方向) のネットワーク帯域制限を管理する。""" + + def __init__(self, interface: str = "eth0") -> None: + """ + TC egress 帯域制限マネージャーを初期化する。 + + Args: + interface: tc ルールを適用するネットワークインターフェース + """ + self.interface: str = interface + self._bandwidth_applied: bool = False + self.ipr: Optional["pyroute2.IPRoute"] = None + + def __enter__(self): + """コンテキストマネージャーのエントリ。""" + self.ipr = pyroute2.IPRoute() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """コンテキストマネージャーの終了 - tc ルールをクリーンアップする。""" + try: + self.cleanup() + finally: + if self.ipr: + self.ipr.close() + + def add_bandwidth_limit(self, rate_kbps: int) -> None: + """ + インターフェースの egress に帯域制限を追加する。 + + Args: + rate_kbps: 帯域制限 (Kbps) + + Raises: + IndexError: インターフェースが見つからない場合 + Exception: tc 操作が失敗した場合 + """ + if not self.ipr: + raise RuntimeError("IPRoute が初期化されていません") + + # インターフェースインデックスを取得する + indices = self.ipr.link_lookup(ifname=self.interface) + if not indices: + raise IndexError(f"インターフェース '{self.interface}' が見つかりません") + idx = indices[0] + + # tbf (Token Bucket Filter) qdisc で帯域制限を追加する + # root=True を指定すると handle は自動的に 0x10000 (= "1:") に設定される + # rate: 帯域制限 (文字列で "750 kbit" のように指定) + # burst: バーストサイズ (bytes) + # latency: 最大遅延時間 (文字列で "50ms" のように指定) + self.ipr.tc( + "add", + "tbf", + idx, + root=True, + rate=f"{rate_kbps}kbit", + # 32KB + burst=32768, + latency="50ms", + ) + self._bandwidth_applied = True + print(f"tc egress 帯域制限を追加: interface={self.interface}, rate={rate_kbps}kbps") + + def add_delay(self, delay_ms: int) -> None: + """ + インターフェースの egress に遅延を追加する。 + + Args: + delay_ms: 遅延 (ミリ秒) + + Raises: + IndexError: インターフェースが見つからない場合 + Exception: tc 操作が失敗した場合 + """ + if not self.ipr: + raise RuntimeError("IPRoute が初期化されていません") + + # インターフェースインデックスを取得する + indices = self.ipr.link_lookup(ifname=self.interface) + if not indices: + raise IndexError(f"インターフェース '{self.interface}' が見つかりません") + idx = indices[0] + + # netem qdisc で遅延を設定 + self.ipr.tc( + "add", + "netem", + idx, + root=True, + delay=delay_ms * 1000, # マイクロ秒に変換 + ) + self._bandwidth_applied = True + print(f"tc egress 遅延を追加: interface={self.interface}, delay={delay_ms}ms") + + def get_stats(self) -> dict: + """tc qdisc の統計情報を取得する。 + + Returns: + 統計情報を含む辞書 (sent_bytes, sent_packets, drops など) + """ + if not self.ipr: + raise RuntimeError("IPRoute が初期化されていません") + + # インターフェースインデックスを取得する + indices = self.ipr.link_lookup(ifname=self.interface) + if not indices: + raise IndexError(f"インターフェース '{self.interface}' が見つかりません") + idx = indices[0] + + # tc qdisc の情報を取得する (tbf または netem qdisc のみ) + for qdisc in self.ipr.get_qdiscs(idx): + kind = qdisc.get_attr("TCA_KIND") + if kind not in ("tbf", "netem"): + continue + + # TCA_STATS2 属性から統計情報を取得 + stats2 = qdisc.get_attr("TCA_STATS2") + if not stats2: + continue + + # TCA_STATS_BASIC から bytes と packets を取得 + stats_basic = stats2.get_attr("TCA_STATS_BASIC") + sent_bytes = stats_basic.get("bytes", 0) if stats_basic else 0 + sent_packets = stats_basic.get("packets", 0) if stats_basic else 0 + + # TCA_STATS_QUEUE から drops, overlimits, requeues を取得 + stats_queue = stats2.get_attr("TCA_STATS_QUEUE") + if stats_queue: + drops = stats_queue.get("drops", 0) + overlimits = stats_queue.get("overlimits", 0) + requeues = stats_queue.get("requeues", 0) + else: + drops = 0 + overlimits = 0 + requeues = 0 + + return { + "sent_bytes": sent_bytes, + "sent_packets": sent_packets, + "drops": drops, + "overlimits": overlimits, + "requeues": requeues, + } + + return {} + + def cleanup(self) -> None: + """インターフェースから tc 帯域制限設定を削除する。""" + if not self._bandwidth_applied: + return + + if not self.ipr: + return + + try: + indices = self.ipr.link_lookup(ifname=self.interface) + if not indices: + return + idx = indices[0] + # qdisc を削除する + # 削除時には kind を指定せず、index と root のみを指定する + self.ipr.tc("del", index=idx, root=True) + self._bandwidth_applied = False + print(f"tc egress 帯域制限を削除: interface={self.interface}") + except Exception as e: + # クリーンアップ時のエラーは無視する (qdisc が存在しない可能性がある) + print(f"tc egress 帯域制限の削除時にエラー (無視): {e}") + + +def verify_tc_settings(interface: str, qdisc_type: str = "tbf") -> bool: + """tc の設定が存在するか確認する。 + + Args: + interface: ネットワークインターフェース名 + qdisc_type: 確認する qdisc の種類 (tbf または netem) + + Returns: + 設定が存在する場合は True + """ + try: + with pyroute2.IPRoute() as ipr: + # インターフェースインデックスを取得 + indices = ipr.link_lookup(ifname=interface) + if not indices: + return False + idx = indices[0] + + # qdisc の情報を取得 + for qdisc in ipr.get_qdiscs(idx): + kind = qdisc.get_attr("TCA_KIND") + if kind == qdisc_type: + return True + return False + except Exception as e: + print(f"tc 設定の確認に失敗: {e}") + return False + + +def show_tc_stats(interface: str) -> None: + """tc の統計情報を表示する。 + + Args: + interface: ネットワークインターフェース名 + """ + try: + with pyroute2.IPRoute() as ipr: + # インターフェースインデックスを取得 + indices = ipr.link_lookup(ifname=interface) + if not indices: + print(f"インターフェース {interface} が見つかりません") + return + idx = indices[0] + + # qdisc の情報を取得して表示 (tbf/netem のみ) + print(f"\ntc 統計情報 ({interface}):") + found = False + for qdisc in ipr.get_qdiscs(idx): + kind = qdisc.get_attr("TCA_KIND") + # tbf または netem qdisc のみを表示 + if kind not in ("tbf", "netem"): + continue + + found = True + handle = qdisc.get("handle", 0) + parent = qdisc.get("parent", 0) + + # TCA_STATS2 属性から統計情報を取得 + stats2 = qdisc.get_attr("TCA_STATS2") + if not stats2: + continue + + # TCA_STATS_BASIC から bytes と packets を取得 + stats_basic = stats2.get_attr("TCA_STATS_BASIC") + if stats_basic: + sent_bytes = stats_basic.get("bytes", 0) + sent_packets = stats_basic.get("packets", 0) + else: + sent_bytes = 0 + sent_packets = 0 + + # TCA_STATS_QUEUE から drops と overlimits を取得 + stats_queue = stats2.get_attr("TCA_STATS_QUEUE") + if stats_queue: + drops = stats_queue.get("drops", 0) + overlimits = stats_queue.get("overlimits", 0) + else: + drops = 0 + overlimits = 0 + + print(f" qdisc {kind} handle {handle:#x} parent {parent:#x}") + print(f" Sent {sent_bytes} bytes {sent_packets} packets") + print(f" drops {drops}, overlimits {overlimits}") + + if not found: + print(" (tc 設定なし)") + except Exception as e: + print(f"tc 統計情報の取得に失敗: {e}") + + +def get_simulcast_outbound_rtp_stats(webrtc_stats: list) -> list: + """simulcast の outbound-rtp 統計情報を取得してソートする。 + + Args: + webrtc_stats: get_stats() で取得した統計情報のリスト + + Returns: + rid でソートされた outbound-rtp 統計情報のリスト + """ + simulcast_stats = [ + stat + for stat in webrtc_stats + if stat.get("type") == "outbound-rtp" and stat.get("kind") == "video" + ] + simulcast_stats.sort(key=lambda x: x.get("rid", "")) + return simulcast_stats + + +def show_webrtc_stats(webrtc_stats: list) -> None: + """WebRTC の統計情報を表示する。 + + Args: + webrtc_stats: get_stats() で取得した統計情報のリスト + """ + try: + print("\nWebRTC 統計情報:") + outbound_rtp_stats = [ + stat for stat in webrtc_stats if stat.get("type") == "outbound-rtp" + ] + outbound_rtp_stats.sort(key=lambda x: x.get("rid", "")) + + for stat in outbound_rtp_stats: + rid = stat.get("rid", "") + rid_label = f" (rid={rid})" if rid else "" + print(f" outbound-rtp{rid_label}:") + print(f" ssrc: {stat.get('ssrc')}") + print(f" kind: {stat.get('kind')}") + if rid: + print(f" rid: {rid}") + print(f" bytesSent: {stat.get('bytesSent')}") + print(f" packetsSent: {stat.get('packetsSent')}") + if "targetBitrate" in stat: + print(f" targetBitrate: {stat.get('targetBitrate')} bps") + if "totalPacketSendDelay" in stat: + print(f" totalPacketSendDelay: {stat.get('totalPacketSendDelay')} s") + except Exception as e: + print(f"WebRTC 統計情報の表示に失敗: {e}") + + +def test_tc_egress_bandwidth_limit(settings): + """TURN ポート取得後に tc egress で帯域制限をかける。""" + print("\n" + "=" * 60) + print(f"テスト: tc egress 帯域制限 ({BANDWIDTH_LIMIT_KBPS}kbps) の適用") + print("=" * 60) + + interface = get_default_interface() + print(f"使用するネットワークインターフェース: {interface}") + + with SoraClient( + settings, + SoraRole.SENDONLY, + simulcast=True, + audio=False, + video=True, + video_codec_type="VP8", + video_bit_rate=INITIAL_BITRATE_KBPS, + video_width=960, + video_height=540, + ) as sendonly: + time.sleep(10) + + # offer_message が受信されていることを確認 + assert sendonly.offer_message is not None + + # turn_ports プロパティから TURN ポートを取得 + turn_ports = sendonly.turn_ports + + # UDP ポートが取得できていることを確認 + assert len(turn_ports["udp"]) > 0, "UDP ポートが取得できていません" + + # 最初の UDP ポートを使用 + udp_port = turn_ports["udp"][0] + print(f"TURN UDP ポート: {udp_port}") + + # 制限前の WebRTC 統計情報を確認 + print("\n制限前の WebRTC 統計情報:") + time.sleep(3) + webrtc_stats = sendonly.get_stats() + show_webrtc_stats(webrtc_stats) + + # 制限前の targetBitrate を確認 (video の全ての outbound-rtp を取得) + simulcast_outbound_rtp_stats = get_simulcast_outbound_rtp_stats(webrtc_stats) + + # simulcast では r0/r1/r2 の 3 つのストリームが必ず存在する + assert len(simulcast_outbound_rtp_stats) == 3 + + # r0 の確認 + outbound_rtp_r0 = simulcast_outbound_rtp_stats[0] + assert "rid" in outbound_rtp_r0 + assert outbound_rtp_r0["rid"] == "r0" + assert "targetBitrate" in outbound_rtp_r0 + + # r1 の確認 + outbound_rtp_r1 = simulcast_outbound_rtp_stats[1] + assert "rid" in outbound_rtp_r1 + assert outbound_rtp_r1["rid"] == "r1" + assert "targetBitrate" in outbound_rtp_r1 + + # r2 の確認 + outbound_rtp_r2 = simulcast_outbound_rtp_stats[2] + assert "rid" in outbound_rtp_r2 + assert outbound_rtp_r2["rid"] == "r2" + assert "targetBitrate" in outbound_rtp_r2 + + print("\nBefore bandwidth limit - targetBitrate:") + print( + f" rid={outbound_rtp_r0['rid']}: {outbound_rtp_r0['targetBitrate']} bps " + f"({outbound_rtp_r0['targetBitrate'] / 1000} kbps)" + ) + print( + f" rid={outbound_rtp_r1['rid']}: {outbound_rtp_r1['targetBitrate']} bps " + f"({outbound_rtp_r1['targetBitrate'] / 1000} kbps)" + ) + print( + f" rid={outbound_rtp_r2['rid']}: {outbound_rtp_r2['targetBitrate']} bps " + f"({outbound_rtp_r2['targetBitrate'] / 1000} kbps)" + ) + + # r2 (最高画質) のビットレートが MIN_BITRATE_BEFORE_LIMIT_KBPS 以上あることを確認 + assert outbound_rtp_r2["targetBitrate"] >= MIN_BITRATE_BEFORE_LIMIT_KBPS * 1000 + + # tc egress で帯域制限を設定 + with TCEgressManager(interface=interface) as tc: + # 帯域制限を設定 + print(f"\nステップ 1: tc egress 帯域制限 {BANDWIDTH_LIMIT_KBPS}kbps を適用") + tc.add_bandwidth_limit(rate_kbps=BANDWIDTH_LIMIT_KBPS) + + # tc の設定が存在することを確認 + print("\nステップ 2: tc 設定を確認") + assert verify_tc_settings(interface), "tc の設定が確認できません" + + # tc の統計情報を表示 (適用直後) + show_tc_stats(interface) + + # 接続を維持して帯域制限が有効な状態でテスト + print("\nステップ 3: 帯域制限が有効な状態で接続を維持") + time.sleep(10) + + # tc の統計情報を表示 (接続後) + show_tc_stats(interface) + + # 統計情報を取得 + tc_stats = tc.get_stats() + print("\ntc 統計情報 (IPRoute):") + for key, value in tc_stats.items(): + print(f" {key}: {value}") + + # WebRTC 統計情報を表示 + print("\nステップ 4: 制限後の WebRTC 統計情報を確認") + webrtc_stats_after = sendonly.get_stats() + show_webrtc_stats(webrtc_stats_after) + + # targetBitrate を確認 (video の全ての outbound-rtp を取得) + simulcast_outbound_rtp_stats = get_simulcast_outbound_rtp_stats(webrtc_stats_after) + + # simulcast では r0/r1/r2 の 3 つのストリームが必ず存在する + assert len(simulcast_outbound_rtp_stats) == 3 + + # r0/r1/r2 の確認 + outbound_rtp_r0 = simulcast_outbound_rtp_stats[0] + assert "rid" in outbound_rtp_r0 + assert outbound_rtp_r0["rid"] == "r0" + + outbound_rtp_r1 = simulcast_outbound_rtp_stats[1] + assert "rid" in outbound_rtp_r1 + assert outbound_rtp_r1["rid"] == "r1" + + outbound_rtp_r2 = simulcast_outbound_rtp_stats[2] + assert "rid" in outbound_rtp_r2 + assert outbound_rtp_r2["rid"] == "r2" + + print("\nAfter bandwidth limit - outbound-rtp stats:") + for stat in simulcast_outbound_rtp_stats: + rid = stat.get("rid", "none") + bitrate = stat.get("targetBitrate") + quality_limitation = stat.get("qualityLimitationReason", "none") + if bitrate is not None: + print( + f" rid={rid}: targetBitrate={bitrate} bps ({bitrate / 1000} kbps), " + f"qualityLimitationReason={quality_limitation}" + ) + else: + print( + f" rid={rid}: targetBitrate=none (paused), " + f"qualityLimitationReason={quality_limitation}" + ) + + # r0 の確認: targetBitrate が存在し、帯域制限以下であること + assert "targetBitrate" in outbound_rtp_r0 + r0_bitrate = outbound_rtp_r0["targetBitrate"] + assert r0_bitrate <= BANDWIDTH_LIMIT_KBPS * 1000 * BANDWIDTH_OVERHEAD_FACTOR + assert "qualityLimitationReason" in outbound_rtp_r0 + assert outbound_rtp_r0["qualityLimitationReason"] == "bandwidth" + print( + f"\nVerify r0: targetBitrate={r0_bitrate} bps ({r0_bitrate / 1000} kbps), " + f"qualityLimitationReason={outbound_rtp_r0['qualityLimitationReason']}" + ) + + # r1 の確認: targetBitrate が存在せず、qualityLimitationReason が bandwidth であること + assert "targetBitrate" not in outbound_rtp_r1 + assert "qualityLimitationReason" in outbound_rtp_r1 + assert outbound_rtp_r1["qualityLimitationReason"] == "bandwidth" + print( + f"Verify r1: targetBitrate=none (paused), qualityLimitationReason={outbound_rtp_r1['qualityLimitationReason']}" + ) + + # r2 の確認: targetBitrate が存在せず、qualityLimitationReason が bandwidth であること + assert "targetBitrate" not in outbound_rtp_r2 + assert "qualityLimitationReason" in outbound_rtp_r2 + assert outbound_rtp_r2["qualityLimitationReason"] == "bandwidth" + print( + f"Verify r2: targetBitrate=none (paused), qualityLimitationReason={outbound_rtp_r2['qualityLimitationReason']}" + ) + + print("\nTest completed with bandwidth limit applied") + + # クリーンアップ確認 + print("\nAfter cleanup - tc settings:") + show_tc_stats(interface) + + print("\nResult:") + print(" ✓ Test passed: tc egress bandwidth limit applied successfully") + print("=" * 60 + "\n") diff --git a/tests/test_turn.py b/tests/test_turn.py new file mode 100644 index 00000000..aee6dc18 --- /dev/null +++ b/tests/test_turn.py @@ -0,0 +1,53 @@ +import time + +from client import SoraClient, SoraRole + + +def test_turn_ports(settings): + """TURN ポートが正しく取得できることを確認する""" + with SoraClient( + settings, + SoraRole.SENDONLY, + audio=True, + video=False, + ) as sendonly: + time.sleep(3) + + # offer_message が受信されていることを確認 + assert sendonly.offer_message is not None + + # turn_ports プロパティから TURN ポートを取得 + turn_ports = sendonly.turn_ports + + # turn_ports が辞書であることを確認 + assert isinstance(turn_ports, dict) + + # 必須キーが存在することを確認 + assert "udp" in turn_ports + assert "tcp" in turn_ports + assert "tls" in turn_ports + + # 各値がリストであることを確認 + assert isinstance(turn_ports["udp"], list) + assert isinstance(turn_ports["tcp"], list) + assert isinstance(turn_ports["tls"], list) + + # UDP ポートは必ず1つ以上存在する + assert len(turn_ports["udp"]) > 0, "UDP ポートが取得できていません" + + # UDP ポートがエフェメラルポート範囲内であることを確認 + for port in turn_ports["udp"]: + assert 49152 <= port <= 65535, f"UDP ポートがエフェメラルポート範囲外: {port}" + + # デバッグ用にポート情報を出力 + print(f"TURN ports (UDP): {turn_ports['udp']}") + print(f"TURN ports (TCP): {turn_ports['tcp']}") + print(f"TURN ports (TLS): {turn_ports['tls']}") + + # offer_message の config.iceServers を確認 + if "config" in sendonly.offer_message: + config = sendonly.offer_message["config"] + if "iceServers" in config: + print(f"ICE Servers: {config['iceServers']}") + + sendonly.disconnect() diff --git a/uv.lock b/uv.lock index 7ed7388f..b9eb8cc9 100644 --- a/uv.lock +++ b/uv.lock @@ -224,6 +224,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/61/ad/689f02752eeec26aed679477e80e632ef1b682313be70793d798c1d5fc8f/PyJWT-2.10.1-py3-none-any.whl", hash = "sha256:dcdd193e30abefd5debf142f9adfcdd2b58004e644f25406ffaebd50bd98dacb", size = 22997, upload-time = "2024-11-28T03:43:27.893Z" }, ] +[[package]] +name = "pyroute2" +version = "0.9.5" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "win-inet-pton", marker = "sys_platform == 'win32'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/1d/b2/1019ddc278549fb7e64a16d7775e0f7d981135f762b8706e583414d655e3/pyroute2-0.9.5.tar.gz", hash = "sha256:a198ccbe545b031b00b10da4b44df33d548db04af944be8107c05a215ba03872", size = 969492, upload-time = "2025-11-02T18:42:49.603Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/bd/66/995c65e7566543e941705f9b1b99ebfd7f9d3ed67c8eb7cb47f30168dc72/pyroute2-0.9.5-py3-none-any.whl", hash = "sha256:e7d485ce8274cbf473e9092fa65585faf8a3df5fe05ecf497cb9c5b1516ba09f", size = 480641, upload-time = "2025-11-02T18:42:45.75Z" }, +] + [[package]] name = "pytest" version = "9.0.0" @@ -252,6 +264,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/73/d4/8b706b81b07b43081bd68a2c0359fe895b74bf664b20aca8005d2bb3be71/pytest_repeat-0.9.4-py3-none-any.whl", hash = "sha256:c1738b4e412a6f3b3b9e0b8b29fcd7a423e50f87381ad9307ef6f5a8601139f3", size = 4180, upload-time = "2025-04-07T14:59:51.492Z" }, ] +[[package]] +name = "pytest-timeout" +version = "2.4.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pytest" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/ac/82/4c9ecabab13363e72d880f2fb504c5f750433b2b6f16e99f4ec21ada284c/pytest_timeout-2.4.0.tar.gz", hash = "sha256:7e68e90b01f9eff71332b25001f85c75495fc4e3a836701876183c4bcfd0540a", size = 17973, upload-time = "2025-05-05T19:44:34.99Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/fa/b6/3127540ecdf1464a00e5a01ee60a1b09175f6913f0644ac748494d9c4b21/pytest_timeout-2.4.0-py3-none-any.whl", hash = "sha256:c42667e5cdadb151aeb5b26d114aff6bdf5a907f176a007a30b940d3d865b5c2", size = 14382, upload-time = "2025-05-05T19:44:33.502Z" }, +] + [[package]] name = "pytest-xdist" version = "3.8.0" @@ -312,12 +336,16 @@ lint = [ { name = "ruff" }, { name = "ty" }, ] +tc = [ + { name = "pyroute2" }, +] test = [ { name = "httpx" }, { name = "numpy" }, { name = "pyjwt" }, { name = "pytest" }, { name = "pytest-repeat" }, + { name = "pytest-timeout" }, { name = "pytest-xdist" }, ] @@ -329,12 +357,14 @@ lint = [ { name = "ruff" }, { name = "ty" }, ] +tc = [{ name = "pyroute2" }] test = [ { name = "httpx" }, { name = "numpy" }, { name = "pyjwt" }, { name = "pytest" }, { name = "pytest-repeat" }, + { name = "pytest-timeout" }, { name = "pytest-xdist" }, ] @@ -371,3 +401,12 @@ sdist = { url = "https://files.pythonhosted.org/packages/72/94/1a15dd82efb362ac8 wheels = [ { url = "https://files.pythonhosted.org/packages/18/67/36e9267722cc04a6b9f15c7f3441c2363321a3ea07da7ae0c0707beb2a9c/typing_extensions-4.15.0-py3-none-any.whl", hash = "sha256:f0fa19c6845758ab08074a0cfa8b7aecb71c999ca73d62883bc25cc018c4e548", size = 44614, upload-time = "2025-08-25T13:49:24.86Z" }, ] + +[[package]] +name = "win-inet-pton" +version = "1.1.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/d9/da/0b1487b5835497dea00b00d87c2aca168bb9ca2e2096981690239e23760a/win_inet_pton-1.1.0.tar.gz", hash = "sha256:dd03d942c0d3e2b1cf8bab511844546dfa5f74cb61b241699fa379ad707dea4f", size = 2949, upload-time = "2019-02-19T17:46:23.925Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/be/31/ff772a44aa56319df8afbb0b34f1a856f66f05b9d5f1fed917849e47fdae/win_inet_pton-1.1.0-py2.py3-none-any.whl", hash = "sha256:eaf0193cbe7152ac313598a0da7313fb479f769343c0c16c5308f64887dc885b", size = 4848, upload-time = "2019-02-19T17:46:22.182Z" }, +]