#!/opt/homebrew/bin/python3
# -*- coding:utf-8 -*-
"""Shared Qt widgets, item delegates and the application-wide ``common`` helper.

The module exposes a singleton ``common`` (an instance of :class:`Common`)
that is created at import time.
"""
import hashlib
import json
import os
import re
import shutil
import sqlite3
import zipfile
from configparser import ConfigParser

from markdown import markdown
from PyQt6.QtCore import *
from PyQt6.QtGui import *
from PyQt6.QtWidgets import *
from PyQt6.QtSerialPort import QSerialPortInfo

from dog import dog
from dog import suDog
from logs import log
from config import config
from influxDB import influxdb
from globals import _G
from devmodelModel.devModelManager import devModelManager
from deviceModel.deviceManager import deviceManager
from userModel.userManager import userManager
from taskModel.taskManager import taskManager
from instructionModel.instructionManager import instructionManager


# Alignment combinations shared by the item delegates below.
_ALIGN_LEFT = Qt.AlignmentFlag.AlignLeft | Qt.AlignmentFlag.AlignVCenter
_ALIGN_CENTER = Qt.AlignmentFlag.AlignCenter | Qt.AlignmentFlag.AlignVCenter


def _displayText(index):
    """Return the display-role value of *index* as a string ("" for an empty cell)."""
    value = index.data(Qt.ItemDataRole.DisplayRole)
    return "" if value is None else str(value)


def _textColor(view, option, index):
    """Return the palette text colour matching the selection state of *index*."""
    if view.selectionModel().isSelected(index):
        return option.palette.highlightedText().color()
    return option.palette.text().color()


def _drawElidedText(painter, rect, alignment, text):
    """Draw *text* inside *rect*, eliding it on the right when it does not fit."""
    width = max(0, int(rect.width()))
    elided = painter.fontMetrics().elidedText(text, Qt.TextElideMode.ElideRight, width)
    painter.drawText(rect, alignment, elided)


def _sqlIdentifier(name):
    """Return *name* quoted as an SQL identifier."""
    return '"' + str(name).replace('"', '""') + '"'


def _sqlLiteral(value):
    """Return *value* (as fetched by ``sqlite3``) rendered as an SQL literal."""
    if value is None:
        return "NULL"
    if isinstance(value, int):
        return repr(value)
    # Non-finite floats have no numeric literal; they fall through to text.
    if isinstance(value, float) and value == value and abs(value) != float("inf"):
        return repr(value)
    if isinstance(value, (bytes, bytearray, memoryview)):
        return "X'" + bytes(value).hex() + "'"
    return "'" + str(value).replace("'", "''") + "'"


class QxStandardItem(QStandardItem):
    """Standard item that shows its full text as a tooltip when the text is long."""

    def __init__(self, text="", max_length=200, *args):
        """Create the item.

        Args:
            text: Initial item text.
            max_length: Texts longer than this get a tooltip with the full text.
            *args: Forwarded to ``QStandardItem``.
        """
        super().__init__(*args)
        self.max_length = max_length
        self.setText(text)

    def setText(self, text):
        """Set the item text and update the tooltip according to its length."""
        super().setText(text)
        # Only long texts get a tooltip; shorter ones have it cleared.
        self.setToolTip(text if len(text) > self.max_length else "")


class AlignDelegate(QStyledItemDelegate):
    """Delegate drawing left-aligned, right-elided text with a 14 px indent."""

    X_INDENT = 14  # left indent in pixels

    def __init__(self, view):
        """Remember *view*; its selection model decides the text colour."""
        super(AlignDelegate, self).__init__()
        self.view = view

    def paint(self, painter, option, index):
        """Draw the cell text; the painter state is restored afterwards."""
        cell = option.rect
        painter.save()
        try:
            painter.setFont(option.font)
            painter.setPen(_textColor(self.view, option, index))
            rect = QRectF(cell.left() + self.X_INDENT, cell.top(),
                          cell.width() - self.X_INDENT, cell.height())
            _drawElidedText(painter, rect, _ALIGN_LEFT, _displayText(index))
        finally:
            painter.restore()


class AlignDevDelegate(QStyledItemDelegate):
    """Delegate that centres one column and marks an empty device cell in red."""

    X_INDENT = 2  # left indent in pixels

    def __init__(self, view, hideCloumn = 1, devCloumn = 3):
        """Create the delegate.

        Args:
            view: View whose selection model decides the text colour.
            hideCloumn: Column whose text is centred instead of left-aligned.
            devCloumn: Column that gets a red frame while its text is empty.
        """
        super(AlignDevDelegate, self).__init__()
        self.view = view
        self.hideCloumn = hideCloumn
        self.devCloumn = devCloumn

    def paint(self, painter, option, index):
        """Draw the cell text, the empty-cell frame and the task icon."""
        column = index.column()
        alignment = _ALIGN_CENTER if column == self.hideCloumn else _ALIGN_LEFT
        text = _displayText(index)
        cell = option.rect
        painter.save()
        try:
            painter.setFont(option.font)
            if column == self.devCloumn and not text:
                # Red outline with a fully transparent fill, inset vertically.
                painter.setPen(QColor(255, 0, 0))
                painter.setBrush(QColor(255, 0, 0, 0))
                painter.drawRect(cell.adjusted(0, 6, -2, -6))
            painter.setPen(_textColor(self.view, option, index))
            rect = QRectF(cell.left() + self.X_INDENT, cell.top(),
                          cell.width() - self.X_INDENT, cell.height())
            _drawElidedText(painter, rect, alignment, text)
            # Column 3 is hard-coded here, independent of devCloumn.
            if column == 3:
                icontype = index.sibling(index.row(), 3).data(Qt.ItemDataRole.DisplayRole)
                if icontype == "任务":
                    icon_rect = QRectF(cell.left() + 25, cell.top(), 20, cell.height())
                    QIcon(":/resource/child.png").paint(painter, icon_rect.toRect())
        finally:
            painter.restore()


class AlignNoDelegate(QStyledItemDelegate):
    """Delegate that centres one column and left-aligns all the others."""

    X_INDENT = 2  # left indent in pixels

    def __init__(self, view, hideCloumn = 1):
        """Create the delegate.

        Args:
            view: View whose selection model decides the text colour.
            hideCloumn: Column whose text is centred instead of left-aligned.
        """
        super(AlignNoDelegate, self).__init__()
        self.view = view
        self.hideCloumn = hideCloumn

    def paint(self, painter, option, index):
        """Draw the cell text; the painter state is restored afterwards."""
        alignment = _ALIGN_CENTER if index.column() == self.hideCloumn else _ALIGN_LEFT
        cell = option.rect
        painter.save()
        try:
            painter.setFont(option.font)
            painter.setPen(_textColor(self.view, option, index))
            rect = QRectF(cell.left() + self.X_INDENT, cell.top(),
                          cell.width() - self.X_INDENT, cell.height())
            _drawElidedText(painter, rect, alignment, _displayText(index))
        finally:
            painter.restore()


class AlignTaskNoDelegate(QStyledItemDelegate):
    """Delegate for the task table: centred number column plus parent/child icons."""

    ICON_SLOT = 22   # width reserved on the right of non-centred cells
    ICON_WIDTH = 20  # width of the painted icon
    # Icon resource for each value found in column 8 of the row.
    ICONS = {
        "parent": ":/resource/parent.png",
        "child": ":/resource/child.png",
    }

    def __init__(self, view, hideCloumn = 1):
        """Create the delegate.

        Args:
            view: View whose selection model decides the text colour.
            hideCloumn: Column whose text is centred instead of left-aligned.
        """
        super(AlignTaskNoDelegate, self).__init__()
        self.view = view
        self.hideCloumn = hideCloumn

    def paint(self, painter, option, index):
        """Draw the cell text and, in column 2, the parent/child icon."""
        column = index.column()
        text = _displayText(index)
        cell = option.rect
        painter.save()
        try:
            painter.setFont(option.font)
            painter.setPen(_textColor(self.view, option, index))
            if column == self.hideCloumn:
                rect = QRectF(cell.left(), cell.top(), cell.width(), cell.height())
                _drawElidedText(painter, rect, _ALIGN_CENTER, text)
                return
            # Keep the icon slot free so text and icon never overlap.
            rect = QRectF(cell.left(), cell.top(),
                          cell.width() - self.ICON_SLOT, cell.height())
            _drawElidedText(painter, rect, _ALIGN_LEFT, text)
            if column == 2:
                icontype = index.sibling(index.row(), 8).data(Qt.ItemDataRole.DisplayRole)
                resource = self.ICONS.get(icontype) if isinstance(icontype, str) else None
                if resource:
                    icon_rect = QRectF(cell.right() - self.ICON_SLOT, cell.top(),
                                       self.ICON_WIDTH, cell.height())
                    QIcon(resource).paint(painter, icon_rect.toRect())
        finally:
            painter.restore()


class DragDropInfo:
    """Pair of source and target indexes describing a drag-and-drop move."""

    def __init__(self, source_index, target_index):
        self.source_index = source_index
        self.target_index = target_index


class MarkdownViewer(QDialog):
    """Read-only dialog that renders a Markdown file as HTML."""

    def __init__(self):
        super().__init__()
        self.initUI()

    def initUI(self):
        """Build the dialog: a single read-only rich-text editor."""
        self.setWindowTitle("帮助文档")
        self.setGeometry(685, 60, 800, 900)
        self.textEdit = QTextEdit()
        self.textEdit.setReadOnly(True)
        self.textEdit.setAcceptRichText(True)
        layout = QVBoxLayout()
        layout.addWidget(self.textEdit)
        self.setLayout(layout)

    def _loadFile(self, fileName):
        """Remember *fileName* and render it; the path is relative to ``common.appPath``."""
        self.fileName = fileName
        helpPath = os.path.join(common.appPath, self.fileName)
        with open(helpPath, 'r', encoding='utf-8') as file:
            self.setMarkdown(file.read())

    def showMarkdown(self, markdown_file):
        """Load *markdown_file* and show the dialog modally."""
        self._loadFile(markdown_file)
        self.exec()

    def setFileName(self, fileName):
        """Load *fileName* without showing the dialog."""
        # The dialog has no appPath of its own; the path comes from `common`.
        self._loadFile(fileName)

    def setMarkdown(self, markdown_text):
        """Convert *markdown_text* to HTML and display it."""
        self.textEdit.setHtml(markdown(markdown_text))


class TaskProgressBar(QProgressBar):
    """Progress bar bound to one task id; updates for other tasks are ignored."""

    def __init__(self, taskId= "",parent=None):
        super(TaskProgressBar, self).__init__(parent)
        self.taskId = taskId
        self.setAlignment(Qt.AlignmentFlag.AlignCenter)
        self.setValue(0)
        self.setTextVisible(False)

    def updateProgress(self, taskId, value, maxValue):
        """Apply a progress update addressed to *taskId*.

        Args:
            taskId: Task the update belongs to; ignored unless it equals ``self.taskId``.
            value: Current progress value.
            maxValue: Maximum value. ``0`` forces the value to 0; ``-1`` hides
                the text and uses *value* as the maximum.
        """
        if self.taskId != taskId:
            return
        if maxValue == 0:
            value = 0
        self.setTextVisible(True)
        if maxValue == -1:
            self.setTextVisible(False)
            maxValue = value
        self.setMaximum(maxValue)
        self.setValue(value)


class Alert(QMessageBox):
    """Stay-on-top information box that closes when it loses focus."""

    def __init__(self, parent=None):
        super(Alert, self).__init__(parent)
        self.setIcon(QMessageBox.Icon.Information)
        self.setWindowFlags(self.windowFlags() | Qt.WindowType.WindowStaysOnTopHint)
        self.setStandardButtons(QMessageBox.StandardButton.Ok)

    def event(self, event):
        """Close the box as soon as its window is deactivated."""
        if event.type() == QEvent.Type.WindowDeactivate:
            self.close()
        return super().event(event)

    def showMsg(self, title, msg, time):
        """Show *msg*; when *time* (milliseconds) is non-zero, close after it elapses."""
        self.setWindowTitle(title)
        self.setText(msg)
        self.show()
        if time != 0:
            # singleShot is static; no QTimer instance is needed.
            QTimer.singleShot(time, self.close)

    def onWindowClose(self):
        """Slot: close the box when the main window closes."""
        self.close()


class Scanf(QInputDialog):
    """Text input dialog that reports its result through ``scanfFinish``."""

    scanfFinish = pyqtSignal(str)

    def __init__(self, parent=None):
        super(Scanf, self).__init__(parent)
        self.setWindowFlags(self.windowFlags() | Qt.WindowType.WindowStaysOnTopHint)
        self.setInputMode(QInputDialog.InputMode.TextInput)
        self.setOkButtonText("确定")
        self.setCancelButtonText("取消")
        # NOTE(review): this replaces the flags set above (including the
        # stay-on-top hint) instead of adding to them — confirm it is intended.
        self.setWindowFlags(Qt.WindowType.WindowCloseButtonHint)
        self.accepted.connect(self.sendScanf)
        self.rejected.connect(self.sendEmputy)

    def event(self, event):
        """Close the dialog as soon as its window is deactivated."""
        if event.type() == QEvent.Type.WindowDeactivate:
            self.close()
        return super().event(event)

    def sendScanf(self):
        """Emit the entered text (dialog accepted)."""
        self.scanfFinish.emit(self.textValue())

    def sendEmputy(self):
        """Emit an empty string (dialog rejected)."""
        self.scanfFinish.emit("")

    def showMsg(self, title, msg, value):
        """Show the dialog with *title*, label *msg* and initial text *value*."""
        self.setWindowTitle(title)
        self.setLabelText(msg)
        self.setTextValue(value)
        self.show()

    def onWindowClose(self):
        """Slot: close the dialog when the main window closes."""
        self.close()


class NoWheelComboBox(QComboBox):
    """Combo box that ignores mouse-wheel events."""

    def __init__(self, parent=None):
        super().__init__(parent)

    def wheelEvent(self, event):
        """Swallow the wheel event so the selection never changes by scrolling."""
        return


class Common(QObject):
    """Application-wide helper object, exposed to QML and used across modules."""

    sig_appChanged = pyqtSignal()
    sig_changeQml = pyqtSignal(str)
    sig_unChecked = pyqtSignal()
    sig_changeMainQml = pyqtSignal(str)
    showAlert = pyqtSignal(str, str, int)
    showScanf = pyqtSignal(str, str, str)
    scanfFinish = pyqtSignal(str)
    appReady = pyqtSignal(str)
    registQml = pyqtSignal(str,str)
    windowClose = pyqtSignal()

    def __init__(self):
        """Load the script files and the installed-app list, and wire the dialog signals."""
        super().__init__()
        self.scriptFile = dict()
        # NOTE(review): this attribute shadows the `appList` slot defined
        # below on instances; both are kept because outside code may use either.
        self.appList = []
        self.appPath = os.path.dirname(__file__)
        self.fstPath = "qml"
        if "_internal" in self.appPath:
            log.error("存在 _internal")
            self.fstPath = "_internal/qml"
        else:
            log.error("不存在 _internal")
        self.emputyProId = "00000000-0000-0000-0000-000000000000"
        self.defaultGroupId = "00000000-0000-0000-0000-000000000000"
        self.systemInterfaceUuid = "00000000-0000-0000-0000-000000000000"
        self.noChooseUuid = "0"
        self.scanfResult = ""
        self.qscript = {}
        self.setSriptFile()
        self.updateAppList()
        self.showAlert.connect(self.onShowAlert)
        self.showScanf.connect(self.onShowScanf)

    def onWindowClose(self):
        """Forward the main window's close notification to ``windowClose``."""
        self.windowClose.emit()

    @pyqtSlot(result=QVariant)
    def getConfig(self):
        """Return ``config.data``."""
        return config.data

    @pyqtSlot(result=str)
    def getCurrentUser(self):
        """Return the current user's name, or "" when there is none."""
        currentUser = userManager.getCurrentUser()
        if currentUser is not None and "name" in currentUser:
            return currentUser["name"]
        return ""

    @pyqtSlot(result=QVariant)
    def getAllG(self):
        """Return every value stored in ``_G``."""
        return _G.getall()

    @pyqtSlot(str, result=QVariant)
    def getG(self,name):
        """Return the ``_G`` value stored under *name*."""
        return _G.get(name)

    @pyqtSlot(str,str, result=bool)
    def setG(self, name, json_str):
        """Decode *json_str* and store it in ``_G`` under *name*.

        Returns:
            True on success, False when *json_str* is not valid JSON.
        """
        try:
            value = json.loads(json_str)
        except ValueError as e:
            log.error(f"setG: invalid JSON for {name}: {e}")
            return False
        _G.set(name, value)
        return True

    @pyqtSlot(result=str)
    def firstPath(self):
        """Return the QML root path chosen at construction time."""
        return self.fstPath

    def resizeDefTableView(self, tableView, indexs):
        """Give every column in *indexs* an equal share of the view width."""
        self.resizeTableView(tableView, indexs)

    def resizeTableView(self, tableView, indexs):
        """Give every column in *indexs* an equal share of the view width."""
        if not indexs:
            return
        itemWidth = int(tableView.width() / len(indexs))
        for index in indexs:
            tableView.setColumnWidth(index, itemWidth)

    def resizeNumberTableView(self, tableView, indexs):
        """Share the width left after the vertical header and scroll bar among *indexs*."""
        if not indexs:
            return
        scrollBar = tableView.verticalScrollBar()
        scollbarWidth = scrollBar.width() if scrollBar.isVisible() else 0
        totalWidth = tableView.width() - tableView.verticalHeader().width() - scollbarWidth
        avgWidth = int(totalWidth / len(indexs))
        for index in indexs:
            tableView.setColumnWidth(index, avgWidth)

    def resizeNoTableView(self, tableView, indexs, colIndex = 1):
        """Share the width left after column *colIndex* among the columns in *indexs*.

        The remaining width is divided by ``len(indexs) - 1``; nothing is
        resized when *indexs* holds fewer than two columns.
        """
        totalColumns = len(indexs)
        if totalColumns < 2:
            return
        totalWidth = tableView.width() - tableView.columnWidth(colIndex)
        avgWidth = int(totalWidth / (totalColumns - 1))
        for index in indexs:
            tableView.setColumnWidth(index, avgWidth)

    def onShowAlert(self, title, msg, time):
        """Create and show an :class:`Alert` tied to the main window's lifetime."""
        self.alert = Alert()
        self.windowClose.connect(self.alert.onWindowClose)
        self.alert.showMsg(title, msg, time)

    def onShowScanf(self, title, msg, value):
        """Create and show a :class:`Scanf` dialog; its result lands in ``scanfResult``."""
        self.scanf = Scanf()
        self.windowClose.connect(self.scanf.onWindowClose)
        self.scanf.scanfFinish.connect(self.onScanfFinish)
        self.scanf.showMsg(title, msg, value)

    def onScanfFinish(self, msg):
        """Store the text reported by the input dialog."""
        self.scanfResult = msg

    def _runAppScript(self, appId, scriptFile, plusFile):
        """Execute an app's script into ``self.qscript``.

        Returns:
            True when a dongle check passed and the script ran, else False.
        """
        if not (dog.isDogAlive() == 1 or suDog.isDogAlive() == 1):
            return False
        scriptPath = os.path.join(self.appPath, "qml", plusFile, appId, "script", scriptFile)
        with open(scriptPath, "r", encoding="utf-8") as f:
            script = f.read()
        # SECURITY: exec runs the installed app's script with full privileges.
        # Only packages that passed installApp's checksum should reach this point.
        exec(script, self.qscript)
        return self.qscript != {}

    @pyqtSlot(str,str,str,str, result=bool)
    def initAppScript(self, appId, instancName, scriptFile, plusFile="plus"):
        """Run an app script and emit ``appReady(instancName)`` on success."""
        try:
            if self._runAppScript(appId, scriptFile, plusFile):
                self.appReady.emit(instancName)
                return True
            return False
        except Exception as e:
            print(e)
            return False

    @pyqtSlot(str,str,str,str,str, result=bool)
    def registAppQml(self, appId, instancName, className, scriptFile, plusFile="plus"):
        """Run an app script and emit ``registQml(instancName, className)`` on success."""
        try:
            if self._runAppScript(appId, scriptFile, plusFile):
                self.registQml.emit(instancName, className)
                return True
            return False
        except Exception as e:
            print(e)
            return False

    def _attachDevice(self, taskInstruction, deviceId, interfaceIndex):
        """Attach the device and its selected interface to *taskInstruction*.

        The interface is attached only when *interfaceIndex* is valid both for
        the device's interface list and for the device model's interface types.

        Returns:
            False when the device does not exist, True otherwise.
        """
        device = deviceManager.getInfo(deviceId)
        if device is None:
            return False
        devInterface = device["interface"]
        devModel = devModelManager.getInfo(device["dev_model_id"])
        devInterfaceKeys = []
        if devModel is not None:
            attr_json_dict = devModel["attr"]
            if "interfaceType" in attr_json_dict:
                devInterfaceKeys = list(attr_json_dict["interfaceType"].keys())
        # A negative index would silently select from the end of the lists.
        if 0 <= interfaceIndex < len(devInterface) and interfaceIndex < len(devInterfaceKeys):
            device["interfaceName"] = devInterfaceKeys[interfaceIndex]
            taskInstruction["interface"] = devInterface[interfaceIndex]
        taskInstruction["device"] = device
        return True

    def getTaskDetails(self, taskInstructions):
        """Enrich every entry of *taskInstructions* in place.

        Instruction entries get ``instructionInfo`` plus, when the device
        exists, ``device`` and ``interface``; task entries get ``taskInfo``.
        """
        for taskInstruction in taskInstructions:
            targetType = taskInstruction["target_type"]
            if targetType == "instruction":
                instructionInfo = instructionManager.getInfo(str(taskInstruction["target_id"]))
                instructionInfo["target_param"] = taskInstruction["target_param"]
                taskInstruction["instructionInfo"] = instructionInfo
                interfaceIndex = taskInstruction["interface_index"]
                # A missing index means the first interface.
                if interfaceIndex == "" or interfaceIndex is None:
                    interfaceIndex = 0
                else:
                    interfaceIndex = int(interfaceIndex)
                self._attachDevice(taskInstruction, taskInstruction["device_id"], interfaceIndex)
            elif targetType == "task":
                taskInstruction["taskInfo"] = taskManager.getInfo(str(taskInstruction["target_id"]))

    def getInstructionDetail(self, instructionId, attr):
        """Build an instruction entry from *instructionId* and *attr*.

        Args:
            instructionId: Id of the instruction.
            attr: Mapping with the keys ``param``, ``deviceId`` and ``interfaceIndex``.

        Returns:
            The entry dict, or None when the device is missing or an error occurs.
        """
        try:
            taskInstruction = {
                "target_type": "instruction",
                "target_param": attr["param"],
                "target_id": instructionId,
            }
            deviceId = attr["deviceId"]
            interfaceIndex = int(attr["interfaceIndex"])
            taskInstruction["instructionInfo"] = instructionManager.getInfo(str(instructionId))
            if not self._attachDevice(taskInstruction, deviceId, interfaceIndex):
                return None
            return taskInstruction
        except Exception as e:
            print(e)
            return None

    def level2Search(self, level):
        """Map a log level name to its search key; unknown levels give None."""
        return {
            "DEBUG": "0",
            "INFO": "01",
            "WARNING": "012",
            "ERROR": "0123",
        }.get(level)

    @pyqtSlot(str)
    def changeQml(self, qmlfile):
        """Emit ``sig_changeQml`` with *qmlfile*."""
        self.sig_changeQml.emit(qmlfile)

    def changeMainQml(self, qmlfile):
        """Emit ``sig_changeMainQml`` with *qmlfile*."""
        self.sig_changeMainQml.emit(qmlfile)

    def _plusDir(self):
        """Return the directory that holds the installed apps."""
        return os.path.join(self.appPath, "qml", "plus")

    def _appDir(self, appId):
        """Return the directory of app *appId*.

        Raises:
            ValueError: If *appId* is empty or resolves outside the apps
                directory (for example ``".."`` or an absolute path).
        """
        plusDir = os.path.abspath(self._plusDir())
        appDir = os.path.abspath(os.path.join(plusDir, str(appId)))
        if not appId or appDir == plusDir or os.path.commonpath([plusDir, appDir]) != plusDir:
            raise ValueError(f"invalid app id: {appId!r}")
        return appDir

    @pyqtSlot(str,bool, result=bool)
    def showApp(self, id, show):
        """Set the ``General.Show`` flag in an app's ``info`` file.

        Returns:
            True on success, False on any error.
        """
        try:
            infoPath = os.path.join(self._appDir(id), 'info')
            with open(infoPath, 'r', encoding="utf-8") as file:
                info = json.load(file)
            info["General"]["Show"] = show
            with open(infoPath, 'w', encoding="utf-8") as file:
                json.dump(info, file, indent=4)
            self.sig_appChanged.emit()
            return True
        except Exception as e:
            print(f"Error showApp: {e}")
            return False

    @pyqtSlot(str, result=bool)
    def deleteApp(self, id):
        """Uninstall an app by removing its directory.

        Returns:
            True on success, False on any error (including an invalid id).
        """
        try:
            # _appDir rejects ids that would remove anything but one app.
            shutil.rmtree(self._appDir(id))
            self.sig_appChanged.emit()
            return True
        except Exception as e:
            print(f"Error deleteApp: {e}")
            return False

    @pyqtSlot(str, result=bool)
    def installApp(self, filePath):
        """Install an app package (zip archive).

        The package is copied into the apps directory, unpacked into a
        directory named after its ``General.AppId`` and verified against its
        ``md5.txt``. The staged copy is always removed afterwards.

        Args:
            filePath: Local path or ``file://`` URL of the package.

        Returns:
            True when the app was installed, otherwise False.
        """
        try:
            stagedZip = self.appfilePath(filePath)
        except FileNotFoundError:
            print(f"File not found: {filePath}")
            return False
        except PermissionError:
            print(f"Permission denied: {filePath}")
            return False
        except OSError as e:
            print(f"File does not exist: {filePath} ({e})")
            return False

        try:
            general = self.zipJson(stagedZip)["General"]
            appId = general["AppId"]
            if "Name" not in general:  # a package must declare its name
                raise KeyError("Name")
            appDir = self._appDir(appId)
            os.makedirs(appDir, exist_ok=True)
            shutil.unpack_archive(stagedZip, appDir, "zip")
            try:
                verified = self.compare_md5_txt(appDir)
            except OSError:
                verified = False  # md5.txt missing or unreadable
            if not verified:
                print("md5 校验失败")
                shutil.rmtree(appDir)
                return False
        except Exception as e:
            print(f"Error unpacking archive: {e}")
            return False
        finally:
            # The staged copy is never needed after this point.
            try:
                os.remove(stagedZip)
            except OSError:
                pass
        self.sig_appChanged.emit()
        return True

    @pyqtSlot()
    def updateAppList(self):
        """Rebuild ``self.appList`` from every ``info`` file under the apps directory.

        Unreadable or malformed ``info`` files are logged and skipped.
        """
        apps = []
        for root, _dirs, files in os.walk(self._plusDir()):
            if 'info' not in files:
                continue
            infoPath = os.path.join(root, 'info')
            try:
                with open(infoPath, 'r', encoding="utf-8") as f:
                    info = json.load(f)
                general = info["General"]
                apps.append({
                    "id": general["AppId"],
                    "info": info,
                    "show": general["Show"],
                })
            except (OSError, ValueError, KeyError, TypeError) as e:
                log.error(f"skip invalid app info {infoPath}: {e}")
        self.appList = apps

    @pyqtSlot(result=list)
    def appList(self):
        """Return the installed-app list."""
        return self.appList

    @pyqtSlot(result=str)
    def getSystemInterfaceUuid(self):
        """Return the UUID used for the system interface."""
        return self.systemInterfaceUuid

    @pyqtSlot(result=list)
    def getCheckFileName(self):
        """Return the loaded script names that contain ``check_``."""
        return [name for name in self.scriptFile if "check_" in name]

    @pyqtSlot(result=list)
    def getAllFileNames(self):
        """Return the names of all loaded scripts."""
        return list(self.scriptFile)

    @pyqtSlot(result=list)
    def getComList(self):
        """Return the names of the available serial ports."""
        return [port.portName() for port in QSerialPortInfo.availablePorts()]

    def calculate_file_md5(self, file_path):
        """Return the hex MD5 digest of the file at *file_path*."""
        md5 = hashlib.md5()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                md5.update(chunk)
        return md5.hexdigest()

    def calculate_directory_md5(self, directory_path):
        """Return a nested ``{name: md5}`` dict for *directory_path*.

        Files named ``md5.txt`` are skipped; sub-directories map to their own
        nested dict.
        """
        md5_values = {}
        with os.scandir(directory_path) as it:
            for entry in it:
                if entry.is_file():
                    if entry.name != "md5.txt":
                        md5_values[entry.name] = self.calculate_file_md5(entry.path)
                elif entry.is_dir():
                    md5_values[entry.name] = self.calculate_directory_md5(entry.path)
        return md5_values

    def compare_md5_txt(self, directory_path):
        """Check *directory_path* against the checksum stored in its ``md5.txt``.

        The expected value is the MD5 of the sorted, 4-space-indented JSON dump
        of :meth:`calculate_directory_md5`.

        Raises:
            OSError: If ``md5.txt`` cannot be read.
        """
        with open(os.path.join(directory_path, "md5.txt"), "r") as f:
            # Tolerate surrounding whitespace and upper-case hex digits.
            expected = f.read().strip().lower()
        directory_md5 = self.calculate_directory_md5(directory_path)
        json_str = json.dumps(directory_md5, indent=4, sort_keys=True)
        return hashlib.md5(json_str.encode("utf-8")).hexdigest() == expected

    def appfilePath(self, path):
        """Copy the package at *path* into the apps directory as ``<name>.zip``.

        Args:
            path: Local path or ``file://`` URL.

        Returns:
            Path of the copy.

        Raises:
            OSError: If the source cannot be read or the copy cannot be written.
        """
        url = QUrl(path)
        if url.isLocalFile():
            # Keeps the leading "/" of POSIX paths and decodes %-escapes.
            path = url.toLocalFile()
        stem = os.path.splitext(os.path.basename(path))[0]
        new_path = os.path.join(self._plusDir(), stem + ".zip")
        shutil.copyfile(path, new_path)
        return new_path

    def zipJson(self, zipPath):
        """Return the decoded JSON of the ``info`` member of the zip at *zipPath*."""
        with zipfile.ZipFile(zipPath, "r") as zip_ref:
            with zip_ref.open("info") as f:
                return json.loads(f.read())

    def setSriptFile(self, conf = None):
        """Load every file of the scripts directory into ``self.scriptFile``.

        Args:
            conf: Optional mapping whose ``path`` entry overrides the default
                ``<appPath>/scripts`` directory.

        Returns:
            ``self.scriptFile`` (file name -> file content).
        """
        _path = conf["path"] if conf else os.path.join(self.appPath, "scripts")
        if not os.path.isdir(_path):
            log.error(f"script directory not found: {_path}")
            return self.scriptFile
        for file_name in os.listdir(_path):
            file_path = os.path.join(_path, file_name)
            if os.path.isfile(file_path):
                with open(file_path, "r", encoding="utf-8") as file:
                    self.scriptFile[file_name] = file.read()
        return self.scriptFile

    @staticmethod
    def _scriptSuffix(prefix, s):
        """Return ``X`` from ``<prefix>_X.pys`` found in *s*, or "" when absent."""
        try:
            found = re.search(prefix + r"_(\w+)\.pys", s)
        except TypeError:  # *s* is not a string
            return ""
        return found.group(1) if found else ""

    def parseCodecPys(self, s):
        """Return ``X`` from a ``codec_X.pys`` name in *s*, or ""."""
        return self._scriptSuffix("codec", s)

    def parseCheckPys(self, s):
        """Return ``X`` from a ``check_X.pys`` name in *s*, or ""."""
        return self._scriptSuffix("check", s)

    def base64ToPixmap(self, base64_str):
        """Decode a base64 image string into a ``QPixmap``."""
        raw = QByteArray.fromBase64(base64_str.encode("utf-8"))
        return QPixmap.fromImage(QImage.fromData(raw))

    def base64Data(self, image):
        """Encode *image* as PNG and return it as a base64 string."""
        ba = QByteArray()
        buffer = QBuffer(ba)
        buffer.open(QIODevice.OpenModeFlag.WriteOnly)
        image.save(buffer, "PNG")
        return ba.toBase64().data().decode()

    def is_json_string(self, string):
        """Return True when *string* parses as JSON and is not made of digits only."""
        try:
            if string.isdigit():
                return False
            json.loads(string)
            return True
        except (AttributeError, TypeError, ValueError, RecursionError):
            return False

    def writeJson(self, path, table_name, data):
        """Write *data* to ``<path>/<table_name>/<data["id"]>.json``.

        Errors are logged, not raised.
        """
        try:
            data = self.ensure_serializable(data)
            # ORM bookkeeping entry, not part of the record.
            if '_sa_instance_state' in data:
                del data['_sa_instance_state']
            file_path = os.path.join(path, table_name)
            os.makedirs(file_path, exist_ok=True)
            with open(os.path.join(file_path, f'{data["id"]}.json'), 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=4)
        except Exception as e:
            log.error(f"writeJson failed for {table_name}: {e}")

    def writeZip(self, target_path, directory_path):
        """Create the zip *target_path* from the content of *directory_path*."""
        with zipfile.ZipFile(target_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            self.add_dir_to_zip(directory_path, zipf, directory_path)

    def clearDir(self, path):
        """Remove *path* and everything below it."""
        shutil.rmtree(path)

    def createDir(self, path):
        """Create *path* (and its parents) when it does not exist yet."""
        os.makedirs(path, exist_ok=True)

    def add_dir_to_zip(self, dir_path, zipf, parent_dir):
        """Add everything under *dir_path* to *zipf*.

        Archive names are relative to *parent_dir*. Every file is written
        once; empty directories get a directory entry of their own.
        """
        for root, dirs, files in os.walk(dir_path):
            for file in files:
                file_path = os.path.join(root, file)
                zipf.write(file_path, arcname=os.path.relpath(file_path, parent_dir))
            if not dirs and not files:
                arcname = os.path.relpath(root, parent_dir)
                if arcname != os.curdir:
                    zipf.write(root, arcname=arcname)

    @staticmethod
    def _loadJsonField(value, empty):
        """Return *value* decoded from JSON text, or *empty* when it is falsy."""
        if not value:
            return empty
        if isinstance(value, (str, bytes, bytearray)):
            return json.loads(value) or empty
        return value  # already decoded

    def ensure_serializable(self, obj):
        """Return a JSON-serialisable version of *obj*.

        Bytes are decoded as UTF-8, dicts and lists are converted
        recursively, and in dicts the JSON text stored under ``attr`` and
        ``interface`` is decoded (falsy values become ``{}`` and ``[]``).
        As before, these three keys are also updated on the dict passed in.
        Best effort: when a value cannot be converted, *obj* is returned
        unchanged.
        """
        try:
            if isinstance(obj, dict):
                if isinstance(obj.get('script'), bytes):
                    obj['script'] = obj['script'].decode('utf-8')
                if "attr" in obj:
                    obj["attr"] = self._loadJsonField(obj["attr"], {})
                if "interface" in obj:
                    obj["interface"] = self._loadJsonField(obj["interface"], [])
                return {k: self.ensure_serializable(v) for k, v in obj.items()}
            if isinstance(obj, list):
                return [self.ensure_serializable(v) for v in obj]
            if isinstance(obj, bytes):
                return obj.decode('utf-8')
            return obj
        except (TypeError, ValueError):
            return obj

    @staticmethod
    def export_table_structure_and_data(db_path, table_name):
        """Export a table's schema, rows and triggers to ``<table_name>.sql``.

        The file is written to the current working directory. SQLite errors
        are printed, not raised.

        Example:
            ``Common.export_table_structure_and_data('data.db', 'dev_model')``

        Args:
            db_path: Path of the SQLite database.
            table_name: Name of the table to export.
        """
        quotedTable = _sqlIdentifier(table_name)
        output_file = f"{table_name}.sql"
        conn = sqlite3.connect(db_path)
        try:
            cursor = conn.cursor()
            with open(output_file, 'w', encoding='utf-8') as sql_file:
                sql_file.write("PRAGMA foreign_keys = false;\n")
                sql_file.write(f"DROP TABLE IF EXISTS {quotedTable};\n")
                # Table definition.
                cursor.execute(
                    "SELECT sql FROM sqlite_master WHERE type='table' AND name=?",
                    (table_name,),
                )
                for (create_table_sql,) in cursor.fetchall():
                    sql_file.write(create_table_sql + ";\n\n")
                # Table rows.
                cursor.execute(f"SELECT * FROM {quotedTable}")
                columns = ", ".join(_sqlIdentifier(col[0]) for col in cursor.description)
                for row in cursor:
                    values = ", ".join(_sqlLiteral(v) for v in row)
                    sql_file.write(
                        f"INSERT INTO {quotedTable} ({columns}) VALUES ({values});\n"
                    )
                # Triggers attached to the table.
                cursor.execute(
                    "SELECT sql FROM sqlite_master WHERE type='trigger' AND tbl_name=?",
                    (table_name,),
                )
                for (trigger_sql,) in cursor.fetchall():
                    sql_file.write(trigger_sql + ";\n")
                sql_file.write("PRAGMA foreign_keys = true;\n")
        except sqlite3.Error as e:
            print(f"SQLite error1: {e}")
        finally:
            conn.close()

    def import_sql_to_database(self,db_path, table_bame):
        """Execute ``<table_bame>.sql`` from the working directory against *db_path*.

        Errors are printed, not raised.
        """
        sql_file_path = f"{table_bame}.sql"
        conn = sqlite3.connect(db_path)
        try:
            with open(sql_file_path, 'r', encoding='utf-8') as file:
                sql_script = file.read()
            try:
                conn.executescript(sql_script)
            except sqlite3.Error as e:
                print(f"SQLite error: {e}")
                return
            conn.commit()
            print("SQL file imported successfully.")
        except FileNotFoundError:
            print(f"The file {sql_file_path} was not found.")
        except Exception as e:
            print(f"An unexpected error occurred: {e}")
        finally:
            conn.close()


common = Common()