Skip to content

Commit e2dd532

Browse files
committed
Merge branch 'main' of github.com:sensorbucket/SensorBucket
2 parents e580380 + d302f3c commit e2dd532

9 files changed

Lines changed: 261 additions & 153 deletions

File tree

.github/workflows/build-docker-images.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ jobs:
9696
${{ env.REGISTRY }}/${{ matrix.image }}
9797
tags: |
9898
type=edge
99-
type=edge,suffix=-{{date 'YYMMDDhhmm'}}
99+
type=edge,suffix=-{{date 'YYMMDDHHmm'}}
100100
type=sha
101101
type=semver,pattern={{major}}
102102
type=semver,pattern={{major}}.{{minor}}

docker-compose.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ services:
2424
- EP_WORKERS=http://caddy/api
2525
- PROFILER_ADDR=:3100
2626

27-
2827
web-importer:
2928
user: "1000:1000"
3029
build:

services/core/main.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,9 @@ func Run(cleanup cleanupper.Cleanupper) error {
107107
return fmt.Errorf("could not convert SYS_ARCHIVE_TIME to integer: %w", err)
108108
}
109109
measurementstore := measurementsinfra.NewPSQL(pool)
110+
storageErrorPublisher := measurementsinfra.NewStorageErrorPublisher(amqpConn, AMQP_XCHG_PIPELINE_MESSAGES)
110111
measurementservice := measurements.New(measurementstore, sysArchiveTime, MEASUREMENT_BATCH_SIZE, keyClient)
111-
cleanup.Add(measurementservice.StartMeasurementBatchStorer(time.Duration(MEASUREMENT_COMMIT_INTERVAL) * time.Millisecond))
112+
// cleanup.Add(measurementservice.StartMeasurementBatchStorer(time.Duration(MEASUREMENT_COMMIT_INTERVAL) * time.Millisecond))
112113

113114
processingstore := processinginfra.NewPSQLStore(db)
114115
processingPipelinePublisher := processinginfra.NewPipelineMessagePublisher(amqpConn, AMQP_XCHG_PIPELINE_MESSAGES)
@@ -123,7 +124,7 @@ func Run(cleanup cleanupper.Cleanupper) error {
123124
AMQP_QUEUE_MEASUREMENTS,
124125
AMQP_XCHG_PIPELINE_MESSAGES,
125126
AMQP_XCHG_MEASUREMENTS_TOPIC,
126-
measurements.MQMessageProcessor(measurementservice),
127+
measurements.MQMessageProcessor(measurementservice, storageErrorPublisher),
127128
)
128129
go mq.StartQueueProcessor(
129130
amqpConn,

services/core/measurements/application.go

Lines changed: 76 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,13 @@ package measurements
44

55
import (
66
"context"
7+
"errors"
78
"fmt"
8-
"log"
99
"time"
1010

1111
"github.com/google/uuid"
1212
"github.com/samber/lo"
1313

14-
"sensorbucket.nl/sensorbucket/internal/cleanupper"
1514
"sensorbucket.nl/sensorbucket/internal/pagination"
1615
"sensorbucket.nl/sensorbucket/pkg/auth"
1716
"sensorbucket.nl/sensorbucket/pkg/pipeline"
@@ -25,84 +24,85 @@ type Store interface {
2524
GetDatastream(ctx context.Context, id uuid.UUID, filter DatastreamFilter) (*Datastream, error)
2625
FindOrCreateDatastream(ctx context.Context, tenantID, sensorID int64, observedProperty, UnitOfMeasurement string) (*Datastream, error)
2726
StoreMeasurements(context.Context, []Measurement) error
27+
StoreMeasurement(context.Context, Measurement) error
2828
}
2929

3030
// Service is the measurement service which stores measurement data.
3131
type Service struct {
32-
store Store
33-
systemArchiveTime int
34-
keyClient auth.JWKSClient
35-
measurementBatchChan chan Measurement
36-
measurementBatch []Measurement
32+
store Store
33+
systemArchiveTime int
34+
keyClient auth.JWKSClient
35+
// measurementBatchChan chan Measurement
36+
// measurementBatch []Measurement
3737
}
3838

3939
func New(store Store, systemArchiveTime, batchSize int, keyClient auth.JWKSClient) *Service {
4040
return &Service{
41-
store: store,
42-
systemArchiveTime: systemArchiveTime,
43-
keyClient: keyClient,
44-
measurementBatch: make([]Measurement, 0, batchSize),
45-
measurementBatchChan: make(chan Measurement, batchSize),
41+
store: store,
42+
systemArchiveTime: systemArchiveTime,
43+
keyClient: keyClient,
44+
// measurementBatch: make([]Measurement, 0, batchSize),
45+
// measurementBatchChan: make(chan Measurement, batchSize),
4646
}
4747
}
4848

49-
func (s *Service) StartMeasurementBatchStorer(interval time.Duration) cleanupper.Shutdown {
50-
stop := make(chan struct{})
51-
done := make(chan struct{})
52-
t := time.NewTicker(interval)
53-
54-
go func() {
55-
log.Println("Measurement service batch storer started")
56-
defer log.Println("Measurement service batch storer stopped!")
57-
outer:
58-
for {
59-
select {
60-
case <-stop:
61-
if err := s.CommitBatch(false); err != nil {
62-
log.Printf("error committing batch: %s\n", err.Error())
63-
}
64-
break outer
65-
case m := <-s.measurementBatchChan:
66-
s.measurementBatch = append(s.measurementBatch, m)
67-
if len(s.measurementBatch) == cap(s.measurementBatch) {
68-
if err := s.CommitBatch(false); err != nil {
69-
log.Printf("error committing batch: %s\n", err.Error())
70-
}
71-
}
72-
case <-t.C:
73-
if err := s.CommitBatch(false); err != nil {
74-
log.Printf("error committing batch: %s\n", err.Error())
75-
}
76-
}
77-
}
78-
close(done)
79-
}()
80-
81-
return func(ctx context.Context) error {
82-
close(stop)
83-
<-done
84-
return nil
85-
}
86-
}
87-
88-
func (s *Service) CommitBatch(collect bool) error {
89-
if len(s.measurementBatch) == 0 {
90-
if !collect || len(s.measurementBatchChan) == 0 {
91-
return nil
92-
}
93-
count := len(s.measurementBatchChan)
94-
for i := 0; i < count; i++ {
95-
s.measurementBatch = append(s.measurementBatch, <-s.measurementBatchChan)
96-
}
97-
}
98-
log.Printf("Committing %d measurements\n", len(s.measurementBatch))
99-
err := s.store.StoreMeasurements(context.Background(), s.measurementBatch)
100-
if err != nil {
101-
return fmt.Errorf("committing measurements failed: %w", err)
102-
}
103-
s.measurementBatch = s.measurementBatch[:0]
104-
return nil
105-
}
49+
// func (s *Service) StartMeasurementBatchStorer(interval time.Duration) cleanupper.Shutdown {
50+
// stop := make(chan struct{})
51+
// done := make(chan struct{})
52+
// t := time.NewTicker(interval)
53+
//
54+
// go func() {
55+
// log.Println("Measurement service batch storer started")
56+
// defer log.Println("Measurement service batch storer stopped!")
57+
// outer:
58+
// for {
59+
// select {
60+
// case <-stop:
61+
// if err := s.CommitBatch(false); err != nil {
62+
// log.Printf("error committing batch: %s\n", err.Error())
63+
// }
64+
// break outer
65+
// case m := <-s.measurementBatchChan:
66+
// s.measurementBatch = append(s.measurementBatch, m)
67+
// if len(s.measurementBatch) == cap(s.measurementBatch) {
68+
// if err := s.CommitBatch(false); err != nil {
69+
// log.Printf("error committing batch: %s\n", err.Error())
70+
// }
71+
// }
72+
// case <-t.C:
73+
// if err := s.CommitBatch(false); err != nil {
74+
// log.Printf("error committing batch: %s\n", err.Error())
75+
// }
76+
// }
77+
// }
78+
// close(done)
79+
// }()
80+
//
81+
// return func(ctx context.Context) error {
82+
// close(stop)
83+
// <-done
84+
// return nil
85+
// }
86+
// }
87+
//
88+
// func (s *Service) CommitBatch(collect bool) error {
89+
// if len(s.measurementBatch) == 0 {
90+
// if !collect || len(s.measurementBatchChan) == 0 {
91+
// return nil
92+
// }
93+
// count := len(s.measurementBatchChan)
94+
// for range count {
95+
// s.measurementBatch = append(s.measurementBatch, <-s.measurementBatchChan)
96+
// }
97+
// }
98+
// log.Printf("Committing %d measurements\n", len(s.measurementBatch))
99+
// err := s.store.StoreMeasurements(context.Background(), s.measurementBatch)
100+
// if err != nil {
101+
// return fmt.Errorf("committing measurements failed: %w", err)
102+
// }
103+
// s.measurementBatch = s.measurementBatch[:0]
104+
// return nil
105+
// }
106106

107107
func (s *Service) ProcessPipelineMessage(pmsg pipeline.Message) error {
108108
msg := PipelineMessage(pmsg)
@@ -135,10 +135,12 @@ func (s *Service) ProcessPipelineMessage(pmsg pipeline.Message) error {
135135
CreatedAt: time.Now(),
136136
}
137137

138+
var errs []error
138139
for _, m := range msg.Measurements {
139140
sensor, err := dev.GetSensorByExternalIDOrFallback(m.SensorExternalID)
140141
if err != nil {
141-
return fmt.Errorf("cannot get sensor: %w", err)
142+
errs = append(errs, fmt.Errorf("cannog get sensor: %w", err))
143+
continue
142144
}
143145
if sensor.ExternalID != m.SensorExternalID {
144146
m.ObservedProperty = m.SensorExternalID + "_" + m.ObservedProperty
@@ -148,7 +150,8 @@ func (s *Service) ProcessPipelineMessage(pmsg pipeline.Message) error {
148150

149151
ds, err := s.store.FindOrCreateDatastream(ctx, msg.TenantID, sensor.ID, m.ObservedProperty, m.UnitOfMeasurement)
150152
if err != nil {
151-
return err
153+
errs = append(errs, fmt.Errorf("cannog get sensor: %w", err))
154+
continue
152155
}
153156

154157
measurement := baseMeasurement
@@ -186,10 +189,10 @@ func (s *Service) ProcessPipelineMessage(pmsg pipeline.Message) error {
186189
measurement.MeasurementAltitude = m.Altitude
187190
}
188191

189-
s.measurementBatchChan <- measurement
192+
errs = append(errs, s.store.StoreMeasurement(ctx, measurement))
190193
}
191194

192-
return nil
195+
return errors.Join(errs...)
193196
}
194197

195198
// Filter contains query information for a list of measurements

services/core/measurements/application_test.go

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ func TestShouldErrorIfNoDeviceOrNoSensor(t *testing.T) {
135135
FindOrCreateDatastreamFunc: func(ctx context.Context, tenantID, sensorID int64, observedProperty, UnitOfMeasurement string) (*measurements.Datastream, error) {
136136
return &measurements.Datastream{}, nil
137137
},
138+
StoreMeasurementFunc: func(contextMoqParam context.Context, measurement measurements.Measurement) error { return nil },
138139
}
139140
svc := measurements.New(store, 0, 1, authtest.JWKS())
140141

@@ -190,19 +191,18 @@ func TestShouldCopyOverDefaultFields(t *testing.T) {
190191
FindOrCreateDatastreamFunc: func(ctx context.Context, tenantID, sensorID int64, observedProperty, UnitOfMeasurement string) (*measurements.Datastream, error) {
191192
return &ds, nil
192193
},
193-
StoreMeasurementsFunc: func(ctx context.Context, measurementsMoqParam []measurements.Measurement) error { return nil },
194+
StoreMeasurementFunc: func(contextMoqParam context.Context, measurement measurements.Measurement) error { return nil },
194195
}
195196
svc := measurements.New(store, 0, 1, authtest.JWKS())
196197

197198
// Act
198199
err = svc.ProcessPipelineMessage(msg)
199200
require.NoError(t, err)
200-
assert.NoError(t, svc.CommitBatch(true))
201+
// assert.NoError(t, svc.CommitBatch(true))
201202

202203
// Assert
203-
require.Len(t, store.calls.StoreMeasurements, 1, "StoreMeasurements should've been called")
204-
require.Len(t, store.calls.StoreMeasurements[0].MeasurementsMoqParam, 1, "StoreMeasurements should've been supplied a measurements")
205-
measurement := store.calls.StoreMeasurements[0].MeasurementsMoqParam[0]
204+
require.Len(t, store.calls.StoreMeasurement, 1, "StoreMeasurements should've been called")
205+
measurement := store.calls.StoreMeasurement[0].Measurement
206206
assert.Equal(t, msg.TracingID, measurement.UplinkMessageID)
207207
// assert.Equal(t, OrganisationName, measurement.OrganisationName)
208208
// assert.Equal(t, OrganisationAddress, measurement.OrganisationAddress)
@@ -327,20 +327,19 @@ func TestShouldChooseMeasurementLocationOverDeviceLocation(t *testing.T) {
327327
FindOrCreateDatastreamFunc: func(ctx context.Context, tenantID, sensorID int64, observedProperty, UnitOfMeasurement string) (*measurements.Datastream, error) {
328328
return &ds, nil
329329
},
330-
StoreMeasurementsFunc: func(ctx context.Context, measurementsMoqParam []measurements.Measurement) error { return nil },
330+
StoreMeasurementFunc: func(contextMoqParam context.Context, measurement measurements.Measurement) error { return nil },
331331
}
332332
svc := measurements.New(store, 0, 1, authtest.JWKS())
333333

334334
// Act
335335
require.NoError(t,
336336
svc.ProcessPipelineMessage(msg),
337337
)
338-
assert.NoError(t, svc.CommitBatch(true))
338+
// assert.NoError(t, svc.CommitBatch(true))
339339

340340
// Assert
341-
require.Len(t, store.calls.StoreMeasurements, 1, "StoreMeasurements should've been called")
342-
require.Len(t, store.calls.StoreMeasurements[0].MeasurementsMoqParam, 1, "StoreMeasurements should've been supplied a measurements")
343-
measurement := store.calls.StoreMeasurements[0].MeasurementsMoqParam[0]
341+
require.Len(t, store.calls.StoreMeasurement, 1, "StoreMeasurements should've been called")
342+
measurement := store.calls.StoreMeasurement[0].Measurement
344343
assert.Equal(t, tC.ExpectedLatitude, measurement.MeasurementLatitude)
345344
assert.Equal(t, tC.ExpectedLongitude, measurement.MeasurementLongitude)
346345
assert.Equal(t, tC.ExpectedAltitude, measurement.MeasurementAltitude)
@@ -408,19 +407,18 @@ func TestShouldSetExpirationDate(t *testing.T) {
408407
FindOrCreateDatastreamFunc: func(ctx context.Context, tenantID, sensorID int64, observedProperty, UnitOfMeasurement string) (*measurements.Datastream, error) {
409408
return &ds, nil
410409
},
411-
StoreMeasurementsFunc: func(ctx context.Context, measurementsMoqParam []measurements.Measurement) error { return nil },
410+
StoreMeasurementFunc: func(contextMoqParam context.Context, measurement measurements.Measurement) error { return nil },
412411
}
413412
svc := measurements.New(store, sysArchiveTime, 1, authtest.JWKS())
414413

415414
// Act
416415
err = svc.ProcessPipelineMessage(msg)
417416
require.NoError(t, err)
418-
assert.NoError(t, svc.CommitBatch(true))
417+
// assert.NoError(t, svc.CommitBatch(true))
419418

420419
// Assert
421-
require.Len(t, store.calls.StoreMeasurements, 1, "StoreMeasurements should've been called")
422-
require.Len(t, store.calls.StoreMeasurements[0].MeasurementsMoqParam, 1, "StoreMeasurements should've been supplied a measurements")
423-
measurement := store.calls.StoreMeasurements[0].MeasurementsMoqParam[0]
420+
require.Len(t, store.calls.StoreMeasurement, 1, "StoreMeasurements should've been called")
421+
measurement := store.calls.StoreMeasurement[0].Measurement
424422
// Check if the difference in seconds is 0, otherwise there might be a subsecond difference
425423
// due to parsing
426424
assert.Equal(t,
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package measurementsinfra
2+
3+
import (
4+
"encoding/json"
5+
"log/slog"
6+
7+
"github.com/rabbitmq/amqp091-go"
8+
"sensorbucket.nl/sensorbucket/pkg/mq"
9+
"sensorbucket.nl/sensorbucket/services/core/measurements"
10+
)
11+
12+
var logger = slog.Default()
13+
14+
func NewStorageErrorPublisher(conn *mq.AMQPConnection, xchg string) measurements.StorageErrorPublisher {
15+
publisher := conn.Publisher(xchg, func(c *amqp091.Channel) error {
16+
return c.ExchangeDeclare(xchg, "topic", true, false, false, false, nil)
17+
})
18+
messageChan := make(chan *measurements.StorageError, mq.DefaultPrefetch())
19+
go func() {
20+
for msg := range messageChan {
21+
body, err := json.Marshal(msg)
22+
if err != nil {
23+
logger.Warn("Could not marshal storage error", "error", msg)
24+
continue
25+
}
26+
publisher <- mq.PublishMessage{
27+
Topic: "storage_errors",
28+
Publishing: amqp091.Publishing{
29+
MessageId: msg.TracingID,
30+
Body: body,
31+
},
32+
}
33+
}
34+
}()
35+
36+
return messageChan
37+
}

0 commit comments

Comments
 (0)