Skip to content

Commit 7b52997

Browse files
committed
Fix stale listener bug
-Fixes an issue with stale listeners on pub, sub impls. -Adds tests to check for stale listeners.
1 parent 13e337f commit 7b52997

4 files changed

Lines changed: 30 additions & 10 deletions

File tree

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
"bunyan": "1.8.1",
3535
"md5": "2.1.0",
3636
"moment": "2.12.0",
37+
"ultron": "1.1.0",
3738
"walker": "1.0.7",
3839
"xmlrpc": "chfritz/node-xmlrpc"
3940
}

src/lib/Publisher.js

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
"use strict";
1919

2020
const EventEmitter = require('events');
21+
const Ultron = require('ultron');
2122
const {rebroadcast} = require('../utils/event_utils.js');
2223

2324
/**
@@ -31,14 +32,15 @@ class Publisher extends EventEmitter {
3132

3233
++impl.count;
3334
this._impl = impl;
35+
this._ultron = new Ultron(impl);
3436

3537
this._topic = impl.getTopic();
3638
this._type = impl.getType();
3739

38-
rebroadcast('registered', this._impl, this);
39-
rebroadcast('connection', this._impl, this);
40-
rebroadcast('disconnect', this._impl, this);
41-
rebroadcast('error', this._impl, this);
40+
rebroadcast('registered', this._ultron, this);
41+
rebroadcast('connection', this._ultron, this);
42+
rebroadcast('disconnect', this._ultron, this);
43+
rebroadcast('error', this._ultron, this);
4244
}
4345

4446
/**
@@ -92,6 +94,8 @@ class Publisher extends EventEmitter {
9294
if (this._impl) {
9395
const impl = this._impl
9496
this._impl = null;
97+
this._ultron.destroy();
98+
this._ultron = null;
9599

96100
--impl.count;
97101
if (impl.count <= 0) {

src/lib/Subscriber.js

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
"use strict";
1919

2020
const EventEmitter = require('events');
21+
const Ultron = require('ultron');
2122
const {rebroadcast} = require('../utils/event_utils.js');
2223

2324
//-----------------------------------------------------------------------
@@ -33,15 +34,16 @@ class Subscriber extends EventEmitter {
3334

3435
++impl.count;
3536
this._impl = impl;
37+
this._ultron = new Ultron(impl);
3638

3739
this._topic = impl.getTopic();
3840
this._type = impl.getType();
3941

40-
rebroadcast('registered', this._impl, this);
41-
rebroadcast('connection', this._impl, this);
42-
rebroadcast('disconnect', this._impl, this);
43-
rebroadcast('error', this._impl, this);
44-
rebroadcast('message', this._impl, this);
42+
rebroadcast('registered', this._ultron, this);
43+
rebroadcast('connection', this._ultron, this);
44+
rebroadcast('disconnect', this._ultron, this);
45+
rebroadcast('error', this._ultron, this);
46+
rebroadcast('message', this._ultron, this);
4547
}
4648

4749
/**
@@ -82,6 +84,8 @@ class Subscriber extends EventEmitter {
8284
if (this._impl) {
8385
const impl = this._impl
8486
this._impl = null;
87+
this._ultron.destroy();
88+
this._ultron = null;
8589

8690
--impl.count;
8791
if (impl.count <= 0) {

test/xmlrpcTest.js

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -579,6 +579,8 @@ describe('Protocol Test', () => {
579579
const pub2 = nh.advertise(topic, msgType, {latching: true});
580580

581581
expect(pub1).to.not.equal(pub2);
582+
expect(pub1._impl.listenerCount('connection')).to.equal(2);
583+
expect(pub2._impl.listenerCount('connection')).to.equal(2);
582584

583585
pub1.publish({data: 1});
584586

@@ -592,6 +594,9 @@ describe('Protocol Test', () => {
592594

593595
pub1.shutdown()
594596
.then(() => {
597+
expect(pub1._impl).to.equal(null);
598+
expect(pub2._impl.listenerCount('connection')).to.equal(1);
599+
595600
expect(sub.getNumPublishers()).to.equal(1);
596601

597602
pub2.publish({data: 3});
@@ -602,6 +607,7 @@ describe('Protocol Test', () => {
602607
pub2.shutdown()
603608
.then(() => {
604609
expect(sub.getNumPublishers()).to.equal(0);
610+
expect(pub2._impl).to.equal(null);
605611
done();
606612
});
607613
});
@@ -624,7 +630,9 @@ describe('Protocol Test', () => {
624630
msg2 = msg.data;
625631
});
626632

627-
expect(sub1).to.not.equal(msg2);
633+
expect(sub1).to.not.equal(sub2);
634+
expect(sub1._impl.listenerCount('connection')).to.equal(2);
635+
expect(sub2._impl.listenerCount('connection')).to.equal(2);
628636

629637
const pub = nh.advertise(topic, msgType, {latching: true});
630638

@@ -643,6 +651,8 @@ describe('Protocol Test', () => {
643651

644652
sub1.shutdown()
645653
.then(() => {
654+
expect(sub1._impl).to.equal(null);
655+
expect(sub2._impl.listenerCount('connection')).to.equal(1);
646656
pub.publish({data: 30});
647657

648658
sub2.once('message', () => {
@@ -652,6 +662,7 @@ describe('Protocol Test', () => {
652662

653663
sub2.shutdown()
654664
.then(() => {
665+
expect(sub2._impl).to.equal(null);
655666
expect(pub.getNumSubscribers()).to.equal(0);
656667
done();
657668
});

0 commit comments

Comments
 (0)