TG-PlatformPlus/common.py

949 lines
38 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/opt/homebrew/bin/python3
# -*- coding:utf-8 -*-
import re
import shutil
import zipfile
import hashlib
import json
import sqlite3
import os
from dog import dog
from dog import suDog
from markdown import markdown
from PyQt6.QtCore import *
from logs import log
from PyQt6.QtGui import *
from PyQt6.QtWidgets import *
from PyQt6.QtSerialPort import QSerialPortInfo
from configparser import ConfigParser
from config import config
from influxDB import influxdb
from globals import _G
from devmodelModel.devModelManager import devModelManager
from deviceModel.deviceManager import deviceManager
from userModel.userManager import userManager
from taskModel.taskManager import taskManager
from instructionModel.instructionManager import instructionManager
class QxStandardItem(QStandardItem):
    """Standard item that exposes its full text as a tooltip when the text is long."""

    def __init__(self, text="", max_length=200, *args):
        """Create the item.

        Args:
            text: Initial item text.
            max_length: Text longer than this many characters gets a tooltip.
            *args: Forwarded unchanged to ``QStandardItem``.
        """
        super().__init__(*args)
        self.max_length = max_length
        self.setText(text)

    def setText(self, text):
        """Set the item text and keep the tooltip in sync with its length."""
        super().setText(text)
        # Long text: tooltip carries the full text; otherwise the tooltip is cleared.
        tooltip = text if len(text) > self.max_length else ""
        self.setToolTip(tooltip)
class AlignDelegate(QStyledItemDelegate):
    """Delegate that draws cell text left-aligned, vertically centred, with a fixed indent."""

    def __init__(self, view):
        """Remember the view whose selection model decides the text colour."""
        super(AlignDelegate, self).__init__()
        self.view = view

    def paint(self, painter, option, index):
        """Draw the display text of ``index`` elided to the indented cell width."""
        option.displayAlignment = Qt.AlignmentFlag.AlignLeft | Qt.AlignmentFlag.AlignVCenter
        raw = index.model().data(index, role=Qt.ItemDataRole.DisplayRole)
        # Cells without display data yield None; normalise so eliding cannot raise.
        text = "" if raw is None else str(raw)
        painter.setFont(option.font)
        xIndent = 14  # left indent in pixels
        selection_model = self.view.selectionModel()
        if selection_model.isSelected(index):
            # Selected cells use the highlighted-text colour.
            painter.setPen(option.palette.highlightedText().color())
        else:
            painter.setPen(option.palette.text().color())
        rect = QRectF(
            option.rect.left() + xIndent,
            option.rect.top(),
            option.rect.width() - xIndent,
            option.rect.height(),
        )
        elidedText = painter.fontMetrics().elidedText(
            text, Qt.TextElideMode.ElideRight, int(rect.width())
        )
        painter.drawText(rect, option.displayAlignment, elidedText)
class AlignDevDelegate(QStyledItemDelegate):
    """Delegate that centres one column, frames empty cells of another and draws a task icon."""

    def __init__(self, view, hideCloumn = 1, devCloumn = 3):
        """Store the view plus the centred column and the column checked for empty text."""
        super(AlignDevDelegate, self).__init__()
        self.view = view
        self.hideCloumn = hideCloumn
        self.devCloumn = devCloumn

    def paint(self, painter, option, index):
        """Draw the cell text, a red frame for empty ``devCloumn`` cells, and the task icon."""
        if index.column() == self.hideCloumn:
            option.displayAlignment = Qt.AlignmentFlag.AlignCenter | Qt.AlignmentFlag.AlignVCenter
        else:
            option.displayAlignment = Qt.AlignmentFlag.AlignLeft | Qt.AlignmentFlag.AlignVCenter
        raw = index.model().data(index, role=Qt.ItemDataRole.DisplayRole)
        # Cells without display data yield None; treat them as empty text.
        text = "" if raw is None else str(raw)
        xIndent = 2  # left indent in pixels
        painter.setFont(option.font)
        selection_model = self.view.selectionModel()
        # The icon decision reads column 3 of the same row.
        icontype = index.sibling(index.row(), 3).data(Qt.ItemDataRole.DisplayRole)
        if index.column() == self.devCloumn and len(text) <= 0:
            # Empty cell: red outline with a fully transparent fill, inset vertically.
            painter.setPen(QColor(255, 0, 0))
            painter.setBrush(QColor(255, 0, 0, 0))
            painter.drawRect(option.rect.adjusted(0, 6, -2, -6))
        if selection_model.isSelected(index):
            painter.setPen(option.palette.highlightedText().color())
        else:
            painter.setPen(option.palette.text().color())
        rect = QRectF(
            option.rect.left() + xIndent,
            option.rect.top(),
            option.rect.width() - xIndent,
            option.rect.height(),
        )
        elidedText = painter.fontMetrics().elidedText(
            text, Qt.TextElideMode.ElideRight, int(rect.width())
        )
        painter.drawText(rect, option.displayAlignment, elidedText)
        if index.column() == 3 and icontype == "任务":
            icon = QIcon(":/resource/child.png")
            # Icon sits 25px from the left edge, 20px wide, full cell height.
            icon_rect = QRectF(option.rect.left() + 25, option.rect.top(), 20, option.rect.height())
            icon.paint(painter, icon_rect.toRect())
class AlignNoDelegate(QStyledItemDelegate):
    """Delegate that centres one column and left-aligns every other column."""

    def __init__(self, view, hideCloumn = 1):
        """Store the view and the index of the column to centre."""
        super(AlignNoDelegate, self).__init__()
        self.view = view
        self.hideCloumn = hideCloumn

    def paint(self, painter, option, index):
        """Draw the display text of ``index`` elided to the cell width."""
        if index.column() == self.hideCloumn:
            option.displayAlignment = Qt.AlignmentFlag.AlignCenter | Qt.AlignmentFlag.AlignVCenter
        else:
            option.displayAlignment = Qt.AlignmentFlag.AlignLeft | Qt.AlignmentFlag.AlignVCenter
        raw = index.model().data(index, role=Qt.ItemDataRole.DisplayRole)
        # Cells without display data yield None; normalise so eliding cannot raise.
        text = "" if raw is None else str(raw)
        xIndent = 2  # left indent in pixels
        painter.setFont(option.font)
        selection_model = self.view.selectionModel()
        if selection_model.isSelected(index):
            painter.setPen(option.palette.highlightedText().color())
        else:
            painter.setPen(option.palette.text().color())
        rect = QRectF(
            option.rect.left() + xIndent,
            option.rect.top(),
            option.rect.width() - xIndent,
            option.rect.height(),
        )
        elidedText = painter.fontMetrics().elidedText(
            text, Qt.TextElideMode.ElideRight, int(rect.width())
        )
        painter.drawText(rect, option.displayAlignment, elidedText)
class AlignTaskNoDelegate(QStyledItemDelegate):
    """Delegate for task tables: centres one column and draws a parent/child icon in column 2."""

    # Icon resource for each value read from column 8 of the row.
    _ICONS = {
        "parent": ":/resource/parent.png",
        "child": ":/resource/child.png",
    }

    def __init__(self, view, hideCloumn = 1):
        """Store the view and the index of the column to centre."""
        super(AlignTaskNoDelegate, self).__init__()
        self.view = view
        self.hideCloumn = hideCloumn

    def paint(self, painter, option, index):
        """Draw the cell text and, for column 2, the icon selected by column 8."""
        centred = index.column() == self.hideCloumn
        if centred:
            option.displayAlignment = Qt.AlignmentFlag.AlignCenter | Qt.AlignmentFlag.AlignVCenter
        else:
            option.displayAlignment = Qt.AlignmentFlag.AlignLeft | Qt.AlignmentFlag.AlignVCenter
        raw = index.model().data(index, role=Qt.ItemDataRole.DisplayRole)
        # Cells without display data yield None; normalise so eliding cannot raise.
        text = "" if raw is None else str(raw)
        painter.setFont(option.font)
        icontype = index.sibling(index.row(), 8).data(Qt.ItemDataRole.DisplayRole)
        selection_model = self.view.selectionModel()
        if selection_model.isSelected(index):
            painter.setPen(option.palette.highlightedText().color())
        else:
            painter.setPen(option.palette.text().color())
        # Non-centred cells keep 22px free on the right for the icon.
        reserved = 0 if centred else 22
        rect = QRectF(
            option.rect.left(),
            option.rect.top(),
            option.rect.width() - reserved,
            option.rect.height(),
        )
        # Elide against the rectangle actually drawn into, so text cannot run under the icon.
        elidedText = painter.fontMetrics().elidedText(
            text, Qt.TextElideMode.ElideRight, int(rect.width())
        )
        painter.drawText(rect, option.displayAlignment, elidedText)
        if index.column() == 2 and icontype in self._ICONS:
            icon = QIcon(self._ICONS[icontype])
            icon_rect = QRectF(option.rect.right() - 22, option.rect.top(), 20, option.rect.height())
            icon.paint(painter, icon_rect.toRect())
class DragDropInfo:
    """Plain holder pairing the source and target index of a drag-and-drop operation."""

    def __init__(self, source_index, target_index):
        """Store both indexes unchanged."""
        self.source_index, self.target_index = source_index, target_index
class MarkdownViewer(QDialog):
    """Dialog that renders a Markdown file as HTML inside a read-only text widget."""

    def __init__(self):
        super().__init__()
        self.initUI()

    def initUI(self):
        """Build the dialog: title, geometry and a single read-only rich-text editor."""
        self.setWindowTitle("帮助文档")
        self.setGeometry(685, 60, 800, 900)
        self.textEdit = QTextEdit()
        self.textEdit.setReadOnly(True)
        self.textEdit.setAcceptRichText(True)
        layout = QVBoxLayout()
        layout.addWidget(self.textEdit)
        self.setLayout(layout)

    def _loadFile(self, fileName):
        """Remember ``fileName`` and render its content; the path is relative to ``common.appPath``."""
        self.fileName = fileName
        helpPath = os.path.join(common.appPath, self.fileName)
        with open(helpPath, 'r', encoding='utf-8') as file:
            self.setMarkdown(file.read())

    def showMarkdown(self, markdown_file):
        """Load ``markdown_file`` and show the dialog modally via ``exec()``."""
        self._loadFile(markdown_file)
        self.exec()

    def setFileName(self, fileName):
        """Load ``fileName`` into the viewer without showing the dialog.

        The previous code read ``self.appPath``, which this dialog never defines;
        the application path lives on the module-level ``common`` object, as
        ``showMarkdown`` already assumed.
        """
        self._loadFile(fileName)

    def setMarkdown(self, markdown_text):
        """Convert Markdown text to HTML and display it."""
        html_text = markdown(markdown_text)
        self.textEdit.setHtml(html_text)
class TaskProgressBar(QProgressBar):
    """Progress bar bound to one task id; updates for other ids are ignored."""

    def __init__(self, taskId= "",parent=None):
        """Create a centred bar at value 0 with its text hidden."""
        super(TaskProgressBar, self).__init__(parent)
        self.taskId = taskId
        self.setAlignment(Qt.AlignmentFlag.AlignCenter)
        self.setValue(0)
        self.setTextVisible(False)

    def updateProgress(self, taskId, value, maxValue):
        """Apply a progress update when ``taskId`` matches this bar.

        NOTE(review): the nesting below was reconstructed from unindented source;
        confirm that the text is meant to be shown for every ``maxValue`` except -1.
        """
        if taskId != self.taskId:
            return
        if maxValue == 0:
            value = 0
        self.setTextVisible(True)
        if maxValue == -1:
            # -1 hides the text and makes the maximum follow the value.
            self.setTextVisible(False)
            maxValue = value
        self.setMaximum(maxValue)
        self.setValue(value)
class Alert(QMessageBox):
    """Always-on-top information box that closes on deactivation or after a timeout."""

    def __init__(self, parent=None):
        super(Alert, self).__init__(parent)
        self.setIcon(QMessageBox.Icon.Information)
        self.setWindowFlags(self.windowFlags() | Qt.WindowType.WindowStaysOnTopHint)
        self.setStandardButtons(QMessageBox.StandardButton.Ok)

    def event(self, event):
        """Close the box as soon as its window is deactivated, then defer to the base class."""
        if event.type() == QEvent.Type.WindowDeactivate:
            self.close()
        return super().event(event)

    def showMsg(self, title, msg, time):
        """Show the box with ``title``/``msg``.

        Args:
            title: Window title.
            msg: Message text.
            time: Auto-close delay passed to ``QTimer.singleShot``; 0 disables auto-close.
        """
        self.setWindowTitle(title)
        self.setText(msg)
        self.show()
        if time != 0:
            # singleShot is static: no QTimer instance is needed, so none is
            # created and parented to the box on every call.
            QTimer.singleShot(time, self.close)

    def onWindowClose(self):
        """Slot used to close the box when the main window closes."""
        self.close()
class Scanf(QInputDialog):
    """Text input dialog that reports its result through the ``scanfFinish`` signal."""

    # Emitted with the entered text on accept and with "" on reject.
    scanfFinish = pyqtSignal(str)

    def __init__(self, parent=None):
        super(Scanf, self).__init__(parent)
        self.setWindowFlags(self.windowFlags() | Qt.WindowType.WindowStaysOnTopHint)
        self.setInputMode(QInputDialog.InputMode.TextInput)
        self.setOkButtonText("确定")
        self.setCancelButtonText("取消")
        # NOTE(review): this second call replaces the flags set above rather than adding to them.
        self.setWindowFlags(Qt.WindowType.WindowCloseButtonHint)
        self.accepted.connect(self.sendScanf)
        self.rejected.connect(self.sendEmputy)

    def event(self, event):
        """Close the dialog when its window is deactivated, then defer to the base class."""
        deactivated = event.type() == QEvent.Type.WindowDeactivate
        if deactivated:
            self.close()
        return super().event(event)

    def sendScanf(self):
        """Emit the text currently held by the dialog."""
        self.scanfFinish.emit(self.textValue())

    def sendEmputy(self):
        """Emit an empty string."""
        self.scanfFinish.emit("")

    def showMsg(self, title, msg, value):
        """Configure title, label and initial text, then show the dialog."""
        self.setWindowTitle(title)
        self.setLabelText(msg)
        self.setTextValue(value)
        self.show()

    def onWindowClose(self):
        """Slot used to close the dialog when the main window closes."""
        self.close()
# QComboBox that does not react to the mouse wheel.
class NoWheelComboBox(QComboBox):
    """Combo box whose wheel handler does nothing."""

    def __init__(self, parent=None):
        super().__init__(parent)

    def wheelEvent(self, event):
        """Swallow the wheel event without calling the base implementation."""
        return None
class Common(QObject):
    """Shared helper object: signals, app/plug-in management and file utilities."""

    sig_appChanged = pyqtSignal()
    sig_changeQml = pyqtSignal(str)
    sig_unChecked = pyqtSignal()
    sig_changeMainQml = pyqtSignal(str)
    showAlert = pyqtSignal(str, str, int)
    showScanf = pyqtSignal(str, str, str)
    scanfFinish = pyqtSignal(str)
    appReady = pyqtSignal(str)
    registQml = pyqtSignal(str,str)
    windowClose = pyqtSignal()

    def __init__(self):
        """Initialise state, load the script files and the app list, and wire dialog signals."""
        super().__init__()
        self.scriptFile = dict()
        self.appList = []
        # Directory containing this module.
        self.appPath = os.path.dirname(__file__)
        # QML root: "qml" by default, "_internal/qml" when the path contains "_internal".
        self.fstPath = "qml"
        if "_internal" in self.appPath:
            log.error("存在 _internal")
            self.fstPath = "_internal/qml"
        else:
            log.error("不存在 _internal")
        nil_uuid = "00000000-0000-0000-0000-000000000000"
        self.emputyProId = nil_uuid
        self.defaultGroupId = nil_uuid
        self.systemInterfaceUuid = nil_uuid
        self.noChooseUuid = "0"
        self.scanfResult = ""
        # Namespace shared by every script run through exec() in this object.
        self.qscript = {}
        self.setSriptFile()
        self.updateAppList()
        self.showAlert.connect(self.onShowAlert)
        self.showScanf.connect(self.onShowScanf)
def onWindowClose(self):
    """Re-broadcast a window close by emitting ``windowClose``."""
    self.windowClose.emit()
@pyqtSlot(result=QVariant)
def getConfig(self):
    """Return the ``data`` attribute of the imported ``config`` object."""
    return config.data
@pyqtSlot(result=str)
def getCurrentUser(self):
    """Return the current user's ``name`` entry, or "" when the entry is absent."""
    user = userManager.getCurrentUser()
    return user["name"] if "name" in user else ""
@pyqtSlot(result=QVariant)
def getAllG(self):
    """Return everything ``_G.getall()`` reports."""
    return _G.getall()
@pyqtSlot(str, result=QVariant)
def getG(self,name):
    """Return the value ``_G`` holds under ``name``."""
    return _G.get(name)
@pyqtSlot(str,str, result=bool)
def setG(self, name, json_str):
    """Decode ``json_str`` and store the result in ``_G`` under ``name``.

    Returns:
        True when the value was stored, False when ``json_str`` is not valid
        JSON (previously the decode error escaped the slot).
    """
    try:
        value = json.loads(json_str)
    except ValueError as e:  # json.JSONDecodeError is a ValueError
        log.error(e)
        return False
    _G.set(name, value)
    return True
@pyqtSlot(result=str)
def firstPath(self):
    """Return the QML root path chosen in ``__init__``."""
    return self.fstPath
def resizeDefTableView(self, tableView, indexs):
    """Give every column in ``indexs`` an equal share of the view width.

    Args:
        tableView: Object providing ``width()`` and ``setColumnWidth(index, width)``.
        indexs: Column indexes to resize; an empty sequence is a no-op
            (it used to raise ZeroDivisionError).
    """
    if not indexs:
        return
    itemWidth = int(tableView.width() / len(indexs))
    for index in indexs:
        tableView.setColumnWidth(index, itemWidth)
def resizeTableView(self, tableView, indexs):
    """Give every column in ``indexs`` an equal share of the view width.

    Args:
        tableView: Object providing ``width()`` and ``setColumnWidth(index, width)``.
        indexs: Column indexes to resize; an empty sequence is a no-op
            (it used to raise ZeroDivisionError).
    """
    if not indexs:
        return
    itemWidth = int(tableView.width() / len(indexs))
    for index in indexs:
        tableView.setColumnWidth(index, itemWidth)
def resizeNumberTableView(self, tableView, indexs):
    """Spread the columns in ``indexs`` evenly over the usable view width.

    The usable width is the view width minus the vertical header width and,
    when the vertical scroll bar is visible, minus its width. An empty
    ``indexs`` is a no-op (it used to raise ZeroDivisionError).
    """
    totalColumns = len(indexs)
    if totalColumns == 0:
        return
    scrollBar = tableView.verticalScrollBar()
    scollbarWidth = scrollBar.width() if scrollBar.isVisible() else 0
    totalWidth = tableView.width() - tableView.verticalHeader().width() - scollbarWidth
    avgWidth = int(totalWidth / totalColumns)
    for index in indexs:
        tableView.setColumnWidth(index, avgWidth)
def resizeNoTableView(self, tableView, indexs, colIndex = 1):
    """Resize the columns in ``indexs`` around one fixed-width column.

    The current width of column ``colIndex`` is subtracted from the view width
    and the remainder is divided by ``len(indexs) - 1``. With a single entry
    that divisor used to be zero; it is now clamped to 1, so the lone column
    receives the whole remaining width.
    """
    totalColumns = len(indexs)
    totalWidth = tableView.width() - tableView.columnWidth(colIndex)
    avgWidth = int(totalWidth / max(totalColumns - 1, 1))
    for index in indexs:
        tableView.setColumnWidth(index, avgWidth)
def onShowAlert(self, title, msg, time):
    """Create a fresh ``Alert``, close it with the main window, and show the message."""
    box = Alert()
    self.alert = box
    self.windowClose.connect(box.onWindowClose)
    box.showMsg(title, msg, time)
def onShowScanf(self, title, msg, value):
    """Create a fresh ``Scanf`` dialog, wire its signals, and show it."""
    dialog = Scanf()
    self.scanf = dialog
    self.windowClose.connect(dialog.onWindowClose)
    # The dialog's result ends up in self.scanfResult via onScanfFinish.
    dialog.scanfFinish.connect(self.onScanfFinish)
    dialog.showMsg(title, msg, value)
def onScanfFinish(self, msg):
    """Remember the text delivered by the input dialog."""
    self.scanfResult = msg
@pyqtSlot(str,str,str,str, result=bool)
def initAppScript(self, appId, instancName, scriptFile, plusFile="plus"):
    """Execute an app's script file in the shared ``qscript`` namespace.

    The script is read from ``<appPath>/qml/<plusFile>/<appId>/script/<scriptFile>``
    and only runs when ``dog`` or ``suDog`` reports ``isDogAlive() == 1``.

    Returns:
        True after the script ran and ``appReady`` was emitted with
        ``instancName``; False when the check fails or any error occurs.
    """
    try:
        if dog.isDogAlive() == 1 or suDog.isDogAlive() == 1:
            scriptPath = os.path.join(self.appPath, "qml", plusFile, appId, "script", scriptFile)
            # Read as UTF-8 explicitly, like the other script files in this module,
            # instead of relying on the platform default encoding.
            with open(scriptPath, "r", encoding="utf-8") as f:
                script = f.read()
            # SECURITY(review): exec() runs the file with full interpreter rights;
            # only trusted, verified app packages must ever reach this point.
            exec(script, self.qscript)
            if self.qscript != {}:
                self.appReady.emit(instancName)
                return True
        return False
    except Exception as e:
        log.error(e)
        return False
@pyqtSlot(str,str,str,str,str, result=bool)
def registAppQml(self, appId, instancName, className, scriptFile, plusFile="plus"):
    """Execute an app's script file and announce its QML registration.

    The script is read from ``<appPath>/qml/<plusFile>/<appId>/script/<scriptFile>``
    and only runs when ``dog`` or ``suDog`` reports ``isDogAlive() == 1``.

    Returns:
        True after the script ran and ``registQml`` was emitted with
        ``(instancName, className)``; False when the check fails or any error occurs.
    """
    try:
        if dog.isDogAlive() == 1 or suDog.isDogAlive() == 1:
            scriptPath = os.path.join(self.appPath, "qml", plusFile, appId, "script", scriptFile)
            # Read as UTF-8 explicitly, like the other script files in this module,
            # instead of relying on the platform default encoding.
            with open(scriptPath, "r", encoding="utf-8") as f:
                script = f.read()
            # SECURITY(review): exec() runs the file with full interpreter rights;
            # only trusted, verified app packages must ever reach this point.
            exec(script, self.qscript)
            if self.qscript != {}:
                self.registQml.emit(instancName, className)
                return True
        return False
    except Exception as e:
        log.error(e)
        return False
# Task detail information
def getTaskDetails(self, taskInstructions):
    """Enrich each task-instruction dict in place with instruction, device or task info.

    Entries whose ``target_type`` is "instruction" receive ``instructionInfo``
    and, when the device exists, ``device`` (plus ``interface`` when the
    interface index resolves). Entries of type "task" receive ``taskInfo``.

    NOTE(review): the nesting was reconstructed from unindented source; confirm
    that ``device`` is only attached when the device lookup succeeds.
    """
    for entry in taskInstructions:
        if entry["target_type"] == "instruction":
            target_param = entry["target_param"]
            instructionInfo = instructionManager.getInfo(str(entry["target_id"]))
            instructionInfo["target_param"] = target_param
            entry["instructionInfo"] = instructionInfo
            device_id = entry["device_id"]
            rawIndex = entry["interface_index"]
            # A missing or empty interface index means interface 0.
            if rawIndex == "" or rawIndex is None:
                interfaceIndex = 0
            else:
                interfaceIndex = int(rawIndex)
            device = deviceManager.getInfo(device_id)
            if device is not None:
                devInterface = device["interface"]
                devModel = devModelManager.getInfo(device["dev_model_id"])
                devInterfaceKeys = []
                if devModel is not None:
                    attr_json_dict = devModel["attr"]
                    devmodelInterfacetypes = {}
                    if "interfaceType" in attr_json_dict:
                        devmodelInterfacetypes = attr_json_dict["interfaceType"]
                    devInterfaceKeys = list(devmodelInterfacetypes.keys())
                # Both the device and its model must know this interface index.
                if interfaceIndex < len(devInterface) and len(devInterfaceKeys) > interfaceIndex:
                    device["interfaceName"] = devInterfaceKeys[interfaceIndex]
                    entry["interface"] = devInterface[interfaceIndex]
                entry["device"] = device
        elif entry["target_type"] == "task":
            entry["taskInfo"] = taskManager.getInfo(str(entry["target_id"]))
def getInstructionDetail(self, instructionId, attr):
    """Build a task-instruction dict for one instruction and its device.

    Args:
        instructionId: Id passed (as ``str``) to ``instructionManager.getInfo``.
        attr: Mapping providing ``param``, ``deviceId`` and ``interfaceIndex``.

    Returns:
        The assembled dict, or None when the device is unknown or any error occurs.

    NOTE(review): the nesting was reconstructed from unindented source; the
    ``return None`` is assumed to belong to the missing-device case.
    """
    try:
        taskInstruction = {
            "target_type": "instruction",
            "target_param": attr["param"],
            "target_id": instructionId
        }
        deviceId = attr["deviceId"]
        interfaceIndex = int(attr["interfaceIndex"])
        taskInstruction["instructionInfo"] = instructionManager.getInfo(str(instructionId))
        device = deviceManager.getInfo(deviceId)
        if device is None:
            return None
        devInterface = device["interface"]
        devModel = devModelManager.getInfo(device["dev_model_id"])
        devInterfaceKeys = []
        if devModel is not None:
            attr_json_dict = devModel["attr"]
            devmodelInterfacetypes = {}
            if "interfaceType" in attr_json_dict:
                devmodelInterfacetypes = attr_json_dict["interfaceType"]
            devInterfaceKeys = list(devmodelInterfacetypes.keys())
        # Both the device and its model must know this interface index.
        if interfaceIndex < len(devInterface) and len(devInterfaceKeys) > interfaceIndex:
            device["interfaceName"] = devInterfaceKeys[interfaceIndex]
            taskInstruction["interface"] = devInterface[interfaceIndex]
        taskInstruction["device"] = device
        return taskInstruction
    except Exception as e:
        print(e)
        return None
# Log level conversion
def level2Search(self, level):
    """Map a log level name to its search string; unknown levels give None."""
    table = (
        ("DEBUG", "0"),
        ("INFO", "01"),
        ("WARNING", "012"),
        ("ERROR", "0123"),
    )
    for name, code in table:
        if level == name:
            return code
    return None
@pyqtSlot(str)
def changeQml(self, qmlfile):
    """Emit ``sig_changeQml`` with the given QML file name."""
    self.sig_changeQml.emit(qmlfile)
def changeMainQml(self, qmlfile):
    """Emit ``sig_changeMainQml`` with the given QML file name."""
    self.sig_changeMainQml.emit(qmlfile)
# Show or hide an app
@pyqtSlot(str,bool, result=bool)
def showApp(self, id, show):
    """Rewrite the ``General.Show`` flag in an app's ``info`` file.

    Returns:
        True after the file was rewritten and ``sig_appChanged`` emitted,
        False when anything fails.
    """
    infoPath = os.path.join(self.appPath, "qml", "plus", id, 'info')
    try:
        with open(infoPath, 'r', encoding="utf-8") as file:
            info = json.loads(file.read())
        info["General"]["Show"] = show
        with open(infoPath, 'w', encoding="utf-8") as file:
            json.dump(info, file, indent=4)
        self.sig_appChanged.emit()
        return True
    except Exception as e:
        print(f"Error showApp: {e}")
        return False
# Uninstall an app
@pyqtSlot(str, result=bool)
def deleteApp(self, id):
    """Delete the directory of app ``id`` below ``<appPath>/qml/plus``.

    Returns:
        True after the directory was removed and ``sig_appChanged`` emitted;
        False for an invalid id or when removal fails.
    """
    _path = os.path.join(self.appPath, "qml", "plus")
    # An empty id resolves to the plug-in root itself, and an id containing
    # separators or ".." can escape it; never hand such a path to rmtree.
    if not id or id in (".", "..") or os.path.basename(id) != id:
        print(f"Error deleteApp: invalid app id {id!r}")
        return False
    try:
        shutil.rmtree(os.path.join(_path, id))
        self.sig_appChanged.emit()
        return True
    except Exception as e:
        print(f"Error deleteApp: {e}")
        return False
# Install an app
@pyqtSlot(str, result=bool)
def installApp(self, filePath):
    """Install an app package (a zip archive containing ``info`` and ``md5.txt``).

    The package is first copied into the plug-in directory by ``appfilePath``,
    then unpacked into ``<appPath>/qml/plus/<AppId>`` and verified with
    ``compare_md5_txt``. The staged copy is removed on every exit path.

    Returns:
        True after a verified install (``sig_appChanged`` is emitted), False otherwise.
    """
    _path = os.path.join(self.appPath, "qml", "plus")
    try:
        # Copying fails here when the source is missing or unreadable.
        stagedPath = self.appfilePath(filePath)
    except FileNotFoundError:
        print(f"File not found: {filePath}")
        return False
    except PermissionError:
        print(f"Permission denied: {filePath}")
        return False
    except OSError as e:
        print(f"Error unpacking archive: {e}")
        return False
    try:
        if not os.path.isfile(stagedPath):
            print(f"File does not exist: {stagedPath}")
            return False
        zipJson = self.zipJson(stagedPath)
        appId = zipJson["General"]["AppId"]
        # AppId comes from the package; reject values that are not a plain
        # directory name so unpacking cannot leave the plug-in root.
        if not appId or appId in (".", "..") or os.path.basename(appId) != appId:
            raise ValueError(f"invalid AppId {appId!r}")
        appDir = os.path.join(_path, appId)
        os.makedirs(appDir, exist_ok=True)
        shutil.unpack_archive(stagedPath, appDir, "zip")
        if not self.compare_md5_txt(appDir):
            print("md5 校验失败")
            shutil.rmtree(appDir)
            return False
        self.sig_appChanged.emit()
        return True
    except Exception as e:
        print(f"Error unpacking archive: {e}")
        return False
    finally:
        # Best-effort removal of the staged zip, also after a failed install.
        try:
            if os.path.exists(stagedPath):
                os.remove(stagedPath)
        except OSError as e:
            print(f"Error unpacking archive: {e}")
@pyqtSlot()
def updateAppList(self):
    """Rebuild ``self.appList`` from every file named ``info`` below ``<appPath>/qml/plus``.

    Each entry is ``{"id": ..., "info": ..., "show": ...}`` taken from the
    file's ``General`` section. An ``info`` file that cannot be read or parsed,
    or that lacks the expected keys, is logged and skipped, so one broken
    plug-in no longer aborts the scan (and with it ``Common()`` construction).
    """
    _path = os.path.join(self.appPath, "qml", "plus")
    apps = []
    for root, dirs, files in os.walk(_path):
        if 'info' not in files:
            continue
        file_path = os.path.join(root, 'info')
        try:
            with open(file_path, 'r', encoding="utf-8") as f:
                info = json.load(f)
            apps.append({
                "id": info["General"]["AppId"],
                "info": info,
                "show": info["General"]["Show"]
            })
        except (OSError, ValueError, KeyError, TypeError) as e:
            log.error(f"updateAppList: skipping {file_path}: {e}")
    self.appList = apps
@pyqtSlot(result=list)
def appList(self):
    """Return the app list built by ``updateAppList``.

    NOTE(review): ``__init__`` assigns an instance attribute with this same
    name, which shadows this method for Python callers; confirm the slot is
    only invoked through Qt.
    """
    return self.appList
@pyqtSlot(result=str)
def getSystemInterfaceUuid(self):
    """Return the system interface UUID set in ``__init__``."""
    return self.systemInterfaceUuid
# Get the check scripts
@pyqtSlot(result=list)
def getCheckFileName(self):
    """Return the loaded script names containing "check_"; None when an error is logged."""
    try:
        return [key for key in self.scriptFile.keys() if "check_" in key]
    except Exception as e:
        log.error(e)
        return None
# Get all scripts
@pyqtSlot(result=list)
def getAllFileNames(self):
    """Return the names of all loaded script files; None when an error is logged."""
    try:
        names = list(self.scriptFile.keys())
    except Exception as e:
        log.error(e)
        return None
    return names
# Get the serial port list
@pyqtSlot(result=list)
def getComList(self):
    """Return the port name of every serial port Qt currently reports."""
    return [port.portName() for port in QSerialPortInfo.availablePorts()]
def calculate_file_md5(self, file_path):
    """Return the hexadecimal MD5 digest of the file at ``file_path``."""
    digest = hashlib.md5()
    with open(file_path, "rb") as stream:
        # Read in 4 KiB blocks until read() reports end of file.
        while True:
            block = stream.read(4096)
            if block == b"":
                break
            digest.update(block)
    return digest.hexdigest()
def calculate_directory_md5(self, directory_path):
    """Return a nested dict of MD5 digests for a directory tree.

    Files map to their digest and sub-directories map to a dict of the same
    shape. Files named ``md5.txt`` are left out at every level.
    """
    digests = {}
    with os.scandir(directory_path) as entries:
        for item in entries:
            if item.is_dir():
                digests[item.name] = self.calculate_directory_md5(item.path)
            elif item.is_file() and item.name != "md5.txt":
                digests[item.name] = self.calculate_file_md5(item.path)
    return digests
def compare_md5_txt(self, directory_path):
    """Check a directory against the digest stored in its ``md5.txt``.

    The expected value is the MD5 of the JSON dump (``indent=4``,
    ``sort_keys=True``) of ``calculate_directory_md5(directory_path)``.
    Surrounding whitespace and letter case of the stored digest are ignored,
    so a trailing newline in ``md5.txt`` no longer fails the check.

    Raises:
        OSError: ``md5.txt`` is missing or unreadable.
    """
    with open(os.path.join(directory_path, "md5.txt"), "r") as f:
        expected = f.read().strip().lower()
    directory_md5 = self.calculate_directory_md5(directory_path)
    json_str = json.dumps(directory_md5, indent=4, sort_keys=True)
    actual = hashlib.md5(json_str.encode("utf-8")).hexdigest()
    return actual == expected
def appfilePath(self, path):
    """Copy a package into the plug-in directory as ``<name>.zip`` and return the new path.

    Args:
        path: A local path or a ``file:`` URL. URLs are converted with
            ``url2pathname``, which keeps the leading slash of POSIX paths and
            decodes percent-escapes; stripping ``file:///`` textually turned
            ``file:///home/u/a.zip`` into the relative ``home/u/a.zip``.

    Raises:
        OSError: The source cannot be read or the destination cannot be written.
    """
    from urllib.parse import urlparse
    from urllib.request import url2pathname

    _path = os.path.join(self.appPath, "qml", "plus")
    if path.startswith("file:"):
        path = url2pathname(urlparse(path).path)
    stem, _ext = os.path.splitext(os.path.basename(path))
    new_path = os.path.join(_path, stem + ".zip")
    shutil.copyfile(path, new_path)
    return new_path
def zipJson(self, zipPath):
    """Return the parsed JSON content of the ``info`` member of a zip archive."""
    with zipfile.ZipFile(zipPath, "r") as archive:
        with archive.open("info") as member:
            raw = member.read()
    return json.loads(raw)
def setSriptFile(self, conf = None):
    """Load every regular file of the scripts directory into ``self.scriptFile``.

    Args:
        conf: Optional mapping; when truthy its ``path`` entry replaces the
            default ``<appPath>/scripts`` directory.

    Returns:
        ``self.scriptFile``, mapping file name to the file's UTF-8 text.
    """
    scriptsDir = os.path.join(self.appPath, "scripts")
    if conf:
        scriptsDir = conf["path"]
    for entry in os.listdir(scriptsDir):
        candidate = os.path.join(scriptsDir, entry)
        if not os.path.isfile(candidate):
            continue  # sub-directories are ignored
        with open(candidate, "r", encoding="utf-8") as handle:
            self.scriptFile[entry] = handle.read()
    return self.scriptFile
def parseCodecPys(self, s):
    """Extract ``<name>`` from a ``codec_<name>.pys`` file name.

    Returns:
        The captured name, or "" when ``s`` does not contain such a name or is
        not a string. The dot is now matched literally, so text such as
        ``codec_fooXpys`` no longer counts as a match.
    """
    try:
        result = re.search(r"codec_(\w+)\.pys", s)
    except TypeError:
        # Non-string input (for example None).
        return ""
    return result.group(1) if result else ""
def parseCheckPys(self, s):
    """Extract ``<name>`` from a ``check_<name>.pys`` file name.

    Returns:
        The captured name, or "" when ``s`` does not contain such a name or is
        not a string. The dot is now matched literally, so text such as
        ``check_fooXpys`` no longer counts as a match.
    """
    try:
        result = re.search(r"check_(\w+)\.pys", s)
    except TypeError:
        # Non-string input (for example None).
        return ""
    return result.group(1) if result else ""
# Convert base64 text to a QPixmap
def base64ToPixmap(self, base64_str):
    """Decode a base64 string into image data and return it as a ``QPixmap``."""
    encoded = base64_str.encode("utf-8")
    raw = QByteArray.fromBase64(encoded)
    image = QImage.fromData(raw)
    return QPixmap.fromImage(image)
# Convert a QImage to base64 text
def base64Data(self, image):
    """Save ``image`` as PNG into memory and return the bytes as a base64 string."""
    storage = QByteArray()
    sink = QBuffer(storage)
    sink.open(QIODevice.OpenModeFlag.WriteOnly)
    image.save(sink, "PNG")
    encoded = storage.toBase64()
    return encoded.data().decode()
def is_json_string(self, string):
    """Report whether ``string`` parses as JSON, treating all-digit strings as not JSON.

    Returns:
        False for all-digit text, for text that is not valid JSON and for
        non-string input; True otherwise.
    """
    try:
        if string.isdigit():
            return False
        json.loads(string)
    except (AttributeError, TypeError, ValueError):
        # AttributeError/TypeError: not a string; ValueError: invalid JSON.
        return False
    return True
def writeJson(self, path, table_name, data):
    """Write one record to ``<path>/<table_name>/<data["id"]>.json``.

    The record is first passed through ``self.ensure_serializable`` (previously
    reached through the module-level ``common`` global) and its
    ``_sa_instance_state`` entry, if any, is dropped. Failures are logged, not raised.
    """
    try:
        data = self.ensure_serializable(data)
        if '_sa_instance_state' in data:
            del data['_sa_instance_state']
        file_path = os.path.join(path, table_name)
        # Create the table directory on first use.
        os.makedirs(file_path, exist_ok=True)
        with open(os.path.join(file_path, f'{data["id"]}.json'), 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=4)
    except Exception as e:
        log.error(table_name,data,e)
def writeZip(self, target_path, directory_path):
    """Create a deflate-compressed zip at ``target_path`` from ``directory_path``."""
    archive = zipfile.ZipFile(target_path, 'w', zipfile.ZIP_DEFLATED)
    with archive as zipf:
        # Archive names are made relative to directory_path itself.
        self.add_dir_to_zip(directory_path, zipf, directory_path)
def clearDir(self, path):
    """Remove the directory tree at ``path``, including ``path`` itself."""
    shutil.rmtree(path)
def createDir(self, path):
    """Create ``path`` (with parents) unless something already exists there."""
    if os.path.exists(path):
        return
    os.makedirs(path)
def add_dir_to_zip(self, dir_path, zipf, parent_dir):
    """Add the tree below ``dir_path`` to the open zip file ``zipf``.

    Archive names are relative to ``parent_dir``. ``os.walk`` already visits
    every sub-directory, so each file is written exactly once; the former
    per-sub-directory recursion wrote nested files repeatedly. Empty
    directories get an explicit directory entry.
    """
    for root, dirs, files in os.walk(dir_path):
        for file in files:
            file_path = os.path.join(root, file)
            zipf.write(file_path, arcname=os.path.relpath(file_path, parent_dir))
        if not dirs and not files:
            arcname = os.path.relpath(root, parent_dir)
            # Skip the archive root itself; it has no meaningful entry name.
            if arcname != os.curdir:
                zipf.write(root, arcname=arcname)
# Make sure all data is JSON-serializable
def ensure_serializable(self, obj):
    """Return a JSON-friendly copy of ``obj``.

    * ``bytes`` values, at any depth, are decoded as UTF-8.
    * In every dict, ``attr`` and ``interface`` entries holding JSON text are
      parsed; falsy values become ``{}`` and ``[]`` respectively.
    * Dicts and lists are rebuilt recursively; other values pass through.

    The old code probed ``'script' in obj`` before checking the type. That
    raised for bytes and numbers, so nested bytes were returned undecoded.
    The input is no longer modified in place.
    """
    def _parse_json_field(value, empty):
        # Falsy -> default; JSON text -> parsed (falsy results also become the
        # default); anything already structured or unparsable is kept.
        if not value:
            return empty
        if isinstance(value, bytes):
            value = value.decode('utf-8', errors='replace')
        if isinstance(value, str):
            try:
                return json.loads(value) or empty
            except ValueError:
                return value
        return value

    def _convert(value):
        if isinstance(value, dict):
            fields = dict(value)
            if "attr" in fields:
                fields["attr"] = _parse_json_field(fields["attr"], {})
            if "interface" in fields:
                fields["interface"] = _parse_json_field(fields["interface"], [])
            return {k: _convert(v) for k, v in fields.items()}
        if isinstance(value, list):
            return [_convert(v) for v in value]
        if isinstance(value, bytes):
            return value.decode('utf-8', errors='replace')
        return value

    return _convert(obj)
# Export a table's structure and data to an SQL file
def export_table_structure_and_data(db_path, table_name):
    """Dump one SQLite table (schema, rows, triggers) to ``<table_name>.sql``.

    The file is written relative to the current working directory. Values are
    written as SQL literals: ``None`` becomes ``NULL``, numbers stay unquoted,
    bytes become ``X'..'`` blobs and text is single-quoted with quotes doubled.
    Formatting every value as ``'{}'`` turned NULL into the text ``'None'`` and
    blobs into their Python repr.

    NOTE(review): this definition takes no ``self``; if it lives inside
    ``Common``, calling it on an instance passes the instance as ``db_path`` —
    confirm the intended placement.
    """
    def _quote_ident(name):
        return '"' + str(name).replace('"', '""') + '"'

    def _sql_literal(value):
        if value is None:
            return "NULL"
        if isinstance(value, (int, float)):
            return repr(value)
        if isinstance(value, (bytes, bytearray, memoryview)):
            return "X'" + bytes(value).hex() + "'"
        return "'" + str(value).replace("'", "''") + "'"

    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    output_file = f"{table_name}.sql"
    quoted_table = _quote_ident(table_name)
    try:
        with open(output_file, 'w', encoding='utf-8') as sql_file:
            sql_file.write("PRAGMA foreign_keys = false;\n")
            sql_file.write(f"DROP TABLE IF EXISTS {quoted_table};\n")
            # Table structure
            cursor.execute("SELECT sql FROM sqlite_master WHERE type='table' AND name=?", (table_name,))
            for create_table_sql in cursor.fetchall():
                sql_file.write(create_table_sql[0] + ";\n\n")
            # Table data
            cursor.execute(f"SELECT * FROM {quoted_table}")
            rows = cursor.fetchall()
            column_list = ", ".join(_quote_ident(column[0]) for column in cursor.description)
            for row in rows:
                formatted_values = ", ".join(_sql_literal(v) for v in row)
                sql_file.write(f"INSERT INTO {quoted_table} ({column_list}) VALUES ({formatted_values});\n")
            # Triggers attached to the table
            cursor.execute("SELECT sql FROM sqlite_master WHERE type='trigger' AND tbl_name=?", (table_name,))
            for trigger_sql in cursor.fetchall():
                sql_file.write(trigger_sql[0] + ";\n")
            sql_file.write("PRAGMA foreign_keys = true;\n")
    except sqlite3.Error as e:
        print(f"SQLite error1: {e}")
    finally:
        conn.close()
# 使用示例
# export_table_structure_and_data('data.db', 'dev_model')
# export_table_structure_and_data('data.db', 'device')
# export_table_structure_and_data('data.db', 'instruction')
# export_table_structure_and_data('data.db', 'task')
# export_table_structure_and_data('data.db', 'task_group')
# export_table_structure_and_data('data.db', 'task_instruction')
# Import a table's structure and data
def import_sql_to_database(self,db_path, table_bame):
    """Run the script ``<table_bame>.sql`` against the SQLite database at ``db_path``.

    The file is looked up relative to the current working directory. Errors are
    printed rather than raised; the connection is always closed.
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    sql_file_path = f"{table_bame}.sql"
    try:
        with open(sql_file_path, 'r', encoding='utf-8') as file:
            sql_script = file.read()
        try:
            # executescript runs every statement contained in the file.
            conn.executescript(sql_script)
        except sqlite3.Error as e:
            print(f"SQLite error: {e}")
            return
        conn.commit()
        print("SQL file imported successfully.")
    except FileNotFoundError:
        print(f"The file {sql_file_path} was not found.")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    finally:
        conn.close()
# Module-level Common instance, created when this module is imported;
# MarkdownViewer and writeJson in this file refer to it by this name.
common = Common()