#!/usr/bin/env python3 """ 日志管理模块 """ import os import datetime import json from typing import List, Dict, Optional class LogLevel: DEBUG = "debug" INFO = "info" WARNING = "warning" ERROR = "error" SUCCESS = "success" class LogEntry: """日志条目""" def __init__(self, level: str, message: str, timestamp: datetime.datetime = None): self.level = level self.message = message self.timestamp = timestamp or datetime.datetime.now() def to_dict(self) -> Dict: """转换为字典""" return { "level": self.level, "message": self.message, "timestamp": self.timestamp.isoformat() } @classmethod def from_dict(cls, data: Dict) -> 'LogEntry': """从字典创建""" return cls( level=data["level"], message=data["message"], timestamp=datetime.datetime.fromisoformat(data["timestamp"]) ) class Logger: """日志管理器""" def __init__(self, log_dir: str = "logs"): self.log_dir = log_dir self.logs: List[LogEntry] = [] self.max_memory_logs = 1000 # 内存中保留的最大日志数 self.current_filter = "all" # all, error, error_warning, no_debug, debug # 确保日志目录存在 os.makedirs(self.log_dir, exist_ok=True) def log(self, message: str, level: str = LogLevel.INFO): """记录日志""" entry = LogEntry(level, message) self.logs.append(entry) # 当日志超过阈值时,保存到文件 if len(self.logs) > self.max_memory_logs: self.save_to_file() return entry def debug(self, message: str): """记录debug日志""" return self.log(message, LogLevel.DEBUG) def info(self, message: str): """记录info日志""" return self.log(message, LogLevel.INFO) def warning(self, message: str): """记录warning日志""" return self.log(message, LogLevel.WARNING) def error(self, message: str): """记录error日志""" return self.log(message, LogLevel.ERROR) def success(self, message: str): """记录success日志""" return self.log(message, LogLevel.SUCCESS) def save_to_file(self) -> str: """保存日志到文件""" timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") filename = os.path.join(self.log_dir, f"rdss_{timestamp}.log") with open(filename, 'w', encoding='utf-8') as f: for entry in self.logs: json.dump(entry.to_dict(), f, ensure_ascii=False) f.write('\n') # 清空内存中的日志 self.logs = [] return filename def load_from_file(self, filename: str) -> List[LogEntry]: """从文件加载日志""" entries = [] try: with open(filename, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if line: data = json.loads(line) entries.append(LogEntry.from_dict(data)) except Exception as e: self.error(f"加载日志文件失败: {str(e)}") return entries def get_filtered_logs(self) -> List[LogEntry]: """获取过滤后的日志""" if self.current_filter == "all": return self.logs elif self.current_filter == "error": return [entry for entry in self.logs if entry.level == LogLevel.ERROR] elif self.current_filter == "error_warning": return [entry for entry in self.logs if entry.level in [LogLevel.ERROR, LogLevel.WARNING]] elif self.current_filter == "no_debug": return [entry for entry in self.logs if entry.level != LogLevel.DEBUG] elif self.current_filter == "debug": return [entry for entry in self.logs if entry.level == LogLevel.DEBUG] return self.logs def set_filter(self, filter_type: str): """设置过滤器""" self.current_filter = filter_type def clear(self): """清空日志""" self.logs = [] def get_log_files(self) -> List[str]: """获取所有日志文件""" files = [] if os.path.exists(self.log_dir): for file in os.listdir(self.log_dir): if file.startswith("rdss_") and file.endswith(".log"): files.append(os.path.join(self.log_dir, file)) # 按时间排序 files.sort(reverse=True) return files # 全局日志实例 global_logger = Logger() def get_logger() -> Logger: """获取全局日志实例""" return global_logger