"""Flask front end for a Naemon instance.

Serves a status page built from Livestatus queries (``GET /``), proxies raw
Livestatus queries for authenticated clients (``POST /live``) and accepts
passive check results that are written to the Naemon command file
(``POST /``).
"""
import json
import os
import socket
import time

from flask import Flask, request, render_template_string, jsonify, make_response
from flask_login import LoginManager, UserMixin, login_required

login_manager = LoginManager()
app = Flask(__name__)
login_manager.init_app(app)

# Index == numeric service state reported by Livestatus.
STATUS = [
    "ok",
    "warning",
    "error",
    "unknown",
]

# Index == numeric host state reported by Livestatus.
HOST_STATUS = [
    "up",
    "down",
    "unreachable",
]

#### Push
# Whitespace-separated list of accepted tokens, read once at import time.
AUTHORIZED_KEYS = os.environ.get("TOKENS", "").split()
COMMAND_FILE = "/var/run/naemon/naemon.cmd"

ERROR_NO_REQUEST_HANDLER = "NO REQUEST HANDLER"
ERROR_NO_TOKEN_SUPPLIED = "NO TOKEN"
ERROR_BAD_TOKEN_SUPPLIED = "BAD TOKEN"
ERROR_BAD_COMMAND_FILE = "BAD COMMAND FILE"
ERROR_COMMAND_FILE_OPEN_WRITE = "COMMAND FILE UNWRITEABLE"
ERROR_COMMAND_FILE_OPEN = "CANNOT OPEN COMMAND FILE"
ERROR_BAD_WRITE = "WRITE ERROR"
ERROR_BAD_DATA = "BAD DATA"
ERROR_BAD_JSON = "BAD JSON"
ERROR_NO_CORRECT_STATUS = "NO STATUS WAS CORRECT"
#### /Push

# Characters that would break the ';'-separated, one-command-per-line format
# of the command file if they appeared inside a field.
_COMMAND_FIELD_FORBIDDEN = (";", "\n", "\r")


def get_lq(request):
    """Send a Livestatus query and return the complete textual answer.

    See https://mathias-kettner.de/checkmk_livestatus.html

    Args:
        request: The query text (str) to send over the Livestatus socket.

    Returns:
        Everything the socket sent back until it was closed, decoded to str.
    """
    socket_path = "/var/run/naemon/live"
    # The context manager closes the socket even when connect/send fails.
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
        s.connect(socket_path)
        # sendall() retries until the whole query is written; send() may
        # transmit only a prefix.
        s.sendall(request.encode())
        # Half-close so the peer sees the end of the query.
        s.shutdown(socket.SHUT_WR)
        chunks = []
        while True:
            chunk = s.recv(4096)
            if not chunk:
                break
            chunks.append(chunk)
    return b"".join(chunks).decode()


class Host:
    """A monitored host as shown on the status page."""

    def __init__(self, name, alias, status, webname, vhost):
        self.name = name
        self.alias = alias
        # Fall back to the alias when no display name is configured.
        self.webname = webname or alias
        self.vhost = vhost
        self.status = status
        self.services = []

    @classmethod
    def parse_hosts(cls, payload, vhost):
        """Parse a JSON host listing and keep only hosts of ``vhost``.

        Returns:
            A dict mapping host name to ``Host``.
        """
        candidates = (cls.parse(row) for row in json.loads(payload))
        return {host.name: host for host in candidates if host.vhost == vhost}

    @classmethod
    def parse(cls, payload):
        """Build a ``Host`` from one row: name, alias, state, custom variables."""
        custom = payload[3]
        return cls(
            payload[0],
            payload[1],
            HOST_STATUS[payload[2]],
            custom.get("WEBSTATUS_NAME"),
            custom.get("WEBSTATUS_VHOST"),
        )

    def __repr__(self):
        return "Host {}: {} ({})".format(self.name, self.alias, self.webname)

    @classmethod
    def query(cls, vhost):
        """Fetch the hosts of group ``webstatus-hosts`` that belong to ``vhost``."""
        # Livestatus expects one header per line, terminated by a newline.
        answer = get_lq("""GET hosts
Filter: groups >= webstatus-hosts
Columns: name alias state custom_variables
OutputFormat: json
""")
        return cls.parse_hosts(answer, vhost)

    def fill_services(self, services):
        """Attach the services from ``services`` that run on this host."""
        self.services = [service for service in services if service.host == self.name]


class ServiceGroup:
    """A service group whose name starts with ``webstatus-``."""

    def __init__(self, name, alias):
        self.name = name
        self.alias = alias
        self.services = []

    @classmethod
    def parse_groups(cls, payload):
        """Parse a JSON group listing into a dict mapping name to ``ServiceGroup``."""
        groups = (cls.parse(row) for row in json.loads(payload))
        return {group.name: group for group in groups}

    @classmethod
    def parse(cls, payload):
        """Build a ``ServiceGroup`` from one row: name, alias, ..."""
        return cls(payload[0], payload[1])

    @classmethod
    def query(cls):
        """Fetch every service group whose name matches ``^webstatus-``."""
        answer = get_lq("""GET servicegroups
Filter: name ~ ^webstatus-
Columns: name alias custom_variables
OutputFormat: json
""")
        return cls.parse_groups(answer)

    def fill_services(self, services, hosts):
        """Attach the services that are in this group and run on a known host.

        Args:
            services: Iterable of ``Service``.
            hosts: Container of host names (the dict returned by ``Host.query``).
        """
        self.services = [
            service
            for service in services
            if self.name in service.groups and service.host in hosts
        ]

    def __repr__(self):
        return "ServiceGroup {}: {}".format(self.name, self.alias)


class Service:
    """A monitored service as shown on the status page."""

    def __init__(self, name, host, groups, status, webname, url, description, infos):
        self.name = name
        self.host = host
        self.groups = groups
        self.status = status
        self.webname = webname
        self.url = url
        self.description = description
        self.infos = infos

    @classmethod
    def parse_services(cls, payload):
        """Parse a JSON service listing, keeping services in a webstatus group."""
        rows = json.loads(payload)
        return [cls.parse(row) for row in rows if cls.valid(row[2])]

    @staticmethod
    def valid(groups):
        """Return True when any group name starts with ``webstatus-``."""
        return any(group.startswith("webstatus-") for group in groups)

    @classmethod
    def parse(cls, payload):
        """Build a ``Service`` from one row.

        Row layout: display_name, host_name, groups, state, custom_variables,
        description, plugin_output.
        """
        custom = payload[4]
        return cls(
            payload[0],
            payload[1],
            payload[2],
            STATUS[payload[3]],
            custom.get("WEBSTATUS_NAME"),
            custom.get("WEBSTATUS_URL"),
            payload[5],
            payload[6],
        )

    @classmethod
    def query(cls):
        """Fetch all services and keep those in a ``webstatus-`` group."""
        answer = get_lq("""GET services
Columns: display_name host_name groups state custom_variables description plugin_output
OutputFormat: json
""")
        return cls.parse_services(answer)

    def __repr__(self):
        return "Service {}: {}".format(self.name, self.webname)


def get_infos(vhost):
    """Query hosts, service groups and services and link them together.

    Returns:
        A tuple ``(hosts, servicegroups, services)`` where the first two are
        dicts keyed by name and the last is a list of ``Service``.
    """
    hosts = Host.query(vhost)
    servicegroups = ServiceGroup.query()
    services = Service.query()
    for host in hosts.values():
        host.fill_services(services)
    for group in servicegroups.values():
        group.fill_services(services, hosts)
    return (hosts, servicegroups, services)


# NOTE(review): this template contains no HTML tags although get() serves it
# as text/html -- confirm the markup was not lost from this copy of the file.
TEMPLATE = ''' Status

Hosts

{%- for host in hosts.values() %}

{{ host.status }} {{ host.webname }}

{%- for service in servicegroups["webstatus-resources"].services if service.host == host.name -%} {%- if loop.first %} {% endif %} {% endfor %} {%- endfor %} {%- for group in servicegroups.values() if group.services and group.name != "webstatus-resources" %} {%- if loop.first %}

Services

{%- endif %}

{{ group.alias }}

{%- for service in group.services if service.host in hosts -%} {%- if loop.first %} {% endif %} {%- endfor -%}
{%- if loop.last %}
{% endif %} {%- endfor %} '''


@login_manager.request_loader
def load_user_from_request(request):
    """Authenticate a request by token.

    The token is accepted either in the ``Token`` header or as the ``token``
    key of a JSON object body.

    Returns:
        A ``UserMixin`` instance when the token is authorized, else ``None``.
    """
    api_key = request.headers.get('Token')
    if api_key in AUTHORIZED_KEYS:
        return UserMixin()
    content = request.get_json(force=True, silent=True)
    # A JSON array or scalar body has no .get(); treat it as unauthenticated
    # instead of failing with AttributeError.
    if isinstance(content, dict) and content.get("token") in AUTHORIZED_KEYS:
        return UserMixin()
    return None


@app.route("/live", methods=["POST"])
@login_required
def live():
    """Forward the request body to Livestatus and return the answer as text."""
    query = request.get_data()
    result = get_lq(query.decode() + "\n")
    resp = make_response(result)
    resp.content_type = "text/plain"
    return resp


@app.route("/", methods=["GET"])
def get():
    """Render the status page for the virtual host named in the request."""
    hosts, servicegroups, _ = get_infos(request.host)
    page = render_template_string(TEMPLATE, hosts=hosts, servicegroups=servicegroups)
    resp = make_response(page)
    resp.content_type = "text/html"
    return resp


@app.route("/", methods=["POST"])
@login_required
def push():
    """Accept a ``submitcheck`` command and write its check results.

    Expects a JSON object with ``cmd == "submitcheck"`` and a list under
    ``checkresult``. Invalid entries are counted as errors; a failure while
    writing aborts the request with that error message.
    """
    content = request.get_json(force=True, silent=True)
    if content is None:
        # Same JSON error envelope as every other failure path.
        return render_error(ERROR_BAD_JSON)
    if not isinstance(content, dict):
        # Valid JSON, but not an object: there is nothing to look keys up in.
        return render_error(ERROR_BAD_DATA)
    if content.get("cmd") != "submitcheck":
        return render_error(ERROR_NO_REQUEST_HANDLER)
    if not isinstance(content.get("checkresult"), list):
        return render_error(ERROR_BAD_DATA)

    checks = 0
    errors = 0
    for raw in content["checkresult"]:
        check = CheckResult.from_json(raw)
        if check is None:
            errors += 1
            continue
        try:
            write_check_output(check)
        except Exception as e:
            return render_error(str(e))
        checks += 1
    return render_response(checks, errors)


def write_check_output(check):
    """Write one passive check result to the command file.

    Args:
        check: A ``CheckResult``; ``check.type == "service"`` selects the
            service command, anything else the host command.

    Raises:
        Exception: With one of the ``ERROR_*`` messages when the command file
            is missing, not writable, or the write fails.
    """
    if check.type == "service":
        command = "[{time}] PROCESS_SERVICE_CHECK_RESULT;{hostname};{servicename};{state};{output}"
    else:
        command = "[{time}] PROCESS_HOST_CHECK_RESULT;{hostname};{state};{output}"

    # The command file holds one command per line: a raw line break in the
    # caller-supplied output would start a second, attacker-chosen command.
    # Encode line breaks as a literal backslash-n instead.
    output = check.output.replace("\r\n", "\n").replace("\r", "\n").replace("\n", "\\n")

    formatted = command.format(
        time=int(time.time()),
        hostname=check.hostname,
        state=check.state,
        output=output,
        servicename=check.servicename,
    )

    if not os.path.exists(COMMAND_FILE):
        raise Exception(ERROR_BAD_COMMAND_FILE)
    if not os.access(COMMAND_FILE, os.W_OK):
        raise Exception(ERROR_COMMAND_FILE_OPEN_WRITE)

    try:
        with open(COMMAND_FILE, "w") as c:
            c.write(formatted + "\n")
    except Exception as e:
        # Keep the generic message for callers, but chain the real cause.
        raise Exception(ERROR_BAD_WRITE) from e


def render_error(error):
    """Return a JSON error response carrying ``error`` as its message."""
    return jsonify({
        "status": "error",
        "message": error,
    })


def render_response(checks, errors):
    """Return the JSON summary of a push: ok if at least one check was written."""
    if checks > 0:
        return jsonify({
            "status": "ok",
            "result": {
                "checks": checks,
                "errors": errors,
            }
        })
    return jsonify({
        "status": "error",
        "message": ERROR_NO_CORRECT_STATUS,
    })


class CheckResult:
    """One passive check result submitted through the push endpoint."""

    def __init__(self, hostname, state, output, servicename, checktype):
        self.hostname = hostname
        self.state = state
        self.output = output
        self.servicename = servicename
        self.type = checktype

    @classmethod
    def from_json(cls, j):
        """Validate a decoded JSON entry and build a ``CheckResult``.

        ``hostname``, ``state`` and ``output`` are required strings;
        ``servicename`` (default ``""``) and ``type`` (default ``"host"``)
        are optional strings.

        Returns:
            A ``CheckResult``, or ``None`` when the entry is invalid.
        """
        if not isinstance(j, dict):
            return None
        for key in ("hostname", "state", "output"):
            if not isinstance(j.get(key), str):
                return None
        for key in ("servicename", "type"):
            if key in j and not isinstance(j[key], str):
                return None
        # These fields are written verbatim between ';' separators, so a
        # separator or line break inside them would corrupt the command or
        # inject another one.
        for key in ("hostname", "state", "servicename"):
            value = j.get(key, "")
            if any(ch in value for ch in _COMMAND_FIELD_FORBIDDEN):
                return None
        return cls(
            j["hostname"],
            j["state"],
            j["output"],
            j.get("servicename", ""),
            j.get("type", "host"),
        )