mirror of
https://github.com/neoStudiosLCE/neoLegacy.git
synced 2026-06-09 23:52:56 +00:00
723 lines
26 KiB
Python
723 lines
26 KiB
Python
"""
|
|
LCE Server Stress Testing Tool
|
|
|
|
Rapid bot connect/disconnect cycles to stress test thread safety
|
|
of the server's player list, socket handling, and movement processing.
|
|
|
|
Usage:
|
|
python stress_test.py <host> <port> [options]
|
|
|
|
Options:
|
|
--bots N Max concurrent bots (default: 8)
|
|
--cycles N Number of connect/disconnect cycles (default: 50, 0=infinite)
|
|
--hold MIN MAX Hold time range in seconds before disconnect (default: 2 10)
|
|
--ramp SECS Delay between spawning each bot (default: 0.5)
|
|
--move Bots send movement packets while connected
|
|
--burst N Spawn N bots simultaneously (default: 1)
|
|
--duration SECS Run for N seconds then stop (default: 0, unlimited)
|
|
--quiet Suppress per-bot log messages
|
|
"""
|
|
|
|
import argparse
|
|
import logging
|
|
import math
|
|
import os
|
|
import random
|
|
import secrets
|
|
import socket
|
|
import struct
|
|
import sys
|
|
import threading
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
|
|
# Import protocol code from the server-monitor tool.
|
|
# Search sibling tools directory first, then the itsRevela tools directory.
|
|
_TOOLS = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
_SEARCH_PATHS = [
|
|
os.path.join(_TOOLS, "server-monitor"),
|
|
os.path.join(os.path.dirname(_TOOLS), "server-monitor"),
|
|
os.path.normpath(os.path.join(_TOOLS, "..", "..", "..", "itsRevela", "tools", "server-monitor")),
|
|
]
|
|
for _p in _SEARCH_PATHS:
|
|
if os.path.isdir(_p) and _p not in sys.path:
|
|
sys.path.append(_p)
|
|
break
|
|
|
|
from protocol import frame_packet, DataInputStream, DataOutputStream
|
|
from packets import (
|
|
PRE_LOGIN, LOGIN, KEEP_ALIVE, CUSTOM_PAYLOAD, DISCONNECT,
|
|
build_prelogin, build_login, build_keep_alive,
|
|
build_custom_payload, parse_prelogin_response,
|
|
parse_login_response_stream, DISCONNECT_REASONS,
|
|
)
|
|
|
|
logger = logging.getLogger("stress")
|
|
|
|
SMALLID_REJECT = 0xFF
|
|
|
|
CIPHER_KEY_PREFIX = (
|
|
b"\xFA\x00\x07"
|
|
b"\x00\x4D\x00\x43\x00\x7C\x00\x43\x00\x4B\x00\x65\x00\x79"
|
|
b"\x00\x20"
|
|
)
|
|
CIPHER_KEY_TOTAL = len(CIPHER_KEY_PREFIX) + 32
|
|
|
|
CIPHER_ON_PATTERN = (
|
|
b"\xFA\x00\x06"
|
|
b"\x00\x4D\x00\x43\x00\x7C\x00\x43\x00\x4F\x00\x6E"
|
|
b"\x00\x00"
|
|
)
|
|
|
|
CIPHER_KEY_CHANNEL = "MC|CKey"
|
|
CIPHER_ACK_CHANNEL = "MC|CAck"
|
|
CIPHER_ON_CHANNEL = "MC|COn"
|
|
IDENTITY_TOKEN_ISSUE = "MC|CTIssue"
|
|
IDENTITY_TOKEN_CHALLENGE = "MC|CTChallenge"
|
|
IDENTITY_TOKEN_RESPONSE = "MC|CTResponse"
|
|
|
|
|
|
def _build_channel_pattern(channel: str) -> bytes:
|
|
result = b"\xFA"
|
|
result += struct.pack(">h", len(channel))
|
|
for ch in channel:
|
|
result += struct.pack(">H", ord(ch))
|
|
return result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CipherState (AES-CTR, optional dependency)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class CipherState:
|
|
def __init__(self, key: bytes, iv: bytes):
|
|
from Crypto.Cipher import AES
|
|
from Crypto.Util import Counter
|
|
ctr = Counter.new(128, initial_value=int.from_bytes(iv, "big"))
|
|
self._cipher = AES.new(key, AES.MODE_CTR, counter=ctr)
|
|
|
|
def process(self, data: bytes) -> bytes:
|
|
return self._cipher.encrypt(data)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Statistics tracker
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@dataclass
|
|
class Stats:
|
|
lock: threading.Lock = field(default_factory=threading.Lock)
|
|
connects: int = 0
|
|
disconnects: int = 0
|
|
rejections: int = 0
|
|
errors: int = 0
|
|
moves_sent: int = 0
|
|
keepalives_sent: int = 0
|
|
start_time: float = 0.0
|
|
|
|
def summary(self) -> str:
|
|
elapsed = time.time() - self.start_time if self.start_time else 0
|
|
with self.lock:
|
|
return (
|
|
f"[{elapsed:.0f}s] "
|
|
f"connects={self.connects} "
|
|
f"disconnects={self.disconnects} "
|
|
f"rejections={self.rejections} "
|
|
f"errors={self.errors} "
|
|
f"moves={self.moves_sent} "
|
|
f"keepalives={self.keepalives_sent}"
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Movement packet builder
|
|
# ---------------------------------------------------------------------------
|
|
|
|
MOVE_PLAYER = 0x0D # MovePlayerPacket::PosRot — what we send AND what server sends for teleports.
|
|
|
|
def build_move_player(x: float, y: float, z: float,
|
|
yaw: float, pitch: float, on_ground: bool) -> bytes:
|
|
# Wire order matches MovePlayerPacket::PosRot::write: x, y (feet),
|
|
# yView (eye), z, yaw, pitch, flags. Server kicks for IllegalStance
|
|
# if (yView - y) is outside [0.1, 1.65], so feet must come first.
|
|
dos = DataOutputStream()
|
|
dos.write_double(x)
|
|
dos.write_double(y)
|
|
dos.write_double(y + 1.62)
|
|
dos.write_double(z)
|
|
dos.write_float(yaw)
|
|
dos.write_float(pitch)
|
|
dos.write_bool(on_ground)
|
|
return dos.getvalue()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Single bot stress connection
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class StressBot:
|
|
"""A bot that connects, optionally moves around, then disconnects."""
|
|
|
|
def __init__(self, name: str, host: str, port: int, xuid: int,
|
|
hold_min: float, hold_max: float, send_moves: bool,
|
|
stats: Stats, quiet: bool):
|
|
self.name = name
|
|
self.host = host
|
|
self.port = port
|
|
self.xuid = xuid
|
|
self.hold_min = hold_min
|
|
self.hold_max = hold_max
|
|
self.send_moves = send_moves
|
|
self.stats = stats
|
|
self.quiet = quiet
|
|
self._sock: socket.socket | None = None
|
|
self._recv_buf = bytearray()
|
|
self._scan_buf = bytearray()
|
|
self._recv_cipher: CipherState | None = None
|
|
self._send_cipher: CipherState | None = None
|
|
self._cipher_key = b""
|
|
self._cipher_iv = b""
|
|
self._identity_token = b""
|
|
self._entity_id = 0
|
|
self._running = True
|
|
# Server-tracked position. Initialized when server sends its first
|
|
# MovePlayer::PosRot teleport after login, and updated whenever the
|
|
# server teleports us (eg. plugin scatter, anti-cheat correction).
|
|
self._pos_x = 0.0
|
|
self._pos_y = 64.0
|
|
self._pos_z = 0.0
|
|
self._pos_initialized = False
|
|
|
|
def log(self, msg: str) -> None:
|
|
if not self.quiet:
|
|
logger.info(msg)
|
|
|
|
def run(self) -> bool:
|
|
"""Run one connect-hold-disconnect cycle. Returns True on success."""
|
|
try:
|
|
return self._do_cycle()
|
|
except Exception as e:
|
|
self.log(f"[{self.name}] error: {e}")
|
|
with self.stats.lock:
|
|
self.stats.errors += 1
|
|
return False
|
|
finally:
|
|
if self._sock:
|
|
try:
|
|
self._sock.close()
|
|
except OSError:
|
|
pass
|
|
self._sock = None
|
|
|
|
def _do_cycle(self) -> bool:
|
|
# Connect
|
|
self._sock = socket.create_connection((self.host, self.port), timeout=10)
|
|
self._sock.settimeout(5.0)
|
|
|
|
# SmallID
|
|
first_byte = self._recv_exact(1)
|
|
if first_byte[0] == SMALLID_REJECT:
|
|
reject_data = self._recv_exact(5)
|
|
reason_id = struct.unpack(">i", reject_data[1:5])[0]
|
|
reason = DISCONNECT_REASONS.get(reason_id, f"Unknown({reason_id})")
|
|
self.log(f"[{self.name}] rejected: {reason}")
|
|
with self.stats.lock:
|
|
self.stats.rejections += 1
|
|
return False
|
|
|
|
small_id = first_byte[0]
|
|
self.log(f"[{self.name}] assigned smallId={small_id}")
|
|
|
|
# PreLogin
|
|
self._send_packet(PRE_LOGIN, build_prelogin(self.name))
|
|
|
|
# Read PreLogin response
|
|
self._sock.settimeout(5.0)
|
|
if not self._read_until_packet(PRE_LOGIN, timeout=5.0):
|
|
self.log(f"[{self.name}] no PreLogin response")
|
|
with self.stats.lock:
|
|
self.stats.errors += 1
|
|
return False
|
|
|
|
# Login
|
|
self._send_packet(LOGIN, build_login(self.name, self.xuid))
|
|
|
|
# Read Login response
|
|
if not self._read_until_packet(LOGIN, timeout=5.0):
|
|
self.log(f"[{self.name}] no Login response")
|
|
with self.stats.lock:
|
|
self.stats.errors += 1
|
|
return False
|
|
|
|
with self.stats.lock:
|
|
self.stats.connects += 1
|
|
|
|
self.log(f"[{self.name}] connected (entityId={self._entity_id})")
|
|
|
|
# Cipher scan (3 second window)
|
|
self._sock.settimeout(1.0)
|
|
self._do_cipher_scan()
|
|
|
|
# Hold phase: stay connected, send keepalives and optional movement
|
|
hold_time = random.uniform(self.hold_min, self.hold_max)
|
|
hold_end = time.time() + hold_time
|
|
last_keepalive = time.time()
|
|
keepalive_counter = 0
|
|
|
|
while time.time() < hold_end and self._running:
|
|
# Drain incoming data
|
|
try:
|
|
chunk = self._sock.recv(65536)
|
|
if not chunk:
|
|
break
|
|
self._recv_buf.extend(chunk)
|
|
self._drain_frames()
|
|
except socket.timeout:
|
|
pass
|
|
except OSError:
|
|
break
|
|
|
|
now = time.time()
|
|
|
|
# Keepalive every 10s
|
|
if now - last_keepalive >= 10.0:
|
|
keepalive_counter += 1
|
|
self._send_packet(KEEP_ALIVE, build_keep_alive(keepalive_counter))
|
|
with self.stats.lock:
|
|
self.stats.keepalives_sent += 1
|
|
last_keepalive = now
|
|
|
|
# Movement packets every 50ms. We can't do real travel because
|
|
# the server's anti-cheat compares our claimed position against
|
|
# what its own physics computes, and we don't simulate collision
|
|
# or gravity. Instead we drift ±0.3 blocks from whatever
|
|
# position the server most recently teleported us to. To spread
|
|
# bots out, use the test plugin's /fktest scatter from in-game.
|
|
if self.send_moves and self._pos_initialized:
|
|
new_x = self._pos_x + random.uniform(-0.3, 0.3)
|
|
new_z = self._pos_z + random.uniform(-0.3, 0.3)
|
|
yaw = random.uniform(0, 360)
|
|
self._send_packet(MOVE_PLAYER,
|
|
build_move_player(new_x, self._pos_y, new_z, yaw, 0.0, True))
|
|
# Optimistically update; server will correct us via PosRot
|
|
# if it disagreed (eg. we drifted into a block).
|
|
self._pos_x = new_x
|
|
self._pos_z = new_z
|
|
with self.stats.lock:
|
|
self.stats.moves_sent += 1
|
|
time.sleep(0.05)
|
|
|
|
# Disconnect (just close the socket)
|
|
self.log(f"[{self.name}] disconnecting after {hold_time:.1f}s hold")
|
|
with self.stats.lock:
|
|
self.stats.disconnects += 1
|
|
return True
|
|
|
|
def _do_cipher_scan(self) -> None:
|
|
"""Wait for the cipher handshake to finish or up to ~4s.
|
|
|
|
Returns early once both keys are exchanged. The upper bound has to
|
|
cover the worst case where a stack of plaintext setup packets
|
|
(level info, scoreboard, initial chunks) sits in front of MC|CKey
|
|
in the recv buffer. The server's own cipher-handshake grace is
|
|
100 ticks (~5s).
|
|
"""
|
|
scan_start = time.time()
|
|
scan_buf = bytearray()
|
|
|
|
while time.time() - scan_start < 4.0 and self._running:
|
|
# _handle_custom_payload may have already activated cipher via
|
|
# the drain path inside _read_until_packet.
|
|
if self._cipher_key and self._recv_cipher:
|
|
return
|
|
|
|
try:
|
|
chunk = self._sock.recv(65536)
|
|
if not chunk:
|
|
return
|
|
self._recv_buf.extend(chunk)
|
|
except socket.timeout:
|
|
pass
|
|
except OSError:
|
|
return
|
|
|
|
# Also drain framed packets
|
|
self._drain_frames()
|
|
|
|
# Accumulate raw data for cipher pattern scanning
|
|
scan_buf.extend(self._recv_buf)
|
|
|
|
# Look for MC|CKey
|
|
if not self._cipher_key:
|
|
idx = scan_buf.find(CIPHER_KEY_PREFIX)
|
|
if idx >= 0 and idx + CIPHER_KEY_TOTAL <= len(scan_buf):
|
|
key_start = idx + len(CIPHER_KEY_PREFIX)
|
|
key_data = bytes(scan_buf[key_start:key_start + 32])
|
|
self._cipher_key = key_data[:16]
|
|
self._cipher_iv = key_data[16:32]
|
|
self.log(f"[{self.name}] got cipher key")
|
|
|
|
self._send_packet(CUSTOM_PAYLOAD,
|
|
build_custom_payload(CIPHER_ACK_CHANNEL))
|
|
iv_send = bytearray(self._cipher_iv)
|
|
iv_send[0] ^= 0x80
|
|
self._send_cipher = CipherState(self._cipher_key, bytes(iv_send))
|
|
|
|
del scan_buf[:idx + CIPHER_KEY_TOTAL]
|
|
|
|
# Look for MC|COn
|
|
if self._cipher_key and not self._recv_cipher:
|
|
idx = scan_buf.find(CIPHER_ON_PATTERN)
|
|
if idx >= 0:
|
|
self._recv_cipher = CipherState(self._cipher_key, self._cipher_iv)
|
|
self.log(f"[{self.name}] cipher active")
|
|
return
|
|
|
|
if not self._cipher_key:
|
|
self.log(f"[{self.name}] no cipher (server has it disabled)")
|
|
|
|
def _read_until_packet(self, expected_id: int, timeout: float) -> bool:
|
|
"""Read frames until we get the expected packet ID or timeout."""
|
|
deadline = time.time() + timeout
|
|
while time.time() < deadline:
|
|
try:
|
|
chunk = self._sock.recv(65536)
|
|
if not chunk:
|
|
return False
|
|
self._recv_buf.extend(chunk)
|
|
except socket.timeout:
|
|
continue
|
|
except OSError:
|
|
return False
|
|
|
|
while len(self._recv_buf) >= 4:
|
|
payload_len = struct.unpack(">I", self._recv_buf[:4])[0]
|
|
total = 4 + payload_len
|
|
if len(self._recv_buf) < total:
|
|
break
|
|
|
|
raw_payload = bytes(self._recv_buf[4:total])
|
|
del self._recv_buf[:total]
|
|
|
|
if self._recv_cipher:
|
|
raw_payload = self._recv_cipher.process(raw_payload)
|
|
|
|
if not raw_payload:
|
|
continue
|
|
|
|
packet_id = raw_payload[0]
|
|
data = raw_payload[1:]
|
|
|
|
if packet_id == expected_id:
|
|
if packet_id == PRE_LOGIN:
|
|
try:
|
|
parsed = parse_prelogin_response(data)
|
|
self.log(f"[{self.name}] PreLogin: {parsed['player_count']} online")
|
|
except Exception:
|
|
pass
|
|
return True
|
|
elif packet_id == LOGIN:
|
|
try:
|
|
dis = DataInputStream(data)
|
|
parsed = parse_login_response_stream(dis)
|
|
self._entity_id = parsed["entity_id"]
|
|
except Exception:
|
|
pass
|
|
return True
|
|
|
|
# Handle cipher handshake during the login wait. With
|
|
# require-secure-client, the server holds the Login response
|
|
# behind the security gate until cipher activates, so the
|
|
# gate-bypass MC|CKey/MC|COn frames arrive before LOGIN.
|
|
# Dropping them here would deadlock both sides.
|
|
elif packet_id == CUSTOM_PAYLOAD:
|
|
self._handle_custom_payload(data)
|
|
|
|
elif packet_id == DISCONNECT:
|
|
try:
|
|
dis = DataInputStream(data)
|
|
reason_id = dis.read_int()
|
|
reason = DISCONNECT_REASONS.get(reason_id, f"Unknown({reason_id})")
|
|
self.log(f"[{self.name}] kicked during login: {reason}")
|
|
except Exception:
|
|
pass
|
|
return False
|
|
|
|
return False
|
|
|
|
def _drain_frames(self) -> None:
|
|
"""Drain and discard all complete frames from recv buffer."""
|
|
while len(self._recv_buf) >= 4:
|
|
payload_len = struct.unpack(">I", self._recv_buf[:4])[0]
|
|
total = 4 + payload_len
|
|
if len(self._recv_buf) < total:
|
|
break
|
|
|
|
raw_payload = bytes(self._recv_buf[4:total])
|
|
del self._recv_buf[:total]
|
|
|
|
if self._recv_cipher:
|
|
raw_payload = self._recv_cipher.process(raw_payload)
|
|
|
|
if not raw_payload:
|
|
continue
|
|
|
|
packet_id = raw_payload[0]
|
|
data = raw_payload[1:]
|
|
|
|
# Handle identity tokens
|
|
if packet_id == CUSTOM_PAYLOAD:
|
|
self._handle_custom_payload(data)
|
|
elif packet_id == MOVE_PLAYER:
|
|
self._handle_server_move(data)
|
|
|
|
def _handle_server_move(self, data: bytes) -> None:
|
|
"""Track server's view of our position. PosRot format:
|
|
double x, double y, double yView, double z, float yRot, float xRot, byte flags."""
|
|
try:
|
|
dis = DataInputStream(data)
|
|
x = dis.read_double()
|
|
y = dis.read_double()
|
|
_yView = dis.read_double()
|
|
z = dis.read_double()
|
|
self._pos_x = x
|
|
self._pos_y = y
|
|
self._pos_z = z
|
|
self._pos_initialized = True
|
|
except Exception:
|
|
pass
|
|
|
|
def _handle_custom_payload(self, data: bytes) -> None:
|
|
"""Handle cipher handshake and identity token channels."""
|
|
try:
|
|
dis = DataInputStream(data)
|
|
channel = dis.read_utf()
|
|
length = dis.read_short()
|
|
payload = dis.read_raw(length) if length > 0 else b""
|
|
|
|
# Cipher channels arrive in plaintext before encryption is active.
|
|
# Handle them here so the bot survives bursts where the whole
|
|
# handshake frame lands in a single recv(), bypassing the leftover
|
|
# byte-pattern scan.
|
|
if channel == CIPHER_KEY_CHANNEL and len(payload) == 32 and not self._cipher_key:
|
|
self._cipher_key = payload[:16]
|
|
self._cipher_iv = payload[16:32]
|
|
self.log(f"[{self.name}] got cipher key")
|
|
self._send_packet(CUSTOM_PAYLOAD,
|
|
build_custom_payload(CIPHER_ACK_CHANNEL))
|
|
iv_send = bytearray(self._cipher_iv)
|
|
iv_send[0] ^= 0x80
|
|
self._send_cipher = CipherState(self._cipher_key, bytes(iv_send))
|
|
elif channel == CIPHER_ON_CHANNEL:
|
|
if self._cipher_key and not self._recv_cipher:
|
|
self._recv_cipher = CipherState(self._cipher_key, self._cipher_iv)
|
|
self.log(f"[{self.name}] cipher active")
|
|
elif channel == IDENTITY_TOKEN_ISSUE and len(payload) == 32:
|
|
self._identity_token = payload
|
|
self.log(f"[{self.name}] got identity token")
|
|
elif channel == IDENTITY_TOKEN_CHALLENGE:
|
|
if self._identity_token and len(self._identity_token) == 32:
|
|
self._send_packet(CUSTOM_PAYLOAD,
|
|
build_custom_payload(IDENTITY_TOKEN_RESPONSE, self._identity_token))
|
|
else:
|
|
self._send_packet(CUSTOM_PAYLOAD,
|
|
build_custom_payload(IDENTITY_TOKEN_RESPONSE))
|
|
self.log(f"[{self.name}] answered identity challenge")
|
|
except Exception:
|
|
pass
|
|
|
|
def _send_packet(self, packet_id: int, payload: bytes) -> None:
|
|
raw = frame_packet(packet_id, payload)
|
|
if self._send_cipher:
|
|
header = raw[:4]
|
|
encrypted = self._send_cipher.process(raw[4:])
|
|
raw = header + encrypted
|
|
try:
|
|
if self._sock:
|
|
self._sock.sendall(raw)
|
|
except OSError:
|
|
pass
|
|
|
|
def _recv_exact(self, n: int) -> bytes:
|
|
data = b""
|
|
while len(data) < n:
|
|
chunk = self._sock.recv(n - len(data))
|
|
if not chunk:
|
|
raise ConnectionError("Connection closed during recv")
|
|
data += chunk
|
|
return data
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Orchestrator
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class StressTestRunner:
|
|
def __init__(self, host: str, port: int, max_bots: int, cycles: int,
|
|
hold_min: float, hold_max: float, ramp: float,
|
|
send_moves: bool, burst: int, duration: float,
|
|
quiet: bool):
|
|
self.host = host
|
|
self.port = port
|
|
self.max_bots = max_bots
|
|
self.cycles = cycles
|
|
self.hold_min = hold_min
|
|
self.hold_max = hold_max
|
|
self.ramp = ramp
|
|
self.send_moves = send_moves
|
|
self.burst = burst
|
|
self.duration = duration
|
|
self.quiet = quiet
|
|
self.stats = Stats()
|
|
self._active = threading.Semaphore(max_bots)
|
|
self._stop = threading.Event()
|
|
self._bot_counter = 0
|
|
self._counter_lock = threading.Lock()
|
|
|
|
def run(self) -> None:
|
|
self.stats.start_time = time.time()
|
|
logger.info(f"Stress test: {self.host}:{self.port}")
|
|
logger.info(f" max_bots={self.max_bots} cycles={self.cycles or 'infinite'} "
|
|
f"hold={self.hold_min}-{self.hold_max}s ramp={self.ramp}s "
|
|
f"burst={self.burst} moves={self.send_moves}")
|
|
if self.duration:
|
|
logger.info(f" duration={self.duration}s")
|
|
|
|
# Status printer
|
|
status_thread = threading.Thread(target=self._print_status, daemon=True)
|
|
status_thread.start()
|
|
|
|
# Duration timer
|
|
if self.duration > 0:
|
|
timer = threading.Timer(self.duration, self._stop.set)
|
|
timer.daemon = True
|
|
timer.start()
|
|
|
|
cycle = 0
|
|
try:
|
|
while not self._stop.is_set():
|
|
if self.cycles > 0 and cycle >= self.cycles:
|
|
break
|
|
|
|
# Spawn a burst of bots
|
|
threads = []
|
|
for _ in range(self.burst):
|
|
if self._stop.is_set():
|
|
break
|
|
if self.cycles > 0 and cycle >= self.cycles:
|
|
break
|
|
|
|
self._active.acquire()
|
|
if self._stop.is_set():
|
|
self._active.release()
|
|
break
|
|
|
|
with self._counter_lock:
|
|
self._bot_counter += 1
|
|
bot_name = f"StressBot{self._bot_counter}"
|
|
|
|
xuid = secrets.randbits(62) | (1 << 32)
|
|
bot = StressBot(
|
|
name=bot_name,
|
|
host=self.host,
|
|
port=self.port,
|
|
xuid=xuid,
|
|
hold_min=self.hold_min,
|
|
hold_max=self.hold_max,
|
|
send_moves=self.send_moves,
|
|
stats=self.stats,
|
|
quiet=self.quiet,
|
|
)
|
|
|
|
t = threading.Thread(
|
|
target=self._run_bot, args=(bot,), daemon=True)
|
|
t.start()
|
|
threads.append(t)
|
|
cycle += 1
|
|
|
|
# Ramp delay between bursts
|
|
if self.ramp > 0 and not self._stop.is_set():
|
|
self._stop.wait(self.ramp)
|
|
|
|
except KeyboardInterrupt:
|
|
logger.info("\nInterrupted by user")
|
|
self._stop.set()
|
|
|
|
# Wait for active bots to finish
|
|
logger.info("Waiting for active bots to finish...")
|
|
for _ in range(self.max_bots):
|
|
self._active.acquire(timeout=30)
|
|
|
|
logger.info(f"\nFinal: {self.stats.summary()}")
|
|
|
|
def _run_bot(self, bot: StressBot) -> None:
|
|
try:
|
|
bot.run()
|
|
finally:
|
|
self._active.release()
|
|
|
|
def _print_status(self) -> None:
|
|
while not self._stop.is_set():
|
|
self._stop.wait(5.0)
|
|
if not self._stop.is_set():
|
|
logger.info(self.stats.summary())
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main() -> None:
|
|
parser = argparse.ArgumentParser(
|
|
description="LCE Server Stress Testing Tool",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
)
|
|
parser.add_argument("host", nargs="?", default="127.0.0.1",
|
|
help="Server hostname or IP (default: 127.0.0.1)")
|
|
parser.add_argument("port", nargs="?", type=int, default=19132,
|
|
help="Server port (default: 19132)")
|
|
parser.add_argument("--bots", type=int, default=8,
|
|
help="Max concurrent bots (default: 8)")
|
|
parser.add_argument("--cycles", type=int, default=50,
|
|
help="Connect/disconnect cycles (default: 50, 0=infinite)")
|
|
parser.add_argument("--hold", type=float, nargs=2, default=[2.0, 10.0],
|
|
metavar=("MIN", "MAX"),
|
|
help="Hold time range in seconds (default: 2 10)")
|
|
parser.add_argument("--ramp", type=float, default=0.5,
|
|
help="Delay between spawning bots (default: 0.5)")
|
|
parser.add_argument("--move", action="store_true",
|
|
help="Bots send movement packets while connected")
|
|
parser.add_argument("--burst", type=int, default=1,
|
|
help="Bots to spawn simultaneously (default: 1)")
|
|
parser.add_argument("--duration", type=float, default=0,
|
|
help="Run for N seconds then stop (default: unlimited)")
|
|
parser.add_argument("--quiet", action="store_true",
|
|
help="Suppress per-bot log messages")
|
|
|
|
args = parser.parse_args()
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s %(message)s",
|
|
datefmt="%H:%M:%S",
|
|
)
|
|
|
|
runner = StressTestRunner(
|
|
host=args.host,
|
|
port=args.port,
|
|
max_bots=args.bots,
|
|
cycles=args.cycles,
|
|
hold_min=args.hold[0],
|
|
hold_max=args.hold[1],
|
|
ramp=args.ramp,
|
|
send_moves=args.move,
|
|
burst=args.burst,
|
|
duration=args.duration,
|
|
quiet=args.quiet,
|
|
)
|
|
runner.run()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|