Skip to content

Commit 41ea434

Browse files
committed
feat(observability): log throttle events and AIMD transitions at INFO/DEBUG
Add structured log lines to the retry and adaptive-concurrency layers so operators can diagnose throttle storms from logs alone. - `pinecone._internal.http_client`: DEBUG per throttled retry attempt (status, host, attempt N/total, computed delay, Retry-After value) - `pinecone._internal.adaptive`: DEBUG on every AIMD limit decrease/increase (only on real transitions, not per-success); INFO on first throttle per host with a user-actionable message linking to docs - `tests/unit/_internal/test_retry_logging.py`: 6 new tests covering sync and async throttle logging, AIMD decrease/increase/ceiling-floor log behaviour, and first-throttle-per-host INFO dedupe - `docs/guides/retries.md`: new Observability section documenting log namespaces, INFO vs DEBUG semantics, and field meanings
1 parent 17431f7 commit 41ea434

4 files changed

Lines changed: 231 additions & 5 deletions

File tree

docs/guides/retries.md

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,68 @@ this matters.
312312

313313
---
314314

315+
## Observability
316+
317+
The SDK emits structured log records so you can diagnose retry storms and throttling
318+
pressure without adding instrumentation yourself.
319+
320+
### Log namespaces
321+
322+
| Logger | Events |
323+
|--------|--------|
324+
| `pinecone._internal.http_client` | Throttled HTTP response received; retry delay computed |
325+
| `pinecone._internal.adaptive` | AIMD concurrency limit transitions |
326+
327+
### INFO messages
328+
329+
An INFO-level record is emitted the **first time** a given host rate-limits a client
330+
instance:
331+
332+
```
333+
Rate limited by host=<host>. Adaptive concurrency will reduce in-flight requests.
334+
See https://docs.pinecone.io/python/retries for details.
335+
```
336+
337+
This fires once per host per `Pinecone` / `AsyncPinecone` object, so it surfaces in your
338+
logs without flooding them on repeated throttling.
339+
340+
### DEBUG messages
341+
342+
Enable DEBUG-level logging on the two namespaces above to see granular retry events:
343+
344+
```python
345+
import logging
346+
logging.getLogger("pinecone._internal.http_client").setLevel(logging.DEBUG)
347+
logging.getLogger("pinecone._internal.adaptive").setLevel(logging.DEBUG)
348+
```
349+
350+
**Throttle record** (emitted once per retry attempt that receives a retryable response):
351+
352+
```
353+
Throttled response: status=429 host=my-index.svc.pinecone.io attempt=1/4 delay=0.531s retry_after=absent
354+
```
355+
356+
Fields: `status` (HTTP status code), `host`, `attempt` (N of total attempts),
357+
`delay` (computed wait in seconds), `retry_after` (parsed `Retry-After` header value or
358+
`absent`).
359+
360+
**AIMD limit decrease** (emitted when the adaptive limiter reduces concurrency):
361+
362+
```
363+
AIMD limiter decreased: before=8 after=4 ceiling=8
364+
```
365+
366+
**AIMD limit increase** (emitted when the limiter recovers a concurrency slot):
367+
368+
```
369+
AIMD limiter increased: now=5 ceiling=8
370+
```
371+
372+
Increase records only fire on actual transitions — not on every successful request —
373+
so the volume is proportional to recovery events, not request throughput.
374+
375+
---
376+
315377
## See Also
316378

317379
- {doc}`/guides/error-handling` — Exception hierarchy, `RateLimitError.retry_after`, and how to catch specific errors

pinecone/_internal/adaptive.py

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,10 @@
1111

1212
from __future__ import annotations
1313

14+
import logging
1415
import threading
15-
from typing import TYPE_CHECKING
1616

17-
if TYPE_CHECKING:
18-
pass
17+
logger = logging.getLogger(__name__)
1918

2019

2120
class _AdaptiveLimiter:
@@ -48,16 +47,33 @@ def current_limit(self) -> int:
4847
def report_throttled(self) -> None:
4948
"""Halve the limit (floored at 1) and reset the success streak."""
5049
with self._lock:
50+
before = self._limit
5151
self._limit = max(1, self._limit // 2)
5252
self._success_streak = 0
53+
if before != self._limit:
54+
logger.debug(
55+
"AIMD limiter decreased: before=%d after=%d ceiling=%d",
56+
before,
57+
self._limit,
58+
self._ceiling,
59+
)
5360

5461
def report_success(self) -> None:
5562
"""Increment the success streak; bump limit by 1 if streak hits current limit."""
5663
with self._lock:
5764
self._success_streak += 1
65+
increased = False
5866
if self._success_streak >= self._limit:
59-
self._limit = min(self._ceiling, self._limit + 1)
67+
if self._limit < self._ceiling:
68+
self._limit = self._limit + 1
69+
increased = True
6070
self._success_streak = 0
71+
if increased:
72+
logger.debug(
73+
"AIMD limiter increased: now=%d ceiling=%d",
74+
self._limit,
75+
self._ceiling,
76+
)
6177

6278
def update_ceiling(self, ceiling: int) -> None:
6379
"""Re-anchor the ceiling (e.g., a later bulk call uses a different max_concurrency).
@@ -82,11 +98,12 @@ class _AdaptiveLimiterRegistry:
8298
each batch dispatch.
8399
"""
84100

85-
__slots__ = ("_limiters", "_lock")
101+
__slots__ = ("_ever_throttled", "_limiters", "_lock")
86102

87103
def __init__(self) -> None:
88104
self._lock = threading.Lock()
89105
self._limiters: dict[str, _AdaptiveLimiter] = {}
106+
self._ever_throttled: set[str] = set()
90107

91108
def get(self, host: str, ceiling: int) -> _AdaptiveLimiter:
92109
"""Return the limiter for ``host``, creating one with ``ceiling`` if absent.
@@ -113,5 +130,13 @@ def report_throttled(self, host: str) -> None:
113130
"""
114131
with self._lock:
115132
limiter = self._limiters.get(host)
133+
first_time = host not in self._ever_throttled
134+
self._ever_throttled.add(host)
116135
if limiter is not None:
117136
limiter.report_throttled()
137+
if first_time:
138+
logger.info(
139+
"Rate limited by host=%s. Adaptive concurrency will reduce in-flight requests. "
140+
"See https://docs.pinecone.io/python/retries for details.",
141+
host,
142+
)

pinecone/_internal/http_client.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,16 @@ def handle_request(self, request: httpx.Request) -> httpx.Response:
194194
response.close()
195195
delay = _compute_retry_after_delay(self._config, response, attempt, prev_delay)
196196
prev_delay = delay
197+
logger.debug(
198+
"Throttled response: status=%d host=%s attempt=%d/%d"
199+
" delay=%.3fs retry_after=%s",
200+
response.status_code,
201+
request.url.host,
202+
attempt + 1,
203+
self._config.max_retries + 1,
204+
delay,
205+
response.headers.get("retry-after", "absent"),
206+
)
197207
time.sleep(delay)
198208
else:
199209
return response
@@ -244,6 +254,16 @@ async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
244254
await response.aclose()
245255
delay = _compute_retry_after_delay(self._config, response, attempt, prev_delay)
246256
prev_delay = delay
257+
logger.debug(
258+
"Throttled response: status=%d host=%s attempt=%d/%d"
259+
" delay=%.3fs retry_after=%s",
260+
response.status_code,
261+
request.url.host,
262+
attempt + 1,
263+
self._config.max_retries + 1,
264+
delay,
265+
response.headers.get("retry-after", "absent"),
266+
)
247267
await asyncio.sleep(delay)
248268
else:
249269
return response
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
"""Tests for throttle-event and AIMD-transition log lines."""
2+
3+
from __future__ import annotations
4+
5+
import logging
6+
from unittest.mock import AsyncMock, MagicMock, patch
7+
8+
import httpx
9+
import pytest
10+
11+
from pinecone._internal.adaptive import _AdaptiveLimiter, _AdaptiveLimiterRegistry
12+
from pinecone._internal.config import RetryConfig
13+
from pinecone._internal.http_client import _AsyncRetryTransport, _RetryTransport
14+
15+
16+
def _transport(max_retries: int = 1) -> tuple[_RetryTransport, MagicMock]:
17+
inner = MagicMock(spec=httpx.BaseTransport)
18+
cfg = RetryConfig(max_retries=max_retries, backoff_factor=0.001, max_wait=0.01)
19+
return _RetryTransport(transport=inner, retry_config=cfg), inner # type: ignore[arg-type]
20+
21+
22+
def _async_transport(max_retries: int = 1) -> tuple[_AsyncRetryTransport, AsyncMock]:
23+
inner = AsyncMock(spec=httpx.AsyncBaseTransport)
24+
cfg = RetryConfig(max_retries=max_retries, backoff_factor=0.001, max_wait=0.01)
25+
return _AsyncRetryTransport(transport=inner, retry_config=cfg), inner # type: ignore[arg-type]
26+
27+
28+
def _req() -> httpx.Request:
29+
return httpx.Request("POST", "https://example.com/test")
30+
31+
32+
def test_throttle_response_logs_debug_with_fields(caplog: pytest.LogCaptureFixture) -> None:
33+
rt, inner = _transport(max_retries=1)
34+
inner.handle_request.side_effect = [
35+
httpx.Response(429),
36+
httpx.Response(200),
37+
]
38+
with (
39+
caplog.at_level(logging.DEBUG, logger="pinecone._internal.http_client"),
40+
patch("pinecone._internal.http_client.time.sleep"),
41+
):
42+
rt.handle_request(_req())
43+
44+
throttle_records = [r for r in caplog.records if "Throttled response" in r.getMessage()]
45+
assert len(throttle_records) == 1
46+
msg = throttle_records[0].getMessage()
47+
assert throttle_records[0].levelname == "DEBUG"
48+
assert "status=429" in msg
49+
assert "host=example.com" in msg
50+
51+
52+
def test_aimd_decrease_logs_debug(caplog: pytest.LogCaptureFixture) -> None:
53+
lim = _AdaptiveLimiter(ceiling=8)
54+
with caplog.at_level(logging.DEBUG, logger="pinecone._internal.adaptive"):
55+
lim.report_throttled()
56+
57+
records = [r for r in caplog.records if "AIMD limiter decreased" in r.getMessage()]
58+
assert len(records) == 1
59+
assert "before=8 after=4" in records[0].getMessage()
60+
61+
62+
def test_aimd_increase_logs_debug_only_on_transition(caplog: pytest.LogCaptureFixture) -> None:
63+
lim = _AdaptiveLimiter(ceiling=8)
64+
lim.report_throttled() # 8 → 4; limit is now 4, streak is 0
65+
# Need exactly 4 successful calls to cross the threshold (streak reaches limit)
66+
with caplog.at_level(logging.DEBUG, logger="pinecone._internal.adaptive"):
67+
for _ in range(4):
68+
lim.report_success()
69+
70+
increase_records = [r for r in caplog.records if "AIMD limiter increased" in r.getMessage()]
71+
assert len(increase_records) == 1
72+
73+
74+
def test_aimd_no_log_when_at_ceiling(caplog: pytest.LogCaptureFixture) -> None:
75+
lim = _AdaptiveLimiter(ceiling=8)
76+
# No throttle; limit starts at ceiling (8)
77+
with caplog.at_level(logging.DEBUG, logger="pinecone._internal.adaptive"):
78+
for _ in range(100):
79+
lim.report_success()
80+
81+
increase_records = [r for r in caplog.records if "AIMD limiter increased" in r.getMessage()]
82+
assert len(increase_records) == 0
83+
84+
85+
def test_first_throttle_per_host_logs_info(caplog: pytest.LogCaptureFixture) -> None:
86+
reg = _AdaptiveLimiterRegistry()
87+
reg.get("api-1.pinecone.io", 8)
88+
89+
with caplog.at_level(logging.INFO, logger="pinecone._internal.adaptive"):
90+
reg.report_throttled("api-1.pinecone.io")
91+
reg.report_throttled("api-1.pinecone.io")
92+
93+
info_records = [
94+
r
95+
for r in caplog.records
96+
if r.levelname == "INFO" and "Rate limited by host=api-1.pinecone.io" in r.getMessage()
97+
]
98+
assert len(info_records) == 1
99+
100+
101+
@pytest.mark.asyncio
102+
async def test_async_throttle_response_logs_debug(caplog: pytest.LogCaptureFixture) -> None:
103+
rt, inner = _async_transport(max_retries=1)
104+
inner.handle_async_request.side_effect = [
105+
httpx.Response(429),
106+
httpx.Response(200),
107+
]
108+
with (
109+
caplog.at_level(logging.DEBUG, logger="pinecone._internal.http_client"),
110+
patch("pinecone._internal.http_client.asyncio.sleep"),
111+
):
112+
await rt.handle_async_request(_req())
113+
114+
throttle_records = [r for r in caplog.records if "Throttled response" in r.getMessage()]
115+
assert len(throttle_records) == 1
116+
msg = throttle_records[0].getMessage()
117+
assert throttle_records[0].levelname == "DEBUG"
118+
assert "status=429" in msg
119+
assert "host=example.com" in msg

0 commit comments

Comments
 (0)