Skip to content
Open
111 changes: 83 additions & 28 deletions python/packages/jumpstarter/jumpstarter/client/lease.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
fail_after,
sleep,
)
from anyio.abc import SocketStream
from anyio.from_thread import BlockingPortal
from grpc.aio import AioRpcError, Channel
from jumpstarter_protocol import jumpstarter_pb2, jumpstarter_pb2_grpc
Expand Down Expand Up @@ -312,61 +313,103 @@ def __contextmanager__(self) -> Generator[Self]:
with self.portal.wrap_async_context_manager(self) as value:
yield value

async def handle_async(self, stream):
# DEADLINE_EXCEEDED and CANCELLED are excluded: they indicate client-side
# timeout or cancellation, not server/network transients worth retrying.
_TRANSIENT_GRPC_CODES = frozenset(
{
grpc.StatusCode.UNAVAILABLE,
grpc.StatusCode.RESOURCE_EXHAUSTED,
grpc.StatusCode.ABORTED,
grpc.StatusCode.INTERNAL,
}
)

# UNKNOWN error messages that indicate transient tunnel teardowns.
# We don't blanket-retry all UNKNOWN errors (they could be permanent
# server bugs), but specific messages like "watch channel closed" are
# known to occur during tunnel reconnection.
_TRANSIENT_UNKNOWN_MESSAGES = ("watch channel closed",)

@staticmethod
def _retry_delay(attempt: int, remaining: float, base: float = 0.3, cap: float = 5.0) -> float:
"""Compute exponential-backoff delay, capped by *cap* and *remaining* time."""
return min(base * (2**attempt), cap, remaining)

async def _dial_and_connect(self, stream: SocketStream, channel_ready_timeout: float = 10.0) -> None:
"""Single attempt; raises on failure for caller-driven retry."""
response = await self.controller.Dial(jumpstarter_pb2.DialRequest(lease_name=self.name))
async with connect_router_stream(
response.router_endpoint,
response.router_token,
stream,
self.tls_config,
self.grpc_options,
channel_ready_timeout=channel_ready_timeout,
):
pass
Comment on lines +340 to +349

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[HIGH] channel_ready_timeout is not bounded by the remaining dial_timeout deadline. _dial_and_connect calls connect_router_stream without passing channel_ready_timeout, so it defaults to 10s. The retry loop caps backoff sleep by remaining time, but channel.channel_ready() inside connect_router_stream is independent. When only 1-2s remain on dial_timeout, a single _dial_and_connect call can block for up to 10s, causing total wall-clock time to overshoot dial_timeout by up to 10s.

Suggestion: Pass remaining as an upper bound: await self._dial_and_connect(stream, channel_ready_timeout=max(min(10, remaining), 0.5)).

AI-generated, human reviewed

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Will pass min(10, remaining) (with a 0.5s floor) as channel_ready_timeout to _dial_and_connect so the channel-ready wait is bounded by the overall deadline.


async def handle_async(self, stream: SocketStream) -> None:
logger.debug("Connecting to Lease with name %s", self.name)
# Retry Dial with exponential backoff for transient "exporter not ready" errors.
# This handles the race condition where the client acquires a lease before
# the exporter has transitioned to LEASE_READY status.
# Uses time-based retry bounded by dial_timeout instead of fixed retry count.
base_delay = 0.3
max_delay = 2.0
# Retry Dial + router connection with exponential backoff.
# Handles FAILED_PRECONDITION (exporter not yet ready), transient
# network errors (tunnel drops), and OSError (unreachable endpoint).
# All error paths return instead of raising because handle_async runs
# inside TemporaryUnixListener.serve's task group -- an unhandled
# exception would crash the listener and terminate sibling connections.
deadline = time.monotonic() + self.dial_timeout
attempt = 0
while True:
remaining = deadline - time.monotonic()
channel_ready_timeout = max(min(10.0, remaining), 0)
try:
response = await self.controller.Dial(jumpstarter_pb2.DialRequest(lease_name=self.name))
break
await self._dial_and_connect(stream, channel_ready_timeout=channel_ready_timeout)
return
except AioRpcError as e:
remaining = deadline - time.monotonic()
if e.code() == grpc.StatusCode.FAILED_PRECONDITION and "not ready" in str(e.details()):
remaining = deadline - time.monotonic()
if remaining <= 0:
logger.debug(
logger.warning(
"Exporter not ready and dial timeout (%.1fs) exceeded after %d attempts",
self.dial_timeout,
attempt + 1,
)
raise
delay = min(base_delay * (2**attempt), max_delay, remaining)
return
delay = self._retry_delay(attempt, remaining)
logger.debug(
"Exporter not ready, retrying Dial in %.1fs (attempt %d, %.1fs remaining)",
"Exporter not ready, retrying in %.1fs (attempt %d, %.1fs remaining)",
delay,
attempt + 1,
remaining,
)
await sleep(delay)
attempt += 1
continue
if e.code() == grpc.StatusCode.UNAVAILABLE:
remaining = deadline - time.monotonic()
is_transient = e.code() in self._TRANSIENT_GRPC_CODES or (
e.code() == grpc.StatusCode.UNKNOWN
and any(msg in str(e.details()).lower() for msg in self._TRANSIENT_UNKNOWN_MESSAGES)
)
if is_transient:
if remaining <= 0:
logger.warning(
"Exporter unavailable and dial timeout (%.1fs) exceeded after %d attempts",
self.dial_timeout,
"Connection failed with transient error after %d attempts (%.1fs elapsed): %s",
attempt + 1,
self.dial_timeout,
Comment thread
raballew marked this conversation as resolved.
e.details(),
)
raise
delay = min(base_delay * (2**attempt), max_delay, remaining)
logger.warning(
"Exporter unavailable, retrying Dial in %.1fs (attempt %d, %.1fs remaining)",
return
delay = self._retry_delay(attempt, remaining)
logger.info(
"Connection failed with %s, retrying in %.1fs (attempt %d, %.1fs remaining): %s",
e.code().name,
delay,
attempt + 1,
remaining,
e.details(),
Comment on lines +402 to +407

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MEDIUM] Three exception handlers repeat the same 4-line backoff pattern: delay = min(base_delay * (2**attempt), max_delay, remaining), log, await sleep(delay), attempt += 1, continue. The handle_async method is around 95 lines with deep nesting. Changing the backoff strategy (e.g., adding jitter) would require coordinated edits in three places.

Suggestion: Extract a helper like _compute_retry_delay(attempt, remaining) -> float to deduplicate the delay calculation while keeping the distinct log messages and timeout behaviors.

AI-generated, human reviewed

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Will extract _retry_delay(attempt, remaining) to centralize the backoff computation. The distinct log messages and timeout behaviors will stay in the caller.

)
await sleep(delay)
attempt += 1
continue
# Exporter went offline or lease ended - log and exit gracefully
if "permission denied" in str(e.details()).lower():
if e.code() == grpc.StatusCode.PERMISSION_DENIED:
self.lease_transferred = True
logger.warning(
"Lease %s has been transferred to another client. Your session is no longer valid.",
Expand All @@ -375,10 +418,22 @@ async def handle_async(self, stream):
else:
logger.warning("Connection to exporter lost: %s", e.details())

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MEDIUM] No test verifies that PERMISSION_DENIED sets lease_transferred = True. When handle_async receives an AioRpcError with "permission denied" in the details, it sets self.lease_transferred = True. The surrounding control flow was restructured (errors now flow through _dial_and_connect and the new is_transient gate), making regression possible.

Suggestion: Add a test that raises AioRpcError(PERMISSION_DENIED, "permission denied") from _dial_and_connect and asserts lease.lease_transferred is True.

AI-generated, human reviewed

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Will add a test that raises AioRpcError(PERMISSION_DENIED, "permission denied") from _dial_and_connect and asserts lease.lease_transferred is True.

return
async with connect_router_stream(
response.router_endpoint, response.router_token, stream, self.tls_config, self.grpc_options
):
pass
except OSError as e:
remaining = deadline - time.monotonic()
if remaining > 0:
delay = self._retry_delay(attempt, remaining)
logger.info(
"Connection failed with OSError, retrying in %.1fs (attempt %d, %.1fs remaining): %s",
delay,
attempt + 1,
remaining,
e,
)
await sleep(delay)
attempt += 1
continue
logger.warning("Connection failed: %s", e)
return

@asynccontextmanager
async def serve_unix_async(self):
Expand Down
Loading
Loading