forked from grafana/cortex-tools
-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathblock_gen.go
More file actions
144 lines (118 loc) · 3.74 KB
/
block_gen.go
File metadata and controls
144 lines (118 loc) · 3.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package commands
import (
"context"
"log/slog"
"os"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"gopkg.in/alecthomas/kingpin.v2"
"gopkg.in/yaml.v3"
"github.com/cortexproject/cortex-tools/pkg/bench"
)
// BlockGenCommand is the kingpin command to generate blocks of mock data.
type BlockGenCommand struct {
Replicas int `yaml:"replicas"`
Series []bench.SeriesDesc `yaml:"series"`
Cfg BlockGenConfig `yaml:"block_gen"`
configFile string
}
type BlockGenConfig struct {
Interval time.Duration `yaml:"interval"`
BlockSize time.Duration `yaml:"block_size"`
BlockDir string `yaml:"block_dir"`
MinT int64 `yaml:"min_t"`
MaxT int64 `yaml:"max_t"`
}
// Register is used to register the command to a parent command.
func (f *BlockGenCommand) Register(app *kingpin.Application) {
app.Flag("config.file", "configuration file for this tool").Required().StringVar(&f.configFile)
app.Action(f.run)
}
func (f *BlockGenCommand) run(_ *kingpin.ParseContext) error {
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
content, err := os.ReadFile(f.configFile)
if err != nil {
return errors.Wrap(err, "unable to read workload YAML file from the disk")
}
err = yaml.Unmarshal(content, &f)
if err != nil {
return errors.Wrap(err, "unable to unmarshal workload YAML file")
}
if f.Cfg.BlockDir == "" {
var err error
f.Cfg.BlockDir, err = os.MkdirTemp("", "mockdata")
if err != nil {
return errors.Wrap(err, "failed to create tmp dir")
}
}
seriesSet, totalSeriesTypeMap := bench.SeriesDescToSeries(f.Series)
totalSeries := 0
for _, typeTotal := range totalSeriesTypeMap {
totalSeries += typeTotal
}
writeWorkLoad := bench.WriteWorkload{
TotalSeries: totalSeries,
TotalSeriesTypeMap: totalSeriesTypeMap,
Replicas: f.Replicas,
Series: seriesSet,
}
interval := f.Cfg.Interval.Milliseconds()
blockSize := f.Cfg.BlockSize.Milliseconds()
level.Info(logger).Log("msg", "Generating data", "minT", f.Cfg.MinT, "maxT", f.Cfg.MaxT, "interval", interval)
currentTs := (int64(f.Cfg.MinT) + interval - 1) / interval * interval
ctx := context.Background()
currentBlockID := int64(-1)
lastBlockID := blockID(f.Cfg.MaxT, blockSize)
var w *tsdb.BlockWriter
for ; currentTs <= f.Cfg.MaxT; currentTs += interval {
if currentBlockID != blockID(currentTs, blockSize) {
if w != nil {
_, err = w.Flush(ctx)
if err != nil {
return err
}
}
currentBlockID = blockID(currentTs, blockSize)
level.Info(logger).Log("msg", "starting new block", "block_id", currentBlockID, "blocks_left", lastBlockID-currentBlockID+1)
w, err = tsdb.NewBlockWriter(slog.Default(), f.Cfg.BlockDir, blockSize)
if err != nil {
return err
}
}
timeSeries := writeWorkLoad.GenerateTimeSeries("block_gen", time.Unix(currentTs/1000, 0))
app := w.Appender(ctx)
for _, s := range timeSeries {
var ref storage.SeriesRef
lbls := prompbLabelsToLabelsLabels(s.Labels)
for _, sample := range s.Samples {
ref, err = app.Append(ref, lbls, sample.Timestamp, sample.Value)
if err != nil {
return err
}
}
}
err = app.Commit()
if err != nil {
return err
}
}
_, err = w.Flush(ctx)
level.Info(logger).Log("msg", "finished", "block_dir", f.Cfg.BlockDir)
return err
}
func blockID(ts, blockSize int64) int64 {
return ts / blockSize
}
func prompbLabelsToLabelsLabels(in []prompb.Label) labels.Labels {
b := labels.NewBuilder(labels.EmptyLabels())
for _, l := range in {
b.Set(l.Name, l.Value)
}
return b.Labels()
}