Improved packet writing, should improve responsiveness when many players are online

This commit is contained in:
Ceikry 2023-01-30 23:21:37 +00:00 committed by Ryan
parent e8c22249a2
commit cc1eca5237
2 changed files with 19 additions and 45 deletions

View file

@ -1,21 +1,15 @@
package core.net.packet package core.net.packet
import core.api.TickListener import core.api.tryPop
import core.net.packet.out.* import core.net.packet.out.*
import core.tools.SystemLogger import core.tools.SystemLogger
import java.lang.IndexOutOfBoundsException import java.io.PrintWriter
import java.io.StringWriter
import java.util.* import java.util.*
import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.locks.ReentrantLock
import kotlin.NoSuchElementException
class PacketWriteQueue : TickListener {
override fun tick() {
flush()
}
class PacketWriteQueue {
companion object { companion object {
private val queueLock = ReentrantLock()
private val packetsToQueue = LinkedList<QueuedPacket<*>?>()
private val packetsToWrite = LinkedList<QueuedPacket<*>?>() private val packetsToWrite = LinkedList<QueuedPacket<*>?>()
@JvmStatic @JvmStatic
@ -33,46 +27,24 @@ class PacketWriteQueue : TickListener {
@JvmStatic @JvmStatic
fun <T> push(packet: OutgoingPacket<T>, context: T) { fun <T> push(packet: OutgoingPacket<T>, context: T) {
if (queueLock.isHeldByCurrentThread) packetsToWrite.add(QueuedPacket(packet, context))
packetsToQueue.add(QueuedPacket(packet, context))
else
packetsToWrite.add(QueuedPacket(packet, context))
} }
@JvmStatic @JvmStatic
fun flush() { fun flush() {
queueLock.lock() var countThisCycle = packetsToWrite.size
val sw = StringWriter()
var hasEnded = false val pw = PrintWriter(sw)
while (!hasEnded) { while (countThisCycle-- > 0) {
val pkt = packetsToWrite.tryPop(null) ?: continue
try { try {
val packet = packetsToWrite.pop() write(pkt.out, pkt.context)
write(packet?.out ?: continue, packet.context ?: continue)
} catch (e: NoSuchElementException) {
hasEnded = true
}
}
if (packetsToWrite.isNotEmpty()) {
SystemLogger.logWarn(this::class.java, "Packet queue was NOT empty! Remaining packets: ${packetsToWrite.size}")
try {
for (pkt: QueuedPacket<*>? in packetsToWrite) SystemLogger.logWarn(this::class.java, "${pkt?.out?.javaClass?.simpleName ?: "NULL"} <- ${pkt?.context ?: "NULL"}")
} catch (ignored: NullPointerException) {
//do nothing, we don't care, this can happen when everything is working as intended.
} catch (ignored: IndexOutOfBoundsException) {
//do nothing, we don't care, this can happen when everything is working as intended.
} catch (e: Exception) { } catch (e: Exception) {
e.printStackTrace() e.printStackTrace(pw)
} finally { SystemLogger.logErr(this::class.java, "Error flushing packet ${pkt.out::class.java}: $sw")
packetsToWrite.clear() continue
} }
} }
queueLock.unlock()
while (packetsToQueue.isNotEmpty()) {
packetsToWrite.add(packetsToQueue.pop())
}
} }
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
@ -80,8 +52,7 @@ class PacketWriteQueue : TickListener {
val pack = out as? OutgoingPacket<T> val pack = out as? OutgoingPacket<T>
val ctx = context as? T val ctx = context as? T
if (pack == null || ctx == null) { if (pack == null || ctx == null) {
SystemLogger.logWarn(this::class.java, "Failed packet casting") throw IllegalStateException("Failed packet casting")
return
} }
pack.send(ctx) pack.send(ctx)
} }

View file

@ -11,6 +11,7 @@ import core.game.world.GameWorld
import core.game.world.repository.Repository import core.game.world.repository.Repository
import core.game.world.update.UpdateSequence import core.game.world.update.UpdateSequence
import core.net.packet.PacketProcessor import core.net.packet.PacketProcessor
import core.net.packet.PacketWriteQueue
import core.tools.colorize import core.tools.colorize
import java.lang.Long.max import java.lang.Long.max
import java.text.SimpleDateFormat import java.text.SimpleDateFormat
@ -84,6 +85,7 @@ class MajorUpdateWorker {
if (!skipPulseUpdate) { if (!skipPulseUpdate) {
GameWorld.Pulser.updateAll() GameWorld.Pulser.updateAll()
} }
GameWorld.tickListeners.forEach { it.tick() }
try { try {
sequence.start() sequence.start()
@ -96,9 +98,10 @@ class MajorUpdateWorker {
GameWorld.pulse() GameWorld.pulse()
//disconnect all players waiting to be disconnected //disconnect all players waiting to be disconnected
Repository.disconnectionQueue.update() Repository.disconnectionQueue.update()
GameWorld.tickListeners.forEach { it.tick() }
//tick all manager plugins //tick all manager plugins
Managers.tick() Managers.tick()
PacketWriteQueue.flush()
} }
fun start() { fun start() {