151 lines
4.8 KiB
Python
151 lines
4.8 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""
|
||
|
|
日志管理模块
|
||
|
|
"""
|
||
|
|
|
||
|
|
import os
|
||
|
|
import datetime
|
||
|
|
import json
|
||
|
|
from typing import List, Dict, Optional
|
||
|
|
|
||
|
|
class LogLevel:
|
||
|
|
DEBUG = "debug"
|
||
|
|
INFO = "info"
|
||
|
|
WARNING = "warning"
|
||
|
|
ERROR = "error"
|
||
|
|
SUCCESS = "success"
|
||
|
|
|
||
|
|
class LogEntry:
|
||
|
|
"""日志条目"""
|
||
|
|
def __init__(self, level: str, message: str, timestamp: datetime.datetime = None):
|
||
|
|
self.level = level
|
||
|
|
self.message = message
|
||
|
|
self.timestamp = timestamp or datetime.datetime.now()
|
||
|
|
|
||
|
|
def to_dict(self) -> Dict:
|
||
|
|
"""转换为字典"""
|
||
|
|
return {
|
||
|
|
"level": self.level,
|
||
|
|
"message": self.message,
|
||
|
|
"timestamp": self.timestamp.isoformat()
|
||
|
|
}
|
||
|
|
|
||
|
|
@classmethod
|
||
|
|
def from_dict(cls, data: Dict) -> 'LogEntry':
|
||
|
|
"""从字典创建"""
|
||
|
|
return cls(
|
||
|
|
level=data["level"],
|
||
|
|
message=data["message"],
|
||
|
|
timestamp=datetime.datetime.fromisoformat(data["timestamp"])
|
||
|
|
)
|
||
|
|
|
||
|
|
class Logger:
|
||
|
|
"""日志管理器"""
|
||
|
|
def __init__(self, log_dir: str = "logs"):
|
||
|
|
self.log_dir = log_dir
|
||
|
|
self.logs: List[LogEntry] = []
|
||
|
|
self.max_memory_logs = 1000 # 内存中保留的最大日志数
|
||
|
|
self.current_filter = "all" # all, error, error_warning, no_debug, debug
|
||
|
|
|
||
|
|
# 确保日志目录存在
|
||
|
|
os.makedirs(self.log_dir, exist_ok=True)
|
||
|
|
|
||
|
|
def log(self, message: str, level: str = LogLevel.INFO):
|
||
|
|
"""记录日志"""
|
||
|
|
entry = LogEntry(level, message)
|
||
|
|
self.logs.append(entry)
|
||
|
|
|
||
|
|
# 当日志超过阈值时,保存到文件
|
||
|
|
if len(self.logs) > self.max_memory_logs:
|
||
|
|
self.save_to_file()
|
||
|
|
|
||
|
|
return entry
|
||
|
|
|
||
|
|
def debug(self, message: str):
|
||
|
|
"""记录debug日志"""
|
||
|
|
return self.log(message, LogLevel.DEBUG)
|
||
|
|
|
||
|
|
def info(self, message: str):
|
||
|
|
"""记录info日志"""
|
||
|
|
return self.log(message, LogLevel.INFO)
|
||
|
|
|
||
|
|
def warning(self, message: str):
|
||
|
|
"""记录warning日志"""
|
||
|
|
return self.log(message, LogLevel.WARNING)
|
||
|
|
|
||
|
|
def error(self, message: str):
|
||
|
|
"""记录error日志"""
|
||
|
|
return self.log(message, LogLevel.ERROR)
|
||
|
|
|
||
|
|
def success(self, message: str):
|
||
|
|
"""记录success日志"""
|
||
|
|
return self.log(message, LogLevel.SUCCESS)
|
||
|
|
|
||
|
|
def save_to_file(self) -> str:
|
||
|
|
"""保存日志到文件"""
|
||
|
|
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
|
|
filename = os.path.join(self.log_dir, f"rdss_{timestamp}.log")
|
||
|
|
|
||
|
|
with open(filename, 'w', encoding='utf-8') as f:
|
||
|
|
for entry in self.logs:
|
||
|
|
json.dump(entry.to_dict(), f, ensure_ascii=False)
|
||
|
|
f.write('\n')
|
||
|
|
|
||
|
|
# 清空内存中的日志
|
||
|
|
self.logs = []
|
||
|
|
return filename
|
||
|
|
|
||
|
|
def load_from_file(self, filename: str) -> List[LogEntry]:
|
||
|
|
"""从文件加载日志"""
|
||
|
|
entries = []
|
||
|
|
try:
|
||
|
|
with open(filename, 'r', encoding='utf-8') as f:
|
||
|
|
for line in f:
|
||
|
|
line = line.strip()
|
||
|
|
if line:
|
||
|
|
data = json.loads(line)
|
||
|
|
entries.append(LogEntry.from_dict(data))
|
||
|
|
except Exception as e:
|
||
|
|
self.error(f"加载日志文件失败: {str(e)}")
|
||
|
|
|
||
|
|
return entries
|
||
|
|
|
||
|
|
def get_filtered_logs(self) -> List[LogEntry]:
|
||
|
|
"""获取过滤后的日志"""
|
||
|
|
if self.current_filter == "all":
|
||
|
|
return self.logs
|
||
|
|
elif self.current_filter == "error":
|
||
|
|
return [entry for entry in self.logs if entry.level == LogLevel.ERROR]
|
||
|
|
elif self.current_filter == "error_warning":
|
||
|
|
return [entry for entry in self.logs if entry.level in [LogLevel.ERROR, LogLevel.WARNING]]
|
||
|
|
elif self.current_filter == "no_debug":
|
||
|
|
return [entry for entry in self.logs if entry.level != LogLevel.DEBUG]
|
||
|
|
elif self.current_filter == "debug":
|
||
|
|
return [entry for entry in self.logs if entry.level == LogLevel.DEBUG]
|
||
|
|
return self.logs
|
||
|
|
|
||
|
|
def set_filter(self, filter_type: str):
|
||
|
|
"""设置过滤器"""
|
||
|
|
self.current_filter = filter_type
|
||
|
|
|
||
|
|
def clear(self):
|
||
|
|
"""清空日志"""
|
||
|
|
self.logs = []
|
||
|
|
|
||
|
|
def get_log_files(self) -> List[str]:
|
||
|
|
"""获取所有日志文件"""
|
||
|
|
files = []
|
||
|
|
if os.path.exists(self.log_dir):
|
||
|
|
for file in os.listdir(self.log_dir):
|
||
|
|
if file.startswith("rdss_") and file.endswith(".log"):
|
||
|
|
files.append(os.path.join(self.log_dir, file))
|
||
|
|
# 按时间排序
|
||
|
|
files.sort(reverse=True)
|
||
|
|
return files
|
||
|
|
|
||
|
|
# 全局日志实例
|
||
|
|
global_logger = Logger()
|
||
|
|
|
||
|
|
def get_logger() -> Logger:
|
||
|
|
"""获取全局日志实例"""
|
||
|
|
return global_logger
|