From cb28159b371622b8f3807967d74556d4e37cab08 Mon Sep 17 00:00:00 2001 From: risingLee <871066422@qq.com> Date: Tue, 23 Dec 2025 16:13:19 +0800 Subject: [PATCH] 11 --- .task_state.json | 5 + configs/pcm-influxdb-debug.py | 2056 +++++++++++++++++++++++++++++++++ pcm-influxdb-debug.py | 535 +++++++-- 3 files changed, 2492 insertions(+), 104 deletions(-) create mode 100644 .task_state.json create mode 100644 configs/pcm-influxdb-debug.py diff --git a/.task_state.json b/.task_state.json new file mode 100644 index 0000000..66264a5 --- /dev/null +++ b/.task_state.json @@ -0,0 +1,5 @@ +{ + "running_time": 0, + "last_update": "2025-12-23T00:00:00", + "version": "1.0" +} diff --git a/configs/pcm-influxdb-debug.py b/configs/pcm-influxdb-debug.py new file mode 100644 index 0000000..83514ee --- /dev/null +++ b/configs/pcm-influxdb-debug.py @@ -0,0 +1,2056 @@ +import threading, pynmea2, time, struct, serial, socket, yaml, os, logging.config, json, subprocess, shutil, time, copy, gc, glob +from pymodbus.server.sync import StartTcpServer +from pymodbus.client.sync import ModbusTcpClient +from pymodbus.datastore import ModbusSequentialDataBlock +from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext +from threading import Lock +import numpy as np +from datetime import datetime +from pathlib import Path +from influxdb_client import InfluxDBClient, Point +from influxdb_client.client.write_api import SYNCHRONOUS +from config_service import ConfigService + +def checkValue(data, little_endian=True): + """ + 计算Modbus CRC16校验和 + 参数: + data: 字节串或字节数组 + little_endian: 是否使用小端字节序,默认为False(大端) + 返回: + CRC16值 (2字节,小端字节序) + """ + crc = 0xFFFF + for byte in data: + crc ^= byte + for _ in range(8): + if crc & 0x0001: + crc = (crc >> 1) ^ 0xA001 + else: + crc = crc >> 1 + if little_endian: + # 小端字节序:低位在前,高位在后 + low_byte = crc & 0xFF + high_byte = (crc >> 8) & 0xFF + return (low_byte << 8) | high_byte + else: + # 大端字节序:高位在前,低位在后 + return crc & 0xFFFF + +def nowStr(): + now = datetime.now() + ret = now.strftime('%Y/%m/%d %H:%M:%S.') + f"{now.microsecond // 1000:03d}" + return ret + +def wordData2HexStr(data): + if data: + ret = ' '.join(data[i:i+2].hex() for i in range(0, len(data), 2)) + else: + ret = '' + return ret.upper() + +def float_to_registers(value): + packed = struct.pack('>f', value) + return [struct.unpack('>H', packed[0:2])[0], struct.unpack('>H', packed[2:4])[0]] + +def registers_to_float(registers, byte_order='ABCD'): + """ + 将两个寄存器转换为浮点数 + Args: + registers (list): 两个寄存器的值 [reg1, reg2] + byte_order (str): 字节顺序 + Returns: + float: 转换后的浮点数 + """ + if len(registers) != 2: + return None + + # 将寄存器拆分为字节 + # 每个寄存器是16位,拆分为2个字节 + reg1_bytes = registers[0].to_bytes(2, byteorder='big') # 高地址寄存器 + reg2_bytes = registers[1].to_bytes(2, byteorder='big') # 低地址寄存器 + + # 根据字节顺序组合字节 + if byte_order == 'ABCD': # 标准Modbus (大端序) + byte_array = reg1_bytes + reg2_bytes + elif byte_order == 'CDAB': # 字交换 + byte_array = reg2_bytes + reg1_bytes + elif byte_order == 'BADC': # 字节交换 + byte_array = bytes(reversed(reg1_bytes)) + bytes(reversed(reg2_bytes)) + elif byte_order == 'DCBA': # 字节和字都交换 + byte_array = bytes(reversed(reg2_bytes)) + bytes(reversed(reg1_bytes)) + else: + return None + float_value = struct.unpack('>f', byte_array)[0] # '>f' 表示大端序浮点数 + # 检查是否为NaN或无穷大 + if abs(float_value) == float('inf'): + return None + return float_value + +class ConfigManager: + def __init__(self, regs_config_file, config_file, logger): + self.config_file = Path(config_file) + self.regs_config_file = Path(regs_config_file) + self.lock = threading.Lock() + self.config = {} + self.regs_config = {} + self.logger = logger + self.mapping = BidirectionalMap() + + self.load_all_configs() + + # 设置文件监视器 + # self.observer = Observer() + # self.event_handler = ConfigFileHandler(self) + # self.observer.schedule(self.event_handler, path=str(self.config_file.parent)) + # self.observer.start() + + def load_all_configs(self): + """加载主配置和寄存器配置""" + with self.lock: + if not os.path.exists(self.config_file): + self.logger.warning(f"Config file {self.config_file} not found") + + if not os.path.exists(self.regs_config_file): + self.logger.warning(f"Regsister mapping file {self.regs_config_file} not found") + + # 加载主配置 + with open(self.config_file, 'r') as f: + self.config = yaml.safe_load(f) + # 低速采集sensor_type处理 + self.config['lsdaq']['sensor_type'] = self.config['lsdaq'].get('sensor_type').replace(' ', '') + if len(self.config['lsdaq']['sensor_type']) != 16 or not all(c in '01' for c in self.config['lsdaq']['sensor_type']): + self.config['lsdaq']['sensor_type'] = '1111111111111111' + self.config['lsdaq']['sensor_type'] = int(self.config['lsdaq']['sensor_type'][::-1], 2) + + # 低速采集 warning_param enable 处理 + self.config['lsdaq']['warning_param']['enable'] = self.config['lsdaq']['warning_param'].get('enable').replace(' ', '') + if len(self.config['lsdaq']['warning_param']['enable']) != 16 or not all(c in '01' for c in self.config['lsdaq']['warning_param']['enable']): + self.config['lsdaq']['warning_param']['enable'] = '0000000000000000' + self.config['lsdaq']['warning_param']['enable'] = int(self.config['lsdaq']['warning_param']['enable'][::-1], 2) + + # 高速采集sensor_type处理 + self.config['hsdaq']['sensor_type'] = self.config['hsdaq'].get('sensor_type').replace(' ', '') + if len(self.config['hsdaq']['sensor_type']) != 32 or not all(c in '01' for c in self.config['hsdaq']['sensor_type']): + self.config['hsdaq']['sensor_type'] = '11111111111111111111111111111111' + _s = self.config['hsdaq']['sensor_type'] + self.config['hsdaq']['sensor_type'] = int(''.join([_s[2*i:2*i+2] for i in range(len(_s)//2-1, -1, -1)]), 2) + + # 高速采集save_flag处理 + self.config['hsdaq']['save_flag'] = self.config['hsdaq'].get('save_flag').replace(' ', '') + if len(self.config['hsdaq']['save_flag']) != 16 or not all(c in '01' for c in self.config['hsdaq']['save_flag']): + self.config['hsdaq']['save_flag'] = '1111111111111111' + self.config['hsdaq']['save_flag'] = int(self.config['hsdaq']['save_flag'][::-1], 2) + + # 高速采集 warning_param enable 处理 + self.config['hsdaq']['warning_param']['enable'] = self.config['hsdaq']['warning_param'].get('enable').replace(' ', '') + if len(self.config['hsdaq']['warning_param']['enable']) != 32 or not all(c in '01' for c in self.config['hsdaq']['warning_param']['enable']): + self.config['hsdaq']['warning_param']['enable'] = '0000000000000000' + self.config['hsdaq']['warning_param']['enable'] = int(self.config['hsdaq']['warning_param']['enable'][::-1], 2) + + with open(self.regs_config_file, 'r') as f: + self.regs_config = yaml.safe_load(f) + + # 构建映射关系 + self._build_mappings() + + def _build_mappings(self): + """构建配置键到地址的双向映射""" + # 处理value_regs + # if 'value_regs' in self.regs_config: + # self._process_registers_section(self.regs_config['value_regs'], '', 'value') + + # 处理control_regs + if 'control_regs' in self.regs_config: + self._process_registers_section(self.regs_config['control_regs'], '', 'control') + + def _process_registers_section(self, section, current_path, reg_type): + """处理寄存器配置部分""" + def traverse(node, current_path=""): + # print(f"node={node}, current_path={current_path}") + for key, value in node.items(): + new_path = f"{current_path}.{key}" if current_path else key + if isinstance(value, dict): + if all(isinstance(k, str) and isinstance(v, int) for k, v in value.items()): + # 这是叶子节点,包含寄存器地址 + for sub_key, address in value.items(): + full_path = f"{new_path}.{sub_key}" + self.mapping.add_mapping(full_path, address, reg_type) + else: + traverse(value, new_path) + else: + # 直接映射 + self.mapping.add_mapping(new_path, value[0], value[1]) + + traverse(section, current_path) + # print(f"key_to_address={self.mapping.key_to_address}") + # print(f"address_to_keys={self.mapping.address_to_keys}") + # print(f"key_to_data_type={self.mapping.key_to_data_type}") + # print(f"address_to_data_type={self.mapping.address_to_data_type}") + + def get_config_value(self, config_path): + """通过配置路径获取配置值""" + keys = config_path.split('.') + node = self.config + for key in keys: + if isinstance(node, dict) and key in node: + node = node[key] + else: + return None + return node + + def update_config_value(self, config_path, value): + """更新配置值并保存""" + with self.lock: + # print(config_path) + keys = config_path.split('.') + node = self.config + for key in keys[:-1]: + if key not in node: + node[key] = {} + node = node[key] + node[keys[-1]] = value + + # 保存到文件 + # self._save_config() + return True + + def _save_config(self): + """保存配置到文件""" + _config = copy.deepcopy(self.config) + _config['lsdaq']['sensor_type'] = f"{_config['lsdaq']['sensor_type']:016b}"[::-1] + _config['lsdaq']['warning_param']['enable'] = f"{_config['lsdaq']['warning_param']['enable']:016b}"[::-1] + _s = f"{_config['hsdaq']['sensor_type']:032b}" + _config['hsdaq']['sensor_type'] = ''.join([_s[2*i:2*i+2] for i in range(len(_s)//2-1, -1, -1)]) + _config['hsdaq']['save_flag'] = f"{_config['hsdaq']['save_flag']:016b}"[::-1] + _config['hsdaq']['warning_param']['enable'] = f"{_config['hsdaq']['warning_param']['enable']:016b}"[::-1] + + with open(self.config_file, 'w') as f: + yaml.dump(_config, f, sort_keys=False, default_flow_style=False) + + # def close(self): + # self.observer.stop() + # self.observer.join() + +class BidirectionalMap: + def __init__(self): + self.key_to_address = {} # 配置键 -> (地址, 类型) + self.address_to_keys = {} # 地址 -> [配置键] + self.key_to_data_type = {} # 配置键 -> 数据类型 + self.address_to_data_type = {} # 地址 -> 数据类型 + + def add_mapping(self, config_key, address, reg_type, data_type='uint16'): + """添加映射关系""" + self.key_to_address[config_key] = (address, reg_type) + self.address_to_keys.setdefault(address, []).append(config_key) + self.key_to_data_type[config_key] = data_type + self.address_to_data_type[address] = data_type + + def get_address(self, config_key): + """通过配置键获取地址和类型""" + print(self.key_to_address) + return self.key_to_address.get(config_key, (None, None)) + + def get_config_keys(self, address): + """通过地址获取配置键列表""" + # print(self.address_to_keys) + return self.address_to_keys.get(address, []) + + def get_data_type(self, identifier): + """获取数据类型,identifier可以是地址或配置键""" + if isinstance(identifier, int): + return self.address_to_data_type.get(identifier) + else: + return self.key_to_data_type.get(identifier) + +class DataTypeValidator: + @staticmethod + def validate(value, data_type): + try: + if data_type == 'float32': + return float(value) + elif data_type in ('uint16', 'uint32'): + val = int(value) + if data_type == 'uint16' and not (0 <= val <= 65535): + raise ValueError("Value out of range for uint16") + elif data_type == 'uint32' and not (0 <= val <= 4294967295): + raise ValueError("Value out of range for uint32") + return val + elif data_type == 'int32': + val = int(value) + if not (-2147483648 <= val <= 2147483647): + raise ValueError("Value out of range for int32") + return val + elif data_type == 'string': + return str(value) + else: + return int(value) # 默认处理为uint16 + except (ValueError, TypeError) as e: + logging.error(f"Validation failed for {value} as {data_type}: {str(e)}") + return None + +class RegisterConfigEnhancer: + def __init__(self, register_config): + self.register_config = register_config + self.data_type_mapping = self._create_data_type_mapping() + + def _create_data_type_mapping(self): + """为寄存器分配适当的数据类型""" + mapping = {} + + # GPS数据通常需要浮点数 + if 'value_regs' in self.register_config and 'gps' in self.register_config['value_regs']: + for field in ['latitude', 'longitude', 'altitude', 'speed']: + if field in self.register_config['value_regs']['gps']: + addr = self.register_config['value_regs']['gps'][field] + mapping[addr] = 'float32' + + # 传感器校准参数需要浮点数 + for dev in ['lsdaq', 'hsdaq']: + if dev in self.register_config.get('control_regs', {}): + for param_type in ['sensor_Tmp_CalibParam', 'sensor_Cur_CalibParam', + 'sensor_Vol_CalibParam', 'sensor_Vib_CalibParam']: + if param_type in self.register_config['control_regs'][dev]: + for ch in self.register_config['control_regs'][dev][param_type]: + for param in ['K2', 'K', 'B']: + addr = self.register_config['control_regs'][dev][param_type][ch][param] + mapping[addr] = 'float32' + + return mapping + + def get_data_type(self, address): + return self.data_type_mapping.get(address, 'uint16') + +class ModbusSequentialDataBlockForPCM(ModbusSequentialDataBlock): + def __init__(self, config_manager, logger, *args, **kwargs): + super().__init__(*args, **kwargs) + self.config_manager = config_manager + self._is_client_write = True + self.logger = logger + self._initialize_registers() + + def _initialize_registers(self): + """Initialize register values from configuration""" + for key, value in self.config_manager.regs_config['control_regs'].items(): + config_value = self.config_manager.get_config_value(key) + # print(f"{key}:{value[0]}:{config_value}") + if config_value is not None and ('w' in value[2] or 'W' in value[2]): + match value[1]: + case 'float32': + config_value = float(config_value) + self.server_set_values(value[0]+1, float_to_registers(config_value)) + case 'uint32': + config_value = int(config_value) + # self.server_set_values(value[0]+1, [config_value & 0xFFFF, (config_value >> 16) & 0xFFFF]) + self.server_set_values(value[0]+1, [(config_value >> 16) & 0xFFFF, config_value & 0xFFFF]) + case 'int32': + config_value = int(config_value) + # self.server_set_values(value[0]+1, [config_value & 0xFFFF, (config_value >> 16) & 0xFFFF]) + self.server_set_values(value[0]+1, [(config_value >> 16) & 0xFFFF, config_value & 0xFFFF]) + case 'uint16': + config_value = int(config_value) + # print(f"{key}:{value[0]}:{config_value}:{[struct.pack('>H', config_value)[0]]}") + self.server_set_values(value[0]+1, [config_value & 0xFFFF]) + case 'int16': + config_value = int(config_value) + self.server_set_values(value[0]+1, [config_value & 0xFFFF]) + case _: + pass + self.logger.info("Register initialization completed") + + def setValues(self, address, values): + """Override setValues method""" + if not self._is_client_write: + super().setValues(address, values) + return + + super().setValues(address, values) + + # Handle client writes + updated = False + print(f"*************************address={address}, values={values}*************************") + reg_addr = address - 1 + # print(f"values = {values}") + # path = self.config_manager.mapping.get_config_keys(reg_addr) + # print(f"*************************{path}:{reg_addr}:{values}********************") + # if self.config_manager.update_config_value(path[0], value[0]): + # updated = True + + regCount = len(values) + while(regCount > 0): + path = self.config_manager.mapping.get_config_keys(reg_addr) + print(f"*************************{path}, {reg_addr}, {regCount}*************************") + dataType = self.config_manager.mapping.key_to_address[path[0]][1] + print(f"*************************{path}, {dataType}, {regCount}*************************") + if len(path) > 0: + if '16' in dataType: + print(f"*************************{path}:{reg_addr}:{values[0]}:{regCount}********************") + if dataType in ['int16', 'uint16']: + self.config_manager.update_config_value(path[0], int(values[0])) + regCount -= 1 + reg_addr += 1 + values = values[1:] + elif '32' in dataType: + print(f"*************************{path}:{reg_addr}:{values[0:2]}:{regCount}********************") + if dataType in ['int32', 'uint32']: + self.config_manager.update_config_value(path[0], (values[0]<<16)+values[1]) + elif dataType == 'float32': + self.config_manager.update_config_value(path[0], registers_to_float(values)) + regCount -= 2 + reg_addr += 2 + values = values[2:] + else: + regCount -= 1 + reg_addr += 1 + + if updated: + self.config_manager.save_config() + self.logger.debug(f"Register {address} update triggered configuration change") + + def server_set_values(self, address, values): + """Server-only write method that won't trigger YAML update""" + # self._is_client_write = False + # self.setValues(address, values) + # self._is_client_write = True + super().setValues(address, values) + +class LSDAQ: + def __init__(self, config:dict, logger): + # 加载配置参数 + ''' self.status 码表 + -200: 配置信息错误 + -201: 串口号错误 + -202: 传感器类型错误 + -203: 工作模式错误 + -100: 设备关闭 + -101: 设备未连接 + -1: 多次执行指令失败 + 0: 正常 + 100: 连接失败 + 200: 命令执行失败 + 202: 读取命令错误 + 203: 响应超时 + 204: 报头错误 + 205: 校验错误 + 206: 数据解析错误 + ''' + self.status = -1 + self.config = config + self.logger = logger + self.port = config.get('port', '/dev/ttyLP3') + if self.port != '/dev/ttyLP3': + self.status = -201 + self.baudrate = config.get('baudrate', 115200) + self.timeout = config.get('timeout', 50)/1000.0 + self.mode = config.get('mode', 0) + self.channels = config.get('channels', 16) + if self.mode not in [0, 1]: + self.mode = 0 + self.status = -203 + self.frameNo = 0 + self.sensor_type = config.get('sensor_type', 0xffff) + self.alias = config.get('alias', {}) + for i in range(16): + if f'CH{i+1}' not in self.alias: + self.alias[f'CH{i+1}'] = '' + self.reg_values = { + 'CH1': 0.0, + 'CH2': 0.0, + 'CH3': 0.0, + 'CH4': 0.0, + 'CH5': 0.0, + 'CH6': 0.0, + 'CH7': 0.0, + 'CH8': 0.0, + 'CH9': 0.0, + 'CH10': 0.0, + 'CH11': 0.0, + 'CH12': 0.0, + 'CH13': 0.0, + 'CH14': 0.0, + 'CH15': 0.0, + 'CH16': 0.0, + 'OFFSET': 0.0, + 'POWERVOL': 0.0, + 'TEMP': 0.0, + 'GAIN': 0.0, + 'REF': 0.0, + 'STATUS': 0.0 + } + self.warning_values = { + 'CH1': 0, + 'CH2': 0, + 'CH3': 0, + 'CH4': 0, + 'CH5': 0, + 'CH6': 0, + 'CH7': 0, + 'CH8': 0, + 'CH9': 0, + 'CH10': 0, + 'CH11': 0, + 'CH12': 0, + 'CH13': 0, + 'CH14': 0, + 'CH15': 0, + 'CH16': 0 + } + _sensor_Tmp_CalibParam = { + 'CH1': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH2': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH3': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH4': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH5': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH6': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH7': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH8': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH9': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH10': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH11': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH12': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH13': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH14': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH15': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH16': {'K2':0.0, 'K': 1.0, 'B': 0.0} + } + _sensor_Cur_CalibParam = { + 'CH1': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH2': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH3': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH4': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH5': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH6': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH7': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH8': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH9': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH10': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH11': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH12': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH13': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH14': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH15': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH16': {'K2':0.0, 'K': 1.0, 'B': 0.0} + } + + _sensor_Pres_CalibParam = { + 'CH1': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH2': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH3': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH4': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH5': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH6': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH7': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH8': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH9': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH10': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH11': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH12': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH13': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH14': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH15': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH16': {'K2':0.0, 'K': 1.0, 'B': 0.0} + } + + self.sensor_Tmp_CalibParam = config.get('sensor_Tmp_CalibParam', _sensor_Tmp_CalibParam) + self.sensor_Cur_CalibParam = config.get('sensor_Cur_CalibParam', _sensor_Cur_CalibParam) + self.sensor_Pres_CalibParam = config.get('sensor_Cur_CalibParam', _sensor_Pres_CalibParam) + + # 构建指令集 + self.cmdList = { + # 查询所有通道采集数据 + # 指令格式:指令字符串,回复长度,超时时间,发送校验标志,接收校验标志,指令描述,重试次数 + 'readAllADs': ['', f"0000 0000 0006 0103 0008 0017", 55, 200, 0, 0, 5] + } + self.optFlag = 0 + + def update_config(self): + self.mode = self.config.get('mode') + if self.mode not in [0, 1]: + self.mode = 0 + self.sensor_type = self.config.get('sensor_type', 0xffff) + self.sensor_Tmp_CalibParam = self.config.get('sensor_Tmp_CalibParam') + self.sensor_Cur_CalibParam = self.config.get('sensor_Cur_CalibParam') + self.sensor_Pres_CalibParam = self.config.get('sensor_Pres_CalibParam') + + def exeCmd(self, cmdName:str='readAllADs') -> list: # type: ignore + try: + info = '' + cmd = self.cmdList.get(cmdName, None) + self.status = 0 + if cmd is None: + self.status = 202 + return [False, None, f"Command {cmdName} not found in cmdList."] + retry = 0 + data = bytearray().fromhex(cmd[1]) + + if (cmd[4] == 1): + data += bytearray(checkValue(data[2:]).to_bytes(2, 'big')) + if len(cmd) >= 7: + RETRYTIMES = int(cmd[6]) + else: + RETRYTIMES = 1 + while (retry < RETRYTIMES): + info += f"[{nowStr()}] Sent:{wordData2HexStr(data)}\n" + recvData = bytearray() + self.serial.write(data) + time.sleep(int(cmd[3])/1000.0) + recvData = self.serial.read(int(cmd[2])) + info += (f"[{nowStr()}] Echo:{wordData2HexStr(recvData)}\n") + rspLen = int(cmd[2]) + if len(recvData) >= rspLen: + if recvData[0:2] == bytearray().fromhex(f"0000"): + # info += f"[{nowStr()}] Echo:{wordData2HexStr(recvData[0:rspLen])}\n" + rspLen = len(recvData) + if (cmd[5] == 1): + crc = int.from_bytes(recvData[rspLen-2:rspLen], byteorder='big') + calc_value = checkValue(recvData[0:rspLen-2]) + # info += f"{crc:04X}, {calc_value:04X}\n" + if crc == calc_value: + # self.logger.info(info) + return [True, recvData, info] + else: + self.status = 205 + else: + self.logger.info(info) + return [True, recvData, info] + else: + self.status = 204 + recvData = recvData[1:] + else: + self.status = 203 + retry += 1 + if retry == RETRYTIMES: + self.status = -1 + # self.logger.info(info) + return [False, None, info] + except Exception as e: + info += f"[{nowStr()}] Error in exeCmd({cmd}): {str(e)}\n" # type: ignore + # self.logger.info(info) + return [False, None, info] + + def parseData(self, cmdName, rawData): + _sensor_type = f"{self.sensor_type:016b}"[::-1] + match cmdName: + case 'readAllADs': + datas = struct.unpack('>23H', rawData[9:55]) + if self.mode == 1: + # 校准模式下,直接返回原始数据 + for i in range(self.channels): + self.reg_values[f'CH{i+1}'] = datas[i] + else: + # 工作模式下,进行数据转换 + for i in range(self.channels): + j = i + 1 + if _sensor_type[i] == '0': + # 温度传感器 + # self.logger.info(str(self.reg_values)) + self.reg_values[f'CH{j}'] = (datas[i]**2*self.sensor_Tmp_CalibParam[f'CH{j}']['K2'] + datas[i]*self.sensor_Tmp_CalibParam[f'CH{j}']['K'] + self.sensor_Tmp_CalibParam[f'CH{j}']['B']) + elif _sensor_type[i] == '1': + # 电流传感器 + self.reg_values[f'CH{j}'] = (datas[i]**2*self.sensor_Cur_CalibParam[f'CH{j}']['K2'] + datas[i]*self.sensor_Cur_CalibParam[f'CH{j}']['K'] + self.sensor_Cur_CalibParam[f'CH{j}']['B']) + # 转换为物理量 + self.reg_values[f'CH{j}'] = (self.reg_values[f'CH{j}']**2*self.sensor_Pres_CalibParam[f'CH{j}']['K2'] + self.reg_values[f'CH{j}']*self.sensor_Pres_CalibParam[f'CH{j}']['K'] + self.sensor_Pres_CalibParam[f'CH{j}']['B']) + + self.reg_values['OFFSET'] = datas[16]*256/786432 + self.reg_values['POWERVOL'] = datas[18]*256/786432 + self.reg_values['TEMP'] = (datas[19]*4500000*256/7864320-168000)/563 + 25 #7864320*256/4500000-168000)/563 + 25 + self.reg_values['GAIN'] = datas[20]*256/7864320 + self.reg_values['REF'] = datas[21]*256/786432 + self.reg_values['STATUS'] = self.status + + self.warning_check() + case _: + self.status = 206 + + def warning_check(self): + """检查是否有报警条件""" + for i in range(self.channels): + ch = f'CH{i+1}' + val = self.reg_values[ch] + wp = self.config.get('warning_param', {}) + enable_bits = f"{wp.get('enable', 0):016b}"[::-1] + if enable_bits[i] == '1': + low_limit = wp.get(ch, {}).get('lower', float('-inf')) + high_limit = wp.get(ch, {}).get('upper', float('inf')) + if val < low_limit or val > high_limit: + # self.logger.warning(f"Warning: {ch} value {val} out of limits ({low_limit}, {high_limit})") + self.warning_values[ch] = 1 + else: + self.warning_values[ch] = 0 + else: + self.warning_values[ch] = 0 + + def open(self): + """打开串口连接""" + self.serial = serial.Serial(self.port, self.baudrate, timeout=self.timeout) + if not self.serial.is_open: + self.status = -101 + return -1 + else: + self.status = 0 + return 0 + + def close(self): + self.serial.close() + self.status = -100 + + def run(self): + """主运行循环""" + try: + while True: + match self.optFlag: + case 0: + if self.open() == 0: + self.optFlag = 1 + else: + self.optFlag = -1 + case 1: + ret = self.exeCmd('readAllADs') + if ret[0]: + self.parseData('readAllADs', ret[1]) + # self.logger.info(str(self.reg_values)) + self.frameNo += 1 + if self.frameNo > 0xFFFF: + self.frameNo = 0 + time.sleep(1) + if self.status == -1: + self.optFlag = -1 + case _: + time.sleep(5) + self.close() + self.optFlag = 0 + except KeyboardInterrupt: + self.close() + self.logger.info("Modbus Serial TCP Client stopped.") + +class Breaker: + def __init__(self, config:dict, logger): + # 加载配置参数 + ''' self.errorCode 码表 + 0x0001 打开/dev/ttyUSB0设备失败 + 0x0101 与断路器通讯失败 + ''' + self.errorCode = 0 + ''' self.load_status 码表 + 0x00 负载不在线 + 0x0101 负载在线 + ''' + self.load_status = 0 + self.config = config + self.logger = logger + self.port = config.get('port', '/dev/ttyUSB0') + self.baudrate = config.get('baudrate', 9600) + self.timeout = config.get('timeout', 50)/1000.0 + self.task_start_threshold = config.get('task_start_threshold', 2000) + self.task_stop_threshold = config.get('task_stop_threshold', 2000) + self.locked = 0 + self.closed = 0x0F # 0x0F:分闸 0xF0:合闸 + self.reasonForLastOpen = 15 # F:无 1:过流 2:漏电 3:过温 4:过载 5:过压 6:欠压 7:远程 8:模组 9:失压 A:锁扣 B:限电 0: 本地 + self.active_powers = [] + self.duration = config.get('duration', 5) + self.alarm = 0 + + self.active_power = 0 # 有功功率,单位:W + + OVV = config.get('OVV', 275) + UVV = config.get('UVV', 150) + OCV = config.get('OCV', 10000) + LCV = config.get('LCV', 30) + OTV = config.get('OTV', 80) + OPV = config.get('OPV', 13000) + OVT = config.get('OVT', 0) + UVT = config.get('UVT', 0) + OCT = config.get('OCT', 0) + LCT = config.get('LCT', 200) + OTT = config.get('OTT', 200) + OPT = config.get('OPT', 100) + + self.reg_values = { + 'locked': 0, + 'closed': 0x0F, + 'reasonForLastOpen': 0x0F, + 'alarm': 0, + 'active_power': 0, + 'load_status': 0 + } + # 构建指令集 + self.cmdList = { + # 指令格式:指令描述,指令字符串,回复长度,超时时间,发送校验标志,接收校验标志,重试次数 + 'readAllDatas': ['', f"0204 0000 0027", 83, 300, 1, 1, 3], + 'readOverLimitValues': ['', f"0203 0002 0006", 17, 200, 1, 1, 3], + 'readOverLimitActionTime': ['', f"0203 0010 0006", 17, 200, 1, 1, 3], + 'setOverLimitValues': ['', f"0210 0002 0006 0C {OVV:04X} {UVV:04X} {OCV:04X} {LCV:04X} {OTV:04X} {OPV:04X}", 8, 100, 1, 1, 3], + 'setOverLimitActionTime': ['', f"0210 0010 0006 0C {OVT:04X} {UVT:04X} {OCT:04X} {LCT:04X} {OTT:04X} {OPT:04X}", 8, 100, 1, 1, 3], + 'closeBreaker': ['', f"0210 000D 0001 02 FF00", 8, 100, 1, 1, 3], + 'openBreaker': ['', f"0210 000D 0001 02 0000", 8, 100, 1, 1, 3], + 'turnOnGreen': ['', f"0105 0002 FF00", 8, 100, 1, 1, 3], + 'turnOffGreen': ['', f"0105 0002 0000", 8, 100, 1, 1, 3], + 'turnOnRed': ['', f"0105 0008 FF00", 8, 100, 1, 1, 3], + 'turnOffRed': ['', f"0105 0000 0000", 8, 100, 1, 1, 3], + 'turnOnAlarm': ['', f"0105 00A1 FF00", 8, 100, 1, 1, 3], + 'turnOffAlarm': ['', f"0105 00A1 0000", 8, 100, 1, 1, 3] + } + + self.optFlag = 0 + self.logger.info(f"Breader routine inspection started.") + + def update_config(self): + pass + + def exeCmd(self, cmdName) -> list: # type: ignore + try: + info = '' + cmd = self.cmdList.get(cmdName, None) + if cmd is None: + return [False, None, f"Command {cmdName} not found in cmdList."] + retry = 0 + data = bytearray().fromhex(cmd[1]) + if (cmd[4] == 1): + data += bytearray(checkValue(data[0:]).to_bytes(2, 'big')) + if len(cmd) >= 7: + RETRYTIMES = int(cmd[6]) + else: + RETRYTIMES = 1 + while (retry < RETRYTIMES): + info += f"[{nowStr()}] Sent:{wordData2HexStr(data)}\n" + recvData = bytearray() + self.serial.write(data) + time.sleep(int(cmd[3])/1000.0) + recvData = self.serial.read(int(cmd[2])) + info += (f"[{nowStr()}] Echo:{wordData2HexStr(recvData)}\n") + rspLen = int(cmd[2]) + if len(recvData) >= rspLen: + if recvData[0:2] == bytearray().fromhex(cmd[1][0:4]): + # info += f"[{nowStr()}] Echo:{wordData2HexStr(recvData[0:rspLen])}\n" + recvData = recvData[0:rspLen] + if (cmd[5] == 1): + crc = int.from_bytes(recvData[rspLen-2:rspLen], byteorder='big') + calc_value = checkValue(recvData[0:rspLen-2]) + # info += f"{crc:04X}, {calc_value:04X}\n" + if crc == calc_value: + # self.logger.info(info) + return [True, recvData, info] + else: + # self.logger.info(info) + return [True, recvData, info] + retry += 1 + # self.logger.info(info) + return [False, None, info] + except Exception as e: + info += f"[{nowStr()}] Error in exeCmd({cmd}): {str(e)}\n" # type: ignore + # self.logger.info(info) + return [False, None, info] + + def parseData(self, cmdName, rawData): + try: + match cmdName: + case 'readAllDatas': + rawData = rawData[3:-2] + self.locked = rawData[0] + self.closed = rawData[1] + self.reasonForLastOpen = (rawData[6]&0xF0)>>4 + self.active_power = int.from_bytes(rawData[68:70], byteorder='big') + self.active_powers.append(self.active_power) + + if len(self.active_powers) > self.duration * 2: + self.active_powers = self.active_powers[1:] + if np.mean(self.active_powers) > self.task_start_threshold: + self.load_status = 1 + if np.mean(self.active_powers) < self.task_stop_threshold: + self.load_status = 0 + + + self.reg_values['locked'] = self.locked + self.reg_values['closed'] = self.closed + self.reg_values['reasonForLastOpen'] = self.reasonForLastOpen + self.reg_values['alarm'] = self.alarm + self.reg_values['active_power'] = self.active_power + self.reg_values['load_status'] = self.load_status + + print(f"breaker: {self.reg_values}") + + case 'closeBreaker': + pass + case 'openBreaker': + pass + case _: + pass + except Exception as e: + self.logger.error(f"[{nowStr()}] Error in Breaker: parseData({cmdName}): {str(e)}\n") + + def openBreaker(self): + if self.reg_values['closed'] == 0xF0: + self.optFlag = 2 + + def closeBreaker(self): + if self.reg_values['closed'] == 0x0F: + self.optFlag = 3 + + def alarming(self): + if not self.alarm and self.closed & 0xFF == 0xF0: + self.exeCmd('turnOffGreen') + self.exeCmd('turnOnRed') + self.exeCmd('turnOnAlarm') + + def unalarming(self): + if self.alarm: + if self.closed & 0xFF == 0xF0: + self.exeCmd('turnOnGreen') + self.exeCmd('turnOffRed') + self.exeCmd('turnOffAlarm') + else: + self.exeCmd('turnOffGreen') + self.exeCmd('turnOffRed') + self.exeCmd('turnOffAlarm') + + def open(self): + """打开串口连接""" + self.serial = serial.Serial(self.port, self.baudrate, timeout=self.timeout) + if not self.serial.is_open: + self.errorCode = 0x0001 + return -1 + else: + self.errorCode = 0 + return 0 + + def close(self): + if self.serial.is_open: + self.serial.close() + + def run(self): + """主运行循环""" + try: + while True: + # self.logger.info(f"optFlag={self.optFlag}") + match self.optFlag: + case 0: + if self.open() == 0: + ret0 = self.exeCmd('openBreaker') + # self.logger.info(f"setOverLimitValues ret: {ret0}") + ret1 = self.exeCmd('setOverLimitValues') + # self.logger.info(f"setOverLimitValues ret: {ret1}") + ret2 = self.exeCmd('readOverLimitValues') + self.logger.info(f"readOverLimitValues ret: {ret2}") + ret3 = self.exeCmd('setOverLimitActionTime') + # self.logger.info(f"setOverLimitValues ret: {ret3}") + ret4 = self.exeCmd('readOverLimitActionTime') + self.logger.info(f"readOverLimitActionTime ret: {ret4}") + if ret0[0] and ret1[0] and ret3[0]: + self.optFlag = 1 + continue + self.optFlag = -1 + self.errorCode = 0x0101 + case 1: + time.sleep(0.2) + ret = self.exeCmd('readAllDatas') + self.logger.info(f"readAllDatas ret: {wordData2HexStr(ret[1])}") + if ret[0]: + self.parseData('readAllDatas', ret[1]) + continue + self.optFlag = -1 + self.errorCode = 0x0101 + case 2: + ret = self.exeCmd('openBreaker') + if ret[0]: + self.optFlag = 1 + continue + self.optFlag = -1 + self.errorCode = 0x0101 + case 3: + ret = self.exeCmd('closeBreaker') + if ret[0]: + self.optFlag = 1 + continue + self.optFlag = -1 + self.errorCode = 0x0101 + case _: + time.sleep(1) + self.close() + self.optFlag = 0 + except Exception as e: + self.close() + self.logger.info(f"Error in Breader: run(), {e}") + +class GPS: + def __init__(self, config:dict, logger): + self.status = -1 + self.logger = logger + self.config = config + self.port = config.get('port', '/dev/ttyLP4') + if self.port != '/dev/ttyLP4': + self.status = -201 + self.baudrate = config.get('baudrate', 9600) + self.timeout = config.get('timeout', 1) + self.optFlag = 0 + self.gps_data = {'latitude': 0.0, 'longitude': 0.0, 'altitude': 0.0, 'speed': 0.0} + + def read_data(self): + """从串口读取GPS数据""" + if not self.serial or not self.serial.is_open: + return -1 + try: + # 读取NMEA数据 (简化示例,实际需要解析NMEA语句) + line = self.serial.readline().decode('ascii', errors='ignore').strip() + if line.startswith('$GNGGA') or line.startswith('$GPGGA') or line.startswith('$BDGGA'): + # 示例解析GPGGA语句 (实际应用中需要更健壮的解析) + parts = line.split(',') + if len(parts) > 9: + try: + # 纬度格式转换: ddmm.mmmm -> 十进制 + lat = (float(parts[2][:2]) if parts[2] else 0.0) + (float(parts[2][2:]) if parts[2] else 0.0)/60.0 + if parts[3] == 'S': + lat = -lat + + # 经度格式转换: dddmm.mmmm -> 十进制 + lon = (float(parts[4][:3]) if parts[4] else 0.0) + (float(parts[4][3:]) if parts[4] else 0.0)/60.0 + if parts[5] == 'W': + lon = -lon + + # 海拔高度 + alt = float(parts[9]) if parts[9] else 0.0 + + self.gps_data = { + 'latitude': lat, + 'longitude': lon, + 'altitude': alt, + 'speed': 0.0 # GPGGA不包含速度,需要从GPRMC获取 + } + return 0 + except (ValueError, IndexError) as e: + raise Exception(f"Error in parse GPS data: {e}") + except Exception as e: + self.logger.error(f"Error in read_gps_data(): {e}") + + def open(self): + """打开串口连接""" + self.serial = serial.Serial(self.port, self.baudrate, timeout=self.timeout) + if not self.serial.is_open: + self.status = -101 + return -1 + else: + self.status = 0 + return 0 + + def close(self): + self.serial.close() + self.status = -100 + + def run(self): + """主运行循环""" + try: + while True: + match self.optFlag: + case 0: + if self.open() == 0: + self.optFlag = 1 + else: + self.optFlag = -1 + case 1: + ret = self.read_data() + if ret != 0: + self.optFlag = -1 + continue + self.logger.info(str(self.gps_data)) + case _: + time.sleep(5) + self.close() + self.open() + self.optFlag = 0 + except KeyboardInterrupt: + self.close() + self.logger.info("Modbus Serial TCP Client stopped.") + +class HSDAQ: + def __init__(self, config:dict, logger): + try: + self.config = config + self.logger = logger + result = subprocess.run(["ip","neigh","add", "192.168.0.2", "lladdr","00:0A:35:01:FE:C0", "dev", "ethernet0"], capture_output=True, text=True, encoding="utf-8") + if result.returncode != 0: + self.logger.info(result.stderr) + # result = subprocess.run(["sudo","ethtool","-s", "ethernet0", "speed", "100", "duplex", "full", "autoneg", "off"], capture_output=True, text=True, encoding="utf-8") + # if result.returncode != 0: + # self.logger.info(result.stderr) + + # 设置允许强制修改缓存区大小 + self.sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(0x0003)) + SO_RCVBUFORCE = 33 + self.sock.setsockopt(socket.SOL_SOCKET, SO_RCVBUFORCE, 1024 * 1024 * 25) + # 设置 SO_NO_CHECK 选项,使用整数值 11 + SO_NO_CHECK = 11 + self.sock.setsockopt(socket.SOL_SOCKET, SO_NO_CHECK, 0) + actual_buf_size = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF) + self.logger.info(f"Requested UDP buffer: 50MB, Actual UDP buffer: {actual_buf_size/1024/1024:.2f}MB") + self.sock.bind(('ethernet0', 0)) + + self.dataFileDir = self.config['output_dir'] + self.file_type = self.config.get('file_type', 1) + if self.file_type not in [0, 1]: + self.file_type = 1 + + self.save_flag = self.config.get('save_flag', 0xffff) + self.channels = self.config.get('channels', 16) + + if not os.path.exists(self.dataFileDir): + os.makedirs(self.dataFileDir) + for i in range(self.channels): + os.makedirs(os.path.join(self.config['output_dir'], f"{i+1:02}"), exist_ok=True) + + + self.buffer = b'' + self.feature_data = {} + self.frequency = [0]*16 + self.reg_values = [] + + self.daqBoardNo = self.config.get('daq_board_no', 'XXXXXXXXXX') + self.sensor_type = self.config.get('sensor_type', 0xffffffff) + + self.feature_type = self.config.get('feature_type', '加速度rms') + self.min_vol_cur_phy_value = self.config.get('min_vol_cur_phy_value', 0.0) + self.max_vol_cur_phy_value = self.config.get('max_vol_cur_phy_value', 160.0) + self.scale = self.config.get('vol_cur_phy_scale', 1) + + self.sample_time = self.config.get('sample_time', 1000) + self.sample_period = self.config.get('sample_period', 4000) + self.one_sample_time = self.config.get('one_sample_time', 10) + self.sample_rate = int(1000000/self.one_sample_time) + + self.sample_points = int(self.sample_time*1000/self.one_sample_time) + self.mode = self.config.get('mode', 0) + self.alias = config.get('alias', {}) + for i in range(16): + if f'CH{i+1}' not in self.alias: + self.alias[f'CH{i+1}'] = '' + + self.cmdList = {'startDAQ': bytes.fromhex(f"DDDD 0001 {self.sample_time*1000:08X} {(self.sample_period-self.sample_time)*1000:08X} {self.one_sample_time:08X} {self.sensor_type:08X}"), + 'stopDAQ': bytes.fromhex(f"DDDD 0000 {self.sample_time*1000:08X} {(self.sample_period-self.sample_time)*1000:08X} {self.one_sample_time:08X} {self.sensor_type:08X}")} + + _sensor_Vol_CalibParam = { + 'CH1': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH2': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH3': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH4': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH5': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH6': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH7': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH8': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH9': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH10': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH11': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH12': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH13': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH14': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH15': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH16': {'K2':0.0, 'K': 1.0, 'B': 0.0} + } + _sensor_Cur_CalibParam = { + 'CH1': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH2': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH3': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH4': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH5': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH6': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH7': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH8': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH9': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH10': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH11': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH12': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH13': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH14': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH15': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH16': {'K2':0.0, 'K': 1.0, 'B': 0.0} + } + _sensor_Vib_CalibParam = { + 'CH1': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH2': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH3': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH4': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH5': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH6': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH7': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH8': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH9': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH10': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH11': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH12': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH13': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH14': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH15': {'K2':0.0, 'K': 1.0, 'B': 0.0}, + 'CH16': {'K2':0.0, 'K': 1.0, 'B': 0.0} + } + self.sensor_Vol_CalibParam = self.config.get('sensor_Vol_CalibParam', _sensor_Vol_CalibParam) + self.sensor_Cur_CalibParam = self.config.get('sensor_Cur_CalibParam', _sensor_Cur_CalibParam) + self.sensor_Vib_CalibParam = self.config.get('sensor_Vib_CalibParam', _sensor_Vib_CalibParam) + self.warning_values = { + 'CH1': 0, + 'CH2': 0, + 'CH3': 0, + 'CH4': 0, + 'CH5': 0, + 'CH6': 0, + 'CH7': 0, + 'CH8': 0, + 'CH9': 0, + 'CH10': 0, + 'CH11': 0, + 'CH12': 0, + 'CH13': 0, + 'CH14': 0, + 'CH15': 0, + 'CH16': 0 + } + self.logger.info(f"DAQ thread starts. Address of DAQ board: IP={self.config['host']}, port={self.config['port']}") + except Exception as e: + self.logger.error(f"Error in __init__(): {e}") + time.sleep(5) + self.__init__() + + def update_config(self): + self.file_type = self.config.get('file_type') + if self.file_type not in [0, 1]: + self.file_type = 1 + self.save_flag = self.config.get('save_flag') + self.sensor_type = self.config.get('sensor_type') + self.sample_time = self.config.get('sample_time') + self.sample_period = self.config.get('sample_period') + self.one_sample_time = self.config.get('one_sample_time') + self.sample_points = int(self.sample_time*1000/self.one_sample_time) + self.mode = self.config.get('mode') + if self.mode not in [0, 1]: + self.mode = 1 + + self.cmdList = {'startDAQ': bytes.fromhex(f"DDDD 0001 {self.sample_time*1000:08X} {(self.sample_period-self.sample_time)*1000:08X} {self.one_sample_time:08X} {self.sensor_type:08X}"), + 'stopDAQ': bytes.fromhex(f"DDDD 0000 {self.sample_time*1000:08X} {(self.sample_period-self.sample_time)*1000:08X} {self.one_sample_time:08X} {self.sensor_type:08X}")} + self.sensor_Vol_CalibParam = self.config.get('sensor_Vol_CalibParam') + self.sensor_Cur_CalibParam = self.config.get('sensor_Cur_CalibParam') + self.sensor_Vib_CalibParam = self.config.get('sensor_Vib_CalibParam') + + def start_DAQ(self): + """发送启动采集指令""" + try: + self.buffer = bytearray() + if hasattr(self, 'udpsock') and self.udpsock: + self.udpsock.close() + self.udpsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + # self.udpsock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024 * 25) + self.udpsock.bind((self.config['local_host'], self.config['local_port'])) + self.udpsock.sendto(self.cmdList['startDAQ'], (self.config['host'], self.config['port'])) + self.udpsock.close() + self.logger.info(f"Send start command to DAQ board. {self.cmdList['startDAQ'].hex()}") + except Exception as e: + self.logger.error(f"Error in start_DAQ(): {e}") + + def stop_DAQ(self): + """发送停止采集指令""" + try: + self.buffer = bytearray() + if hasattr(self, 'udpsock') and self.udpsock: + self.udpsock.close() + self.udpsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + # self.udpsock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024 * 25) + self.udpsock.bind((self.config['local_host'], self.config['local_port'])) + self.udpsock.sendto(self.cmdList['stopDAQ'], (self.config['host'], self.config['port'])) + self.udpsock.close() + self.logger.info(f"Send stop command to DAQ board. {self.cmdList['stopDAQ'].hex()}") + except Exception as e: + self.logger.error(f"Error in stop_DAQ(): {e}") + + def _get_dir_size(self, path: Path) -> int: + """利用 Linux 的 du,返回目录本身已占用字节数,毫秒级""" + return int(subprocess.check_output( + ['du', '-sb', str(path)], text=True).split()[0]) + + def _oldest_file(self, path: Path): + """返回目录中最旧的普通文件 Path 对象,没有则返回 None""" + with os.scandir(path) as it: + files = [e for e in it if e.is_file()] + if not files: + return None + # 按修改时间升序 + return Path(min(files, key=lambda e: e.stat().st_mtime).path) + + def save_data(self): + """保存数据到文件""" + try: + #判断磁盘剩余空间是否小于1G,如果是从16通道的旧文件目录中删除文件 + channels = self.channels + # usage = shutil.disk_usage("C:/") + # while usage.free < self.config['daq']['min_free_gb']*1024*1024*1024: + # #获取目录下文件列表,并按照降序排序,如果硬盘空间小于阈值,删除旧的文件 + # for i in range(channels): + # os.makedirs(os.path.join(self.config['daq']['output_dir'], f"{i+1:02}"), exist_ok=True) + # fileList = os.listdir(f"C:/users/Administrator/PCM/data/{i+1:02}") + # fileList.sort(reverse=False) + # if os.path.exists(f"C:/users/Administrator/PCM/data/{i+1:02}/{fileList[0]}"): + # os.remove(f"C:/users/Administrator/PCM/data/{i+1:02}/{fileList[0]}") + # # os.remove(f"C:/users/Administrator/PCM/data/{fileList[1]}") + # usage = shutil.disk_usage("C:/") + + target_dir = Path(self.dataFileDir) + max_usage_gb = 5 + max_usage_bytes = max_usage_gb * 1024**3 + channels = self.channels + while True: + if self._get_dir_size(target_dir) <= max_usage_bytes: + break + for ch in range(channels): + ch_path = target_dir / f'{ch+1:02d}' + ch_path.mkdir(parents=True, exist_ok=True) + victim = self._oldest_file(ch_path) + if victim: + victim.unlink() + self.reg_values = [] + timestamp = time.time() + timeStr = datetime.fromtimestamp(timestamp).strftime("%Y%m%d%H%M%S") + # with open(filename, 'wb') as f: + # f.write(self.buffer) + # f.close() + print(f"Length of buffer: {len(self.buffer)}") + datas = np.frombuffer(self.buffer, dtype='>h') + print(f"Length of datas: {len(datas)}") + datas = datas[:int(len(datas)/channels)*channels] + datas = datas.reshape(-1, channels) + data = None + _s = f"{self.sensor_type:032b}" + _sensor_type = ''.join([_s[2*i:2*i+2] for i in range(channels-1, -1, -1)]) + print(f"save_flag = {self.save_flag}") + _save_flag = f"{self.save_flag:016b}"[::-1] + if 'calib_params' in self.config and 'vibration' in self.config['calib_params']: + _fre = self.config['calib_params']['vibration'].get('frequency', -1) + else: + _fre = -1 + + for i in range(channels): + j = i + 1 + if _fre != -1 and _fre > 0 and self.config['mode'] == 1: + _len = len(datas[:,i])//_fre*_fre + _data = datas[0:_len,i] + else: + _data = datas[:,i] + + _data = _data[0:20] + log_data = np.log(np.abs(_data) + 1e-300) + log_mean_squared = 2 * np.mean(log_data) + np.log(len(_data)) + _rms = np.exp(0.5 * log_mean_squared) / self.scale + + # _rms = np.sqrt(np.mean(_data**2))/self.scale + _min = np.min(_data) + _max = np.max(_data) + _mean = np.mean(_data) + + # if (_max - _mean) * 5 < (_mean - _min): + # _mean = np.mean(_data[0:20]) + # _min = np.min(_data[0:20]) + # _max = np.max(_data[0:20]) + # _rms = np.sqrt(np.mean(np.square(_data[0:20]))) + + self.feature_data[f'CH{j}'] = { + 'min': _min/self.scale, + 'max': _max/self.scale, + 'mean': _mean/self.scale, + 'std': np.std(_data)/self.scale, + 'rms': _rms, + 'sr0': 0.0, + 'sr1': 0.0, + 'sr2': 0.0, + 'sr3': 0.0, + 'sr4': 0.0 + } + rms = self.feature_data[f'CH{j}']['rms'] + mean = self.feature_data[f'CH{j}']['mean'] + + filename = '' + match _sensor_type[2*i:2*i+2]: + case '00': + # 计算频率,以Hz为单位 + self.feature_data[f'CH{j}']['sr0'] = self.calculateFrequency(datas[:, i], self.one_sample_time) + self.reg_values.extend(list(self.feature_data[f'CH{j}'].values())) + data = datas[:, i] + filename = os.path.join(self.dataFileDir, f"{j:02}/{timeStr}_{rms:.3f}_{self.feature_type}_{self.daqBoardNo}_{j:02}_N_{self.sample_rate}_{self.sample_points}_0.000_{mean:.3f}_{self.min_vol_cur_phy_value}_{self.max_vol_cur_phy_value}_{self.scale}") + case '01': + # 计算声音大小 + self.reg_values.extend(list(self.feature_data[f'CH{j}'].values())) + data = datas[:, i] + filename = os.path.join(self.dataFileDir, f"{j:02}/{timeStr}_{rms:.3f}_{self.feature_type}_{self.daqBoardNo}_{j:02}_N_{self.sample_rate}_{self.sample_points}_0.000_{mean:.3f}_{self.min_vol_cur_phy_value}_{self.max_vol_cur_phy_value}_{self.scale}") + case '10': + # 计算电流大小 + data = datas[:, i] + if self.mode != 1: + for k, v in self.feature_data[f'CH{j}'].items(): + self.feature_data[f'CH{j}'][k] = v**2*self.sensor_Cur_CalibParam[f'CH{j}']['K2'] + v*self.sensor_Cur_CalibParam[f'CH{j}']['K'] + self.sensor_Cur_CalibParam[f'CH{j}']['B'] + data = (datas[:, i]**2*self.sensor_Cur_CalibParam[f'CH{j}']['K2'] + datas[:, i]*self.sensor_Cur_CalibParam[f'CH{j}']['K']+self.sensor_Cur_CalibParam[f'CH{j}']['B']).astype(np.float32) + self.reg_values.extend(list(self.feature_data[f'CH{j}'].values())) + filename = os.path.join(self.dataFileDir, f"{j:02}/{timeStr}_{rms:.3f}_{self.feature_type}_{self.daqBoardNo}_{j:02}_N_{self.sample_rate}_{self.sample_points}_0.000_{mean:.3f}_{self.min_vol_cur_phy_value}_{self.max_vol_cur_phy_value}_{self.scale}") + case '11': + # 计算振动大小 + data = datas[:, i] + if self.mode != 1: + for k, v in self.feature_data[f'CH{j}'].items(): + self.feature_data[f'CH{j}'][k] = v**2*self.sensor_Vib_CalibParam[f'CH{j}']['K2'] + v*self.sensor_Vib_CalibParam[f'CH{j}']['K'] + self.sensor_Vib_CalibParam[f'CH{j}']['B'] + data = (datas[:, i]**2*self.sensor_Vib_CalibParam[f'CH{j}']['K2'] + datas[:, i]*self.sensor_Vib_CalibParam[f'CH{j}']['K']+self.sensor_Vib_CalibParam[f'CH{j}']['B']).astype(np.float32) + self.feature_data[f'CH{j}']['rms'] = np.sqrt(np.mean(data**2)) + self.feature_data[f'CH{j}']['sr0'] = self.feature_data[f'CH{j}']['rms']*np.sqrt(2) + else: + self.feature_data[f'CH{j}']['sr0'] = self.feature_data[f'CH{j}']['std']*np.sqrt(2) + self.reg_values.extend(list(self.feature_data[f'CH{j}'].values())) + filename = os.path.join(self.dataFileDir, f"{j:02}/{timeStr}_{rms:.3f}_{self.feature_type}_{self.daqBoardNo}_{j:02}_N_{self.sample_rate}_{self.sample_points}_0.000_{mean:.3f}_{self.min_vol_cur_phy_value}_{self.max_vol_cur_phy_value}_{self.scale}") + case _: + pass + + # 将数据写入文件 + if self.file_type == 1: + filename += '.bin' + if _save_flag[i] == '1': + if isinstance(data, np.ndarray): + try: + temp_data = data.astype('>f4') + bytes_data = temp_data.tobytes() + with open(filename, 'wb', buffering=0) as f: + # 1. 正常写 + f.write(bytes_data) + + # 2. 告诉内核:整个文件以后大概率不读,页 cache 可以立即回收 + fd = os.open(filename, os.O_RDONLY) + try: + # POSIX_FADV_DONTNEED = 4 + os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_DONTNEED) + finally: + os.close(fd) + del temp_data, bytes_data, data + finally: + if 'temp_data' in locals(): + del temp_data + if 'bytes_data' in locals(): + del bytes_data + self.logger.debug(f"Success to save data to {filename}.") + else: + filename += '.csv' + if _save_flag[i] == '1': + if isinstance(data, np.ndarray): + with open(filename, 'w') as f: + # 使用生成器表达式避免创建巨大列表 + lines = (f"{num:.4f}\n" for num in data) + f.writelines(lines) + self.logger.debug(f"Saved data to {filename}.") + del lines + self.warning_check() + data = None + datas = None + self.buffer = bytearray() + self._force_memory_cleanup() + except Exception as e: + self.logger.error(f"Error in save_data(): {e}") + + def warning_check(self): + """检查是否有报警条件""" + for i in range(self.channels): + ch = f'CH{i+1}' + val = self.feature_data[ch]['mean'] + wp = self.config.get('warning_param', {}) + enable_bits = f"{wp.get('enable', 0):016b}"[::-1] + if enable_bits[i] == '1': + low_limit = wp.get(ch, {}).get('lower', float('-inf')) + high_limit = wp.get(ch, {}).get('upper', float('inf')) + if val < low_limit or val > high_limit: + # self.logger.warning(f"Warning: {ch} value {val} out of limits ({low_limit}, {high_limit})") + self.warning_values[ch] = 1 + else: + self.warning_values[ch] = 0 + + def _force_memory_cleanup(self): + """强制内存清理""" + import gc + # 清除各种缓存 + if hasattr(np, 'getbufsize'): + np.setbufsize(32768) + + # 强制垃圾回收 + gc.collect() + gc.collect() # 两次确保回收 + + # 稍微等待让系统处理 + import time + time.sleep(0.01) + + def calculateFrequency(self, signal, oneSampleTime): + '''计算0-1变换的数组中0-1变化的次数, 并计算其频率''' + # oneSampleTime 单位为us + if len(signal) < 2: + return 0.0 # 信号太短无法计算频率 + transitions = 0 # 跳变次数计数器 + # 遍历数组计算跳变次数 + for i in range(1, len(signal)): + if signal[i] != signal[i-1]: + transitions += 1 + # 计算频率: + # 每个周期有2次跳变(0→1和1→0) + # 总时间 = 采样点数 / 采样率 + # 频率 = (跳变次数 / 2) / (总时间) + total_time = len(signal) * oneSampleTime / 1000000 + frequency = (transitions // 2) / total_time if total_time > 0 else 0.0 + return frequency + + def run(self): + """主运行循环""" + self.logger.info(f"Start DAQ thread.") + frame_size = self.config.get('frame_size_max', 1464) + FILESIZE = self.config.get('file_size', 32000000) + DATA_DIR = self.config.get('output_dir', 'data') + optFlag = 0 + # 清空接收缓存,并向DAQ模块发送启动采集指令 + self.buffer = b'' + self.sampleNum = 0 + self.stop_DAQ() + time.sleep(0.01) + self.start_DAQ() + lastFrameNo = 0 + cycles = 0 + while(True): + try: + data, addr = self.sock.recvfrom(frame_size+42) + # 如何返回了数据,数据起始符正确,包号正确,则存储数据 + if data: + data = data[42:] + nowFrameNo = int.from_bytes(data[4:8], 'big') + if nowFrameNo != lastFrameNo + 1: + print(f"Received data: len={len(data)}, lastFrameNo={lastFrameNo}, nowFrameNo={nowFrameNo}") + # self.logger.info(f"Received data: len={len(data)}, frame NO.={int.from_bytes(data[4:8], 'big')}") + else: + continue + # self.logger.debug(f"Head Data:(20 byte) ={' '.join(data[i:i+2].hex() for i in range(0, 20, 2))}") + + if optFlag == 0: + if data and data[0:4] == bytearray([0xa5, 0x5a, 0xa5, 0x5a]) and len(data) == frame_size and data[4:8] == bytearray([0x00, 0x00, 0x00, 0x01]): + self.buffer += data[24:] + self.sampleNum += ((len(data)-24)/32) + # if self.sensorType != int.from_bytes(data[8:12], 'big'): + # self.logger.error(f"In daq_thread(): SensorType in return data doesn't match with config.") + optFlag = 1 + elif optFlag == 1: + if data and data[0:4] == bytearray([0xa5, 0x5a, 0xa5, 0x5a]): + if len(data) != frame_size or data[5:8] == bytearray([0x00, 0x00, 0x01]): + if data[4:8] != bytearray([0x00, 0x00, 0x00, 0x01]): + self.buffer += data[24:] + self.sampleNum += ((len(data)-24)/32) + + cycles += 1 + self.save_data() + self.buffer = bytearray() + self.sampleNum = 0 + optFlag = 0 + else: + self.buffer += data[24:] + self.sampleNum += ((len(data)-24)/32) + + if nowFrameNo - lastFrameNo != 1 and lastFrameNo < nowFrameNo: + if lastFrameNo != 0: + pass + self.logger.warning(f"cycles= {cycles}, lastFrameNo={lastFrameNo}, nowFrameNo={nowFrameNo}") + # raise Exception() + if nowFrameNo != 0: + lastFrameNo = nowFrameNo + except Exception as e: + self.stop_DAQ() + self.sock.close() + self.running = False + self.buffer = bytearray() + self.logger.error(f"Error in daq_thread(): {e}") + self.logger.info(f"Stop DAQ thread.") + break + +class InfluxDBWriter: + def __init__(self, url="http://localhost:8086", token="", org="my-org", bucket="my-bucket"): + """ + 初始化 InfluxDB 客户端 + + 参数: + url: InfluxDB 地址,host模式下使用 http://localhost:8086 + token: API token,格式为 "username:password" 或 token字符串 + org: 组织名称 + bucket: 存储桶名称 + """ + self.client = InfluxDBClient(url=url, token=token, org=org) + self.write_api = self.client.write_api(write_options=SYNCHRONOUS) + self.bucket = bucket + self.org = org + + def write_sensor_data(self, measurement, tags, fields): + """ + 写入传感器数据到 InfluxDB + + 参数: + measurement: 测量名称 (类似表名) + tags: 标签字典,用于索引和分组 (如: {"device": "sensor1", "location": "factory"}) + fields: 字段字典,存储实际数据 (如: {"temperature": 25.6, "humidity": 60.2}) + """ + try: + # 创建数据点 + point = Point(measurement) + + # 添加标签 + for tag_key, tag_value in tags.items(): + point = point.tag(tag_key, tag_value) + + # 添加字段 + for field_key, field_value in fields.items(): + point = point.field(field_key, field_value) + + # 写入数据 + self.write_api.write(bucket=self.bucket, record=point) + print(f"[{datetime.now()}]数据写入成功: {point.to_line_protocol()}") + + except Exception as e: + print(f"写入数据时出错: {e}") + + def write_batch_data(self, points): + """ + 批量写入多个数据点 + """ + try: + self.write_api.write(bucket=self.bucket, record=points) + print(f"[{datetime.now()}]批量写入成功,共 {len(points)} 个数据点") + except Exception as e: + print(f"批量写入时出错: {e}") + + def close(self): + """关闭连接""" + self.client.close() + +class ModbusGateway: + def __init__(self): + # 初始化logger + config_file = 'src/config-1.2-debug.yaml' + config_file_temp = 'config-1.2-debugcopy.yaml' + with open('src/logging-config.json', 'r') as f: + logging.config.dictConfig(json.load(f)) + self.logger = logging.getLogger('PCM') + + self.config_manager = ConfigManager( + regs_config_file='src/regs-mapping-1.2-debug.yaml', + config_file=config_file, + logger=self.logger) + + self.taskInfo = {'status':0x0000, 'running_time':0, 'period':0} + self.taskInfo['period'] = self.config_manager.config['task']['period']*60 + + # 初始化influxdb client + self.influx_client = InfluxDBWriter( + url=self.config_manager.config['influxdb'].get('url', 'http://localhost:8086'), + token=self.config_manager.config['influxdb'].get('token', 'PCM:1842moon'), + org=self.config_manager.config['influxdb'].get('org', 'MEASCON'), + bucket=self.config_manager.config['influxdb'].get('bucket', 'PCM') + ) + + # 连接plc server + self.plc_host = self.config_manager.config['plc-server'].get('host', '172.22.0.3') + self.plc_port = self.config_manager.config['plc-server'].get('port', 5020) + self.plc_client = ModbusTcpClient(self.plc_host, port=self.plc_port) + self.plc_measurements = self.config_manager.config['plc-server'].get('measurements', {}) + + # 创建本地modbus tcp服务器 + self.max_address = 1300 + self.host = self.config_manager.config['modbus-server']['host'] + self.port = self.config_manager.config['modbus-server']['port'] + + self.holding_registers = ModbusSequentialDataBlockForPCM(self.config_manager, self.logger, 0x00, [0]*(self.max_address+1)) + + # 创建数据存储 + store = ModbusSlaveContext( + di=ModbusSequentialDataBlock(0, [0]*1), + co=ModbusSequentialDataBlock(0, [0]*1), + # hr=ModbusSequentialDataBlock(0, [0]*1000), + hr=self.holding_registers, + ir=ModbusSequentialDataBlock(0, [0]*1)) + + self.context = ModbusServerContext(slaves={1:store}, single=False) + + # 启动服务器线程 + self.modbus_sever = threading.Thread( + target=StartTcpServer, + # kwargs={"context": self.context, "address": (self.host, self.port)}) + kwargs={"context": self.context, "address": ('0.0.0.0', self.port)}) + self.modbus_sever.daemon = True + self.modbus_sever.start() + self.logger.info(f"Local modbusTCP service starts, IP={self.host}, port={self.port}") + + self.gps = GPS(self.config_manager.config['gps'], self.logger) + self.breaker = Breaker(self.config_manager.config['breaker'], self.logger) + self.lsdaq = LSDAQ(self.config_manager.config['lsdaq'], self.logger) + self.hsdaq = HSDAQ(self.config_manager.config['hsdaq'], self.logger) + + self.gps_thread = threading.Thread(target=self.gps.run) + self.gps_thread.daemon = True + self.gps_thread.start() + + self.breaker_thread = threading.Thread(target=self.breaker.run) + self.breaker_thread.daemon = True + self.breaker_thread.start() + + self.lsdaq_thread = threading.Thread(target=self.lsdaq.run) + self.lsdaq_thread.daemon = True + self.lsdaq_thread.start() + + self.hsdaq_thread = threading.Thread(target=self.hsdaq.run) + self.hsdaq_thread.daemon = True + self.hsdaq_thread.start() + + # 启动配置服务(HTTP API) + config_service_port = self.config_manager.config.get('config-server', {}).get('port', 5000) + config_service_host = self.config_manager.config.get('config-server', {}).get('host', '127.0.0.1') + + # 如果配置文件中指定了配置文件路径,使用它;否则使用默认的YAML配置文件 + config_service_config_path = self.config_manager.config.get('config-server', {}).get('config_path', config_file_temp) + + self.config_service = ConfigService( + default_config_path=config_service_config_path, + host=config_service_host, + port=config_service_port, + debug=False, + logger=self.logger, + ) + self.config_service.start() + self.logger.info(f"Config service started on {config_service_host}:{config_service_port}") + + # 任务状态持久化文件路径(独立文件,不会被外部覆盖) + self.task_state_file = '.task_state.json' + + def _load_task_running_time(self) -> float: + """ + 从独立的持久化文件加载任务累计运行时间 + + Returns: + float: 累计运行时间(秒),文件不存在时返回0 + """ + try: + if not os.path.exists(self.task_state_file): + self.logger.info(f"Task state file not found: {self.task_state_file}, starting from 0") + return 0 + + with open(self.task_state_file, 'r', encoding='utf-8') as f: + data = json.load(f) + + running_time = data.get('running_time', 0) + if running_time > 0: + self.logger.info(f"Loaded task running_time from {self.task_state_file}: {running_time:.2f}s") + return float(running_time) + + except json.JSONDecodeError as e: + self.logger.error(f"Task state file corrupted: {e}, starting from 0") + return 0 + except Exception as e: + self.logger.error(f"Error loading task running_time: {e}, starting from 0") + return 0 + + def _save_task_running_time(self, running_time: float) -> bool: + """ + 保存任务累计运行时间到独立的持久化文件(原子写入) + + Args: + running_time: 累计运行时间(秒) + + Returns: + bool: 保存成功返回 True,失败返回 False + """ + try: + data = { + 'running_time': running_time, + 'last_update': datetime.now().isoformat(), + 'version': '1.0' + } + + # 原子写入:先写临时文件,再重命名 + temp_file = self.task_state_file + '.tmp' + with open(temp_file, 'w', encoding='utf-8') as f: + json.dump(data, f, indent=2, ensure_ascii=False) + + # 重命名(原子操作) + os.replace(temp_file, self.task_state_file) + return True + + except Exception as e: + self.logger.error(f"Error saving task running_time: {e}") + # 清理临时文件 + try: + temp_file = self.task_state_file + '.tmp' + if os.path.exists(temp_file): + os.remove(temp_file) + except: + pass + return False + + def _reset_task_running_time(self) -> bool: + """ + 重置任务累计运行时间为0 + + Returns: + bool: 重置成功返回 True,失败返回 False + """ + self.logger.info("Resetting task running_time to 0") + return self._save_task_running_time(0) + + def _update_modbus_datas(self): + """更新本地Modbus服务器的寄存器""" + if not hasattr(self, 'context'): + self.logger.error("Local modbus tcp service isn't initilized.") + return + + try: + # 获取本地服务器的slave上下文 + slave = self.context[1] + holding_registers = self.holding_registers + + gps_reg_values = list(self.gps.gps_data.values()) + # print(f"gps_reg_values:{gps_reg_values}") + for i in range(len(gps_reg_values)): + holding_registers.server_set_values(2*i+1, float_to_registers(gps_reg_values[i])) # type: ignore + + breaker_reg_values = list(self.breaker.reg_values.values()) + # print(f"breaker_reg_values:{breaker_reg_values}") + for i in range(len(breaker_reg_values)): + holding_registers.server_set_values(1220+i+1, int(breaker_reg_values[i])) # type: ignore + + lsdaq_reg_values = list(self.lsdaq.reg_values.values()) + # print(f"lsdaq_reg_values:{lsdaq_reg_values}") + for i in range(len(lsdaq_reg_values)): + holding_registers.server_set_values(8+2*i+1, float_to_registers(lsdaq_reg_values[i])) # type: ignore + + lsdaq_warning_values = list(self.lsdaq.warning_values.values()) + # print(f"lsdaq_warning_values:{lsdaq_warning_values}") + for i in range(len(lsdaq_warning_values)): + holding_registers.server_set_values(1129+i+1, int(lsdaq_warning_values[i])) # type: ignore + + hsdaq_reg_values = self.hsdaq.reg_values + # print(f"hsdaq_reg_values:{hsdaq_reg_values}") + # print(f"len = {len(hsdaq_reg_values)}") + for i in range(len(hsdaq_reg_values)): + holding_registers.server_set_values(60+2*i+1, float_to_registers(hsdaq_reg_values[i])) # type: ignore + + hsdaq_warning_values = list(self.hsdaq.warning_values.values()) + # print(f"hsdaq_warning_values:{hsdaq_warning_values}") + for i in range(len(hsdaq_warning_values)): + holding_registers.server_set_values(1145+i+1, int(hsdaq_warning_values[i])) # type: ignore + + # 更新从plc采集到的数据 + # packed_data = bytearray() + # for k, v in self.plc_measurements.items(): + # packed_data.extend(struct.pack('>fHffH', v['value'], v['warning_param']['enable'], v['warning_param']['lower'], v['warning_param']['upper'], v['warning'])) + # register_values = [] + # for i in range(0, len(packed_data), 2): + # if i + 1 < len(packed_data): + # # 组合两个字节为一个16位整数 + # value = (packed_data[i] << 8) | packed_data[i + 1] + # else: + # # 如果字节数为奇数,最后一个字节补0 + # value = packed_data[i] << 8 + # register_values.append(value) + # holding_registers.server_set_values(1161+1, register_values) + + # 读取CPU各温区温度 + thermal_zones = 5 + self.cpu_temperatures = {} + for i in range(thermal_zones): + with open(f"/sys/class/thermal/thermal_zone{i}/temp", 'r') as f: + # 读取温度值(毫摄氏度) + temp_millic = int(f.read().strip()) + # 转换为摄氏度 + temp_c = temp_millic / 1000.0 + self.cpu_temperatures[f'zone{i}'] = temp_c + + holding_registers.server_set_values(50+2*i+1, float_to_registers(temp_c)) # type: ignore + + except Exception as e: + self.logger.error(f"Error in _update_modbus_datas(): {e}") + + def _write_to_influxdb(self): + if not self.influx_client: + self.logger.error("Influxdb isn't initilized. Try to initilize after 1 seconds.") + time.sleep(1) + if hasattr(self, 'influx_client'): + self.influx_client.close() + self.influx_client = InfluxDBClient( + url=self.config_manager.config['influxdb'].get('url', 'http://localhost:8086'), + token=self.config_manager.config['influxdb'].get('token', 'PCM:1842moon'), + org=self.config_manager.config['influxdb'].get('org', 'MEASCON'), + ) + self.write_api = self.influx_client.write_api(write_options=SYNCHRONOUS) + try: + points = [] + # 构建toradex核心板CPU温度 + point = Point("PCM_Measurement") + point.tag("data_type", 'cpu_temperatures') + for field_name, field_value in self.cpu_temperatures.items(): + point.field(field_name, float(field_value)) + points.append(point) + + # 构建GPS采集数据 + point = Point("PCM_Measurement") + point.tag("data_type", 'GPS') + for field_name, field_value in self.gps.gps_data.items(): + point.field(field_name, float(field_value)) + points.append(point) + + # 构建Breaker采集数据 + point = Point("PCM_Measurement") + point.tag("data_type", 'Breaker') + for field_name, field_value in self.breaker.reg_values.items(): + point.field(field_name, float(field_value)) + points.append(point) + + # 构建TaskInfo采集数据 + point = Point("PCM_Measurement") + point.tag("data_type", 'TaskInfo') + print(f"taskInfo = {self.taskInfo}") + for field_name, field_value in self.taskInfo.items(): + if field_value == None: + field_value = 0 + point.field(field_name, float(field_value)) + points.append(point) + + # 构建低速采集数据 + point = Point("PCM_Measurement") + point.tag("data_type", 'LSDAQ') + # for field_name, field_value in self.lsdaq.reg_values.items(): + # point.field(field_name, float(field_value)) + # for field_name, field_value in self.lsdaq.warning_values.items(): + # point.field(field_name+'.warning', field_value) + for field_name, field_value in self.lsdaq.reg_values.items(): + if field_name in self.lsdaq.alias and self.lsdaq.alias.get(field_name) != '': + point.field(self.lsdaq.alias[field_name], float(field_value)) + for field_name, field_value in self.lsdaq.warning_values.items(): + if field_name in self.lsdaq.alias and self.lsdaq.alias.get(field_name) != '': + point.field(self.lsdaq.alias[field_name]+'.warning', field_value) + points.append(point) + + # 构建高速采集数据 + for i in range(16): + # print(str(self.hsdaq.feature_data[f'CH{i}'])) + j = i + 1 + point = Point("PCM_Measurement") + # point.tag("data_type", f'HSDAQ_CH{j}') + if self.hsdaq.alias.get(f'CH{j}') != '': + point.tag("data_type", self.hsdaq.alias[f'CH{j}']) + for field_name, field_value in self.hsdaq.feature_data[f'CH{j}'].items(): + point.field(field_name, float(field_value)) + point.field('warning', self.hsdaq.warning_values[f'CH{j}']) + points.append(point) + + # 构建从PLC采集到的数据 + point = Point("PCM_Measurement") + point.tag("data_type", f'PLC') + for k, v in self.plc_measurements.items(): + point.field(k, float(v['value'])) + points.append(point) + + # 将数据点写入influxdb + self.influx_client.write_batch_data(points) + + except Exception as e: + self.logger.error(f"Error in _write_to_influxdb(): {e}") + + def _read_plc_datas(self): + if not self.plc_client: + self.plc_client = ModbusTcpClient(self.plc_host, port=self.plc_port) + if self.plc_client.connect() and self.plc_measurements: + for k, v in self.plc_measurements.items(): + ret = self.plc_client.read_holding_registers(address=v['address'], count=2, slave=1) + print(f"{ret}") + if (not ret.isError()) and len(ret.registers) == 2: + self.plc_measurements[k]['value'] = registers_to_float(ret.registers, byte_order='ABCD') + if v['warning_param']['enable'] == 1: + low_limit = v['warning_param']['lower'] + high_limit = v['warning_param']['upper'] + val = self.plc_measurements[k]['value'] + if val < low_limit or val > high_limit: + self.plc_measurements[k]['warning'] = 1 + else: + self.plc_measurements[k]['warning'] = 0 + else: + self.plc_measurements[k]['warning'] = 0 + + def run(self): + """主运行循环""" + timestamp = time.time() + task_control_reg_addr = self.config_manager.config['task']['control_reg_addr'] + task_control_reg_value = 0x0000 + self.taskInfo['start_time'] = None + + # 从配置文件恢复运行时间(如果有) + saved_running_time = self._load_task_running_time() + if saved_running_time > 0: + self.taskInfo['running_time'] = saved_running_time + self.logger.info(f"Restored running_time from config: {self.taskInfo['running_time']:.2f}s") + else: + self.taskInfo['running_time'] = 0 + + # 持久化保存相关变量 + last_save_time = time.time() + SAVE_INTERVAL = 5 # 每5秒保存一次 + last_running_time = self.taskInfo['running_time'] # 用于检测变化 + + while True: + self.gps.config = self.config_manager.config['gps'] + self.lsdaq.config = self.config_manager.config['lsdaq'] + self.lsdaq.update_config() + self.hsdaq.config = self.config_manager.config['hsdaq'] + self.hsdaq.update_config() + self.taskInfo['status'] = self.holding_registers.values[task_control_reg_addr+1] + match self.taskInfo['status']: + case 0x0000: + self.breaker.openBreaker() + # 实验停止时重置运行时间 + if self.taskInfo['running_time'] > 0: + self._reset_task_running_time() + self.taskInfo['start_time'] = None + self.taskInfo['running_time'] = 0 + case 0x5555: + self.breaker.closeBreaker() + if self.breaker.reg_values['load_status'] == 1: + current_time = time.time() + self.taskInfo['running_time'] += ( current_time - self.taskInfo['start_time'] ) + self.taskInfo['start_time'] = current_time + else: + self.taskInfo['start_time'] = time.time() + + if self.taskInfo['running_time'] > self.taskInfo['period']: + # 实验完成时重置运行时间 + self._reset_task_running_time() + self.holding_registers.values[task_control_reg_addr+1] = 0xFFFF + continue + + if 1 in self.lsdaq.warning_values.values() or 1 in self.hsdaq.warning_values.values(): + self.breaker.alarming() + self.holding_registers.values[task_control_reg_addr+1] = 0xFFFF + continue + else: + self.breaker.unalarming() + case 0xAAAA: + self.breaker.openBreaker() + self.taskInfo['start_time'] = time.time() + case 0xFFFF: + self.holding_registers.values[task_control_reg_addr+1] = 0x0000 + + # time.sleep(0.01) + nowtime = time.time() + # print(f"{timestamp}, {nowtime}") + if (nowtime-timestamp) > 1: + timestamp = nowtime + #将数据写入influxdb + self._write_to_influxdb() + # self._read_plc_datas() + self._update_modbus_datas() + + # 周期性保存运行时间(仅在运行状态且时间有变化时保存) + if self.taskInfo['status'] == 0x5555: + if (nowtime - last_save_time) >= SAVE_INTERVAL: + if self.taskInfo['running_time'] != last_running_time: + self._save_task_running_time(self.taskInfo['running_time']) + last_running_time = self.taskInfo['running_time'] + last_save_time = nowtime + + # 控制寄存器 + + +if __name__ == "__main__": + gateway = ModbusGateway() + gateway.run() \ No newline at end of file diff --git a/pcm-influxdb-debug.py b/pcm-influxdb-debug.py index cced648..434f04a 100644 --- a/pcm-influxdb-debug.py +++ b/pcm-influxdb-debug.py @@ -11,15 +11,31 @@ from influxdb_client import InfluxDBClient, Point from influxdb_client.client.write_api import SYNCHRONOUS from config_service import ConfigService -def checkValue(data: bytes) -> int: +def checkValue(data, little_endian=True): + """ + 计算Modbus CRC16校验和 + 参数: + data: 字节串或字节数组 + little_endian: 是否使用小端字节序,默认为False(大端) + 返回: + CRC16值 (2字节,小端字节序) + """ crc = 0xFFFF - length = len(data) - if length % 2 != 0: - return 0 - for i in range(0, length, 2): - val = data[i] * 256 + data[i + 1] - crc = crc ^ val - return crc + for byte in data: + crc ^= byte + for _ in range(8): + if crc & 0x0001: + crc = (crc >> 1) ^ 0xA001 + else: + crc = crc >> 1 + if little_endian: + # 小端字节序:低位在前,高位在后 + low_byte = crc & 0xFF + high_byte = (crc >> 8) & 0xFF + return (low_byte << 8) | high_byte + else: + # 大端字节序:高位在前,低位在后 + return crc & 0xFFFF def nowStr(): now = datetime.now() @@ -216,15 +232,6 @@ class ConfigManager: # self.observer.stop() # self.observer.join() -# class ConfigFileHandler(FileSystemEventHandler): -# def __init__(self, config_manager): -# self.config_manager = config_manager - -# def on_modified(self, event): -# if Path(event.src_path) == self.config_manager.config_file: -# logging.info("Config file modified, reloading...") -# self.config_manager.load_all_configs() - class BidirectionalMap: def __init__(self): self.key_to_address = {} # 配置键 -> (地址, 类型) @@ -357,26 +364,55 @@ class ModbusSequentialDataBlockForPCM(ModbusSequentialDataBlock): super().setValues(address, values) return + super().setValues(address, values) + # Handle client writes updated = False - for i, value in enumerate(values): - reg_addr = address-1 + print(f"*************************address={address}, values={values}*************************") + reg_addr = address - 1 + # print(f"values = {values}") + # path = self.config_manager.mapping.get_config_keys(reg_addr) + # print(f"*************************{path}:{reg_addr}:{values}********************") + # if self.config_manager.update_config_value(path[0], value[0]): + # updated = True + + regCount = len(values) + while(regCount > 0): path = self.config_manager.mapping.get_config_keys(reg_addr) - print(f"{path}:{reg_addr}") - if self.config_manager.update_config_value(path[0], value): - updated = True - - super().setValues(address, values) - + print(f"*************************{path}, {reg_addr}, {regCount}*************************") + dataType = self.config_manager.mapping.key_to_address[path[0]][1] + print(f"*************************{path}, {dataType}, {regCount}*************************") + if len(path) > 0: + if '16' in dataType: + print(f"*************************{path}:{reg_addr}:{values[0]}:{regCount}********************") + if dataType in ['int16', 'uint16']: + self.config_manager.update_config_value(path[0], int(values[0])) + regCount -= 1 + reg_addr += 1 + values = values[1:] + elif '32' in dataType: + print(f"*************************{path}:{reg_addr}:{values[0:2]}:{regCount}********************") + if dataType in ['int32', 'uint32']: + self.config_manager.update_config_value(path[0], (values[0]<<16)+values[1]) + elif dataType == 'float32': + self.config_manager.update_config_value(path[0], registers_to_float(values)) + regCount -= 2 + reg_addr += 2 + values = values[2:] + else: + regCount -= 1 + reg_addr += 1 + if updated: self.config_manager.save_config() self.logger.debug(f"Register {address} update triggered configuration change") def server_set_values(self, address, values): """Server-only write method that won't trigger YAML update""" - self._is_client_write = False - self.setValues(address, values) - self._is_client_write = True + # self._is_client_write = False + # self.setValues(address, values) + # self._is_client_write = True + super().setValues(address, values) class LSDAQ: def __init__(self, config:dict, logger): @@ -405,7 +441,7 @@ class LSDAQ: if self.port != '/dev/ttyLP3': self.status = -201 self.baudrate = config.get('baudrate', 115200) - self.timeout = config.get('timeout', 1) + self.timeout = config.get('timeout', 50)/1000.0 self.mode = config.get('mode', 0) self.channels = config.get('channels', 16) if self.mode not in [0, 1]: @@ -687,12 +723,261 @@ class LSDAQ: self.close() self.logger.info("Modbus Serial TCP Client stopped.") +class Breaker: + def __init__(self, config:dict, logger): + # 加载配置参数 + ''' self.errorCode 码表 + 0x0001 打开/dev/ttyUSB0设备失败 + 0x0101 与断路器通讯失败 + ''' + self.errorCode = 0 + ''' self.load_status 码表 + 0x00 负载不在线 + 0x0101 负载在线 + ''' + self.load_status = 0 + self.config = config + self.logger = logger + self.port = config.get('port', '/dev/ttyUSB0') + self.baudrate = config.get('baudrate', 9600) + self.timeout = config.get('timeout', 50)/1000.0 + self.task_start_threshold = config.get('task_start_threshold', 2000) + self.task_stop_threshold = config.get('task_stop_threshold', 2000) + self.locked = 0 + self.closed = 0x0F # 0x0F:分闸 0xF0:合闸 + self.reasonForLastOpen = 15 # F:无 1:过流 2:漏电 3:过温 4:过载 5:过压 6:欠压 7:远程 8:模组 9:失压 A:锁扣 B:限电 0: 本地 + self.active_powers = [] + self.duration = config.get('duration', 5) + self.alarm = 0 + + self.active_power = 0 # 有功功率,单位:W + + OVV = config.get('OVV', 275) + UVV = config.get('UVV', 150) + OCV = config.get('OCV', 10000) + LCV = config.get('LCV', 30) + OTV = config.get('OTV', 80) + OPV = config.get('OPV', 13000) + OVT = config.get('OVT', 0) + UVT = config.get('UVT', 0) + OCT = config.get('OCT', 0) + LCT = config.get('LCT', 200) + OTT = config.get('OTT', 200) + OPT = config.get('OPT', 100) + + self.reg_values = { + 'locked': 0, + 'closed': 0x0F, + 'reasonForLastOpen': 0x0F, + 'alarm': 0, + 'active_power': 0, + 'load_status': 0 + } + # 构建指令集 + self.cmdList = { + # 指令格式:指令描述,指令字符串,回复长度,超时时间,发送校验标志,接收校验标志,重试次数 + 'readAllDatas': ['', f"0204 0000 0027", 83, 300, 1, 1, 3], + 'readOverLimitValues': ['', f"0203 0002 0006", 17, 200, 1, 1, 3], + 'readOverLimitActionTime': ['', f"0203 0010 0006", 17, 200, 1, 1, 3], + 'setOverLimitValues': ['', f"0210 0002 0006 0C {OVV:04X} {UVV:04X} {OCV:04X} {LCV:04X} {OTV:04X} {OPV:04X}", 8, 100, 1, 1, 3], + 'setOverLimitActionTime': ['', f"0210 0010 0006 0C {OVT:04X} {UVT:04X} {OCT:04X} {LCT:04X} {OTT:04X} {OPT:04X}", 8, 100, 1, 1, 3], + 'closeBreaker': ['', f"0210 000D 0001 02 FF00", 8, 100, 1, 1, 3], + 'openBreaker': ['', f"0210 000D 0001 02 0000", 8, 100, 1, 1, 3], + 'turnOnGreen': ['', f"0105 0002 FF00", 8, 100, 1, 1, 3], + 'turnOffGreen': ['', f"0105 0002 0000", 8, 100, 1, 1, 3], + 'turnOnRed': ['', f"0105 0008 FF00", 8, 100, 1, 1, 3], + 'turnOffRed': ['', f"0105 0000 0000", 8, 100, 1, 1, 3], + 'turnOnAlarm': ['', f"0105 00A1 FF00", 8, 100, 1, 1, 3], + 'turnOffAlarm': ['', f"0105 00A1 0000", 8, 100, 1, 1, 3] + } + + self.optFlag = 0 + self.logger.info(f"Breader routine inspection started.") + + def update_config(self): + pass + + def exeCmd(self, cmdName) -> list: # type: ignore + try: + info = '' + cmd = self.cmdList.get(cmdName, None) + if cmd is None: + return [False, None, f"Command {cmdName} not found in cmdList."] + retry = 0 + data = bytearray().fromhex(cmd[1]) + if (cmd[4] == 1): + data += bytearray(checkValue(data[0:]).to_bytes(2, 'big')) + if len(cmd) >= 7: + RETRYTIMES = int(cmd[6]) + else: + RETRYTIMES = 1 + while (retry < RETRYTIMES): + info += f"[{nowStr()}] Sent:{wordData2HexStr(data)}\n" + recvData = bytearray() + self.serial.write(data) + time.sleep(int(cmd[3])/1000.0) + recvData = self.serial.read(int(cmd[2])) + info += (f"[{nowStr()}] Echo:{wordData2HexStr(recvData)}\n") + rspLen = int(cmd[2]) + if len(recvData) >= rspLen: + if recvData[0:2] == bytearray().fromhex(cmd[1][0:4]): + # info += f"[{nowStr()}] Echo:{wordData2HexStr(recvData[0:rspLen])}\n" + recvData = recvData[0:rspLen] + if (cmd[5] == 1): + crc = int.from_bytes(recvData[rspLen-2:rspLen], byteorder='big') + calc_value = checkValue(recvData[0:rspLen-2]) + # info += f"{crc:04X}, {calc_value:04X}\n" + if crc == calc_value: + # self.logger.info(info) + return [True, recvData, info] + else: + # self.logger.info(info) + return [True, recvData, info] + retry += 1 + # self.logger.info(info) + return [False, None, info] + except Exception as e: + info += f"[{nowStr()}] Error in exeCmd({cmd}): {str(e)}\n" # type: ignore + # self.logger.info(info) + return [False, None, info] + + def parseData(self, cmdName, rawData): + try: + match cmdName: + case 'readAllDatas': + rawData = rawData[3:-2] + self.locked = rawData[0] + self.closed = rawData[1] + self.reasonForLastOpen = (rawData[6]&0xF0)>>4 + self.active_power = int.from_bytes(rawData[68:70], byteorder='big') + self.active_powers.append(self.active_power) + + if len(self.active_powers) > self.duration * 2: + self.active_powers = self.active_powers[1:] + if np.mean(self.active_powers) > self.task_start_threshold: + self.load_status = 1 + if np.mean(self.active_powers) < self.task_stop_threshold: + self.load_status = 0 + + + self.reg_values['locked'] = self.locked + self.reg_values['closed'] = self.closed + self.reg_values['reasonForLastOpen'] = self.reasonForLastOpen + self.reg_values['alarm'] = self.alarm + self.reg_values['active_power'] = self.active_power + self.reg_values['load_status'] = self.load_status + + print(f"breaker: {self.reg_values}") + + case 'closeBreaker': + pass + case 'openBreaker': + pass + case _: + pass + except Exception as e: + self.logger.error(f"[{nowStr()}] Error in Breaker: parseData({cmdName}): {str(e)}\n") + + def openBreaker(self): + if self.reg_values['closed'] == 0xF0: + self.optFlag = 2 + + def closeBreaker(self): + if self.reg_values['closed'] == 0x0F: + self.optFlag = 3 + + def alarming(self): + if not self.alarm and self.closed & 0xFF == 0xF0: + self.exeCmd('turnOffGreen') + self.exeCmd('turnOnRed') + self.exeCmd('turnOnAlarm') + + def unalarming(self): + if self.alarm: + if self.closed & 0xFF == 0xF0: + self.exeCmd('turnOnGreen') + self.exeCmd('turnOffRed') + self.exeCmd('turnOffAlarm') + else: + self.exeCmd('turnOffGreen') + self.exeCmd('turnOffRed') + self.exeCmd('turnOffAlarm') + + def open(self): + """打开串口连接""" + self.serial = serial.Serial(self.port, self.baudrate, timeout=self.timeout) + if not self.serial.is_open: + self.errorCode = 0x0001 + return -1 + else: + self.errorCode = 0 + return 0 + + def close(self): + if self.serial.is_open: + self.serial.close() + + def run(self): + """主运行循环""" + try: + while True: + # self.logger.info(f"optFlag={self.optFlag}") + match self.optFlag: + case 0: + if self.open() == 0: + ret0 = self.exeCmd('openBreaker') + # self.logger.info(f"setOverLimitValues ret: {ret0}") + ret1 = self.exeCmd('setOverLimitValues') + # self.logger.info(f"setOverLimitValues ret: {ret1}") + ret2 = self.exeCmd('readOverLimitValues') + self.logger.info(f"readOverLimitValues ret: {ret2}") + ret3 = self.exeCmd('setOverLimitActionTime') + # self.logger.info(f"setOverLimitValues ret: {ret3}") + ret4 = self.exeCmd('readOverLimitActionTime') + self.logger.info(f"readOverLimitActionTime ret: {ret4}") + if ret0[0] and ret1[0] and ret3[0]: + self.optFlag = 1 + continue + self.optFlag = -1 + self.errorCode = 0x0101 + case 1: + time.sleep(0.2) + ret = self.exeCmd('readAllDatas') + self.logger.info(f"readAllDatas ret: {wordData2HexStr(ret[1])}") + if ret[0]: + self.parseData('readAllDatas', ret[1]) + continue + self.optFlag = -1 + self.errorCode = 0x0101 + case 2: + ret = self.exeCmd('openBreaker') + if ret[0]: + self.optFlag = 1 + continue + self.optFlag = -1 + self.errorCode = 0x0101 + case 3: + ret = self.exeCmd('closeBreaker') + if ret[0]: + self.optFlag = 1 + continue + self.optFlag = -1 + self.errorCode = 0x0101 + case _: + time.sleep(1) + self.close() + self.optFlag = 0 + except Exception as e: + self.close() + self.logger.info(f"Error in Breader: run(), {e}") + class GPS: def __init__(self, config:dict, logger): self.status = -1 self.logger = logger + self.config = config self.port = config.get('port', '/dev/ttyLP4') - if self.port != '/dev/ttyLP3': + if self.port != '/dev/ttyLP4': self.status = -201 self.baudrate = config.get('baudrate', 9600) self.timeout = config.get('timeout', 1) @@ -780,9 +1065,12 @@ class HSDAQ: try: self.config = config self.logger = logger - result = subprocess.run(["ip","neigh","add", "192.168.0.2", "lladdr","00:0A:35:01:FE:C0", "dev", "ethernet0"], capture_output=True, text=True, encoding="gbk") + result = subprocess.run(["ip","neigh","add", "192.168.0.2", "lladdr","00:0A:35:01:FE:C0", "dev", "ethernet0"], capture_output=True, text=True, encoding="utf-8") if result.returncode != 0: self.logger.info(result.stderr) + # result = subprocess.run(["sudo","ethtool","-s", "ethernet0", "speed", "100", "duplex", "full", "autoneg", "off"], capture_output=True, text=True, encoding="utf-8") + # if result.returncode != 0: + # self.logger.info(result.stderr) # 设置允许强制修改缓存区大小 self.sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(0x0003)) @@ -1041,15 +1329,26 @@ class HSDAQ: else: _data = datas[:,i] + _data = _data[0:20] log_data = np.log(np.abs(_data) + 1e-300) log_mean_squared = 2 * np.mean(log_data) + np.log(len(_data)) _rms = np.exp(0.5 * log_mean_squared) / self.scale # _rms = np.sqrt(np.mean(_data**2))/self.scale + _min = np.min(_data) + _max = np.max(_data) + _mean = np.mean(_data) + + # if (_max - _mean) * 5 < (_mean - _min): + # _mean = np.mean(_data[0:20]) + # _min = np.min(_data[0:20]) + # _max = np.max(_data[0:20]) + # _rms = np.sqrt(np.mean(np.square(_data[0:20]))) + self.feature_data[f'CH{j}'] = { - 'min': np.min(_data)/self.scale, - 'max': np.max(_data)/self.scale, - 'mean': np.mean(_data)/self.scale, + 'min': _min/self.scale, + 'max': _max/self.scale, + 'mean': _mean/self.scale, 'std': np.std(_data)/self.scale, 'rms': _rms, 'sr0': 0.0, @@ -1331,7 +1630,10 @@ class ModbusGateway: regs_config_file='src/regs-mapping-1.2-debug.yaml', config_file=config_file, logger=self.logger) - + + self.taskInfo = {'status':0x0000, 'running_time':0, 'period':0} + self.taskInfo['period'] = self.config_manager.config['task']['period']*60 + # 初始化influxdb client self.influx_client = InfluxDBWriter( url=self.config_manager.config['influxdb'].get('url', 'http://localhost:8086'), @@ -1342,12 +1644,12 @@ class ModbusGateway: # 连接plc server self.plc_host = self.config_manager.config['plc-server'].get('host', '172.22.0.3') - self.plc_port = self.config_manager.config['plc-server'].get('port', 502) + self.plc_port = self.config_manager.config['plc-server'].get('port', 5020) self.plc_client = ModbusTcpClient(self.plc_host, port=self.plc_port) self.plc_measurements = self.config_manager.config['plc-server'].get('measurements', {}) # 创建本地modbus tcp服务器 - self.max_address = 1200 + self.max_address = 1300 self.host = self.config_manager.config['modbus-server']['host'] self.port = self.config_manager.config['modbus-server']['port'] @@ -1373,6 +1675,7 @@ class ModbusGateway: self.logger.info(f"Local modbusTCP service starts, IP={self.host}, port={self.port}") self.gps = GPS(self.config_manager.config['gps'], self.logger) + self.breaker = Breaker(self.config_manager.config['breaker'], self.logger) self.lsdaq = LSDAQ(self.config_manager.config['lsdaq'], self.logger) self.hsdaq = HSDAQ(self.config_manager.config['hsdaq'], self.logger) @@ -1380,6 +1683,10 @@ class ModbusGateway: self.gps_thread.daemon = True self.gps_thread.start() + self.breaker_thread = threading.Thread(target=self.breaker.run) + self.breaker_thread.daemon = True + self.breaker_thread.start() + self.lsdaq_thread = threading.Thread(target=self.lsdaq.run) self.lsdaq_thread.daemon = True self.lsdaq_thread.start() @@ -1387,15 +1694,11 @@ class ModbusGateway: self.hsdaq_thread = threading.Thread(target=self.hsdaq.run) self.hsdaq_thread.daemon = True self.hsdaq_thread.start() - - # diagnostics for write calls - self._write_call_count = 0 - self._last_write_ts = None # 启动配置服务(HTTP API) config_service_port = self.config_manager.config.get('config-server', {}).get('port', 5000) config_service_host = self.config_manager.config.get('config-server', {}).get('host', '127.0.0.1') - config_serial_port = self.config_manager.config.get('config-server', {}).get('serial_port', 'ttyUSB0') + # 如果配置文件中指定了配置文件路径,使用它;否则使用默认的YAML配置文件 config_service_config_path = self.config_manager.config.get('config-server', {}).get('config_path', config_file_temp) @@ -1405,7 +1708,6 @@ class ModbusGateway: port=config_service_port, debug=False, logger=self.logger, - serial_port=config_serial_port ) self.config_service.start() self.logger.info(f"Config service started on {config_service_host}:{config_service_port}") @@ -1426,6 +1728,11 @@ class ModbusGateway: for i in range(len(gps_reg_values)): holding_registers.server_set_values(2*i+1, float_to_registers(gps_reg_values[i])) # type: ignore + breaker_reg_values = list(self.breaker.reg_values.values()) + # print(f"breaker_reg_values:{breaker_reg_values}") + for i in range(len(breaker_reg_values)): + holding_registers.server_set_values(1220+i+1, int(breaker_reg_values[i])) # type: ignore + lsdaq_reg_values = list(self.lsdaq.reg_values.values()) # print(f"lsdaq_reg_values:{lsdaq_reg_values}") for i in range(len(lsdaq_reg_values)): @@ -1448,19 +1755,19 @@ class ModbusGateway: holding_registers.server_set_values(1145+i+1, int(hsdaq_warning_values[i])) # type: ignore # 更新从plc采集到的数据 - packed_data = bytearray() - for k, v in self.plc_measurements.items(): - packed_data.extend(struct.pack('>fHffH', v['value'], v['warning_param']['enable'], v['warning_param']['lower'], v['warning_param']['upper'], v['warning'])) - register_values = [] - for i in range(0, len(packed_data), 2): - if i + 1 < len(packed_data): - # 组合两个字节为一个16位整数 - value = (packed_data[i] << 8) | packed_data[i + 1] - else: - # 如果字节数为奇数,最后一个字节补0 - value = packed_data[i] << 8 - register_values.append(value) - holding_registers.server_set_values(1161+1, register_values) + # packed_data = bytearray() + # for k, v in self.plc_measurements.items(): + # packed_data.extend(struct.pack('>fHffH', v['value'], v['warning_param']['enable'], v['warning_param']['lower'], v['warning_param']['upper'], v['warning'])) + # register_values = [] + # for i in range(0, len(packed_data), 2): + # if i + 1 < len(packed_data): + # # 组合两个字节为一个16位整数 + # value = (packed_data[i] << 8) | packed_data[i + 1] + # else: + # # 如果字节数为奇数,最后一个字节补0 + # value = packed_data[i] << 8 + # register_values.append(value) + # holding_registers.server_set_values(1161+1, register_values) # 读取CPU各温区温度 thermal_zones = 5 @@ -1491,8 +1798,6 @@ class ModbusGateway: ) self.write_api = self.influx_client.write_api(write_options=SYNCHRONOUS) try: - # 诊断:测量构建 points 与实际写入的耗时 - t_build_start = time.perf_counter() points = [] # 构建toradex核心板CPU温度 point = Point("PCM_Measurement") @@ -1508,55 +1813,63 @@ class ModbusGateway: point.field(field_name, float(field_value)) points.append(point) + # 构建Breaker采集数据 + point = Point("PCM_Measurement") + point.tag("data_type", 'Breaker') + for field_name, field_value in self.breaker.reg_values.items(): + point.field(field_name, float(field_value)) + points.append(point) + + # 构建TaskInfo采集数据 + point = Point("PCM_Measurement") + point.tag("data_type", 'TaskInfo') + print(f"taskInfo = {self.taskInfo}") + for field_name, field_value in self.taskInfo.items(): + if field_value == None: + field_value = 0 + point.field(field_name, float(field_value)) + points.append(point) + # 构建低速采集数据 point = Point("PCM_Measurement") point.tag("data_type", 'LSDAQ') + # for field_name, field_value in self.lsdaq.reg_values.items(): + # point.field(field_name, float(field_value)) + # for field_name, field_value in self.lsdaq.warning_values.items(): + # point.field(field_name+'.warning', field_value) for field_name, field_value in self.lsdaq.reg_values.items(): if field_name in self.lsdaq.alias and self.lsdaq.alias.get(field_name) != '': point.field(self.lsdaq.alias[field_name], float(field_value)) for field_name, field_value in self.lsdaq.warning_values.items(): if field_name in self.lsdaq.alias and self.lsdaq.alias.get(field_name) != '': point.field(self.lsdaq.alias[field_name]+'.warning', field_value) - points.append(point) - + points.append(point) + # 构建高速采集数据 for i in range(16): + # print(str(self.hsdaq.feature_data[f'CH{i}'])) j = i + 1 point = Point("PCM_Measurement") + # point.tag("data_type", f'HSDAQ_CH{j}') if self.hsdaq.alias.get(f'CH{j}') != '': point.tag("data_type", self.hsdaq.alias[f'CH{j}']) for field_name, field_value in self.hsdaq.feature_data[f'CH{j}'].items(): point.field(field_name, float(field_value)) point.field('warning', self.hsdaq.warning_values[f'CH{j}']) points.append(point) - + # 构建从PLC采集到的数据 point = Point("PCM_Measurement") point.tag("data_type", f'PLC') for k, v in self.plc_measurements.items(): point.field(k, float(v['value'])) points.append(point) - t_build = time.perf_counter() - t_build_start - # 测量写入 API 的耗时 - t_write_start = time.perf_counter() + # 将数据点写入influxdb self.influx_client.write_batch_data(points) - t_write_api = time.perf_counter() - t_write_start - self.logger.info(f"_write_to_influxdb: build={t_build:.3f}s write_api={t_write_api:.3f}s points={len(points)}") except Exception as e: self.logger.error(f"Error in _write_to_influxdb(): {e}") - finally: - # write-call diagnostics: count and delta since last call - try: - now_ts = time.time() - last = getattr(self, "_last_write_ts", None) - delta = (now_ts - last) if last else None - self._last_write_ts = now_ts - self._write_call_count = getattr(self, "_write_call_count", 0) + 1 - self.logger.info(f"WriteCall #{self._write_call_count} at {datetime.now()} delta_since_last={delta if delta is not None else 0:.3f}s") - except Exception: - pass def _read_plc_datas(self): if not self.plc_client: @@ -1581,45 +1894,59 @@ class ModbusGateway: def run(self): """主运行循环""" timestamp = time.time() + task_control_reg_addr = self.config_manager.config['task']['control_reg_addr'] + task_control_reg_value = 0x0000 + self.taskInfo['start_time'] = None + self.taskInfo['running_time'] = 0 while True: self.gps.config = self.config_manager.config['gps'] self.lsdaq.config = self.config_manager.config['lsdaq'] self.lsdaq.update_config() self.hsdaq.config = self.config_manager.config['hsdaq'] self.hsdaq.update_config() + self.taskInfo['status'] = self.holding_registers.values[task_control_reg_addr+1] + match self.taskInfo['status']: + case 0x0000: + self.breaker.openBreaker() + self.taskInfo['start_time'] = None + self.taskInfo['running_time'] = 0 + case 0x5555: + self.breaker.closeBreaker() + if self.breaker.reg_values['load_status'] == 1: + current_time = time.time() + self.taskInfo['running_time'] += ( current_time - self.taskInfo['start_time'] ) + self.taskInfo['start_time'] = current_time + else: + self.taskInfo['start_time'] = time.time() + + if self.taskInfo['running_time'] > self.taskInfo['period']: + self.holding_registers.values[task_control_reg_addr+1] = 0xFFFF + continue + + if 1 in self.lsdaq.warning_values.values() or 1 in self.hsdaq.warning_values.values(): + self.breaker.alarming() + self.holding_registers.values[task_control_reg_addr+1] = 0xFFFF + continue + else: + self.breaker.unalarming() + case 0xAAAA: + self.breaker.openBreaker() + self.taskInfo['start_time'] = time.time() + case 0xFFFF: + self.holding_registers.values[task_control_reg_addr+1] = 0x0000 + # time.sleep(0.01) nowtime = time.time() # print(f"{timestamp}, {nowtime}") if (nowtime-timestamp) > 1: timestamp = nowtime - # 将数据写入influxdb,并测量耗时以便诊断性能问题 - try: - t0 = time.perf_counter() - self._write_to_influxdb() - dt_write = time.perf_counter() - t0 - except Exception as e: - self.logger.error(f"_write_to_influxdb exception: {e}") - dt_write = -1.0 + #将数据写入influxdb + self._write_to_influxdb() + # self._read_plc_datas() + self._update_modbus_datas() - try: - t0 = time.perf_counter() - self._read_plc_datas() - dt_read = time.perf_counter() - t0 - except Exception as e: - self.logger.error(f"_read_plc_datas exception: {e}") - dt_read = -1.0 - - total_dt = (dt_write if dt_write > 0 else 0.0) + (dt_read if dt_read > 0 else 0.0) - self.logger.info(f"Timing: _write_to_influxdb={dt_write:.3f}s _read_plc_datas={dt_read:.3f}s total={total_dt:.3f}s") - # 测量 _update_modbus_datas 耗时以定位主循环延迟来源 - try: - t0 = time.perf_counter() - self._update_modbus_datas() - dt_update = time.perf_counter() - t0 - except Exception as e: - self.logger.error(f"_update_modbus_datas exception: {e}") - dt_update = -1.0 - self.logger.info(f"Timing: _update_modbus_datas={dt_update:.3f}s") + # 控制寄存器 + if __name__ == "__main__": gateway = ModbusGateway()