Skip to content

Commit e318a1c

Browse files
committed
Add basic support for multiple pub, subs
-Previously, multiple calls to advertise/subscribe would return the same publisher/subscriber so shutting down 1 would shutdown all of them. -Also starts to add support for max number of attempts during xmlrpc calls.
1 parent 93b1ccc commit e318a1c

9 files changed

Lines changed: 725 additions & 500 deletions

File tree

src/lib/MasterApiClient.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ class MasterApiClient {
3636
return this._xmlrpcClient;
3737
}
3838

39-
_call(method, data, resolve, reject) {
40-
this._xmlrpcClient.call(method, data, resolve, reject);
39+
_call(method, data, resolve, reject, options) {
40+
this._xmlrpcClient.call(method, data, resolve, reject, options);
4141
}
4242

4343
registerService(callerId, service, serviceUri, uri) {

src/lib/Publisher.js

Lines changed: 43 additions & 211 deletions
Original file line numberDiff line numberDiff line change
@@ -17,106 +17,72 @@
1717

1818
"use strict";
1919

20-
const SerializationUtils = require('../utils/serialization_utils.js');
21-
const Serialize = SerializationUtils.Serialize;
22-
const TcprosUtils = require('../utils/tcpros_utils.js');
2320
const EventEmitter = require('events');
24-
const Logging = require('./Logging.js');
25-
const {REGISTERING, REGISTERED, SHUTDOWN} = require('../utils/ClientStates.js');
21+
const {rebroadcast} = require('../utils/event_utils.js');
2622

2723
class Publisher extends EventEmitter {
28-
constructor(options, nodeHandle) {
24+
constructor(impl) {
2925
super();
30-
this._topic = options.topic;
3126

32-
this._type = options.type;
27+
++impl.count;
28+
this._impl = impl;
3329

34-
this._latching = !!options.latching;
35-
36-
this._tcpNoDelay = !!options.tcpNoDelay;
37-
38-
39-
if (options.queueSize) {
40-
this._queueSize = options.queueSize;
41-
}
42-
else {
43-
this._queueSize = 1;
44-
}
45-
46-
/**
47-
* throttleMs interacts with queueSize to determine when to send
48-
* messages.
49-
* < 0 : send immediately - no interaction with queue
50-
* >= 0 : place event at end of event queue to publish message
51-
after minimum delay (MS)
52-
*/
53-
if (options.hasOwnProperty('throttleMs')) {
54-
this._throttleMs = options.throttleMs;
55-
}
56-
else {
57-
this._throttleMs = 0;
58-
}
59-
60-
// OPTIONS STILL NOT HANDLED:
61-
// headers: extra headers to include
62-
// subscriber_listener: callback for new subscribers connect/disconnect
63-
64-
this._resolve = !!options.resolve;
65-
66-
this._lastSentMsg = null;
67-
68-
this._nodeHandle = nodeHandle;
69-
this._nodeHandle.getSpinner().addClient(this, this._getSpinnerId(), this._queueSize, this._throttleMs);
70-
71-
this._log = Logging.getLogger('ros.rosnodejs');
72-
73-
this._subClients = {};
74-
75-
this._messageHandler = options.typeClass;
76-
77-
this._state = REGISTERING;
78-
this._register();
79-
}
80-
81-
_getSpinnerId() {
82-
return `Publisher://${this.getTopic()}`;
30+
rebroadcast('registered', this._impl, this);
31+
rebroadcast('connection', this._impl, this);
32+
rebroadcast('disconnect', this._impl, this);
33+
rebroadcast('error', this._impl, this);
8334
}
8435

8536
getTopic() {
86-
return this._topic;
37+
if (this._impl) {
38+
return this._impl.getTopic();
39+
}
40+
// else
41+
return null;
8742
}
8843

8944
getType() {
90-
return this._type;
45+
if (this._impl) {
46+
return this._impl.getType();
47+
}
48+
// else
49+
return null;
9150
}
9251

9352
getLatching() {
94-
return this._latching;
53+
if (this._impl) {
54+
return this._impl.getLatching();
55+
}
56+
// else
57+
return false;
9558
}
9659

9760
getNumSubscribers() {
98-
return Object.keys(this._subClients).length;
61+
if (this._impl) {
62+
return this._impl.getNumSubscribers();
63+
}
64+
// else
65+
return 0;
9966
}
10067

10168
shutdown() {
102-
this._nodeHandle.unadvertise(this.getTopic());
69+
const topic= this.getTopic();
70+
if (this._impl) {
71+
const impl = this._impl
72+
this._impl = null;
73+
this.removeAllListeners();
74+
75+
--impl.count;
76+
if (impl.count <= 0) {
77+
return impl.shutdown();
78+
}
79+
}
80+
// else
81+
return Promise.resolve();
10382
}
10483

10584
isShutdown() {
106-
return this._state === SHUTDOWN;
107-
}
108-
109-
disconnect() {
110-
this._state = SHUTDOWN;
111-
112-
Object.keys(this._subClients).forEach((clientId) => {
113-
const client = this._subClients[clientId];
114-
client.end();
115-
});
116-
117-
// disconnect from the spinner in case we have any pending callbacks
118-
this._nodeHandle.getSpinner().disconnect(this._getSpinnerId());
119-
this._subClients = {};
85+
return !!this._impl;
12086
}
12187

12288
/**
@@ -126,141 +92,7 @@ class Publisher extends EventEmitter {
12692
* @param [throttleMs] {number} optional override for publisher setting
12793
*/
12894
publish(msg, throttleMs) {
129-
if (this.isShutdown()) {
130-
return;
131-
}
132-
133-
if (typeof throttleMs !== 'number') {
134-
throttleMs = this._throttleMs;
135-
}
136-
137-
if (throttleMs < 0) {
138-
// short circuit JS event queue, publish "synchronously"
139-
this._handleMsgQueue([msg]);
140-
}
141-
else {
142-
this._nodeHandle.getSpinner().ping(this._getSpinnerId(), msg);
143-
}
144-
}
145-
146-
/**
147-
* Pulls all msgs off queue, serializes, and publishes them
148-
*/
149-
_handleMsgQueue(msgQueue) {
150-
151-
// There's a small chance that we were shutdown while the spinner was locked
152-
// which could cause _handleMsgQueue to be called if this publisher was in there.
153-
if (this.isShutdown()) {
154-
return;
155-
}
156-
157-
const numClients = this.getNumSubscribers();
158-
if (numClients === 0) {
159-
this._log.debugThrottle(2000, `Publishing message on ${this.getTopic()} with no subscribers`);
160-
}
161-
162-
try {
163-
msgQueue.forEach((msg) => {
164-
if (this._resolve) {
165-
msg = this._messageHandler.Resolve(msg);
166-
}
167-
168-
const serializedMsg = TcprosUtils.serializeMessage(this._messageHandler, msg);
169-
170-
Object.keys(this._subClients).forEach((client) => {
171-
this._subClients[client].write(serializedMsg);
172-
});
173-
174-
// if this publisher is supposed to latch,
175-
// save the last message. Any subscribers that connect
176-
// before another call to publish() will receive this message
177-
if (this.getLatching()) {
178-
this._lastSentMsg = serializedMsg;
179-
}
180-
});
181-
}
182-
catch (err) {
183-
this._log.error('Error when publishing message on topic %s: %s', this.getTopic(), err.stack);
184-
this.emit('error', err);
185-
}
186-
}
187-
188-
handleSubscriberConnection(subscriber, header) {
189-
let error = TcprosUtils.validateSubHeader(
190-
header, this.getTopic(), this.getType(),
191-
this._messageHandler.md5sum());
192-
if (error !== null) {
193-
this._log.error('Unable to validate subscriber connection header '
194-
+ JSON.stringify(header));
195-
subscriber.end(Serialize(error));
196-
return;
197-
}
198-
// else
199-
this._log.info('Pub %s got connection header %s', this.getTopic(), JSON.stringify(header));
200-
201-
// create and send response
202-
let respHeader =
203-
TcprosUtils.createPubHeader(
204-
this._nodeHandle.getNodeName(),
205-
this._messageHandler.md5sum(),
206-
this.getType(),
207-
this.getLatching(),
208-
this._messageHandler.messageDefinition());
209-
subscriber.write(respHeader);
210-
211-
// if this publisher had the tcpNoDelay option set
212-
// disable the nagle algorithm
213-
if (this._tcpNoDelay) {
214-
subscriber.setNoDelay(true);
215-
}
216-
217-
subscriber.on('close', () => {
218-
this._log.info('Publisher %s client %s disconnected!',
219-
this.getTopic(), subscriber.name);
220-
delete this._subClients[subscriber.name];
221-
this.emit('disconnect');
222-
});
223-
224-
subscriber.on('end', () => {
225-
this._log.info('Sub %s sent END', subscriber.name);
226-
});
227-
228-
subscriber.on('error', () => {
229-
this._log.warn('Sub %s had error', subscriber.name);
230-
});
231-
232-
if (this._lastSentMsg !== null) {
233-
this._log.debug('Sending latched msg to new subscriber');
234-
subscriber.write(this._lastSentMsg);
235-
}
236-
237-
// if handshake good, add to list, we'll start publishing to it
238-
this._subClients[subscriber.name] = subscriber;
239-
240-
this.emit('connection', header, subscriber.name);
241-
}
242-
243-
_register() {
244-
this._nodeHandle.registerPublisher(this._topic, this._type)
245-
.then((resp) => {
246-
// if we were shutdown between the starting the registration and now, bail
247-
if (this.isShutdown()) {
248-
return;
249-
}
250-
251-
this._log.info('Registered %s as a publisher: %j', this._topic, resp);
252-
let code = resp[0];
253-
let msg = resp[1];
254-
let subs = resp[2];
255-
if (code === 1) {
256-
// registration worked
257-
this._state = REGISTERED;
258-
this.emit('registered');
259-
}
260-
})
261-
.catch((err) => {
262-
this._log.error('Error while registering publisher %s: %s', this.getTopic(), err);
263-
})
95+
this._impl.publish(msg, throttleMs);
26496
}
26597
}
26698

src/lib/RosNode.js

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ let SlaveApiClient = require('./SlaveApiClient.js');
2424
let ParamServerApiClient = require('./ParamServerApiClient.js');
2525
let Subscriber = require('./Subscriber.js');
2626
let Publisher = require('./Publisher.js');
27+
const PublisherImpl = require('./impl/PublisherImpl.js');
28+
const SubscriberImpl = require('./impl/SubscriberImpl.js');
2729
let ServiceClient = require('./ServiceClient.js');
2830
let ServiceServer = require('./ServiceServer.js');
2931
const Spinner = require('../utils/GlobalSpinner.js');
@@ -93,22 +95,24 @@ class RosNode extends EventEmitter {
9395

9496
advertise(options) {
9597
let topic = options.topic;
96-
let pub = this._publishers[topic];
97-
if (!pub) {
98-
pub = new Publisher(options, this);
99-
this._publishers[topic] = pub;
98+
let pubImpl = this._publishers[topic];
99+
if (!pubImpl) {
100+
pubImpl = new PublisherImpl(options, this);
101+
this._publishers[topic] = pubImpl;
100102
}
101-
return pub;
103+
104+
return new Publisher(pubImpl);
102105
}
103106

104107
subscribe(options, callback) {
105108
let topic = options.topic;
106-
let sub = this._subscribers[topic];
107-
if (!sub) {
108-
sub = new Subscriber(options, this);
109-
this._subscribers[topic] = sub;
109+
let subImpl = this._subscribers[topic];
110+
if (!subImpl) {
111+
subImpl = new SubscriberImpl(options, this);
112+
this._subscribers[topic] = subImpl;
110113
}
111114

115+
const sub = new Subscriber(subImpl);
112116
if (callback && typeof callback === 'function') {
113117
sub.on('message', callback);
114118
}
@@ -134,8 +138,8 @@ class RosNode extends EventEmitter {
134138
const sub = this._subscribers[topic];
135139
if (sub) {
136140
this._debugLog.info('Unsubscribing from topic %s', topic);
137-
sub.disconnect();
138141
delete this._subscribers[topic];
142+
sub._shutdown();
139143
return this.unregisterSubscriber(topic);
140144
}
141145
}
@@ -144,8 +148,8 @@ class RosNode extends EventEmitter {
144148
const pub = this._publishers[topic];
145149
if (pub) {
146150
this._debugLog.info('Unadvertising topic %s', topic);
147-
pub.disconnect();
148151
delete this._publishers[topic];
152+
pub._shutdown();
149153
return this.unregisterPublisher(topic);
150154
}
151155
}
@@ -160,6 +164,18 @@ class RosNode extends EventEmitter {
160164
}
161165
}
162166

167+
hasSubscriber(topic) {
168+
return this._subscribers.hasOwnProperty(topic);
169+
}
170+
171+
hasPublisher(topic) {
172+
return this._publisheres.hasOwnProperty(topic);
173+
}
174+
175+
hasService(service) {
176+
return this._services.hasOwnProperty(service);
177+
}
178+
163179
getNodeName() {
164180
return this._nodeName;
165181
}

0 commit comments

Comments
 (0)