-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpersistence.go
More file actions
403 lines (343 loc) Β· 11 KB
/
persistence.go
File metadata and controls
403 lines (343 loc) Β· 11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
package matcher
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
)
// JSONPersistence implements PersistenceInterface using JSON files
type JSONPersistence struct {
rulesPath string
dimensionsPath string
}
// NewJSONPersistence creates a new JSON persistence layer
func NewJSONPersistence(dataDir string) *JSONPersistence {
// Ensure directory exists
os.MkdirAll(dataDir, 0755)
return &JSONPersistence{
rulesPath: filepath.Join(dataDir, "rules.json"),
dimensionsPath: filepath.Join(dataDir, "dimensions.json"),
}
}
// LoadRules loads all rules from JSON file
func (jp *JSONPersistence) LoadRules(ctx context.Context) ([]*Rule, error) {
data, err := os.ReadFile(jp.rulesPath)
if err != nil {
if os.IsNotExist(err) {
return []*Rule{}, nil // Return empty slice if file doesn't exist
}
return nil, fmt.Errorf("failed to read rules file: %w", err)
}
var rules []*Rule
if err := json.Unmarshal(data, &rules); err != nil {
return nil, fmt.Errorf("failed to unmarshal rules: %w", err)
}
return rules, nil
}
// SaveRules saves all rules to JSON file
func (jp *JSONPersistence) SaveRules(ctx context.Context, rules []*Rule) error {
data, err := json.MarshalIndent(rules, "", " ")
if err != nil {
return fmt.Errorf("failed to marshal rules: %w", err)
}
if err := os.WriteFile(jp.rulesPath, data, 0644); err != nil {
return fmt.Errorf("failed to write rules file: %w", err)
}
return nil
}
// LoadRulesByTenant loads rules for a specific tenant and application
func (jp *JSONPersistence) LoadRulesByTenant(ctx context.Context, tenantID, applicationID string) ([]*Rule, error) {
// Load all rules and filter by tenant/application
allRules, err := jp.LoadRules(ctx)
if err != nil {
return nil, err
}
var filteredRules []*Rule
for _, rule := range allRules {
if rule.MatchesTenantContext(tenantID, applicationID) {
filteredRules = append(filteredRules, rule)
}
}
return filteredRules, nil
}
// LoadDimensionConfigs loads dimension configurations from JSON file
func (jp *JSONPersistence) LoadDimensionConfigs(ctx context.Context) ([]*DimensionConfig, error) {
data, err := os.ReadFile(jp.dimensionsPath)
if err != nil {
if os.IsNotExist(err) {
// Return default dimensions if file doesn't exist
return []*DimensionConfig{}, nil
}
return nil, fmt.Errorf("failed to read dimensions file: %w", err)
}
var configs []*DimensionConfig
if err := json.Unmarshal(data, &configs); err != nil {
return nil, fmt.Errorf("failed to unmarshal dimensions: %w", err)
}
return configs, nil
}
// SaveDimensionConfigs saves dimension configurations to JSON file
func (jp *JSONPersistence) SaveDimensionConfigs(ctx context.Context, configs []*DimensionConfig) error {
data, err := json.MarshalIndent(configs, "", " ")
if err != nil {
return fmt.Errorf("failed to marshal dimensions: %w", err)
}
if err := os.WriteFile(jp.dimensionsPath, data, 0644); err != nil {
return fmt.Errorf("failed to write dimensions file: %w", err)
}
return nil
}
// LoadDimensionConfigsByTenant loads dimension configurations for a specific tenant and application
func (jp *JSONPersistence) LoadDimensionConfigsByTenant(ctx context.Context, tenantID, applicationID string) ([]*DimensionConfig, error) {
// Load all dimension configs and filter by tenant/application
allConfigs, err := jp.LoadDimensionConfigs(ctx)
if err != nil {
return nil, err
}
var filteredConfigs []*DimensionConfig
for _, config := range allConfigs {
// Match tenant context or include global configs (empty tenant/app)
if (config.TenantID == "" && config.ApplicationID == "") ||
(config.TenantID == tenantID && config.ApplicationID == applicationID) {
filteredConfigs = append(filteredConfigs, config)
}
}
return filteredConfigs, nil
}
// Health checks if the persistence layer is healthy
func (jp *JSONPersistence) Health(ctx context.Context) error {
// Check if we can write to the directory
testFile := filepath.Join(filepath.Dir(jp.rulesPath), ".health_check")
if err := os.WriteFile(testFile, []byte("ok"), 0644); err != nil {
return fmt.Errorf("cannot write to data directory: %w", err)
}
os.Remove(testFile)
return nil
}
// MockEventSubscriber is a mock implementation for testing
type MockEventSubscriber struct {
events chan *Event
closed bool
mu sync.RWMutex
}
// NewMockEventSubscriber creates a new mock event subscriber
func NewMockEventSubscriber() *MockEventSubscriber {
return &MockEventSubscriber{
events: make(chan *Event, 100),
}
}
// Publish publishes an event to the mock broker
func (mes *MockEventSubscriber) Publish(ctx context.Context, event *Event) error {
mes.mu.RLock()
defer mes.mu.RUnlock()
if mes.closed {
return fmt.Errorf("subscriber is closed")
}
select {
case mes.events <- event:
return nil
case <-ctx.Done():
return ctx.Err()
default:
return fmt.Errorf("event queue is full")
}
}
// Subscribe starts listening for events
func (mes *MockEventSubscriber) Subscribe(ctx context.Context, events chan<- *Event) error {
go func() {
for {
select {
case event, ok := <-mes.events:
if !ok {
return
}
select {
case events <- event:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
return nil
}
// PublishEvent publishes an event (for testing) - deprecated, use Publish instead
func (mes *MockEventSubscriber) PublishEvent(event *Event) {
mes.mu.RLock()
closed := mes.closed
mes.mu.RUnlock()
if closed {
return
}
// Best-effort publish without context
select {
case mes.events <- event:
default:
// drop if full
}
}
// Close closes the subscriber
func (mes *MockEventSubscriber) Close() error {
mes.mu.Lock()
defer mes.mu.Unlock()
if !mes.closed {
mes.closed = true
close(mes.events)
}
return nil
}
// Health checks if the subscriber is healthy
func (mes *MockEventSubscriber) Health(ctx context.Context) error {
if mes.closed {
return fmt.Errorf("subscriber is closed")
}
return nil
}
// KafkaEventSubscriber implements EventSubscriberInterface using Kafka
// Note: This is a basic example. In production, you'd use a proper Kafka client library
type KafkaEventSubscriber struct {
brokers []string
topics []string
groupID string
// In a real implementation, you'd have a Kafka consumer here
// consumer *kafka.Consumer
eventsChan chan *Event
ctx context.Context
cancel context.CancelFunc
closed bool
}
// NewKafkaEventSubscriber creates a new Kafka event subscriber
func NewKafkaEventSubscriber(brokers []string, topics []string, groupID string) *KafkaEventSubscriber {
ctx, cancel := context.WithCancel(context.Background())
return &KafkaEventSubscriber{
brokers: brokers,
topics: topics,
groupID: groupID,
eventsChan: make(chan *Event, 1000),
ctx: ctx,
cancel: cancel,
}
}
// Subscribe starts listening for events from Kafka
func (kes *KafkaEventSubscriber) Subscribe(ctx context.Context, events chan<- *Event) error {
// In a real implementation, you would:
// 1. Create a Kafka consumer
// 2. Subscribe to the topics
// 3. Start consuming messages
// 4. Parse messages into Event structs
// 5. Send events to the events channel
// This is a placeholder implementation
go func() {
for {
select {
case event := <-kes.eventsChan:
if kes.closed {
return
}
select {
case events <- event:
case <-ctx.Done():
return
case <-kes.ctx.Done():
return
}
case <-ctx.Done():
return
case <-kes.ctx.Done():
return
}
}
}()
return nil
}
// Close closes the Kafka subscriber
func (kes *KafkaEventSubscriber) Close() error {
kes.closed = true
kes.cancel()
close(kes.eventsChan)
// In a real implementation, you would close the Kafka consumer here
// return kes.consumer.Close()
return nil
}
// Health checks if the Kafka subscriber is healthy
func (kes *KafkaEventSubscriber) Health(ctx context.Context) error {
if kes.closed {
return fmt.Errorf("kafka subscriber is closed")
}
// In a real implementation, you would check Kafka connection health
// For example, try to fetch metadata or ping the brokers
return nil
}
// DatabasePersistence implements PersistenceInterface using a SQL database
// This is a placeholder - you would use your preferred database driver
type DatabasePersistence struct {
connectionString string
// db *sql.DB
}
// NewDatabasePersistence creates a new database persistence layer
func NewDatabasePersistence(connectionString string) *DatabasePersistence {
return &DatabasePersistence{
connectionString: connectionString,
}
}
// LoadRules loads all rules from the database
func (dp *DatabasePersistence) LoadRules(ctx context.Context) ([]*Rule, error) {
// Placeholder implementation
// In a real implementation, you would:
// 1. Connect to the database
// 2. Execute a SELECT query to get all rules
// 3. Scan the results into Rule structs
// 4. Return the rules
return []*Rule{}, nil
}
// SaveRules saves all rules to the database
func (dp *DatabasePersistence) SaveRules(ctx context.Context, rules []*Rule) error {
// Placeholder implementation
// In a real implementation, you would:
// 1. Start a transaction
// 2. Clear existing rules (or use UPSERT)
// 3. Insert all new rules
// 4. Commit the transaction
return nil
}
// LoadDimensionConfigs loads dimension configurations from the database
func (dp *DatabasePersistence) LoadDimensionConfigs(ctx context.Context) ([]*DimensionConfig, error) {
// Placeholder implementation
return []*DimensionConfig{}, nil
}
// SaveDimensionConfigs saves dimension configurations to the database
func (dp *DatabasePersistence) SaveDimensionConfigs(ctx context.Context, configs []*DimensionConfig) error {
// Placeholder implementation
return nil
}
// LoadRulesByTenant loads rules for a specific tenant and application from the database
func (dp *DatabasePersistence) LoadRulesByTenant(ctx context.Context, tenantID, applicationID string) ([]*Rule, error) {
// Placeholder implementation
// In a real implementation, you would:
// 1. Connect to the database
// 2. Execute a SELECT query with WHERE clause for tenant_id and application_id
// 3. Scan the results into Rule structs
// 4. Return the filtered rules
return []*Rule{}, nil
}
// LoadDimensionConfigsByTenant loads dimension configurations for a specific tenant and application from the database
func (dp *DatabasePersistence) LoadDimensionConfigsByTenant(ctx context.Context, tenantID, applicationID string) ([]*DimensionConfig, error) {
// Placeholder implementation
// In a real implementation, you would:
// 1. Connect to the database
// 2. Execute a SELECT query with WHERE clause for tenant_id and application_id
// 3. Include global configs (empty tenant_id and application_id)
// 4. Scan the results into DimensionConfig structs
// 5. Return the filtered configs
return []*DimensionConfig{}, nil
}
// Health checks if the database connection is healthy
func (dp *DatabasePersistence) Health(ctx context.Context) error {
// Placeholder implementation
// In a real implementation, you would ping the database
return nil
}