Skip to content

Commit 8cc61aa

Browse files
committed
generally working
1 parent e76e522 commit 8cc61aa

9 files changed

Lines changed: 75 additions & 35 deletions

File tree

public/example.html

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,33 +12,52 @@
1212

1313
async function init(){
1414

15-
//new Dataparty.Config.MemoryConfig()
16-
17-
config = new Dataparty.Config.LocalStorageConfig({
15+
config = new Dataparty.Config.MemoryConfig({
1816
basePath:'demo',
1917
cloud: {
2018
uri: 'http://172.16.3.1:4001'
2119
}
2220
})
2321

22+
/*config = new Dataparty.Config.LocalStorageConfig({
23+
basePath:'demo',
24+
cloud: {
25+
uri: 'http://172.16.3.1:4001'
26+
}
27+
})*/
28+
2429

2530
const remoteIdentity = await Dataparty.Comms.RestComms.HttpGet( config.read('cloud.uri') + '/identity')
2631

2732
console.log('cloud.uri -',config.read('cloud.uri'))
2833
console.log('\tremoteIdentity', remoteIdentity)
2934

3035
let party = new Dataparty.PeerParty({
31-
comms: new Dataparty.Comms.WebsocketComms({
32-
uri:'ws://172.16.3.1:4001/ws',
33-
discoverRemoteIdentity: false,
34-
remoteIdentity: remoteIdentity,
35-
session: Math.random().toString(36).slice(2)
36-
}),
37-
config: config
38-
})
39-
36+
comms: new Dataparty.Comms.WebsocketComms({
37+
uri:'ws://172.16.3.1:4001/ws',
38+
discoverRemoteIdentity: false,
39+
remoteIdentity: remoteIdentity,
40+
session: Math.random().toString(36).slice(2)
41+
}),
42+
config: config
43+
})
4044

4145
window.party = party
46+
await party.start()
47+
48+
await party.comms.authorized()
49+
50+
51+
timeTopic = new party.ROSLIB.Topic({
52+
ros : party.comms.ros,
53+
name : '/time',
54+
messageType: 'number'
55+
})
56+
57+
timeTopic.subscribe((msg)=>{
58+
console.log(name, msg)
59+
})
60+
4261
}
4362

4463
init()

src/comms/host/host-protocol-scheme.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ const UNADVERTISE_OP = Joi.object().keys({
4949
const SUBSCRIBE_OP = Joi.object().keys({
5050
id: ID_SCHEME.required(),
5151
op: Joi.string().valid('subscribe').required(),
52-
type: Joi.string().required(),
52+
type: Joi.string(),//.required(),
5353
topic: Joi.string().required(),
5454
compression: Joi.string(),
5555
queue_length: Joi.number(),

src/comms/peer-comms.js

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,17 @@ class PeerComms extends SocketComms {
5757
try{
5858

5959
let response = null
60-
const request = await this.decrypt( {data: message}, this.remoteIdentity )
60+
let request = await this.decrypt( {data: message}, this.remoteIdentity )
6161
debug('handleHostCall', request)
6262

6363
let inputValidated
6464

6565
if (this.state === PeerComms.STATES.AUTHED) {
66+
67+
if(typeof request != 'object'){
68+
request = JSON.parse(request)
69+
}
70+
6671
debug('handling authed call')
6772
inputValidated = HostProtocolScheme.ANY_OP.validate(request)
6873
} else if (this.state === PeerComms.STATES.AUTH_REQUIRED) {
@@ -167,7 +172,7 @@ class PeerComms extends SocketComms {
167172
await this.socketInit()
168173
}
169174

170-
this.socket.on('close', this.close.bind(this))
175+
this.socket.on('close', this.stop.bind(this))
171176

172177
if(this.host){
173178
debug('host mode comms')
@@ -192,7 +197,7 @@ class PeerComms extends SocketComms {
192197
}
193198

194199
async close(){
195-
debug('close')
200+
debug('close', this.uuid)
196201

197202
if(this.party.topics){
198203
await this.party.topics.destroyNode(this)
@@ -211,6 +216,8 @@ class PeerComms extends SocketComms {
211216

212217
debug('Here state : ', this.state)
213218

219+
console.log(op.input)
220+
214221
if (op.op === 'auth' && this.state === PeerComms.STATES.AUTH_REQUIRED) {
215222

216223
debug('handling auth op')
@@ -223,25 +230,25 @@ class PeerComms extends SocketComms {
223230
} else if (op.op === 'advertise' && this.state === PeerComms.STATES.AUTHED) {
224231

225232
if(this.party.topics){
226-
await this.party.topics.advertise(this, op.topic)
233+
await this.party.topics.advertise(this, op.input.topic)
227234
}
228235

229236
} else if (op.op === 'subscribe' && this.state === PeerComms.STATES.AUTHED) {
230237

231238
if(this.party.topics){
232-
await this.party.topics.subscribe(this, op.topic)
239+
await this.party.topics.subscribe.bind(this.party.topics)(this, op.input.topic)
233240
}
234241

235242
} else if (op.op === 'unsubscribe' && this.state === PeerComms.STATES.AUTHED) {
236243

237244
if(this.party.topics){
238-
await this.party.topics.unsubscribe(this, op.topic)
245+
await this.party.topics.unsubscribe(this, op.input.topic)
239246
}
240247

241248
} else if (op.op === 'publish' && this.state === PeerComms.STATES.AUTHED) {
242249

243250
if(this.party.topics){
244-
await this.party.topics.publish(this, op.topic, op.msg)
251+
await this.party.topics.publish(this, op.input.topic, op.input.msg)
245252
}
246253

247254
} else {

src/comms/websocket-comms.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ class WebsocketShim extends EventEmitter {
1818

1919
this.conn.onopen = () => {
2020
debug('shim open')
21-
this.emit('connect')
21+
setTimeout(()=>{this.emit('connect')}, 1)
2222
}
2323

2424
this.conn.onclose = () => {

src/party/iparty.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
const debug = require('debug')('dataparty.iparty')
22
const dataparty_crypto = require('@dataparty/crypto')
33

4+
const ROSLIB = require('roslib')
5+
46
const Query = require('./query.js')
57
const IDocument = require('./idocument')
68
const DocumentFactory = require('./document-factory')

src/party/peer/peer-party.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class PeerParty extends IParty {
3434
debug('host')
3535
this.hostParty = hostParty
3636
this.hostRunner = hostRunner
37-
this.topics = new LocalTopicHost()
37+
this.topics = hostParty.topics || new LocalTopicHost()
3838
}
3939
}
4040

src/topics/host-topic.js

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ class HostTopic {
99
this.type = type
1010
this.created = Date.now()
1111

12+
debug('new topic', path, '(', type, ')')
13+
1214
this.lastMessage = null
1315
this.lastMessageTime = null
1416

@@ -18,7 +20,7 @@ class HostTopic {
1820

1921
subscribe(node){
2022
if(!this.subscribers.get(node.uuid)){
21-
this.subscribers.add(node.uuid, node)
23+
this.subscribers.set(node.uuid, node)
2224
}
2325
}
2426

@@ -30,7 +32,7 @@ class HostTopic {
3032

3133
advertise(node){
3234
if(!this.advertisers.get(node.uuid)){
33-
this.advertisers.add(node.uuid, node)
35+
this.advertisers.set(node.uuid, node)
3436
}
3537
}
3638

@@ -56,6 +58,12 @@ class HostTopic {
5658
this.lastMessageTime = Date.now()
5759

5860
for(let node of this.subscribers){
61+
62+
if(node[0] == sender.uuid){
63+
debug('publish skip', node[0])
64+
continue
65+
}
66+
5967
sends.push(node[1].send(this, data, sender))
6068
}
6169

src/topics/local-topic-host.js

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,30 +13,35 @@ class LocalTopicHost {
1313
this.topicsByPath = new Map()
1414
}
1515

16-
async getTopic(path, create=true){
16+
getTopic(path, create=true){
1717
const normalized = Path.normalize(path)
1818
let topic = this.topicsByPath.get(normalized)
1919

20+
//debug('get topic', topic, normalized)
21+
2022
if(!topic && create == true){
21-
topic = new HostTopic(normalized)
22-
this.topicsByPath.add(normalized, topic)
23+
//topic =
24+
this.topicsByPath.set(normalized, new HostTopic(normalized))
25+
26+
//debug('set topic', this.topicsByPath.get(normalized), normalized)
27+
return this.topicsByPath.get(normalized)
2328
}
2429

2530
return topic
2631
}
2732

28-
async getNodeByUuid(uuid, peer){
33+
getNodeByUuid(uuid, peer){
2934
let node = this.nodesByUuid.get(uuid)
3035

3136
if(!node && peer){
3237
node = new PeerNode(peer)
33-
this.nodesByUuid.add(uuid, node)
38+
this.nodesByUuid.set(uuid, node)
3439
}
3540

3641
return node
3742
}
3843

39-
async getNodeByPeer(peer){
44+
getNodeByPeer(peer){
4045
return this.getNodeByUuid(peer.uuid, peer)
4146
}
4247

@@ -51,6 +56,7 @@ class LocalTopicHost {
5156
}
5257

5358
async subscribe(peer, path){
59+
debug('sub', path)
5460
const topic = this.getTopic(path)
5561
const node = this.getNodeByPeer(peer)
5662

@@ -100,13 +106,11 @@ class LocalTopicHost {
100106
async destroyNode(peer){
101107
const node = this.getNodeByPeer(peer)
102108

103-
debug('destroying node', node.uuid)
109+
debug('destroying node', node.uuid, node)
104110

105111
node.destroy()
106112

107113
this.nodesByUuid.delete(node.uuid)
108-
109-
node=null
110114
}
111115

112116
async cleanUpTopics(){

src/topics/peer-node.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,13 @@ class PeerNode {
3131

3232
advertise(topic){
3333
if(!this.advertisements.get(topic.path)){
34-
this.advertisements.add(topic.path, topic)
34+
this.advertisements.set(topic.path, topic)
3535
}
3636
}
3737

3838
subscribe(topic){
3939
if(!this.subscriptions.get(topic.path)){
40-
this.subscriptions.add(topic.path, topic)
40+
this.subscriptions.set(topic.path, topic)
4141
}
4242
}
4343

0 commit comments

Comments
 (0)