Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions packages/core/src/Networking/Client/NetworkManager.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { ClientChannel } from '@geckos.io/client'
import type { ClientOptions } from '@geckos.io/common/lib/types'

import type { CommandPacket } from '../Server/Commands'
import { geckos } from '@geckos.io/client'
import Game from '../../BaseGame'
import EventEmitter from '../../Utils/EventEmitter'
Expand All @@ -13,6 +14,15 @@ export default class NetworkManager extends EventEmitter {
pingNow = 0
private _connected = false

private currentSequenceId = 0

/**
* This command queue is used for replaying packets locally.
* When ServerCommand.SV_STATE comes in we drop anything thats below
* the lastProcessedSequenceId
*/
private localCommandQueue: CommandPacket<any>[] = []

constructor(options: ClientOptions) {
super()
// eslint-disable-next-line ts/no-this-alias
Expand Down Expand Up @@ -60,9 +70,24 @@ export default class NetworkManager extends EventEmitter {
instance = undefined
}

protected onPong() {}
/**
* All commands need to go through here to get the
* sequenceId assigned and add it to the queue for local replay
*/
public sendCommand(commandPacket: CommandPacket<any>) {
commandPacket.sequenceId = this.currentSequenceId++

this.localCommandQueue.push(commandPacket)
this.socket.emit('command', commandPacket)
}

public dropCommandsAtSequenceId(sequenceId: number) {
this.localCommandQueue = this.localCommandQueue.filter(packet => packet.sequenceId! > sequenceId)
}

protected onPong() { }

protected onDisconnect() {}
protected onDisconnect() { }

protected pingCheck() {
this.pingNow = performance.now()
Expand Down
18 changes: 18 additions & 0 deletions packages/core/src/Networking/Entities/Player.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,28 @@ export default abstract class Player extends NetworkedLivingActor {

trackedEntities = new Set<string>()

private _lastProcessedSequenceId = 0

get lastProcessedSequenceId() {
return this._lastProcessedSequenceId
}

updateLastProcessedSequenceId(sequenceId: number) {
this._lastProcessedSequenceId = sequenceId
}

networkedFieldCallbacks(): Record<string, (value: unknown) => void> {
return {
...super.networkedFieldCallbacks(),
lastProcessedSequenceId: sequenceId => this._lastProcessedSequenceId = sequenceId as number,
}
}

public serialize() {
return {
...super.serialize(),
$typeName: this.$typeName,
lastProcessedSequenceId: this.lastProcessedSequenceId,
}
}
}
18 changes: 13 additions & 5 deletions packages/core/src/Networking/Server/Commands.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
import type { ChannelId } from '@geckos.io/client'

export enum ServerCommand {
SV_PONG = 'sv_pong',
SV_STATE = 'sv_state',
SV_REMOVE_ENTITY = 'sv_remove_entity',
}
export enum ClientCommand {
CL_PING = 'cl_ping',
}

export type Command = ClientCommand | ServerCommand

export interface CommandPacket<T extends ServerCommand> {
cmd: T
data: unknown
export interface CommandPacket<T extends Command | string> {
type: T
sequenceId?: number
}

export type SV_TEST = CommandPacket<ServerCommand.SV_STATE> & {
boo: string
export interface IncomingClientCommandPacket<T extends Command> extends CommandPacket<T> {
playerId: ChannelId
}
48 changes: 48 additions & 0 deletions packages/core/src/Networking/Server/LatencySimulator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import type winston from 'winston'

export interface LatencySimulatorOptions {
/** Minimum latency to simulate */
fixedLatency?: number

/** How much ms jitter should we include */
jitter?: number

/**
* How much percent packet loss to simulate between 0 - 1
* 1 = 100%
* 0 = 0%
*/
packetLoss?: number
}

export default class LatencySimulator {
private options: Required<LatencySimulatorOptions>

constructor(options: LatencySimulatorOptions, logger: winston.Logger) {
this.options = {
fixedLatency: 0,
jitter: 0,
packetLoss: 0,
...options, // Spread incoming over the defaults
}

logger.warn(`RUNNING WITH LATENCY SIMULATION: latency ${this.options.fixedLatency}ms, jitter: ${this.options.jitter}ms, packet loss: ${this.options.packetLoss}%`)
}

/**
* Calculate our latency items and then
* run the callback as appropriate
*/
public handle(cb: () => void) {
if (Math.random() < this.options.packetLoss) {
// Drop the packet completely
return
}

const latency = this.options.fixedLatency + (Math.random() - 0.5) * this.options.jitter

setTimeout(() => {
cb()
}, latency)
}
}
65 changes: 59 additions & 6 deletions packages/core/src/Networking/Server/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,28 @@ import type winston from 'winston'
import type GameObject from '../../World/GameObject'
import type Player from '../Entities/Player'
import type NetworkedActor from '../NetworkedActor'
import type { ClientCommand, CommandPacket, IncomingClientCommandPacket } from './Commands'
import type { LatencySimulatorOptions } from './LatencySimulator'
import { Buffer } from 'node:buffer'
import geckos from '@geckos.io/server'
import express from 'express'
import BaseGame from '../../BaseGame'
import { ServerCommand } from './Commands'
import LatencySimulator from './LatencySimulator'
import BandwidthTracker from './Stats/BandwidthTracker'
import CpuTracker from './Stats/CpuTracker'
import ServerWorld from './World'

let instance: Server<GameObject>

export interface ServerOptions {
latencySimulation?: LatencySimulatorOptions
}

export default abstract class Server<TClient extends GameObject> {
game: BaseGame

private commandBuffer: object[] = []
private commandBuffer: IncomingClientCommandPacket<ClientCommand>[] = []

gameSocket: GeckosServer
httpServer: Express = express()
Expand All @@ -31,11 +38,12 @@ export default abstract class Server<TClient extends GameObject> {

bandwidthTracker = new BandwidthTracker()
private cpuTracker: CpuTracker
private latencySimulator?: LatencySimulator

protected abstract onConnection(channel: ServerChannel): TClient
protected abstract onCommand(command: any, delta: number): void

constructor(logger: winston.Logger, phyicsWorld?: RAPIER.World) {
constructor(logger: winston.Logger, phyicsWorld?: RAPIER.World, options?: ServerOptions) {
instance = this as unknown as Server<GameObject>

this.game = new BaseGame(logger, phyicsWorld)
Expand All @@ -45,6 +53,10 @@ export default abstract class Server<TClient extends GameObject> {
this.logger = logger
this.cpuTracker = new CpuTracker(this.logger)

if (options?.latencySimulation) {
this.latencySimulator = new LatencySimulator(options.latencySimulation, this.logger)
}

this.gameSocket.onConnection((channel) => {
this.logger.info(`Connected: ${channel.id}`)

Expand All @@ -54,7 +66,14 @@ export default abstract class Server<TClient extends GameObject> {
this.logger.info(`${(this.game.world as ServerWorld).players.length} total players connected`)

channel.on('ping', () => {
channel.emit('pong')
const pong = () => channel.emit('pong')

if (this.latencySimulator) {
this.latencySimulator.handle(pong)
}
else {
pong()
}
})

channel.onDisconnect(() => {
Expand Down Expand Up @@ -110,7 +129,7 @@ export default abstract class Server<TClient extends GameObject> {
private bufferIncomingCommand(channel: ServerChannel, command: Data) {
this.commandBuffer.push({
playerId: channel.id!,
...(command) as object,
...(command) as CommandPacket<ClientCommand>,
})
}

Expand Down Expand Up @@ -175,7 +194,15 @@ export default abstract class Server<TClient extends GameObject> {
}

this.bandwidthTracker.recordSent('server', Buffer.byteLength(JSON.stringify(state)))
con.channel.emit(ServerCommand.SV_STATE, state)

const SendState = () => con.channel.emit(ServerCommand.SV_STATE, state)

if (this.latencySimulator) {
this.latencySimulator.handle(SendState)
}
else {
SendState()
}
})

// Mark entities as synced ONLY if they were actually sent to at least one client
Expand All @@ -189,10 +216,36 @@ export default abstract class Server<TClient extends GameObject> {
abstract getStateSyncDistance(): number

private runThroughBuffer() {
/**
* Due to network latency packets could come in different orders. First step is sort by sequence Id
*
* This currently will move recently connected clients to the
* front of the tick to be processed. Not really an issue at the moment but if it does
* turn out to be an issue then we need to sort the packets by playerId too.
*
* TODO fix ordering to sort packets only for player ids.
* If in an FPS player1 shoots first but this sorting puts player2 shot to be
* ticked over first because their sequenceId is lower its not really fair
*/
this.commandBuffer.sort((a, b) => a.sequenceId! - b.sequenceId!)

while (this.commandBuffer.length > 0) {
const currentCommand = this.commandBuffer[0]

this.onCommand(currentCommand, this.calculateDelta())
const process = () => this.onCommand(currentCommand, this.calculateDelta())

if (this.latencySimulator) {
this.latencySimulator.handle(process)
}
else {
process()
}

/**
* Update the players lastProcessedSequenceId after the command has executed
*/
const player = this.game.world.entities.items.get(currentCommand.playerId!) as Player
player.updateLastProcessedSequenceId(currentCommand.sequenceId!)

this.commandBuffer.shift()
}
Expand Down
73 changes: 73 additions & 0 deletions packages/core/tests/Networking/Client/NetworkManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,77 @@ describe('networkManager', () => {
expect(mockSocket.emit).toHaveBeenCalledWith('ping')
vi.useRealTimers()
})

describe('command queue', () => {
it('sendCommand assigns sequential sequenceIds starting at 0', () => {
const nm = new NetworkManager({})
const a = { type: 'cmd_a' as any }
const b = { type: 'cmd_b' as any }
const c = { type: 'cmd_c' as any }

nm.sendCommand(a)
nm.sendCommand(b)
nm.sendCommand(c)

expect(a.sequenceId).toBe(0)
expect(b.sequenceId).toBe(1)
expect(c.sequenceId).toBe(2)
})

it('sendCommand pushes the packet to the local queue and emits it on the socket', () => {
const nm = new NetworkManager({})
const packet = { type: 'cmd' as any }

nm.sendCommand(packet)

const queue = (nm as any).localCommandQueue
expect(queue).toHaveLength(1)
expect(queue[0]).toBe(packet)
expect(mockSocket.emit).toHaveBeenCalledWith('command', packet)
})

it('dropCommandsAtSequenceId removes acknowledged commands with sequenceId <= given', () => {
const nm = new NetworkManager({})
nm.sendCommand({ type: 'a' as any })
nm.sendCommand({ type: 'b' as any })
nm.sendCommand({ type: 'c' as any })
nm.sendCommand({ type: 'd' as any })

nm.dropCommandsAtSequenceId(2)

const queue = (nm as any).localCommandQueue as { type: string, sequenceId: number }[]
expect(queue.map(p => p.sequenceId)).toEqual([3])
})

it('dropCommandsAtSequenceId keeps commands with sequenceId greater than given', () => {
const nm = new NetworkManager({})
nm.sendCommand({ type: 'a' as any })
nm.sendCommand({ type: 'b' as any })
nm.sendCommand({ type: 'c' as any })

nm.dropCommandsAtSequenceId(0)

const queue = (nm as any).localCommandQueue as { sequenceId: number }[]
expect(queue.map(p => p.sequenceId)).toEqual([1, 2])
})

it('dropCommandsAtSequenceId clears the queue when the ack covers every command', () => {
const nm = new NetworkManager({})
nm.sendCommand({ type: 'a' as any })
nm.sendCommand({ type: 'b' as any })
nm.sendCommand({ type: 'c' as any })

nm.dropCommandsAtSequenceId(10)

const queue = (nm as any).localCommandQueue as unknown[]
expect(queue).toHaveLength(0)
})

it('dropCommandsAtSequenceId is a no-op on an empty queue', () => {
const nm = new NetworkManager({})

expect(() => nm.dropCommandsAtSequenceId(5)).not.toThrow()
expect((nm as any).localCommandQueue).toHaveLength(0)
})
})
})
Loading
Loading