"""HTTP/HTTPS and TCP check logic."""

import socket
import time
from urllib.parse import urlparse

import requests

TIMEOUT = 10

_MAX_PORT = 65535


def check_http(url: str) -> tuple[bool, float | None, str | None]:
    """Check an HTTP/HTTPS endpoint with a single GET request.

    A URL without an ``http://`` or ``https://`` prefix (compared
    case-insensitively) is treated as a bare host and gets ``https://``
    prepended.

    Args:
        url: Full URL, or a bare host/path to be checked over HTTPS.

    Returns:
        ``(success, response_time_ms, error_message)``. ``success`` is True
        only for a 2xx status code. ``response_time_ms`` is None when the
        request itself failed. ``error_message`` is None on success,
        ``"HTTP <status>"`` for a non-2xx response, or the text of the
        ``requests`` exception.
    """
    url = url.strip()
    # URL schemes are case-insensitive; without lower() a target such as
    # "HTTP://host" would be rewritten to "https://HTTP://host".
    # NOTE(review): run_check() passes scheme-less targets here even when the
    # stored protocol is "http", and they are still checked over https —
    # confirm this is intended.
    if not url.lower().startswith(("http://", "https://")):
        url = "https://" + url

    try:
        start = time.perf_counter()
        response = requests.get(url, timeout=TIMEOUT)
        elapsed_ms = (time.perf_counter() - start) * 1000
    except requests.RequestException as exc:
        return False, None, str(exc)

    if 200 <= response.status_code < 300:
        return True, elapsed_ms, None
    return False, elapsed_ms, f"HTTP {response.status_code}"


def check_tcp(host: str, port: int) -> tuple[bool, float | None, str | None]:
    """Check TCP connectivity by opening and immediately closing a connection.

    Args:
        host: Host name or IP address to connect to.
        port: TCP port number.

    Returns:
        ``(success, response_time_ms, error_message)``. On success the time
        is how long the connection took to establish and the message is
        None; on failure the time is None and the message is the text of the
        exception.
    """
    try:
        start = time.perf_counter()
        # The context manager guarantees the socket is closed on every path.
        with socket.create_connection((host, port), timeout=TIMEOUT):
            # Stop the clock before the socket is closed so only the connect
            # latency is reported.
            elapsed_ms = (time.perf_counter() - start) * 1000
    except (OSError, OverflowError, ValueError) as exc:
        # OSError covers timeouts, refused connections and DNS failures.
        # OverflowError is raised for a port outside 0-65535 and
        # ValueError/UnicodeError for a host that cannot be IDNA-encoded;
        # report those as failed checks instead of letting them propagate.
        return False, None, str(exc)
    return True, elapsed_ms, None


def _parse_tcp_target(target: str) -> tuple[str, int]:
    """Split a ``host:port`` or ``[ipv6]:port`` target into host and port.

    Raises:
        ValueError: with a message suitable for storing as the check error
            when the target is malformed or the port is not an integer in
            the range 0-65535.
    """
    target = target.strip()
    if target.startswith("["):
        # Bracketed IPv6 literal, e.g. "[::1]:8080".
        host, sep, port_text = target[1:].partition("]:")
        if not sep:
            raise ValueError("Invalid target: expected host:port")
    else:
        parts = target.split(":")
        if len(parts) != 2:
            raise ValueError("Invalid target: expected host:port")
        host, port_text = parts

    host = host.strip()
    if not host:
        raise ValueError("Invalid target: expected host:port")

    try:
        port = int(port_text)
    except ValueError:
        raise ValueError("Invalid port") from None
    if not 0 <= port <= _MAX_PORT:
        raise ValueError("Invalid port")
    return host, port


def run_check(service_id: int, target: str, protocol: str) -> None:
    """Run a single check and store the result.

    The protocol is matched case-insensitively. ``http``/``https`` targets
    are passed to :func:`check_http`; ``tcp`` targets must be ``host:port``
    (or ``[ipv6]:port``) and are passed to :func:`check_tcp`. Exactly one
    result is recorded through ``add_check`` per call, including for
    malformed targets and unknown protocols.

    Args:
        service_id: Identifier passed through to ``add_check``.
        target: URL for HTTP(S) checks, or ``host:port`` for TCP checks.
        protocol: ``"http"``, ``"https"`` or ``"tcp"``.
    """
    # Imported here rather than at module level, as in the original code.
    from app.models import add_check

    protocol = protocol.lower()

    if protocol in ("http", "https"):
        success, response_time_ms, error_message = check_http(target)
    elif protocol == "tcp":
        try:
            host, port = _parse_tcp_target(target)
        except ValueError as exc:
            add_check(service_id, False, None, str(exc))
            return
        success, response_time_ms, error_message = check_tcp(host, port)
    else:
        add_check(service_id, False, None, f"Unknown protocol: {protocol}")
        return

    add_check(service_id, success, response_time_ms, error_message)