from flask import Flask, request, render_template_string, jsonify, make_response from flask_login import LoginManager, UserMixin, login_required import socket import json import time import os login_manager = LoginManager() app = Flask(__name__) login_manager.init_app(app) STATUS = [ "ok", "warning", "error", "unknown" ] HOST_STATUS = [ "up", "down", "unreachable", ] #### Push AUTHORIZED_KEYS = os.environ.get("TOKENS", "").split() COMMAND_FILE = "/var/run/naemon/naemon.cmd" ERROR_NO_REQUEST_HANDLER="NO REQUEST HANDLER" ERROR_NO_TOKEN_SUPPLIED="NO TOKEN" ERROR_BAD_TOKEN_SUPPLIED="BAD TOKEN" ERROR_BAD_COMMAND_FILE="BAD COMMAND FILE" ERROR_COMMAND_FILE_OPEN_WRITE="COMMAND FILE UNWRITEABLE" ERROR_COMMAND_FILE_OPEN="CANNOT OPEN COMMAND FILE" ERROR_BAD_WRITE="WRITE ERROR" ERROR_BAD_DATA="BAD DATA" ERROR_BAD_JSON="BAD JSON" ERROR_NO_CORRECT_STATUS="NO STATUS WAS CORRECT" #### /Push def get_lq(request): # https://mathias-kettner.de/checkmk_livestatus.html socket_path="/var/run/naemon/live" s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) s.connect(socket_path) s.send(request.encode()) s.shutdown(socket.SHUT_WR) chunks = [] while len(chunks) == 0 or len(chunks[-1]) > 0: chunks.append(s.recv(4096)) s.close() return b"".join(chunks).decode() class Host: def __init__(self, name, alias, status, webname): self.name = name self.alias = alias self.webname = webname or alias self.status = status self.services = [] @classmethod def parse_hosts(cls, payload): parsed = [cls.parse(p) for p in json.loads(payload)] return {p.name: p for p in parsed} @classmethod def parse(cls, payload): return cls(payload[0], payload[1], HOST_STATUS[payload[2]], payload[3].get("WEBSTATUS_NAME")) def __repr__(self): return "Host {}: {} ({})".format(self.name, self.alias, self.webname) @classmethod def query(cls): answer = get_lq("""GET hosts Filter: groups >= webstatus-hosts Columns: name alias state custom_variables OutputFormat: json """) return cls.parse_hosts(answer) def fill_services(self, services): self.services = [service for service in services if service.host == self.name] class ServiceGroup: def __init__(self, name, alias): self.name = name self.alias = alias self.services = [] @classmethod def parse_groups(cls, payload): parsed = [cls.parse(p) for p in json.loads(payload)] return {p.name: p for p in parsed} @classmethod def parse(cls, payload): return cls(payload[0], payload[1]) @classmethod def query(cls): answer = get_lq("""GET servicegroups Filter: name ~ ^webstatus- Columns: name alias custom_variables OutputFormat: json """) return cls.parse_groups(answer) def fill_services(self, services): self.services = [service for service in services if any([group == self.name for group in service.groups])] def __repr__(self): return "ServiceGroup {}: {}".format(self.name, self.alias) class Service: def __init__(self, name, host, groups, status, webname, url, description, infos): self.name = name self.host = host self.groups = groups self.status = status self.webname = webname self.url = url self.description = description self.infos = infos @classmethod def parse_services(cls, payload): parsed = json.loads(payload) return [cls.parse(p) for p in parsed if cls.valid(p[2])] @staticmethod def valid(groups): return any([b.startswith("webstatus-") for b in groups]) @classmethod def parse(cls, payload): return cls(payload[0], payload[1], payload[2], STATUS[payload[3]], payload[4].get("WEBSTATUS_NAME"), payload[4].get("WEBSTATUS_URL"), payload[5], payload[6]) @classmethod def query(cls): answer = get_lq("""GET services Columns: display_name host_name groups state custom_variables description plugin_output OutputFormat: json """) return cls.parse_services(answer) def __repr__(self): return "Service {}: {}".format(self.name, self.webname) def get_infos(): hosts = Host.query() servicegroups = ServiceGroup.query() services = Service.query() for host in hosts: hosts[host].fill_services(services) for group in servicegroups: servicegroups[group].fill_services(services) return (hosts, servicegroups, services) TEMPLATE='''