#!/opt/homebrew/bin/python3
# -*- coding:utf-8 -*-
"""Project storage manager exposed to the Qt layer.

Each project lives in its own directory below the configured base path and
holds an ``info.txt`` (project metadata as JSON), an optional ``task.txt``
(task definition as JSON), a ``cmdhistory.txt`` (one JSON object per line)
and the ``history``, ``log`` and ``report`` sub-directories.
"""
import json
import shutil
import datetime
import uuid
import os
from PyQt6 import *
from PyQt6.QtCore import *
from logs import log
from typing import Union
from config import config
from common import common
from userModel.userManager import userManager


class ProjectManager(QObject):
    """Create, update, delete and query projects stored on disk.

    The manager also tracks the "current" project (id, metadata and derived
    directory paths) and emits ``sig_projectChanged`` with the project id
    whenever the current project is switched.
    """

    sig_projectChanged = pyqtSignal(str)

    # Qt format string used for every timestamp written by this class.
    _TIME_FORMAT = "yyyy-MM-dd hh:mm:ss"
    # Size above which cmdhistory.txt is rotated before appending (100 MB).
    _MAX_HISTORY_BYTES = 100 * 1024 * 1024

    def __init__(self, parent=None):
        """Initialise empty state and make sure the base directory exists.

        Args:
            parent: Optional QObject parent, forwarded to ``QObject``.
        """
        super().__init__(parent)
        self.currentProId = ""
        self.basePath = config.data["project"]["base_path"]
        self._makeDirs(self.basePath)
        self.path = ""
        self.historyPath = ""
        self.logPath = ""
        self.cmdHistoryPath = ""
        self.reportPath = ""
        self.projectDict = {}
        self.projectTaskDict = {}
        self.currentProInfo = {}
        self.rowCount = 100
        self.batch_size = 50  # number of buffered entries that triggers a write
        self.timeout = 5000  # timeout in milliseconds (only used by the disabled timer)
        self.log_buffer = []  # pending command-history entries
        # NOTE: periodic flushing is currently disabled:
        # self.timer = QTimer(self)
        # self.timer.timeout.connect(self.flush_logs)  # write on timeout
        # self.timer.start(10)  # start the timer

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _makeDirs(path):
        """Create *path* (including missing parents) if it does not exist."""
        if not os.path.exists(path):
            os.makedirs(path, exist_ok=True)

    @classmethod
    def _now(cls):
        """Return the current local time formatted with ``_TIME_FORMAT``."""
        return str(QDateTime.currentDateTime().toString(cls._TIME_FORMAT))

    @staticmethod
    def _currentUserName():
        """Return the current user's name, or ``"unknown"`` if there is none."""
        userData = userManager.getCurrentUser()
        return userData["name"] if userData else "unknown"

    @staticmethod
    def _isValidProId(proId):
        """Return True if *proId* is a non-empty, single path component.

        Rejecting empty ids and ids containing separators keeps destructive
        operations from resolving to the base directory or outside of it.
        """
        return (
            isinstance(proId, str)
            and proId not in ("", ".", "..")
            and os.path.basename(proId) == proId
        )

    def _readInfoFile(self, proId):
        """Load ``info.txt`` of *proId*.

        Returns:
            The parsed JSON content, or ``None`` when the file does not exist.

        Raises:
            OSError, ValueError: if the file cannot be read or parsed.
        """
        file_path = os.path.join(self.basePath, proId, "info.txt")
        if not os.path.exists(file_path):
            return None
        with open(file_path, 'r', encoding="utf-8") as f:
            return json.loads(f.read())

    def _resolveProInfo(self, proId):
        """Return the metadata of *proId*, or ``None`` if it is unknown.

        The in-memory ``projectDict`` is consulted first; when the id is not
        listed there the project's ``info.txt`` is read and cached.
        """
        info = self.projectDict.get(proId)
        if info is None:
            try:
                info = self._readInfoFile(proId)
            except (OSError, ValueError, TypeError) as e:
                print(f"Error getInfo: {e}")
                info = None
            if info is not None:
                self.projectDict[proId] = info
        return info

    def _currentDir(self, path):
        """Return *path* for the current project, creating it when missing.

        Returns ``""`` when no project is current or the path has been reset.
        """
        if self.currentProId == "" or path == "":
            return ""
        self._makeDirs(path)
        return path

    # ------------------------------------------------------------------
    # project creation / paths
    # ------------------------------------------------------------------
    @pyqtSlot(str, str, result=str)
    def create(self, name, remark):
        """Create a new project directory tree and its ``info.txt``.

        The path attributes of this manager are pointed at the new project
        (``currentProId`` is not changed).

        Args:
            name: Project name stored in the metadata.
            remark: Free-text remark stored in the metadata.

        Returns:
            The generated project id (a UUID4 string).
        """
        id = str(uuid.uuid4())
        self.setCurProPath(id)
        userName = self._currentUserName()

        for directory in (self.path, self.historyPath, self.logPath, self.reportPath):
            self._makeDirs(directory)

        timestamp = self._now()
        info = {
            "id": id,
            "name": name,
            "path": os.path.normpath(self.path),
            "remark": remark,
            "operator": userName,
            "attr": {},
            "createDate": timestamp,
            "lastDate": timestamp,
        }
        # Same encoding the readers of info.txt use.
        with open(os.path.join(self.path, "info.txt"), "w", encoding="utf-8") as f:
            f.write(json.dumps(info))
        return id

    def setCurProPath(self, id):
        """Point the path attributes at project *id*.

        For the empty project id all paths are reset to ``""``.
        """
        if id == common.emputyProId:
            self.path = ""
            self.historyPath = ""
            self.logPath = ""
            self.reportPath = ""
            return
        self.path = os.path.join(self.basePath, id)
        self.historyPath = os.path.join(self.path, "history")
        self.logPath = os.path.join(self.path, "log")
        self.reportPath = os.path.join(self.path, "report")

    # ------------------------------------------------------------------
    # tasks
    # ------------------------------------------------------------------
    # The slot signature declares both arguments; with only ``list`` declared
    # the method could not be invoked through Qt because Python requires
    # ``params`` as well.
    @pyqtSlot(list, QVariant, result=bool)
    def saveProTask(self, taskInfo, params):
        """Store *taskInfo* and *params* as the task of the current project.

        Returns:
            True on success, False when there is no current project or the
            write fails.
        """
        try:
            proId = self.currentProId
            if not proId:
                # Without a project the file would land in the base directory.
                print("Error addTask: no current project")
                return False
            if hasattr(params, "toVariant"):
                params = params.toVariant()
            param = {"task": taskInfo, "params": params}
            return self.updateProTask(proId, param)
        except Exception as e:
            print(f"Error addTask: {e}")
            return False

    @pyqtSlot(str, QVariant, result=bool)
    def updateProTask(self, id, param=None):
        """Overwrite ``task.txt`` of project *id*.

        Args:
            id: Project id.
            param: A dict, or an object exposing ``toVariant()`` that yields
                one. Only the ``"task"`` and ``"params"`` keys are persisted.

        Returns:
            True on success, False on any error.
        """
        try:
            if not isinstance(param, dict):
                param = param.toVariant()
            data = {key: param[key] for key in ("task", "params") if key in param}
            infoPath = os.path.join(self.basePath, id, "task.txt")
            with open(infoPath, 'w', encoding="utf-8") as file:
                json.dump(data, file, indent=4)
            return True
        except Exception as e:
            print(f"Error updateProTask: {e}")
            return False

    # ------------------------------------------------------------------
    # command history
    # ------------------------------------------------------------------
    @pyqtSlot(str, QVariant, result=bool)
    def addHistory(self, proId, historyInfo):
        """Stamp *historyInfo* with time and operator and queue it for *proId*.

        Returns:
            True when the entry was queued, False on any error.
        """
        try:
            if not isinstance(historyInfo, dict):
                historyInfo = historyInfo.toVariant()
            historyInfo["time"] = self._now()
            historyInfo["operator"] = self._currentUserName()
            return self.updateHistory(proId, historyInfo)
        except Exception as e:
            print(f"Error addHistory: {e}")
            return False

    @pyqtSlot(str, QVariant, result=bool)
    def updateHistory(self, id, param=None):
        """Queue *param* as a command-history entry of project *id*.

        Returns:
            True when the entry was queued, False on any error.
        """
        try:
            if not isinstance(param, dict):
                param = param.toVariant()

            newPath = os.path.join(self.basePath, id, "cmdhistory.txt")
            # Entries still buffered for another project must be written to
            # that project's file before the target path is switched.
            if self.log_buffer and self.cmdHistoryPath and self.cmdHistoryPath != newPath:
                self.flush_logs()
            self.cmdHistoryPath = newPath

            self.add_log(param)
            return True
        except Exception as e:
            log.error(f"更新历史记录失败: {str(e)}")
            return False

    def add_log(self, data):
        """Append *data* to the buffer and flush once ``batch_size`` is reached."""
        try:
            self.log_buffer.append(data)
            if len(self.log_buffer) >= self.batch_size:
                self.flush_logs()
        except Exception as e:
            print(f"添加日志错误: {str(e)}")

    def flush_logs(self):
        """Append all buffered entries to ``cmdHistoryPath`` as JSON lines.

        The buffer is discarded when no history path is set. The file is
        rotated first when it exceeds ``_MAX_HISTORY_BYTES``. On a write error
        the buffer is kept so that a later flush can retry.
        """
        if not self.log_buffer:
            return

        log_path = self.cmdHistoryPath
        if not log_path:
            self.log_buffer.clear()
            return

        # Serialise before touching the file so that one bad entry can neither
        # leave a partial batch on disk nor block the buffer forever.
        lines = []
        for entry in self.log_buffer:
            try:
                lines.append(json.dumps(entry, ensure_ascii=False) + '\n')
            except (TypeError, ValueError) as e:
                print(f"写入日志失败: {str(e)}")

        try:
            if os.path.exists(log_path) and os.path.getsize(log_path) > self._MAX_HISTORY_BYTES:
                self.rotate_history_file(log_path)

            with open(log_path, "a+", encoding='utf-8') as f:
                f.writelines(lines)

            self.log_buffer.clear()
        except Exception as e:
            print(f"写入日志失败: {str(e)}")

    # def stop(self):
    #     """Stop log writing."""
    #     # self.timer.stop()
    #     self.flush_logs()  # write remaining entries before stopping

    def rotate_history_file(self, file_path, max_backups=30):
        """Rename *file_path* to a timestamped backup and prune old backups.

        Backups are named ``<base>_<yyyyMMddHHmmss>.bak.txt`` and live next to
        the original file. Only such backups are candidates for deletion; the
        live file itself is never removed here.

        Args:
            file_path: Path of the history file to rotate.
            max_backups: Maximum number of backups kept after rotation.
        """
        base_name = os.path.basename(file_path).split('.')[0]
        log_dir = os.path.dirname(file_path)

        # The timestamp in the name makes lexical order equal to age order.
        prefix = f"{base_name}_"
        backups = sorted(
            f for f in os.listdir(log_dir)
            if f.startswith(prefix) and f.endswith(".bak.txt")
        )
        # Leave room for the backup that is about to be created.
        excess = len(backups) - max_backups + 1
        if excess > 0:
            for stale in backups[:excess]:
                os.remove(os.path.join(log_dir, stale))

        timestamp = QDateTime.currentDateTime().toString("yyyyMMddHHmmss")
        new_name = f"{base_name}_{timestamp}.bak.txt"
        os.rename(file_path, os.path.join(log_dir, new_name))

    # ------------------------------------------------------------------
    # metadata update / delete
    # ------------------------------------------------------------------
    @pyqtSlot(str, QVariant, result=bool)
    def update(self, id, param=None):
        """Update ``name``, ``remark`` and/or ``attr`` in ``info.txt`` of *id*.

        ``lastDate`` is refreshed on every successful update.

        Returns:
            True on success, False on any error.
        """
        try:
            if not isinstance(param, dict):
                param = param.toVariant()
            infoPath = os.path.join(self.basePath, id, "info.txt")
            with open(infoPath, 'r', encoding="utf-8") as file:
                data = json.loads(file.read())

            for key in ("name", "remark", "attr"):
                if key in param:
                    data[key] = param[key]
            data["lastDate"] = self._now()

            with open(infoPath, 'w', encoding="utf-8") as file:
                json.dump(data, file, indent=4)
            return True
        except Exception as e:
            print(f"Error update: {e}")
            return False

    @pyqtSlot(str, result=bool)
    @pyqtSlot(list, result=bool)
    def delete(self, id):
        """Delete one project (``str`` id) or several (list of ids).

        All ids are validated before anything is removed, so an empty or
        path-like id can never cause the base directory (or anything outside
        it) to be deleted.

        Returns:
            True when every project directory was removed, False otherwise.
        """
        try:
            ids = [id] if isinstance(id, str) else list(id)
            for _id in ids:
                if not self._isValidProId(_id):
                    print(f"Error deletePro: invalid project id {_id!r}")
                    return False
            for _id in ids:
                shutil.rmtree(os.path.join(self.basePath, _id))
            return True
        except Exception as e:
            print(f"Error deletePro: {e}")
            return False

    # ------------------------------------------------------------------
    # current project
    # ------------------------------------------------------------------
    @pyqtSlot(str, result=bool)
    def isProjectExist(self, id):
        """Return True if *id* is a string key of the loaded project dict."""
        return isinstance(id, str) and id in self.projectDict

    @pyqtSlot(str, result=bool)
    def updateProId(self, id):
        """Make *id* the current project and always emit ``sig_projectChanged``.

        Returns:
            False for the empty project id (paths are reset and the signal is
            emitted) or for an unknown project (state untouched); True
            otherwise.
        """
        if id == common.emputyProId:
            self.setCurProPath(id)
            self.sig_projectChanged.emit(id)
            return False

        info = self._resolveProInfo(id)
        if info is None:
            # An exception escaping a slot would be fatal; report instead.
            print(f"Error updateProId: unknown project {id}")
            return False

        self.currentProId = id
        self.currentProInfo = info
        self.setCurProPath(id)
        self.sig_projectChanged.emit(id)
        return True

    @pyqtSlot(str, result=bool)
    def setCurrentProId(self, id):
        """Make *id* the current project; emit the signal only on a change.

        Returns:
            False for the empty project id (paths are reset and the signal is
            emitted) or for an unknown project (state untouched); True
            otherwise, including when *id* is already current.
        """
        if id == common.emputyProId:
            self.setCurProPath(id)
            self.sig_projectChanged.emit(id)
            return False

        if self.currentProId != id:
            info = self._resolveProInfo(id)
            if info is None:
                print(f"Error setCurrentProId: unknown project {id}")
                return False
            self.currentProId = id
            self.currentProInfo = info
            self.setCurProPath(id)
            self.sig_projectChanged.emit(id)
        return True

    @pyqtSlot(result=QVariant)
    def getCurrentProInfo(self):
        """Return the metadata dict of the current project."""
        return self.currentProInfo

    @pyqtSlot(result=str)
    def getCurrentProId(self):
        """Return the id of the current project (``""`` if none)."""
        return self.currentProId

    @pyqtSlot(result=str)
    def getProPath(self):
        """Return the current project's directory, creating it if missing."""
        return self._currentDir(self.path)

    @pyqtSlot(result=str)
    def getLogPath(self):
        """Return the current project's log directory, creating it if missing."""
        return self._currentDir(self.logPath)

    @pyqtSlot(result=str)
    def getHistoryPath(self):
        """Return the current project's history directory, creating it if missing."""
        return self._currentDir(self.historyPath)

    @pyqtSlot(result=str)
    def getReportPath(self):
        """Return the current project's report directory, creating it if missing."""
        return self._currentDir(self.reportPath)

    @pyqtSlot(result=str)
    def getBasePath(self):
        """Return the base directory that contains all projects."""
        return self.basePath

    # ------------------------------------------------------------------
    # queries
    # ------------------------------------------------------------------
    @pyqtSlot(str, result=QVariant)
    def getInfo(self, id):
        """Return project metadata.

        Args:
            id: A project id, or ``"all"`` to reload every project.

        Returns:
            For ``"all"``: a dict mapping project id to metadata, built from
            the sub-directories of the base path ordered by modification time
            (newest first); it also replaces ``projectDict``. Directories
            whose ``info.txt`` is missing or unreadable are skipped.
            Otherwise: the metadata of *id*, or ``None`` when it is missing
            or cannot be read.
        """
        try:
            if isinstance(id, str) and id == "all":
                # Newest first.
                dirs = sorted(
                    (d for d in os.listdir(self.basePath)
                     if os.path.isdir(os.path.join(self.basePath, d))),
                    key=lambda x: os.path.getmtime(os.path.join(self.basePath, x)),
                    reverse=True,
                )
                projects = {}
                for dir_name in dirs:
                    try:
                        info = self._readInfoFile(dir_name)
                        if info is not None:
                            projects[info["id"]] = info
                    except (OSError, ValueError, KeyError, TypeError) as e:
                        # One broken project must not hide all the others.
                        print(f"Error getInfo: {e}")
                self.projectDict = projects
                return self.projectDict

            return self._readInfoFile(id)
        except Exception as e:
            print(f"Error getInfo: {e}")
            return None

    @pyqtSlot(str, result=QVariant)
    def getProTask(self, id):
        """Return the task dict stored in ``task.txt`` of project *id*.

        The result is also cached in ``projectTaskDict[id]``. An empty dict is
        returned for the empty id, a missing file, empty content or any error.
        """
        task = {}
        try:
            if id != "":
                file_path = os.path.join(self.basePath, id, "task.txt")
                if os.path.exists(file_path):
                    with open(file_path, 'r', encoding="utf-8") as f:
                        task = json.loads(f.read()) or {}
        except Exception as e:
            task = {}
            print(f"Error getProTask: {e}")
        self.projectTaskDict[id] = task
        return task

    @pyqtSlot(str, result=list)
    def getCmdHistory(self, id):
        """Return the most recent command-history entries of project *id*.

        At most ``rowCount`` entries are returned (all of them when
        ``rowCount`` is 0); ``rowCount`` itself is never modified. Entries
        still buffered for this project are written out first so that they are
        included.

        Returns:
            A list of parsed entries, oldest first; ``[]`` for the empty id,
            a missing file or any error.
        """
        try:
            if id == "":
                return []

            file_path = os.path.join(self.basePath, id, "cmdhistory.txt")
            if self.log_buffer and self.cmdHistoryPath == file_path:
                self.flush_logs()
            if not os.path.exists(file_path):
                return []

            with open(file_path, 'r', encoding="utf-8") as f:
                content = f.read()
            # Split on '\n' only: entries are written with ensure_ascii=False
            # and may contain other Unicode line separators inside strings.
            entries = [line for line in content.split('\n') if line.strip()]

            limit = self.rowCount
            recent = entries[-limit:] if limit > 0 else entries
            return [json.loads(entry) for entry in recent]
        except Exception as e:
            print(f"Error getCmdHistory: {e}")
            return []


projectManager = ProjectManager()