from flask import Flask, request, render_template_string, jsonify, make_response
from flask_login import LoginManager, UserMixin, login_required
import socket
import json
import time
import os
login_manager = LoginManager()
app = Flask(__name__)
login_manager.init_app(app)
STATUS = [
"ok",
"warning",
"error",
"unknown"
]
HOST_STATUS = [
"up",
"down",
"unreachable",
]
#### Push
AUTHORIZED_KEYS = os.environ.get("TOKENS", "").split()
COMMAND_FILE = "/var/run/naemon/naemon.cmd"
ERROR_NO_REQUEST_HANDLER="NO REQUEST HANDLER"
ERROR_NO_TOKEN_SUPPLIED="NO TOKEN"
ERROR_BAD_TOKEN_SUPPLIED="BAD TOKEN"
ERROR_BAD_COMMAND_FILE="BAD COMMAND FILE"
ERROR_COMMAND_FILE_OPEN_WRITE="COMMAND FILE UNWRITEABLE"
ERROR_COMMAND_FILE_OPEN="CANNOT OPEN COMMAND FILE"
ERROR_BAD_WRITE="WRITE ERROR"
ERROR_BAD_DATA="BAD DATA"
ERROR_BAD_JSON="BAD JSON"
ERROR_NO_CORRECT_STATUS="NO STATUS WAS CORRECT"
#### /Push
def get_lq(request):
# https://mathias-kettner.de/checkmk_livestatus.html
socket_path="/var/run/naemon/live"
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.connect(socket_path)
s.send(request.encode())
s.shutdown(socket.SHUT_WR)
chunks = []
while len(chunks) == 0 or len(chunks[-1]) > 0:
chunks.append(s.recv(4096))
s.close()
return b"".join(chunks).decode()
class Host:
def __init__(self, name, alias, status, webname, vhost):
self.name = name
self.alias = alias
self.webname = webname or alias
self.vhost = vhost
self.status = status
self.services = []
@classmethod
def parse_hosts(cls, payload, vhost):
parsed = filter(lambda x: x.vhost == vhost, [cls.parse(p) for p in json.loads(payload)])
return {p.name: p for p in parsed}
@classmethod
def parse(cls, payload):
return cls(payload[0], payload[1], HOST_STATUS[payload[2]], payload[3].get("WEBSTATUS_NAME"), payload[3].get("WEBSTATUS_VHOST"))
def __repr__(self):
return "Host {}: {} ({})".format(self.name, self.alias, self.webname)
@classmethod
def query(cls, vhost):
answer = get_lq("""GET hosts
Filter: groups >= webstatus-hosts
Columns: name alias state custom_variables
OutputFormat: json
""")
return cls.parse_hosts(answer, vhost)
def fill_services(self, services):
self.services = [service for service in services if service.host == self.name]
class ServiceGroup:
def __init__(self, name, alias):
self.name = name
self.alias = alias
self.services = []
@classmethod
def parse_groups(cls, payload):
parsed = [cls.parse(p) for p in json.loads(payload)]
return {p.name: p for p in parsed}
@classmethod
def parse(cls, payload):
return cls(payload[0], payload[1])
@classmethod
def query(cls):
answer = get_lq("""GET servicegroups
Filter: name ~ ^webstatus-
Columns: name alias custom_variables
OutputFormat: json
""")
return cls.parse_groups(answer)
def fill_services(self, services, hosts):
self.services = [service for service in services if any([group == self.name for group in service.groups]) and service.host in hosts]
def __repr__(self):
return "ServiceGroup {}: {}".format(self.name, self.alias)
class Service:
def __init__(self, name, host, groups, status, webname, url, description, infos):
self.name = name
self.host = host
self.groups = groups
self.status = status
self.webname = webname
self.url = url
self.description = description
self.infos = infos
@classmethod
def parse_services(cls, payload):
parsed = json.loads(payload)
return [cls.parse(p) for p in parsed if cls.valid(p[2])]
@staticmethod
def valid(groups):
return any([b.startswith("webstatus-") for b in groups])
@classmethod
def parse(cls, payload):
return cls(payload[0],
payload[1],
payload[2],
STATUS[payload[3]],
payload[4].get("WEBSTATUS_NAME"),
payload[4].get("WEBSTATUS_URL"),
payload[5],
payload[6])
@classmethod
def query(cls):
answer = get_lq("""GET services
Columns: display_name host_name groups state custom_variables description plugin_output
OutputFormat: json
""")
return cls.parse_services(answer)
def __repr__(self):
return "Service {}: {}".format(self.name, self.webname)
def get_infos(vhost):
hosts = Host.query(vhost)
servicegroups = ServiceGroup.query()
services = Service.query()
for host in hosts:
hosts[host].fill_services(services)
for group in servicegroups:
servicegroups[group].fill_services(services, hosts)
return (hosts, servicegroups, services)
TEMPLATE='''<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
<html lang="en" xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Status</title>
<meta name="referrer" content="no-referrer" />
<style type="text/css">
ul {
list-style: none;
margin: 0px;
}
ul li:nth-child(2n) {
background-color: rgb(240, 240, 240);
}
li.resource, li.service {
margin: 1px 0px;
}
span.status {
display: inline-block;
width: 150px;
text-align: center;
margin-right: 5px;
font-variant: small-caps;
font-size: 1.2em;
}
.status_ok,.status_up {
background-color: rgba(0, 255, 0, 0.5);;
}
.status_warning {
background-color: rgba(255, 255, 0, 0.5);;
}
.status_error,.status_down {
background-color: rgba(255, 0, 0, 0.5);;
}
.status_unknown,.status_unreachable {
background-color: rgba(0, 0, 255, 0.5);;
}
.infos {
margin-left: 40px;
color: rgb(100, 100, 100);
}
div#services {
column-count: auto;
column-width: 36em;
}
div.servicegroup {
-webkit-column-break-inside: avoid;
break-inside: avoid;
}
h3.servicegroup_title, h3.host_title {
margin: 1px 0px;
}
span.service_host, span.infos {
float: right;
display: inline-block;
color: rgb(100, 100, 100);
}
</style>
</head>
<body>
<h2>Hosts</h2>
{%- for host in hosts.values() %}
<h3 class="host_title">
<span class="status status_{{ host.status }}">{{ host.status }}</span>
<span class="host">{{ host.webname }}</span>
</h3>
{%- for service in servicegroups["webstatus-resources"].services if service.host == host.name -%}
{%- if loop.first %}
<ul class="resources">
{% endif %}
<li class="resource">
<span class="status status_{{ service.status }}">{{ service.status }}</span>
<span class="description">{{ service.description }}</span>
<span class="infos">{{ service.infos }}</span>
</li>
{%- if loop.last %}
</ul>
{% endif %}
{% endfor %}
{%- endfor %}
{%- for group in servicegroups.values() if group.services and group.name != "webstatus-resources" %}
{%- if loop.first %}
<h2>Services</h2>
<div id="services">
{%- endif %}
<div class="servicegroup">
<h3 class="servicegroup_title">{{ group.alias }}</h3>
{%- for service in group.services if service.host in hosts -%}
{%- if loop.first %}
<ul class="services">
{% endif %}
<li class="service" title="{{ service.infos }}">
<span class="status status_{{ service.status }}">{{ service.status }}</span>
<span class="description">
{% if service.url and service.url.startswith("https://") %}
<a href="{{ service.url }}">{{ service.webname or service.description }}</a>
{% else %}
{{ service.webname or service.description }}
{% endif %}
</span>
<span class="service_host">{{ hosts[service.host].webname }}</span>
</li>
{%- if loop.last %}
</ul>
{% endif %}
{%- endfor -%}
</div>
{%- if loop.last %}
</div>
{% endif %}
{%- endfor %}
</body>
</html>
'''
@login_manager.request_loader
def load_user_from_request(request):
api_key = request.headers.get('Token')
if api_key in AUTHORIZED_KEYS:
return UserMixin()
content = request.get_json(force=True, silent=True)
if content is not None and content.get("token") in AUTHORIZED_KEYS:
return UserMixin()
@app.route("/live", methods=["POST"])
@login_required
def live():
query = request.get_data()
result = get_lq(query.decode() + "\n")
resp = make_response(result)
resp.content_type = "text/plain"
return resp
@app.route("/", methods=["GET"])
def get():
(hosts, servicegroups, services) = get_infos(request.host)
resp = make_response(render_template_string(TEMPLATE, hosts=hosts, servicegroups=servicegroups))
resp.content_type = "text/html"
return resp
@app.route("/", methods=["POST"])
@login_required
def push():
content = request.get_json(force=True, silent=True)
if content is None:
return ERROR_BAD_JSON
if content.get("cmd") != "submitcheck":
return render_error(ERROR_NO_REQUEST_HANDLER)
if "checkresult" not in content or not isinstance(content["checkresult"], list):
return render_error(ERROR_BAD_DATA)
checks = 0
errors = 0
for check in map(lambda x: CheckResult.from_json(x), content["checkresult"]):
if check is None:
errors += 1
continue
try:
write_check_output(check)
except Exception as e:
return render_error(str(e))
checks += 1
return render_response(checks, errors)
def write_check_output(check):
if check.type== "service":
command = "[{time}] PROCESS_SERVICE_CHECK_RESULT;{hostname};{servicename};{state};{output}";
else:
command = "[{time}] PROCESS_HOST_CHECK_RESULT;{hostname};{state};{output}";
formatted = command.format(
time=int(time.time()),
hostname=check.hostname,
state=check.state,
output=check.output,
servicename=check.servicename,
)
if not os.path.exists(COMMAND_FILE):
raise Exception(ERROR_BAD_COMMAND_FILE)
if not os.access(COMMAND_FILE, os.W_OK):
raise Exception(ERROR_COMMAND_FILE_OPEN_WRITE)
if not os.access(COMMAND_FILE, os.W_OK):
raise Exception(ERROR_COMMAND_FILE_OPEN_WRITE)
try:
with open(COMMAND_FILE, "w") as c:
c.write(formatted + "\n")
except Exception as e:
raise Exception(ERROR_BAD_WRITE)
def render_error(error):
return jsonify({
"status": "error",
"message": error,
})
def render_response(checks, errors):
if checks > 0:
return jsonify({
"status": "ok",
"result": {
"checks": checks,
"errors": errors,
}
})
else:
return jsonify({
"status": "error",
"message": ERROR_NO_CORRECT_STATUS,
})
class CheckResult:
def __init__(self, hostname, state, output, servicename, checktype):
self.hostname = hostname
self.state = state
self.output = output
self.servicename = servicename
self.type = checktype
@classmethod
def from_json(klass, j):
if not isinstance(j, dict):
return None
for key in ["hostname", "state", "output"]:
if key not in j or not isinstance(j[key], str):
return None
for key in ["servicename", "type"]:
if key in j and not isinstance(j[key], str):
return None
return klass(
j["hostname"],
j["state"],
j["output"],
j.get("servicename", ""),
j.get("type", "host"))