# PCM_Report/time_sync.py
#
# 110 lines
# 4.1 KiB
# Python
# Raw Permalink Normal View History
#
# 2025-12-11 14:32:31 +08:00
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, Tuple

import paramiko
@dataclass
class SshCreds:
    """Connection and behaviour settings for an SSH time-setting session.

    The first three fields are required; the rest have defaults.  The
    ``password`` field is excluded from the generated ``repr`` so that
    printing or logging a credentials object does not expose the secret.
    """

    # Remote host name or address to connect to.
    host: str
    # TCP port of the SSH service.
    port: int
    # Login name used for the SSH session.
    username: str
    # Login password; kept out of repr() to avoid leaking it into logs.
    password: Optional[str] = field(default=None, repr=False)
    # When True, authenticate with ``password``; when False and ``key_path``
    # is set, authenticate with the private key file instead.
    use_password: bool = True
    # Path to a private key file, used only when ``use_password`` is False.
    key_path: Optional[str] = None
    # Timeout, in seconds, for establishing the SSH connection.
    connect_timeout_s: int = 5
    # Timeout, in seconds, applied to each remote command.
    command_timeout_s: int = 8
    # Number of attempts to make before giving up (values below 1 act as 1).
    retries: int = 1
    # Whether to try ``timedatectl set-time`` when ``date -s`` fails.
    use_timedatectl_fallback: bool = True
class SshTimeSetter:
    """Set a remote host's clock over SSH to this machine's current local time.

    A single SSH connection is opened lazily and reused across calls until
    :meth:`close` is called or a command raises, in which case the connection
    is dropped and re-established on the next attempt.
    """

    def __init__(self, creds: SshCreds) -> None:
        """Store the credentials; no connection is made until first use."""
        self._creds = creds
        self._client: Optional[paramiko.SSHClient] = None

    def _ensure_client(self) -> paramiko.SSHClient:
        """Return the cached SSH client, connecting first if necessary.

        Raises:
            Exception: whatever ``paramiko`` raises when the connection or
                authentication fails.  The half-open client is closed before
                the exception propagates so failed attempts do not leak it.
        """
        if self._client is not None:
            return self._client

        creds = self._creds
        client = paramiko.SSHClient()
        # NOTE(security): AutoAddPolicy trusts any unknown host key, which
        # leaves the first connection open to man-in-the-middle attacks.
        # Kept for backward compatibility; consider loading known_hosts.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        connect_kwargs = {
            "hostname": creds.host,
            "port": int(creds.port or 22),
            "username": creds.username,
            "look_for_keys": False,
            "allow_agent": False,
            "timeout": creds.connect_timeout_s,
        }
        if not creds.use_password and creds.key_path:
            # key_filename lets paramiko detect the key type, so non-RSA
            # keys (e.g. Ed25519, ECDSA) work as well as RSA ones.
            connect_kwargs["key_filename"] = creds.key_path
        else:
            connect_kwargs["password"] = creds.password or ""

        try:
            client.connect(**connect_kwargs)
        except Exception:
            # Release the transport of the failed attempt before re-raising.
            client.close()
            raise

        self._client = client
        return client

    def close(self) -> None:
        """Close the cached connection, if any; errors while closing are ignored."""
        if self._client is not None:
            try:
                self._client.close()
            except Exception:
                # Best-effort teardown: the connection may already be dead.
                pass
            self._client = None

    @staticmethod
    def _scrub_password(text: str, password: str) -> str:
        """Remove terminal echoes of *password* from captured command output.

        Only two shapes are removed: a line consisting solely of the
        password, and a password trailing a prompt that ends in a colon.
        Other occurrences are left alone so ordinary output is not mangled.
        """
        if not password or password not in text:
            return text
        kept = []
        for line in text.splitlines():
            stripped = line.strip()
            if stripped == password:
                continue
            if stripped.endswith(password):
                prompt = stripped[: -len(password)].rstrip()
                # Accept both ASCII and full-width colons (localized prompts).
                if prompt.endswith((":", "\uff1a")):
                    line = prompt
            kept.append(line)
        return "\n".join(kept)

    def _run_sudo(self, client: paramiko.SSHClient, command: str) -> Tuple[int, str, str]:
        """Run ``sudo -S <command>`` on *client* and return ``(exit_status, out, err)``.

        When password authentication is enabled the password is written to
        the command's stdin for sudo to read.  Because a PTY is requested,
        that input may be echoed back, so it is scrubbed from the result.
        """
        creds = self._creds
        stdin, stdout, stderr = client.exec_command(
            f"sudo -S {command}",
            get_pty=True,
            timeout=creds.command_timeout_s,
        )
        password = creds.password or ""
        if creds.use_password:
            stdin.write(password + "\n")
            stdin.flush()
        # NOTE(review): when use_password is False nothing is written, so a
        # sudo that still asks for a password will block until the timeout.
        out = stdout.read().decode("utf-8", errors="ignore")
        err = stderr.read().decode("utf-8", errors="ignore")
        status = stdout.channel.recv_exit_status()
        if creds.use_password:
            out = self._scrub_password(out, password)
            err = self._scrub_password(err, password)
        return status, out.strip(), err.strip()

    def set_time_from_windows_now(self) -> Tuple[bool, str]:
        """Push this machine's current local time to the remote host.

        Tries ``date -s`` first and, if that exits non-zero and the fallback
        is enabled, ``timedatectl set-time``.  The whole sequence is repeated
        up to ``retries`` times; the timestamp is re-read on every attempt so
        a retry never applies a stale time.

        Returns:
            ``(True, message)`` on success, where *message* is the command
            output or a default confirmation; ``(False, error)`` when every
            attempt failed, where *error* is the last error seen.
        """
        creds = self._creds
        attempts = max(1, int(creds.retries or 1))
        last_err = ""
        for _ in range(attempts):
            # Naive local wall-clock time, with no timezone conversion.
            ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            try:
                client = self._ensure_client()

                # Primary: date -s
                status, out, err = self._run_sudo(client, f"date -s '{ts}'")
                if status == 0:
                    return True, (out or f"时间已设置为 {ts}")

                if not creds.use_timedatectl_fallback:
                    last_err = err or out or "设置时间失败"
                    continue

                # Optional fallback: timedatectl
                status2, out2, err2 = self._run_sudo(
                    client, f"timedatectl set-time '{ts}'"
                )
                if status2 == 0:
                    return True, (out2 or f"时间已设置为 {ts}")
                last_err = err2 or err or out2 or out or "设置时间失败"
            except Exception as e:
                last_err = str(e)
                # Drop the connection so the next attempt reconnects.
                self.close()
        return False, last_err or "设置时间失败"