This commit is contained in:
2026-03-07 03:43:50 +00:00
parent 365cb6692e
commit 17d9033152
17 changed files with 1349 additions and 88 deletions

1
app/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Statping clone - HTTP/HTTPS and TCP monitoring with history and reports."""

64
app/checker.py Normal file
View File

@@ -0,0 +1,64 @@
"""HTTP/HTTPS and TCP check logic."""
import socket
import time
from urllib.parse import urlparse
import requests
TIMEOUT = 10
def check_http(url: str) -> tuple[bool, float | None, str | None]:
"""
Check HTTP/HTTPS endpoint. Returns (success, response_time_ms, error_message).
Success = 2xx status code.
"""
if not url.startswith(("http://", "https://")):
url = "https://" + url
try:
start = time.perf_counter()
r = requests.get(url, timeout=TIMEOUT)
elapsed_ms = (time.perf_counter() - start) * 1000
success = 200 <= r.status_code < 300
return success, elapsed_ms, None if success else f"HTTP {r.status_code}"
except requests.RequestException as e:
return False, None, str(e)
def check_tcp(host: str, port: int) -> tuple[bool, float | None, str | None]:
"""
Check TCP connectivity. Returns (success, response_time_ms, error_message).
"""
try:
start = time.perf_counter()
sock = socket.create_connection((host, port), timeout=TIMEOUT)
sock.close()
elapsed_ms = (time.perf_counter() - start) * 1000
return True, elapsed_ms, None
except (socket.error, OSError) as e:
return False, None, str(e)
def run_check(service_id: int, target: str, protocol: str):
"""Run a single check and store the result."""
from app.models import add_check
protocol = protocol.lower()
if protocol in ("http", "https"):
success, response_time_ms, error_message = check_http(target)
elif protocol == "tcp":
parts = target.split(":")
if len(parts) != 2:
add_check(service_id, False, None, "Invalid target: expected host:port")
return
try:
port = int(parts[1])
except ValueError:
add_check(service_id, False, None, "Invalid port")
return
success, response_time_ms, error_message = check_tcp(parts[0].strip(), port)
else:
add_check(service_id, False, None, f"Unknown protocol: {protocol}")
return
add_check(service_id, success, response_time_ms, error_message)

203
app/main.py Normal file
View File

@@ -0,0 +1,203 @@
"""Flask app and routes."""
import os
from datetime import datetime, timedelta, timezone
from flask import Flask, redirect, render_template, request, url_for
from app import models
def _parse_report_dates(from_ts, to_ts, preset):
"""Parse from/to dates, applying preset if given. Returns (from_ts, to_ts, from_display, to_display)."""
now = datetime.now(timezone.utc)
if preset == "24h":
to_ts = now.isoformat()
from_ts = (now - timedelta(hours=24)).isoformat()
elif preset == "7d":
to_ts = now.isoformat()
from_ts = (now - timedelta(days=7)).isoformat()
elif preset == "30d":
to_ts = now.isoformat()
from_ts = (now - timedelta(days=30)).isoformat()
if from_ts and len(from_ts) == 10:
from_ts = from_ts + "T00:00:00"
if to_ts and len(to_ts) == 10:
to_ts = to_ts + "T23:59:59.999999"
from_display = from_ts[:16] if from_ts else ""
to_display = to_ts[:16] if to_ts else ""
return from_ts, to_ts, from_display, to_display
ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
app = Flask(
__name__,
template_folder=os.path.join(ROOT, "templates"),
static_folder=os.path.join(ROOT, "static"),
)
VERSION = os.environ.get("VERSION", "dev")
@app.route("/")
def dashboard():
services = models.list_services()
return render_template("dashboard.html", services=services, version=VERSION)
@app.route("/api/services", methods=["GET"])
def api_list_services():
services = models.list_services()
return {"services": services}
@app.route("/api/services", methods=["POST"])
def api_add_service():
data = request.get_json(silent=True)
if data is None and request.form:
data = request.form.to_dict()
if not data:
return {"error": "Request body must be valid JSON (Content-Type: application/json) or form data"}, 415
name = data.get("name", "").strip()
target = data.get("target", "").strip()
protocol = (data.get("protocol") or "https").lower().strip()
if not name or not target:
return {"error": "name and target are required"}, 400
if protocol not in ("http", "https", "tcp"):
return {"error": "protocol must be http, https, or tcp"}, 400
interval = int(data.get("interval_seconds", 60))
interval = max(10, min(3600, interval))
try:
sid = models.add_service(name, target, protocol, interval)
if request.accept_mimetypes.best == "application/json" or request.is_json:
return {"id": sid, "name": name, "target": target, "protocol": protocol}
return redirect(url_for("dashboard"))
except Exception as e:
return {"error": str(e)}, 500
@app.route("/api/services/<int:service_id>", methods=["DELETE"])
def api_delete_service(service_id):
if models.delete_service(service_id):
return {"deleted": service_id}
return {"error": "Not found"}, 404
@app.route("/api/services/<int:service_id>", methods=["PATCH"])
def api_update_service(service_id):
svc = models.get_service(service_id)
if not svc:
return {"error": "Not found"}, 404
data = request.get_json(silent=True)
if data is None and request.form:
data = request.form.to_dict()
if not data:
return {"error": "Request body must be valid JSON or form data"}, 415
updates = {}
if "name" in data and data["name"] is not None:
updates["name"] = str(data["name"]).strip()
if "target" in data and data["target"] is not None:
updates["target"] = str(data["target"]).strip()
if "protocol" in data and data["protocol"] is not None:
p = str(data["protocol"]).lower().strip()
if p not in ("http", "https", "tcp"):
return {"error": "protocol must be http, https, or tcp"}, 400
updates["protocol"] = p
if "interval_seconds" in data and data["interval_seconds"] is not None:
updates["interval_seconds"] = max(10, min(3600, int(data["interval_seconds"])))
if not updates:
return {"id": service_id, **dict(svc)}
if updates.get("name") == "" or updates.get("target") == "":
return {"error": "name and target cannot be empty"}, 400
if models.update_service(service_id, **updates):
updated = models.get_service(service_id)
return dict(updated)
return {"error": "Update failed"}, 500
@app.route("/api/services/<int:service_id>")
def api_get_service(service_id):
svc = models.get_service(service_id)
if not svc:
return {"error": "Not found"}, 404
checks = models.get_checks(service_id, limit=50)
return {"service": dict(svc), "checks": checks}
@app.route("/api/services/<int:service_id>/edit")
def edit_service(service_id):
svc = models.get_service(service_id)
if not svc:
return "Service not found", 404
return render_template("edit.html", service=dict(svc), version=VERSION)
@app.route("/api/services/<int:service_id>/report")
def report(service_id):
svc = models.get_service(service_id)
if not svc:
return "Service not found", 404
from_ts = request.args.get("from") or None
to_ts = request.args.get("to") or None
preset = request.args.get("preset")
from_ts, to_ts, from_display, to_display = _parse_report_dates(from_ts, to_ts, preset)
status_filter = request.args.get("status")
search = request.args.get("search", "").strip() or None
stats = models.get_report_stats(service_id, from_ts=from_ts, to_ts=to_ts)
checks = models.get_checks(service_id, limit=100, from_ts=from_ts, to_ts=to_ts, status_filter=status_filter, search=search)
chart_checks = models.get_checks(service_id, limit=200, from_ts=from_ts, to_ts=to_ts)
period_label = _format_period_label(from_display, to_display) if (from_ts or to_ts) else None
return render_template(
"report.html",
service=dict(svc),
stats=stats,
checks=checks,
chart_checks=chart_checks,
version=VERSION,
from_date=from_display,
to_date=to_display,
period_label=period_label,
preset=preset,
status_filter=status_filter or "",
search=search or "",
)
def _format_period_label(from_display, to_display):
"""Format period for display."""
if from_display and to_display:
return f"{from_display} to {to_display}"
if from_display:
return f"From {from_display}"
if to_display:
return f"Until {to_display}"
return None
@app.route("/api/services/<int:service_id>/history")
def api_history(service_id):
svc = models.get_service(service_id)
if not svc:
return {"error": "Not found"}, 404
limit = min(500, int(request.args.get("limit", 100)))
from_ts = request.args.get("from") or None
to_ts = request.args.get("to") or None
if from_ts and len(from_ts) == 10:
from_ts = from_ts + "T00:00:00"
if to_ts and len(to_ts) == 10:
to_ts = to_ts + "T23:59:59.999999"
history = models.get_history(service_id, limit=limit, from_ts=from_ts, to_ts=to_ts)
return {"history": history}
@app.route("/api/services/<int:service_id>/stats")
def api_report_stats(service_id):
"""JSON report stats with optional from/to query params for date range."""
svc = models.get_service(service_id)
if not svc:
return {"error": "Not found"}, 404
from_ts = request.args.get("from") or None
to_ts = request.args.get("to") or None
if from_ts and len(from_ts) == 10:
from_ts = from_ts + "T00:00:00"
if to_ts and len(to_ts) == 10:
to_ts = to_ts + "T23:59:59.999999"
stats = models.get_report_stats(service_id, from_ts=from_ts, to_ts=to_ts)
return {"service": dict(svc), "stats": stats}

221
app/models.py Normal file
View File

@@ -0,0 +1,221 @@
"""SQLite schema and database helpers."""
import os
import sqlite3
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
DATA_PATH = os.environ.get("DATA_PATH", "/app/data")
DB_PATH = Path(DATA_PATH) / "monitor.db"
def _ensure_data_dir():
Path(DATA_PATH).mkdir(parents=True, exist_ok=True)
def _migrate_add_status(conn):
"""Add status column if missing (migration for existing DBs)."""
try:
conn.execute("SELECT status FROM checks LIMIT 1")
except sqlite3.OperationalError:
conn.execute("ALTER TABLE checks ADD COLUMN status TEXT")
conn.execute("UPDATE checks SET status = CASE WHEN success = 1 THEN 'OK' ELSE 'ERROR' END")
@contextmanager
def get_db():
_ensure_data_dir()
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
try:
yield conn
conn.commit()
finally:
conn.close()
def init_db():
"""Create tables if they don't exist."""
with get_db() as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS services (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
target TEXT NOT NULL,
protocol TEXT NOT NULL CHECK(protocol IN ('http', 'https', 'tcp')),
interval_seconds INTEGER NOT NULL DEFAULT 60,
created_at TEXT NOT NULL
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS checks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
service_id INTEGER NOT NULL,
success INTEGER NOT NULL,
status TEXT NOT NULL,
response_time_ms REAL,
timestamp TEXT NOT NULL,
error_message TEXT,
FOREIGN KEY (service_id) REFERENCES services(id)
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_checks_service ON checks(service_id)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_checks_timestamp ON checks(timestamp)")
_migrate_add_status(conn)
conn.execute("CREATE INDEX IF NOT EXISTS idx_checks_status ON checks(status)")
def list_services():
"""Return all services with last check info and uptime."""
with get_db() as conn:
rows = conn.execute(
"""
SELECT s.*,
(SELECT success FROM checks WHERE service_id = s.id ORDER BY timestamp DESC LIMIT 1) as last_success,
(SELECT response_time_ms FROM checks WHERE service_id = s.id ORDER BY timestamp DESC LIMIT 1) as last_response_ms,
(SELECT timestamp FROM checks WHERE service_id = s.id ORDER BY timestamp DESC LIMIT 1) as last_check,
(SELECT ROUND(100.0 * SUM(success) / NULLIF(COUNT(*), 0), 2)
FROM (SELECT success FROM checks WHERE service_id = s.id ORDER BY timestamp DESC LIMIT 1000)) as uptime_pct
FROM services s
ORDER BY s.id
"""
).fetchall()
return [dict(r) for r in rows]
def get_service(service_id: int):
"""Get a single service by id."""
with get_db() as conn:
row = conn.execute("SELECT * FROM services WHERE id = ?", (service_id,)).fetchone()
return dict(row) if row else None
def add_service(name: str, target: str, protocol: str, interval_seconds: int = 60) -> int:
"""Add a new service. Returns the new service id."""
with get_db() as conn:
cur = conn.execute(
"INSERT INTO services (name, target, protocol, interval_seconds, created_at) VALUES (?, ?, ?, ?, ?)",
(name, target, protocol, interval_seconds, datetime.utcnow().isoformat()),
)
return cur.lastrowid
def update_service(service_id: int, name: str = None, target: str = None, protocol: str = None, interval_seconds: int = None) -> bool:
"""Update a service. Only provided fields are updated. Returns True if updated."""
updates = []
args = []
if name is not None:
updates.append("name = ?")
args.append(name)
if target is not None:
updates.append("target = ?")
args.append(target)
if protocol is not None:
updates.append("protocol = ?")
args.append(protocol)
if interval_seconds is not None:
updates.append("interval_seconds = ?")
args.append(interval_seconds)
if not updates:
return True
args.append(service_id)
with get_db() as conn:
cur = conn.execute(
f"UPDATE services SET {', '.join(updates)} WHERE id = ?",
args,
)
return cur.rowcount > 0
def add_check(service_id: int, success: bool, response_time_ms: float | None, error_message: str | None = None):
"""Record a check result. status is OK or ERROR for searchability."""
status = "OK" if success else "ERROR"
with get_db() as conn:
conn.execute(
"INSERT INTO checks (service_id, success, status, response_time_ms, timestamp, error_message) VALUES (?, ?, ?, ?, ?, ?)",
(service_id, 1 if success else 0, status, response_time_ms, datetime.utcnow().isoformat(), error_message),
)
def get_checks(service_id: int, limit: int = 50, from_ts: str = None, to_ts: str = None, status_filter: str = None, search: str = None):
"""Get recent checks for a service, optionally filtered by timestamp, status (ok/error), and error search."""
with get_db() as conn:
q = "SELECT * FROM checks WHERE service_id = ?"
args = [service_id]
if from_ts:
q += " AND timestamp >= ?"
args.append(from_ts)
if to_ts:
q += " AND timestamp <= ?"
args.append(to_ts)
if status_filter == "error":
q += " AND status = 'ERROR'"
elif status_filter == "ok":
q += " AND status = 'OK'"
if search:
q += " AND (error_message LIKE ? OR status LIKE ?)"
args.extend([f"%{search}%", f"%{search}%"])
q += " ORDER BY timestamp DESC LIMIT ?"
args.append(limit)
rows = conn.execute(q, args).fetchall()
return [dict(r) for r in rows]
def get_report_stats(service_id: int, from_ts: str = None, to_ts: str = None):
"""Compute uptime % and latency stats for a service, optionally over a time range."""
with get_db() as conn:
q = "SELECT success, response_time_ms FROM checks WHERE service_id = ?"
args = [service_id]
if from_ts:
q += " AND timestamp >= ?"
args.append(from_ts)
if to_ts:
q += " AND timestamp <= ?"
args.append(to_ts)
q += " ORDER BY timestamp DESC LIMIT 10000"
rows = conn.execute(q, args).fetchall()
if not rows:
return {"total": 0, "uptime_pct": 0, "avg_ms": None, "min_ms": None, "max_ms": None}
total = len(rows)
success_count = sum(1 for r in rows if r["success"])
uptime_pct = (success_count / total) * 100 if total else 0
response_times = [r["response_time_ms"] for r in rows if r["response_time_ms"] is not None]
return {
"total": total,
"uptime_pct": round(uptime_pct, 2),
"avg_ms": round(sum(response_times) / len(response_times), 2) if response_times else None,
"min_ms": min(response_times) if response_times else None,
"max_ms": max(response_times) if response_times else None,
}
def get_history(service_id: int, limit: int = 100, from_ts: str = None, to_ts: str = None):
"""Get check history for charts (JSON), optionally filtered by timestamp range."""
with get_db() as conn:
q = "SELECT timestamp, success, response_time_ms FROM checks WHERE service_id = ?"
args = [service_id]
if from_ts:
q += " AND timestamp >= ?"
args.append(from_ts)
if to_ts:
q += " AND timestamp <= ?"
args.append(to_ts)
q += " ORDER BY timestamp DESC LIMIT ?"
args.append(limit)
rows = conn.execute(q, args).fetchall()
return [{"timestamp": r["timestamp"], "success": bool(r["success"]), "response_time_ms": r["response_time_ms"]} for r in rows]
def delete_service(service_id: int) -> bool:
"""Delete a service and its check history. Returns True if deleted."""
with get_db() as conn:
conn.execute("DELETE FROM checks WHERE service_id = ?", (service_id,))
cur = conn.execute("DELETE FROM services WHERE id = ?", (service_id,))
return cur.rowcount > 0
def get_all_services_for_scheduler():
"""Return all services for the scheduler."""
with get_db() as conn:
rows = conn.execute("SELECT id, target, protocol, interval_seconds FROM services").fetchall()
return [dict(r) for r in rows]

41
app/scheduler.py Normal file
View File

@@ -0,0 +1,41 @@
"""APScheduler setup for periodic checks."""
from apscheduler.schedulers.background import BackgroundScheduler
from app.checker import run_check
from app.models import get_all_services_for_scheduler
def _run_all_checks():
"""Run checks for all registered services."""
services = get_all_services_for_scheduler()
for svc in services:
run_check(svc["id"], svc["target"], svc["protocol"])
def start_scheduler():
"""Start the background scheduler. Uses interval jobs per service."""
scheduler = BackgroundScheduler()
def add_jobs():
services = get_all_services_for_scheduler()
for svc in services:
job_id = f"service_{svc['id']}"
if scheduler.get_job(job_id):
scheduler.remove_job(job_id)
interval = max(10, svc["interval_seconds"])
scheduler.add_job(
run_check,
"interval",
seconds=interval,
id=job_id,
args=[svc["id"], svc["target"], svc["protocol"]],
)
# Run checks immediately on startup, then schedule
_run_all_checks()
add_jobs()
# Refresh job list every 60 seconds in case services were added
scheduler.add_job(add_jobs, "interval", seconds=60, id="refresh_jobs")
scheduler.start()