Skip to content

Commit 5a09fe4

Browse files
Merge pull request #51 from chris-smith/multiplePubSubs
Multiple pub subs
2 parents a93e311 + 010725a commit 5a09fe4

13 files changed

Lines changed: 1077 additions & 497 deletions

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
"bunyan": "1.8.1",
3535
"md5": "2.1.0",
3636
"moment": "2.12.0",
37+
"ultron": "1.1.0",
3738
"walker": "1.0.7",
3839
"xmlrpc": "chfritz/node-xmlrpc"
3940
}

src/lib/Logging.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,10 @@ class LoggingManager {
239239
this._forEachLogger((logger) => logger.clearExpiredThrottledLogs(), true);
240240
}
241241

242+
stopLogCleanup() {
243+
clearInterval(this._cleanLoggersInterval);
244+
}
245+
242246
_handleGetLoggers(req, resp) {
243247
if (this._externalLog.getLoggers !== null) {
244248
this._externalLog.getLoggers(req, resp);

src/lib/MasterApiClient.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ class MasterApiClient {
3636
return this._xmlrpcClient;
3737
}
3838

39-
_call(method, data, resolve, reject) {
40-
this._xmlrpcClient.call(method, data, resolve, reject);
39+
_call(method, data, resolve, reject, options) {
40+
this._xmlrpcClient.call(method, data, resolve, reject, options);
4141
}
4242

4343
registerService(callerId, service, serviceUri, uri) {

src/lib/Publisher.js

Lines changed: 71 additions & 208 deletions
Original file line numberDiff line numberDiff line change
@@ -17,106 +17,103 @@
1717

1818
"use strict";
1919

20-
const SerializationUtils = require('../utils/serialization_utils.js');
21-
const Serialize = SerializationUtils.Serialize;
22-
const TcprosUtils = require('../utils/tcpros_utils.js');
2320
const EventEmitter = require('events');
24-
const Logging = require('./Logging.js');
25-
const {REGISTERING, REGISTERED, SHUTDOWN} = require('../utils/ClientStates.js');
21+
const Ultron = require('ultron');
22+
const {rebroadcast} = require('../utils/event_utils.js');
2623

24+
/**
25+
* @class Publisher
26+
* Public facing publishers class. Allows users to send messages to subscribers
27+
* on a given topic.
28+
*/
2729
class Publisher extends EventEmitter {
28-
constructor(options, nodeHandle) {
30+
constructor(impl) {
2931
super();
30-
this._topic = options.topic;
31-
32-
this._type = options.type;
33-
34-
this._latching = !!options.latching;
35-
36-
this._tcpNoDelay = !!options.tcpNoDelay;
37-
38-
39-
if (options.queueSize) {
40-
this._queueSize = options.queueSize;
41-
}
42-
else {
43-
this._queueSize = 1;
44-
}
45-
46-
/**
47-
* throttleMs interacts with queueSize to determine when to send
48-
* messages.
49-
* < 0 : send immediately - no interaction with queue
50-
* >= 0 : place event at end of event queue to publish message
51-
after minimum delay (MS)
52-
*/
53-
if (options.hasOwnProperty('throttleMs')) {
54-
this._throttleMs = options.throttleMs;
55-
}
56-
else {
57-
this._throttleMs = 0;
58-
}
5932

60-
// OPTIONS STILL NOT HANDLED:
61-
// headers: extra headers to include
62-
// subscriber_listener: callback for new subscribers connect/disconnect
33+
++impl.count;
34+
this._impl = impl;
35+
this._ultron = new Ultron(impl);
6336

64-
this._resolve = !!options.resolve;
37+
this._topic = impl.getTopic();
38+
this._type = impl.getType();
6539

66-
this._lastSentMsg = null;
67-
68-
this._nodeHandle = nodeHandle;
69-
this._nodeHandle.getSpinner().addClient(this, this._getSpinnerId(), this._queueSize, this._throttleMs);
70-
71-
this._log = Logging.getLogger('ros.rosnodejs');
72-
73-
this._subClients = {};
74-
75-
this._messageHandler = options.typeClass;
76-
77-
this._state = REGISTERING;
78-
this._register();
79-
}
80-
81-
_getSpinnerId() {
82-
return `Publisher://${this.getTopic()}`;
40+
rebroadcast('registered', this._ultron, this);
41+
rebroadcast('connection', this._ultron, this);
42+
rebroadcast('disconnect', this._ultron, this);
43+
rebroadcast('error', this._ultron, this);
8344
}
8445

46+
/**
47+
* Get the topic this publisher is publishing on
48+
* @returns {string}
49+
*/
8550
getTopic() {
8651
return this._topic;
8752
}
8853

54+
/**
55+
* Get the type of message this publisher is sending
56+
* (e.g. std_msgs/String)
57+
* @returns {string}
58+
*/
8959
getType() {
9060
return this._type;
9161
}
9262

63+
/**
64+
* Check if this publisher is latching
65+
* @returns {boolean}
66+
*/
9367
getLatching() {
94-
return this._latching;
68+
if (this._impl) {
69+
return this._impl.getLatching();
70+
}
71+
// else
72+
return false;
9573
}
9674

75+
/**
76+
* Get the numbber of subscribers currently connected to this publisher
77+
* @returns {number}
78+
*/
9779
getNumSubscribers() {
98-
return Object.keys(this._subClients).length;
80+
if (this._impl) {
81+
return this._impl.getNumSubscribers();
82+
}
83+
// else
84+
return 0;
9985
}
10086

87+
/**
88+
* Shuts down this publisher. If this is the last publisher on this topic
89+
* for this node, closes the publisher and unregisters the topic from Master
90+
* @returns {Promise}
91+
*/
10192
shutdown() {
102-
this._nodeHandle.unadvertise(this.getTopic());
103-
}
93+
const topic= this.getTopic();
94+
if (this._impl) {
95+
const impl = this._impl
96+
this._impl = null;
97+
this._ultron.destroy();
98+
this._ultron = null;
99+
100+
--impl.count;
101+
if (impl.count <= 0) {
102+
return impl.getNode().unadvertise(impl.getTopic());
103+
}
104104

105-
isShutdown() {
106-
return this._state === SHUTDOWN;
105+
this.removeAllListeners();
106+
}
107+
// else
108+
return Promise.resolve();
107109
}
108110

109-
disconnect() {
110-
this._state = SHUTDOWN;
111-
112-
Object.keys(this._subClients).forEach((clientId) => {
113-
const client = this._subClients[clientId];
114-
client.end();
115-
});
116-
117-
// disconnect from the spinner in case we have any pending callbacks
118-
this._nodeHandle.getSpinner().disconnect(this._getSpinnerId());
119-
this._subClients = {};
111+
/**
112+
* Check if this publisher has been shutdown
113+
* @returns {boolean}
114+
*/
115+
isShutdown() {
116+
return !!this._impl;
120117
}
121118

122119
/**
@@ -126,141 +123,7 @@ class Publisher extends EventEmitter {
126123
* @param [throttleMs] {number} optional override for publisher setting
127124
*/
128125
publish(msg, throttleMs) {
129-
if (this.isShutdown()) {
130-
return;
131-
}
132-
133-
if (typeof throttleMs !== 'number') {
134-
throttleMs = this._throttleMs;
135-
}
136-
137-
if (throttleMs < 0) {
138-
// short circuit JS event queue, publish "synchronously"
139-
this._handleMsgQueue([msg]);
140-
}
141-
else {
142-
this._nodeHandle.getSpinner().ping(this._getSpinnerId(), msg);
143-
}
144-
}
145-
146-
/**
147-
* Pulls all msgs off queue, serializes, and publishes them
148-
*/
149-
_handleMsgQueue(msgQueue) {
150-
151-
// There's a small chance that we were shutdown while the spinner was locked
152-
// which could cause _handleMsgQueue to be called if this publisher was in there.
153-
if (this.isShutdown()) {
154-
return;
155-
}
156-
157-
const numClients = this.getNumSubscribers();
158-
if (numClients === 0) {
159-
this._log.debugThrottle(2000, `Publishing message on ${this.getTopic()} with no subscribers`);
160-
}
161-
162-
try {
163-
msgQueue.forEach((msg) => {
164-
if (this._resolve) {
165-
msg = this._messageHandler.Resolve(msg);
166-
}
167-
168-
const serializedMsg = TcprosUtils.serializeMessage(this._messageHandler, msg);
169-
170-
Object.keys(this._subClients).forEach((client) => {
171-
this._subClients[client].write(serializedMsg);
172-
});
173-
174-
// if this publisher is supposed to latch,
175-
// save the last message. Any subscribers that connect
176-
// before another call to publish() will receive this message
177-
if (this.getLatching()) {
178-
this._lastSentMsg = serializedMsg;
179-
}
180-
});
181-
}
182-
catch (err) {
183-
this._log.error('Error when publishing message on topic %s: %s', this.getTopic(), err.stack);
184-
this.emit('error', err);
185-
}
186-
}
187-
188-
handleSubscriberConnection(subscriber, header) {
189-
let error = TcprosUtils.validateSubHeader(
190-
header, this.getTopic(), this.getType(),
191-
this._messageHandler.md5sum());
192-
if (error !== null) {
193-
this._log.error('Unable to validate subscriber connection header '
194-
+ JSON.stringify(header));
195-
subscriber.end(Serialize(error));
196-
return;
197-
}
198-
// else
199-
this._log.info('Pub %s got connection header %s', this.getTopic(), JSON.stringify(header));
200-
201-
// create and send response
202-
let respHeader =
203-
TcprosUtils.createPubHeader(
204-
this._nodeHandle.getNodeName(),
205-
this._messageHandler.md5sum(),
206-
this.getType(),
207-
this.getLatching(),
208-
this._messageHandler.messageDefinition());
209-
subscriber.write(respHeader);
210-
211-
// if this publisher had the tcpNoDelay option set
212-
// disable the nagle algorithm
213-
if (this._tcpNoDelay) {
214-
subscriber.setNoDelay(true);
215-
}
216-
217-
subscriber.on('close', () => {
218-
this._log.info('Publisher %s client %s disconnected!',
219-
this.getTopic(), subscriber.name);
220-
delete this._subClients[subscriber.name];
221-
this.emit('disconnect');
222-
});
223-
224-
subscriber.on('end', () => {
225-
this._log.info('Sub %s sent END', subscriber.name);
226-
});
227-
228-
subscriber.on('error', () => {
229-
this._log.warn('Sub %s had error', subscriber.name);
230-
});
231-
232-
if (this._lastSentMsg !== null) {
233-
this._log.debug('Sending latched msg to new subscriber');
234-
subscriber.write(this._lastSentMsg);
235-
}
236-
237-
// if handshake good, add to list, we'll start publishing to it
238-
this._subClients[subscriber.name] = subscriber;
239-
240-
this.emit('connection', header, subscriber.name);
241-
}
242-
243-
_register() {
244-
this._nodeHandle.registerPublisher(this._topic, this._type)
245-
.then((resp) => {
246-
// if we were shutdown between the starting the registration and now, bail
247-
if (this.isShutdown()) {
248-
return;
249-
}
250-
251-
this._log.info('Registered %s as a publisher: %j', this._topic, resp);
252-
let code = resp[0];
253-
let msg = resp[1];
254-
let subs = resp[2];
255-
if (code === 1) {
256-
// registration worked
257-
this._state = REGISTERED;
258-
this.emit('registered');
259-
}
260-
})
261-
.catch((err) => {
262-
this._log.error('Error while registering publisher %s: %s', this.getTopic(), err);
263-
})
126+
this._impl.publish(msg, throttleMs);
264127
}
265128
}
266129

0 commit comments

Comments
 (0)