Files
jenkins-docker-deploy-example/app/models.py
2026-03-10 15:34:22 +00:00

494 lines
20 KiB
Python

"""SQLite schema and database helpers."""
import os
import sqlite3
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone
from pathlib import Path
from werkzeug.security import check_password_hash, generate_password_hash
# Base directory for persistent data; overridable via env for tests/containers.
DATA_PATH = os.environ.get("DATA_PATH", "/app/data")
# SQLite database file lives inside the data directory.
DB_PATH = Path(DATA_PATH) / "monitor.db"
# Retention: keep last N checks per service, and optionally drop checks older than N days
CHECK_RETENTION_COUNT = int(os.environ.get("CHECK_RETENTION_COUNT", "5000"))
# "0" (the default) disables age-based pruning: int("0") is falsy, so `or None` yields None.
CHECK_RETENTION_DAYS = int(os.environ.get("CHECK_RETENTION_DAYS", "0")) or None
# Rollup: aggregate checks older than N hours into hourly buckets for long-term reporting
ROLLUP_AGE_HOURS = int(os.environ.get("ROLLUP_AGE_HOURS", "24"))
def _ensure_data_dir():
    """Create the data directory (and any missing parents) if absent."""
    os.makedirs(DATA_PATH, exist_ok=True)
def _migrate_add_status(conn):
    """Backfill the checks.status column on databases created before it existed.

    Probes with a cheap SELECT; the OperationalError path performs the
    actual ALTER + backfill migration.
    """
    try:
        conn.execute("SELECT status FROM checks LIMIT 1")
        return  # column already present, nothing to migrate
    except sqlite3.OperationalError:
        pass
    conn.execute("ALTER TABLE checks ADD COLUMN status TEXT")
    conn.execute("UPDATE checks SET status = CASE WHEN success = 1 THEN 'OK' ELSE 'ERROR' END")
def _migrate_add_rollups(conn):
    """Create uptime_rollups table for aggregated hourly stats (long-term reporting).

    Each row is one (service, hour) bucket; UNIQUE(service_id, period_start)
    is what lets rollup_old_checks() upsert into an existing bucket.
    response_count records how many checks in the bucket carried a latency,
    so averages can exclude NULL response times.
    """
    conn.execute("""
        CREATE TABLE IF NOT EXISTS uptime_rollups (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            service_id INTEGER NOT NULL,
            period_start TEXT NOT NULL,
            period_end TEXT NOT NULL,
            total_checks INTEGER NOT NULL,
            success_count INTEGER NOT NULL,
            sum_response_ms REAL NOT NULL,
            response_count INTEGER NOT NULL,
            min_response_ms REAL,
            max_response_ms REAL,
            FOREIGN KEY (service_id) REFERENCES services(id),
            UNIQUE(service_id, period_start)
        )
    """)
    conn.execute("CREATE INDEX IF NOT EXISTS idx_rollups_service ON uptime_rollups(service_id)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_rollups_period ON uptime_rollups(period_start)")
    # Older databases may have the table without response_count: probe and add it.
    try:
        conn.execute("SELECT response_count FROM uptime_rollups LIMIT 1")
    except sqlite3.OperationalError:
        conn.execute("ALTER TABLE uptime_rollups ADD COLUMN response_count INTEGER NOT NULL DEFAULT 0")
def _migrate_add_users(conn):
    """Create users table for authentication.

    password_hash stores a werkzeug-generated hash (never the plaintext);
    is_admin is an int flag (0/1) per SQLite convention.
    """
    conn.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            username TEXT NOT NULL UNIQUE,
            password_hash TEXT NOT NULL,
            is_admin INTEGER NOT NULL DEFAULT 0,
            created_at TEXT NOT NULL
        )
    """)
    conn.execute("CREATE INDEX IF NOT EXISTS idx_users_username ON users(username)")
def _seed_admin_if_empty(conn):
    """Create the initial admin user from ADMIN_USER/ADMIN_PASSWORD env vars.

    Runs only when the users table is completely empty, so an existing
    deployment is never touched. Silently does nothing when either env
    var is missing.
    """
    row = conn.execute("SELECT COUNT(*) FROM users").fetchone()
    if row[0] > 0:
        return
    admin_user = os.environ.get("ADMIN_USER")
    admin_password = os.environ.get("ADMIN_PASSWORD")
    if not admin_user or not admin_password:
        return
    password_hash = generate_password_hash(admin_password)
    # datetime.utcnow() is deprecated since Python 3.12; derive the same
    # naive-UTC ISO string from an aware timestamp so the stored format
    # stays comparable with existing rows.
    created = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    conn.execute(
        "INSERT INTO users (username, password_hash, is_admin, created_at) VALUES (?, ?, 1, ?)",
        (admin_user, password_hash, created),
    )
def create_user(username: str, password: str, is_admin: bool = False) -> int | None:
    """Create a new user.

    Returns the new user id, or None when the username is already taken
    or either field is blank after stripping.
    """
    username = username.strip()
    if not username or not password:
        return None
    password_hash = generate_password_hash(password)
    # datetime.utcnow() is deprecated since Python 3.12; keep the same
    # naive-UTC ISO format already used by existing rows.
    created = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    with get_db() as conn:
        try:
            cur = conn.execute(
                "INSERT INTO users (username, password_hash, is_admin, created_at) VALUES (?, ?, ?, ?)",
                (username, password_hash, 1 if is_admin else 0, created),
            )
            return cur.lastrowid
        except sqlite3.IntegrityError:
            # UNIQUE(username) violation — callers treat None as "already exists".
            return None
def get_user_by_id(user_id: int) -> dict | None:
    """Fetch a single user row by primary key as a dict; None when absent."""
    with get_db() as conn:
        found = conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
        return None if found is None else dict(found)
def get_user_by_username(username: str) -> dict | None:
    """Fetch a user row by (stripped) username as a dict; None when absent."""
    with get_db() as conn:
        found = conn.execute("SELECT * FROM users WHERE username = ?", (username.strip(),)).fetchone()
        return None if found is None else dict(found)
def verify_user(username: str, password: str) -> dict | None:
    """Check credentials; return the matching user dict, or None on failure."""
    user = get_user_by_username(username)
    if user is None:
        return None
    if not check_password_hash(user["password_hash"], password):
        return None
    return user
def list_users():
    """Return every user as a dict of (id, username, is_admin, created_at)."""
    query = "SELECT id, username, is_admin, created_at FROM users ORDER BY username"
    with get_db() as conn:
        return [dict(record) for record in conn.execute(query).fetchall()]
@contextmanager
def get_db():
    """Yield a sqlite3 connection with Row access; commit on success, always close.

    An exception inside the managed block skips the commit; closing the
    connection then discards the uncommitted transaction.
    """
    _ensure_data_dir()
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    try:
        yield connection
        connection.commit()
    finally:
        connection.close()
def init_db():
    """Create all tables/indexes and run in-place migrations.

    Safe to call repeatedly: every statement is IF NOT EXISTS or a guarded
    migration. Order matters — base tables first, then the status-column
    migration, then the index that depends on the migrated column.
    """
    with get_db() as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS services (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                target TEXT NOT NULL,
                protocol TEXT NOT NULL CHECK(protocol IN ('http', 'https', 'tcp')),
                interval_seconds INTEGER NOT NULL DEFAULT 60,
                created_at TEXT NOT NULL
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS checks (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                service_id INTEGER NOT NULL,
                success INTEGER NOT NULL,
                status TEXT NOT NULL,
                response_time_ms REAL,
                timestamp TEXT NOT NULL,
                error_message TEXT,
                FOREIGN KEY (service_id) REFERENCES services(id)
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_checks_service ON checks(service_id)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_checks_timestamp ON checks(timestamp)")
        # Older DBs lack the status column; migrate before indexing it.
        _migrate_add_status(conn)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_checks_status ON checks(status)")
        _migrate_add_rollups(conn)
        _migrate_add_users(conn)
        # Seed the first admin account from env vars on a brand-new database.
        _seed_admin_if_empty(conn)
def list_services():
    """Return all services with last check info and uptime.

    Each service row gains: last_success / last_response_ms / last_check
    (from its most recent check) and uptime_pct, computed over at most the
    latest 1000 checks. NULLIF guards the division for services that have
    no checks yet (uptime_pct comes back NULL).
    """
    with get_db() as conn:
        rows = conn.execute(
            """
            SELECT s.*,
                   (SELECT success FROM checks WHERE service_id = s.id ORDER BY timestamp DESC LIMIT 1) as last_success,
                   (SELECT response_time_ms FROM checks WHERE service_id = s.id ORDER BY timestamp DESC LIMIT 1) as last_response_ms,
                   (SELECT timestamp FROM checks WHERE service_id = s.id ORDER BY timestamp DESC LIMIT 1) as last_check,
                   (SELECT ROUND(100.0 * SUM(success) / NULLIF(COUNT(*), 0), 2)
                    FROM (SELECT success FROM checks WHERE service_id = s.id ORDER BY timestamp DESC LIMIT 1000)) as uptime_pct
            FROM services s
            ORDER BY s.id
            """
        ).fetchall()
        return [dict(r) for r in rows]
def get_service(service_id: int):
    """Fetch one service row as a dict, or None for an unknown id."""
    with get_db() as conn:
        found = conn.execute("SELECT * FROM services WHERE id = ?", (service_id,)).fetchone()
        return None if found is None else dict(found)
def add_service(name: str, target: str, protocol: str, interval_seconds: int = 60) -> int:
    """Insert a new service and return its id.

    protocol must be 'http', 'https', or 'tcp' — enforced by the table's
    CHECK constraint (violations raise sqlite3.IntegrityError).
    """
    # datetime.utcnow() is deprecated since Python 3.12; keep the same
    # naive-UTC ISO format used by existing rows.
    created = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    with get_db() as conn:
        cur = conn.execute(
            "INSERT INTO services (name, target, protocol, interval_seconds, created_at) VALUES (?, ?, ?, ?, ?)",
            (name, target, protocol, interval_seconds, created),
        )
        return cur.lastrowid
def update_service(service_id: int, name: str = None, target: str = None, protocol: str = None, interval_seconds: int = None) -> bool:
    """Apply a partial update to a service row.

    Fields left as None are untouched. Returns True when a row changed,
    and also True when no fields were supplied (no-op success).
    """
    candidates = {
        "name": name,
        "target": target,
        "protocol": protocol,
        "interval_seconds": interval_seconds,
    }
    changes = {column: value for column, value in candidates.items() if value is not None}
    if not changes:
        return True
    set_clause = ", ".join(f"{column} = ?" for column in changes)
    params = list(changes.values()) + [service_id]
    with get_db() as conn:
        cur = conn.execute(f"UPDATE services SET {set_clause} WHERE id = ?", params)
        return cur.rowcount > 0
def add_check(service_id: int, success: bool, response_time_ms: float | None, error_message: str | None = None):
    """Record a single probe result.

    status mirrors success as 'OK'/'ERROR' so history can be text-searched;
    response_time_ms may be None when the probe never got a response.
    """
    status = "OK" if success else "ERROR"
    # datetime.utcnow() is deprecated since Python 3.12; keep the same
    # naive-UTC ISO format used by existing rows so string comparisons
    # against stored timestamps remain valid.
    ts = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    with get_db() as conn:
        conn.execute(
            "INSERT INTO checks (service_id, success, status, response_time_ms, timestamp, error_message) VALUES (?, ?, ?, ?, ?, ?)",
            (service_id, 1 if success else 0, status, response_time_ms, ts, error_message),
        )
def _checks_where_args(service_id: int, from_ts: str = None, to_ts: str = None, status_filter: str = None, search: str = None):
    """Assemble the shared WHERE clause (and bound args) for checks queries.

    status_filter accepts 'ok' or 'error' (anything else means no status
    filter); search does a LIKE match against error_message and status.
    """
    clauses = ["service_id = ?"]
    args = [service_id]
    for fragment, value in (("timestamp >= ?", from_ts), ("timestamp <= ?", to_ts)):
        if value:
            clauses.append(fragment)
            args.append(value)
    if status_filter == "error":
        clauses.append("status = 'ERROR'")
    elif status_filter == "ok":
        clauses.append("status = 'OK'")
    if search:
        clauses.append("(error_message LIKE ? OR status LIKE ?)")
        pattern = f"%{search}%"
        args.extend([pattern, pattern])
    return "WHERE " + " AND ".join(clauses), args
def get_checks_count(service_id: int, from_ts: str = None, to_ts: str = None, status_filter: str = None, search: str = None) -> int:
    """Count checks matching the shared filters (used for pagination)."""
    where, args = _checks_where_args(service_id, from_ts, to_ts, status_filter, search)
    query = f"SELECT COUNT(*) FROM checks {where}"
    with get_db() as conn:
        (count,) = conn.execute(query, args).fetchone()
        return count
def get_checks(service_id: int, limit: int = 50, offset: int = 0, from_ts: str = None, to_ts: str = None, status_filter: str = None, search: str = None):
    """Fetch a page of checks for a service, newest first, honoring the shared filters."""
    where, args = _checks_where_args(service_id, from_ts, to_ts, status_filter, search)
    query = f"SELECT * FROM checks {where} ORDER BY timestamp DESC LIMIT ? OFFSET ?"
    with get_db() as conn:
        return [dict(record) for record in conn.execute(query, args + [limit, offset]).fetchall()]
def get_report_stats(service_id: int, from_ts: str = None, to_ts: str = None):
    """
    Compute uptime % and latency stats for a service over a time range.

    Combines hourly rollups (data older than ROLLUP_AGE_HOURS) with raw
    checks (recent data) so reporting stays accurate over 90+ day windows.
    Returns a dict with total, uptime_pct, avg_ms, min_ms, max_ms; the
    latency fields are None when no check in range carried a response time.
    """
    now = datetime.now(timezone.utc)
    raw_cutoff = (now - timedelta(hours=ROLLUP_AGE_HOURS)).isoformat()
    to_ts = to_ts or now.isoformat()
    from_ts = from_ts or "1970-01-01T00:00:00"
    total = 0
    success_count = 0
    sum_response_ms = 0.0
    count_with_response = 0
    min_ms = None
    max_ms = None
    with get_db() as conn:
        # 1. Rollups cover [from_ts, rollup_end): hourly buckets older than the
        #    raw-data cutoff. BUGFIX: when the requested range ends before the
        #    cutoff (an entirely historical query), the upper bound must be
        #    to_ts — the previous `else from_ts` emptied both this branch AND
        #    the raw branch, so historical ranges always reported zero checks.
        rollup_end = raw_cutoff if raw_cutoff < to_ts else to_ts
        if from_ts < rollup_end:
            q = """
                SELECT total_checks, success_count, sum_response_ms, response_count, min_response_ms, max_response_ms
                FROM uptime_rollups
                WHERE service_id = ? AND period_start >= ? AND period_start < ?
            """
            rollup_rows = conn.execute(q, (service_id, from_ts, rollup_end)).fetchall()
            for r in rollup_rows:
                total += r["total_checks"]
                success_count += r["success_count"]
                sum_response_ms += r["sum_response_ms"] or 0
                count_with_response += r["response_count"] or 0
                if r["min_response_ms"] is not None:
                    min_ms = r["min_response_ms"] if min_ms is None else min(min_ms, r["min_response_ms"])
                if r["max_response_ms"] is not None:
                    max_ms = r["max_response_ms"] if max_ms is None else max(max_ms, r["max_response_ms"])
        # 2. Raw checks cover [max(from_ts, raw_cutoff), to_ts]: recent data
        #    that has not been rolled up yet (may overlap the cutoff hour).
        raw_from = from_ts if from_ts >= raw_cutoff else raw_cutoff
        if raw_from <= to_ts:
            q = "SELECT success, response_time_ms FROM checks WHERE service_id = ? AND timestamp >= ? AND timestamp <= ?"
            raw_rows = conn.execute(q, (service_id, raw_from, to_ts)).fetchall()
            for r in raw_rows:
                total += 1
                success_count += 1 if r["success"] else 0
                if r["response_time_ms"] is not None:
                    sum_response_ms += r["response_time_ms"]
                    count_with_response += 1
                    min_ms = r["response_time_ms"] if min_ms is None else min(min_ms, r["response_time_ms"])
                    max_ms = r["response_time_ms"] if max_ms is None else max(max_ms, r["response_time_ms"])
    if total == 0:
        return {"total": 0, "uptime_pct": 0, "avg_ms": None, "min_ms": None, "max_ms": None}
    uptime_pct = (success_count / total) * 100
    avg_ms = round(sum_response_ms / count_with_response, 2) if count_with_response else None
    return {
        "total": total,
        "uptime_pct": round(uptime_pct, 2),
        "avg_ms": avg_ms,
        "min_ms": round(min_ms, 2) if min_ms is not None else None,
        "max_ms": round(max_ms, 2) if max_ms is not None else None,
    }
def get_history(service_id: int, limit: int = 100, from_ts: str = None, to_ts: str = None):
    """Return chart-friendly dicts (timestamp, success, response_time_ms), newest first."""
    query = "SELECT timestamp, success, response_time_ms FROM checks WHERE service_id = ?"
    args = [service_id]
    if from_ts:
        query += " AND timestamp >= ?"
        args.append(from_ts)
    if to_ts:
        query += " AND timestamp <= ?"
        args.append(to_ts)
    query += " ORDER BY timestamp DESC LIMIT ?"
    args.append(limit)
    with get_db() as conn:
        records = conn.execute(query, args).fetchall()
    return [
        {
            "timestamp": record["timestamp"],
            "success": bool(record["success"]),
            "response_time_ms": record["response_time_ms"],
        }
        for record in records
    ]
def delete_service(service_id: int) -> bool:
    """Remove a service plus its raw checks and rollups. True when it existed."""
    with get_db() as conn:
        # Clear dependent rows first (no ON DELETE CASCADE on the FKs).
        for table in ("checks", "uptime_rollups"):
            conn.execute(f"DELETE FROM {table} WHERE service_id = ?", (service_id,))
        cur = conn.execute("DELETE FROM services WHERE id = ?", (service_id,))
        return cur.rowcount > 0
def get_all_services_for_scheduler():
    """Return the minimal per-service fields the scheduler needs to run probes."""
    query = "SELECT id, target, protocol, interval_seconds FROM services"
    with get_db() as conn:
        return [dict(record) for record in conn.execute(query).fetchall()]
def _hour_start(ts: str) -> str:
    """Return *ts* truncated to its hour boundary as an ISO string.

    NOTE(review): appears unused within this module — rollup_old_checks()
    truncates to the hour in SQL via strftime; confirm external callers
    before removing.
    """
    parsed = datetime.fromisoformat(ts.replace("Z", "+00:00"))
    truncated = parsed.replace(minute=0, second=0, microsecond=0)
    return truncated.isoformat()
def rollup_old_checks() -> int:
    """
    Aggregate checks older than ROLLUP_AGE_HOURS into hourly buckets.
    Returns number of raw checks that were rolled up and deleted.
    """
    cutoff = datetime.now(timezone.utc) - timedelta(hours=ROLLUP_AGE_HOURS)
    cutoff_ts = cutoff.isoformat()
    with get_db() as conn:
        # Aggregate checks older than cutoff, grouped by service and hour.
        # response_count tracks how many checks actually carried a latency so
        # averages can exclude NULL response times.
        rows = conn.execute(
            """
            SELECT service_id,
                   strftime('%Y-%m-%dT%H:00:00', timestamp) as period_start,
                   COUNT(*) as total_checks,
                   SUM(success) as success_count,
                   SUM(CASE WHEN response_time_ms IS NOT NULL THEN response_time_ms ELSE 0 END) as sum_response_ms,
                   SUM(CASE WHEN response_time_ms IS NOT NULL THEN 1 ELSE 0 END) as response_count,
                   MIN(CASE WHEN response_time_ms IS NOT NULL THEN response_time_ms END) as min_response_ms,
                   MAX(response_time_ms) as max_response_ms,
                   GROUP_CONCAT(id) as check_ids
            FROM checks
            WHERE timestamp < ?
            GROUP BY service_id, period_start
            """,
            (cutoff_ts,),
        ).fetchall()
        if not rows:
            return 0
        deleted = 0
        for r in rows:
            period_end = datetime.fromisoformat(r["period_start"].replace("Z", "+00:00")) + timedelta(hours=1)
            period_end_ts = period_end.isoformat()
            # BUGFIX: SQLite's scalar MIN()/MAX() return NULL if ANY argument
            # is NULL, so merging a batch with no latencies used to wipe an
            # existing bucket's min/max. COALESCE keeps the non-NULL side
            # (result is NULL only when both sides are NULL).
            conn.execute(
                """
                INSERT INTO uptime_rollups (service_id, period_start, period_end, total_checks, success_count, sum_response_ms, response_count, min_response_ms, max_response_ms)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT(service_id, period_start) DO UPDATE SET
                    total_checks = total_checks + excluded.total_checks,
                    success_count = success_count + excluded.success_count,
                    sum_response_ms = sum_response_ms + excluded.sum_response_ms,
                    response_count = response_count + excluded.response_count,
                    min_response_ms = MIN(COALESCE(min_response_ms, excluded.min_response_ms),
                                          COALESCE(excluded.min_response_ms, min_response_ms)),
                    max_response_ms = MAX(COALESCE(max_response_ms, excluded.max_response_ms),
                                          COALESCE(excluded.max_response_ms, max_response_ms))
                """,
                (
                    r["service_id"],
                    r["period_start"],
                    period_end_ts,
                    r["total_checks"],
                    r["success_count"],
                    r["sum_response_ms"] or 0,
                    r["response_count"] or 0,
                    r["min_response_ms"],
                    r["max_response_ms"],
                ),
            )
            # Delete exactly the rows that were aggregated, in chunks so the
            # IN (...) placeholder list stays under SQLite's host-parameter
            # limit (999 on older builds) even for very busy hours.
            ids = [int(x) for x in (r["check_ids"] or "").split(",") if x]
            for start in range(0, len(ids), 500):
                chunk = ids[start:start + 500]
                placeholders = ",".join("?" * len(chunk))
                cur = conn.execute(f"DELETE FROM checks WHERE id IN ({placeholders})", chunk)
                deleted += cur.rowcount
        return deleted
def prune_checks_retention() -> int:
    """
    Remove old checks to limit storage. Keeps last CHECK_RETENTION_COUNT per service.
    If CHECK_RETENTION_DAYS is set, also deletes checks older than that.
    Returns number of rows deleted.
    """
    with get_db() as conn:
        deleted = 0
        # Delete checks older than N days (if configured).
        if CHECK_RETENTION_DAYS:
            cutoff = (datetime.now(timezone.utc) - timedelta(days=CHECK_RETENTION_DAYS)).isoformat()
            cur = conn.execute("DELETE FROM checks WHERE timestamp < ?", (cutoff,))
            deleted += cur.rowcount
        # Keep only the most recent N checks per service. A correlated
        # subquery replaces the old approach of materializing up to
        # CHECK_RETENTION_COUNT (default 5000) ids into `NOT IN (?, ?, ...)`
        # placeholders, which could exceed SQLite's host-parameter limit
        # (999 on older builds) and fail.
        service_ids = [r[0] for r in conn.execute("SELECT id FROM services").fetchall()]
        for sid in service_ids:
            cur = conn.execute(
                """
                DELETE FROM checks
                WHERE service_id = ?
                  AND id NOT IN (
                      SELECT id FROM checks
                      WHERE service_id = ?
                      ORDER BY timestamp DESC
                      LIMIT ?
                  )
                """,
                (sid, sid, CHECK_RETENTION_COUNT),
            )
            deleted += cur.rowcount
        return deleted