Skip to content

Commit c7f9fa6

Browse files
authored
Add support for tcpNoDelay option on subscribers. Clean up (#107)
1 parent 99e6afd commit c7f9fa6

4 files changed

Lines changed: 67 additions & 135 deletions

File tree

src/lib/impl/PublisherImpl.js

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,7 @@ class PublisherImpl extends EventEmitter {
4343

4444
this._tcpNoDelay = !!options.tcpNoDelay;
4545

46-
47-
if (options.queueSize) {
46+
if (options.hasOwnProperty('queueSize')) {
4847
this._queueSize = options.queueSize;
4948
}
5049
else {
@@ -165,7 +164,7 @@ class PublisherImpl extends EventEmitter {
165164
}
166165

167166
/**
168-
* Check if this subscriber has been shutdown
167+
* Check if this publisher has been shutdown
169168
* @returns {boolean}
170169
*/
171170
isShutdown() {
@@ -242,18 +241,18 @@ class PublisherImpl extends EventEmitter {
242241
/**
243242
* Handles a new connection from a subscriber to this publisher's node.
244243
* Validates the connection header and sends a response header
245-
* @param subscriber {Socket} client that sent the header
244+
* @param socket {Socket} client that sent the header
246245
* @param header {Object} deserialized connection header.
247246
*/
248-
handleSubscriberConnection(subscriber, header) {
247+
handleSubscriberConnection(socket, header) {
249248
let error = TcprosUtils.validateSubHeader(
250249
header, this.getTopic(), this.getType(),
251250
this._messageHandler.md5sum());
252251

253252
if (error !== null) {
254253
this._log.error('Unable to validate subscriber connection header '
255254
+ JSON.stringify(header));
256-
subscriber.end(Serialize(error));
255+
socket.end(Serialize(error));
257256
return;
258257
}
259258
// else
@@ -267,42 +266,42 @@ class PublisherImpl extends EventEmitter {
267266
this.getType(),
268267
this.getLatching(),
269268
this._messageHandler.messageDefinition());
270-
subscriber.write(respHeader);
269+
socket.write(respHeader);
271270

272271
// if this publisher had the tcpNoDelay option set
273272
// disable the nagle algorithm
274-
if (this._tcpNoDelay || header.tcp_nodelay === 1) {
275-
subscriber.setNoDelay(true);
273+
if (this._tcpNoDelay || header.tcp_nodelay === '1') {
274+
socket.setNoDelay(true);
276275
}
277276

278-
subscriber.on('close', () => {
277+
socket.on('close', () => {
279278
this._log.info('Publisher client socket %s on topic %s disconnected',
280-
subscriber.name, this.getTopic());
281-
subscriber.removeAllListeners();
282-
delete this._subClients[subscriber.name];
279+
socket.name, this.getTopic());
280+
socket.removeAllListeners();
281+
delete this._subClients[socket.name];
283282
this.emit('disconnect');
284283
});
285284

286-
subscriber.on('end', () => {
285+
socket.on('end', () => {
287286
this._log.info('Publisher client socket %s on topic %s ended the connection',
288-
subscriber.name, this.getTopic());
287+
socket.name, this.getTopic());
289288
});
290289

291-
subscriber.on('error', (err) => {
290+
socket.on('error', (err) => {
292291
this._log.warn('Publisher client socket %s on topic %s had error: %s',
293-
subscriber.name, this.getTopic(), err);
292+
socket.name, this.getTopic(), err);
294293
});
295294

296295
// if we've cached a message from latching, send it now
297296
if (this._lastSentMsg !== null) {
298297
this._log.debug('Sending latched msg to new subscriber');
299-
subscriber.write(this._lastSentMsg);
298+
socket.write(this._lastSentMsg);
300299
}
301300

302301
// handshake was good - we'll start publishing to it
303-
this._subClients[subscriber.name] = subscriber;
302+
this._subClients[socket.name] = socket;
304303

305-
this.emit('connection', header, subscriber.name);
304+
this.emit('connection', header, socket.name);
306305
}
307306

308307
/**

src/lib/impl/SubscriberImpl.js

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class SubscriberImpl extends EventEmitter {
4848

4949
this._type = options.type;
5050

51-
if (options.queueSize) {
51+
if (options.hasOwnProperty('queueSize')) {
5252
this._queueSize = options.queueSize;
5353
}
5454
else {
@@ -67,6 +67,10 @@ class SubscriberImpl extends EventEmitter {
6767
this._throttleMs = 0;
6868
}
6969

70+
// tcpNoDelay will be set as a field in the connection header sent to the
71+
// relevant publisher - the publisher should then set tcpNoDelay on the socket
72+
this._tcpNoDelay = !!options.tcpNoDelay;
73+
7074
this._msgHandleTime = null;
7175

7276
this._nodeHandle = nodeHandle;
@@ -294,51 +298,51 @@ class SubscriberImpl extends EventEmitter {
294298
let port = info[2];
295299
let address = info[1];
296300

297-
let client = new Socket();
298-
client.name = address + ':' + port;
299-
client.nodeUri = nodeUri;
301+
let socket = new Socket();
302+
socket.name = address + ':' + port;
303+
socket.nodeUri = nodeUri;
300304

301-
client.on('end', () => {
305+
socket.on('end', () => {
302306
this._log.info('Subscriber client socket %s on topic %s ended the connection',
303-
client.name, this.getTopic());
307+
socket.name, this.getTopic());
304308
});
305309

306-
client.on('error', (err) => {
310+
socket.on('error', (err) => {
307311
this._log.warn('Subscriber client socket %s on topic %s had error: %s',
308-
client.name, this.getTopic(), err);
312+
socket.name, this.getTopic(), err);
309313
});
310314

311315
// hook into close event to clean things up
312-
client.on('close', () => {
316+
socket.on('close', () => {
313317
this._log.info('Subscriber client socket %s on topic %s disconnected',
314-
client.name, this.getTopic());
315-
this._disconnectClient(client.nodeUri);
318+
socket.name, this.getTopic());
319+
this._disconnectClient(socket.nodeUri);
316320
});
317321

318322
// open the socket at the provided address, port
319-
client.connect(port, address, () => {
323+
socket.connect(port, address, () => {
320324
if (this.isShutdown()) {
321-
client.end();
325+
socket.end();
322326
return;
323327
}
324328

325329
this._log.debug('Subscriber on ' + this.getTopic() + ' connected to publisher at ' + address + ':' + port);
326-
client.write(this._createTcprosHandshake());
330+
socket.write(this._createTcprosHandshake());
327331
});
328332

329333
// create a DeserializeStream to chunk out messages
330334
let deserializer = new DeserializeStream();
331-
client.$deserializer = deserializer;
332-
client.pipe(deserializer);
335+
socket.$deserializer = deserializer;
336+
socket.pipe(deserializer);
333337

334338
// cache client in "pending" map.
335339
// It's not validated yet so we don't want it to show up as a client.
336340
// Need to keep track of it in case we're shutdown before it can be validated.
337-
this._pendingPubClients[client.nodeUri] = client;
341+
this._pendingPubClients[socket.nodeUri] = socket;
338342

339343
// create a one-time handler for the connection header
340344
// if the connection is validated, we'll listen for more events
341-
deserializer.once('message', this._handleConnectionHeader.bind(this, client));
345+
deserializer.once('message', this._handleConnectionHeader.bind(this, socket));
342346
}
343347

344348
/**
@@ -347,18 +351,18 @@ class SubscriberImpl extends EventEmitter {
347351
*/
348352
_createTcprosHandshake() {
349353
return TcprosUtils.createSubHeader(this._nodeHandle.getNodeName(), this._messageHandler.md5sum(),
350-
this.getTopic(), this.getType(), this._messageHandler.messageDefinition());
354+
this.getTopic(), this.getType(), this._messageHandler.messageDefinition(), this._tcpNoDelay);
351355
}
352356

353357
/**
354358
* Handles the connection header from a publisher. If connection is validated,
355359
* we'll start handling messages from the client.
356-
* @param client {Socket} publisher client who sent the connection header
360+
* @param socket {Socket} publisher client who sent the connection header
357361
* @param msg {string} message received from the publisher
358362
*/
359-
_handleConnectionHeader(client, msg) {
363+
_handleConnectionHeader(socket, msg) {
360364
if (this.isShutdown()) {
361-
this._disconnectClient(client.nodeUri);
365+
this._disconnectClient(socket.nodeUri);
362366
return;
363367
}
364368

@@ -373,22 +377,21 @@ class SubscriberImpl extends EventEmitter {
373377
const error = TcprosUtils.validatePubHeader(header, this.getType(), this._messageHandler.md5sum());
374378
if (error) {
375379
this._log.error(`Unable to validate subscriber ${this.getTopic()} connection header ${JSON.stringify(header)}`);
376-
TcprosUtils.parsePubHeader(msg);
377-
client.end(Serialize(error));
380+
socket.end(Serialize(error));
378381
return;
379382
}
380383
// connection header was valid - we're good to go!
381384
this._log.debug('Subscriber ' + this.getTopic() + ' got connection header ' + JSON.stringify(header));
382385

383386
// cache client now that we've verified the connection header
384-
this._pubClients[client.nodeUri] = client;
387+
this._pubClients[socket.nodeUri] = socket;
385388
// remove client from pending map now that it's validated
386-
delete this._pendingPubClients[client.nodeUri];
389+
delete this._pendingPubClients[socket.nodeUri];
387390

388391
// pipe all future messages to _handleMessage
389-
client.$deserializer.on('message', this._handleMessage.bind(this));
392+
socket.$deserializer.on('message', this._handleMessage.bind(this));
390393

391-
this.emit('connection', header, client.name);
394+
this.emit('connection', header, socket.name);
392395
}
393396

394397
/**

src/utils/tcpros_utils.js

Lines changed: 16 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,11 @@ const md5Prefix = 'md5sum=';
2828
const topicPrefix = 'topic=';
2929
const servicePrefix = 'service=';
3030
const typePrefix = 'type=';
31-
const latchingPrefix = 'latching=';
32-
const persistentPrefix = 'persistent=';
3331
const errorPrefix = 'error=';
3432
const messageDefinitionPrefix = 'message_definition=';
33+
const latchingField = 'latching=1';
34+
const persistentField = 'persistent=1';
35+
const tcpNoDelayField = 'tcp_nodelay=1';
3536

3637
//-----------------------------------------------------------------------
3738

@@ -66,14 +67,19 @@ function deserializeStringFields(buffer) {
6667
*/
6768
let TcprosUtils = {
6869

69-
createSubHeader(callerId, md5sum, topic, type, messageDefinition) {
70+
createSubHeader(callerId, md5sum, topic, type, messageDefinition, tcpNoDelay) {
7071
const fields = [
7172
callerIdPrefix + callerId,
7273
md5Prefix + md5sum,
7374
topicPrefix + topic,
7475
typePrefix + type,
7576
messageDefinitionPrefix + messageDefinition
7677
];
78+
79+
if (tcpNoDelay) {
80+
fields.push(tcpNoDelayField);
81+
}
82+
7783
return serializeStringFields(fields);
7884
},
7985

@@ -92,9 +98,13 @@ let TcprosUtils = {
9298
callerIdPrefix + callerId,
9399
md5Prefix + md5sum,
94100
typePrefix + type,
95-
latchingPrefix + latching,
96101
messageDefinitionPrefix + messageDefinition
97102
];
103+
104+
if (latching) {
105+
fields.push(latchingField);
106+
}
107+
98108
return serializeStringFields(fields);
99109
},
100110

@@ -106,8 +116,9 @@ let TcprosUtils = {
106116
];
107117

108118
if (persistent) {
109-
fields.push(persistentPrefix + '1');
119+
fields.push(persistentField);
110120
}
121+
111122
return serializeStringFields(fields);
112123
},
113124

@@ -139,87 +150,6 @@ let TcprosUtils = {
139150
return info;
140151
},
141152

142-
parseSubHeader(header) {
143-
let info = {};
144-
const fields = deserializeStringFields(header);
145-
fields.forEach((field) => {
146-
147-
if (field.startsWith(md5Prefix)) {
148-
info.md5sum = field.substr(md5Prefix.length);
149-
}
150-
else if (field.startsWith(topicPrefix)) {
151-
info.topic = field.substr(topicPrefix.length);
152-
}
153-
else if (field.startsWith(callerIdPrefix)) {
154-
info.callerId = field.substr(callerIdPrefix.length);
155-
}
156-
else if (field.startsWith(typePrefix)) {
157-
info.type = field.substr(typePrefix.length);
158-
}
159-
});
160-
return info;
161-
},
162-
163-
parsePubHeader(header) {
164-
let info = {};
165-
const fields = deserializeStringFields(header);
166-
fields.forEach((field) => {
167-
168-
if (field.startsWith(md5Prefix)) {
169-
info.md5sum = field.substr(md5Prefix.length);
170-
}
171-
else if (field.startsWith(latchingPrefix)) {
172-
info.latching = field.substr(latchingPrefix.length);
173-
}
174-
else if (field.startsWith(callerIdPrefix)) {
175-
info.callerId = field.substr(callerIdPrefix.length);
176-
}
177-
else if (field.startsWith(typePrefix)) {
178-
info.type = field.substr(typePrefix.length);
179-
}
180-
});
181-
return info;
182-
},
183-
184-
parseServiceClientHeader(header) {
185-
let info = {};
186-
const fields = deserializeStringFields(header);
187-
fields.forEach((field) => {
188-
189-
if (field.startsWith(md5Prefix)) {
190-
info.md5sum = field.substr(md5Prefix.length);
191-
}
192-
else if (field.startsWith(servicePrefix)) {
193-
info.service = field.substr(servicePrefix.length);
194-
}
195-
else if (field.startsWith(callerIdPrefix)) {
196-
info.callerId = field.substr(callerIdPrefix.length);
197-
}
198-
else if (field.startsWith(typePrefix)) {
199-
info.type = field.substr(typePrefix.length);
200-
}
201-
});
202-
return info;
203-
},
204-
205-
parseServiceServerHeader(header) {
206-
let info = {};
207-
const fields = deserializeStringFields(header);
208-
fields.forEach((field) => {
209-
210-
if (field.startsWith(md5Prefix)) {
211-
info.md5sum = field.substr(md5Prefix.length);
212-
}
213-
else if (field.startsWith(callerIdPrefix)) {
214-
info.callerId = field.substr(callerIdPrefix.length);
215-
}
216-
else if (field.startsWith(typePrefix)) {
217-
info.type = field.substr(typePrefix.length);
218-
}
219-
});
220-
return info;
221-
},
222-
223153
validateSubHeader(header, topic, type, md5sum) {
224154
if (!header.hasOwnProperty('topic')) {
225155
return this.serializeString('Connection header missing expected field [topic]');

test/onTheFly.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ const MASTER_PORT = 11234;
1111
describe('OnTheFly', function () {
1212
let master;
1313

14-
before(() => {
14+
before(function() {
1515
this.timeout(0);
1616

1717
master = new Master('localhost', MASTER_PORT);

0 commit comments

Comments
 (0)