"""Flask app and routes.""" import os from datetime import datetime, timedelta, timezone from flask import Flask, redirect, render_template, request, url_for from flask_login import LoginManager, current_user, login_required, login_user, logout_user from app import models def _is_safe_redirect_url(url: str | None) -> bool: """Check that redirect URL is relative to our app (prevents open redirect).""" if not url: return False return url.startswith("/") and "//" not in url class User: """Flask-Login compatible user wrapper.""" def __init__(self, user_dict: dict): self.id = user_dict["id"] self.username = user_dict["username"] self.is_admin = bool(user_dict.get("is_admin", 0)) def get_id(self) -> str: return str(self.id) @property def is_authenticated(self) -> bool: return True @property def is_active(self) -> bool: return True @property def is_anonymous(self) -> bool: return False def _parse_report_dates(from_ts, to_ts, preset): """Parse from/to dates, applying preset if given. Returns (from_ts, to_ts, from_display, to_display).""" now = datetime.now(timezone.utc) if preset == "24h": to_ts = now.isoformat() from_ts = (now - timedelta(hours=24)).isoformat() elif preset == "7d": to_ts = now.isoformat() from_ts = (now - timedelta(days=7)).isoformat() elif preset == "30d": to_ts = now.isoformat() from_ts = (now - timedelta(days=30)).isoformat() elif preset == "90d": to_ts = now.isoformat() from_ts = (now - timedelta(days=90)).isoformat() if from_ts and len(from_ts) == 10: from_ts = from_ts + "T00:00:00" if to_ts and len(to_ts) == 10: to_ts = to_ts + "T23:59:59.999999" from_display = from_ts[:16] if from_ts else "" to_display = to_ts[:16] if to_ts else "" return from_ts, to_ts, from_display, to_display ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) app = Flask( __name__, template_folder=os.path.join(ROOT, "templates"), static_folder=os.path.join(ROOT, "static"), ) app.config["SECRET_KEY"] = os.environ.get("SECRET_KEY", "dev-secret-change-in-production") VERSION = os.environ.get("VERSION", "dev") login_manager = LoginManager(app) login_manager.login_view = "login" login_manager.login_message = "Please log in to access this page." @login_manager.user_loader def load_user(user_id: str): user_dict = models.get_user_by_id(int(user_id)) if user_id.isdigit() else None return User(user_dict) if user_dict else None @login_manager.unauthorized_handler def unauthorized(): if request.is_json or request.accept_mimetypes.best == "application/json": return {"error": "Authentication required"}, 401 return redirect(url_for("login", next=request.url)) @app.route("/login", methods=["GET", "POST"]) def login(): if current_user.is_authenticated: return redirect(url_for("dashboard")) if request.method == "POST": username = (request.form.get("username") or "").strip() password = request.form.get("password") or "" user_dict = models.verify_user(username, password) if user_dict: login_user(User(user_dict)) next_param = request.form.get("next") or request.args.get("next") next_url = next_param if _is_safe_redirect_url(next_param) else url_for("dashboard") return redirect(next_url) return render_template("login.html", error="Invalid username or password", version=VERSION) return render_template("login.html", version=VERSION) @app.route("/logout") def logout(): logout_user() return redirect(url_for("login")) @app.route("/users", methods=["GET", "POST"]) @login_required def users(): if not current_user.is_admin: return "Forbidden", 403 if request.method == "POST": username = (request.form.get("username") or "").strip() password = request.form.get("password") or "" user_id = models.create_user(username, password, is_admin=False) if user_id: return redirect(url_for("users")) return render_template( "users.html", users=models.list_users(), error="Username already exists or invalid", version=VERSION, ) return render_template("users.html", users=models.list_users(), version=VERSION) @app.route("/") @login_required def dashboard(): services = models.list_services() return render_template("dashboard.html", services=services, version=VERSION) @app.route("/api/services", methods=["GET"]) @login_required def api_list_services(): services = models.list_services() return {"services": services} @app.route("/api/services", methods=["POST"]) @login_required def api_add_service(): data = request.get_json(silent=True) if data is None and request.form: data = request.form.to_dict() if not data: return {"error": "Request body must be valid JSON (Content-Type: application/json) or form data"}, 415 name = data.get("name", "").strip() target = data.get("target", "").strip() protocol = (data.get("protocol") or "https").lower().strip() if not name or not target: return {"error": "name and target are required"}, 400 if protocol not in ("http", "https", "tcp"): return {"error": "protocol must be http, https, or tcp"}, 400 interval = int(data.get("interval_seconds", 60)) interval = max(10, min(3600, interval)) try: sid = models.add_service(name, target, protocol, interval) if request.accept_mimetypes.best == "application/json" or request.is_json: return {"id": sid, "name": name, "target": target, "protocol": protocol} return redirect(url_for("dashboard")) except Exception as e: return {"error": str(e)}, 500 @app.route("/api/services/", methods=["DELETE"]) @login_required def api_delete_service(service_id): if models.delete_service(service_id): return {"deleted": service_id} return {"error": "Not found"}, 404 @app.route("/api/services/", methods=["PATCH"]) @login_required def api_update_service(service_id): svc = models.get_service(service_id) if not svc: return {"error": "Not found"}, 404 data = request.get_json(silent=True) if data is None and request.form: data = request.form.to_dict() if not data: return {"error": "Request body must be valid JSON or form data"}, 415 updates = {} if "name" in data and data["name"] is not None: updates["name"] = str(data["name"]).strip() if "target" in data and data["target"] is not None: updates["target"] = str(data["target"]).strip() if "protocol" in data and data["protocol"] is not None: p = str(data["protocol"]).lower().strip() if p not in ("http", "https", "tcp"): return {"error": "protocol must be http, https, or tcp"}, 400 updates["protocol"] = p if "interval_seconds" in data and data["interval_seconds"] is not None: updates["interval_seconds"] = max(10, min(3600, int(data["interval_seconds"]))) if not updates: return {"id": service_id, **dict(svc)} if updates.get("name") == "" or updates.get("target") == "": return {"error": "name and target cannot be empty"}, 400 if models.update_service(service_id, **updates): updated = models.get_service(service_id) return dict(updated) return {"error": "Update failed"}, 500 @app.route("/api/services/") @login_required def api_get_service(service_id): svc = models.get_service(service_id) if not svc: return {"error": "Not found"}, 404 checks = models.get_checks(service_id, limit=50) return {"service": dict(svc), "checks": checks} @app.route("/api/services//edit") @login_required def edit_service(service_id): svc = models.get_service(service_id) if not svc: return "Service not found", 404 return render_template("edit.html", service=dict(svc), version=VERSION) @app.route("/api/services//report") @login_required def report(service_id): svc = models.get_service(service_id) if not svc: return "Service not found", 404 from_ts = request.args.get("from") or None to_ts = request.args.get("to") or None preset = request.args.get("preset") from_ts, to_ts, from_display, to_display = _parse_report_dates(from_ts, to_ts, preset) status_filter = request.args.get("status") search = request.args.get("search", "").strip() or None page = max(1, int(request.args.get("page", 1))) per_page = min(100, max(10, int(request.args.get("per_page", 10)))) stats = models.get_report_stats(service_id, from_ts=from_ts, to_ts=to_ts) checks_total = models.get_checks_count(service_id, from_ts=from_ts, to_ts=to_ts, status_filter=status_filter, search=search) checks = models.get_checks( service_id, limit=per_page, offset=(page - 1) * per_page, from_ts=from_ts, to_ts=to_ts, status_filter=status_filter, search=search, ) chart_checks = models.get_checks(service_id, limit=200, from_ts=from_ts, to_ts=to_ts) period_label = _format_period_label(from_display, to_display) if (from_ts or to_ts) else None total_pages = (checks_total + per_page - 1) // per_page if checks_total else 1 return render_template( "report.html", service=dict(svc), stats=stats, checks=checks, chart_checks=chart_checks, version=VERSION, from_date=from_display, to_date=to_display, period_label=period_label, preset=preset, status_filter=status_filter or "", search=search or "", page=page, per_page=per_page, checks_total=checks_total, total_pages=total_pages, ) def _format_period_label(from_display, to_display): """Format period for display.""" if from_display and to_display: return f"{from_display} to {to_display}" if from_display: return f"From {from_display}" if to_display: return f"Until {to_display}" return None @app.route("/api/services//history") @login_required def api_history(service_id): svc = models.get_service(service_id) if not svc: return {"error": "Not found"}, 404 limit = min(500, int(request.args.get("limit", 100))) from_ts = request.args.get("from") or None to_ts = request.args.get("to") or None if from_ts and len(from_ts) == 10: from_ts = from_ts + "T00:00:00" if to_ts and len(to_ts) == 10: to_ts = to_ts + "T23:59:59.999999" history = models.get_history(service_id, limit=limit, from_ts=from_ts, to_ts=to_ts) return {"history": history} @app.route("/api/services//stats") @login_required def api_report_stats(service_id): """JSON report stats with optional from/to query params for date range.""" svc = models.get_service(service_id) if not svc: return {"error": "Not found"}, 404 from_ts = request.args.get("from") or None to_ts = request.args.get("to") or None if from_ts and len(from_ts) == 10: from_ts = from_ts + "T00:00:00" if to_ts and len(to_ts) == 10: to_ts = to_ts + "T23:59:59.999999" stats = models.get_report_stats(service_id, from_ts=from_ts, to_ts=to_ts) return {"service": dict(svc), "stats": stats}