PCM_Viewer/config_manager.py

157 lines
5.0 KiB
Python

"""
配置管理模块
"""
from PyQt6.QtCore import QObject, pyqtSignal as Signal, pyqtProperty as Property, pyqtSlot as Slot
from typing import Optional
import json
import os
class ComponentConfig(QObject):
    """Configuration of a component, exposed as Qt properties.

    Each setting is stored in a private attribute and published through a
    ``Property`` with an explicit getter and setter. Setters only act when
    the value actually changes. ``imagePath`` notifies through its own
    ``imagePathChanged`` signal; every other property notifies through
    ``configChanged``, which is also emitted after any change (including an
    image path change).
    """

    configChanged = Signal()
    imagePathChanged = Signal(str)  # dedicated change signal for imagePath

    def __init__(self, parent=None):
        super().__init__(parent)
        self._image_path: str = ""
        self._influxdb_url: str = ""
        self._influxdb_token: str = ""
        self._influxdb_org: str = ""
        self._influxdb_bucket: str = ""
        self._query: str = ""
        self._show_config: bool = True

    def getImagePath(self):
        """Return the current image path."""
        return self._image_path

    def setImagePath(self, value: str):
        """Store a new image path and emit both change signals if it differs."""
        if self._image_path != value:
            self._image_path = value
            self.imagePathChanged.emit(value)
            self.configChanged.emit()

    imagePath = Property(str, getImagePath, setImagePath, notify=imagePathChanged)

    def getInfluxdbUrl(self):
        """Return the current InfluxDB URL."""
        return self._influxdb_url

    def setInfluxdbUrl(self, value: str):
        """Store a new InfluxDB URL and emit ``configChanged`` if it differs."""
        if self._influxdb_url != value:
            self._influxdb_url = value
            self.configChanged.emit()

    influxdbUrl = Property(str, getInfluxdbUrl, setInfluxdbUrl, notify=configChanged)

    def getInfluxdbToken(self):
        """Return the current InfluxDB token."""
        return self._influxdb_token

    def setInfluxdbToken(self, value: str):
        """Store a new InfluxDB token and emit ``configChanged`` if it differs."""
        if self._influxdb_token != value:
            self._influxdb_token = value
            self.configChanged.emit()

    influxdbToken = Property(str, getInfluxdbToken, setInfluxdbToken, notify=configChanged)

    def getInfluxdbOrg(self):
        """Return the current InfluxDB organisation."""
        return self._influxdb_org

    def setInfluxdbOrg(self, value: str):
        """Store a new InfluxDB organisation and emit ``configChanged`` if it differs."""
        if self._influxdb_org != value:
            self._influxdb_org = value
            self.configChanged.emit()

    influxdbOrg = Property(str, getInfluxdbOrg, setInfluxdbOrg, notify=configChanged)

    def getInfluxdbBucket(self):
        """Return the current InfluxDB bucket."""
        return self._influxdb_bucket

    def setInfluxdbBucket(self, value: str):
        """Store a new InfluxDB bucket and emit ``configChanged`` if it differs."""
        if self._influxdb_bucket != value:
            self._influxdb_bucket = value
            self.configChanged.emit()

    influxdbBucket = Property(str, getInfluxdbBucket, setInfluxdbBucket, notify=configChanged)

    def getQuery(self):
        """Return the current query string."""
        return self._query

    def setQuery(self, value: str):
        """Store a new query string and emit ``configChanged`` if it differs."""
        if self._query != value:
            self._query = value
            self.configChanged.emit()

    query = Property(str, getQuery, setQuery, notify=configChanged)

    def getShowConfig(self):
        """Return the current value of the show-config flag."""
        return self._show_config

    def setShowConfig(self, value: bool):
        """Store a new show-config flag and emit ``configChanged`` if it differs."""
        if self._show_config != value:
            self._show_config = value
            self.configChanged.emit()

    showConfig = Property(bool, getShowConfig, setShowConfig, notify=configChanged)

    def toDict(self) -> dict:
        """Return all settings as a plain dict keyed by property name."""
        return {
            "imagePath": self._image_path,
            "influxdbUrl": self._influxdb_url,
            "influxdbToken": self._influxdb_token,
            "influxdbOrg": self._influxdb_org,
            "influxdbBucket": self._influxdb_bucket,
            "query": self._query,
            "showConfig": self._show_config,
        }

    def fromDict(self, data: dict):
        """Load all settings from ``data`` and notify listeners of changes.

        Missing keys fall back to the defaults used in ``__init__`` (empty
        string, or ``True`` for ``showConfig``). Values of the wrong type are
        treated like missing keys so the stored values always match the
        declared property types.

        All fields are assigned first; afterwards ``imagePathChanged`` is
        emitted if the image path changed and ``configChanged`` is emitted
        once if any field changed. Nothing is emitted when the loaded values
        equal the current ones.

        Args:
            data: Mapping using the same keys as ``toDict``.
        """
        before = self.toDict()

        def text(key: str) -> str:
            # Guard against e.g. a JSON null or number in a string field.
            value = data.get(key, "")
            return value if isinstance(value, str) else ""

        self._image_path = text("imagePath")
        self._influxdb_url = text("influxdbUrl")
        self._influxdb_token = text("influxdbToken")
        self._influxdb_org = text("influxdbOrg")
        self._influxdb_bucket = text("influxdbBucket")
        self._query = text("query")

        show = data.get("showConfig", True)
        self._show_config = show if isinstance(show, bool) else True

        # Emit only after every field is in place so listeners never observe
        # a half-loaded configuration; same signal order as setImagePath.
        if self._image_path != before["imagePath"]:
            self.imagePathChanged.emit(self._image_path)
        if self.toDict() != before:
            self.configChanged.emit()
class ConfigManager(QObject):
    """Saves a ``ComponentConfig`` to a JSON file and loads it back.

    The target file is ``config.json``, given as a relative path. Both
    operations are slots that report success or failure through their
    boolean return value instead of raising.
    """

    def __init__(self, parent=None):
        super().__init__(parent)
        self._config_file: str = "config.json"

    @Slot('QObject*', result=bool)
    def saveConfig(self, config) -> bool:
        """Write ``config`` to the configuration file as UTF-8 JSON.

        The data is first written to a sibling ``.tmp`` file and then moved
        over the real file, so a failure while serializing or writing does
        not truncate a previously saved configuration.

        Args:
            config: The object to save; must be a ``ComponentConfig``.

        Returns:
            True on success; False if ``config`` has the wrong type or any
            error occurred (the error is printed).
        """
        if not isinstance(config, ComponentConfig):
            return False

        temp_path = f"{self._config_file}.tmp"
        try:
            with open(temp_path, 'w', encoding='utf-8') as f:
                json.dump(config.toDict(), f, indent=2, ensure_ascii=False)
            # Swap the finished file into place in a single step.
            os.replace(temp_path, self._config_file)
            return True
        except Exception as e:
            print(f"保存配置失败: {e}")
            # Best-effort cleanup of a partially written temp file.
            try:
                os.remove(temp_path)
            except OSError:
                pass
            return False

    @Slot('QObject*', result=bool)
    def loadConfig(self, config) -> bool:
        """Read the configuration file and apply it to ``config``.

        Args:
            config: The object to fill; must be a ``ComponentConfig``.

        Returns:
            True if the file was read and applied; False if ``config`` has
            the wrong type, the file does not exist, or any error occurred
            (errors other than a missing file are printed).
        """
        if not isinstance(config, ComponentConfig):
            return False

        try:
            with open(self._config_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            config.fromDict(data)
            return True
        except FileNotFoundError:
            # A missing file is reported silently, without the race of a
            # separate existence check before opening.
            return False
        except Exception as e:
            print(f"加载配置失败: {e}")
            return False