Skip to content

Commit be4e5bd

Browse files
committed
host comms with validation
1 parent 154cecd commit be4e5bd

3 files changed

Lines changed: 164 additions & 14 deletions

File tree

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
const reach = require('../utils/reach')
1+
const reach = require('../../utils/reach')
22
const debug = require('debug')('dataparty.op.host-op')
33
const moment = require('moment')
44
const EventEmitter = require('eventemitter3')
@@ -34,6 +34,7 @@ class Op extends EventEmitter {
3434
this.end = undefined
3535
this.state = Op.STATES.Pending
3636
this.level = Op.STATUS_LEVELS.Status_Info
37+
this.result = null
3738
}
3839

3940
get op(){
Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,28 +5,29 @@ const ID_SCHEME = Joi.alternatives().try(Joi.string(), Joi.number())
55

66
const OP_HEADER = Joi.object().keys({
77
id: ID_SCHEME.required(),
8-
op: Joi.string().valid([
8+
op: Joi.string().valid(
99
'auth',
10-
'call',
10+
'peer-call',
1111
'advertise',
1212
'subscribe',
1313
'unsubscribe',
1414
'publish'
15-
]).required()
15+
).required()
1616
})
1717

1818

1919
const AUTH_OP = Joi.object().keys({
2020
id: ID_SCHEME.required(),
2121
op: Joi.string().valid('auth').required(),
22-
session: Joi.objectId().required()
22+
session: Joi.string().required(), //Joi.objectId().required(),
2323
})
2424

2525
const CALL_OP = Joi.object().keys({
2626
id: ID_SCHEME.required(),
27-
op: Joi.string().valid('call').required(),
27+
op: Joi.string().valid('peer-call').required(),
28+
session: Joi.string(), //Joi.objectId(),
2829
endpoint: Joi.string(),
29-
body: Joi.string()
30+
data: Joi.object()
3031
})
3132

3233
const ADVERTISE_OP = Joi.object().keys({
@@ -75,7 +76,8 @@ const ANY_OP = Joi.alternatives().try(
7576
UNADVERTISE_OP,
7677
SUBSCRIBE_OP,
7778
UNSUBSCRIBE_OP,
78-
PUBLISH_OP
79+
PUBLISH_OP,
80+
CALL_OP
7981
)
8082

8183
module.exports = {
@@ -86,5 +88,6 @@ module.exports = {
8688
'SUBSCRIBE_OP': SUBSCRIBE_OP,
8789
'UNSUBSCRIBE_OP': UNSUBSCRIBE_OP,
8890
'PUBLISH_OP': PUBLISH_OP,
91+
'PEER_CALL_OP': CALL_OP,
8992
'ANY_OP': ANY_OP
9093
}

src/comms/peer-comms.js

Lines changed: 152 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,29 +4,127 @@ const debug = require('debug')('dataparty.comms.peercomms')
44
const SocketOp = require('./op/socket-op')
55
const SocketComms = require('./socket-comms')
66

7+
const Joi = require('@hapi/joi')
8+
const HostOp = require('./host/host-op')
9+
const HostProtocolScheme = require('./host/host-protocol-scheme')
10+
711
const AUTH_TIMEOUT_MS = 3000
812

13+
const HOST_SESSION_STATES = {
14+
AUTH_REQUIRED: 'AUTH_REQUIRED',
15+
AUTHED: 'AUTHED',
16+
SERVER_CLOSED: 'SERVER_CLOSED',
17+
CLIENT_CLOSED: 'CLIENT_CLOSED'
18+
}
19+
920
class PeerComms extends SocketComms {
1021
constructor({remoteIdentity, discoverRemoteIdentity, host, party, socket, ...options}){
1122
super({remoteIdentity, discoverRemoteIdentity, party, ...options})
1223

1324
this.socket = socket || null
1425

15-
this.host = host //! Is comms host
16-
this.party = party
26+
this.host = host //! Is comms host\
1727
this.oncall = null
1828

1929
this._host_auth_timeout = null
2030

31+
if(this.host){
32+
this.state = PeerComms.STATES.AUTH_REQUIRED
33+
this.session = undefined
34+
this.identity = undefined
35+
this.actor = undefined
36+
}
37+
2138
this.pending_calls = 0
2239
}
2340

41+
setState(state) {
42+
this.state = state
43+
this.emit('state', this.state)
44+
}
45+
46+
static get STATES() {
47+
return HOST_SESSION_STATES
48+
}
49+
2450
async handleClientCall(message){
51+
52+
debug('handleClientCall - pending calls - ', this.pending_calls)
53+
54+
this.pending_calls++
55+
try{
56+
57+
let response = null
58+
const request = await this.decrypt( {data: message}, this.remoteIdentity )
59+
debug('handleHostCall', request)
60+
61+
let inputValidated
62+
63+
if (this.state === PeerComms.STATES.AUTHED) {
64+
debug('handling authed call')
65+
inputValidated = HostProtocolScheme.ANY_OP.validate(request)
66+
} else if (this.state === PeerComms.STATES.AUTH_REQUIRED) {
67+
debug('handling non-authed call')
68+
inputValidated = HostProtocolScheme.AUTH_OP.validate(request)
69+
} else {
70+
throw new Error(
71+
'Recieved input in unexpected session state [',
72+
this.state,
73+
']'
74+
)
75+
}
76+
77+
if(inputValidated.error !== undefined){
78+
throw inputValidated.error
79+
}
80+
81+
debug('original input ->', typeof request, request)
82+
debug('validated input ->', inputValidated)
83+
const op = new HostOp({ msg: message, input: inputValidated.value })
84+
85+
/*debug('session id : ', op.input.session, this.session)
86+
87+
if (this.session && op.input.session === this.session.id) {
88+
debug('session id MATCH')
89+
}*/
90+
91+
op.once('finished', state => {
92+
const response = {
93+
op: 'status',
94+
id: op.id,
95+
level: op.level,
96+
state: op.state,
97+
stats: {
98+
start: op.start,
99+
end: op.end,
100+
duration_ms: op.end - op.start
101+
},
102+
...op.result
103+
}
104+
105+
this.send(response)
106+
})
107+
108+
109+
110+
await this.authorizeOperation(op)
111+
112+
} catch (err) {
113+
debug('EXCEPTION ->', err)
114+
}
115+
this.pending_calls--
116+
117+
118+
return
119+
25120
let response = null
26121
const request = await this.decrypt( {data: message}, this.remoteIdentity )
27122
debug('handleHostCall', request)
28123

29124
if(!this.authed){
125+
126+
127+
30128
if(request.op == 'auth'){
31129
debug('allowing client')
32130
response = {
@@ -66,7 +164,7 @@ class PeerComms extends SocketComms {
66164

67165

68166
async handleClientConnection(){
69-
debug('handleHostConnection')
167+
debug('handleClientConnection')
70168

71169
this._host_auth_timeout = setTimeout(
72170
this.handleAuthTimeout.bind(this),
@@ -90,8 +188,8 @@ class PeerComms extends SocketComms {
90188
this.onmessage({data: message})
91189
}
92190

93-
async call(path, data){
94-
if(this.host){ throw new Error('host-not-allowed-call') }
191+
async call(path, data, force=false){
192+
if(this.host && !this.force){ throw new Error('host-not-allowed-call') }
95193
if(!this.authed){ throw new Error('not authed') }
96194

97195
if (!this.party.hasIdentity()) {
@@ -115,6 +213,7 @@ class PeerComms extends SocketComms {
115213

116214
if(this.host){
117215
debug('host mode comms')
216+
118217
this.socket.on('connect', this.handleClientConnection.bind(this))
119218
this.socket.on('data', this.handleClientCall.bind(this))
120219
}
@@ -136,10 +235,57 @@ class PeerComms extends SocketComms {
136235
close(){
137236
debug('closing connection')
138237
this.socket.destroy()
139-
}
238+
}
239+
240+
241+
async authorizeOperation(op) {
242+
243+
debug('Here\'s op', op)
244+
245+
debug('Here state : ', this.state)
246+
247+
if (op.op === 'auth' && this.state === PeerComms.STATES.AUTH_REQUIRED) {
248+
249+
debug('handling auth op')
250+
return this.handleAuthOp(op)
251+
252+
} else if (op.op === 'peer-call' && this.state === PeerComms.STATES.AUTHED) {
253+
254+
return this.handleCallOp(op)
140255

256+
} else {
257+
op.result='not implemented'
258+
op.setState(HostOp.STATES.Finished_Fail)
259+
}
260+
}
261+
262+
async handleAuthOp(op){
263+
264+
debug('allowing client - ', this.remoteIdentity)
265+
266+
clearTimeout(this._host_auth_timeout)
267+
this._host_auth_timeout = null
141268

269+
this.authed = true
270+
this.setState(PeerComms.STATES.AUTHED)
271+
op.setState(HostOp.STATES.Finished_Success)
142272

273+
this.emit('open')
274+
return
275+
}
276+
277+
async handleCallOp(op){
278+
debug('peer-call')
279+
if(op.input.endpoint == 'api-v2-peer-bouncer'){
280+
281+
debug('ask->',op.input.data)
282+
op.result = await this.party.handleCall(op.input.data)
283+
284+
op.setState(HostOp.STATES.Finished_Success)
285+
286+
return
287+
}
288+
}
143289
}
144290

145291

0 commit comments

Comments
 (0)