TG-PlatformPlus/logForm.py

263 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import json
import os
import re
import time
from datetime import datetime
import queue
from PyQt6 import *
from PyQt6.QtCore import *
from PyQt6.QtGui import *
from PyQt6.QtWidgets import *
from ui.Ui_logForm import Ui_logForm
from logModel.logManager import logManager
from projectModel.projectManager import projectManager
from common import common
from config import config
from logs import log
import copy
class LogThread(QThread):
    """Worker thread that turns queued log records into per-filter HTML buffers.

    Producers put ``dict`` records on ``logQueue``. ``run`` drains the queue
    and appends one HTML line per record to the lists stored in
    ``msgDataMap``. Buffer keys are the level-filter index (``"0"``..``"3"``)
    for the "all tags" view, and that index followed by the tag text for the
    per-tag view.

    NOTE(review): ``msgDataMap`` and ``newCountMap`` are mutated here and read
    by the owner of this thread without any lock; confirm that this is
    acceptable for the callers.
    """

    # Seconds to block on the queue before re-checking for a stop request.
    _POLL_SECONDS = 0.1

    def __init__(self, parent=None):
        super().__init__(parent)
        # Level name -> filter indices under which a record of that level is
        # stored (an ERROR record is visible under every filter index).
        self.levels = {"DEBUG": ["0"], "INFO": ["0", "1"], "WARNING": ["0", "1", "2"], "ERROR": ["0", "1", "2", "3"]}
        self.logQueue = queue.Queue()
        # Maximum number of HTML lines kept per buffer.
        self.maxCount = 500
        # Buffer key -> list of HTML lines, oldest first.
        self.msgDataMap = {}
        # Buffer key -> number of lines appended since the consumer last
        # reset the entry to 0.
        self.newCountMap = {}

    def stop(self, waitMs=1000):
        """Ask ``run`` to exit and wait up to ``waitMs`` milliseconds for it.

        Returns:
            The result of ``QThread.wait``: True if the thread finished in time.
        """
        self.requestInterruption()
        return self.wait(waitMs)

    def run(self):
        """Drain ``logQueue`` until an interruption is requested."""
        while not self.isInterruptionRequested():
            try:
                # Blocking get: wakes as soon as a record arrives, and the
                # timeout only bounds how long a stop request can go unnoticed.
                msgData = self.logQueue.get(timeout=self._POLL_SECONDS)
            except queue.Empty:
                continue
            try:
                self._store(msgData)
            except Exception as e:
                # Best effort: one malformed record must not kill the worker.
                print(e)

    def _store(self, msgData):
        """Format one record and append it to every buffer it belongs to."""
        # Built once per record; a missing 'color'/'msg' key raises KeyError
        # here and is reported a single time by ``run``.
        msgStr = f"<span style='color:{msgData['color']};'>{msgData['msg']}</span><br>"
        tag = str(msgData.get("tag", ""))
        for lab in self.levels.get(msgData.get("level", "DEBUG"), ["0"]):
            if tag:
                self._append(lab + tag, msgStr)
            # With an empty tag the per-tag key equals the "all" key, so the
            # line is stored once instead of being duplicated in that buffer.
            self._append(lab, msgStr)

    def _append(self, key, msgStr):
        """Append ``msgStr`` to buffer ``key``, trim it and count the addition."""
        bucket = self.msgDataMap.setdefault(key, [])
        bucket.append(msgStr)
        overflow = len(bucket) - self.maxCount
        if overflow > 0:
            # Drop the oldest lines so the buffer never exceeds maxCount.
            del bucket[:overflow]
        self.newCountMap[key] = self.newCountMap.get(key, 0) + 1
class LogForm(QWidget):
    """Log panel: shows the HTML lines buffered by a ``LogThread``.

    Records arrive through ``logManager.logMsgData`` (live) or ``loadLog``
    (history file) and are pushed onto the worker's queue. A timer
    periodically appends the lines of the currently selected level/tag buffer
    that have not been rendered yet.
    """

    def __init__(self, parent=None):
        super().__init__(parent)
        self.ui = Ui_logForm()
        self.ui.setupUi(self)
        # Separator between serialized entries in a project log file.
        self.endStr = "---END"
        self.timer = QTimer()
        # True while the view should follow the newest line.
        self.isScrollBottom = True
        self.currentProId = ""
        # Maximum number of trailing history entries loaded by loadLog.
        self.refreshCount = config.data["log"].get("refreshCount", 100)
        self.logThread = LogThread()
        self.levels = {"DEBUG": ["0"], "INFO": ["0", "1"], "WARNING": ["0", "1", "2"], "ERROR": ["0", "1", "2", "3"]}
        self.levelIndex = 0
        # Interval handed to the refresh timer.
        self.refreshTime = config.data["log"].get("refreshTime", 100)
        self.tagText = ""
        # Size of the buffer snapshot at the last render.
        self.lastRenderedCount = 0
        # The newest line object already shown in the widget (None if none).
        self._lastRenderedItem = None
        self.logArray = []
        self.timer.timeout.connect(self.setLogPlainText)
        logManager.logMsgData.connect(self.addLogMsgData, type=Qt.ConnectionType.UniqueConnection)
        self.ui.pbOpen.clicked.connect(self.openLogPath)
        self.ui.pbClear.clicked.connect(self.clearLog)
        self.ui.cbLevel.currentIndexChanged.connect(self.comboxChange)
        self.ui.cbTag.currentIndexChanged.connect(self.comboxChange)
        self.ui.pbLock.clicked.connect(self.changeIsScrollBottom)
        self.ui.plainTextEdit.verticalScrollBar().valueChanged.connect(self.scrollbarValueChanged)
        projectManager.sig_projectChanged.connect(self.loadLog)
        self.timer.start(self.refreshTime)
        self.logThread.start()

    def clearLog(self):
        """Empty the widget and drop every buffered line and counter."""
        self.ui.plainTextEdit.clear()
        self.logThread.msgDataMap = {}
        self.logThread.newCountMap = {}
        self._resetRenderState()

    def changeIsScrollBottom(self):
        """Toggle follow-the-tail mode and update the lock icon."""
        self.isScrollBottom = not self.isScrollBottom
        if self.isScrollBottom:
            self.scrollToBottom()
            self.ui.pbLock.setIcon(QIcon(":/resource/lock.svg"))
        else:
            self.ui.pbLock.setIcon(QIcon(":/resource/unlock.svg"))

    def nowStr(self):
        """Return the current local time as ``HH:MM:SS.mmm``."""
        now = datetime.now()
        ret = now.strftime('%H:%M:%S.') + f"{now.microsecond // 1000:03d}"
        return ret

    def _getCurrentId(self):
        """Return the buffer key matching the selected level and tag."""
        level_index = self.ui.cbLevel.currentIndex()
        tag_text = str(self.ui.cbTag.currentText())
        if tag_text == "ALL":
            return str(level_index)
        return str(level_index) + tag_text

    def _resetRenderState(self):
        """Forget what has been rendered so far."""
        self.lastRenderedCount = 0
        self._lastRenderedItem = None

    def _snapshotCurrent(self):
        """Return ``(buffer key, copy of that buffer)`` for the current filter.

        The copy lets the worker thread keep appending to the live list
        while the snapshot is being rendered.
        """
        current_id = self._getCurrentId()
        return current_id, list(self.logThread.msgDataMap.get(current_id, ()))

    def _markRendered(self, current_id, snapshot):
        """Record that every line of ``snapshot`` is now displayed."""
        self.lastRenderedCount = len(snapshot)
        self._lastRenderedItem = snapshot[-1] if snapshot else None
        # Reset the pending-line counter for this buffer.
        self.logThread.newCountMap[current_id] = 0

    def _itemsAfterLastRendered(self, snapshot):
        """Return the lines of ``snapshot`` that still have to be displayed.

        The last displayed line is located by object identity, which keeps
        working after the worker has trimmed old lines from the front of the
        buffer (an index into the list would go stale at that point).
        Assumes the worker creates a fresh string object per record.

        Returns:
            The pending lines, or None when the last displayed line is no
            longer in the buffer and a full redraw is required.
        """
        last = self._lastRenderedItem
        if last is None:
            return snapshot
        for idx in range(len(snapshot) - 1, -1, -1):
            if snapshot[idx] is last:
                return snapshot[idx + 1:]
        return None

    def searchType(self):
        """Redraw the whole view after the level/tag filter changed."""
        self.timer.stop()
        self.ui.plainTextEdit.clear()
        self._resetRenderState()
        current_id, snapshot = self._snapshotCurrent()
        if snapshot:
            self.ui.plainTextEdit.appendHtml("".join(snapshot))
            self._markRendered(current_id, snapshot)
        self.scrollToBottom()
        self.timer.start(self.refreshTime)

    def appendColoredText(self, text, font_color):
        """Append ``text`` as plain text using ``font_color`` as foreground."""
        fmt = QTextCharFormat()
        fmt.setForeground(QBrush(font_color))
        self.ui.plainTextEdit.mergeCurrentCharFormat(fmt)
        self.ui.plainTextEdit.appendPlainText(text)

    def setLogPlainText(self):
        """Timer slot: append the not-yet-rendered lines of the current buffer."""
        try:
            current_id, snapshot = self._snapshotCurrent()
            # Nothing buffered, or the newest line is already on screen.
            if not snapshot or snapshot[-1] is self._lastRenderedItem:
                return
            new_items = self._itemsAfterLastRendered(snapshot)
            if new_items is None:
                # The last rendered line was trimmed away: redraw everything.
                self.ui.plainTextEdit.clear()
                self.ui.plainTextEdit.appendHtml("".join(snapshot))
            else:
                # Insert at the end of the document instead of clearing it.
                cursor = self.ui.plainTextEdit.textCursor()
                cursor.movePosition(QTextCursor.MoveOperation.End)
                cursor.insertHtml("".join(new_items))
            self._markRendered(current_id, snapshot)
            self.scrollToBottom()
        except Exception as e:
            log.error(f"setLogPlainText exception: {e}")

    def comboxChange(self, index):
        """Slot for both filter combo boxes; ``index`` is not used."""
        self.searchType()

    def openLogPath(self):
        """Open the project log path with the OS default handler."""
        os.startfile(projectManager.getLogPath())

    def scrollToBottom(self):
        """Scroll to the newest line when follow-the-tail mode is on."""
        if self.isScrollBottom:
            scrollbar = self.ui.plainTextEdit.verticalScrollBar()
            scrollbar.setValue(scrollbar.maximum())

    def scrollbarValueChanged(self, value):
        """Enable follow-the-tail mode only while the scrollbar is at its end."""
        scrollbar = self.ui.plainTextEdit.verticalScrollBar()
        at_bottom = scrollbar.value() == scrollbar.maximum()
        self.isScrollBottom = at_bottom
        icon_path = ":/resource/lock.svg" if at_bottom else ":/resource/unlock.svg"
        self.ui.pbLock.setIcon(QIcon(icon_path))

    def loadLog(self, proId):
        """Clear the view and queue the most recent entries of a project's log file.

        Args:
            proId: project identifier passed to ``logManager.loadProLog`` to
                obtain the log file path.
        """
        self.clearLog()
        logFilePath = logManager.loadProLog(proId)
        try:
            if os.path.exists(logFilePath):
                with open(logFilePath, "r", encoding="utf-8") as f:
                    file_content = f.read()
                log_entries = file_content.strip().split(f'{self.endStr}')
                # Keep only the last refreshCount entries; load everything
                # when the file holds fewer entries than that.
                if 0 < self.refreshCount < len(log_entries):
                    log_entries = log_entries[-self.refreshCount:]
                for entry in log_entries:
                    if not entry.strip():
                        continue
                    self.addLogMsg(self._parseLogEntry(entry))
                self.loadFinish()
        except Exception as e:
            log.error(f"log exception: {e}")

    def _parseLogEntry(self, entry):
        """Decode one serialized entry into a record dict.

        Fields that cannot be read keep their defaults, so a damaged entry
        is still returned as a (possibly empty) record.
        """
        msgData = {"time": "", "level": "DEBUG", "msg": "", "color": "blue", "tag": ""}
        try:
            log_entry = json.loads(entry)
            msgData["time"] = log_entry["time"]
            msgData["level"] = log_entry["level"]
            msgData["color"] = log_entry["color"]
            msgData["msg"] = log_entry["msg"]
            msgData["tag"] = log_entry["tag"]
            msgData["searchText"] = common.level2Search(msgData["level"])
        except Exception:
            print("invalid log line:", entry)
        return msgData

    def loadFinish(self):
        """Called once a history file has been queued; scrolls to the end."""
        self.scrollToBottom()

    def _registerTag(self, msgData):
        """Normalise ``msgData['tag']`` to a string and list it in the tag combo.

        A missing or None tag becomes ``""`` and is not added to the combo
        box, so untagged records never create a blank filter entry.
        """
        raw_tag = msgData.get("tag")
        tag = "" if raw_tag is None else str(raw_tag)
        msgData["tag"] = tag
        if tag and self.ui.cbTag.findText(tag) == -1:
            self.ui.cbTag.addItem(tag)

    def addLogMsg(self, msgData):
        """Queue a record read from a history file for the worker thread."""
        self._registerTag(msgData)
        self.logThread.logQueue.put(msgData)

    def addLogMsgData(self, msgData):
        """Slot for live records emitted by ``logManager.logMsgData``."""
        self._registerTag(msgData)
        self.logThread.logQueue.put(msgData)