-
Notifications
You must be signed in to change notification settings - Fork 32
fix: add retry logic for tunnel reconnection in jmp shell proxy #679
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
8f62700
7aca3ed
efc4c96
440541b
7562532
b197028
5d97fae
225e134
01d45a5
e0a6baa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,7 @@ | |
| fail_after, | ||
| sleep, | ||
| ) | ||
| from anyio.abc import SocketStream | ||
| from anyio.from_thread import BlockingPortal | ||
| from grpc.aio import AioRpcError, Channel | ||
| from jumpstarter_protocol import jumpstarter_pb2, jumpstarter_pb2_grpc | ||
|
|
@@ -312,61 +313,103 @@ def __contextmanager__(self) -> Generator[Self]: | |
| with self.portal.wrap_async_context_manager(self) as value: | ||
| yield value | ||
|
|
||
| async def handle_async(self, stream): | ||
| # DEADLINE_EXCEEDED and CANCELLED are excluded: they indicate client-side | ||
| # timeout or cancellation, not server/network transients worth retrying. | ||
| _TRANSIENT_GRPC_CODES = frozenset( | ||
| { | ||
| grpc.StatusCode.UNAVAILABLE, | ||
| grpc.StatusCode.RESOURCE_EXHAUSTED, | ||
| grpc.StatusCode.ABORTED, | ||
| grpc.StatusCode.INTERNAL, | ||
| } | ||
| ) | ||
|
|
||
| # UNKNOWN error messages that indicate transient tunnel teardowns. | ||
| # We don't blanket-retry all UNKNOWN errors (they could be permanent | ||
| # server bugs), but specific messages like "watch channel closed" are | ||
| # known to occur during tunnel reconnection. | ||
| _TRANSIENT_UNKNOWN_MESSAGES = ("watch channel closed",) | ||
|
|
||
| @staticmethod | ||
| def _retry_delay(attempt: int, remaining: float, base: float = 0.3, cap: float = 5.0) -> float: | ||
| """Compute exponential-backoff delay, capped by *cap* and *remaining* time.""" | ||
| return min(base * (2**attempt), cap, remaining) | ||
|
|
||
| async def _dial_and_connect(self, stream: SocketStream, channel_ready_timeout: float = 10.0) -> None: | ||
| """Single attempt; raises on failure for caller-driven retry.""" | ||
| response = await self.controller.Dial(jumpstarter_pb2.DialRequest(lease_name=self.name)) | ||
| async with connect_router_stream( | ||
| response.router_endpoint, | ||
| response.router_token, | ||
| stream, | ||
| self.tls_config, | ||
| self.grpc_options, | ||
| channel_ready_timeout=channel_ready_timeout, | ||
| ): | ||
| pass | ||
|
|
||
| async def handle_async(self, stream: SocketStream) -> None: | ||
| logger.debug("Connecting to Lease with name %s", self.name) | ||
| # Retry Dial with exponential backoff for transient "exporter not ready" errors. | ||
| # This handles the race condition where the client acquires a lease before | ||
| # the exporter has transitioned to LEASE_READY status. | ||
| # Uses time-based retry bounded by dial_timeout instead of fixed retry count. | ||
| base_delay = 0.3 | ||
| max_delay = 2.0 | ||
| # Retry Dial + router connection with exponential backoff. | ||
| # Handles FAILED_PRECONDITION (exporter not yet ready), transient | ||
| # network errors (tunnel drops), and OSError (unreachable endpoint). | ||
| # All error paths return instead of raising because handle_async runs | ||
| # inside TemporaryUnixListener.serve's task group -- an unhandled | ||
| # exception would crash the listener and terminate sibling connections. | ||
| deadline = time.monotonic() + self.dial_timeout | ||
| attempt = 0 | ||
| while True: | ||
| remaining = deadline - time.monotonic() | ||
| channel_ready_timeout = max(min(10.0, remaining), 0) | ||
| try: | ||
| response = await self.controller.Dial(jumpstarter_pb2.DialRequest(lease_name=self.name)) | ||
| break | ||
| await self._dial_and_connect(stream, channel_ready_timeout=channel_ready_timeout) | ||
| return | ||
| except AioRpcError as e: | ||
| remaining = deadline - time.monotonic() | ||
| if e.code() == grpc.StatusCode.FAILED_PRECONDITION and "not ready" in str(e.details()): | ||
| remaining = deadline - time.monotonic() | ||
| if remaining <= 0: | ||
| logger.debug( | ||
| logger.warning( | ||
| "Exporter not ready and dial timeout (%.1fs) exceeded after %d attempts", | ||
| self.dial_timeout, | ||
| attempt + 1, | ||
| ) | ||
| raise | ||
| delay = min(base_delay * (2**attempt), max_delay, remaining) | ||
| return | ||
| delay = self._retry_delay(attempt, remaining) | ||
| logger.debug( | ||
| "Exporter not ready, retrying Dial in %.1fs (attempt %d, %.1fs remaining)", | ||
| "Exporter not ready, retrying in %.1fs (attempt %d, %.1fs remaining)", | ||
| delay, | ||
| attempt + 1, | ||
| remaining, | ||
| ) | ||
| await sleep(delay) | ||
| attempt += 1 | ||
| continue | ||
| if e.code() == grpc.StatusCode.UNAVAILABLE: | ||
| remaining = deadline - time.monotonic() | ||
| is_transient = e.code() in self._TRANSIENT_GRPC_CODES or ( | ||
| e.code() == grpc.StatusCode.UNKNOWN | ||
| and any(msg in str(e.details()).lower() for msg in self._TRANSIENT_UNKNOWN_MESSAGES) | ||
| ) | ||
| if is_transient: | ||
| if remaining <= 0: | ||
| logger.warning( | ||
| "Exporter unavailable and dial timeout (%.1fs) exceeded after %d attempts", | ||
| self.dial_timeout, | ||
| "Connection failed with transient error after %d attempts (%.1fs elapsed): %s", | ||
| attempt + 1, | ||
| self.dial_timeout, | ||
|
raballew marked this conversation as resolved.
|
||
| e.details(), | ||
| ) | ||
| raise | ||
| delay = min(base_delay * (2**attempt), max_delay, remaining) | ||
| logger.warning( | ||
| "Exporter unavailable, retrying Dial in %.1fs (attempt %d, %.1fs remaining)", | ||
| return | ||
| delay = self._retry_delay(attempt, remaining) | ||
| logger.info( | ||
| "Connection failed with %s, retrying in %.1fs (attempt %d, %.1fs remaining): %s", | ||
| e.code().name, | ||
| delay, | ||
| attempt + 1, | ||
| remaining, | ||
| e.details(), | ||
|
Comment on lines
+402
to
+407
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [MEDIUM] Three exception handlers repeat the same 4-line backoff pattern: Suggestion: Extract a helper like AI-generated, human reviewed
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed. Will extract |
||
| ) | ||
| await sleep(delay) | ||
| attempt += 1 | ||
| continue | ||
| # Exporter went offline or lease ended - log and exit gracefully | ||
| if "permission denied" in str(e.details()).lower(): | ||
| if e.code() == grpc.StatusCode.PERMISSION_DENIED: | ||
| self.lease_transferred = True | ||
| logger.warning( | ||
| "Lease %s has been transferred to another client. Your session is no longer valid.", | ||
|
|
@@ -375,10 +418,22 @@ async def handle_async(self, stream): | |
| else: | ||
| logger.warning("Connection to exporter lost: %s", e.details()) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. [MEDIUM] No test verifies that PERMISSION_DENIED sets Suggestion: Add a test that raises AI-generated, human reviewed
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point. Will add a test that raises |
||
| return | ||
| async with connect_router_stream( | ||
| response.router_endpoint, response.router_token, stream, self.tls_config, self.grpc_options | ||
| ): | ||
| pass | ||
| except OSError as e: | ||
| remaining = deadline - time.monotonic() | ||
| if remaining > 0: | ||
| delay = self._retry_delay(attempt, remaining) | ||
| logger.info( | ||
| "Connection failed with OSError, retrying in %.1fs (attempt %d, %.1fs remaining): %s", | ||
| delay, | ||
| attempt + 1, | ||
| remaining, | ||
| e, | ||
| ) | ||
| await sleep(delay) | ||
| attempt += 1 | ||
| continue | ||
| logger.warning("Connection failed: %s", e) | ||
| return | ||
|
|
||
| @asynccontextmanager | ||
| async def serve_unix_async(self): | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[HIGH]
channel_ready_timeoutis not bounded by the remainingdial_timeoutdeadline._dial_and_connectcallsconnect_router_streamwithout passingchannel_ready_timeout, so it defaults to 10s. The retry loop caps backoff sleep byremainingtime, butchannel.channel_ready()insideconnect_router_streamis independent. When only 1-2s remain ondial_timeout, a single_dial_and_connectcall can block for up to 10s, causing total wall-clock time to overshootdial_timeoutby up to 10s.Suggestion: Pass
remainingas an upper bound:await self._dial_and_connect(stream, channel_ready_timeout=max(min(10, remaining), 0.5)).AI-generated, human reviewed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. Will pass
min(10, remaining)(with a 0.5s floor) aschannel_ready_timeoutto_dial_and_connectso the channel-ready wait is bounded by the overall deadline.