1+ import json
12import threading
23from collections import defaultdict
34from datetime import datetime , timezone
1314
1415
1516class SpanBatcher (Batcher ["StreamedSpan" ]):
16- # TODO[span-first]: size-based flushes
1717 # TODO[span-first]: adjust flush/drop defaults
1818 MAX_BEFORE_FLUSH = 1000
19- MAX_BEFORE_DROP = 5000
19+ MAX_BEFORE_DROP = 1000
20+ MAX_KB_BEFORE_FLUSH = 5 * 1024 # 5 MB
2021 FLUSH_WAIT_TIME = 5.0
2122
2223 TYPE = "span"
@@ -33,6 +34,7 @@ def __init__(
3334 # envelope.
3435 # trace_id -> span buffer
3536 self ._span_buffer : dict [str , list ["StreamedSpan" ]] = defaultdict (list )
37+ self ._running_size : dict [str , int ] = defaultdict (lambda : 0 )
3638 self ._capture_func = capture_func
3739 self ._record_lost_func = record_lost_func
3840 self ._running = True
@@ -62,8 +64,20 @@ def add(self, span: "StreamedSpan") -> None:
6264 return None
6365
6466 self ._span_buffer [span .trace_id ].append (span )
67+
6568 if size + 1 >= self .MAX_BEFORE_FLUSH :
6669 self ._flush_event .set ()
70+ return
71+
72+ self ._running_size [span .trace_id ] += self ._estimate_size (span )
73+ if self ._running_size [span .trace_id ] >= self .MAX_KB_BEFORE_FLUSH :
74+ self ._flush_event .set ()
75+ return
76+
77+ @staticmethod
78+ def _estimate_size (item : "StreamedSpan" ) -> int :
79+ span_dict = SpanBatcher ._to_transport_format (item )
80+ return len (str (span_dict ).encode ("utf-8" )) / 1024
6781
6882 @staticmethod
6983 def _to_transport_format (item : "StreamedSpan" ) -> "Any" :
@@ -127,6 +141,7 @@ def _flush(self) -> None:
127141 envelopes .append (envelope )
128142
129143 self ._span_buffer .clear ()
144+ self ._running_size .clear ()
130145
131146 for envelope in envelopes :
132147 self ._capture_func (envelope )
0 commit comments