11const debug = require ( 'debug' ) ( 'dataparty.comms.peercomms' )
2-
2+ const uuidv4 = require ( 'uuid/v4' )
33const HttpMocks = require ( 'node-mocks-http' )
44
55const SocketOp = require ( './op/socket-op' )
@@ -22,6 +22,7 @@ class PeerComms extends SocketComms {
2222 constructor ( { remoteIdentity, discoverRemoteIdentity, host, party, socket, ...options } ) {
2323 super ( { remoteIdentity, discoverRemoteIdentity, party, ...options } )
2424
25+ this . uuid = uuidv4 ( )
2526 this . socket = socket || null
2627
2728 this . host = host //! Is comms host\
@@ -166,7 +167,7 @@ class PeerComms extends SocketComms {
166167 await this . socketInit ( )
167168 }
168169
169- this . socket . on ( 'close' , this . onclose . bind ( this ) )
170+ this . socket . on ( 'close' , this . close . bind ( this ) )
170171
171172 if ( this . host ) {
172173 debug ( 'host mode comms' )
@@ -186,12 +187,21 @@ class PeerComms extends SocketComms {
186187 }
187188
188189 async stop ( ) {
190+ debug ( 'stop' )
189191 this . close ( )
190192 }
191193
192- close ( ) {
194+ async close ( ) {
195+ debug ( 'close' )
196+
197+ if ( this . party . topics ) {
198+ await this . party . topics . destroyNode ( this )
199+ }
200+
193201 debug ( 'closing connection' )
194202 this . socket . destroy ( )
203+
204+ this . onclose ( )
195205 }
196206
197207
@@ -210,7 +220,34 @@ class PeerComms extends SocketComms {
210220
211221 return this . handleCallOp ( op )
212222
223+ } else if ( op . op === 'advertise' && this . state === PeerComms . STATES . AUTHED ) {
224+
225+ if ( this . party . topics ) {
226+ await this . party . topics . advertise ( this , op . topic )
227+ }
228+
229+ } else if ( op . op === 'subscribe' && this . state === PeerComms . STATES . AUTHED ) {
230+
231+ if ( this . party . topics ) {
232+ await this . party . topics . subscribe ( this , op . topic )
233+ }
234+
235+ } else if ( op . op === 'unsubscribe' && this . state === PeerComms . STATES . AUTHED ) {
236+
237+ if ( this . party . topics ) {
238+ await this . party . topics . unsubscribe ( this , op . topic )
239+ }
240+
241+ } else if ( op . op === 'publish' && this . state === PeerComms . STATES . AUTHED ) {
242+
243+ if ( this . party . topics ) {
244+ await this . party . topics . publish ( this , op . topic , op . msg )
245+ }
246+
213247 } else {
248+ debug ( '⚠️ op not implemented ⚠️' )
249+ debug ( op . input )
250+
214251 op . result = 'not implemented'
215252 op . setState ( HostOp . STATES . Finished_Fail )
216253 }
0 commit comments