326 lines
11 KiB
Python
326 lines
11 KiB
Python
"""Flask app and routes."""
|
|
import os
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from flask import Flask, redirect, render_template, request, url_for
|
|
from flask_login import LoginManager, current_user, login_required, login_user, logout_user
|
|
|
|
from app import models
|
|
|
|
|
|
def _is_safe_redirect_url(url: str | None) -> bool:
|
|
"""Check that redirect URL is relative to our app (prevents open redirect)."""
|
|
if not url:
|
|
return False
|
|
return url.startswith("/") and "//" not in url
|
|
|
|
|
|
class User:
|
|
"""Flask-Login compatible user wrapper."""
|
|
|
|
def __init__(self, user_dict: dict):
|
|
self.id = user_dict["id"]
|
|
self.username = user_dict["username"]
|
|
self.is_admin = bool(user_dict.get("is_admin", 0))
|
|
|
|
def get_id(self) -> str:
|
|
return str(self.id)
|
|
|
|
@property
|
|
def is_authenticated(self) -> bool:
|
|
return True
|
|
|
|
@property
|
|
def is_active(self) -> bool:
|
|
return True
|
|
|
|
@property
|
|
def is_anonymous(self) -> bool:
|
|
return False
|
|
|
|
|
|
def _parse_report_dates(from_ts, to_ts, preset):
|
|
"""Parse from/to dates, applying preset if given. Returns (from_ts, to_ts, from_display, to_display)."""
|
|
now = datetime.now(timezone.utc)
|
|
if preset == "24h":
|
|
to_ts = now.isoformat()
|
|
from_ts = (now - timedelta(hours=24)).isoformat()
|
|
elif preset == "7d":
|
|
to_ts = now.isoformat()
|
|
from_ts = (now - timedelta(days=7)).isoformat()
|
|
elif preset == "30d":
|
|
to_ts = now.isoformat()
|
|
from_ts = (now - timedelta(days=30)).isoformat()
|
|
elif preset == "90d":
|
|
to_ts = now.isoformat()
|
|
from_ts = (now - timedelta(days=90)).isoformat()
|
|
if from_ts and len(from_ts) == 10:
|
|
from_ts = from_ts + "T00:00:00"
|
|
if to_ts and len(to_ts) == 10:
|
|
to_ts = to_ts + "T23:59:59.999999"
|
|
from_display = from_ts[:16] if from_ts else ""
|
|
to_display = to_ts[:16] if to_ts else ""
|
|
return from_ts, to_ts, from_display, to_display
|
|
|
|
ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
app = Flask(
|
|
__name__,
|
|
template_folder=os.path.join(ROOT, "templates"),
|
|
static_folder=os.path.join(ROOT, "static"),
|
|
)
|
|
app.config["SECRET_KEY"] = os.environ.get("SECRET_KEY", "dev-secret-change-in-production")
|
|
VERSION = os.environ.get("VERSION", "dev")
|
|
|
|
login_manager = LoginManager(app)
|
|
login_manager.login_view = "login"
|
|
login_manager.login_message = "Please log in to access this page."
|
|
|
|
|
|
@login_manager.user_loader
|
|
def load_user(user_id: str):
|
|
user_dict = models.get_user_by_id(int(user_id)) if user_id.isdigit() else None
|
|
return User(user_dict) if user_dict else None
|
|
|
|
|
|
@login_manager.unauthorized_handler
|
|
def unauthorized():
|
|
if request.is_json or request.accept_mimetypes.best == "application/json":
|
|
return {"error": "Authentication required"}, 401
|
|
return redirect(url_for("login", next=request.url))
|
|
|
|
|
|
@app.route("/login", methods=["GET", "POST"])
|
|
def login():
|
|
if current_user.is_authenticated:
|
|
return redirect(url_for("dashboard"))
|
|
if request.method == "POST":
|
|
username = (request.form.get("username") or "").strip()
|
|
password = request.form.get("password") or ""
|
|
user_dict = models.verify_user(username, password)
|
|
if user_dict:
|
|
login_user(User(user_dict))
|
|
next_param = request.form.get("next") or request.args.get("next")
|
|
next_url = next_param if _is_safe_redirect_url(next_param) else url_for("dashboard")
|
|
return redirect(next_url)
|
|
return render_template("login.html", error="Invalid username or password", version=VERSION)
|
|
return render_template("login.html", version=VERSION)
|
|
|
|
|
|
@app.route("/logout")
|
|
def logout():
|
|
logout_user()
|
|
return redirect(url_for("login"))
|
|
|
|
|
|
@app.route("/users", methods=["GET", "POST"])
|
|
@login_required
|
|
def users():
|
|
if not current_user.is_admin:
|
|
return "Forbidden", 403
|
|
if request.method == "POST":
|
|
username = (request.form.get("username") or "").strip()
|
|
password = request.form.get("password") or ""
|
|
user_id = models.create_user(username, password, is_admin=False)
|
|
if user_id:
|
|
return redirect(url_for("users"))
|
|
return render_template(
|
|
"users.html",
|
|
users=models.list_users(),
|
|
error="Username already exists or invalid",
|
|
version=VERSION,
|
|
)
|
|
return render_template("users.html", users=models.list_users(), version=VERSION)
|
|
|
|
|
|
@app.route("/")
|
|
@login_required
|
|
def dashboard():
|
|
services = models.list_services()
|
|
return render_template("dashboard.html", services=services, version=VERSION)
|
|
|
|
|
|
@app.route("/api/services", methods=["GET"])
|
|
@login_required
|
|
def api_list_services():
|
|
services = models.list_services()
|
|
return {"services": services}
|
|
|
|
|
|
@app.route("/api/services", methods=["POST"])
|
|
@login_required
|
|
def api_add_service():
|
|
data = request.get_json(silent=True)
|
|
if data is None and request.form:
|
|
data = request.form.to_dict()
|
|
if not data:
|
|
return {"error": "Request body must be valid JSON (Content-Type: application/json) or form data"}, 415
|
|
name = data.get("name", "").strip()
|
|
target = data.get("target", "").strip()
|
|
protocol = (data.get("protocol") or "https").lower().strip()
|
|
if not name or not target:
|
|
return {"error": "name and target are required"}, 400
|
|
if protocol not in ("http", "https", "tcp"):
|
|
return {"error": "protocol must be http, https, or tcp"}, 400
|
|
interval = int(data.get("interval_seconds", 60))
|
|
interval = max(10, min(3600, interval))
|
|
try:
|
|
sid = models.add_service(name, target, protocol, interval)
|
|
if request.accept_mimetypes.best == "application/json" or request.is_json:
|
|
return {"id": sid, "name": name, "target": target, "protocol": protocol}
|
|
return redirect(url_for("dashboard"))
|
|
except Exception as e:
|
|
return {"error": str(e)}, 500
|
|
|
|
|
|
@app.route("/api/services/<int:service_id>", methods=["DELETE"])
|
|
@login_required
|
|
def api_delete_service(service_id):
|
|
if models.delete_service(service_id):
|
|
return {"deleted": service_id}
|
|
return {"error": "Not found"}, 404
|
|
|
|
|
|
@app.route("/api/services/<int:service_id>", methods=["PATCH"])
|
|
@login_required
|
|
def api_update_service(service_id):
|
|
svc = models.get_service(service_id)
|
|
if not svc:
|
|
return {"error": "Not found"}, 404
|
|
data = request.get_json(silent=True)
|
|
if data is None and request.form:
|
|
data = request.form.to_dict()
|
|
if not data:
|
|
return {"error": "Request body must be valid JSON or form data"}, 415
|
|
updates = {}
|
|
if "name" in data and data["name"] is not None:
|
|
updates["name"] = str(data["name"]).strip()
|
|
if "target" in data and data["target"] is not None:
|
|
updates["target"] = str(data["target"]).strip()
|
|
if "protocol" in data and data["protocol"] is not None:
|
|
p = str(data["protocol"]).lower().strip()
|
|
if p not in ("http", "https", "tcp"):
|
|
return {"error": "protocol must be http, https, or tcp"}, 400
|
|
updates["protocol"] = p
|
|
if "interval_seconds" in data and data["interval_seconds"] is not None:
|
|
updates["interval_seconds"] = max(10, min(3600, int(data["interval_seconds"])))
|
|
if not updates:
|
|
return {"id": service_id, **dict(svc)}
|
|
if updates.get("name") == "" or updates.get("target") == "":
|
|
return {"error": "name and target cannot be empty"}, 400
|
|
if models.update_service(service_id, **updates):
|
|
updated = models.get_service(service_id)
|
|
return dict(updated)
|
|
return {"error": "Update failed"}, 500
|
|
|
|
|
|
@app.route("/api/services/<int:service_id>")
|
|
@login_required
|
|
def api_get_service(service_id):
|
|
svc = models.get_service(service_id)
|
|
if not svc:
|
|
return {"error": "Not found"}, 404
|
|
checks = models.get_checks(service_id, limit=50)
|
|
return {"service": dict(svc), "checks": checks}
|
|
|
|
|
|
@app.route("/api/services/<int:service_id>/edit")
|
|
@login_required
|
|
def edit_service(service_id):
|
|
svc = models.get_service(service_id)
|
|
if not svc:
|
|
return "Service not found", 404
|
|
return render_template("edit.html", service=dict(svc), version=VERSION)
|
|
|
|
|
|
@app.route("/api/services/<int:service_id>/report")
|
|
@login_required
|
|
def report(service_id):
|
|
svc = models.get_service(service_id)
|
|
if not svc:
|
|
return "Service not found", 404
|
|
from_ts = request.args.get("from") or None
|
|
to_ts = request.args.get("to") or None
|
|
preset = request.args.get("preset")
|
|
from_ts, to_ts, from_display, to_display = _parse_report_dates(from_ts, to_ts, preset)
|
|
status_filter = request.args.get("status")
|
|
search = request.args.get("search", "").strip() or None
|
|
page = max(1, int(request.args.get("page", 1)))
|
|
per_page = min(100, max(10, int(request.args.get("per_page", 10))))
|
|
stats = models.get_report_stats(service_id, from_ts=from_ts, to_ts=to_ts)
|
|
checks_total = models.get_checks_count(service_id, from_ts=from_ts, to_ts=to_ts, status_filter=status_filter, search=search)
|
|
checks = models.get_checks(
|
|
service_id,
|
|
limit=per_page,
|
|
offset=(page - 1) * per_page,
|
|
from_ts=from_ts,
|
|
to_ts=to_ts,
|
|
status_filter=status_filter,
|
|
search=search,
|
|
)
|
|
chart_checks = models.get_checks(service_id, limit=200, from_ts=from_ts, to_ts=to_ts)
|
|
period_label = _format_period_label(from_display, to_display) if (from_ts or to_ts) else None
|
|
total_pages = (checks_total + per_page - 1) // per_page if checks_total else 1
|
|
return render_template(
|
|
"report.html",
|
|
service=dict(svc),
|
|
stats=stats,
|
|
checks=checks,
|
|
chart_checks=chart_checks,
|
|
version=VERSION,
|
|
from_date=from_display,
|
|
to_date=to_display,
|
|
period_label=period_label,
|
|
preset=preset,
|
|
status_filter=status_filter or "",
|
|
search=search or "",
|
|
page=page,
|
|
per_page=per_page,
|
|
checks_total=checks_total,
|
|
total_pages=total_pages,
|
|
)
|
|
|
|
|
|
def _format_period_label(from_display, to_display):
|
|
"""Format period for display."""
|
|
if from_display and to_display:
|
|
return f"{from_display} to {to_display}"
|
|
if from_display:
|
|
return f"From {from_display}"
|
|
if to_display:
|
|
return f"Until {to_display}"
|
|
return None
|
|
|
|
|
|
@app.route("/api/services/<int:service_id>/history")
|
|
@login_required
|
|
def api_history(service_id):
|
|
svc = models.get_service(service_id)
|
|
if not svc:
|
|
return {"error": "Not found"}, 404
|
|
limit = min(500, int(request.args.get("limit", 100)))
|
|
from_ts = request.args.get("from") or None
|
|
to_ts = request.args.get("to") or None
|
|
if from_ts and len(from_ts) == 10:
|
|
from_ts = from_ts + "T00:00:00"
|
|
if to_ts and len(to_ts) == 10:
|
|
to_ts = to_ts + "T23:59:59.999999"
|
|
history = models.get_history(service_id, limit=limit, from_ts=from_ts, to_ts=to_ts)
|
|
return {"history": history}
|
|
|
|
|
|
@app.route("/api/services/<int:service_id>/stats")
|
|
@login_required
|
|
def api_report_stats(service_id):
|
|
"""JSON report stats with optional from/to query params for date range."""
|
|
svc = models.get_service(service_id)
|
|
if not svc:
|
|
return {"error": "Not found"}, 404
|
|
from_ts = request.args.get("from") or None
|
|
to_ts = request.args.get("to") or None
|
|
if from_ts and len(from_ts) == 10:
|
|
from_ts = from_ts + "T00:00:00"
|
|
if to_ts and len(to_ts) == 10:
|
|
to_ts = to_ts + "T23:59:59.999999"
|
|
stats = models.get_report_stats(service_id, from_ts=from_ts, to_ts=to_ts)
|
|
return {"service": dict(svc), "stats": stats}
|