Skip to content

Commit eb03531

Browse files
committed
support websockets as a peer-comms
1 parent af878b6 commit eb03531

11 files changed

Lines changed: 191 additions & 52 deletions

examples/test-peer-party.js

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ async function main(){
1818
comms: new Dataparty.Comms.RTCSocketComms({
1919
host: true,
2020
wrtc: WRTC,
21-
trickle: true
21+
trickle: true,
22+
discoverRemoteIdentity: true
2223
}),
2324
hostParty: hostLocal,
2425
model: BouncerModel,
@@ -29,7 +30,8 @@ async function main(){
2930
let peer2 = new Dataparty.PeerParty({
3031
comms: new Dataparty.Comms.RTCSocketComms({
3132
wrtc: WRTC,
32-
trickle: true
33+
trickle: true,
34+
session: 'foobar'
3335
}),
3436
model: BouncerModel,
3537
config: new Dataparty.Config.MemoryConfig()
@@ -40,7 +42,7 @@ async function main(){
4042
await peer1.loadIdentity()
4143
await peer2.loadIdentity()
4244

43-
peer1.comms.remoteIdentity = peer2.identity
45+
//peer1.comms.remoteIdentity = peer2.identity
4446
peer2.comms.remoteIdentity = peer1.identity
4547

4648
await peer1.start()

examples/test-service-host.js

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,19 @@ class ExampleService extends Dataparty.IService {
2525

2626
async function main(){
2727

28-
const uri = 'mongodb://localhost:27017/server-party-test'
29-
debug('db location', uri)
28+
//const uri = 'mongodb://localhost:27017/server-party-test'
29+
//debug('db location', uri)
3030

31-
let party = new Dataparty.MongoParty({
32-
uri,
31+
const path = '/data/datparty/srv-party'
32+
33+
let party = new Dataparty.TingoParty({
34+
path,
3335
model: BouncerClientModels,
3436
serverModels: BouncerServerModels,
3537
config: new Dataparty.Config.MemoryConfig()
3638
})
3739

40+
3841
const service = new ExampleService({ name: '@dataparty/example', version: '0.0.1' })
3942

4043
const build = await service.compile(Path.join(__dirname,'/dataparty'), true)

examples/test-service-node-host.js

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,14 @@ class ExampleService extends Dataparty.IService {
2525

2626
async function main(){
2727

28-
const uri = 'mongodb://localhost:27017/server-party-test'
29-
debug('db location', uri)
28+
29+
//const uri = 'mongodb://localhost:27017/server-party-test'
30+
//debug('db location', uri)
31+
32+
const path = '/data/datparty/srv-party'
3033

31-
let party = new Dataparty.MongoParty({
32-
uri,
34+
let party = new Dataparty.TingoParty({
35+
path,
3336
model: BouncerClientModels,
3437
serverModels: BouncerServerModels,
3538
config: new Dataparty.Config.MemoryConfig()
@@ -43,7 +46,7 @@ async function main(){
4346

4447
const runner = new Dataparty.ServiceRunnerNode({
4548
party, service,
46-
sendFullErrors: true
49+
sendFullErrors: false
4750
})
4851

4952
const host = new Dataparty.ServiceHost({
@@ -65,4 +68,4 @@ async function main(){
6568

6669
main().catch(err=>{
6770
console.error(err)
68-
})
71+
})

src/comms/peer-comms.js

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@ const SocketComms = require('./socket-comms')
77
const AUTH_TIMEOUT_MS = 3000
88

99
class PeerComms extends SocketComms {
10-
constructor({remoteIdentity, host, party, socket}){
11-
super({remoteIdentity, party})
10+
constructor({remoteIdentity, discoverRemoteIdentity, host, party, socket, ...options}){
11+
super({remoteIdentity, discoverRemoteIdentity, party, ...options})
1212

1313
this.socket = socket || null
1414

15-
this.host = host
15+
this.host = host //! Is comms host
1616
this.party = party
1717
this.oncall = null
1818

@@ -114,10 +114,12 @@ class PeerComms extends SocketComms {
114114
this.socket.on('close', this.onclose.bind(this))
115115

116116
if(this.host){
117+
debug('host mode comms')
117118
this.socket.on('connect', this.handleClientConnection.bind(this))
118119
this.socket.on('data', this.handleClientCall.bind(this))
119120
}
120121
else{
122+
debug('client mode comms')
121123
this.socket.on('connect', this.onopen.bind(this))
122124
this.socket.on('data', this.handleMessage.bind(this))
123125
}
@@ -132,7 +134,7 @@ class PeerComms extends SocketComms {
132134
}
133135

134136
close(){
135-
debug('Client closing connection')
137+
debug('closing connection')
136138
this.socket.destroy()
137139
}
138140

src/comms/rest-comms.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ const debug = require('debug')('dataparty.comms.rest')
44

55
const dataparty_crypto = require('@dataparty/crypto')
66

7-
const WebsocketComms = require('./websocket-comms')
7+
const WebsocketComms = require('./old-websocket-comms')
88
const AuthError = require('../errors/auth-error')
99

1010

src/comms/rtc-socket-comms.js

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,10 @@ const debug = require('debug')('dataparty.comms.rtcsocketcomms')
33
const SimplePeer = require('simple-peer')
44
const PeerComms = require('./peer-comms')
55

6-
const AUTH_TIMEOUT_MS = 3000
76

87
class RTCSocketComms extends PeerComms {
9-
constructor({remoteIdentity, host, party, wrtc, trickle = false}){
10-
super({remoteIdentity, host, party})
8+
constructor({remoteIdentity, host, party, wrtc, trickle = false, ...options}){
9+
super({remoteIdentity, host, party, ...options})
1110

1211
this.rtcSettings = {
1312
wrtc,

src/comms/socket-comms.js

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@ const RosShim = require('./ros-shim')
1010

1111

1212
class SocketComms extends EventEmitter {
13-
constructor({session, uri, party, remoteIdentity}){
13+
constructor({session, uri, party, remoteIdentity, discoverRemoteIdentity}){
1414
super()
1515
this.uri = uri
1616
this.session = session
1717
this.remoteIdentity = remoteIdentity
18+
this.discoverRemoteIdentity = discoverRemoteIdentity
1819

1920
this.party = party //used for access to primary identity
2021

@@ -84,10 +85,20 @@ class SocketComms extends EventEmitter {
8485

8586
return resolve(msg.decrypt(this.party._identity).then(content=>{
8687
const senderPub = Routines.extractPublicKeys(msg.enc)
88+
debug('sender', sender, '\tdiscover', this.discoverRemoteIdentity)
89+
if(this.discoverRemoteIdentity && !sender){
90+
debug('discovered remote identity', senderPub)
91+
this.remoteIdentity = {
92+
key: {
93+
public: senderPub
94+
}
95+
}
96+
sender = this.remoteIdentity
97+
}
8798
debug(`senderPub - ${senderPub}`)
8899

89100
if(senderPub.box != sender.key.public.box || senderPub.sign != sender.key.public.sign){
90-
return Promise.reject('TRUST - reply is not from service')
101+
return Promise.reject('TRUST - reply is not from expected remote')
91102
}
92103

93104
debug('decrypted data')

src/comms/websocket-comms.js

Lines changed: 73 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,81 @@
1-
'use strict'
1+
const debug = require('debug')('dataparty.comms.websocket')
22

3-
const debug = require('debug')('dataparty.comms.websocketcomms')
4-
const W3CWebSocket = require('websocket').w3cwebsocket;
5-
const WebSocket = W3CWebSocket
3+
const WebSocket = require('ws')
4+
const EventEmitter = require('eventemitter3')
65

7-
const SocketComms = require('./socket-comms')
86

9-
class WebsocketComms extends SocketComms {
10-
constructor({session, uri, identity, remoteIdentity}){
11-
super({session, uri, identity, remoteIdentity})
7+
const PeerComms = require('./peer-comms')
128

13-
this.socket = new WebSocket(this.uri)
14-
this.socket.onclose = this.onclose.bind(this)
15-
this.socket.onopen = this.onopen.bind(this)
16-
this.socket.onmessage = this.onmessage.bind(this)
9+
10+
class WebsocketShim extends EventEmitter {
11+
constructor(conn){
12+
super()
13+
this.conn = conn
14+
15+
this.conn.onmessage = (evt) => {
16+
this.emit('data', evt.data)
17+
}
18+
19+
this.conn.onopen = () => {
20+
debug('shim open')
21+
this.emit('connect')
22+
}
23+
24+
this.conn.onclose = () => {
25+
this.emit('close')
26+
}
27+
28+
this.conn.onerror = (err) => {
29+
this.emit('error', err)
30+
}
31+
32+
if(this.conn.readyState == WebSocket.OPEN){
33+
setTimeout(()=>{this.emit('connect')}, 1)
34+
}
35+
36+
debug('connection shim', this.conn.readyState)
37+
}
38+
39+
close(){
40+
this.conn.close()
41+
}
42+
43+
destroy(){
44+
this.conn.terminate()
45+
}
46+
47+
send(val){ this.conn.send(val) }
48+
49+
}
50+
51+
class WebsocketComms extends PeerComms {
52+
constructor({uri, connection, remoteIdentity, host, party, ...options}){
53+
super({remoteIdentity, host, party, ...options})
54+
55+
this.uri = uri
56+
this.connection = connection
57+
58+
if(this.host && !this.connection){
59+
throw new Error('existing connection expected')
60+
}
61+
62+
if(!this.host && (!this.uri && !this.connection)){
63+
throw new Error('uri or existing connection expected')
64+
}
65+
}
66+
67+
68+
async socketInit(){
69+
debug('init')
70+
71+
if(!this.host && !this.connection){
72+
debug('opening client connection to',this.uri)
73+
this.connection = new WebSocket(this.uri)
1774
}
75+
76+
this.socket = new WebsocketShim(this.connection)
77+
}
1878
}
1979

80+
2081
module.exports = WebsocketComms

src/service/middleware/pre/decrypt.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,15 @@ module.exports = class Decrypt extends IMiddleware {
3131

3232
if (!Config){ return }
3333

34-
context.debug('input', context.input)
34+
if(!context.input || !context.input.enc){
35+
throw new Error('insecure message')
36+
}
37+
38+
context.debug('input', context.input, typeof context.input)
3539

3640

3741
const msg = new Message(context.input)
38-
context.debug('privateIdentity', context.party.privateIdentity)
42+
context.debug('privateIdentity', context.party.privateIdentity.id)
3943

4044
const publicKeys = Routines.extractPublicKeys(msg.enc)
4145

src/service/service-host-websocket.js

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ const WebSocketServer = ws.WebSocketServer
66

77
const WATCHDOG_INTERVAL = 30000
88

9+
const Comms = require('../comms')
10+
const PeerParty = require('../party/peer/peer-party')
11+
912
class ServiceHostWebsocket{
1013

1114
constructor({trust_proxy, port, path, runner, wsSettings}){
@@ -44,7 +47,7 @@ class ServiceHostWebsocket{
4447

4548
handleUpgrade(request, socket, head){
4649

47-
debug('handleUpgrade', request.url)
50+
debug('handleUpgrade', request.headers.host, request.url)
4851

4952
if(request.url == this.path){
5053
this.doUpgrade(request, socket, head)
@@ -56,7 +59,6 @@ class ServiceHostWebsocket{
5659
doUpgrade(request, socket, head){
5760
debug('doUpgrade')
5861
this.ws.handleUpgrade(request, socket, head, (conn)=>{
59-
debug('head', head)
6062
this.ws.emit('connection', conn, request)
6163
})
6264
}
@@ -82,10 +84,10 @@ class ServiceHostWebsocket{
8284

8385
}
8486

85-
handleConnection(conn, req){
87+
async handleConnection(conn, req){
8688
conn.ip = this.getConnectionIp(req)
8789

88-
debug('handleConnection - ', conn.ip)
90+
debug('handleConnection - ', conn.ip, '\t>\t' , req.headers.host, req.url)
8991

9092
conn.isAlive = true
9193
conn.on('pong', ()=>{
@@ -95,6 +97,23 @@ class ServiceHostWebsocket{
9597
conn.on('close',()=>{
9698
debug('connection closed', conn.ip)
9799
})
100+
101+
debug('creating peer party')
102+
103+
let peer = new PeerParty({
104+
hostParty: this.runner.party,
105+
mode: this.runner.party.model,
106+
config: this.runner.party.config,
107+
comms: new Comms.WebsocketComms({
108+
host: true,
109+
connection: conn,
110+
discoverRemoteIdentity: true
111+
})
112+
})
113+
114+
await peer.start()
115+
debug('peer created')
116+
98117
}
99118

100119
checkClients(){

0 commit comments

Comments
 (0)