263 lines
10 KiB
Python
263 lines
10 KiB
Python
import json
|
||
import os
|
||
import re
|
||
import time
|
||
from datetime import datetime
|
||
import queue
|
||
from PyQt6 import *
|
||
from PyQt6.QtCore import *
|
||
from PyQt6.QtGui import *
|
||
from PyQt6.QtWidgets import *
|
||
from ui.Ui_logForm import Ui_logForm
|
||
from logModel.logManager import logManager
|
||
from projectModel.projectManager import projectManager
|
||
from common import common
|
||
from config import config
|
||
from logs import log
|
||
import copy
|
||
|
||
class LogThread(QThread):
    """Background worker that sorts queued log messages into display buckets.

    Message dicts are taken from ``logQueue``. Each one is rendered to a
    single HTML line and stored in ``msgDataMap`` under two kinds of keys:

    * ``label``        - every message for that level label, whatever its tag
    * ``label + tag``  - only the messages carrying that tag

    ``label`` comes from ``levels``; ``LogForm._getCurrentId`` builds the same
    keys from the level combo-box index and the selected tag text.

    Attributes are read and reset directly by the GUI side (``LogForm``), so
    their names and types (plain ``dict`` of ``list`` / ``int``) are part of
    the contract.
    """

    def __init__(self, parent=None):
        """Create the worker with an empty queue and empty buckets."""
        super().__init__(parent)
        # Level name -> labels of the buckets the message is stored under.
        self.levels = {"DEBUG": ["0"], "INFO": ["0", "1"], "WARNING": ["0", "1", "2"], "ERROR": ["0", "1", "2", "3"]}
        self.logQueue = queue.Queue()
        # Upper bound on the number of lines kept per bucket.
        self.maxCount = 500
        # Bucket key -> list of rendered HTML lines (oldest first).
        self.msgDataMap = {}
        # Incremental marker: bucket key -> lines appended since the consumer
        # last reset the entry to 0.
        self.newCountMap = {}

    def stop(self):
        """Ask ``run`` to return; it notices within one queue timeout."""
        self.requestInterruption()

    def run(self):
        """Drain ``logQueue`` until an interruption is requested.

        Blocking on the queue with a timeout replaces the former
        ``empty()`` / ``msleep`` polling: an idle thread sleeps inside
        ``get`` and wakes up as soon as a message arrives.
        """
        while not self.isInterruptionRequested():
            try:
                msgData = self.logQueue.get(timeout=0.1)
            except queue.Empty:
                continue
            try:
                self._store(msgData)
            except Exception as e:
                # A malformed message must never kill the worker.
                # NOTE(review): kept as print() rather than log.error in case
                # the project logger feeds back into this queue - confirm.
                print(e)

    def _store(self, msgData):
        """Render *msgData* and append it to every bucket it belongs to.

        Raises:
            KeyError: if *msgData* has no ``"color"`` or ``"msg"`` entry.
        """
        labels = self.levels.get(msgData.get("level", "DEBUG"), ["0"])
        tag = str(msgData.get("tag", ""))
        # NOTE(review): color and msg are interpolated without HTML escaping,
        # so markup inside a message is interpreted by the view. Confirm
        # whether producers rely on that before escaping here.
        line = f"<span style='color:{msgData['color']};'>{msgData['msg']}</span><br>"

        for label in labels:
            # With an empty tag both keys are the same string; dict.fromkeys
            # collapses them so the line is stored and counted only once.
            for key in dict.fromkeys((label + tag, label)):
                # setdefault avoids a check-then-index sequence that could
                # hit a freshly replaced (empty) msgDataMap.
                bucket = self.msgDataMap.setdefault(key, [])
                bucket.append(line)
                if len(bucket) > self.maxCount:
                    del bucket[0]
                # Mark the bucket as having new content.
                self.newCountMap[key] = self.newCountMap.get(key, 0) + 1


class LogForm(QWidget):
    """Log viewer widget backed by a ``LogThread``.

    Incoming messages (the ``logManager.logMsgData`` signal, or entries read
    from a project log file) are pushed onto the worker's queue. A timer
    periodically copies the newly bucketed lines for the currently selected
    level/tag into ``ui.plainTextEdit``.
    """

    def __init__(self, parent=None):
        """Build the UI, wire the signals, then start the timer and worker."""
        super().__init__(parent)
        self.ui = Ui_logForm()
        self.ui.setupUi(self)
        # Separator between serialized entries in a project log file.
        self.endStr = "---END"
        self.timer = QTimer()
        # True while the view should follow the newest line.
        self.isScrollBottom = True
        self.currentProId = ""
        # Number of trailing file entries loaded by loadLog.
        self.refreshCount = config.data["log"].get("refreshCount", 100)
        self.logThread = LogThread()
        self.levels = {"DEBUG": ["0"], "INFO": ["0", "1"], "WARNING": ["0", "1", "2"], "ERROR": ["0", "1", "2", "3"]}
        self.levelIndex = 0
        # Interval handed to QTimer.start for the incremental refresh.
        self.refreshTime = config.data["log"].get("refreshTime", 100)
        self.tagText = ""
        # Number of entries currently shown in the text view.
        self.lastRenderedCount = 0
        self.logArray = []

        self.timer.timeout.connect(self.setLogPlainText)
        logManager.logMsgData.connect(self.addLogMsgData, type=Qt.ConnectionType.UniqueConnection)
        self.ui.pbOpen.clicked.connect(self.openLogPath)
        self.ui.pbClear.clicked.connect(self.clearLog)
        self.ui.cbLevel.currentIndexChanged.connect(self.comboxChange)
        self.ui.cbTag.currentIndexChanged.connect(self.comboxChange)
        self.ui.pbLock.clicked.connect(self.changeIsScrollBottom)
        self.ui.plainTextEdit.verticalScrollBar().valueChanged.connect(self.scrollbarValueChanged)
        projectManager.sig_projectChanged.connect(self.loadLog)

        self.timer.start(self.refreshTime)
        self.logThread.start()

    def clearLog(self):
        """Empty the view and drop every bucket held by the worker."""
        self.ui.plainTextEdit.clear()
        self.logThread.msgDataMap = {}
        self.logThread.newCountMap = {}
        self.lastRenderedCount = 0

    def _syncLockIcon(self):
        """Make the lock button icon reflect ``isScrollBottom``."""
        resource = ":/resource/lock.svg" if self.isScrollBottom else ":/resource/unlock.svg"
        self.ui.pbLock.setIcon(QIcon(resource))

    def changeIsScrollBottom(self):
        """Toggle follow-the-tail mode from the lock button."""
        self.isScrollBottom = not self.isScrollBottom
        if self.isScrollBottom:
            self.scrollToBottom()
        self._syncLockIcon()

    def nowStr(self):
        """Return the current local time as ``HH:MM:SS.mmm``."""
        now = datetime.now()
        return now.strftime('%H:%M:%S.') + f"{now.microsecond // 1000:03d}"

    def _getCurrentId(self):
        """Return the bucket key matching the current level/tag selection."""
        level_index = self.ui.cbLevel.currentIndex()
        tag_text = str(self.ui.cbTag.currentText())
        if tag_text == "ALL":
            return str(level_index)
        return str(level_index) + tag_text

    def _renderAll(self, entries):
        """Replace the view content with *entries* (a list of HTML lines)."""
        self.ui.plainTextEdit.clear()
        if entries:
            self.ui.plainTextEdit.appendHtml("".join(entries))
        self.lastRenderedCount = len(entries)

    def searchType(self):
        """Redraw the whole view after the level or tag filter changed."""
        self.timer.stop()
        try:
            current_id = self._getCurrentId()
            entries = self.logThread.msgDataMap.get(current_id)
            self._renderAll(list(entries) if entries else [])
            if entries is not None:
                # Everything in the bucket is on screen now.
                self.logThread.newCountMap[current_id] = 0
            self.scrollToBottom()
        finally:
            # Restart even if rendering failed, otherwise the view would
            # silently stop refreshing.
            self.timer.start(self.refreshTime)

    def appendColoredText(self, text, font_color):
        """Append *text* as plain text using *font_color* as foreground."""
        fmt = QTextCharFormat()
        fmt.setForeground(QBrush(font_color))
        self.ui.plainTextEdit.mergeCurrentCharFormat(fmt)
        self.ui.plainTextEdit.appendPlainText(text)

    def setLogPlainText(self):
        """Timer slot: append the lines added since the previous refresh.

        The new lines are the last ``newCountMap[id]`` entries of the bucket.
        The previous implementation sliced the bucket from
        ``lastRenderedCount``; once the bucket reached ``maxCount`` its length
        stopped growing, the slice was always empty and the view froze.

        Appending alone would let the document grow without bound, so while
        the view follows the tail it is rebuilt from the bucket whenever it
        would hold more than twice ``maxCount`` entries. No rebuild happens
        while the user has scrolled away, to keep their position.

        NOTE(review): the counters are shared with the worker thread without
        a lock; a message arriving between the read and the reset below can
        be shown late or twice. Confirm whether that is acceptable.
        """
        try:
            current_id = self._getCurrentId()
            bucket = self.logThread.msgDataMap.get(current_id)
            if bucket is None:
                return

            counts = self.logThread.newCountMap
            new_count = counts.get(current_id, 0)
            if new_count <= 0:
                return
            # Reset the incremental marker before rendering.
            counts[current_id] = 0

            snapshot = list(bucket)
            # new_count > 0 here; if it exceeds the bucket size the slice is
            # simply the whole bucket (older new lines were already dropped).
            fresh = snapshot[-new_count:]

            trim_at = 2 * self.logThread.maxCount
            if self.isScrollBottom and self.lastRenderedCount + len(fresh) > trim_at:
                self._renderAll(snapshot)
            elif fresh:
                # Insert at the end of the document instead of clearing it.
                cursor = self.ui.plainTextEdit.textCursor()
                cursor.movePosition(QTextCursor.MoveOperation.End)
                cursor.insertHtml("".join(fresh))
                self.lastRenderedCount += len(fresh)

            self.scrollToBottom()
        except Exception as e:
            log.error(f"setLogPlainText exception: {e}")

    def comboxChange(self, index):
        """Slot for both combo boxes; *index* is unused."""
        self.searchType()

    def openLogPath(self):
        """Open the project log location with the platform's default handler."""
        path = projectManager.getLogPath()
        if hasattr(os, "startfile"):
            os.startfile(path)
        else:
            # os.startfile exists only on Windows; fall back to Qt elsewhere.
            QDesktopServices.openUrl(QUrl.fromLocalFile(str(path)))

    def scrollToBottom(self):
        """Scroll to the newest line when follow-the-tail mode is on."""
        if self.isScrollBottom:
            bar = self.ui.plainTextEdit.verticalScrollBar()
            bar.setValue(bar.maximum())

    def scrollbarValueChanged(self, value):
        """Turn follow-the-tail mode on exactly when the bar is at its maximum."""
        scrollbar = self.ui.plainTextEdit.verticalScrollBar()
        self.isScrollBottom = scrollbar.value() == scrollbar.maximum()
        self._syncLockIcon()

    def _parseLogEntry(self, entry):
        """Turn one serialized log entry into a message dict.

        Fields copied before a failure are kept and the remaining ones stay
        at their defaults, so a partly valid entry is still displayed.
        """
        msgData = {"time": "", "level": "DEBUG", "msg": "", "color": "blue", "tag": ""}
        try:
            log_entry = json.loads(entry)
            for field in ("time", "level", "color", "msg", "tag"):
                msgData[field] = log_entry[field]
            msgData["searchText"] = common.level2Search(msgData["level"])
        except Exception:
            print("invalid log line:", entry)
        return msgData

    def loadLog(self, proId):
        """Clear the view and queue the last entries of the project's log file.

        Connected to ``projectManager.sig_projectChanged``. Errors while
        reading the file are logged, not raised.
        """
        self.clearLog()
        logFilePath = logManager.loadProLog(proId)
        try:
            if not os.path.exists(logFilePath):
                return
            with open(logFilePath, "r", encoding="utf-8") as f:
                file_content = f.read()

            log_entries = file_content.strip().split(f'{self.endStr}')
            # A negative slice start beyond the list length, and a start of
            # -0, both select the whole list, so no explicit bound is needed.
            for entry in log_entries[-self.refreshCount:]:
                if not entry.strip():
                    continue
                self.addLogMsg(self._parseLogEntry(entry))
            self.loadFinish()
        except Exception as e:
            log.error(f"log exception: {e}")

    def loadFinish(self):
        """Hook run after a log file was queued; scrolls to the bottom."""
        self.scrollToBottom()

    def _enqueue(self, msgData):
        """Normalize the tag, register it in the tag filter and queue the message.

        The tag is stored as a string (``None`` becomes ``""``). Only
        non-empty tags are added to ``cbTag``: an empty item would select the
        same bucket as "ALL".
        """
        raw_tag = msgData.get("tag")
        tag = "" if raw_tag is None else str(raw_tag)
        msgData["tag"] = tag
        if tag and self.ui.cbTag.findText(tag) == -1:
            self.ui.cbTag.addItem(tag)
        self.logThread.logQueue.put(msgData)

    def addLogMsg(self, msgData):
        """Queue a message read from a log file for the worker thread."""
        self._enqueue(msgData)

    def addLogMsgData(self, msgData):
        """Slot for ``logManager.logMsgData``: queue a live log message."""
        self._enqueue(msgData)