SpeechHandle.wait_for_playout() ignores interruption, causing 5s deadlock during tool preamble #5359

@tmtcam-042

Bug Description

SpeechHandle.wait_for_playout() awaits only _done_fut and is blind to _interrupt_fut. When user speech interrupts an agent turn during tool preamble playout, any code awaiting wait_for_playout() (e.g. a function tool waiting for its preamble to finish) is stuck until the 5-second INTERRUPTION_TIMEOUT hard-kills all tasks.

In our case, the interruption comes from API-injected user input (programmatic, not live speech), which means:

  • It bypasses VAD entirely — the adaptive interruption handling added in v1.5 cannot filter it
  • It arrives with no audio duration, so min_interruption_duration / min_interruption_words guards don't help
  • It's a legitimate, expected input path (e.g. operator messages injected via API while the agent is mid-turn)

This creates a ~5s unrecoverable dead window where:

  • The interrupted speech can't complete normally
  • The tool can't start executing
  • Speech scheduling gets paused (via drain/pause)
  • All subsequent user input is dropped ("skipping user input, speech scheduling is paused")

Expected Behavior

When API-injected user input (or any interruption) arrives during tool preamble playout:

  1. wait_for_playout() detects the interruption immediately (not after 5s) and raises
  2. The tool aborts cleanly — the SDK's tool execution handler catches the exception and produces an error tool output
  3. Speech scheduling remains active — the _scheduling_paused flag is never set because the speech handle resolves promptly instead of hanging until the timeout triggers drain
  4. The new user input from generate_reply(user_input=...) is processed normally by the agent as a new turn
  5. No cascading failures — no RPC timeouts, no worker drain, no process kill

The key difference: the agent recovers in milliseconds instead of dying in 5 seconds.
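To make the expected flow concrete, here is a minimal, self-contained sketch using plain asyncio futures rather than the SDK classes. `SpeechInterrupted`, `run_tool`, and the dict-shaped tool output are hypothetical names chosen for illustration; the SDK's actual tool execution handler and its error format may look different.

```python
import asyncio


class SpeechInterrupted(Exception):
    """Hypothetical exception type, used only in this sketch."""


async def wait_for_playout(done_fut, interrupt_fut) -> None:
    """Return when playout finishes; raise as soon as an interruption is flagged."""
    await asyncio.wait({done_fut, interrupt_fut}, return_when=asyncio.FIRST_COMPLETED)
    if interrupt_fut.done():
        raise SpeechInterrupted("speech was interrupted during playout")


async def slow_tool(done_fut, interrupt_fut) -> str:
    # The tool waits for its preamble, as in the report.
    await wait_for_playout(done_fut, interrupt_fut)
    return "done"


async def run_tool(tool, done_fut, interrupt_fut) -> dict:
    """Hypothetical tool-execution handler: an aborted tool becomes an error output."""
    try:
        result = await tool(done_fut, interrupt_fut)
        return {"output": result, "is_error": False}
    except SpeechInterrupted as exc:
        return {"output": str(exc), "is_error": True}


async def demo() -> dict:
    loop = asyncio.get_running_loop()
    done_fut, interrupt_fut = loop.create_future(), loop.create_future()
    # Simulate an interruption arriving while the preamble is still playing.
    loop.call_later(0.05, interrupt_fut.set_result, None)
    return await run_tool(slow_tool, done_fut, interrupt_fut)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this model the tool aborts as soon as the interrupt future resolves, and the handler turns the exception into an error output instead of waiting for a timeout.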

Reproduction Steps

1. Agent receives user input and the LLM decides to call a function tool
2. LLM emits preamble text (e.g. "Let's get ready...") before the tool call
3. The tool's implementation calls `RunContext.wait_for_playout()` (or `SpeechHandle.wait_for_playout()`) to wait for the preamble audio to finish before executing
4. While the preamble TTS is still playing out, programmatic input is injected via the API
5. `SpeechHandle.interrupt()` → `_cancel()` fires, setting `_interrupt_fut` and starting the 5s timeout
6. `wait_for_playout()` at `speech_handle.py:154` does `await asyncio.shield(self._done_fut)`; it doesn't see `_interrupt_fut`
7. The tool hangs for up to 5 seconds until `_on_timeout()` force-cancels everything and calls `_mark_done()`
8. By this point, scheduling is paused, the worker is draining, and the process is killed
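Steps 5 to 7 can be modelled without the SDK. The sketch below is a toy stand-in, not the real `SpeechHandle`: `FakeSpeechHandle` and `blocked_time` are hypothetical names, and the timeout is scaled down from 5 s to 0.2 s so the demo runs quickly. It only illustrates why a waiter that awaits `_done_fut` alone stays blocked until the timeout fires.

```python
import asyncio
import time

# Scaled down from the 5 s in the report so the demo finishes quickly.
INTERRUPTION_TIMEOUT = 0.2


class FakeSpeechHandle:
    """Toy model of the behaviour described above; not the SDK class."""

    def __init__(self) -> None:
        loop = asyncio.get_running_loop()
        self._done_fut = loop.create_future()
        self._interrupt_fut = loop.create_future()

    def interrupt(self) -> None:
        # Step 5: flag the interruption and arm the timeout.
        if not self._interrupt_fut.done():
            self._interrupt_fut.set_result(None)
        asyncio.get_running_loop().call_later(INTERRUPTION_TIMEOUT, self._on_timeout)

    def _on_timeout(self) -> None:
        # Step 7: in this model, only the timeout ever resolves _done_fut.
        if not self._done_fut.done():
            self._done_fut.set_result(None)

    async def wait_for_playout(self) -> None:
        # Step 6: only _done_fut is awaited; _interrupt_fut is never consulted.
        await asyncio.shield(self._done_fut)


async def blocked_time() -> float:
    """Return how long the waiter was blocked, in seconds."""
    handle = FakeSpeechHandle()
    start = time.monotonic()
    waiter = asyncio.create_task(handle.wait_for_playout())
    await asyncio.sleep(0.05)  # the preamble is "playing"
    handle.interrupt()
    await waiter
    return time.monotonic() - start


if __name__ == "__main__":
    print(f"blocked for {asyncio.run(blocked_time()):.2f}s")
```

The waiter returns only after the full timeout has elapsed, even though the interruption was flagged almost immediately.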
...

"""
Reproduction: wait_for_playout() ignores interruption, causing a ~5s
deadlock when user input arrives (via API or otherwise) during tool
preamble playout.

Requires: livekit-agents>=1.3.12, livekit-plugins-openai, livekit
Env vars: LIVEKIT_URL, LIVEKIT_API_KEY, LIVEKIT_API_SECRET, OPENAI_API_KEY

    python repro.py start
"""

from __future__ import annotations

import asyncio
import logging
import time

from livekit.agents import AgentSession, JobContext, WorkerOptions, cli
from livekit.agents.llm import function_tool
from livekit.agents.voice import Agent, RunContext
from livekit.plugins.openai.realtime import RealtimeModel

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("repro")


class ReproAgent(Agent):
    def __init__(self) -> None:
        super().__init__(
            instructions=(
                "You have a tool called `slow_tool`. "
                "Say a short preamble like 'Sure, let me run that for you now' "
                "then call the tool. Always call the tool."
            ),
            llm=RealtimeModel(
                model="gpt-realtime",
                modalities=["text", "audio"],
            ),
        )

    async def on_enter(self) -> None:
        """Kick off the flow — ask the LLM to call slow_tool."""
        self.session.generate_reply(user_input="Please run the slow tool for me")

    @function_tool
    async def slow_tool(self, context: RunContext) -> str:
        """Run a slow operation. Say a short preamble before calling this."""
        t0 = time.monotonic()
        logger.info("slow_tool: waiting for preamble playout…")

        # This blocks until preamble audio finishes — but if interrupted,
        # it hangs until the 5s INTERRUPTION_TIMEOUT hard-kills everything.
        await context.wait_for_playout()

        elapsed = time.monotonic() - t0
        logger.info(f"slow_tool: wait_for_playout returned after {elapsed:.2f}s")

        if elapsed > 4.0:
            logger.error(
                f"BUG REPRODUCED — wait_for_playout blocked for {elapsed:.2f}s "
                "(expected <1s, got ≈INTERRUPTION_TIMEOUT)"
            )
        return "done"


async def entrypoint(ctx: JobContext) -> None:
    await ctx.connect()

    session = AgentSession()
    await session.start(agent=ReproAgent(), room=ctx.room)

    # Wait for the LLM to start emitting the preamble, then interrupt
    # via the same API path used in production: interrupt + generate_reply.
    # 2s is enough for the LLM to have emitted preamble text and for TTS
    # to begin playout, but before wait_for_playout() resolves.
    await asyncio.sleep(2)

    logger.info("Injecting API user input to interrupt during preamble…")
    session.interrupt(force=True)
    session.generate_reply(
        user_input="Actually, never mind, do something else!",
        allow_interruptions=True,
    )

    # Observe: slow_tool's wait_for_playout will block for ~5s
    # (INTERRUPTION_TIMEOUT), then logs:
    #   "speech not done in time after interruption, cancelling the speech arbitrarily"
    # followed by force-cancellation of the tool task.


if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))

Operating System

Ubuntu 24.04.4 LTS

Models Used

OpenAI gpt-realtime, ElevenLabs Flash v2.5

Package Versions

livekit==1.0.23
livekit-agents==1.3.12
livekit-api==1.0.7
livekit-plugins-elevenlabs==1.3.12
livekit-plugins-openai==1.3.12

Session/Room/Call IDs

No response

Proposed Solution

RunContext.wait_for_playout() should race the generation future against the speech handle's interrupt future. Currently in events.py:81:

  # Current: interrupt-unaware
  async def wait_for_playout(self) -> None:
      await self.speech_handle._wait_for_generation(step_idx=self._initial_step_idx)

  Should become something like:

  async def wait_for_playout(self) -> None:
      gen_fut = asyncio.ensure_future(
          self.speech_handle._wait_for_generation(step_idx=self._initial_step_idx)
      )
      interrupt_fut = asyncio.ensure_future(
          asyncio.shield(self.speech_handle._interrupt_fut)
      )
      done, pending = await asyncio.wait(
          {gen_fut, interrupt_fut}, return_when=asyncio.FIRST_COMPLETED
      )
      for p in pending:
          p.cancel()
      if self.speech_handle.interrupted:
          raise InterruptedError("Speech was interrupted during playout")

  The same pattern should apply to SpeechHandle.wait_for_playout() (_done_fut vs _interrupt_fut). Note that the SDK already has this exact pattern internally: SpeechHandle.wait_if_not_interrupted() does exactly this race. It's just not used in the playout wait paths.
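One detail worth considering in any version of this race: `asyncio.wait` does not cancel the futures it was given when the caller itself is cancelled, so the wrapped wait can be left running in the background. The sketch below is a generic asyncio helper, not SDK code; `wait_unless_interrupted` and `demo` are hypothetical names, and it returns a bool instead of raising purely to keep the example short.

```python
import asyncio
from collections.abc import Awaitable


async def wait_unless_interrupted(work: Awaitable[None], interrupt_fut: asyncio.Future) -> bool:
    """Await `work`; return False early if `interrupt_fut` resolves first."""
    work_task = asyncio.ensure_future(work)
    try:
        await asyncio.wait({work_task, interrupt_fut}, return_when=asyncio.FIRST_COMPLETED)
    finally:
        # Runs on normal exit and when the caller itself is cancelled,
        # so the wrapped wait is never left running in the background.
        if not work_task.done():
            work_task.cancel()
    if interrupt_fut.done():
        return False
    work_task.result()  # re-raise any exception from the wrapped wait
    return True


async def demo() -> tuple[bool, bool]:
    loop = asyncio.get_running_loop()

    # Case 1: playout finishes normally, no interruption.
    finished = await wait_unless_interrupted(asyncio.sleep(0.01), loop.create_future())

    # Case 2: the interruption arrives while playout is still in progress.
    interrupt_fut = loop.create_future()
    loop.call_later(0.01, interrupt_fut.set_result, None)
    interrupted = await wait_unless_interrupted(asyncio.sleep(10), interrupt_fut)

    return finished, interrupted


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The interrupt future is passed to `asyncio.wait` directly and is never cancelled by the helper, so other waiters on the same future are unaffected.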

Additional Context

No response

Screenshots and Recordings

No response

Labels

bug (Something isn't working)