organize the recent checks
This commit is contained in:
@@ -137,27 +137,40 @@ def add_check(service_id: int, success: bool, response_time_ms: float | None, er
|
||||
)
|
||||
|
||||
|
||||
def get_checks(service_id: int, limit: int = 50, from_ts: str = None, to_ts: str = None, status_filter: str = None, search: str = None):
|
||||
"""Get recent checks for a service, optionally filtered by timestamp, status (ok/error), and error search."""
|
||||
def _checks_where_args(service_id: int, from_ts: str = None, to_ts: str = None, status_filter: str = None, search: str = None):
|
||||
"""Build WHERE clause and args for checks queries."""
|
||||
q = "WHERE service_id = ?"
|
||||
args = [service_id]
|
||||
if from_ts:
|
||||
q += " AND timestamp >= ?"
|
||||
args.append(from_ts)
|
||||
if to_ts:
|
||||
q += " AND timestamp <= ?"
|
||||
args.append(to_ts)
|
||||
if status_filter == "error":
|
||||
q += " AND status = 'ERROR'"
|
||||
elif status_filter == "ok":
|
||||
q += " AND status = 'OK'"
|
||||
if search:
|
||||
q += " AND (error_message LIKE ? OR status LIKE ?)"
|
||||
args.extend([f"%{search}%", f"%{search}%"])
|
||||
return q, args
|
||||
|
||||
|
||||
def get_checks_count(service_id: int, from_ts: str = None, to_ts: str = None, status_filter: str = None, search: str = None) -> int:
|
||||
"""Count checks matching filters (for pagination)."""
|
||||
where, args = _checks_where_args(service_id, from_ts, to_ts, status_filter, search)
|
||||
with get_db() as conn:
|
||||
q = "SELECT * FROM checks WHERE service_id = ?"
|
||||
args = [service_id]
|
||||
if from_ts:
|
||||
q += " AND timestamp >= ?"
|
||||
args.append(from_ts)
|
||||
if to_ts:
|
||||
q += " AND timestamp <= ?"
|
||||
args.append(to_ts)
|
||||
if status_filter == "error":
|
||||
q += " AND status = 'ERROR'"
|
||||
elif status_filter == "ok":
|
||||
q += " AND status = 'OK'"
|
||||
if search:
|
||||
q += " AND (error_message LIKE ? OR status LIKE ?)"
|
||||
args.extend([f"%{search}%", f"%{search}%"])
|
||||
q += " ORDER BY timestamp DESC LIMIT ?"
|
||||
args.append(limit)
|
||||
rows = conn.execute(q, args).fetchall()
|
||||
row = conn.execute(f"SELECT COUNT(*) FROM checks {where}", args).fetchone()
|
||||
return row[0]
|
||||
|
||||
|
||||
def get_checks(service_id: int, limit: int = 50, offset: int = 0, from_ts: str = None, to_ts: str = None, status_filter: str = None, search: str = None):
|
||||
"""Get recent checks for a service, optionally filtered and paginated."""
|
||||
where, args = _checks_where_args(service_id, from_ts, to_ts, status_filter, search)
|
||||
args.extend([limit, offset])
|
||||
with get_db() as conn:
|
||||
rows = conn.execute(f"SELECT * FROM checks {where} ORDER BY timestamp DESC LIMIT ? OFFSET ?", args).fetchall()
|
||||
return [dict(r) for r in rows]
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user