Skip to content

Commit 0d35bf6

Browse files
committed
Spinner Options
-Users can now configure default spinner behavior or provide their own. -Provides future support for us to create other spinners that users can choose to deploy. -Publishers and Subscribers with queue sizes < 1 will now have infinite queues. Previously you needed to disable throttling to get this behavior.
1 parent 631d662 commit 0d35bf6

6 files changed

Lines changed: 160 additions & 71 deletions

File tree

src/index.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,8 @@ let Rosnodejs = {
133133

134134
// create the ros node. Return a promise that will
135135
// resolve when connection to master is established
136-
rosNode = new RosNode(nodeName, rosMasterUri);
136+
const nodeOpts = options.node || {};
137+
rosNode = new RosNode(nodeName, rosMasterUri, nodeOpts);
137138

138139

139140
return this._loadOnTheFlyMessages(options)

src/lib/RosNode.js

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ const PublisherImpl = require('./impl/PublisherImpl.js');
2828
const SubscriberImpl = require('./impl/SubscriberImpl.js');
2929
let ServiceClient = require('./ServiceClient.js');
3030
let ServiceServer = require('./ServiceServer.js');
31-
const Spinner = require('../utils/GlobalSpinner.js');
31+
const GlobalSpinner = require('../utils/spinners/GlobalSpinner.js');
3232
let NetworkUtils = require('../utils/network_utils.js');
3333
let messageUtils = require('../utils/message_utils.js');
3434
let tcprosUtils = require('../utils/tcpros_utils.js');
@@ -78,7 +78,7 @@ class RosNode extends EventEmitter {
7878

7979
this._setupExitHandler();
8080

81-
this._spinner = new Spinner();
81+
this._setupSpinner(options.spinner);
8282
}
8383

8484
getLogger() {
@@ -582,6 +582,30 @@ class RosNode extends EventEmitter {
582582
this._log.error('Not implemented');
583583
}
584584

585+
/**
586+
* Initializes the spinner for this node.
587+
* @param [spinnerOpts] {object} either an instance of a spinner to use or the parameters to configure one
588+
* @param [spinnerOpts.type] {string} type of spinner to create
589+
*/
590+
_setupSpinner(spinnerOpts) {
591+
if (spinnerOpts) {
592+
const { type } = spinnerOpts;
593+
switch (type) {
594+
case 'Global':
595+
this._spinner = new GlobalSpinner(spinnerOpts);
596+
break;
597+
default:
598+
// if the above didn't work, assume they created their own spinner.
599+
// just use it.
600+
this._spinner = spinnerOpts;
601+
break;
602+
}
603+
}
604+
else {
605+
this._spinner = new GlobalSpinner();
606+
}
607+
}
608+
585609
// HAVEN'T TESTED YET
586610
_setupExitHandler() {
587611
// we need to catch that this process is about to exit so we can unregister all our

src/utils/spinners/ClientQueue.js

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
const EventEmitter = require('events')
2+
3+
/**
4+
* @class ClientQueue
5+
* Queue of messages to handle for an individual client (subscriber or publisher)
6+
*/
7+
class ClientQueue extends EventEmitter {
8+
constructor(client, queueSize, throttleMs) {
9+
super();
10+
11+
if (queueSize < 1) {
12+
queueSize = Number.POSITIVE_INFINITY;
13+
}
14+
15+
this._client = client;
16+
17+
this._queue = [];
18+
this._queueSize = queueSize;
19+
20+
this.throttleMs = throttleMs;
21+
this._handleTime = null;
22+
}
23+
24+
destroy() {
25+
this._queue = [];
26+
this._client = null;
27+
this._handleTime = null;
28+
}
29+
30+
push(item) {
31+
this._queue.push(item);
32+
if (this.length > this._queueSize) {
33+
this._queue.shift();
34+
}
35+
}
36+
37+
get length() {
38+
return this._queue.length;
39+
}
40+
41+
handleClientMessages(time) {
42+
if (this._handleTime === null || time - this._handleTime >= this.throttleMs) {
43+
this._handleTime = time;
44+
this._client._handleMsgQueue(this._queue);
45+
this._queue = [];
46+
return true;
47+
}
48+
// else
49+
return false;
50+
}
51+
}
52+
53+
module.exports = ClientQueue;
Lines changed: 29 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -2,53 +2,14 @@
22

33
const DEFAULT_SPIN_RATE_HZ = 200;
44
const events = require('events');
5-
const LoggingManager = require('../lib/Logging.js');
5+
const LoggingManager = require('../../lib/Logging.js');
66
const log = LoggingManager.getLogger('ros.spinner');
77

8+
const ClientQueue = require('./ClientQueue.js');
9+
810
const PING_OP = 'ping';
911
const DELETE_OP = 'delete';
10-
11-
/**
12-
* @class ClientQueue
13-
* Queue of messages to handle for an individual client (subscriber or publisher)
14-
*/
15-
class ClientQueue {
16-
constructor(client, queueSize, throttleMs) {
17-
if (queueSize < 1) {
18-
throw new Error(`Unable to create client message queue with size ${queueSize} - minimum is 1`);
19-
}
20-
21-
this._client = client;
22-
23-
this._queue = [];
24-
this._queueSize = queueSize;
25-
26-
this.throttleMs = throttleMs;
27-
this._handleTime = null;
28-
}
29-
30-
push(item) {
31-
this._queue.push(item);
32-
if (this.length > this._queueSize) {
33-
this._queue.shift();
34-
}
35-
}
36-
37-
get length() {
38-
return this._queue.length;
39-
}
40-
41-
handleClientMessages(time) {
42-
if (this._handleTime === null || time - this._handleTime >= this.throttleMs) {
43-
this._handleTime = time;
44-
this._client._handleMsgQueue(this._queue);
45-
this._queue = [];
46-
return true;
47-
}
48-
// else
49-
return false;
50-
}
51-
}
12+
const ADD_OP = 'add';
5213

5314
/**
5415
* @class GlobalSpinner
@@ -66,15 +27,16 @@ class ClientQueue {
6627
* ping and disconnect operations are replayed in order.
6728
*/
6829
class GlobalSpinner extends events {
69-
constructor(spinRate=DEFAULT_SPIN_RATE_HZ, emit=false) {
30+
constructor({spinRate=null, emit=false} = {}) {
7031
super();
7132

72-
if (typeof spinRate !== 'number') {
73-
spinRate = DEFAULT_SPIN_RATE_HZ;
33+
if (typeof spinRate === 'number') {
34+
this._spinTime = 1 / spinRate;
35+
}
36+
else {
37+
this._spinTime = 0;
7438
}
7539

76-
this._spinTime = 1 / spinRate;
77-
this._expectedSpinExpire = null;
7840
this._spinTimer = null;
7941

8042
this._clientCallQueue = [];
@@ -92,8 +54,21 @@ class GlobalSpinner extends events {
9254
this._emit = emit;
9355
}
9456

57+
clear() {
58+
clearTimeout(this._spinTimer);
59+
this._queueLocked = false;
60+
this._clientQueueMap.forEach((clientQueue) => {
61+
clientQueue.destroy();
62+
});
63+
this._clientQueueMap.clear();
64+
this._clientCallQueue = [];
65+
}
66+
9567
addClient(client, clientId, queueSize, throttleMs) {
96-
if (queueSize > 0) {
68+
if (this._queueLocked) {
69+
this._lockedOpCache.push({op: ADD_OP, client, clientId, queueSize, throttleMs});
70+
}
71+
else if (queueSize > 0) {
9772
this._clientQueueMap.set(clientId, new ClientQueue(client, queueSize, throttleMs));
9873
}
9974
}
@@ -147,28 +122,20 @@ class GlobalSpinner extends events {
147122
_handleLockedOpCache() {
148123
const len = this._lockedOpCache.length;
149124
for (let i = 0; i < len; ++i) {
150-
const {op, clientId, msg} = this._lockedOpCache[i];
125+
const {op, clientId, msg, client, queueSize, throttleMs} = this._lockedOpCache[i];
151126
if (op === PING_OP) {
152127
this.ping(clientId, msg);
153128
}
154129
else if (op === DELETE_OP) {
155130
this.disconnect(clientId);
156131
}
132+
else if (op === ADD_OP) {
133+
this.addClient(client, clientId, queueSize, throttleMs);
134+
}
157135
}
158136
this._lockedOpCache = [];
159137
}
160138

161-
_getClientsWithQueuedMessages() {
162-
const clients = {};
163-
this._clientQueueMap.forEach((value, clientId) => {
164-
const queueSize = value.length;
165-
clients[clientId] = queueSize;
166-
if (queueSize > 0 && this._clientCallQueue.indexOf(clientId) === -1) {
167-
throw new Error(`Client ${clientId} has ${value.length} queued messages but is not in call list!`);
168-
}
169-
});
170-
}
171-
172139
_setTimer() {
173140
if (this._spinTimer === null) {
174141
if (this._emit) {
@@ -180,7 +147,6 @@ class GlobalSpinner extends events {
180147
else {
181148
this._spinTimer = setTimeout(this._handleQueue.bind(this), this._spinTime);
182149
}
183-
this._expectedSpinExpire = Date.now() + this._spinTime;
184150
}
185151
}
186152

@@ -223,4 +189,4 @@ class GlobalSpinner extends events {
223189
}
224190
}
225191

226-
module.exports = GlobalSpinner;
192+
module.exports = GlobalSpinner;

test/SpinnerTest.js

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
const chai = require('chai');
44
const expect = chai.expect;
5-
const Spinner = require('../src/utils/GlobalSpinner.js');
5+
const GlobalSpinner = require('../src/utils/spinners/GlobalSpinner.js');
66

77
let handleList = [];
88
let spinner;
@@ -19,10 +19,10 @@ class DummyClient {
1919
}
2020
}
2121

22-
describe('Spinner', () => {
22+
describe('GlobalSpinner', () => {
2323

2424
beforeEach(() => {
25-
spinner = new Spinner(null, true);
25+
spinner = new GlobalSpinner({emit: true});
2626
handleList = [];
2727
});
2828

@@ -122,8 +122,47 @@ describe('Spinner', () => {
122122
expect(spinner._lockedOpCache.length).to.equal(0);
123123
expect(spinner._clientCallQueue.length).to.equal(0);
124124
expect(spinner._clientQueueMap.has(client.id)).to.be.false;
125+
handleList = [];
125126

126-
done();
127+
spinner.addClient(client, client.id, 3, 0);
128+
spinner.ping(client.id, 'junk');
129+
130+
expect(spinner._lockedOpCache.length).to.equal(0);
131+
132+
// lock the queue so the next disconnect is cached
133+
spinner._queueLocked = true;
134+
135+
spinner.disconnect(client.id);
136+
spinner.addClient(client, client.id, 3, 0);
137+
spinner.ping(client.id, 'junk');
138+
spinner.ping(client.id, 'junk');
139+
140+
expect(spinner._lockedOpCache.length).to.equal(4);
141+
142+
spinner.once('tick', () => {
143+
// things that got in before we locked the spinner
144+
expect(handleList.length).to.equal(1);
145+
expect(handleList[0].queue.length).to.equal(1);
146+
expect(spinner._lockedOpCache.length).to.equal(0);
147+
expect(spinner._clientCallQueue.length).to.equal(1);
148+
expect(spinner._clientQueueMap.has(client.id)).to.be.true;
149+
handleList = [];
150+
151+
// things that got in after we locked the spinner
152+
spinner.once('tick', () => {
153+
expect(handleList.length).to.equal(1);
154+
expect(handleList[0].queue.length).to.equal(2);
155+
expect(spinner._lockedOpCache.length).to.equal(0);
156+
expect(spinner._clientCallQueue.length).to.equal(0);
157+
expect(spinner._clientQueueMap.has(client.id)).to.be.true;
158+
159+
spinner.disconnect(client.id);
160+
161+
expect(spinner._clientQueueMap.has(client.id)).to.be.false;
162+
163+
done();
164+
});
165+
});
127166
});
128167
});
129168
});
@@ -143,7 +182,7 @@ describe('Spinner', () => {
143182
spinner.on('tick', () => {
144183
if (spinner._clientCallQueue.length === 0) {
145184
const lastTick = Date.now();
146-
const tDiff = lastTick - firstTick;
185+
const tDiff = lastTick - firstTick + 1;
147186
expect(tDiff).to.be.at.least(throttleMs);
148187

149188
done();

test/xmlrpcTest.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ describe('Protocol Test', () => {
5151
nh._node._subscribers = {};
5252
nh._node._publishers = {};
5353

54+
nh._node._spinner.clear();
55+
5456
// remove any master api handlers we set up
5557
masterStub.removeAllListeners();
5658
});
@@ -307,6 +309,8 @@ describe('Protocol Test', () => {
307309
nh._node._subscribers = {};
308310
nh._node._publishers = {};
309311

312+
nh._node._spinner.clear();
313+
310314
// remove any master api handlers we set up
311315
masterStub.removeAllListeners();
312316
});
@@ -720,6 +724,8 @@ describe('Protocol Test', () => {
720724
nh._node._subscribers = {};
721725
nh._node._publishers = {};
722726

727+
nh._node._spinner.clear();
728+
723729
// remove any master api handlers we set up
724730
masterStub.removeAllListeners();
725731
});

0 commit comments

Comments
 (0)