Skip to content

Commit 4e562ce

Browse files
authored
Support asynchronous services (#68)
-Service callbacks can now return promises. Result will be sent when promise resolves. -Adds tests for asynchronous services. -Service clients whose connections are closed while a call is in progress will be rejected.
1 parent 132cd84 commit 4e562ce

3 files changed

Lines changed: 95 additions & 13 deletions

File tree

src/lib/ServiceClient.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,14 @@ class ServiceClient extends EventEmitter {
235235
call.serviceClient.write(serializedRequest);
236236

237237
return new Promise((resolve, reject) => {
238+
const closeHandler = () => {
239+
this._log.debug('Service %s client disconnected during call!', this.getService());
240+
reject(new Error('Connection was closed'));
241+
}
242+
238243
call.serviceClient.$deserializeStream.once('message', (msg, success) => {
244+
call.serviceClient.removeListener('close', closeHandler);
245+
239246
if (success) {
240247
resolve(this._messageHandler.Response.deserialize(msg));
241248
}
@@ -245,6 +252,9 @@ class ServiceClient extends EventEmitter {
245252
reject(error);
246253
}
247254
});
255+
256+
// if the connection closes while waiting for a response, reject the request
257+
call.serviceClient.on('close', closeHandler);
248258
});
249259
}
250260

src/lib/ServiceServer.js

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -154,22 +154,29 @@ class ServiceServer extends EventEmitter {
154154

155155
// call service callback
156156
let resp = new this._messageHandler.Response();
157-
let success = this._requestCallback(req, resp);
157+
let result = this._requestCallback(req, resp);
158+
Promise.resolve(result)
159+
.then((success) => {
160+
// client should already have been closed, so if we got here just cut out early
161+
if (this.isShutdown()) {
162+
return;
163+
}
158164

159-
const serializeResponse = TcprosUtils.serializeServiceResponse(
160-
this._messageHandler.Response,
161-
resp,
162-
success
163-
);
165+
const serializeResponse = TcprosUtils.serializeServiceResponse(
166+
this._messageHandler.Response,
167+
resp,
168+
success
169+
);
164170

165-
// send service response
166-
client.write(serializeResponse);
171+
// send service response
172+
client.write(serializeResponse);
167173

168-
if (!client.$persist) {
169-
this._log.debug('Closing non-persistent client');
170-
client.end();
171-
delete this._clients[client.name];
172-
}
174+
if (!client.$persist) {
175+
this._log.debug('Closing non-persistent client');
176+
client.end();
177+
delete this._clients[client.name];
178+
}
179+
});
173180
}
174181

175182
_register() {

test/xmlrpcTest.js

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -812,6 +812,25 @@ describe('Protocol Test', () => {
812812
})
813813
});
814814

815+
it('Asynchronous Call and Response', (done) => {
816+
const nh = rosnodejs.nh;
817+
const serv = nh.advertiseService(service, srvType, (req, resp) => {
818+
return Promise.resolve(true);
819+
});
820+
821+
const client = nh.serviceClient(service, srvType);
822+
nh.waitForService(service)
823+
.then(() => {
824+
return client.call({});
825+
})
826+
.then(() => {
827+
done();
828+
})
829+
.catch((err) => {
830+
throwNext(err);
831+
})
832+
});
833+
815834
it('Service Failure', (done) => {
816835
const nh = rosnodejs.nh;
817836
const serv = nh.advertiseService(service, srvType, (req, resp) => {
@@ -881,6 +900,52 @@ describe('Protocol Test', () => {
881900
setTimeout(done, 500);
882901
});
883902

903+
it('Service Shutdown Handling Call', function(done) {
904+
this.slow(1600);
905+
906+
const nh = rosnodejs.nh;
907+
const serv = nh.advertiseService(service, srvType, (req, resp) => {
908+
serv.shutdown();
909+
910+
return true;
911+
});
912+
913+
const client = nh.serviceClient(service, srvType);
914+
nh.waitForService(service)
915+
.then(() => {
916+
return client.call({});
917+
})
918+
.catch((err) => {
919+
expect(err.message).to.equal('Connection was closed');
920+
done();
921+
});
922+
});
923+
924+
it('Service Shutdown Handling Asynchronous Call', function(done) {
925+
this.slow(1600);
926+
927+
const nh = rosnodejs.nh;
928+
const serv = nh.advertiseService(service, srvType, (req, resp) => {
929+
930+
return new Promise((resolve) => {
931+
setTimeout(() => {
932+
serv.shutdown();
933+
resolve(true);
934+
}, 0);
935+
});
936+
});
937+
938+
const client = nh.serviceClient(service, srvType);
939+
nh.waitForService(service)
940+
.then(() => {
941+
return client.call({});
942+
})
943+
.catch((err) => {
944+
expect(err.message).to.equal('Connection was closed');
945+
done();
946+
});
947+
});
948+
884949
it('Service Unregistered During Call', (done) => {
885950
// simulate a service disconnecting between the lookupService call to ROS Master
886951
// and the connection to the service node's TCPROS server

0 commit comments

Comments
 (0)