110 lines
4.1 KiB
Python
110 lines
4.1 KiB
Python
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
from dataclasses import dataclass
|
||
|
|
from datetime import datetime
|
||
|
|
from typing import Optional, Tuple
|
||
|
|
|
||
|
|
import paramiko
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class SshCreds:
    """Connection settings and behaviour switches for one SSH target.

    The first three fields are required; everything else has a default.
    Field order is part of the positional constructor signature.
    """

    # --- where to connect -------------------------------------------------
    host: str
    port: int
    username: str

    # --- how to authenticate ----------------------------------------------
    # Password for the account; may be left unset when a key is used.
    password: Optional[str] = None
    # True selects password handling; False together with ``key_path``
    # selects private-key authentication.
    use_password: bool = True
    # Filesystem path of the private key file, if any.
    key_path: Optional[str] = None

    # --- timing / retry behaviour -----------------------------------------
    # Seconds allowed for establishing the connection.
    connect_timeout_s: int = 5
    # Seconds allowed for each remote command.
    command_timeout_s: int = 8
    # Number of attempts to make.
    retries: int = 1

    # Whether ``timedatectl set-time`` may be tried as a fallback command.
    use_timedatectl_fallback: bool = True
|
||
|
|
|
||
|
|
|
||
|
|
class SshTimeSetter:
    """Set a remote host's clock over SSH to this machine's current local time.

    The SSH connection is opened lazily on first use and reused until
    :meth:`close` is called or a command raises, in which case the next
    attempt reconnects.
    """

    def __init__(self, creds: SshCreds) -> None:
        """Store the credentials; no connection is made here."""
        self._creds = creds
        # Created on demand by _ensure_client() and cached for reuse.
        self._client: Optional[paramiko.SSHClient] = None

    def _ensure_client(self) -> paramiko.SSHClient:
        """Return the cached SSH client, connecting first if necessary.

        Raises:
            Exception: whatever ``paramiko.SSHClient.connect`` raises. The
                half-initialised client is closed before re-raising so no
                socket is leaked.
        """
        if self._client is not None:
            return self._client

        creds = self._creds
        client = paramiko.SSHClient()
        # NOTE(security): AutoAddPolicy accepts any unknown host key, which
        # leaves the connection open to man-in-the-middle attacks. Kept
        # as-is to preserve existing behaviour; consider known_hosts checks.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        connect_kwargs = {
            "hostname": creds.host,
            "port": int(creds.port or 22),
            "username": creds.username,
            "look_for_keys": False,
            "allow_agent": False,
            "timeout": creds.connect_timeout_s,
        }
        if not creds.use_password and creds.key_path:
            # key_filename lets paramiko detect the key type (RSA, ECDSA,
            # Ed25519) instead of assuming the file holds an RSA key.
            connect_kwargs["key_filename"] = creds.key_path
        else:
            connect_kwargs["password"] = creds.password or ""

        try:
            client.connect(**connect_kwargs)
        except Exception:
            # Do not leak the socket/transport of a failed connection.
            client.close()
            raise

        self._client = client
        return client

    def close(self) -> None:
        """Close the cached connection, if any. Safe to call repeatedly."""
        if self._client is None:
            return
        try:
            self._client.close()
        except Exception:
            # Best effort: a failure while tearing down is not actionable.
            pass
        finally:
            self._client = None

    @staticmethod
    def _scrub(text: str, secret: Optional[str]) -> str:
        """Return ``text`` stripped, with every trace of ``secret`` removed.

        Lines consisting solely of the secret (a terminal echo of the typed
        password) are dropped; any remaining occurrence is masked.
        """
        if not secret:
            return text.strip()
        kept = [line for line in text.splitlines() if line.strip() != secret]
        return "\n".join(kept).replace(secret, "***").strip()

    def _run_sudo(self, client: paramiko.SSHClient, command: str) -> Tuple[int, str, str]:
        """Run ``command`` under sudo and return ``(exit_status, stdout, stderr)``.

        With ``use_password`` the password is fed through stdin (``-S``) and
        the prompt is suppressed (``-p ''``). Otherwise ``-n`` is used so
        sudo fails immediately instead of waiting for input that never comes.
        """
        creds = self._creds
        secret = (creds.password or "") if creds.use_password else ""
        prefix = "sudo -S -p ''" if creds.use_password else "sudo -n"

        stdin, stdout, stderr = client.exec_command(
            f"{prefix} {command}",
            get_pty=True,
            timeout=creds.command_timeout_s,
        )
        if creds.use_password:
            stdin.write(secret + "\n")
            stdin.flush()

        # A PTY may echo the typed password back into the captured output;
        # scrub it so it never reaches the message handed to the caller.
        out = self._scrub(stdout.read().decode("utf-8", errors="ignore"), secret)
        err = self._scrub(stderr.read().decode("utf-8", errors="ignore"), secret)
        status = stdout.channel.recv_exit_status()
        return status, out, err

    def set_time_from_windows_now(self) -> Tuple[bool, str]:
        """Set the remote clock to this machine's current local time.

        Tries ``date -s`` first and, when enabled in the credentials,
        ``timedatectl set-time`` as a fallback. The whole sequence is
        repeated up to ``retries`` times (at least once); an exception
        drops the connection so the next attempt reconnects.

        Returns:
            ``(True, message)`` on success, where ``message`` is the command
            output or a default confirmation; ``(False, error_text)`` once
            every attempt has failed.
        """
        attempts = max(1, int(self._creds.retries or 1))
        last_err = ""

        for _ in range(attempts):
            # Recomputed per attempt so a retry after a timeout does not
            # apply a timestamp that is already several seconds old.
            # The value is naive local time, without a UTC offset.
            ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            try:
                client = self._ensure_client()

                # Primary: date -s
                status, out, err = self._run_sudo(client, f"date -s '{ts}'")
                if status == 0:
                    return True, (out or f"时间已设置为 {ts}")

                if not self._creds.use_timedatectl_fallback:
                    last_err = err or out or "设置时间失败"
                    continue

                # Optional fallback: timedatectl
                status2, out2, err2 = self._run_sudo(
                    client, f"timedatectl set-time '{ts}'"
                )
                if status2 == 0:
                    return True, (out2 or f"时间已设置为 {ts}")
                last_err = err2 or err or out2 or out or "设置时间失败"
            except Exception as exc:
                # Some exceptions (e.g. timeouts) stringify to ""; fall back
                # to the class name so the caller still gets a hint.
                last_err = str(exc) or type(exc).__name__
                # Force a reconnect on the next attempt.
                self.close()

        return False, last_err or "设置时间失败"
|
||
|
|
|
||
|
|
|