PCM_Viewer/config_manager.py

157 lines
5.0 KiB
Python
Raw Normal View History

2026-02-06 22:49:52 +08:00
"""
配置管理模块
"""
from PyQt6.QtCore import QObject, pyqtSignal as Signal, pyqtProperty as Property, pyqtSlot as Slot
from typing import Optional
import json
import os
class ComponentConfig(QObject):
    """Component configuration exposed as Qt properties.

    Stores the image path, the InfluxDB connection settings (URL, token,
    organisation, bucket), the query string and the show-config flag.
    Each setter emits ``configChanged`` only when the stored value actually
    changes; ``imagePath`` additionally notifies through its own
    ``imagePathChanged`` signal.
    """

    configChanged = Signal()
    imagePathChanged = Signal(str)  # dedicated change signal for imagePath

    def __init__(self, parent=None):
        """Create a configuration with empty strings and ``showConfig`` on."""
        super().__init__(parent)
        self._image_path: str = ""
        self._influxdb_url: str = ""
        self._influxdb_token: str = ""
        self._influxdb_org: str = ""
        self._influxdb_bucket: str = ""
        self._query: str = ""
        self._show_config: bool = True

    def getImagePath(self):
        """Return the current image path."""
        return self._image_path

    def setImagePath(self, value: str):
        """Store a new image path and notify listeners if it changed."""
        if self._image_path != value:
            self._image_path = value
            self.imagePathChanged.emit(value)
            self.configChanged.emit()

    imagePath = Property(str, getImagePath, setImagePath, notify=imagePathChanged)

    def getInfluxdbUrl(self):
        """Return the InfluxDB URL."""
        return self._influxdb_url

    def setInfluxdbUrl(self, value: str):
        """Store a new InfluxDB URL and emit ``configChanged`` if it changed."""
        if self._influxdb_url != value:
            self._influxdb_url = value
            self.configChanged.emit()

    influxdbUrl = Property(str, getInfluxdbUrl, setInfluxdbUrl, notify=configChanged)

    def getInfluxdbToken(self):
        """Return the InfluxDB token."""
        return self._influxdb_token

    def setInfluxdbToken(self, value: str):
        """Store a new InfluxDB token and emit ``configChanged`` if it changed."""
        if self._influxdb_token != value:
            self._influxdb_token = value
            self.configChanged.emit()

    influxdbToken = Property(str, getInfluxdbToken, setInfluxdbToken, notify=configChanged)

    def getInfluxdbOrg(self):
        """Return the InfluxDB organisation."""
        return self._influxdb_org

    def setInfluxdbOrg(self, value: str):
        """Store a new InfluxDB organisation and emit ``configChanged`` if it changed."""
        if self._influxdb_org != value:
            self._influxdb_org = value
            self.configChanged.emit()

    influxdbOrg = Property(str, getInfluxdbOrg, setInfluxdbOrg, notify=configChanged)

    def getInfluxdbBucket(self):
        """Return the InfluxDB bucket."""
        return self._influxdb_bucket

    def setInfluxdbBucket(self, value: str):
        """Store a new InfluxDB bucket and emit ``configChanged`` if it changed."""
        if self._influxdb_bucket != value:
            self._influxdb_bucket = value
            self.configChanged.emit()

    influxdbBucket = Property(str, getInfluxdbBucket, setInfluxdbBucket, notify=configChanged)

    def getQuery(self):
        """Return the query string."""
        return self._query

    def setQuery(self, value: str):
        """Store a new query string and emit ``configChanged`` if it changed."""
        if self._query != value:
            self._query = value
            self.configChanged.emit()

    query = Property(str, getQuery, setQuery, notify=configChanged)

    def getShowConfig(self):
        """Return the show-config flag."""
        return self._show_config

    def setShowConfig(self, value: bool):
        """Store a new show-config flag and emit ``configChanged`` if it changed."""
        if self._show_config != value:
            self._show_config = value
            self.configChanged.emit()

    showConfig = Property(bool, getShowConfig, setShowConfig, notify=configChanged)

    def toDict(self) -> dict:
        """Return the configuration as a plain dict keyed by property name."""
        return {
            "imagePath": self._image_path,
            "influxdbUrl": self._influxdb_url,
            "influxdbToken": self._influxdb_token,
            "influxdbOrg": self._influxdb_org,
            "influxdbBucket": self._influxdb_bucket,
            "query": self._query,
            "showConfig": self._show_config,
        }

    def fromDict(self, data: dict):
        """Load the configuration from a dict and notify listeners.

        Missing keys fall back to the constructor defaults (empty string,
        or ``True`` for ``showConfig``).

        All fields are assigned first and the change signals are emitted
        afterwards, so a listener never observes a half-loaded state.
        ``imagePathChanged`` fires when the image path differs from its
        previous value, then ``configChanged`` fires once if any field
        differs. Without these emissions, anything bound to the properties
        would keep showing the values from before the load.

        Args:
            data: Mapping with the same keys that ``toDict`` produces.
        """
        previous = self.toDict()

        self._image_path = data.get("imagePath", "")
        self._influxdb_url = data.get("influxdbUrl", "")
        self._influxdb_token = data.get("influxdbToken", "")
        self._influxdb_org = data.get("influxdbOrg", "")
        self._influxdb_bucket = data.get("influxdbBucket", "")
        self._query = data.get("query", "")
        self._show_config = data.get("showConfig", True)

        current = self.toDict()
        # Same emit order as setImagePath: the specific signal first,
        # then the general one.
        if current["imagePath"] != previous["imagePath"]:
            self.imagePathChanged.emit(self._image_path)
        if current != previous:
            self.configChanged.emit()
class ConfigManager(QObject):
    """Saves a ``ComponentConfig`` to a JSON file and restores it again."""

    def __init__(self, parent=None):
        """Set up the manager with its configuration file name."""
        super().__init__(parent)
        self._config_file: str = "config.json"

    @Slot('QObject*', result=bool)
    def saveConfig(self, config) -> bool:
        """Write ``config`` to the configuration file as JSON.

        Returns:
            ``True`` when the file was written. ``False`` when ``config`` is
            not a ``ComponentConfig`` or when any exception occurs; in the
            latter case the error is printed.
        """
        try:
            if isinstance(config, ComponentConfig):
                payload = config.toDict()
                with open(self._config_file, 'w', encoding='utf-8') as out:
                    json.dump(payload, out, indent=2, ensure_ascii=False)
                return True
            return False
        except Exception as err:
            print(f"保存配置失败: {err}")
            return False

    @Slot('QObject*', result=bool)
    def loadConfig(self, config) -> bool:
        """Read the configuration file and apply it to ``config``.

        Returns:
            ``True`` when the file was read and applied. ``False`` when
            ``config`` is not a ``ComponentConfig``, when the file does not
            exist, or when any exception occurs; in the latter case the error
            is printed.
        """
        try:
            # Short-circuit keeps the original order: type check first,
            # then the existence check.
            ready = isinstance(config, ComponentConfig) and os.path.exists(self._config_file)
            if not ready:
                return False
            with open(self._config_file, 'r', encoding='utf-8') as src:
                loaded = json.load(src)
            config.fromDict(loaded)
            return True
        except Exception as err:
            print(f"加载配置失败: {err}")
            return False