-
Notifications
You must be signed in to change notification settings - Fork 854
Expand file tree
/
Copy pathtimeseriesv2.go
More file actions
187 lines (157 loc) · 4.66 KB
/
timeseriesv2.go
File metadata and controls
187 lines (157 loc) · 4.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
package cortexpb
import (
"sync"
"go.uber.org/atomic"
)
var dynamicSymbolsCapacity atomic.Int64
func init() {
dynamicSymbolsCapacity.Store(int64(initialSymbolsCapacity))
}
var (
initialSymbolsCapacity = 128
maxSymbolsCapacity = int64(8192)
slicePoolV2 = sync.Pool{
New: func() any {
return make([]PreallocTimeseriesV2, 0, expectedTimeseries)
},
}
timeSeriesPoolV2 = sync.Pool{
New: func() any {
return &TimeSeriesV2{
LabelsRefs: make([]uint32, 0, expectedLabels),
Samples: make([]Sample, 0, expectedSamplesPerSeries),
Histograms: make([]Histogram, 0, expectedHistogramsPerSeries),
Exemplars: make([]ExemplarV2, 0, expectedExemplarsPerSeries),
Metadata: MetadataV2{},
}
},
}
writeRequestPoolV2 = sync.Pool{
New: func() any {
return &PreallocWriteRequestV2{
WriteRequestV2: WriteRequestV2{
Symbols: make([]string, 0, dynamicSymbolsCapacity.Load()),
},
}
},
}
bytePoolV2 = newSlicePool(20)
)
// PreallocWriteRequestV2 is a WriteRequestV2 which preallocs slices on Unmarshal.
type PreallocWriteRequestV2 struct {
WriteRequestV2
data *[]byte
}
// Unmarshal implements proto.Message.
func (p *PreallocWriteRequestV2) Unmarshal(dAtA []byte) error {
p.Timeseries = PreallocTimeseriesV2SliceFromPool()
return p.WriteRequestV2.Unmarshal(dAtA)
}
func (p *PreallocWriteRequestV2) Marshal() (dAtA []byte, err error) {
size := p.Size()
p.data = bytePoolV2.getSlice(size)
dAtA = *p.data
n, err := p.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
// PreallocTimeseriesV2 is a TimeSeries which preallocs slices on Unmarshal.
type PreallocTimeseriesV2 struct {
*TimeSeriesV2
}
// Unmarshal implements proto.Message.
func (p *PreallocTimeseriesV2) Unmarshal(dAtA []byte) error {
p.TimeSeriesV2 = TimeseriesV2FromPool()
return p.TimeSeriesV2.Unmarshal(dAtA)
}
func ReuseWriteRequestV2(req *PreallocWriteRequestV2) {
if req.data != nil {
bytePoolV2.reuseSlice(req.data)
req.data = nil
}
req.Source = 0
// If the underlying array has grown beyond our acceptable maximum capacity,
// we discard this object instead of putting it back into the pool to let GC
// reclaim it.
symbolsCap := int64(cap(req.Symbols))
if symbolsCap > maxSymbolsCapacity {
if req.Timeseries != nil {
ReuseSliceV2(req.Timeseries)
req.Timeseries = nil
}
return
}
// Update the dynamic symbol capacity.
for {
current := dynamicSymbolsCapacity.Load()
// We use an EMA to update the capacity.
newAvg := max((current*9+symbolsCap*1)/10, int64(initialSymbolsCapacity))
if current == newAvg {
// nothing to change
break
}
if dynamicSymbolsCapacity.CompareAndSwap(current, newAvg) {
break
}
}
for i := range req.Symbols {
req.Symbols[i] = ""
}
req.Symbols = req.Symbols[:0]
if req.Timeseries != nil {
ReuseSliceV2(req.Timeseries)
req.Timeseries = nil
}
writeRequestPoolV2.Put(req)
}
func PreallocWriteRequestV2FromPool() *PreallocWriteRequestV2 {
return writeRequestPoolV2.Get().(*PreallocWriteRequestV2)
}
// Reset implements proto.Message and preserves the capacity of the Symbols slice.
func (p *PreallocWriteRequestV2) Reset() {
savedSymbols := p.Symbols
p.WriteRequestV2.Reset()
p.Symbols = savedSymbols[:0]
p.data = nil
}
// PreallocTimeseriesV2SliceFromPool retrieves a slice of PreallocTimeseriesV2 from a sync.Pool.
// ReuseSliceV2 should be called once done.
func PreallocTimeseriesV2SliceFromPool() []PreallocTimeseriesV2 {
return slicePoolV2.Get().([]PreallocTimeseriesV2)
}
// ReuseSliceV2 puts the slice back into a sync.Pool for reuse.
func ReuseSliceV2(ts []PreallocTimeseriesV2) {
for i := range ts {
ReuseTimeseriesV2(ts[i].TimeSeriesV2)
}
slicePoolV2.Put(ts[:0]) //nolint:staticcheck //see comment on slicePool for more details
}
// TimeseriesV2FromPool retrieves a pointer to a TimeSeriesV2 from a sync.Pool.
// ReuseTimeseriesV2 should be called once done, unless ReuseSliceV2 was called on the slice that contains this TimeSeriesV2 .
func TimeseriesV2FromPool() *TimeSeriesV2 {
return timeSeriesPoolV2.Get().(*TimeSeriesV2)
}
// ReuseTimeseriesV2 puts the timeseriesV2 back into a sync.Pool for reuse.
func ReuseTimeseriesV2(ts *TimeSeriesV2) {
// clear ts labelRef and samples
ts.LabelsRefs = ts.LabelsRefs[:0]
ts.Samples = ts.Samples[:0]
// clear metadata
ts.Metadata.Type = 0
ts.Metadata.UnitRef = 0
ts.Metadata.HelpRef = 0
// Clear CT
ts.CreatedTimestamp = 0
// clear exemplar label refs
for i := range ts.Exemplars {
ts.Exemplars[i].LabelsRefs = ts.Exemplars[i].LabelsRefs[:0]
}
for i := range ts.Histograms {
ts.Histograms[i].Reset()
}
ts.Exemplars = ts.Exemplars[:0]
ts.Histograms = ts.Histograms[:0]
timeSeriesPoolV2.Put(ts)
}