Skip to content

Commit 6843507

Browse files
committed
terminate pub/sub operations
1 parent 8cc61aa commit 6843507

4 files changed

Lines changed: 104 additions & 15 deletions

File tree

src/comms/peer-comms.js

Lines changed: 48 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,25 @@ const HOST_SESSION_STATES = {
1818
CLIENT_CLOSED: 'CLIENT_CLOSED'
1919
}
2020

21+
22+
function truncateString(str, num) {
23+
24+
if(!str){return ''}
25+
26+
if(typeof str != 'string'){
27+
str = str.toString()
28+
}
29+
30+
let length = str.length
31+
32+
if (str.length <= num) {
33+
return str
34+
}
35+
return str.slice(0, num) + '...' + (length-num) + 'more bytes'
36+
}
37+
38+
39+
2140
class PeerComms extends SocketComms {
2241
constructor({remoteIdentity, discoverRemoteIdentity, host, party, socket, ...options}){
2342
super({remoteIdentity, discoverRemoteIdentity, party, ...options})
@@ -58,7 +77,7 @@ class PeerComms extends SocketComms {
5877

5978
let response = null
6079
let request = await this.decrypt( {data: message}, this.remoteIdentity )
61-
debug('handleHostCall', request)
80+
debug('handleHostCall', truncateString(request, 1024))
6281

6382
let inputValidated
6483

@@ -85,8 +104,8 @@ class PeerComms extends SocketComms {
85104
throw inputValidated.error
86105
}
87106

88-
debug('original input ->', typeof request, request)
89-
debug('validated input ->', inputValidated)
107+
//debug('original input ->', typeof request, request)
108+
//debug('validated input ->', inputValidated)
90109
const op = new HostOp({ msg: message, input: inputValidated.value })
91110

92111
/*debug('session id : ', op.input.session, this.session)
@@ -109,6 +128,8 @@ class PeerComms extends SocketComms {
109128
...op.result
110129
}
111130

131+
debug(response.id, response.state, response.stats.duration_ms, 'ms')
132+
112133
this.send(response)
113134
})
114135

@@ -133,6 +154,8 @@ class PeerComms extends SocketComms {
133154
)
134155
}
135156

157+
158+
136159
async handleAuthTimeout(){
137160
clearTimeout(this._host_auth_timeout)
138161
this._host_auth_timeout = null
@@ -144,7 +167,7 @@ class PeerComms extends SocketComms {
144167
}
145168

146169
async handleMessage(message){
147-
debug('handleMessage', message.toString())
170+
debug('handleMessage', truncateString(message.toString(), 1024) )
148171

149172
this.onmessage({data: message})
150173
}
@@ -159,7 +182,7 @@ class PeerComms extends SocketComms {
159182

160183
let callOp = new SocketOp( 'peer-call', { endpoint: path, data }, this )
161184

162-
debug('running peer-call endpoint =', path, data)
185+
debug('running peer-call endpoint =', path, truncateString(data, 1024))
163186

164187
const reply = await callOp.run()
165188

@@ -212,11 +235,11 @@ class PeerComms extends SocketComms {
212235

213236
async authorizeOperation(op) {
214237

215-
debug('Here\'s op', op)
238+
//debug('Here\'s op', op)
216239

217-
debug('Here state : ', this.state)
240+
//debug('state : ', this.state)
218241

219-
console.log(op.input)
242+
//console.log(op.input)
220243

221244
if (op.op === 'auth' && this.state === PeerComms.STATES.AUTH_REQUIRED) {
222245

@@ -231,24 +254,40 @@ class PeerComms extends SocketComms {
231254

232255
if(this.party.topics){
233256
await this.party.topics.advertise(this, op.input.topic)
257+
op.setState(HostOp.STATES.Finished_Success)
258+
}
259+
else{
260+
op.setState(HostOp.STATES.Finished_Fail)
234261
}
235262

236263
} else if (op.op === 'subscribe' && this.state === PeerComms.STATES.AUTHED) {
237264

238265
if(this.party.topics){
239266
await this.party.topics.subscribe.bind(this.party.topics)(this, op.input.topic)
267+
op.setState(HostOp.STATES.Finished_Success)
268+
}
269+
else{
270+
op.setState(HostOp.STATES.Finished_Fail)
240271
}
241272

242273
} else if (op.op === 'unsubscribe' && this.state === PeerComms.STATES.AUTHED) {
243274

244275
if(this.party.topics){
245276
await this.party.topics.unsubscribe(this, op.input.topic)
277+
op.setState(HostOp.STATES.Finished_Success)
278+
}
279+
else{
280+
op.setState(HostOp.STATES.Finished_Fail)
246281
}
247282

248283
} else if (op.op === 'publish' && this.state === PeerComms.STATES.AUTHED) {
249284

250285
if(this.party.topics){
251286
await this.party.topics.publish(this, op.input.topic, op.input.msg)
287+
op.setState(HostOp.STATES.Finished_Success)
288+
}
289+
else{
290+
op.setState(HostOp.STATES.Finished_Fail)
252291
}
253292

254293
} else {
@@ -283,7 +322,7 @@ class PeerComms extends SocketComms {
283322
debug('calling runner')
284323

285324
if(op.input.endpoint == 'api-v2-peer-bouncer'){
286-
debug('ask->',op.input.data)
325+
debug('ask->', truncateString(op.input.data, 1024))
287326
op.result = {result: await this.party.handleCall(op.input.data) }
288327

289328
op.setState(HostOp.STATES.Finished_Success)

src/topics/host-topic.js

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ class HostTopic {
1616

1717
this.subscribers = new Map()
1818
this.advertisers = new Map()
19+
20+
this.sending = false
21+
this.sendRequest = false
1922
}
2023

2124
subscribe(node){
@@ -54,22 +57,58 @@ class HostTopic {
5457
throw new Error('published called before advertise')
5558
}
5659

57-
this.lastMessage = data
58-
this.lastMessageTime = Date.now()
60+
if(!this.sending && this.sendRequest){
61+
62+
this.sendRequest=false
63+
64+
}else if(this.sending){
65+
if(!this.sendRequest){
66+
this.sendRequest = true
67+
}
68+
69+
debug('throttle')
70+
this.lastMessage = data
71+
this.lastMessageTime = Date.now()
72+
73+
return
74+
75+
}else if(!this.sendRequest && !this.sending) {
76+
77+
this.lastMessage = data
78+
this.lastMessageTime = Date.now()
79+
}
80+
81+
5982

6083
for(let node of this.subscribers){
6184

62-
if(node[0] == sender.uuid){
85+
if(sender && node[0] == sender.uuid){
6386
debug('publish skip', node[0])
6487
continue
6588
}
6689

67-
sends.push(node[1].send(this, data, sender))
90+
//await node[1].send(this, this.lastMessage, sender)
91+
92+
sends.push(node[1].send(this, this.lastMessage, sender))
6893
}
6994

7095
if(sends.length > 0){
7196
debug('publishing', this.path)
72-
await Promise.all(sends)
97+
this.sending = true
98+
99+
try{
100+
await Promise.all(sends)
101+
this.sending = false
102+
103+
if(this.sendRequest){
104+
debug('send requested')
105+
//setTimeout(this.publish.bind(this),1)
106+
}
107+
}
108+
catch(err){
109+
this.sending = false
110+
throw err
111+
}
73112
}
74113
}
75114
}

src/topics/local-topic-host.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ class LocalTopicHost {
106106
async destroyNode(peer){
107107
const node = this.getNodeByPeer(peer)
108108

109-
debug('destroying node', node.uuid, node)
109+
debug('destroying node', node.uuid)
110110

111111
node.destroy()
112112

src/topics/peer-node.js

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ class PeerNode {
1414
}
1515

1616
async send(topic, data, sender=null){
17+
18+
if(!data){return}
19+
1720
if(this.subscriptions.has(topic.path)){
1821

1922
debug(' publish to node', this.uuid)
@@ -26,6 +29,14 @@ class PeerNode {
2629
msg: data
2730
})
2831

32+
/*await this.peer.socket.send(JSON.stringify({
33+
op: 'publish',
34+
id: 'publish:'+this.peer.opId,
35+
topic: topic.path,
36+
sender: { uuid: this.uuid, identity: this.peer.remoteIdentity },
37+
msg: data
38+
}))*/
39+
2940
}
3041
}
3142

0 commit comments

Comments
 (0)