Files
2026-03-07 03:43:50 +00:00

65 lines
2.1 KiB
Python

"""HTTP/HTTPS and TCP check logic."""
import socket
import time
from urllib.parse import urlparse
import requests
# Timeout, in seconds, passed to both the HTTP request and the TCP connect.
TIMEOUT = 10
def check_http(url: str) -> tuple[bool, float | None, str | None]:
    """
    Check an HTTP/HTTPS endpoint with a single GET request.

    A URL that does not start with ``http://`` or ``https://`` (compared
    case-insensitively) is treated as a bare host/path and requested over
    HTTPS.

    Args:
        url: Endpoint to request. Surrounding whitespace is ignored.

    Returns:
        A ``(success, response_time_ms, error_message)`` tuple.
        ``success`` is True only for a 2xx status code. When a response was
        received, ``response_time_ms`` is the elapsed time of the request in
        milliseconds and ``error_message`` is ``None`` on success or
        ``"HTTP <status>"`` otherwise. When the request itself failed, the
        time is ``None`` and the message is the exception text.
    """
    url = url.strip()
    # Compare on a lowercased copy so "HTTP://host" is not mistaken for a
    # scheme-less target and turned into "https://HTTP://host".
    if not url.lower().startswith(("http://", "https://")):
        url = "https://" + url
    try:
        start = time.perf_counter()
        r = requests.get(url, timeout=TIMEOUT)
        elapsed_ms = (time.perf_counter() - start) * 1000
    except (requests.RequestException, ValueError) as e:
        # Malformed hosts can surface as ValueError subclasses (for example
        # UnicodeError) instead of RequestException; report them as a failed
        # check rather than letting the exception escape.
        return False, None, str(e)
    if 200 <= r.status_code < 300:
        return True, elapsed_ms, None
    return False, elapsed_ms, f"HTTP {r.status_code}"
def check_tcp(host: str, port: int) -> tuple[bool, float | None, str | None]:
"""
Check TCP connectivity. Returns (success, response_time_ms, error_message).
"""
try:
start = time.perf_counter()
sock = socket.create_connection((host, port), timeout=TIMEOUT)
sock.close()
elapsed_ms = (time.perf_counter() - start) * 1000
return True, elapsed_ms, None
except (socket.error, OSError) as e:
return False, None, str(e)
def _split_host_port(target: str) -> tuple[str, str] | None:
    """Split ``host:port`` or ``[ipv6]:port`` into (host, port_text); None if malformed."""
    host, sep, port_text = target.rpartition(":")
    host = host.strip()
    if host.startswith("[") and host.endswith("]"):
        # Bracketed IPv6 literal: the brackets only delimit the address.
        host = host[1:-1].strip()
    elif ":" in host:
        # Extra colons without brackets are ambiguous; treat as malformed.
        return None
    if not sep or not host:
        return None
    return host, port_text


def _parse_port(text: str) -> int | None:
    """Return *text* as a TCP port number, or None if it is not an integer in 1-65535."""
    try:
        port = int(text)
    except ValueError:
        return None
    return port if 0 < port <= 65535 else None


def run_check(service_id: int, target: str, protocol: str):
    """
    Run a single check and store the result via ``add_check``.

    Args:
        service_id: Identifier passed through to ``add_check``.
        target: For ``http``/``https`` a URL or bare host; for ``tcp`` a
            ``host:port`` or ``[ipv6]:port`` string.
        protocol: ``"http"``, ``"https"`` or ``"tcp"`` (case-insensitive).

    Exactly one ``add_check`` call is made per invocation. Malformed targets
    and unknown protocols are recorded as failed checks with an explanatory
    message rather than raised.
    """
    # Function-level import kept from the original; presumably it avoids a
    # circular import with app.models -- confirm before moving it.
    from app.models import add_check

    protocol = protocol.lower()
    target = target.strip()

    if protocol in ("http", "https"):
        # Honour the configured protocol for scheme-less targets; an explicit
        # scheme in the target always wins.
        if not target.lower().startswith(("http://", "https://")):
            target = f"{protocol}://{target}"
        success, response_time_ms, error_message = check_http(target)
    elif protocol == "tcp":
        endpoint = _split_host_port(target)
        if endpoint is None:
            add_check(service_id, False, None, "Invalid target: expected host:port")
            return
        host, port_text = endpoint
        port = _parse_port(port_text)
        if port is None:
            add_check(service_id, False, None, "Invalid port")
            return
        success, response_time_ms, error_message = check_tcp(host, port)
    else:
        add_check(service_id, False, None, f"Unknown protocol: {protocol}")
        return

    add_check(service_id, success, response_time_ms, error_message)