@@ -2,6 +2,8 @@ import { Namespace, RemoteSocket, Server, Socket } from "socket.io";
22import {
33 ClientEvents ,
44 Feature ,
5+ NamespaceDetails ,
6+ NamespaceEvent ,
57 SerializedSocket ,
68 ServerEvents ,
79} from "./typed-events" ;
@@ -46,6 +48,10 @@ interface InstrumentOptions {
4648 * The store
4749 */
4850 store : Store ;
51+ /**
52+ * Whether to send all events or only aggregated events to the UI, for performance purposes.
53+ */
54+ mode : "development" | "production" ;
4955}
5056
5157const initAuthenticationMiddleware = (
@@ -120,16 +126,26 @@ const initStatsEmitter = (
120126 pid : process . pid ,
121127 } ;
122128
129+ const io = adminNamespace . server ;
130+
123131 const emitStats = ( ) => {
124132 debug ( "emit stats" ) ;
125- // @ts -ignore private reference
126- const clientsCount = adminNamespace . server . engine . clientsCount ;
133+ const namespaces : NamespaceDetails [ ] = [ ] ;
134+ io . _nsps . forEach ( ( namespace ) => {
135+ namespaces . push ( {
136+ name : namespace . name ,
137+ socketsCount : namespace . sockets . size ,
138+ } ) ;
139+ } ) ;
127140
128141 adminNamespace . emit (
129142 "server_stats" ,
130143 Object . assign ( { } , baseStats , {
131144 uptime : process . uptime ( ) ,
132- clientsCount,
145+ clientsCount : io . engine . clientsCount ,
146+ pollingClientsCount : io . _pollingClientsCount ,
147+ aggregatedEvents : io . _eventBuffer . getValuesAndClear ( ) ,
148+ namespaces,
133149 } )
134150 ) ;
135151 } ;
@@ -295,7 +311,7 @@ const registerFeatureHandlers = (
295311 }
296312} ;
297313
298- const registerListeners = (
314+ const registerVerboseListeners = (
299315 adminNamespace : Namespace < { } , ServerEvents > ,
300316 nsp : Namespace
301317) => {
@@ -407,6 +423,81 @@ const serializeData = (data: any) => {
407423 return obj ;
408424} ;
409425
426+ declare module "socket.io" {
427+ interface Server {
428+ _eventBuffer : EventBuffer ;
429+ _pollingClientsCount : number ;
430+ }
431+ }
432+
433+ class EventBuffer {
434+ private buffer : Map < string , NamespaceEvent > = new Map ( ) ;
435+
436+ public push ( type : string , subType ?: string , count = 1 ) {
437+ const timestamp = new Date ( ) ;
438+ timestamp . setMilliseconds ( 0 ) ;
439+ const key = `${ timestamp . getTime ( ) } ;${ type } ;${ subType } ` ;
440+ if ( this . buffer . has ( key ) ) {
441+ this . buffer . get ( key ) ! . count += count ;
442+ } else {
443+ this . buffer . set ( key , {
444+ timestamp : timestamp . getTime ( ) ,
445+ type,
446+ subType,
447+ count,
448+ } ) ;
449+ }
450+ }
451+
452+ public getValuesAndClear ( ) {
453+ const values = [ ...this . buffer . values ( ) ] ;
454+ this . buffer . clear ( ) ;
455+ return values ;
456+ }
457+ }
458+
459+ const registerEngineListeners = ( io : Server ) => {
460+ io . _eventBuffer = new EventBuffer ( ) ;
461+ io . _pollingClientsCount = 0 ;
462+
463+ io . engine . on ( "connection" , ( rawSocket : any ) => {
464+ io . _eventBuffer . push ( "rawConnection" ) ;
465+
466+ if ( rawSocket . transport . name === "polling" ) {
467+ io . _pollingClientsCount ++ ;
468+
469+ const decr = ( ) => {
470+ io . _pollingClientsCount -- ;
471+ } ;
472+
473+ rawSocket . once ( "upgrade" , ( ) => {
474+ rawSocket . removeListener ( "close" , decr ) ;
475+ decr ( ) ;
476+ } ) ;
477+
478+ rawSocket . once ( "close" , decr ) ;
479+ }
480+
481+ rawSocket . on ( "packetCreate" , ( { data } : { data : string | Buffer } ) => {
482+ if ( data ) {
483+ io . _eventBuffer . push ( "packetsOut" , undefined ) ;
484+ io . _eventBuffer . push ( "bytesOut" , undefined , Buffer . byteLength ( data ) ) ;
485+ }
486+ } ) ;
487+
488+ rawSocket . on ( "packet" , ( { data } : { data : string | Buffer } ) => {
489+ if ( data ) {
490+ io . _eventBuffer . push ( "packetsIn" , undefined ) ;
491+ io . _eventBuffer . push ( "bytesIn" , undefined , Buffer . byteLength ( data ) ) ;
492+ }
493+ } ) ;
494+
495+ rawSocket . on ( "close" , ( reason : string ) => {
496+ io . _eventBuffer . push ( "rawDisconnection" , reason ) ;
497+ } ) ;
498+ } ) ;
499+ } ;
500+
410501export function instrument ( io : Server , opts : Partial < InstrumentOptions > ) {
411502 const options : InstrumentOptions = Object . assign (
412503 {
@@ -415,6 +506,7 @@ export function instrument(io: Server, opts: Partial<InstrumentOptions>) {
415506 readonly : false ,
416507 serverId : undefined ,
417508 store : new InMemoryStore ( ) ,
509+ mode : process . env . NODE_ENV || "development" ,
418510 } ,
419511 opts
420512 ) ;
@@ -428,22 +520,36 @@ export function instrument(io: Server, opts: Partial<InstrumentOptions>) {
428520 initAuthenticationMiddleware ( adminNamespace , options ) ;
429521
430522 const supportedFeatures = options . readonly ? [ ] : detectSupportedFeatures ( io ) ;
523+ supportedFeatures . push ( Feature . AGGREGATED_EVENTS ) ;
524+ const isDevelopmentMode = options . mode === "development" ;
525+ if ( isDevelopmentMode ) {
526+ supportedFeatures . push ( Feature . ALL_EVENTS ) ;
527+ }
431528 debug ( "supported features: %j" , supportedFeatures ) ;
432529
433- initStatsEmitter ( adminNamespace , options . serverId ) ;
434-
435530 adminNamespace . on ( "connection" , async ( socket ) => {
436531 registerFeatureHandlers ( io , socket , supportedFeatures ) ;
437532
438533 socket . emit ( "config" , {
439534 supportedFeatures,
440535 } ) ;
441536
442- socket . emit ( "all_sockets" , await fetchAllSockets ( io ) ) ;
537+ if ( isDevelopmentMode ) {
538+ socket . emit ( "all_sockets" , await fetchAllSockets ( io ) ) ;
539+ }
443540 } ) ;
444541
445- io . _nsps . forEach ( ( nsp ) => registerListeners ( adminNamespace , nsp ) ) ;
446- io . on ( "new_namespace" , ( nsp ) => registerListeners ( adminNamespace , nsp ) ) ;
542+ registerEngineListeners ( io ) ;
543+
544+ if ( isDevelopmentMode ) {
545+ const registerNamespaceListeners = ( nsp : Namespace ) => {
546+ registerVerboseListeners ( adminNamespace , nsp ) ;
547+ } ;
548+ io . _nsps . forEach ( registerNamespaceListeners ) ;
549+ io . on ( "new_namespace" , registerNamespaceListeners ) ;
550+ }
551+
552+ initStatsEmitter ( adminNamespace , options . serverId ) ;
447553}
448554
449555export { InMemoryStore , RedisStore } from "./stores" ;
0 commit comments