-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathImpressionCountsCacheInRedis.ts
More file actions
95 lines (80 loc) · 3.08 KB
/
ImpressionCountsCacheInRedis.ts
File metadata and controls
95 lines (80 loc) · 3.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import { ILogger } from '../../logger/types';
import { ImpressionCountsPayload } from '../../sync/submitters/types';
import { forOwn } from '../../utils/lang';
import { ImpressionCountsCacheInMemory } from '../inMemory/ImpressionCountsCacheInMemory';
import { LOG_PREFIX, REFRESH_RATE, TTL_REFRESH } from './constants';
import type { RedisAdapter } from './RedisAdapter';
/**
 * Impression counts cache backed by a Redis hash.
 *
 * Counts are accumulated by the parent in-memory cache and flushed to Redis
 * (one `HINCRBY` per tracked entry) on a fixed interval, whenever the parent
 * invokes `onFullQueue`, and once more when `stop()` is called.
 */
export class ImpressionCountsCacheInRedis extends ImpressionCountsCacheInMemory {
  private readonly log: ILogger;
  private readonly key: string;
  private readonly redis: RedisAdapter;
  private readonly refreshRate: number;
  // Handle returned by `setInterval`. Kept loosely typed: presumably because the
  // handle type differs between runtimes (Node vs. browser) — confirm before narrowing.
  private intervalId: any;

  /**
   * @param log - Logger used to report Redis and parsing errors.
   * @param key - Name of the Redis hash where the counts are stored.
   * @param redis - Redis client wrapper used to run the commands.
   * @param impressionCountsCacheSize - Optional size forwarded to the in-memory parent cache.
   * @param refreshRate - Period, in milliseconds, between periodic flushes once `start()` is called.
   */
  constructor(log: ILogger, key: string, redis: RedisAdapter, impressionCountsCacheSize?: number, refreshRate = REFRESH_RATE) {
    super(impressionCountsCacheSize);
    this.log = log;
    this.key = key;
    this.redis = redis;
    this.refreshRate = refreshRate;
    // Fire-and-forget flush: `postImpressionCountsInRedis` handles its own rejections.
    this.onFullQueue = () => { this.postImpressionCountsInRedis(); };
  }

  /**
   * Pops the counts accumulated in memory and adds them to the Redis hash.
   *
   * @returns A promise that resolves to `false` when there is nothing to flush or
   * when the pipeline fails (the error is logged, never rethrown); otherwise it
   * resolves to the result of the `expire` call, or `undefined` if `expire` was skipped.
   */
  private postImpressionCountsInRedis() {
    const counts = this.pop();
    const keys = Object.keys(counts);
    if (!keys.length) return Promise.resolve(false);

    const pipeline = this.redis.pipeline();
    keys.forEach(key => {
      pipeline.hincrby(this.key, key, counts[key]);
    });

    return pipeline.exec()
      .then((data: [Error | null, unknown][] | null) => {
        // When the pipeline returned exactly one result per queued command,
        // (re)set the expiration of the hash to TTL_REFRESH.
        if (data && data.length && data.length === keys.length) {
          return this.redis.expire(this.key, TTL_REFRESH);
        }
      })
      .catch((err: unknown) => {
        this.log.error(`${LOG_PREFIX}Error in impression counts pipeline: ${err}.`);
        return false;
      });
  }

  /**
   * Starts flushing the accumulated counts to Redis every `refreshRate` milliseconds.
   * Calling it again restarts the timer instead of stacking a second one.
   */
  start() {
    // Clear a timer left by a previous `start()`; otherwise its handle would be
    // overwritten and `stop()` could never cancel it. No-op when no timer is set.
    clearInterval(this.intervalId);
    this.intervalId = setInterval(this.postImpressionCountsInRedis.bind(this), this.refreshRate);
  }

  /**
   * Stops the periodic flush and performs a final flush of the pending counts.
   *
   * @returns The promise of the final flush (see `postImpressionCountsInRedis`).
   */
  stop() {
    clearInterval(this.intervalId);
    this.intervalId = undefined;
    return this.postImpressionCountsInRedis();
  }

  // Async consumer API, used by synchronizer
  /**
   * Reads every count stored in the Redis hash, deletes the hash, and converts
   * the entries into an impression counts payload.
   *
   * Hash fields are expected in the form `<name>::<timeFrame>`; entries whose
   * field, time frame or count cannot be parsed are logged and skipped.
   *
   * @returns A promise that resolves to `undefined` when the hash has no entries,
   * or to `{ pf }` with one item per successfully parsed entry.
   */
  getImpressionsCount(): Promise<ImpressionCountsPayload | undefined> {
    return this.redis.hgetall(this.key)
      .then((counts: Record<string, string>) => {
        if (!Object.keys(counts).length) return undefined;

        // Best-effort delete: failures are deliberately ignored.
        // NOTE(review): `hgetall` followed by `del` is not atomic, so counts written
        // between the two commands look like they would be dropped — confirm whether
        // this is acceptable or whether a transaction should be used.
        this.redis.del(this.key).catch(() => { /* no-op */ });

        const pf: ImpressionCountsPayload['pf'] = [];

        forOwn(counts, (count: string, key) => {
          const nameAndTime = key.split('::');
          if (nameAndTime.length !== 2) {
            this.log.error(`${LOG_PREFIX}Error spliting key ${key}`);
            return;
          }

          // Explicit radix so the values are always read as decimal numbers.
          const timeFrame = parseInt(nameAndTime[1], 10);
          if (isNaN(timeFrame)) {
            this.log.error(`${LOG_PREFIX}Error parsing time frame ${nameAndTime[1]}`);
            return;
          }

          const rawCount = parseInt(count, 10);
          if (isNaN(rawCount)) {
            this.log.error(`${LOG_PREFIX}Error parsing raw count ${count}`);
            return;
          }

          pf.push({
            f: nameAndTime[0],
            m: timeFrame,
            rc: rawCount,
          });
        });

        return { pf };
      });
  }
}