"""SQLite schema and database helpers.""" import os import sqlite3 from contextlib import contextmanager from datetime import datetime, timedelta, timezone from pathlib import Path DATA_PATH = os.environ.get("DATA_PATH", "/app/data") DB_PATH = Path(DATA_PATH) / "monitor.db" # Retention: keep last N checks per service, and optionally drop checks older than N days CHECK_RETENTION_COUNT = int(os.environ.get("CHECK_RETENTION_COUNT", "5000")) CHECK_RETENTION_DAYS = int(os.environ.get("CHECK_RETENTION_DAYS", "0")) or None # Rollup: aggregate checks older than N hours into hourly buckets for long-term reporting ROLLUP_AGE_HOURS = int(os.environ.get("ROLLUP_AGE_HOURS", "24")) def _ensure_data_dir(): Path(DATA_PATH).mkdir(parents=True, exist_ok=True) def _migrate_add_status(conn): """Add status column if missing (migration for existing DBs).""" try: conn.execute("SELECT status FROM checks LIMIT 1") except sqlite3.OperationalError: conn.execute("ALTER TABLE checks ADD COLUMN status TEXT") conn.execute("UPDATE checks SET status = CASE WHEN success = 1 THEN 'OK' ELSE 'ERROR' END") def _migrate_add_rollups(conn): """Create uptime_rollups table for aggregated hourly stats (long-term reporting).""" conn.execute(""" CREATE TABLE IF NOT EXISTS uptime_rollups ( id INTEGER PRIMARY KEY AUTOINCREMENT, service_id INTEGER NOT NULL, period_start TEXT NOT NULL, period_end TEXT NOT NULL, total_checks INTEGER NOT NULL, success_count INTEGER NOT NULL, sum_response_ms REAL NOT NULL, response_count INTEGER NOT NULL, min_response_ms REAL, max_response_ms REAL, FOREIGN KEY (service_id) REFERENCES services(id), UNIQUE(service_id, period_start) ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_rollups_service ON uptime_rollups(service_id)") conn.execute("CREATE INDEX IF NOT EXISTS idx_rollups_period ON uptime_rollups(period_start)") try: conn.execute("SELECT response_count FROM uptime_rollups LIMIT 1") except sqlite3.OperationalError: conn.execute("ALTER TABLE uptime_rollups ADD COLUMN response_count INTEGER NOT NULL DEFAULT 0") @contextmanager def get_db(): _ensure_data_dir() conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row try: yield conn conn.commit() finally: conn.close() def init_db(): """Create tables if they don't exist.""" with get_db() as conn: conn.execute(""" CREATE TABLE IF NOT EXISTS services ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, target TEXT NOT NULL, protocol TEXT NOT NULL CHECK(protocol IN ('http', 'https', 'tcp')), interval_seconds INTEGER NOT NULL DEFAULT 60, created_at TEXT NOT NULL ) """) conn.execute(""" CREATE TABLE IF NOT EXISTS checks ( id INTEGER PRIMARY KEY AUTOINCREMENT, service_id INTEGER NOT NULL, success INTEGER NOT NULL, status TEXT NOT NULL, response_time_ms REAL, timestamp TEXT NOT NULL, error_message TEXT, FOREIGN KEY (service_id) REFERENCES services(id) ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_checks_service ON checks(service_id)") conn.execute("CREATE INDEX IF NOT EXISTS idx_checks_timestamp ON checks(timestamp)") _migrate_add_status(conn) conn.execute("CREATE INDEX IF NOT EXISTS idx_checks_status ON checks(status)") _migrate_add_rollups(conn) def list_services(): """Return all services with last check info and uptime.""" with get_db() as conn: rows = conn.execute( """ SELECT s.*, (SELECT success FROM checks WHERE service_id = s.id ORDER BY timestamp DESC LIMIT 1) as last_success, (SELECT response_time_ms FROM checks WHERE service_id = s.id ORDER BY timestamp DESC LIMIT 1) as last_response_ms, (SELECT timestamp FROM checks WHERE service_id = s.id ORDER BY timestamp DESC LIMIT 1) as last_check, (SELECT ROUND(100.0 * SUM(success) / NULLIF(COUNT(*), 0), 2) FROM (SELECT success FROM checks WHERE service_id = s.id ORDER BY timestamp DESC LIMIT 1000)) as uptime_pct FROM services s ORDER BY s.id """ ).fetchall() return [dict(r) for r in rows] def get_service(service_id: int): """Get a single service by id.""" with get_db() as conn: row = conn.execute("SELECT * FROM services WHERE id = ?", (service_id,)).fetchone() return dict(row) if row else None def add_service(name: str, target: str, protocol: str, interval_seconds: int = 60) -> int: """Add a new service. Returns the new service id.""" with get_db() as conn: cur = conn.execute( "INSERT INTO services (name, target, protocol, interval_seconds, created_at) VALUES (?, ?, ?, ?, ?)", (name, target, protocol, interval_seconds, datetime.utcnow().isoformat()), ) return cur.lastrowid def update_service(service_id: int, name: str = None, target: str = None, protocol: str = None, interval_seconds: int = None) -> bool: """Update a service. Only provided fields are updated. Returns True if updated.""" updates = [] args = [] if name is not None: updates.append("name = ?") args.append(name) if target is not None: updates.append("target = ?") args.append(target) if protocol is not None: updates.append("protocol = ?") args.append(protocol) if interval_seconds is not None: updates.append("interval_seconds = ?") args.append(interval_seconds) if not updates: return True args.append(service_id) with get_db() as conn: cur = conn.execute( f"UPDATE services SET {', '.join(updates)} WHERE id = ?", args, ) return cur.rowcount > 0 def add_check(service_id: int, success: bool, response_time_ms: float | None, error_message: str | None = None): """Record a check result. status is OK or ERROR for searchability.""" status = "OK" if success else "ERROR" with get_db() as conn: conn.execute( "INSERT INTO checks (service_id, success, status, response_time_ms, timestamp, error_message) VALUES (?, ?, ?, ?, ?, ?)", (service_id, 1 if success else 0, status, response_time_ms, datetime.utcnow().isoformat(), error_message), ) def _checks_where_args(service_id: int, from_ts: str = None, to_ts: str = None, status_filter: str = None, search: str = None): """Build WHERE clause and args for checks queries.""" q = "WHERE service_id = ?" args = [service_id] if from_ts: q += " AND timestamp >= ?" args.append(from_ts) if to_ts: q += " AND timestamp <= ?" args.append(to_ts) if status_filter == "error": q += " AND status = 'ERROR'" elif status_filter == "ok": q += " AND status = 'OK'" if search: q += " AND (error_message LIKE ? OR status LIKE ?)" args.extend([f"%{search}%", f"%{search}%"]) return q, args def get_checks_count(service_id: int, from_ts: str = None, to_ts: str = None, status_filter: str = None, search: str = None) -> int: """Count checks matching filters (for pagination).""" where, args = _checks_where_args(service_id, from_ts, to_ts, status_filter, search) with get_db() as conn: row = conn.execute(f"SELECT COUNT(*) FROM checks {where}", args).fetchone() return row[0] def get_checks(service_id: int, limit: int = 50, offset: int = 0, from_ts: str = None, to_ts: str = None, status_filter: str = None, search: str = None): """Get recent checks for a service, optionally filtered and paginated.""" where, args = _checks_where_args(service_id, from_ts, to_ts, status_filter, search) args.extend([limit, offset]) with get_db() as conn: rows = conn.execute(f"SELECT * FROM checks {where} ORDER BY timestamp DESC LIMIT ? OFFSET ?", args).fetchall() return [dict(r) for r in rows] def get_report_stats(service_id: int, from_ts: str = None, to_ts: str = None): """ Compute uptime % and latency stats for a service over a time range. Uses hourly rollups for old data + raw checks for recent data (last ROLLUP_AGE_HOURS). Supports accurate reporting over 90+ days. """ now = datetime.now(timezone.utc) raw_cutoff = (now - timedelta(hours=ROLLUP_AGE_HOURS)).isoformat() to_ts = to_ts or now.isoformat() from_ts = from_ts or "1970-01-01T00:00:00" total = 0 success_count = 0 sum_response_ms = 0.0 count_with_response = 0 min_ms = None max_ms = None with get_db() as conn: # 1. Rollups: hourly buckets that end before raw_cutoff rollup_end = raw_cutoff if raw_cutoff < to_ts else from_ts if from_ts < rollup_end: q = """ SELECT total_checks, success_count, sum_response_ms, response_count, min_response_ms, max_response_ms FROM uptime_rollups WHERE service_id = ? AND period_start >= ? AND period_start < ? """ rollup_rows = conn.execute(q, (service_id, from_ts, rollup_end)).fetchall() for r in rollup_rows: total += r["total_checks"] success_count += r["success_count"] sum_response_ms += r["sum_response_ms"] or 0 count_with_response += r["response_count"] or 0 if r["min_response_ms"] is not None: min_ms = r["min_response_ms"] if min_ms is None else min(min_ms, r["min_response_ms"]) if r["max_response_ms"] is not None: max_ms = r["max_response_ms"] if max_ms is None else max(max_ms, r["max_response_ms"]) # 2. Raw checks: recent data (overlaps with rollup period if range is entirely recent) raw_from = from_ts if from_ts >= raw_cutoff else raw_cutoff if raw_from <= to_ts: q = "SELECT success, response_time_ms FROM checks WHERE service_id = ? AND timestamp >= ? AND timestamp <= ?" raw_rows = conn.execute(q, (service_id, raw_from, to_ts)).fetchall() for r in raw_rows: total += 1 success_count += 1 if r["success"] else 0 if r["response_time_ms"] is not None: sum_response_ms += r["response_time_ms"] count_with_response += 1 min_ms = r["response_time_ms"] if min_ms is None else min(min_ms, r["response_time_ms"]) max_ms = r["response_time_ms"] if max_ms is None else max(max_ms, r["response_time_ms"]) if total == 0: return {"total": 0, "uptime_pct": 0, "avg_ms": None, "min_ms": None, "max_ms": None} uptime_pct = (success_count / total) * 100 avg_ms = round(sum_response_ms / count_with_response, 2) if count_with_response else None return { "total": total, "uptime_pct": round(uptime_pct, 2), "avg_ms": avg_ms, "min_ms": round(min_ms, 2) if min_ms is not None else None, "max_ms": round(max_ms, 2) if max_ms is not None else None, } def get_history(service_id: int, limit: int = 100, from_ts: str = None, to_ts: str = None): """Get check history for charts (JSON), optionally filtered by timestamp range.""" with get_db() as conn: q = "SELECT timestamp, success, response_time_ms FROM checks WHERE service_id = ?" args = [service_id] if from_ts: q += " AND timestamp >= ?" args.append(from_ts) if to_ts: q += " AND timestamp <= ?" args.append(to_ts) q += " ORDER BY timestamp DESC LIMIT ?" args.append(limit) rows = conn.execute(q, args).fetchall() return [{"timestamp": r["timestamp"], "success": bool(r["success"]), "response_time_ms": r["response_time_ms"]} for r in rows] def delete_service(service_id: int) -> bool: """Delete a service and its check history. Returns True if deleted.""" with get_db() as conn: conn.execute("DELETE FROM checks WHERE service_id = ?", (service_id,)) conn.execute("DELETE FROM uptime_rollups WHERE service_id = ?", (service_id,)) cur = conn.execute("DELETE FROM services WHERE id = ?", (service_id,)) return cur.rowcount > 0 def get_all_services_for_scheduler(): """Return all services for the scheduler.""" with get_db() as conn: rows = conn.execute("SELECT id, target, protocol, interval_seconds FROM services").fetchall() return [dict(r) for r in rows] def _hour_start(ts: str) -> str: """Return ISO timestamp truncated to hour boundary (e.g. 2026-03-10T14:00:00).""" dt = datetime.fromisoformat(ts.replace("Z", "+00:00")) return dt.replace(minute=0, second=0, microsecond=0).isoformat() def rollup_old_checks() -> int: """ Aggregate checks older than ROLLUP_AGE_HOURS into hourly buckets. Returns number of raw checks that were rolled up and deleted. """ cutoff = datetime.now(timezone.utc) - timedelta(hours=ROLLUP_AGE_HOURS) cutoff_ts = cutoff.isoformat() with get_db() as conn: # Get checks older than cutoff, grouped by service and hour rows = conn.execute( """ SELECT service_id, strftime('%Y-%m-%dT%H:00:00', timestamp) as period_start, COUNT(*) as total_checks, SUM(success) as success_count, SUM(CASE WHEN response_time_ms IS NOT NULL THEN response_time_ms ELSE 0 END) as sum_response_ms, SUM(CASE WHEN response_time_ms IS NOT NULL THEN 1 ELSE 0 END) as response_count, MIN(CASE WHEN response_time_ms IS NOT NULL THEN response_time_ms END) as min_response_ms, MAX(response_time_ms) as max_response_ms, GROUP_CONCAT(id) as check_ids FROM checks WHERE timestamp < ? GROUP BY service_id, period_start """, (cutoff_ts,), ).fetchall() if not rows: return 0 deleted = 0 for r in rows: period_end = datetime.fromisoformat(r["period_start"].replace("Z", "+00:00")) + timedelta(hours=1) period_end_ts = period_end.isoformat() conn.execute( """ INSERT INTO uptime_rollups (service_id, period_start, period_end, total_checks, success_count, sum_response_ms, response_count, min_response_ms, max_response_ms) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(service_id, period_start) DO UPDATE SET total_checks = total_checks + excluded.total_checks, success_count = success_count + excluded.success_count, sum_response_ms = sum_response_ms + excluded.sum_response_ms, response_count = response_count + excluded.response_count, min_response_ms = MIN(min_response_ms, excluded.min_response_ms), max_response_ms = MAX(max_response_ms, excluded.max_response_ms) """, ( r["service_id"], r["period_start"], period_end_ts, r["total_checks"], r["success_count"], r["sum_response_ms"] or 0, r["response_count"] or 0, r["min_response_ms"], r["max_response_ms"], ), ) ids = [int(x) for x in (r["check_ids"] or "").split(",") if x] if ids: placeholders = ",".join("?" * len(ids)) cur = conn.execute(f"DELETE FROM checks WHERE id IN ({placeholders})", ids) deleted += cur.rowcount return deleted def prune_checks_retention() -> int: """ Remove old checks to limit storage. Keeps last CHECK_RETENTION_COUNT per service. If CHECK_RETENTION_DAYS is set, also deletes checks older than that. Returns number of rows deleted. """ with get_db() as conn: deleted = 0 # Delete checks older than N days (if configured) if CHECK_RETENTION_DAYS: cutoff = (datetime.now(timezone.utc) - timedelta(days=CHECK_RETENTION_DAYS)).isoformat() cur = conn.execute("DELETE FROM checks WHERE timestamp < ?", (cutoff,)) deleted += cur.rowcount # Keep only last N checks per service service_ids = [r[0] for r in conn.execute("SELECT id FROM services").fetchall()] for sid in service_ids: # Get ids of checks to keep (most recent N) keep_ids = conn.execute( "SELECT id FROM checks WHERE service_id = ? ORDER BY timestamp DESC LIMIT ?", (sid, CHECK_RETENTION_COUNT), ).fetchall() keep_ids = [r[0] for r in keep_ids] if not keep_ids: continue placeholders = ",".join("?" * len(keep_ids)) cur = conn.execute( f"DELETE FROM checks WHERE service_id = ? AND id NOT IN ({placeholders})", [sid] + keep_ids, ) deleted += cur.rowcount return deleted