Initial commit

This commit is contained in:
2025-08-21 21:38:06 +02:00
commit c855ca2633
6 changed files with 883 additions and 0 deletions

270
plugins/minecraft.py Executable file
View File

@@ -0,0 +1,270 @@
#!/usr/bin/env python3
from typeguard import typechecked
from dataclasses import dataclass, field, asdict
from typing import List
from typing import Optional
import json, logging, re
from mcstatus import JavaServer
from mcrcon import MCRcon
# Put plugin in /lib/check_mk_agent/plugins
#
# pip3 install mcstatus mcrcon --break-system-packages
# apt install python3-typeguard
logging.basicConfig(level=logging.CRITICAL, format="%(levelname)s: %(message)s")
# MARK: Config
@dataclass
class ServerConfig:
name: str = "default"
host: str = "127.0.0.1"
port: int = 25565
rcon: bool = False
rcon_type: str = "neoforge"
rcon_port: int = 25575
rcon_password: str = ""
@typechecked
def load_configs(cfg_path: str = "minecraft_servers.json") -> list[ServerConfig]:
# TODO: Add forge tps parsing
valid_rcon_types = ["neoforge"] # , 'forge']
defaults = ServerConfig().__dict__.copy() # Get defaults from dataclass
try:
with open(cfg_path, "x", encoding="utf-8") as file:
json.dump([defaults], file, indent=2)
except FileExistsError:
logging.debug("cfg exists")
try:
cfg_raw = json.load(open(cfg_path, encoding="utf-8"))
except (json.JSONDecodeError, FileNotFoundError):
logging.error("Invalid or missing config, using defaults")
cfg_raw = [defaults]
# check if some config is there / else -> default
if not isinstance(cfg_raw, list) or not cfg_raw:
logging.error("Invalid or missing config, using defaults")
cfg_raw = [defaults]
result_run = []
result_write = []
# build config
for entry in cfg_raw:
cfg_run = {**defaults, **entry}
cfg_write = {**defaults, **entry}
# check if rcon is active and rcon_type is not valid
if cfg_run["rcon"] and cfg_run["rcon_type"] not in valid_rcon_types:
logging.debug(
f"Server: {cfg_run['name']} -> RCON disabled! Type '{cfg_run['rcon_type']}' unknown!"
)
cfg_run["rcon"] = False # Disable rcon if rcon_type is invalid
if cfg_run["rcon"]:
logging.debug(
f"Server: {cfg_run['name']} -> RCON enabled! Type '{cfg_run['rcon_type']}'"
)
result_run.append(
ServerConfig(**cfg_run)
) # runtime config as ServerConfig Object
result_write.append(cfg_write) # Cant serialize ServerConfig obj to json
logging.debug("run: " + str(cfg_run))
logging.debug("write: " + str(cfg_write))
# Write cfg back to disk (with defaults if something is missing)
with open(cfg_path, "w", encoding="utf-8") as file:
json.dump(result_write, file, indent=2)
return result_run
# MARK: ServerStatus
@dataclass
class ServerStatus:
player_online: int | None = None
player_max: int | None = None
player_list: str | None = None
motd: str | None = None
ping: float | None = None
server_version: str | None = None
protocol_version: int | None = None
error: str | None = None
@typechecked
def get_status(cfg: ServerConfig) -> ServerStatus:
try:
server = JavaServer(cfg.host, cfg.port).status()
return ServerStatus(
# Player
player_online=server.players.online,
player_max=server.players.max,
player_list=", ".join(p.name for p in (server.players.sample or [])) or "",
# MOTD / Ping / Server-Version / Protocol-Version
motd=server.motd.to_plain().replace("\n", " ").replace("\r", ""),
ping=server.latency,
server_version=getattr(server.version, "name", None),
protocol_version=getattr(server.version, "protocol", None),
)
except Exception as e:
return ServerStatus(
error="Error: " + str(e),
)
# MARK: RCON
@dataclass
class WorldStats:
world: str
tps: float
tick: float
@dataclass
class RconMetrics:
worlds: List[WorldStats] = field(default_factory=list)
error: str | None = None
@typechecked
def add(self, world: str, tps: float, tick: float) -> None:
self.worlds.append(WorldStats(world, tps, tick))
@typechecked
def get_rcon(cfg: ServerConfig) -> RconMetrics:
try:
match cfg.rcon_type:
case "neoforge":
rcon_cmd = "neoforge tps"
rcon_parser = parse_neoforge_tps
# TODO: Add forge tps parsing
# case 'forge':
# rcon_cmd = 'forge tps'
# rcon_parser = parse_forge_tps
case (
_
): # Unknown rcon_type -> This should never happen -> config should disable rcon if not valid
return RconMetrics(
error="Error (RCON): Unknown RCON type {cfg.rcon_type}",
)
# get rcon command response and close connection
with MCRcon(cfg.host, cfg.rcon_password, port=cfg.rcon_port) as remote:
resp = remote.command(rcon_cmd)
# Check if command is valid on server
if resp.lower().startswith("unknown or incomplete command"):
logging.error(f"Error (RCON): Unknown command: {rcon_cmd}")
return RconMetrics(
error=f"Error (RCON): Unknown command: {rcon_cmd}",
)
return rcon_parser(resp) # return parsed data
except Exception as e:
logging.error(f"Error (RCON): {str(e)}")
return RconMetrics(
error=f"Error (RCON): {str(e)}",
)
# MARK: RCON response parsing
def parse_neoforge_tps(data: str) -> RconMetrics:
result = RconMetrics()
try:
regex = re.compile(
r"^(?P<world>.+?):\s*"
r"(?P<tps>[\d,]+)\s*TPS\s*"
r"\(\s*(?P<tick>[\d,]+)\s*ms/tick\)\s*$"
)
for raw_line in data.splitlines():
line = raw_line.strip()
if not line:
continue
m = regex.match(line)
if not m:
# TODO: Could this make problems? Do we have not parsable lines?
logging.warning("Could not parse TPS line: %r", line)
return RconMetrics(
error=f"Error (RCON-Parse): Could not parse TPS line: {line}",
)
world = m.group("world")
try:
tps = float(m.group("tps").replace(",", "."))
tick = float(m.group("tick").replace(",", "."))
except ValueError as e:
logging.error("Invalid numeric value in line %r: %s", line, e)
return RconMetrics(
error=f"Error (RCON-Parse): Invalid numeric value in line {line}: {str(e)}",
)
if not (0.0 <= tps <= 20.0):
logging.error(
"TPS out of expected range [0,20]: %.3f for %r", tps, world
)
return RconMetrics(
error=f"Error (RCON-Parse): TPS out of expected range [0,20]: {tps:.3f} in {world}"
)
result.add(world, tps, tick)
return result
except Exception as e:
logging.error(f"Error (RCON): {str(e)}")
return RconMetrics(
error=f"Error (RCON): {str(e)}",
)
# TODO: Add forge tps parsing
# def parse_forge_tps(data: str) -> RconMetrics:
# return RconMetrics()
def print_data(
cfg: ServerConfig, status: ServerStatus, rcon: Optional[RconMetrics]
) -> None:
payload = {
"server_name": cfg.name,
"status": asdict(status),
"rcon": asdict(rcon) if rcon is not None else None,
}
print(json.dumps(payload, ensure_ascii=False, indent=2))
if __name__ == "__main__":
all_results = []
for cfg in load_configs():
logging.debug("Data: " + str(cfg))
data = get_status(cfg)
rcon = get_rcon(cfg) if cfg.rcon else None
all_results.append({
"server_name": cfg.name,
"status": asdict(data),
"rcon": asdict(rcon) if rcon is not None else None
})
logging.info(f"Server: {cfg.name}")
logging.info("Data: " + str(data) + "\n")
logging.info("Rcon: " + str(rcon))
print('<<<minecraft:sep(0)>>>')
print(json.dumps(all_results, ensure_ascii=False, indent=None))