Skip to content

Commit 13e337f

Browse files
committed
Shutdown logging cleanup on shutdown. Add tests for multiple pub/subs. Don't advertise the same service twice per node.
1 parent 66040f6 commit 13e337f

6 files changed

Lines changed: 115 additions & 7 deletions

File tree

src/lib/Logging.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,10 @@ class LoggingManager {
239239
this._forEachLogger((logger) => logger.clearExpiredThrottledLogs(), true);
240240
}
241241

242+
stopLogCleanup() {
243+
clearInterval(this._cleanLoggersInterval);
244+
}
245+
242246
_handleGetLoggers(req, resp) {
243247
if (this._externalLog.getLoggers !== null) {
244248
this._externalLog.getLoggers(req, resp);

src/lib/RosNode.js

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,10 +123,13 @@ class RosNode extends EventEmitter {
123123
advertiseService(options, callback) {
124124
let service = options.service;
125125
let serv = this._services[service];
126-
if (!serv) {
127-
serv = new ServiceServer(options, callback, this);
128-
this._services[service] = serv;
126+
if (serv) {
127+
this._log.warn('Tried to advertise a service that is already advertised in this node [%s]', service);
128+
return;
129129
}
130+
// else
131+
serv = new ServiceServer(options, callback, this);
132+
this._services[service] = serv;
130133
return serv;
131134
}
132135

@@ -635,6 +638,8 @@ class RosNode extends EventEmitter {
635638
promises.push(shutdownServers());
636639
promises.push(clearXmlrpcQueues());
637640

641+
Logging.stopLogCleanup();
642+
638643
process.removeListener('exit', exitHandler);
639644
process.removeListener('SIGINT', sigIntHandler);
640645

src/lib/ServiceServer.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,10 @@ class ServiceServer extends EventEmitter {
108108
}
109109

110110
handleClientConnection(client, header) {
111+
if (this.isShutdown()) {
112+
return;
113+
}
114+
// else
111115
// TODO: verify header data
112116
this._log.debug('Service %s handling new client connection ', this.getService());
113117

src/lib/impl/PublisherImpl.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ class PublisherImpl extends EventEmitter {
268268

269269
// if this publisher had the tcpNoDelay option set
270270
// disable the nagle algorithm
271-
if (this._tcpNoDelay) {
271+
if (this._tcpNoDelay || header.tcp_nodelay === 1) {
272272
subscriber.setNoDelay(true);
273273
}
274274

src/utils/tcpros_utils.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ let TcprosUtils = {
143143
let info = {};
144144
const fields = deserializeStringFields(header);
145145
fields.forEach((field) => {
146-
146+
147147
if (field.startsWith(md5Prefix)) {
148148
info.md5sum = field.substr(md5Prefix.length);
149149
}
@@ -164,7 +164,7 @@ let TcprosUtils = {
164164
let info = {};
165165
const fields = deserializeStringFields(header);
166166
fields.forEach((field) => {
167-
167+
168168
if (field.startsWith(md5Prefix)) {
169169
info.md5sum = field.substr(md5Prefix.length);
170170
}
@@ -206,7 +206,7 @@ let TcprosUtils = {
206206
let info = {};
207207
const fields = deserializeStringFields(header);
208208
fields.forEach((field) => {
209-
209+
210210
if (field.startsWith(md5Prefix)) {
211211
info.md5sum = field.substr(md5Prefix.length);
212212
}

test/xmlrpcTest.js

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,101 @@ describe('Protocol Test', () => {
565565
// if we haven't received a message by now we should be good
566566
setTimeout(done, 500);
567567
});
568+
569+
it('2 Publishers on Same Topic', function(done) {
570+
this.slow(2000);
571+
const nh = rosnodejs.nh;
572+
573+
let msg1;
574+
const sub = nh.subscribe(topic, msgType, (msg) => {
575+
msg1 = msg.data;
576+
});
577+
578+
const pub1 = nh.advertise(topic, msgType, {latching: true});
579+
const pub2 = nh.advertise(topic, msgType, {latching: true});
580+
581+
expect(pub1).to.not.equal(pub2);
582+
583+
pub1.publish({data: 1});
584+
585+
sub.once('message', ({data}) => {
586+
expect(sub.getNumPublishers()).to.equal(1);
587+
expect(data).to.equal(1);
588+
589+
pub2.publish({data: 2});
590+
sub.once('message', ({data}) => {
591+
expect(data).to.equal(2);
592+
593+
pub1.shutdown()
594+
.then(() => {
595+
expect(sub.getNumPublishers()).to.equal(1);
596+
597+
pub2.publish({data: 3});
598+
599+
sub.once('message', ({data}) => {
600+
expect(data).to.equal(3);
601+
602+
pub2.shutdown()
603+
.then(() => {
604+
expect(sub.getNumPublishers()).to.equal(0);
605+
done();
606+
});
607+
});
608+
})
609+
});
610+
})
611+
});
612+
613+
it('2 Subscribers on Same Topic', function(done) {
614+
this.slow(2000);
615+
const nh = rosnodejs.nh;
616+
617+
let msg1;
618+
const sub1 = nh.subscribe(topic, msgType, (msg) => {
619+
msg1 = msg.data;
620+
});
621+
622+
let msg2;
623+
const sub2 = nh.subscribe(topic, msgType, (msg) => {
624+
msg2 = msg.data;
625+
});
626+
627+
expect(sub1).to.not.equal(msg2);
628+
629+
const pub = nh.advertise(topic, msgType, {latching: true});
630+
631+
pub.publish({data: 1});
632+
633+
sub2.once('message', () => {
634+
expect(pub.getNumSubscribers()).to.equal(1);
635+
636+
expect(msg1).to.equal(msg2);
637+
pub.publish({data: 25});
638+
639+
sub2.once('message', () => {
640+
expect(msg1).to.equal(msg2);
641+
msg1 = null;
642+
msg2 = null;
643+
644+
sub1.shutdown()
645+
.then(() => {
646+
pub.publish({data: 30});
647+
648+
sub2.once('message', () => {
649+
expect(msg1).to.equal(null);
650+
expect(msg2).to.equal(30);
651+
expect(pub.getNumSubscribers()).to.equal(1);
652+
653+
sub2.shutdown()
654+
.then(() => {
655+
expect(pub.getNumSubscribers()).to.equal(0);
656+
done();
657+
});
658+
});
659+
});
660+
});
661+
});
662+
});
568663
});
569664

570665
describe('Service', () => {

0 commit comments

Comments
 (0)