Skip to content

Commit dc3538d

Browse files
committed
use upsert native SQL queries to update stats + do not update lastTs & period on late measures
- this is way quicker, and won't throw duplicate exception :) - also, do not update last timestamp or sample period if timestamp is late (less than current last_ts)
1 parent a22b055 commit dc3538d

4 files changed

Lines changed: 38 additions & 29 deletions

File tree

src/main/kotlin/ch/derlin/bbdata/common/stats/SqlStats.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ data class SqlStats(
3636
@Column(name = "avg_sample_period")
3737
var avgSamplePeriod: Double = .0
3838
) {
39+
/** THIS IS NOT USED ANYMORE see SqlStatsRepository for more details
3940
fun updateWithNewValue(v: NewValue) {
4041
if (nWrites > 0L) {
4142
val deltaMs = abs(v.timestamp!!.millis - lastTs!!.millis)
@@ -47,4 +48,6 @@ data class SqlStats(
4748
4849
nWrites += 1
4950
}
51+
*/
52+
5053
}

src/main/kotlin/ch/derlin/bbdata/common/stats/SqlStatsRepository.kt

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package ch.derlin.bbdata.common.stats
22

33
import ch.derlin.bbdata.Profiles
4+
import org.joda.time.DateTime
45
import org.springframework.context.annotation.Profile
56
import org.springframework.data.jpa.repository.JpaRepository
67
import org.springframework.data.jpa.repository.Modifying
@@ -15,4 +16,25 @@ import org.springframework.transaction.annotation.Transactional
1516

1617
@Profile(Profiles.SQL_STATS)
1718
@Repository
18-
interface SqlStatsRepository : JpaRepository<SqlStats, Long>
19+
interface SqlStatsRepository : JpaRepository<SqlStats, Long> {
20+
21+
@Transactional
22+
@Modifying
23+
@Query("""
24+
INSERT INTO stats (object_id, last_ts, n_writes)
25+
VALUES (:objectId, :ts, 1) ON DUPLICATE KEY UPDATE
26+
avg_sample_period = CASE WHEN VALUES(last_ts) > last_ts
27+
THEN ( (avg_sample_period * (n_writes-1)) + ABS((UNIX_TIMESTAMP(last_ts) * 1000) - (UNIX_TIMESTAMP(VALUES(last_ts)) * 1000)) ) / n_writes
28+
ELSE avg_sample_period
29+
END,
30+
n_writes = n_writes + 1,
31+
last_ts = GREATEST(last_ts, VALUES(last_ts))
32+
""", nativeQuery = true)
33+
fun updateWriteStats(objectId: Long, ts: DateTime)
34+
35+
@Transactional
36+
@Modifying
37+
@Query("INSERT INTO stats (object_id, n_reads) VALUES (:objectId, 1) ON DUPLICATE KEY UPDATE n_reads = n_reads + 1", nativeQuery = true)
38+
fun updateReadCounter(objectId: Long)
39+
40+
}

src/main/kotlin/ch/derlin/bbdata/common/stats/StatsLogic.kt

Lines changed: 11 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,16 @@ class CassandraStatsLogic(private val objectStatsRepository: ObjectStatsReposito
5252
val objectStats = objectStatsRepository.findById(objectId).orElse(ObjectStats())
5353
val objectStatsCounter = objectStatsCounterRepository.findById(objectId).orElse(ObjectStatsCounter())
5454

55-
// compute new stats
56-
val deltaMs = abs(v.timestamp!!.millis - (objectStats.lastTimestamp ?: v.timestamp).millis)
57-
val nRecords = objectStatsCounter.nValues
58-
val newSamplePeriod = if (nRecords > 0) (objectStats.avgSamplePeriod * (nRecords - 1) + deltaMs) / nRecords else .0f
55+
// update object stats
56+
if (objectStats.lastTimestamp == null || v.timestamp!! > objectStats.lastTimestamp) {
57+
// update lastTs and compute sample period only if new value is not a late timestamp
58+
val deltaMs = abs(v.timestamp!!.millis - (objectStats.lastTimestamp ?: v.timestamp).millis)
59+
val nRecords = objectStatsCounter.nValues
60+
val newSamplePeriod = if (nRecords > 0) (objectStats.avgSamplePeriod * (nRecords - 1) + deltaMs) / nRecords else .0f
61+
objectStatsRepository.update(v.objectId.toInt(), newSamplePeriod, v.timestamp)
62+
}
5963

60-
// save new stats
61-
objectStatsRepository.update(v.objectId.toInt(), newSamplePeriod, v.timestamp)
64+
// increment counter
6265
objectStatsCounterRepository.updateWriteCounter(v.objectId.toInt())
6366
}
6467

@@ -88,30 +91,11 @@ class CassandraStatsLogic(private val objectStatsRepository: ObjectStatsReposito
8891
class SqlStatsLogic(private val statsRepository: SqlStatsRepository) : StatsLogic {
8992

9093
override fun updateStats(v: NewValue) {
91-
val stats = statsRepository.findById(v.objectId!!).orElse(SqlStats(objectId = v.objectId))
92-
stats.updateWithNewValue(v)
93-
statsRepository.save(stats)
94-
}
95-
96-
override fun updateAllStats(vs: List<NewValue>) {
97-
// get all unique objects whose objectIds are in vs
98-
val stats = vs.map { it.objectId!! }.toSet().map{ objectId ->
99-
objectId to statsRepository.findById(objectId).orElse(SqlStats(objectId = objectId))
100-
}.toMap()
101-
102-
// update each: this will also work when multiple values target the same object
103-
vs.forEach {
104-
stats[it.objectId]!!.updateWithNewValue(it)
105-
}
106-
107-
// use the bulk save option of MySQL repositories to speed up the process
108-
statsRepository.saveAll(stats.values)
94+
statsRepository.updateWriteStats(v.objectId!!, v.timestamp!!)
10995
}
11096

11197
override fun incrementReadCounter(objectId: Long) {
112-
val stats = statsRepository.findById(objectId).orElse(SqlStats(objectId = objectId))
113-
stats.nReads += 1
114-
statsRepository.save(stats)
98+
statsRepository.updateReadCounter(objectId)
11599
}
116100

117101
override fun getStats(objectId: Long): Stats {

src/test/kotlin/ch/derlin/bbdata/input/InputApiTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class InputApiTest {
3333
private lateinit var restTemplate: TestRestTemplate
3434

3535
// asynchronous stats update, hence wait a bit (ms)
36-
val statsWait: Long = 5000
36+
val statsWait: Long = 100
3737

3838
companion object {
3939
val OBJ = 1

0 commit comments

Comments
 (0)