415 lines
13 KiB
Python
415 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
# Telegram Notification for Checkmk
|
|
# Derived from Berghmans/cmk_discord (GPL-2.0-only)
|
|
# Version: 0.1.0
|
|
|
|
import datetime
|
|
import html
|
|
import os
|
|
import sys
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
from http import HTTPStatus
|
|
from typing import Optional
|
|
|
|
import requests
|
|
|
|
|
|
@dataclass
|
|
class Context:
|
|
"""Checkmk notification context."""
|
|
|
|
# Common fields
|
|
what: str # SERVICE or HOST
|
|
notification_type: str
|
|
short_datetime: str
|
|
omd_site: str
|
|
hostname: str
|
|
|
|
# Parameters from the notification rule
|
|
telegram_token: Optional[str] = None # PARAMETER_1
|
|
chat_id: Optional[str] = None # PARAMETER_2
|
|
site_url: Optional[str] = None # PARAMETER_3
|
|
|
|
# Optional Telegram parameters
|
|
message_thread_id: Optional[str] = None # PARAMETER_4, for forum topics
|
|
disable_notification: Optional[str] = None # PARAMETER_5: true/false
|
|
|
|
# Service-specific fields
|
|
service_desc: Optional[str] = None
|
|
service_state: Optional[str] = None
|
|
previous_service_state: Optional[str] = None
|
|
service_output: Optional[str] = None
|
|
service_check_command: Optional[str] = None
|
|
service_url: Optional[str] = None
|
|
|
|
# Host-specific fields
|
|
host_state: Optional[str] = None
|
|
previous_host_state: Optional[str] = None
|
|
host_output: Optional[str] = None
|
|
host_check_command: Optional[str] = None
|
|
host_url: Optional[str] = None
|
|
|
|
# Optional comment
|
|
notification_comment: Optional[str] = None
|
|
|
|
@classmethod
|
|
def from_dict(cls, data: dict) -> "Context":
|
|
"""Create Context from environment variable dictionary."""
|
|
return cls(
|
|
what=data.get("WHAT", ""),
|
|
notification_type=data.get("NOTIFICATIONTYPE", ""),
|
|
short_datetime=data.get("SHORTDATETIME", ""),
|
|
omd_site=data.get("OMD_SITE", ""),
|
|
hostname=data.get("HOSTNAME", ""),
|
|
telegram_token=data.get("PARAMETER_1"),
|
|
chat_id=data.get("PARAMETER_2"),
|
|
site_url=data.get("PARAMETER_3"),
|
|
message_thread_id=data.get("PARAMETER_4"),
|
|
disable_notification=data.get("PARAMETER_5"),
|
|
service_desc=data.get("SERVICEDESC"),
|
|
service_state=data.get("SERVICESTATE"),
|
|
previous_service_state=data.get("LASTSERVICESTATE")
|
|
or data.get("PREVIOUSSERVICEHARDSTATE"),
|
|
service_output=data.get("SERVICEOUTPUT"),
|
|
service_check_command=data.get("SERVICECHECKCOMMAND"),
|
|
service_url=data.get("SERVICEURL"),
|
|
host_state=data.get("HOSTSTATE"),
|
|
previous_host_state=data.get("LASTHOSTSTATE")
|
|
or data.get("PREVIOUSHOSTHARDSTATE"),
|
|
host_output=data.get("HOSTOUTPUT"),
|
|
host_check_command=data.get("HOSTCHECKCOMMAND"),
|
|
host_url=data.get("HOSTURL"),
|
|
notification_comment=data.get("NOTIFICATIONCOMMENT"),
|
|
)
|
|
|
|
@classmethod
|
|
def from_env(cls) -> "Context":
|
|
"""Create Context from environment variables (NOTIFY_* variables)."""
|
|
env_dict = {
|
|
var[7:]: value
|
|
for (var, value) in os.environ.items()
|
|
if var.startswith("NOTIFY_")
|
|
}
|
|
return cls.from_dict(env_dict)
|
|
|
|
def validate(self) -> None:
|
|
"""Validate the context and raise SystemExit if invalid."""
|
|
if not self.telegram_token:
|
|
sys.stderr.write("Empty Telegram bot token given as parameter 1")
|
|
sys.exit(2)
|
|
|
|
if self.telegram_token.startswith("http") and not self.telegram_token.startswith(
|
|
"https://api.telegram.org/bot"
|
|
):
|
|
sys.stderr.write(
|
|
"Invalid Telegram API URL given as first parameter "
|
|
"(expected bot token or https://api.telegram.org/bot.../sendMessage)"
|
|
)
|
|
sys.exit(2)
|
|
|
|
if not self.chat_id:
|
|
sys.stderr.write("Empty Telegram chat id given as parameter 2")
|
|
sys.exit(2)
|
|
|
|
if self.site_url and not self.site_url.startswith("http"):
|
|
sys.stderr.write(
|
|
"Invalid site url given as third parameter (not starting with http): %s"
|
|
% self.site_url
|
|
)
|
|
sys.exit(2)
|
|
|
|
|
|
class AlertColor(str, Enum):
|
|
"""Mapping of Checkmk alert states to Telegram-visible state markers."""
|
|
|
|
CRITICAL = "🔴"
|
|
DOWN = "🔴"
|
|
WARNING = "🟡"
|
|
OK = "🟢"
|
|
UP = "🟢"
|
|
UNKNOWN = "🟠"
|
|
UNREACHABLE = "⚫"
|
|
|
|
|
|
class NotificationEmoji(str, Enum):
|
|
"""Mapping of Checkmk notification types to Unicode emojis."""
|
|
|
|
PROBLEM = "🚨"
|
|
RECOVERY = "✅"
|
|
ACKNOWLEDGEMENT = "☑️"
|
|
FLAPPINGSTART = "⁉️"
|
|
FLAPPINGSTOP = "✅"
|
|
DOWNTIMESTART = "⏰"
|
|
DOWNTIMEEND = "✅"
|
|
DOWNTIMECANCELLED = "☑️"
|
|
|
|
|
|
@dataclass
|
|
class TelegramMessage:
|
|
"""Base class for Telegram messages that mimic the Discord embed layout."""
|
|
|
|
ctx: Context
|
|
timestamp: str
|
|
previous_state: Optional[str]
|
|
current_state: Optional[str]
|
|
output: Optional[str]
|
|
title_subject: str
|
|
footer_text: Optional[str]
|
|
url_path: Optional[str]
|
|
fields: Optional[list] = None
|
|
|
|
MAX_TEXT_LENGTH = 4096
|
|
TRUNCATION_SUFFIX = "\n\n… truncated by cmk_telegram because Telegram messages are limited."
|
|
|
|
@staticmethod
|
|
def escape(value: Optional[str]) -> str:
|
|
"""HTML-escape a value for Telegram parse_mode=HTML."""
|
|
return html.escape(str(value or ""), quote=False)
|
|
|
|
@staticmethod
|
|
def get_alert_marker(state: Optional[str]) -> str:
|
|
"""Get a Telegram-visible marker for a Checkmk alert state."""
|
|
if state and state in AlertColor.__members__:
|
|
return AlertColor[state].value
|
|
return "⚪"
|
|
|
|
@staticmethod
|
|
def get_emoji(notification_type: str) -> str:
|
|
"""Get the emoji for the notification type."""
|
|
# IMPORTANT: Use __members__ instead of iterating the enum directly.
|
|
# Duplicate enum values (RECOVERY, FLAPPINGSTOP, DOWNTIMEEND) are aliases
|
|
# and would otherwise be skipped during iteration.
|
|
if notification_type in NotificationEmoji.__members__:
|
|
return NotificationEmoji[notification_type].value
|
|
|
|
for member_name in NotificationEmoji.__members__:
|
|
if notification_type.startswith(member_name):
|
|
return NotificationEmoji[member_name].value
|
|
|
|
return ""
|
|
|
|
@classmethod
|
|
def from_context(cls, ctx: Context) -> "TelegramMessage":
|
|
"""Factory method to create the appropriate message type based on context."""
|
|
timestamp = str(datetime.datetime.fromisoformat(ctx.short_datetime).astimezone())
|
|
message_class = ServiceMessage if ctx.what == "SERVICE" else HostMessage
|
|
return message_class(ctx, timestamp)
|
|
|
|
def _build_title(self) -> str:
|
|
marker = self.get_alert_marker(self.current_state)
|
|
emoji = self.get_emoji(self.ctx.notification_type)
|
|
return "%s %s <b>%s: %s</b>" % (
|
|
marker,
|
|
emoji,
|
|
self.escape(self.ctx.notification_type),
|
|
self.escape(self.title_subject),
|
|
)
|
|
|
|
def _build_state_transition(self) -> str:
|
|
return "<b>%s -> %s</b>" % (
|
|
self.escape(self.previous_state),
|
|
self.escape(self.current_state),
|
|
)
|
|
|
|
def _build_description(self) -> str:
|
|
parts = [self._build_state_transition()]
|
|
|
|
if self.output:
|
|
parts.extend(["", self.escape(self.output)])
|
|
|
|
if self.ctx.notification_comment:
|
|
parts.extend(["", "<b>Comment</b>", self.escape(self.ctx.notification_comment)])
|
|
|
|
return "\n".join(parts)
|
|
|
|
def _build_fields(self) -> str:
|
|
if not self.fields:
|
|
return ""
|
|
|
|
lines = []
|
|
for field in self.fields:
|
|
name = self.escape(field.get("name"))
|
|
value = self.escape(field.get("value"))
|
|
lines.append("<b>%s</b>: %s" % (name, value))
|
|
|
|
return "\n".join(lines)
|
|
|
|
def _build_footer(self) -> str:
|
|
footer_parts = []
|
|
|
|
if self.footer_text:
|
|
footer_parts.append(self.escape(self.footer_text))
|
|
|
|
if self.ctx.omd_site:
|
|
footer_parts.append("Checkmk - %s" % self.escape(self.ctx.omd_site))
|
|
|
|
if self.timestamp:
|
|
footer_parts.append(self.escape(self.timestamp))
|
|
|
|
if not footer_parts:
|
|
return ""
|
|
|
|
return "<i>%s</i>" % self.escape(" • ").join(footer_parts)
|
|
|
|
def get_checkmk_url(self) -> Optional[str]:
|
|
if not self.ctx.site_url or not self.url_path:
|
|
return None
|
|
|
|
return "".join([self.ctx.site_url.rstrip("/"), self.url_path])
|
|
|
|
def _truncate(self, text: str) -> str:
|
|
if len(text) <= self.MAX_TEXT_LENGTH:
|
|
return text
|
|
|
|
keep = self.MAX_TEXT_LENGTH - len(self.TRUNCATION_SUFFIX)
|
|
return text[:keep] + self.TRUNCATION_SUFFIX
|
|
|
|
def to_text(self) -> str:
|
|
sections = [self._build_title(), "", self._build_description()]
|
|
|
|
fields = self._build_fields()
|
|
if fields:
|
|
sections.extend(["", fields])
|
|
|
|
footer = self._build_footer()
|
|
if footer:
|
|
sections.extend(["", footer])
|
|
|
|
return self._truncate("\n".join(sections))
|
|
|
|
def to_reply_markup(self) -> Optional[dict]:
|
|
checkmk_url = self.get_checkmk_url()
|
|
if not checkmk_url:
|
|
return None
|
|
|
|
return {
|
|
"inline_keyboard": [
|
|
[
|
|
{
|
|
"text": "Open in Checkmk",
|
|
"url": checkmk_url,
|
|
}
|
|
]
|
|
]
|
|
}
|
|
|
|
|
|
class ServiceMessage(TelegramMessage):
|
|
"""Telegram message for service notifications."""
|
|
|
|
def __init__(self, ctx: Context, timestamp: str):
|
|
super().__init__(
|
|
ctx=ctx,
|
|
timestamp=timestamp,
|
|
previous_state=ctx.previous_service_state,
|
|
current_state=ctx.service_state,
|
|
output=ctx.service_output,
|
|
title_subject=ctx.service_desc or "",
|
|
footer_text=ctx.service_check_command,
|
|
url_path=ctx.service_url,
|
|
fields=[
|
|
{"name": "Host", "value": ctx.hostname},
|
|
{"name": "Service", "value": ctx.service_desc},
|
|
],
|
|
)
|
|
|
|
|
|
class HostMessage(TelegramMessage):
|
|
"""Telegram message for host notifications."""
|
|
|
|
def __init__(self, ctx: Context, timestamp: str):
|
|
super().__init__(
|
|
ctx=ctx,
|
|
timestamp=timestamp,
|
|
previous_state=ctx.previous_host_state,
|
|
current_state=ctx.host_state,
|
|
output=ctx.host_output,
|
|
title_subject="Host: %s" % ctx.hostname,
|
|
footer_text=ctx.host_check_command,
|
|
url_path=ctx.host_url,
|
|
)
|
|
|
|
|
|
class TelegramBotApi:
|
|
"""Telegram Bot API sender for Checkmk notifications."""
|
|
|
|
def __init__(self, token_or_url: str, chat_id: str, message: TelegramMessage):
|
|
self.token_or_url = token_or_url.strip()
|
|
self.chat_id = chat_id.strip()
|
|
self.message = message
|
|
|
|
def _api_url(self) -> str:
|
|
if self.token_or_url.startswith("https://api.telegram.org/bot"):
|
|
return self.token_or_url
|
|
|
|
token = self.token_or_url
|
|
if token.startswith("bot"):
|
|
token = token[3:]
|
|
|
|
return "https://api.telegram.org/bot%s/sendMessage" % token
|
|
|
|
@staticmethod
|
|
def _as_bool(value: Optional[str]) -> bool:
|
|
return str(value or "").strip().lower() in ("1", "true", "yes", "y", "on")
|
|
|
|
def _build_payload(self) -> dict:
|
|
payload = {
|
|
"chat_id": self.chat_id,
|
|
"text": self.message.to_text(),
|
|
"parse_mode": "HTML",
|
|
"link_preview_options": {"is_disabled": True},
|
|
"disable_notification": self._as_bool(self.message.ctx.disable_notification),
|
|
}
|
|
|
|
reply_markup = self.message.to_reply_markup()
|
|
if reply_markup:
|
|
payload["reply_markup"] = reply_markup
|
|
|
|
if self.message.ctx.message_thread_id:
|
|
payload["message_thread_id"] = self.message.ctx.message_thread_id
|
|
|
|
return payload
|
|
|
|
def send(self) -> None:
|
|
response = requests.post(url=self._api_url(), json=self._build_payload(), timeout=10)
|
|
|
|
if response.status_code != HTTPStatus.OK.value:
|
|
sys.stderr.write(
|
|
"Unexpected response when calling Telegram Bot API URL %s: %i. "
|
|
"Response body: %s" % (self._api_url(), response.status_code, response.text)
|
|
)
|
|
sys.exit(1)
|
|
|
|
try:
|
|
body = response.json()
|
|
except ValueError:
|
|
sys.stderr.write(
|
|
"Unexpected non-JSON response when calling Telegram Bot API URL %s: %s"
|
|
% (self._api_url(), response.text)
|
|
)
|
|
sys.exit(1)
|
|
|
|
if body.get("ok") is not True:
|
|
sys.stderr.write(
|
|
"Unexpected response when calling Telegram Bot API URL %s: %i. "
|
|
"Response body: %s" % (self._api_url(), response.status_code, response.text)
|
|
)
|
|
sys.exit(1)
|
|
|
|
|
|
def main():
|
|
ctx = Context.from_env()
|
|
ctx.validate()
|
|
message = TelegramMessage.from_context(ctx)
|
|
telegram = TelegramBotApi(ctx.telegram_token, ctx.chat_id, message)
|
|
telegram.send()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
main()
|
|
except Exception as e:
|
|
sys.stderr.write("Unhandled exception: %s\n" % e)
|
|
sys.exit(2)
|