Skip to content

Commit ff6e692

Browse files
committed
basic websocket server listening
1 parent 92207c5 commit ff6e692

4 files changed

Lines changed: 125 additions & 4 deletions

File tree

examples/test-service-host.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ async function main(){
3737

3838
const service = new ExampleService({ name: '@dataparty/example', version: '0.0.1' })
3939

40-
const build = await service.compile(Path.join(__dirname,'../dataparty'), true)
40+
const build = await service.compile(Path.join(__dirname,'/dataparty'), true)
4141

4242
debug('built', Object.keys(build))
4343

@@ -46,7 +46,11 @@ async function main(){
4646
sendFullErrors: true
4747
})
4848

49-
const host = new Dataparty.ServiceHost({runner, trust_proxy: true})
49+
const host = new Dataparty.ServiceHost({
50+
runner,
51+
trust_proxy: false,
52+
wsEnabled: true
53+
})
5054

5155
await party.start()
5256
await runner.start()

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@
102102
"uuidv4": "^6.2.12",
103103
"vm2": "^3.9.2",
104104
"websocket": "github:sevenbitbyte/WebSocket-Node#parcel-build",
105+
"ws": "^8.11.0",
105106
"zangodb": "github:sevenbitbyte/zangodb#hash-patch"
106107
},
107108
"devDependencies": {
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
const {URL} = require('url')
2+
const debug = require('debug')('dataparty.service-host-websocket')
3+
4+
const ws = require('ws')
5+
const WebSocketServer = ws.WebSocketServer
6+
7+
const WATCHDOG_INTERVAL = 30000
8+
9+
class ServiceHostWebsocket{
10+
11+
constructor({trust_proxy, port, path, runner, wsSettings}){
12+
this.port = port
13+
this.path = path || '/ws'
14+
this.runner = runner
15+
this.trust_proxy = trust_proxy
16+
this.wsSettings = wsSettings || {}
17+
18+
this.ws = null
19+
}
20+
21+
start(server){
22+
23+
debug('start')
24+
25+
let settings = {}
26+
27+
if(!server){
28+
settings = { port: this.port, ...this.wsSettings }
29+
30+
} else {
31+
settings = { noServer: true, ...this.wsSettings }
32+
}
33+
34+
debug('\t','settings', settings)
35+
36+
this.ws = new WebSocketServer(settings)
37+
38+
this.ws.on('connection', this.handleConnection.bind(this))
39+
40+
server.on('upgrade', this.handleUpgrade.bind(this))
41+
42+
this.watchdog = setInterval(this.checkClients.bind(this), WATCHDOG_INTERVAL)
43+
}
44+
45+
handleUpgrade(request, socket, head){
46+
47+
debug('handleUpgrade', request.url)
48+
49+
if(request.url == this.path){
50+
this.doUpgrade(request, socket, head)
51+
} else {
52+
socket.destroy()
53+
}
54+
}
55+
56+
doUpgrade(request, socket, head){
57+
debug('doUpgrade')
58+
this.ws.handleUpgrade(request, socket, head, (conn)=>{
59+
debug('head', head)
60+
this.ws.emit('connection', conn, request)
61+
})
62+
}
63+
64+
getConnectionIp(req){
65+
if(this.trust_proxy){
66+
const ip = req.headers['x-forwarded-for'].split(',')[0].trim();
67+
68+
return ip
69+
}
70+
71+
return req.socket.remoteAddress
72+
}
73+
74+
handleConnection(conn, req){
75+
conn.ip = this.getConnectionIp(req)
76+
77+
debug('handleConnection - ', conn.ip)
78+
79+
conn.isAlive = true
80+
conn.on('pong', ()=>{
81+
conn.isAlive = true
82+
})
83+
}
84+
85+
checkClients(){
86+
debug('checkClients')
87+
this.ws.clients.forEach( (conn)=>{
88+
if (conn.isAlive === false){
89+
debug('\t','terminating client')
90+
return conn.terminate()
91+
}
92+
93+
conn.isAlive = false
94+
conn.ping()
95+
});
96+
}
97+
}
98+
99+
module.exports = ServiceHostWebsocket

src/service/service-host.js

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ const bodyParser = require('body-parser')
99
const expressListRoutes = require('express-list-routes')
1010
const debug = require('debug')('dataparty.service-host')
1111

12+
const ServiceHostWebsocket = require('./service-host-websocket')
13+
1214
const Pify = async (p)=>{
1315
return await p
1416
}
@@ -22,6 +24,8 @@ class ServiceHost {
2224
cors = {},
2325
trust_proxy = false,
2426
listenUri = 'http://0.0.0.0:4001',
27+
wsEnabled = true,
28+
wsPort = null,
2529
runner
2630
}={}){
2731
this.apiApp = express()
@@ -45,11 +49,20 @@ class ServiceHost {
4549
this.errorHandlerTimer = null
4650

4751
this.apiServerUri = new URL(listenUri)
52+
53+
if(wsEnabled){
54+
this.wsServer = new ServiceHostWebsocket({
55+
trust_proxy,
56+
port: wsPort,
57+
runner: this.runner
58+
})
59+
}
60+
4861
}
4962

5063
async start(){
5164

52-
debug('starting server')
65+
debug('starting server', this.apiServerUri.toString())
5366

5467
if(this.apiServer==null){
5568
debug('adding default endpoints')
@@ -81,7 +94,6 @@ class ServiceHost {
8194

8295
}
8396

84-
debug('server listening', this.apiServerUri.toString())
8597

8698
await new Promise((resolve,reject)=>{
8799
this.apiServer.listen(listenPort, listenHost, resolve)
@@ -94,6 +106,11 @@ class ServiceHost {
94106

95107
debug('server listening')
96108
debug('address', this.apiServer.address())
109+
110+
if(this.wsServer && this.apiServer){
111+
debug('starting websocket')
112+
this.wsServer.start(this.apiServer)
113+
}
97114
}
98115

99116
async stop(){

0 commit comments

Comments
 (0)