Skip to content

Commit 007b6e9

Browse files
committed
fix remote document change notifications
1 parent 8f292aa commit 007b6e9

11 files changed

Lines changed: 63 additions & 21 deletions

File tree

examples/test-loopback-party.js

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,15 @@ const WRTC = require('wrtc')
44
const BouncerModel = require('@dataparty/bouncer-model/dist/bouncer-model.json')
55
const Dataparty = require('../src')
66

7+
8+
async function getUser(party, name) {
9+
return (await party.find()
10+
.type('user')
11+
.where('name').equals(name)
12+
.exec())[0]
13+
}
14+
15+
716
async function main(){
817
const dbPath = await fs.mkdtemp('/tmp/tingo-party')
918
const configPath = await fs.mkdtemp('/tmp/tingo-party-config')
@@ -18,11 +27,13 @@ async function main(){
1827
config
1928
})
2029

30+
hostLocal.topics = new Dataparty.LocalTopicHost()
31+
2132
let loopback = new Dataparty.Comms.LoopbackChannel()
2233

2334
let comms1 = new Dataparty.Comms.LoopbackComms({
2435
host: true,
25-
channel: loopback.peer1
36+
channel: loopback.port1
2637
})
2738

2839
let peer1 = new Dataparty.PeerParty({
@@ -33,7 +44,9 @@ async function main(){
3344
})
3445

3546

36-
let comms2 = new Dataparty.Comms.LoopbackComms({ channel: loopback.peer2, session: 'foobar' })
47+
48+
49+
let comms2 = new Dataparty.Comms.LoopbackComms({ channel: loopback.port2, session: 'foobar' })
3750

3851
let peer2 = new Dataparty.PeerParty({
3952
comms: comms2,
@@ -71,16 +84,36 @@ async function main(){
7184

7285

7386
if(!user){
74-
debug('creating document')
87+
console.log('peer2 creating document')
7588
user = await peer2.createDocument('user', {name: 'tester', created: (new Date()).toISOString() })
7689
}
7790
else{
78-
debug('loaded document')
91+
console.log('peer2 loaded document')
7992
}
80-
93+
94+
console.log(user.data)
95+
console.log('hash',user.hash)
96+
97+
console.log('peer1 find document by new field value')
98+
let userFind = await getUser(peer1,'tester')
99+
console.log(userFind.data)
100+
console.log('hash',userFind.hash)
101+
102+
user.on('change', (obj)=>{ console.log('peer2 remote event [document.on(change)]', obj ) })
103+
user.on('update', (obj)=>{ console.log('peer2 event [document.on(update)]')})
104+
user.on('value', (doc)=>{ console.log('peer2 event [document.on(value)]') })
105+
user.on('remove', (obj)=>{ console.log('peer2 event [document.on(remove)]') })
106+
107+
await user.watch()
108+
109+
console.log('\npeer1 changing document field')
110+
userFind.data.name = 'renamed-tester'
111+
await userFind.save()
112+
113+
console.log(userFind.data)
114+
console.log('hash',userFind.hash)
81115

82-
console.log(user.data)
83-
process.exit()
116+
await userFind.remove()
84117
}
85118

86119

src/bouncer/idb.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ class IDb extends EventEmitter {
6060

6161
emitChange(msg, change){
6262
const { type, id, revision } = msg.$meta
63+
debug('emitChange', `${type}:${id}`)
6364
this.emit(
6465
`${type}:${id}`,
6566
{

src/comms/isocket-comms.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ class ISocketComms extends EventEmitter {
126126
debug(msg.id)
127127

128128
if(msg.op != 'publish'){
129-
debug('emit id')
129+
debug('emit id', msg.id)
130130
comm.emit(msg.id, msg)
131131
} else {
132132
debug('emit message')
@@ -136,7 +136,11 @@ class ISocketComms extends EventEmitter {
136136
}
137137

138138
send(input){
139-
debug('send - ', input)
139+
debug('send - ', typeof input, input)
140+
141+
if(typeof input != 'object'){
142+
input = JSON.parse(input)
143+
}
140144

141145
const content = new Message({msg: input})
142146

src/comms/loopback-channel-port.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
const debug=require('debug')('dataparty.comms.loopback-channel-port')
2-
2+
const EventEmitter = require("eventemitter3")
33

44
class LoopbackChannelPort {
55
constructor(peer, name){

src/comms/loopback-channel.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ module.exports = class LoopbackChannel {
1717
this.port1 = new LoopbackChannelPort(undefined, '1')
1818

1919
//! The second channel peer
20-
this.peer2 = new LoopbackChannelPort(this.port1, '2')
20+
this.port2 = new LoopbackChannelPort(this.port1, '2')
2121

2222
this.port1.peer = this.port2
2323
}

src/comms/ros-shim.js

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ class RosShim extends ROSLIB.Ros {
2727

2828
handleMessage(message) {
2929

30-
console.log(message)
31-
3230
if (message.op === 'publish') {
3331
debug('publish op')
3432
this.emit(message.topic, message.msg);

src/party/idocument.js

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,10 @@ class IDocument extends EventEmitter {
239239
.id(this.id)
240240
.exec()
241241
.then(docs => {
242+
if(docs.length==0){
243+
return
244+
}
245+
242246
this._data = docs[0].data
243247

244248
debug('pull found', docs)
@@ -268,8 +272,7 @@ class IDocument extends EventEmitter {
268272

269273
if (this.watchSub){ return }
270274

271-
const socket = await this.party.socket()
272-
const ros = socket.ros
275+
const ros = this.party.comms.ros
273276
const watchPath = '/dataparty/document/' + this.type + '/' + this.id
274277

275278
debug('watch document', watchPath)
@@ -355,7 +358,7 @@ class IDocument extends EventEmitter {
355358
case 'update':
356359
case 'create':
357360

358-
if(this.followcache){
361+
if(this.followcache && event.event !== 'remove'){
359362
this._data = newMsg
360363

361364
/**

src/party/loki-cache.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ module.exports = class LokiCache extends EventEmitter {
3232
}
3333

3434
remove(type, id){
35-
console('cache remove')
3635
debug('remove', type, id)
3736
var collection = this.db.getCollection(type)
3837

src/topics/host-topic.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
'use strict'
22

33
const Path = require('path')
4-
const debug = require('debug')('dataparty.topics.peer-node')
4+
const debug = require('debug')('dataparty.topics.host-topic')
55

66
class HostTopic {
77
constructor(path, type){

src/topics/local-topic-host.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,16 +77,20 @@ class LocalTopicHost {
7777

7878
if(topic.path.indexOf('/dataparty/document/') != -1 && !exists){
7979
const [arg0, arg1, docType, docId] = topic.path.substr(1).split('/')
80+
debug('is document watcher', docType+':'+docId)
8081

81-
peer.hostParty.db.on(docType+':'+docId, async (event)=>{
82+
peer.party.hostParty.db.on(docType+':'+docId, async (event)=>{
8283
await this.handleDocChange(topic.path, event)
8384
})
8485
}
8586
}
8687

8788
async handleDocChange(path, event){
89+
debug('handleDocChange', path)
8890
const topic = this.getTopic(path,false)
8991

92+
debug('\ttopic',topic)
93+
9094
if(!topic){return}
9195

9296
const [arg0, arg1, docType, docId] = topic.path.substr(1).split('/')
@@ -96,7 +100,7 @@ class LocalTopicHost {
96100
id: event.msg.id,
97101
type: event.msg.type,
98102
revision: event.msg.revision,
99-
operationType: event.change
103+
operationType: event.event
100104
})
101105
}
102106
}

0 commit comments

Comments
 (0)