RDSS/utils/logger.py

151 lines
4.8 KiB
Python
Raw Permalink Normal View History

2026-03-13 14:16:00 +08:00
#!/usr/bin/env python3
"""
日志管理模块
"""
import os
import datetime
import json
from typing import List, Dict, Optional
class LogLevel:
DEBUG = "debug"
INFO = "info"
WARNING = "warning"
ERROR = "error"
SUCCESS = "success"
class LogEntry:
"""日志条目"""
def __init__(self, level: str, message: str, timestamp: datetime.datetime = None):
self.level = level
self.message = message
self.timestamp = timestamp or datetime.datetime.now()
def to_dict(self) -> Dict:
"""转换为字典"""
return {
"level": self.level,
"message": self.message,
"timestamp": self.timestamp.isoformat()
}
@classmethod
def from_dict(cls, data: Dict) -> 'LogEntry':
"""从字典创建"""
return cls(
level=data["level"],
message=data["message"],
timestamp=datetime.datetime.fromisoformat(data["timestamp"])
)
class Logger:
"""日志管理器"""
def __init__(self, log_dir: str = "logs"):
self.log_dir = log_dir
self.logs: List[LogEntry] = []
self.max_memory_logs = 1000 # 内存中保留的最大日志数
self.current_filter = "all" # all, error, error_warning, no_debug, debug
# 确保日志目录存在
os.makedirs(self.log_dir, exist_ok=True)
def log(self, message: str, level: str = LogLevel.INFO):
"""记录日志"""
entry = LogEntry(level, message)
self.logs.append(entry)
# 当日志超过阈值时,保存到文件
if len(self.logs) > self.max_memory_logs:
self.save_to_file()
return entry
def debug(self, message: str):
"""记录debug日志"""
return self.log(message, LogLevel.DEBUG)
def info(self, message: str):
"""记录info日志"""
return self.log(message, LogLevel.INFO)
def warning(self, message: str):
"""记录warning日志"""
return self.log(message, LogLevel.WARNING)
def error(self, message: str):
"""记录error日志"""
return self.log(message, LogLevel.ERROR)
def success(self, message: str):
"""记录success日志"""
return self.log(message, LogLevel.SUCCESS)
def save_to_file(self) -> str:
"""保存日志到文件"""
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = os.path.join(self.log_dir, f"rdss_{timestamp}.log")
with open(filename, 'w', encoding='utf-8') as f:
for entry in self.logs:
json.dump(entry.to_dict(), f, ensure_ascii=False)
f.write('\n')
# 清空内存中的日志
self.logs = []
return filename
def load_from_file(self, filename: str) -> List[LogEntry]:
"""从文件加载日志"""
entries = []
try:
with open(filename, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line:
data = json.loads(line)
entries.append(LogEntry.from_dict(data))
except Exception as e:
self.error(f"加载日志文件失败: {str(e)}")
return entries
def get_filtered_logs(self) -> List[LogEntry]:
"""获取过滤后的日志"""
if self.current_filter == "all":
return self.logs
elif self.current_filter == "error":
return [entry for entry in self.logs if entry.level == LogLevel.ERROR]
elif self.current_filter == "error_warning":
return [entry for entry in self.logs if entry.level in [LogLevel.ERROR, LogLevel.WARNING]]
elif self.current_filter == "no_debug":
return [entry for entry in self.logs if entry.level != LogLevel.DEBUG]
elif self.current_filter == "debug":
return [entry for entry in self.logs if entry.level == LogLevel.DEBUG]
return self.logs
def set_filter(self, filter_type: str):
"""设置过滤器"""
self.current_filter = filter_type
def clear(self):
"""清空日志"""
self.logs = []
def get_log_files(self) -> List[str]:
"""获取所有日志文件"""
files = []
if os.path.exists(self.log_dir):
for file in os.listdir(self.log_dir):
if file.startswith("rdss_") and file.endswith(".log"):
files.append(os.path.join(self.log_dir, file))
# 按时间排序
files.sort(reverse=True)
return files
# 全局日志实例
global_logger = Logger()
def get_logger() -> Logger:
"""获取全局日志实例"""
return global_logger