PCM_Report/config_model.py

393 lines
14 KiB
Python
Raw Normal View History

2025-12-11 14:32:31 +08:00
import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
import re
@dataclass
class InfluxQueryConfig:
    """InfluxDB query settings attached to a single placeholder.

    Attributes:
        bucket: Bucket name to query.
        measurement: Measurement name to query.
        fields: Field names to select.
        filters: Filter key -> filter value mapping.
        timeRange: Time range expression; defaults to ``"-1h"``.
        aggregate: Aggregate setting; empty by default.
        windowPeriod: Window period, e.g. ``1m``.
    """

    bucket: str = ""
    measurement: str = ""

    fields: List[str] = field(default_factory=list)
    filters: Dict[str, str] = field(default_factory=dict)

    timeRange: str = "-1h"
    aggregate: str = ""
    windowPeriod: str = ""
@dataclass
class TableOptions:
    """Display options for a table placeholder.

    Attributes:
        firstColumn: Kind of the first column: ``time`` or ``seconds``.
        firstTitle: Custom header for the first column.
        titles: Field name -> column title mapping.
    """

    firstColumn: str = "time"
    firstTitle: str = ""
    titles: Dict[str, str] = field(default_factory=dict)
@dataclass
class DbConnectionConfig:
    """Connection settings for the relational database.

    Attributes:
        engine: Database engine: ``mysql``, ``sqlserver`` or ``sqlite``.
        host: Server host name; defaults to ``localhost``.
        port: Server port; defaults to ``3306``.
        database: Database name.
        username: Login user name.
        password: Login password.
    """

    engine: str = "mysql"
    host: str = "localhost"
    port: int = 3306

    database: str = ""
    username: str = ""
    password: str = ""
@dataclass
class PlaceholderConfig:
    """Configuration of one report placeholder.

    Attributes:
        key: Placeholder key (required).
        type: Placeholder type (required): ``text``, ``table``, ``chart``,
            ``cell``, ``scriptTable`` or ``dbText``.
        label: Display label.
        title: Title text.
        value: Literal value, used for ``text`` placeholders.
        dbQuery: SQL query string, used for ``dbText`` placeholders.
        influx: Optional InfluxDB query settings.
        chart: Free-form chart options.
        table: Table display options.
        grid: Rows of cell strings, used for manually edited cells.
    """

    key: str
    type: str

    label: str = ""
    title: str = ""
    value: str = ""
    dbQuery: str = ""

    influx: Optional[InfluxQueryConfig] = None
    chart: Dict[str, Any] = field(default_factory=dict)
    table: TableOptions = field(default_factory=TableOptions)
    grid: List[List[str]] = field(default_factory=list)
@dataclass
class InfluxGlobalConfig:
    """Application-wide InfluxDB connection settings.

    Attributes:
        url: Server URL.
        org: Organization.
        token: Access token.
        username: User name, for UI login fallback.
        password: Password, for UI login fallback.
        landingUrl: Optional post-login destination (full URL or path).
        bucket: InfluxDB bucket name for monitoring.
        measurement: InfluxDB measurement name for monitoring.
    """

    url: str = ""
    org: str = ""
    token: str = ""

    username: str = ""
    password: str = ""
    landingUrl: str = ""

    bucket: str = ""
    measurement: str = ""
@dataclass
class ExperimentProcess:
    """Experiment process table data.

    Attributes:
        headers: Table header cells.
        rows: Data rows, each a list of cell strings.
        scriptFile: Python script file content, base64-encoded.
        scriptName: Script file name (for display).
        remark: Experiment remarks.
    """

    headers: List[str] = field(default_factory=list)
    rows: List[List[str]] = field(default_factory=list)

    scriptFile: str = ""
    scriptName: str = ""
    remark: str = ""
# ===== New: TCP Modbus and device threshold/alarm configuration =====
@dataclass
class TcpModbusConfig:
    """TCP Modbus endpoint.

    Attributes:
        ip: IP address; defaults to ``127.0.0.1``.
        port: TCP port; defaults to ``502``.
    """

    ip: str = "127.0.0.1"
    port: int = 502
@dataclass
class ConfigServiceSettings:
    """Config service connection settings.

    Attributes:
        host: Service host; defaults to ``127.0.0.1``.
        port: Service port; defaults to ``5000``.
        configPath: Configuration file path; defaults to ``config.json``.
    """

    host: str = "127.0.0.1"
    port: int = 5000
    configPath: str = "config.json"
@dataclass
class ThresholdConfig:
    """A single threshold bound to a register.

    Attributes:
        name: Threshold name, e.g. "pressure upper limit".
        register: Address of the register that is read/compared.
        regType: Address type: ``holding``, ``input`` or ``coil``.
        value: Threshold value.
    """

    name: str = ""
    register: int = 0
    regType: str = "holding"
    value: float = 0.0
@dataclass
class DeviceConfig:
    """Threshold and alarm settings for one device.

    Attributes:
        deviceName: Device name.
        thresholds: Thresholds configured for the device.
        alarmRegister: Register address for device status/alarm etc.
        alarmType: Address type: ``holding``, ``input`` or ``coil``.
    """

    deviceName: str = ""
    thresholds: List[ThresholdConfig] = field(default_factory=list)

    alarmRegister: int = 0
    alarmType: str = "holding"
@dataclass
class GlobalParametersConfig:
    """Global parameter configuration.

    Stores variables that can be referenced in report templates.

    Attributes:
        parameters: Parameter name -> parameter value mapping.
    """

    parameters: Dict[str, str] = field(default_factory=dict)
@dataclass
class InfluxGlobalConfigWrapper:
    """Empty dataclass that declares no fields."""
@dataclass
class AppConfig:
    """Root configuration object, convertible to and from a JSON-style dict.

    Attributes:
        influx: Global InfluxDB settings.
        placeholders: Placeholder configurations keyed by placeholder key.
        experimentProcess: Experiment process table data.
        tcpModbus: TCP Modbus endpoint.
        devices: Per-device threshold/alarm configurations.
        configService: Config service connection settings.
        db: Database connection settings.
        globalParameters: Global parameters.
    """

    influx: InfluxGlobalConfig = field(default_factory=InfluxGlobalConfig)
    placeholders: Dict[str, PlaceholderConfig] = field(default_factory=dict)
    experimentProcess: ExperimentProcess = field(default_factory=ExperimentProcess)
    tcpModbus: TcpModbusConfig = field(default_factory=TcpModbusConfig)
    devices: List[DeviceConfig] = field(default_factory=list)
    configService: ConfigServiceSettings = field(default_factory=ConfigServiceSettings)
    db: DbConnectionConfig = field(default_factory=DbConnectionConfig)
    globalParameters: GlobalParametersConfig = field(default_factory=GlobalParametersConfig)

    @staticmethod
    def _placeholder_from_dict(key: str, raw: Dict[str, Any]) -> PlaceholderConfig:
        """Build one ``PlaceholderConfig`` from its raw dict entry."""
        influx_cfg: Optional[InfluxQueryConfig] = None
        inf = raw.get("influx")
        if isinstance(inf, dict):
            influx_cfg = InfluxQueryConfig(
                bucket=inf.get("bucket", ""),
                measurement=inf.get("measurement", ""),
                fields=inf.get("fields", []),
                filters=inf.get("filters", {}),
                timeRange=inf.get("timeRange", "-1h"),
                aggregate=inf.get("aggregate", ""),
                windowPeriod=inf.get("windowPeriod", ""),
            )

        table_opts = TableOptions()
        to = raw.get("table")
        if isinstance(to, dict):
            table_opts.firstColumn = to.get("firstColumn", table_opts.firstColumn)
            table_opts.firstTitle = to.get("firstTitle", table_opts.firstTitle)
            table_opts.titles = dict(to.get("titles", {}))

        return PlaceholderConfig(
            key=key,
            type=raw.get("type", "text"),
            label=raw.get("label", key),  # the label falls back to the key
            title=raw.get("title", ""),
            value=raw.get("value", ""),
            dbQuery=raw.get("dbQuery", ""),
            influx=influx_cfg,
            chart=raw.get("chart", {}),
            table=table_opts,
            grid=raw.get("grid", []),
        )

    @staticmethod
    def _device_from_dict(raw: Dict[str, Any]) -> DeviceConfig:
        """Build one ``DeviceConfig`` (with its thresholds) from its raw dict."""
        thresholds = [
            ThresholdConfig(
                name=t.get("name", ""),
                register=int(t.get("register", 0)),
                regType=t.get("regType", "holding"),
                value=float(t.get("value", 0.0)),
            )
            for t in (raw.get("thresholds") or [])
        ]
        return DeviceConfig(
            deviceName=raw.get("deviceName", ""),
            thresholds=thresholds,
            alarmRegister=int(raw.get("alarmRegister", 0)),
            alarmType=raw.get("alarmType", "holding"),
        )

    @staticmethod
    def from_dict(data: Dict[str, Any]) -> "AppConfig":
        """Build an ``AppConfig`` from a plain dict (e.g. parsed JSON).

        Missing keys fall back to the dataclass defaults. A section that is
        present but ``None`` (JSON ``null``) is treated as missing. Unknown
        keys are ignored in every section, including ``tcpModbus``.

        Args:
            data: Top-level configuration mapping.

        Returns:
            The populated configuration.

        Raises:
            ValueError: If a numeric value (register, port, threshold value)
                cannot be converted by ``int()`` / ``float()``.
            TypeError: If such a value has a type ``int()`` / ``float()``
                does not accept.
        """
        influx_data = data.get("influx") or {}
        influx = InfluxGlobalConfig(
            url=influx_data.get("url", ""),
            org=influx_data.get("org", ""),
            token=influx_data.get("token", ""),
            username=influx_data.get("username", ""),
            password=influx_data.get("password", ""),
            landingUrl=influx_data.get("landingUrl", ""),
            bucket=influx_data.get("bucket", ""),
            measurement=influx_data.get("measurement", ""),
        )

        placeholders: Dict[str, PlaceholderConfig] = {
            k: AppConfig._placeholder_from_dict(k, v)
            for k, v in (data.get("placeholders") or {}).items()
        }

        # TCP Modbus: read keys explicitly so that an unknown key in the file
        # cannot raise TypeError from the dataclass constructor.
        tcp_data = data.get("tcpModbus") or {}
        tcp = TcpModbusConfig(
            ip=str(tcp_data.get("ip") or "127.0.0.1"),
            port=int(tcp_data.get("port") or 502),
        )

        # Device list
        devices: List[DeviceConfig] = [
            AppConfig._device_from_dict(d) for d in (data.get("devices") or [])
        ]

        # Experiment process
        exp_data = data.get("experimentProcess") or {}
        experiment_process = ExperimentProcess(
            headers=exp_data.get("headers", []),
            rows=exp_data.get("rows", []),
            scriptFile=exp_data.get("scriptFile", ""),
            scriptName=exp_data.get("scriptName", ""),
            remark=exp_data.get("remark", ""),
        )

        # Config service settings
        config_service_data = data.get("configService") or {}
        config_service = ConfigServiceSettings(
            host=config_service_data.get("host", "127.0.0.1"),
            port=int(config_service_data.get("port", 5000)),
            configPath=config_service_data.get("configPath", "config.json"),
        )

        # Database connection: empty/None values fall back to the defaults.
        db_data = data.get("db") or {}
        db_conn = DbConnectionConfig(
            engine=str(db_data.get("engine", "mysql") or "mysql"),
            host=str(db_data.get("host", "localhost") or "localhost"),
            port=int(db_data.get("port", 3306) or 3306),
            database=str(db_data.get("database", "") or ""),
            username=str(db_data.get("username", "") or ""),
            password=str(db_data.get("password", "") or ""),
        )

        # Global parameters
        global_params_data = data.get("globalParameters") or {}
        global_params = GlobalParametersConfig(
            parameters=global_params_data.get("parameters", {})
        )

        return AppConfig(
            influx=influx,
            placeholders=placeholders,
            experimentProcess=experiment_process,
            tcpModbus=tcp,
            devices=devices,
            configService=config_service,
            db=db_conn,
            globalParameters=global_params,
        )

    def _natural_sorted_keys(self) -> List[str]:
        """Return placeholder keys sorted so numeric suffixes order numerically.

        Keys made of ASCII letters followed by digits (e.g. ``table10``) sort
        by ``(letters, number)``; all other keys sort by the key itself.
        """

        def key_fn(k: str) -> tuple[str, int, str]:
            m = re.match(r"([a-zA-Z]+)(\d+)$", k)
            if m:
                return (m.group(1), int(m.group(2)), k)
            # No numeric suffix: use a large sentinel in the number slot.
            return (k, 10**9, k)

        return sorted(self.placeholders.keys(), key=key_fn)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the configuration into a JSON-compatible dict.

        Placeholders are emitted in natural-sorted key order. Each entry
        carries ``influx`` only when set, ``table`` only for type ``table``
        and ``grid`` only for type ``cell``.
        """
        result: Dict[str, Any] = {
            "influx": {
                "url": self.influx.url,
                "org": self.influx.org,
                "token": self.influx.token,
                "username": self.influx.username,
                "password": self.influx.password,
                "landingUrl": self.influx.landingUrl,
                "bucket": self.influx.bucket,
                "measurement": self.influx.measurement,
            },
            "placeholders": {},
            "tcpModbus": {
                "ip": self.tcpModbus.ip,
                "port": self.tcpModbus.port,
            },
            "devices": [
                {
                    "deviceName": d.deviceName,
                    "alarmRegister": d.alarmRegister,
                    "alarmType": d.alarmType,
                    "thresholds": [
                        {
                            "name": t.name,
                            "register": t.register,
                            "regType": t.regType,
                            "value": t.value,
                        }
                        for t in d.thresholds
                    ],
                }
                for d in self.devices
            ],
            "db": {
                "engine": self.db.engine,
                "host": self.db.host,
                "port": self.db.port,
                "database": self.db.database,
                "username": self.db.username,
                "password": self.db.password,
            },
        }

        for k in self._natural_sorted_keys():
            ph = self.placeholders[k]
            ph_dict: Dict[str, Any] = {
                "type": ph.type,
                "label": ph.label or k,
                "title": ph.title,
                "value": ph.value,
                "dbQuery": ph.dbQuery,
                "chart": ph.chart or {},
            }
            if ph.influx:
                ph_dict["influx"] = {
                    "bucket": ph.influx.bucket,
                    "measurement": ph.influx.measurement,
                    "fields": ph.influx.fields,
                    "filters": ph.influx.filters,
                    "timeRange": ph.influx.timeRange,
                    "aggregate": ph.influx.aggregate,
                    "windowPeriod": ph.influx.windowPeriod,
                }
            if ph.type == "table":
                ph_dict["table"] = {
                    "firstColumn": ph.table.firstColumn,
                    "firstTitle": ph.table.firstTitle,
                    "titles": ph.table.titles,
                }
            if ph.type == "cell":
                ph_dict["grid"] = ph.grid
            result["placeholders"][k] = ph_dict

        # Experiment process
        result["experimentProcess"] = {
            "headers": self.experimentProcess.headers,
            "rows": self.experimentProcess.rows,
            "scriptFile": self.experimentProcess.scriptFile,
            "scriptName": self.experimentProcess.scriptName,
            "remark": self.experimentProcess.remark,
        }
        # Config service settings
        result["configService"] = {
            "host": self.configService.host,
            "port": self.configService.port,
            "configPath": self.configService.configPath,
        }
        # Global parameters
        result["globalParameters"] = {
            "parameters": self.globalParameters.parameters,
        }
        return result

    @staticmethod
    def load(path: Path) -> "AppConfig":
        """Read a UTF-8 JSON file at ``path`` and build an ``AppConfig`` from it."""
        with path.open("r", encoding="utf-8") as f:
            data = json.load(f)
        return AppConfig.from_dict(data)

    def save(self, path: Path) -> None:
        """Write the configuration to ``path`` as indented UTF-8 JSON.

        If ``path`` already exists it is first copied to a backup whose name
        is ``path`` with its final suffix replaced by ``.json.bak``. A failed
        backup does not prevent the save.
        """
        # Serialize before opening the target: opening with "w" truncates the
        # file, so a serialization error must not happen after that point.
        payload = json.dumps(self.to_dict(), ensure_ascii=False, indent=2)

        # If the file already exists, create a backup first.
        if path.exists():
            import shutil

            backup_path = path.with_suffix('.json.bak')
            try:
                shutil.copy2(path, backup_path)
            except OSError:
                pass  # a failed backup must not block saving

        # Save the configuration.
        with path.open("w", encoding="utf-8") as f:
            f.write(payload)

    def ensure_placeholder(self, key: str, type_hint: str) -> PlaceholderConfig:
        """Return the placeholder for ``key``, creating or retyping it as needed.

        A missing placeholder is created with defaults suited to
        ``type_hint``. An existing placeholder whose type differs is switched
        to ``type_hint`` and the fields that do not apply to the new type are
        reset.

        Args:
            key: Placeholder key.
            type_hint: Desired placeholder type.

        Returns:
            The placeholder stored under ``key``.
        """
        uses_influx = type_hint in ("table", "chart")

        if key not in self.placeholders:
            self.placeholders[key] = PlaceholderConfig(
                key=key,
                type=type_hint,
                label=key,
                title=key if uses_influx else "",
                influx=InfluxQueryConfig() if uses_influx else None,
                grid=[[""]] if type_hint == "cell" else [],
            )

        ph = self.placeholders[key]
        # NOTE(review): the resets below are applied only when the type
        # changes — confirm this nesting matches the intended behavior.
        if ph.type != type_hint:
            ph.type = type_hint
            if uses_influx:
                ph.influx = ph.influx or InfluxQueryConfig()
                if type_hint == "table" and ph.table is None:
                    ph.table = TableOptions()
            else:
                # Non table/chart types carry no query, table options or title.
                ph.influx = None
                ph.table = TableOptions()
                ph.title = ""

            if type_hint == "cell":
                ph.grid = ph.grid or [[""]]
            else:
                ph.grid = []

            if type_hint == "dbText":
                ph.dbQuery = ph.dbQuery or ""
        return ph