2056 lines
94 KiB
Python
2056 lines
94 KiB
Python
import threading, pynmea2, time, struct, serial, socket, yaml, os, logging.config, json, subprocess, shutil, time, copy, gc, glob
|
||
from pymodbus.server.sync import StartTcpServer
|
||
from pymodbus.client.sync import ModbusTcpClient
|
||
from pymodbus.datastore import ModbusSequentialDataBlock
|
||
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
|
||
from threading import Lock
|
||
import numpy as np
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from influxdb_client import InfluxDBClient, Point
|
||
from influxdb_client.client.write_api import SYNCHRONOUS
|
||
from config_service import ConfigService
|
||
|
||
def checkValue(data, little_endian=True):
    """Compute the Modbus CRC-16 of ``data`` (polynomial 0xA001, initial value 0xFFFF).

    Args:
        data: Iterable of byte values (``bytes`` or ``bytearray``).
        little_endian: When True (the default) the two CRC bytes are swapped in
            the returned integer, so that ``result.to_bytes(2, 'big')`` emits
            the low CRC byte first. When False the plain 16-bit CRC value is
            returned.

    Returns:
        int: The 16-bit checksum, byte-swapped when ``little_endian`` is True.
    """
    register = 0xFFFF
    for octet in data:
        register ^= octet
        for _ in range(8):
            carry = register & 0x0001
            register >>= 1
            if carry:
                register ^= 0xA001
    if not little_endian:
        return register & 0xFFFF
    # Swap the halves: the low byte moves to the high position and vice versa.
    return ((register & 0xFF) << 8) | ((register >> 8) & 0xFF)
|
||
|
||
def nowStr():
    """Return the current local time as ``YYYY/MM/DD HH:MM:SS.mmm`` (millisecond precision)."""
    stamp = datetime.now().strftime('%Y/%m/%d %H:%M:%S.%f')
    # %f is the six-digit microsecond field; dropping its last three digits
    # leaves the zero-padded millisecond part.
    return stamp[:-3]
|
||
|
||
def wordData2HexStr(data):
    """Render a byte string as upper-case hex, grouped into 2-byte words.

    Words are separated by a single space; a trailing odd byte forms its own
    group. Falsy input (``None`` or an empty buffer) yields ``''``.
    """
    if not data:
        return ''
    # A negative group size makes hex() count the groups from the left, so an
    # odd trailing byte ends up alone at the end.
    return data.hex(' ', -2).upper()
|
||
|
||
def float_to_registers(value):
    """Encode ``value`` as a big-endian IEEE-754 single and split it into two 16-bit words.

    Returns:
        list[int]: ``[high_word, low_word]``.
    """
    high_word, low_word = struct.unpack('>HH', struct.pack('>f', value))
    return [high_word, low_word]
|
||
|
||
def registers_to_float(registers, byte_order='ABCD'):
    """Decode two 16-bit registers into a single-precision float.

    Args:
        registers (list): Exactly two register values ``[reg1, reg2]``, each
            in the range 0..65535.
        byte_order (str): Layout of the four bytes across the two registers:
            ``'ABCD'`` (big-endian, as transmitted), ``'CDAB'`` (words
            swapped), ``'BADC'`` (bytes swapped inside each word) or
            ``'DCBA'`` (both swapped).

    Returns:
        float | None: The decoded value, or ``None`` when ``registers`` does
        not hold exactly two items, ``byte_order`` is unknown, or the decoded
        value is NaN or infinite.
    """
    # Local import: the import block visible at the top of this file has no math.
    import math

    if len(registers) != 2:
        return None

    # Each register is 16 bits wide, i.e. two bytes in big-endian order.
    first = registers[0].to_bytes(2, byteorder='big')
    second = registers[1].to_bytes(2, byteorder='big')

    if byte_order == 'ABCD':
        raw = first + second
    elif byte_order == 'CDAB':
        raw = second + first
    elif byte_order == 'BADC':
        raw = first[::-1] + second[::-1]
    elif byte_order == 'DCBA':
        raw = second[::-1] + first[::-1]
    else:
        return None

    value = struct.unpack('>f', raw)[0]
    # Reject NaN as well as +/-infinity; comparing against inf alone lets NaN
    # slip through because NaN never compares equal to anything.
    if not math.isfinite(value):
        return None
    return value
|
||
|
||
class ConfigManager:
    """Load the main YAML configuration and the register map; give path-based access.

    Bit-field settings (``lsdaq.sensor_type``, ``lsdaq.warning_param.enable``,
    ``hsdaq.sensor_type``, ``hsdaq.save_flag``, ``hsdaq.warning_param.enable``)
    are written in the YAML file as strings of ``0``/``1`` characters. They are
    converted to integers on load and back to strings by ``_save_config``.
    """

    def __init__(self, regs_config_file, config_file, logger):
        """Remember the file locations and load both files immediately.

        Args:
            regs_config_file: Path of the YAML register-map file.
            config_file: Path of the main YAML configuration file.
            logger: Logger used for warnings about missing files.
        """
        self.config_file = Path(config_file)
        self.regs_config_file = Path(regs_config_file)
        self.lock = threading.Lock()
        self.config = {}
        self.regs_config = {}
        self.logger = logger
        self.mapping = BidirectionalMap()

        self.load_all_configs()

    @staticmethod
    def _normalize_bits(raw, width, fill):
        """Return ``raw`` without spaces if it is a valid ``width``-character bit string.

        Anything else (missing key, wrong length, characters other than 0/1)
        is replaced by ``fill`` repeated ``width`` times.
        """
        text = '' if raw is None else str(raw).replace(' ', '')
        if len(text) != width or any(ch not in '01' for ch in text):
            return fill * width
        return text

    @staticmethod
    def _reverse_bit_pairs(bits):
        """Reverse the order of the 2-character groups of ``bits`` (self-inverse)."""
        return ''.join(bits[i:i + 2] for i in range(len(bits) - 2, -1, -2))

    def load_all_configs(self):
        """Load the main configuration and the register map, then rebuild the mappings.

        A missing file is logged as a warning; opening it afterwards still
        raises, exactly as before.
        """
        with self.lock:
            if not os.path.exists(self.config_file):
                self.logger.warning(f"Config file {self.config_file} not found")

            if not os.path.exists(self.regs_config_file):
                self.logger.warning(f"Regsister mapping file {self.regs_config_file} not found")

            with open(self.config_file, 'r') as f:
                self.config = yaml.safe_load(f)

            clean = self._normalize_bits
            lsdaq = self.config['lsdaq']
            hsdaq = self.config['hsdaq']

            # Low-speed DAQ: the first character of the string is bit 0.
            lsdaq['sensor_type'] = int(clean(lsdaq.get('sensor_type'), 16, '1')[::-1], 2)
            lsdaq_warning = lsdaq['warning_param']
            lsdaq_warning['enable'] = int(clean(lsdaq_warning.get('enable'), 16, '0')[::-1], 2)

            # High-speed DAQ sensor_type holds 16 two-bit fields; the order of
            # the fields is reversed, the two bits inside a field are not.
            hsdaq_types = clean(hsdaq.get('sensor_type'), 32, '1')
            hsdaq['sensor_type'] = int(self._reverse_bit_pairs(hsdaq_types), 2)
            hsdaq['save_flag'] = int(clean(hsdaq.get('save_flag'), 16, '1')[::-1], 2)
            hsdaq_warning = hsdaq['warning_param']
            hsdaq_warning['enable'] = int(clean(hsdaq_warning.get('enable'), 32, '0')[::-1], 2)

            with open(self.regs_config_file, 'r') as f:
                self.regs_config = yaml.safe_load(f)

            self._build_mappings()

    def _build_mappings(self):
        """Build the config-key <-> register-address mapping from ``control_regs``."""
        if 'control_regs' in self.regs_config:
            self._process_registers_section(self.regs_config['control_regs'], '', 'control')

    def _process_registers_section(self, section, current_path, reg_type):
        """Walk one section of the register map and register every entry found.

        Args:
            section: Dict taken from the register-map file.
            current_path: Dotted prefix for the keys found in ``section``.
            reg_type: Type tag stored for leaf dicts of ``name -> address``.
        """
        def traverse(node, current_path=""):
            for key, value in node.items():
                new_path = f"{current_path}.{key}" if current_path else key
                if not isinstance(value, dict):
                    # Direct entry: element 0 is used as the address and
                    # element 1 as the type tag.
                    self.mapping.add_mapping(new_path, value[0], value[1])
                elif all(isinstance(k, str) and isinstance(v, int) for k, v in value.items()):
                    # Leaf dict that maps names straight to register addresses.
                    for sub_key, address in value.items():
                        self.mapping.add_mapping(f"{new_path}.{sub_key}", address, reg_type)
                else:
                    traverse(value, new_path)

        traverse(section, current_path)

    def get_config_value(self, config_path):
        """Return the value stored at the dotted ``config_path``, or ``None`` if absent."""
        node = self.config
        for key in config_path.split('.'):
            if isinstance(node, dict) and key in node:
                node = node[key]
            else:
                return None
        return node

    def update_config_value(self, config_path, value):
        """Set the in-memory value at the dotted ``config_path``.

        Missing intermediate levels are created. The file on disk is not
        touched; call ``save_config`` to persist.

        Returns:
            bool: Always True.
        """
        with self.lock:
            keys = config_path.split('.')
            node = self.config
            for key in keys[:-1]:
                node = node.setdefault(key, {})
            node[keys[-1]] = value
            return True

    def _encoded_config(self):
        """Return a deep copy of the configuration with bit fields turned back into strings."""
        encoded = copy.deepcopy(self.config)
        lsdaq = encoded['lsdaq']
        hsdaq = encoded['hsdaq']
        lsdaq['sensor_type'] = f"{lsdaq['sensor_type']:016b}"[::-1]
        lsdaq['warning_param']['enable'] = f"{lsdaq['warning_param']['enable']:016b}"[::-1]
        hsdaq['sensor_type'] = self._reverse_bit_pairs(f"{hsdaq['sensor_type']:032b}")
        hsdaq['save_flag'] = f"{hsdaq['save_flag']:016b}"[::-1]
        # This mask is validated as 32 characters on load, so it must be
        # written with 32 digits; 16 digits would be rejected and reset to 0.
        hsdaq['warning_param']['enable'] = f"{hsdaq['warning_param']['enable']:032b}"[::-1]
        return encoded

    def _save_config(self):
        """Write the configuration to ``config_file`` (caller handles locking)."""
        encoded = self._encoded_config()
        with open(self.config_file, 'w') as f:
            yaml.dump(encoded, f, sort_keys=False, default_flow_style=False)

    def save_config(self):
        """Persist the configuration while holding the lock.

        Must not be called from code that already holds ``self.lock``, because
        the lock is not reentrant.
        """
        with self.lock:
            self._save_config()
|
||
|
||
# def close(self):
|
||
# self.observer.stop()
|
||
# self.observer.join()
|
||
|
||
class BidirectionalMap:
    """Two-way lookup between configuration keys and register addresses."""

    def __init__(self):
        self.key_to_address = {}        # config key -> (address, type tag)
        self.address_to_keys = {}       # address -> [config keys]
        self.key_to_data_type = {}      # config key -> data type
        self.address_to_data_type = {}  # address -> data type

    def add_mapping(self, config_key, address, reg_type, data_type='uint16'):
        """Register ``config_key`` at ``address``.

        Args:
            config_key: Dotted configuration key.
            address: Register address.
            reg_type: Tag stored next to the address in ``key_to_address``.
            data_type: Data type recorded for both the key and the address.
        """
        self.key_to_address[config_key] = (address, reg_type)
        self.address_to_keys.setdefault(address, []).append(config_key)
        self.key_to_data_type[config_key] = data_type
        self.address_to_data_type[address] = data_type

    def get_address(self, config_key):
        """Return ``(address, reg_type)`` for ``config_key`` or ``(None, None)``."""
        # The whole table used to be printed here on every lookup; that debug
        # output has been removed.
        return self.key_to_address.get(config_key, (None, None))

    def get_config_keys(self, address):
        """Return the list of configuration keys registered at ``address`` (may be empty)."""
        return self.address_to_keys.get(address, [])

    def get_data_type(self, identifier):
        """Return the data type for an address (``int``) or a configuration key, else ``None``."""
        if isinstance(identifier, int):
            return self.address_to_data_type.get(identifier)
        return self.key_to_data_type.get(identifier)
|
||
|
||
class DataTypeValidator:
    """Coerce raw values to the Python type implied by a register data-type name."""

    @staticmethod
    def validate(value, data_type):
        """Convert ``value`` according to ``data_type``.

        ``'float32'`` yields a float and ``'string'`` a str. ``'uint16'``,
        ``'uint32'`` and ``'int32'`` yield an int that must lie inside the
        type's range. Every other type name is converted with ``int()``
        without a range check.

        Returns:
            The converted value, or ``None`` (after logging an error) when the
            conversion fails or the value is out of range.
        """
        # (type name, lowest allowed, highest allowed) for the checked integer types.
        checked_ranges = (
            ('uint16', 0, 65535),
            ('uint32', 0, 4294967295),
            ('int32', -2147483648, 2147483647),
        )
        try:
            if data_type == 'float32':
                return float(value)
            for type_name, lowest, highest in checked_ranges:
                if data_type == type_name:
                    number = int(value)
                    if number < lowest or number > highest:
                        raise ValueError(f"Value out of range for {type_name}")
                    return number
            if data_type == 'string':
                return str(value)
            return int(value)
        except (ValueError, TypeError) as e:
            logging.error(f"Validation failed for {value} as {data_type}: {str(e)}")
            return None
|
||
|
||
class RegisterConfigEnhancer:
    """Derive a per-address data type table from a register configuration dict."""

    def __init__(self, register_config):
        self.register_config = register_config
        self.data_type_mapping = self._create_data_type_mapping()

    def _create_data_type_mapping(self):
        """Return ``{address: 'float32'}`` for every register that holds a float.

        Covered are the GPS fields latitude/longitude/altitude/speed under
        ``value_regs.gps`` and the K2/K/B calibration parameters of every
        channel listed under ``control_regs.<lsdaq|hsdaq>.<calibration table>``.
        """
        float_addresses = {}
        cfg = self.register_config

        # GPS readings.
        if 'value_regs' in cfg and 'gps' in cfg['value_regs']:
            gps_regs = cfg['value_regs']['gps']
            for field in ('latitude', 'longitude', 'altitude', 'speed'):
                if field in gps_regs:
                    float_addresses[gps_regs[field]] = 'float32'

        # Sensor calibration parameters.
        calibration_tables = ('sensor_Tmp_CalibParam', 'sensor_Cur_CalibParam',
                              'sensor_Vol_CalibParam', 'sensor_Vib_CalibParam')
        for dev in ('lsdaq', 'hsdaq'):
            if dev not in cfg.get('control_regs', {}):
                continue
            dev_regs = cfg['control_regs'][dev]
            for table_name in calibration_tables:
                if table_name not in dev_regs:
                    continue
                table = dev_regs[table_name]
                for channel in table:
                    for coefficient in ('K2', 'K', 'B'):
                        float_addresses[table[channel][coefficient]] = 'float32'

        return float_addresses

    def get_data_type(self, address):
        """Return the data type recorded for ``address``; ``'uint16'`` when none is recorded."""
        return self.data_type_mapping.get(address, 'uint16')
|
||
|
||
class ModbusSequentialDataBlockForPCM(ModbusSequentialDataBlock):
|
||
def __init__(self, config_manager, logger, *args, **kwargs):
|
||
super().__init__(*args, **kwargs)
|
||
self.config_manager = config_manager
|
||
self._is_client_write = True
|
||
self.logger = logger
|
||
self._initialize_registers()
|
||
|
||
def _initialize_registers(self):
|
||
"""Initialize register values from configuration"""
|
||
for key, value in self.config_manager.regs_config['control_regs'].items():
|
||
config_value = self.config_manager.get_config_value(key)
|
||
# print(f"{key}:{value[0]}:{config_value}")
|
||
if config_value is not None and ('w' in value[2] or 'W' in value[2]):
|
||
match value[1]:
|
||
case 'float32':
|
||
config_value = float(config_value)
|
||
self.server_set_values(value[0]+1, float_to_registers(config_value))
|
||
case 'uint32':
|
||
config_value = int(config_value)
|
||
# self.server_set_values(value[0]+1, [config_value & 0xFFFF, (config_value >> 16) & 0xFFFF])
|
||
self.server_set_values(value[0]+1, [(config_value >> 16) & 0xFFFF, config_value & 0xFFFF])
|
||
case 'int32':
|
||
config_value = int(config_value)
|
||
# self.server_set_values(value[0]+1, [config_value & 0xFFFF, (config_value >> 16) & 0xFFFF])
|
||
self.server_set_values(value[0]+1, [(config_value >> 16) & 0xFFFF, config_value & 0xFFFF])
|
||
case 'uint16':
|
||
config_value = int(config_value)
|
||
# print(f"{key}:{value[0]}:{config_value}:{[struct.pack('>H', config_value)[0]]}")
|
||
self.server_set_values(value[0]+1, [config_value & 0xFFFF])
|
||
case 'int16':
|
||
config_value = int(config_value)
|
||
self.server_set_values(value[0]+1, [config_value & 0xFFFF])
|
||
case _:
|
||
pass
|
||
self.logger.info("Register initialization completed")
|
||
|
||
def setValues(self, address, values):
|
||
"""Override setValues method"""
|
||
if not self._is_client_write:
|
||
super().setValues(address, values)
|
||
return
|
||
|
||
super().setValues(address, values)
|
||
|
||
# Handle client writes
|
||
updated = False
|
||
print(f"*************************address={address}, values={values}*************************")
|
||
reg_addr = address - 1
|
||
# print(f"values = {values}")
|
||
# path = self.config_manager.mapping.get_config_keys(reg_addr)
|
||
# print(f"*************************{path}:{reg_addr}:{values}********************")
|
||
# if self.config_manager.update_config_value(path[0], value[0]):
|
||
# updated = True
|
||
|
||
regCount = len(values)
|
||
while(regCount > 0):
|
||
path = self.config_manager.mapping.get_config_keys(reg_addr)
|
||
print(f"*************************{path}, {reg_addr}, {regCount}*************************")
|
||
dataType = self.config_manager.mapping.key_to_address[path[0]][1]
|
||
print(f"*************************{path}, {dataType}, {regCount}*************************")
|
||
if len(path) > 0:
|
||
if '16' in dataType:
|
||
print(f"*************************{path}:{reg_addr}:{values[0]}:{regCount}********************")
|
||
if dataType in ['int16', 'uint16']:
|
||
self.config_manager.update_config_value(path[0], int(values[0]))
|
||
regCount -= 1
|
||
reg_addr += 1
|
||
values = values[1:]
|
||
elif '32' in dataType:
|
||
print(f"*************************{path}:{reg_addr}:{values[0:2]}:{regCount}********************")
|
||
if dataType in ['int32', 'uint32']:
|
||
self.config_manager.update_config_value(path[0], (values[0]<<16)+values[1])
|
||
elif dataType == 'float32':
|
||
self.config_manager.update_config_value(path[0], registers_to_float(values))
|
||
regCount -= 2
|
||
reg_addr += 2
|
||
values = values[2:]
|
||
else:
|
||
regCount -= 1
|
||
reg_addr += 1
|
||
|
||
if updated:
|
||
self.config_manager.save_config()
|
||
self.logger.debug(f"Register {address} update triggered configuration change")
|
||
|
||
def server_set_values(self, address, values):
|
||
"""Server-only write method that won't trigger YAML update"""
|
||
# self._is_client_write = False
|
||
# self.setValues(address, values)
|
||
# self._is_client_write = True
|
||
super().setValues(address, values)
|
||
|
||
class LSDAQ:
|
||
def __init__(self, config:dict, logger):
|
||
# 加载配置参数
|
||
''' self.status 码表
|
||
-200: 配置信息错误
|
||
-201: 串口号错误
|
||
-202: 传感器类型错误
|
||
-203: 工作模式错误
|
||
-100: 设备关闭
|
||
-101: 设备未连接
|
||
-1: 多次执行指令失败
|
||
0: 正常
|
||
100: 连接失败
|
||
200: 命令执行失败
|
||
202: 读取命令错误
|
||
203: 响应超时
|
||
204: 报头错误
|
||
205: 校验错误
|
||
206: 数据解析错误
|
||
'''
|
||
self.status = -1
|
||
self.config = config
|
||
self.logger = logger
|
||
self.port = config.get('port', '/dev/ttyLP3')
|
||
if self.port != '/dev/ttyLP3':
|
||
self.status = -201
|
||
self.baudrate = config.get('baudrate', 115200)
|
||
self.timeout = config.get('timeout', 50)/1000.0
|
||
self.mode = config.get('mode', 0)
|
||
self.channels = config.get('channels', 16)
|
||
if self.mode not in [0, 1]:
|
||
self.mode = 0
|
||
self.status = -203
|
||
self.frameNo = 0
|
||
self.sensor_type = config.get('sensor_type', 0xffff)
|
||
self.alias = config.get('alias', {})
|
||
for i in range(16):
|
||
if f'CH{i+1}' not in self.alias:
|
||
self.alias[f'CH{i+1}'] = ''
|
||
self.reg_values = {
|
||
'CH1': 0.0,
|
||
'CH2': 0.0,
|
||
'CH3': 0.0,
|
||
'CH4': 0.0,
|
||
'CH5': 0.0,
|
||
'CH6': 0.0,
|
||
'CH7': 0.0,
|
||
'CH8': 0.0,
|
||
'CH9': 0.0,
|
||
'CH10': 0.0,
|
||
'CH11': 0.0,
|
||
'CH12': 0.0,
|
||
'CH13': 0.0,
|
||
'CH14': 0.0,
|
||
'CH15': 0.0,
|
||
'CH16': 0.0,
|
||
'OFFSET': 0.0,
|
||
'POWERVOL': 0.0,
|
||
'TEMP': 0.0,
|
||
'GAIN': 0.0,
|
||
'REF': 0.0,
|
||
'STATUS': 0.0
|
||
}
|
||
self.warning_values = {
|
||
'CH1': 0,
|
||
'CH2': 0,
|
||
'CH3': 0,
|
||
'CH4': 0,
|
||
'CH5': 0,
|
||
'CH6': 0,
|
||
'CH7': 0,
|
||
'CH8': 0,
|
||
'CH9': 0,
|
||
'CH10': 0,
|
||
'CH11': 0,
|
||
'CH12': 0,
|
||
'CH13': 0,
|
||
'CH14': 0,
|
||
'CH15': 0,
|
||
'CH16': 0
|
||
}
|
||
_sensor_Tmp_CalibParam = {
|
||
'CH1': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH2': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH3': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH4': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH5': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH6': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH7': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH8': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH9': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH10': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH11': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH12': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH13': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH14': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH15': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH16': {'K2':0.0, 'K': 1.0, 'B': 0.0}
|
||
}
|
||
_sensor_Cur_CalibParam = {
|
||
'CH1': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH2': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH3': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH4': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH5': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH6': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH7': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH8': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH9': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH10': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH11': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH12': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH13': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH14': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH15': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH16': {'K2':0.0, 'K': 1.0, 'B': 0.0}
|
||
}
|
||
|
||
_sensor_Pres_CalibParam = {
|
||
'CH1': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH2': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH3': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH4': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH5': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH6': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH7': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH8': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH9': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH10': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH11': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH12': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH13': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH14': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH15': {'K2':0.0, 'K': 1.0, 'B': 0.0},
|
||
'CH16': {'K2':0.0, 'K': 1.0, 'B': 0.0}
|
||
}
|
||
|
||
self.sensor_Tmp_CalibParam = config.get('sensor_Tmp_CalibParam', _sensor_Tmp_CalibParam)
|
||
self.sensor_Cur_CalibParam = config.get('sensor_Cur_CalibParam', _sensor_Cur_CalibParam)
|
||
self.sensor_Pres_CalibParam = config.get('sensor_Cur_CalibParam', _sensor_Pres_CalibParam)
|
||
|
||
# 构建指令集
|
||
self.cmdList = {
|
||
# 查询所有通道采集数据
|
||
# 指令格式:指令字符串,回复长度,超时时间,发送校验标志,接收校验标志,指令描述,重试次数
|
||
'readAllADs': ['', f"0000 0000 0006 0103 0008 0017", 55, 200, 0, 0, 5]
|
||
}
|
||
self.optFlag = 0
|
||
|
||
def update_config(self):
|
||
self.mode = self.config.get('mode')
|
||
if self.mode not in [0, 1]:
|
||
self.mode = 0
|
||
self.sensor_type = self.config.get('sensor_type', 0xffff)
|
||
self.sensor_Tmp_CalibParam = self.config.get('sensor_Tmp_CalibParam')
|
||
self.sensor_Cur_CalibParam = self.config.get('sensor_Cur_CalibParam')
|
||
self.sensor_Pres_CalibParam = self.config.get('sensor_Pres_CalibParam')
|
||
|
||
def exeCmd(self, cmdName:str='readAllADs') -> list: # type: ignore
|
||
try:
|
||
info = ''
|
||
cmd = self.cmdList.get(cmdName, None)
|
||
self.status = 0
|
||
if cmd is None:
|
||
self.status = 202
|
||
return [False, None, f"Command {cmdName} not found in cmdList."]
|
||
retry = 0
|
||
data = bytearray().fromhex(cmd[1])
|
||
|
||
if (cmd[4] == 1):
|
||
data += bytearray(checkValue(data[2:]).to_bytes(2, 'big'))
|
||
if len(cmd) >= 7:
|
||
RETRYTIMES = int(cmd[6])
|
||
else:
|
||
RETRYTIMES = 1
|
||
while (retry < RETRYTIMES):
|
||
info += f"[{nowStr()}] Sent:{wordData2HexStr(data)}\n"
|
||
recvData = bytearray()
|
||
self.serial.write(data)
|
||
time.sleep(int(cmd[3])/1000.0)
|
||
recvData = self.serial.read(int(cmd[2]))
|
||
info += (f"[{nowStr()}] Echo:{wordData2HexStr(recvData)}\n")
|
||
rspLen = int(cmd[2])
|
||
if len(recvData) >= rspLen:
|
||
if recvData[0:2] == bytearray().fromhex(f"0000"):
|
||
# info += f"[{nowStr()}] Echo:{wordData2HexStr(recvData[0:rspLen])}\n"
|
||
rspLen = len(recvData)
|
||
if (cmd[5] == 1):
|
||
crc = int.from_bytes(recvData[rspLen-2:rspLen], byteorder='big')
|
||
calc_value = checkValue(recvData[0:rspLen-2])
|
||
# info += f"{crc:04X}, {calc_value:04X}\n"
|
||
if crc == calc_value:
|
||
# self.logger.info(info)
|
||
return [True, recvData, info]
|
||
else:
|
||
self.status = 205
|
||
else:
|
||
self.logger.info(info)
|
||
return [True, recvData, info]
|
||
else:
|
||
self.status = 204
|
||
recvData = recvData[1:]
|
||
else:
|
||
self.status = 203
|
||
retry += 1
|
||
if retry == RETRYTIMES:
|
||
self.status = -1
|
||
# self.logger.info(info)
|
||
return [False, None, info]
|
||
except Exception as e:
|
||
info += f"[{nowStr()}] Error in exeCmd({cmd}): {str(e)}\n" # type: ignore
|
||
# self.logger.info(info)
|
||
return [False, None, info]
|
||
|
||
def parseData(self, cmdName, rawData):
|
||
_sensor_type = f"{self.sensor_type:016b}"[::-1]
|
||
match cmdName:
|
||
case 'readAllADs':
|
||
datas = struct.unpack('>23H', rawData[9:55])
|
||
if self.mode == 1:
|
||
# 校准模式下,直接返回原始数据
|
||
for i in range(self.channels):
|
||
self.reg_values[f'CH{i+1}'] = datas[i]
|
||
else:
|
||
# 工作模式下,进行数据转换
|
||
for i in range(self.channels):
|
||
j = i + 1
|
||
if _sensor_type[i] == '0':
|
||
# 温度传感器
|
||
# self.logger.info(str(self.reg_values))
|
||
self.reg_values[f'CH{j}'] = (datas[i]**2*self.sensor_Tmp_CalibParam[f'CH{j}']['K2'] + datas[i]*self.sensor_Tmp_CalibParam[f'CH{j}']['K'] + self.sensor_Tmp_CalibParam[f'CH{j}']['B'])
|
||
elif _sensor_type[i] == '1':
|
||
# 电流传感器
|
||
self.reg_values[f'CH{j}'] = (datas[i]**2*self.sensor_Cur_CalibParam[f'CH{j}']['K2'] + datas[i]*self.sensor_Cur_CalibParam[f'CH{j}']['K'] + self.sensor_Cur_CalibParam[f'CH{j}']['B'])
|
||
# 转换为物理量
|
||
self.reg_values[f'CH{j}'] = (self.reg_values[f'CH{j}']**2*self.sensor_Pres_CalibParam[f'CH{j}']['K2'] + self.reg_values[f'CH{j}']*self.sensor_Pres_CalibParam[f'CH{j}']['K'] + self.sensor_Pres_CalibParam[f'CH{j}']['B'])
|
||
|
||
self.reg_values['OFFSET'] = datas[16]*256/786432
|
||
self.reg_values['POWERVOL'] = datas[18]*256/786432
|
||
self.reg_values['TEMP'] = (datas[19]*4500000*256/7864320-168000)/563 + 25 #7864320*256/4500000-168000)/563 + 25
|
||
self.reg_values['GAIN'] = datas[20]*256/7864320
|
||
self.reg_values['REF'] = datas[21]*256/786432
|
||
self.reg_values['STATUS'] = self.status
|
||
|
||
self.warning_check()
|
||
case _:
|
||
self.status = 206
|
||
|
||
def warning_check(self):
|
||
"""检查是否有报警条件"""
|
||
for i in range(self.channels):
|
||
ch = f'CH{i+1}'
|
||
val = self.reg_values[ch]
|
||
wp = self.config.get('warning_param', {})
|
||
enable_bits = f"{wp.get('enable', 0):016b}"[::-1]
|
||
if enable_bits[i] == '1':
|
||
low_limit = wp.get(ch, {}).get('lower', float('-inf'))
|
||
high_limit = wp.get(ch, {}).get('upper', float('inf'))
|
||
if val < low_limit or val > high_limit:
|
||
# self.logger.warning(f"Warning: {ch} value {val} out of limits ({low_limit}, {high_limit})")
|
||
self.warning_values[ch] = 1
|
||
else:
|
||
self.warning_values[ch] = 0
|
||
else:
|
||
self.warning_values[ch] = 0
|
||
|
||
def open(self):
|
||
"""打开串口连接"""
|
||
self.serial = serial.Serial(self.port, self.baudrate, timeout=self.timeout)
|
||
if not self.serial.is_open:
|
||
self.status = -101
|
||
return -1
|
||
else:
|
||
self.status = 0
|
||
return 0
|
||
|
||
def close(self):
|
||
self.serial.close()
|
||
self.status = -100
|
||
|
||
def run(self):
|
||
"""主运行循环"""
|
||
try:
|
||
while True:
|
||
match self.optFlag:
|
||
case 0:
|
||
if self.open() == 0:
|
||
self.optFlag = 1
|
||
else:
|
||
self.optFlag = -1
|
||
case 1:
|
||
ret = self.exeCmd('readAllADs')
|
||
if ret[0]:
|
||
self.parseData('readAllADs', ret[1])
|
||
# self.logger.info(str(self.reg_values))
|
||
self.frameNo += 1
|
||
if self.frameNo > 0xFFFF:
|
||
self.frameNo = 0
|
||
time.sleep(1)
|
||
if self.status == -1:
|
||
self.optFlag = -1
|
||
case _:
|
||
time.sleep(5)
|
||
self.close()
|
||
self.optFlag = 0
|
||
except KeyboardInterrupt:
|
||
self.close()
|
||
self.logger.info("Modbus Serial TCP Client stopped.")
|
||
|
||
class Breaker:
|
||
def __init__(self, config:dict, logger):
    """Initialise the breaker state and build the command table from ``config``.

    Args:
        config: Settings dict. Keys read here: port, baudrate, timeout
            (milliseconds), task_start_threshold, task_stop_threshold,
            duration, the limit values OVV/UVV/OCV/LCV/OTV/OPV and the
            action times OVT/UVT/OCT/LCT/OTT/OPT.
        logger: Logger for diagnostics.
    """
    # errorCode table:
    #   0x0001  failed to open the /dev/ttyUSB0 device
    #   0x0101  communication with the breaker failed
    self.errorCode = 0
    # load_status table as given by the original author:
    #   0x00    load offline
    #   0x0101  load online
    # NOTE(review): parseData stores 1 (not 0x0101) for "online" -- confirm.
    self.load_status = 0
    self.config = config
    self.logger = logger
    self.port = config.get('port', '/dev/ttyUSB0')
    self.baudrate = config.get('baudrate', 9600)
    self.timeout = config.get('timeout', 50)/1000.0
    self.task_start_threshold = config.get('task_start_threshold', 2000)
    self.task_stop_threshold = config.get('task_stop_threshold', 2000)
    self.locked = 0
    self.closed = 0x0F  # 0x0F: breaker open, 0xF0: breaker closed
    # Reason for the last trip. F: none, 1: over-current, 2: leakage,
    # 3: over-temperature, 4: overload, 5: over-voltage, 6: under-voltage,
    # 7: remote, 8: module, 9: loss of voltage, A: latch, B: power limit,
    # 0: local.
    self.reasonForLastOpen = 15
    self.active_powers = []
    self.duration = config.get('duration', 5)
    self.alarm = 0

    self.active_power = 0  # active power in W

    # Limit values and action times, each rendered as a 4-digit hex word.
    limit_words = ' '.join(
        f"{config.get(name, default):04X}"
        for name, default in (('OVV', 275), ('UVV', 150), ('OCV', 10000),
                              ('LCV', 30), ('OTV', 80), ('OPV', 13000)))
    time_words = ' '.join(
        f"{config.get(name, default):04X}"
        for name, default in (('OVT', 0), ('UVT', 0), ('OCT', 0),
                              ('LCT', 200), ('OTT', 200), ('OPT', 100)))

    self.reg_values = dict(
        locked=0,
        closed=0x0F,
        reasonForLastOpen=0x0F,
        alarm=0,
        active_power=0,
        load_status=0,
    )

    def entry(frame, reply_len, wait_ms):
        # Entry layout: description, hex command string, reply length,
        # wait after sending (ms), append-CRC flag, verify-CRC flag, attempts.
        return ['', frame, reply_len, wait_ms, 1, 1, 3]

    # NOTE(review): 'turnOnRed' addresses coil 0x0008 while 'turnOffRed'
    # addresses 0x0000 -- confirm against the device documentation.
    self.cmdList = {
        'readAllDatas': entry("0204 0000 0027", 83, 300),
        'readOverLimitValues': entry("0203 0002 0006", 17, 200),
        'readOverLimitActionTime': entry("0203 0010 0006", 17, 200),
        'setOverLimitValues': entry("0210 0002 0006 0C " + limit_words, 8, 100),
        'setOverLimitActionTime': entry("0210 0010 0006 0C " + time_words, 8, 100),
        'closeBreaker': entry("0210 000D 0001 02 FF00", 8, 100),
        'openBreaker': entry("0210 000D 0001 02 0000", 8, 100),
        'turnOnGreen': entry("0105 0002 FF00", 8, 100),
        'turnOffGreen': entry("0105 0002 0000", 8, 100),
        'turnOnRed': entry("0105 0008 FF00", 8, 100),
        'turnOffRed': entry("0105 0000 0000", 8, 100),
        'turnOnAlarm': entry("0105 00A1 FF00", 8, 100),
        'turnOffAlarm': entry("0105 00A1 0000", 8, 100),
    }

    self.optFlag = 0
    self.logger.info(f"Breader routine inspection started.")
|
||
|
||
def update_config(self):
    """Placeholder: the breaker has no settings that are re-read at runtime."""
    return None
|
||
|
||
def exeCmd(self, cmdName) -> list: # type: ignore
    """Send one command from ``cmdList`` and read its reply, retrying on failure.

    A reply is accepted when it has the expected length, starts with the same
    two bytes as the command and, if the entry asks for it, carries a matching
    CRC in its last two bytes.

    Returns:
        list: ``[ok, reply_bytes_or_None, trace_text]``; ``ok`` is False for
        an unknown command, after all attempts failed, or on any exception.
    """
    try:
        info = ''
        cmd = self.cmdList.get(cmdName, None)
        if cmd is None:
            return [False, None, f"Command {cmdName} not found in cmdList."]

        frame = bytearray().fromhex(cmd[1])
        if cmd[4] == 1:
            frame += bytearray(checkValue(frame[0:]).to_bytes(2, 'big'))
        attempts = int(cmd[6]) if len(cmd) >= 7 else 1

        for _ in range(attempts):
            info += f"[{nowStr()}] Sent:{wordData2HexStr(frame)}\n"
            self.serial.write(frame)
            time.sleep(int(cmd[3])/1000.0)
            reply = self.serial.read(int(cmd[2]))
            info += (f"[{nowStr()}] Echo:{wordData2HexStr(reply)}\n")

            expected = int(cmd[2])
            if len(reply) < expected:
                continue
            if reply[0:2] != bytearray().fromhex(cmd[1][0:4]):
                continue
            reply = reply[0:expected]
            if cmd[5] != 1:
                return [True, reply, info]
            received_crc = int.from_bytes(reply[expected-2:expected], byteorder='big')
            if received_crc == checkValue(reply[0:expected-2]):
                return [True, reply, info]

        return [False, None, info]
    except Exception as e:
        info += f"[{nowStr()}] Error in exeCmd({cmd}): {str(e)}\n" # type: ignore
        return [False, None, info]
|
||
|
||
def parseData(self, cmdName, rawData):
    """Decode a breaker reply and refresh the cached state.

    Only ``'readAllDatas'`` carries data: the first three and last two bytes
    of ``rawData`` are stripped and the lock state, switch state, last trip
    reason and active power are read from the rest. Once the power history
    holds more than ``duration * 2`` samples, the oldest is dropped and the
    average decides ``load_status``. Any exception is logged, not raised.
    """
    try:
        if cmdName != 'readAllDatas':
            # 'closeBreaker', 'openBreaker' and unknown commands: nothing to decode.
            return

        payload = rawData[3:-2]
        self.locked = payload[0]
        self.closed = payload[1]
        self.reasonForLastOpen = (payload[6] & 0xF0) >> 4
        self.active_power = int.from_bytes(payload[68:70], byteorder='big')
        self.active_powers.append(self.active_power)

        if len(self.active_powers) > self.duration * 2:
            self.active_powers = self.active_powers[1:]
            average = np.mean(self.active_powers)
            if average > self.task_start_threshold:
                self.load_status = 1
            if average < self.task_stop_threshold:
                self.load_status = 0

        self.reg_values.update(
            locked=self.locked,
            closed=self.closed,
            reasonForLastOpen=self.reasonForLastOpen,
            alarm=self.alarm,
            active_power=self.active_power,
            load_status=self.load_status,
        )

        print(f"breaker: {self.reg_values}")
    except Exception as e:
        self.logger.error(f"[{nowStr()}] Error in Breaker: parseData({cmdName}): {str(e)}\n")
|
||
|
||
def openBreaker(self):
    """Request that the run loop open the breaker.

    The request is recorded by setting ``self.optFlag`` to 2, and only when
    the cached ``reg_values['closed']`` equals ``0xF0``; for any other value
    the call does nothing.
    """
    closed_state = self.reg_values['closed']
    if closed_state != 0xF0:
        return
    self.optFlag = 2
|
||
|
||
def closeBreaker(self):
    """Request that the run loop close the breaker.

    The request is recorded by setting ``self.optFlag`` to 3, and only when
    the cached ``reg_values['closed']`` equals ``0x0F``; for any other value
    the call does nothing.
    """
    closed_state = self.reg_values['closed']
    if closed_state != 0x0F:
        return
    self.optFlag = 3
|
||
|
||
def alarming(self):
    """Switch the indicators to the alarm pattern.

    Nothing is sent when ``self.alarm`` is already truthy or when the low
    byte of ``self.closed`` is not ``0xF0``. Otherwise three commands are
    executed in this order: ``turnOffGreen``, ``turnOnRed``, ``turnOnAlarm``.
    """
    if self.alarm:
        return
    if (self.closed & 0xFF) != 0xF0:
        return
    for command in ('turnOffGreen', 'turnOnRed', 'turnOnAlarm'):
        self.exeCmd(command)
|
||
|
||
def unalarming(self):
    """Switch the indicators back from the alarm pattern.

    Does nothing unless ``self.alarm`` is truthy. The green indicator is
    turned on when the low byte of ``self.closed`` is ``0xF0`` and turned
    off otherwise; in both cases ``turnOffRed`` and ``turnOffAlarm`` follow.
    """
    if not self.alarm:
        return
    green_command = 'turnOnGreen' if (self.closed & 0xFF) == 0xF0 else 'turnOffGreen'
    for command in (green_command, 'turnOffRed', 'turnOffAlarm'):
        self.exeCmd(command)
|
||
|
||
def open(self):
    """Open the serial connection to the breaker.

    ``serial.Serial`` raises when the device is missing, busy or given an
    invalid parameter; it does not hand back a closed port. Those failures
    are caught here and reported through the return value so that the run
    loop can retry instead of being terminated by the exception.

    Returns:
        int: 0 when the port is open (``self.errorCode`` is reset to 0),
        -1 otherwise (``self.errorCode`` is set to ``0x0001``).
    """
    try:
        self.serial = serial.Serial(self.port, self.baudrate, timeout=self.timeout)
    except (serial.SerialException, ValueError) as e:
        self.logger.error(f"Error in Breaker: open({self.port}): {e}")
        self.errorCode = 0x0001
        return -1

    if not self.serial.is_open:
        self.errorCode = 0x0001
        return -1

    self.errorCode = 0
    return 0
|
||
|
||
def close(self):
    """Close the serial port if one is currently open.

    Safe to call when no port object exists yet (``self.serial`` missing or
    ``None``), which happens when ``open()`` has never succeeded; the error
    and recovery paths of the run loop call this unconditionally.
    """
    port = getattr(self, 'serial', None)
    if port is not None and port.is_open:
        port.close()
|
||
|
||
def run(self):
    """Main loop of the breaker worker; driven by ``self.optFlag``.

    States handled by the loop:
      * 0  -- open the port, send the initial command sequence, then go to
              state 1 when ``openBreaker``, ``setOverLimitValues`` and
              ``setOverLimitActionTime`` all report success.
      * 1  -- poll ``readAllDatas`` every 0.2 s and feed ``parseData``.
      * 2  -- execute ``openBreaker``, then return to state 1.
      * 3  -- execute ``closeBreaker``, then return to state 1.
      * any other value -- wait 1 s, close the port and restart at state 0.

    A failed step sets ``optFlag = -1`` and ``errorCode = 0x0101``. Any
    exception leaves the loop: the port is closed, the error is logged and
    the method returns.

    NOTE(review): indentation was lost in this copy of the file; the
    placement of the failure assignments at ``case`` level is a
    reconstruction -- confirm against the original.
    """
    try:
        while True:
            # self.logger.info(f"optFlag={self.optFlag}")
            match self.optFlag:
                case 0:
                    if self.open() == 0:
                        ret0 = self.exeCmd('openBreaker')
                        # self.logger.info(f"setOverLimitValues ret: {ret0}")
                        ret1 = self.exeCmd('setOverLimitValues')
                        # self.logger.info(f"setOverLimitValues ret: {ret1}")
                        ret2 = self.exeCmd('readOverLimitValues')
                        self.logger.info(f"readOverLimitValues ret: {ret2}")
                        ret3 = self.exeCmd('setOverLimitActionTime')
                        # self.logger.info(f"setOverLimitValues ret: {ret3}")
                        ret4 = self.exeCmd('readOverLimitActionTime')
                        self.logger.info(f"readOverLimitActionTime ret: {ret4}")
                        # Only the three write commands gate the transition;
                        # the two read-backs (ret2, ret4) are just logged.
                        if ret0[0] and ret1[0] and ret3[0]:
                            self.optFlag = 1
                            continue
                    # Reached when the port could not be opened or one of the
                    # gating commands failed.
                    self.optFlag = -1
                    self.errorCode = 0x0101
                case 1:
                    time.sleep(0.2)
                    ret = self.exeCmd('readAllDatas')
                    self.logger.info(f"readAllDatas ret: {wordData2HexStr(ret[1])}")
                    if ret[0]:
                        self.parseData('readAllDatas', ret[1])
                        # optFlag is left untouched here, so a value of 2 or 3
                        # written elsewhere is picked up on the next pass.
                        continue
                    self.optFlag = -1
                    self.errorCode = 0x0101
                case 2:
                    ret = self.exeCmd('openBreaker')
                    if ret[0]:
                        self.optFlag = 1
                        continue
                    self.optFlag = -1
                    self.errorCode = 0x0101
                case 3:
                    ret = self.exeCmd('closeBreaker')
                    if ret[0]:
                        self.optFlag = 1
                        continue
                    self.optFlag = -1
                    self.errorCode = 0x0101
                case _:
                    # Recovery: back off, drop the port and start over.
                    time.sleep(1)
                    self.close()
                    self.optFlag = 0
    except Exception as e:
        # Any exception ends the loop; nothing in this method restarts it.
        self.close()
        self.logger.info(f"Error in Breader: run(), {e}")
|
||
|
||
class GPS:
|
||
def __init__(self, config:dict, logger):
|
||
self.status = -1
|
||
self.logger = logger
|
||
self.config = config
|
||
self.port = config.get('port', '/dev/ttyLP4')
|
||
if self.port != '/dev/ttyLP4':
|
||
self.status = -201
|
||
self.baudrate = config.get('baudrate', 9600)
|
||
self.timeout = config.get('timeout', 1)
|
||
self.optFlag = 0
|
||
self.gps_data = {'latitude': 0.0, 'longitude': 0.0, 'altitude': 0.0, 'speed': 0.0}
|
||
|
||
def read_data(self):
|
||
"""从串口读取GPS数据"""
|
||
if not self.serial or not self.serial.is_open:
|
||
return -1
|
||
try:
|
||
# 读取NMEA数据 (简化示例,实际需要解析NMEA语句)
|
||
line = self.serial.readline().decode('ascii', errors='ignore').strip()
|
||
if line.startswith('$GNGGA') or line.startswith('$GPGGA') or line.startswith('$BDGGA'):
|
||
# 示例解析GPGGA语句 (实际应用中需要更健壮的解析)
|
||
parts = line.split(',')
|
||
if len(parts) > 9:
|
||
try:
|
||
# 纬度格式转换: ddmm.mmmm -> 十进制
|
||
lat = (float(parts[2][:2]) if parts[2] else 0.0) + (float(parts[2][2:]) if parts[2] else 0.0)/60.0
|
||
if parts[3] == 'S':
|
||
lat = -lat
|
||
|
||
# 经度格式转换: dddmm.mmmm -> 十进制
|
||
lon = (float(parts[4][:3]) if parts[4] else 0.0) + (float(parts[4][3:]) if parts[4] else 0.0)/60.0
|
||
if parts[5] == 'W':
|
||
lon = -lon
|
||
|
||
# 海拔高度
|
||
alt = float(parts[9]) if parts[9] else 0.0
|
||
|
||
self.gps_data = {
|
||
'latitude': lat,
|
||
'longitude': lon,
|
||
'altitude': alt,
|
||
'speed': 0.0 # GPGGA不包含速度,需要从GPRMC获取
|
||
}
|
||
return 0
|
||
except (ValueError, IndexError) as e:
|
||
raise Exception(f"Error in parse GPS data: {e}")
|
||
except Exception as e:
|
||
self.logger.error(f"Error in read_gps_data(): {e}")
|
||
|
||
def open(self):
|
||
"""打开串口连接"""
|
||
self.serial = serial.Serial(self.port, self.baudrate, timeout=self.timeout)
|
||
if not self.serial.is_open:
|
||
self.status = -101
|
||
return -1
|
||
else:
|
||
self.status = 0
|
||
return 0
|
||
|
||
def close(self):
|
||
self.serial.close()
|
||
self.status = -100
|
||
|
||
def run(self):
|
||
"""主运行循环"""
|
||
try:
|
||
while True:
|
||
match self.optFlag:
|
||
case 0:
|
||
if self.open() == 0:
|
||
self.optFlag = 1
|
||
else:
|
||
self.optFlag = -1
|
||
case 1:
|
||
ret = self.read_data()
|
||
if ret != 0:
|
||
self.optFlag = -1
|
||
continue
|
||
self.logger.info(str(self.gps_data))
|
||
case _:
|
||
time.sleep(5)
|
||
self.close()
|
||
self.open()
|
||
self.optFlag = 0
|
||
except KeyboardInterrupt:
|
||
self.close()
|
||
self.logger.info("Modbus Serial TCP Client stopped.")
|
||
|
||
class HSDAQ:
|
||
def __init__(self, config:dict, logger):
    """Configure the high-speed DAQ reader and open its capture socket.

    The whole setup is retried every five seconds until it succeeds; each
    failure is logged. The retry is a loop rather than a recursive
    ``self.__init__()`` call: a call without arguments raises ``TypeError``
    instead of retrying, and recursion would also grow the stack.

    Args:
        config: The DAQ configuration mapping. ``output_dir``, ``host`` and
            ``port`` are required; every other key has a default.
        logger: Logger used for all diagnostics.
    """
    self.config = config
    self.logger = logger
    while True:
        try:
            self._setup()
            return
        except Exception as e:
            self.logger.error(f"Error in __init__(): {e}")
            # Release a capture socket created by the failed attempt before
            # the next attempt creates a new one.
            sock = getattr(self, 'sock', None)
            if sock is not None:
                sock.close()
                self.sock = None
            time.sleep(5)

def _setup(self):
    """Run one complete setup attempt; raises on any failure."""
    cfg = self.config

    # Add a static neighbour entry for the DAQ board. A non-zero exit code
    # is only logged, not treated as fatal.
    result = subprocess.run(["ip","neigh","add", "192.168.0.2", "lladdr","00:0A:35:01:FE:C0", "dev", "ethernet0"], capture_output=True, text=True, encoding="utf-8")
    if result.returncode != 0:
        self.logger.info(result.stderr)

    # Raw packet socket on the capture interface.
    self.sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(0x0003))
    # Numeric socket options (presumably Linux SO_RCVBUFFORCE and
    # SO_NO_CHECK -- confirm against the target kernel headers).
    SO_RCVBUFORCE = 33
    self.sock.setsockopt(socket.SOL_SOCKET, SO_RCVBUFORCE, 1024 * 1024 * 25)
    SO_NO_CHECK = 11
    self.sock.setsockopt(socket.SOL_SOCKET, SO_NO_CHECK, 0)
    actual_buf_size = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
    self.logger.info(f"Requested UDP buffer: 50MB, Actual UDP buffer: {actual_buf_size/1024/1024:.2f}MB")
    self.sock.bind(('ethernet0', 0))

    # Output settings.
    self.dataFileDir = cfg['output_dir']
    self.file_type = cfg.get('file_type', 1)
    if self.file_type not in [0, 1]:
        self.file_type = 1
    self.save_flag = cfg.get('save_flag', 0xffff)
    self.channels = cfg.get('channels', 16)

    # One sub-directory per channel: 01, 02, ...
    os.makedirs(self.dataFileDir, exist_ok=True)
    for i in range(self.channels):
        os.makedirs(os.path.join(cfg['output_dir'], f"{i+1:02}"), exist_ok=True)

    self.buffer = b''
    self.feature_data = {}
    self.frequency = [0]*16
    self.reg_values = []

    self.daqBoardNo = cfg.get('daq_board_no', 'XXXXXXXXXX')
    self.sensor_type = cfg.get('sensor_type', 0xffffffff)

    self.feature_type = cfg.get('feature_type', '加速度rms')
    self.min_vol_cur_phy_value = cfg.get('min_vol_cur_phy_value', 0.0)
    self.max_vol_cur_phy_value = cfg.get('max_vol_cur_phy_value', 160.0)
    self.scale = cfg.get('vol_cur_phy_scale', 1)

    # Sampling parameters and the values derived from them.
    self.sample_time = cfg.get('sample_time', 1000)
    self.sample_period = cfg.get('sample_period', 4000)
    self.one_sample_time = cfg.get('one_sample_time', 10)
    self.sample_rate = int(1000000/self.one_sample_time)
    self.sample_points = int(self.sample_time*1000/self.one_sample_time)
    self.mode = cfg.get('mode', 0)

    # Every channel gets an alias entry; missing ones default to ''.
    # Note that this fills in the mapping taken from the configuration.
    self.alias = cfg.get('alias', {})
    for i in range(16):
        self.alias.setdefault(f'CH{i+1}', '')

    # Start/stop command frames differ only in the second 16-bit word.
    params = f"{self.sample_time*1000:08X} {(self.sample_period-self.sample_time)*1000:08X} {self.one_sample_time:08X} {self.sensor_type:08X}"
    self.cmdList = {'startDAQ': bytes.fromhex(f"DDDD 0001 {params}"),
                    'stopDAQ': bytes.fromhex(f"DDDD 0000 {params}")}

    def identity_calibration():
        # K2*x**2 + K*x + B with K2=0, K=1, B=0 leaves values unchanged.
        return {f'CH{n}': {'K2':0.0, 'K': 1.0, 'B': 0.0} for n in range(1, 17)}

    self.sensor_Vol_CalibParam = cfg.get('sensor_Vol_CalibParam', identity_calibration())
    self.sensor_Cur_CalibParam = cfg.get('sensor_Cur_CalibParam', identity_calibration())
    self.sensor_Vib_CalibParam = cfg.get('sensor_Vib_CalibParam', identity_calibration())
    self.warning_values = {f'CH{n}': 0 for n in range(1, 17)}
    self.logger.info(f"DAQ thread starts. Address of DAQ board: IP={cfg['host']}, port={cfg['port']}")
|
||
|
||
def update_config(self):
    """Re-read the tunable acquisition settings from ``self.config``.

    Refreshes file type, save mask, sensor type, sampling parameters, mode,
    the start/stop command frames and the three calibration tables.

    A key that is absent from the configuration keeps the value currently
    held by the instance instead of being replaced by ``None``.
    ``sample_rate`` is recomputed together with ``sample_points`` so that
    both follow a changed ``one_sample_time``. ``file_type`` and ``mode``
    fall back to 1 when they are not 0 or 1.
    """
    cfg = self.config

    self.file_type = cfg.get('file_type', self.file_type)
    if self.file_type not in [0, 1]:
        self.file_type = 1
    self.save_flag = cfg.get('save_flag', self.save_flag)
    self.sensor_type = cfg.get('sensor_type', self.sensor_type)

    self.sample_time = cfg.get('sample_time', self.sample_time)
    self.sample_period = cfg.get('sample_period', self.sample_period)
    self.one_sample_time = cfg.get('one_sample_time', self.one_sample_time)
    self.sample_rate = int(1000000/self.one_sample_time)
    self.sample_points = int(self.sample_time*1000/self.one_sample_time)

    self.mode = cfg.get('mode', self.mode)
    if self.mode not in [0, 1]:
        self.mode = 1

    # Start/stop command frames differ only in the second 16-bit word.
    params = f"{self.sample_time*1000:08X} {(self.sample_period-self.sample_time)*1000:08X} {self.one_sample_time:08X} {self.sensor_type:08X}"
    self.cmdList = {'startDAQ': bytes.fromhex(f"DDDD 0001 {params}"),
                    'stopDAQ': bytes.fromhex(f"DDDD 0000 {params}")}

    self.sensor_Vol_CalibParam = cfg.get('sensor_Vol_CalibParam', self.sensor_Vol_CalibParam)
    self.sensor_Cur_CalibParam = cfg.get('sensor_Cur_CalibParam', self.sensor_Cur_CalibParam)
    self.sensor_Vib_CalibParam = cfg.get('sensor_Vib_CalibParam', self.sensor_Vib_CalibParam)
|
||
|
||
def start_DAQ(self):
    """Send the start-acquisition command to the DAQ board over UDP.

    The receive buffer is cleared first. A fresh datagram socket is bound
    to ``(local_host, local_port)``, used for the single send and then
    closed -- also when binding or sending fails, so that a failed attempt
    does not keep the local port occupied. Errors are logged, not raised.
    """
    try:
        self.buffer = bytearray()
        # Close a socket object left over from an earlier call.
        if hasattr(self, 'udpsock') and self.udpsock:
            self.udpsock.close()
        self.udpsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            self.udpsock.bind((self.config['local_host'], self.config['local_port']))
            self.udpsock.sendto(self.cmdList['startDAQ'], (self.config['host'], self.config['port']))
        finally:
            self.udpsock.close()
        self.logger.info(f"Send start command to DAQ board. {self.cmdList['startDAQ'].hex()}")
    except Exception as e:
        self.logger.error(f"Error in start_DAQ(): {e}")
|
||
|
||
def stop_DAQ(self):
    """Send the stop-acquisition command to the DAQ board over UDP.

    The receive buffer is cleared first. A fresh datagram socket is bound
    to ``(local_host, local_port)``, used for the single send and then
    closed -- also when binding or sending fails, so that a failed attempt
    does not keep the local port occupied. Errors are logged, not raised.
    """
    try:
        self.buffer = bytearray()
        # Close a socket object left over from an earlier call.
        if hasattr(self, 'udpsock') and self.udpsock:
            self.udpsock.close()
        self.udpsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            self.udpsock.bind((self.config['local_host'], self.config['local_port']))
            self.udpsock.sendto(self.cmdList['stopDAQ'], (self.config['host'], self.config['port']))
        finally:
            self.udpsock.close()
        self.logger.info(f"Send stop command to DAQ board. {self.cmdList['stopDAQ'].hex()}")
    except Exception as e:
        self.logger.error(f"Error in stop_DAQ(): {e}")
|
||
|
||
def _get_dir_size(self, path: Path) -> int:
|
||
"""利用 Linux 的 du,返回目录本身已占用字节数,毫秒级"""
|
||
return int(subprocess.check_output(
|
||
['du', '-sb', str(path)], text=True).split()[0])
|
||
|
||
def _oldest_file(self, path: Path):
|
||
"""返回目录中最旧的普通文件 Path 对象,没有则返回 None"""
|
||
with os.scandir(path) as it:
|
||
files = [e for e in it if e.is_file()]
|
||
if not files:
|
||
return None
|
||
# 按修改时间升序
|
||
return Path(min(files, key=lambda e: e.stat().st_mtime).path)
|
||
|
||
def save_data(self):
|
||
"""保存数据到文件"""
|
||
try:
|
||
#判断磁盘剩余空间是否小于1G,如果是从16通道的旧文件目录中删除文件
|
||
channels = self.channels
|
||
# usage = shutil.disk_usage("C:/")
|
||
# while usage.free < self.config['daq']['min_free_gb']*1024*1024*1024:
|
||
# #获取目录下文件列表,并按照降序排序,如果硬盘空间小于阈值,删除旧的文件
|
||
# for i in range(channels):
|
||
# os.makedirs(os.path.join(self.config['daq']['output_dir'], f"{i+1:02}"), exist_ok=True)
|
||
# fileList = os.listdir(f"C:/users/Administrator/PCM/data/{i+1:02}")
|
||
# fileList.sort(reverse=False)
|
||
# if os.path.exists(f"C:/users/Administrator/PCM/data/{i+1:02}/{fileList[0]}"):
|
||
# os.remove(f"C:/users/Administrator/PCM/data/{i+1:02}/{fileList[0]}")
|
||
# # os.remove(f"C:/users/Administrator/PCM/data/{fileList[1]}")
|
||
# usage = shutil.disk_usage("C:/")
|
||
|
||
target_dir = Path(self.dataFileDir)
|
||
max_usage_gb = 5
|
||
max_usage_bytes = max_usage_gb * 1024**3
|
||
channels = self.channels
|
||
while True:
|
||
if self._get_dir_size(target_dir) <= max_usage_bytes:
|
||
break
|
||
for ch in range(channels):
|
||
ch_path = target_dir / f'{ch+1:02d}'
|
||
ch_path.mkdir(parents=True, exist_ok=True)
|
||
victim = self._oldest_file(ch_path)
|
||
if victim:
|
||
victim.unlink()
|
||
self.reg_values = []
|
||
timestamp = time.time()
|
||
timeStr = datetime.fromtimestamp(timestamp).strftime("%Y%m%d%H%M%S")
|
||
# with open(filename, 'wb') as f:
|
||
# f.write(self.buffer)
|
||
# f.close()
|
||
print(f"Length of buffer: {len(self.buffer)}")
|
||
datas = np.frombuffer(self.buffer, dtype='>h')
|
||
print(f"Length of datas: {len(datas)}")
|
||
datas = datas[:int(len(datas)/channels)*channels]
|
||
datas = datas.reshape(-1, channels)
|
||
data = None
|
||
_s = f"{self.sensor_type:032b}"
|
||
_sensor_type = ''.join([_s[2*i:2*i+2] for i in range(channels-1, -1, -1)])
|
||
print(f"save_flag = {self.save_flag}")
|
||
_save_flag = f"{self.save_flag:016b}"[::-1]
|
||
if 'calib_params' in self.config and 'vibration' in self.config['calib_params']:
|
||
_fre = self.config['calib_params']['vibration'].get('frequency', -1)
|
||
else:
|
||
_fre = -1
|
||
|
||
for i in range(channels):
|
||
j = i + 1
|
||
if _fre != -1 and _fre > 0 and self.config['mode'] == 1:
|
||
_len = len(datas[:,i])//_fre*_fre
|
||
_data = datas[0:_len,i]
|
||
else:
|
||
_data = datas[:,i]
|
||
|
||
_data = _data[0:20]
|
||
log_data = np.log(np.abs(_data) + 1e-300)
|
||
log_mean_squared = 2 * np.mean(log_data) + np.log(len(_data))
|
||
_rms = np.exp(0.5 * log_mean_squared) / self.scale
|
||
|
||
# _rms = np.sqrt(np.mean(_data**2))/self.scale
|
||
_min = np.min(_data)
|
||
_max = np.max(_data)
|
||
_mean = np.mean(_data)
|
||
|
||
# if (_max - _mean) * 5 < (_mean - _min):
|
||
# _mean = np.mean(_data[0:20])
|
||
# _min = np.min(_data[0:20])
|
||
# _max = np.max(_data[0:20])
|
||
# _rms = np.sqrt(np.mean(np.square(_data[0:20])))
|
||
|
||
self.feature_data[f'CH{j}'] = {
|
||
'min': _min/self.scale,
|
||
'max': _max/self.scale,
|
||
'mean': _mean/self.scale,
|
||
'std': np.std(_data)/self.scale,
|
||
'rms': _rms,
|
||
'sr0': 0.0,
|
||
'sr1': 0.0,
|
||
'sr2': 0.0,
|
||
'sr3': 0.0,
|
||
'sr4': 0.0
|
||
}
|
||
rms = self.feature_data[f'CH{j}']['rms']
|
||
mean = self.feature_data[f'CH{j}']['mean']
|
||
|
||
filename = ''
|
||
match _sensor_type[2*i:2*i+2]:
|
||
case '00':
|
||
# 计算频率,以Hz为单位
|
||
self.feature_data[f'CH{j}']['sr0'] = self.calculateFrequency(datas[:, i], self.one_sample_time)
|
||
self.reg_values.extend(list(self.feature_data[f'CH{j}'].values()))
|
||
data = datas[:, i]
|
||
filename = os.path.join(self.dataFileDir, f"{j:02}/{timeStr}_{rms:.3f}_{self.feature_type}_{self.daqBoardNo}_{j:02}_N_{self.sample_rate}_{self.sample_points}_0.000_{mean:.3f}_{self.min_vol_cur_phy_value}_{self.max_vol_cur_phy_value}_{self.scale}")
|
||
case '01':
|
||
# 计算声音大小
|
||
self.reg_values.extend(list(self.feature_data[f'CH{j}'].values()))
|
||
data = datas[:, i]
|
||
filename = os.path.join(self.dataFileDir, f"{j:02}/{timeStr}_{rms:.3f}_{self.feature_type}_{self.daqBoardNo}_{j:02}_N_{self.sample_rate}_{self.sample_points}_0.000_{mean:.3f}_{self.min_vol_cur_phy_value}_{self.max_vol_cur_phy_value}_{self.scale}")
|
||
case '10':
|
||
# 计算电流大小
|
||
data = datas[:, i]
|
||
if self.mode != 1:
|
||
for k, v in self.feature_data[f'CH{j}'].items():
|
||
self.feature_data[f'CH{j}'][k] = v**2*self.sensor_Cur_CalibParam[f'CH{j}']['K2'] + v*self.sensor_Cur_CalibParam[f'CH{j}']['K'] + self.sensor_Cur_CalibParam[f'CH{j}']['B']
|
||
data = (datas[:, i]**2*self.sensor_Cur_CalibParam[f'CH{j}']['K2'] + datas[:, i]*self.sensor_Cur_CalibParam[f'CH{j}']['K']+self.sensor_Cur_CalibParam[f'CH{j}']['B']).astype(np.float32)
|
||
self.reg_values.extend(list(self.feature_data[f'CH{j}'].values()))
|
||
filename = os.path.join(self.dataFileDir, f"{j:02}/{timeStr}_{rms:.3f}_{self.feature_type}_{self.daqBoardNo}_{j:02}_N_{self.sample_rate}_{self.sample_points}_0.000_{mean:.3f}_{self.min_vol_cur_phy_value}_{self.max_vol_cur_phy_value}_{self.scale}")
|
||
case '11':
|
||
# 计算振动大小
|
||
data = datas[:, i]
|
||
if self.mode != 1:
|
||
for k, v in self.feature_data[f'CH{j}'].items():
|
||
self.feature_data[f'CH{j}'][k] = v**2*self.sensor_Vib_CalibParam[f'CH{j}']['K2'] + v*self.sensor_Vib_CalibParam[f'CH{j}']['K'] + self.sensor_Vib_CalibParam[f'CH{j}']['B']
|
||
data = (datas[:, i]**2*self.sensor_Vib_CalibParam[f'CH{j}']['K2'] + datas[:, i]*self.sensor_Vib_CalibParam[f'CH{j}']['K']+self.sensor_Vib_CalibParam[f'CH{j}']['B']).astype(np.float32)
|
||
self.feature_data[f'CH{j}']['rms'] = np.sqrt(np.mean(data**2))
|
||
self.feature_data[f'CH{j}']['sr0'] = self.feature_data[f'CH{j}']['rms']*np.sqrt(2)
|
||
else:
|
||
self.feature_data[f'CH{j}']['sr0'] = self.feature_data[f'CH{j}']['std']*np.sqrt(2)
|
||
self.reg_values.extend(list(self.feature_data[f'CH{j}'].values()))
|
||
filename = os.path.join(self.dataFileDir, f"{j:02}/{timeStr}_{rms:.3f}_{self.feature_type}_{self.daqBoardNo}_{j:02}_N_{self.sample_rate}_{self.sample_points}_0.000_{mean:.3f}_{self.min_vol_cur_phy_value}_{self.max_vol_cur_phy_value}_{self.scale}")
|
||
case _:
|
||
pass
|
||
|
||
# 将数据写入文件
|
||
if self.file_type == 1:
|
||
filename += '.bin'
|
||
if _save_flag[i] == '1':
|
||
if isinstance(data, np.ndarray):
|
||
try:
|
||
temp_data = data.astype('>f4')
|
||
bytes_data = temp_data.tobytes()
|
||
with open(filename, 'wb', buffering=0) as f:
|
||
# 1. 正常写
|
||
f.write(bytes_data)
|
||
|
||
# 2. 告诉内核:整个文件以后大概率不读,页 cache 可以立即回收
|
||
fd = os.open(filename, os.O_RDONLY)
|
||
try:
|
||
# POSIX_FADV_DONTNEED = 4
|
||
os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_DONTNEED)
|
||
finally:
|
||
os.close(fd)
|
||
del temp_data, bytes_data, data
|
||
finally:
|
||
if 'temp_data' in locals():
|
||
del temp_data
|
||
if 'bytes_data' in locals():
|
||
del bytes_data
|
||
self.logger.debug(f"Success to save data to {filename}.")
|
||
else:
|
||
filename += '.csv'
|
||
if _save_flag[i] == '1':
|
||
if isinstance(data, np.ndarray):
|
||
with open(filename, 'w') as f:
|
||
# 使用生成器表达式避免创建巨大列表
|
||
lines = (f"{num:.4f}\n" for num in data)
|
||
f.writelines(lines)
|
||
self.logger.debug(f"Saved data to {filename}.")
|
||
del lines
|
||
self.warning_check()
|
||
data = None
|
||
datas = None
|
||
self.buffer = bytearray()
|
||
self._force_memory_cleanup()
|
||
except Exception as e:
|
||
self.logger.error(f"Error in save_data(): {e}")
|
||
|
||
def warning_check(self):
    """Update ``self.warning_values`` from the latest channel means.

    For every channel whose bit is set in ``warning_param['enable']`` (bit 0
    is CH1), the channel's ``mean`` feature is compared with the optional
    ``lower`` / ``upper`` limits in ``warning_param['CH<n>']``; the flag
    becomes 1 outside the limits and 0 inside. Channels whose bit is clear
    keep their previous flag.

    NOTE(review): indentation was lost in this copy of the file; the
    ``else`` is taken to belong to the limit comparison -- confirm.
    """
    for idx in range(self.channels):
        name = f'CH{idx+1}'
        mean_value = self.feature_data[name]['mean']
        params = self.config.get('warning_param', {})
        enabled = f"{params.get('enable', 0):016b}"[::-1]
        if enabled[idx] != '1':
            continue
        limits = params.get(name, {})
        lower = limits.get('lower', float('-inf'))
        upper = params.get(name, {}).get('upper', float('inf'))
        out_of_range = mean_value < lower or mean_value > upper
        self.warning_values[name] = 1 if out_of_range else 0
|
||
|
||
def _force_memory_cleanup(self):
|
||
"""强制内存清理"""
|
||
import gc
|
||
# 清除各种缓存
|
||
if hasattr(np, 'getbufsize'):
|
||
np.setbufsize(32768)
|
||
|
||
# 强制垃圾回收
|
||
gc.collect()
|
||
gc.collect() # 两次确保回收
|
||
|
||
# 稍微等待让系统处理
|
||
import time
|
||
time.sleep(0.01)
|
||
|
||
def calculateFrequency(self, signal, oneSampleTime):
    """Estimate the frequency of a toggling signal from its value changes.

    Every position where a sample differs from its predecessor counts as
    one transition; two transitions make one period. The comparison is done
    on the whole array at once instead of sample by sample.

    Args:
        signal: One-dimensional sequence or array of samples.
        oneSampleTime: Duration of one sample in microseconds.

    Returns:
        float: ``(transitions // 2) / total_time`` in Hz, or 0.0 when the
        signal has fewer than two samples or the total time is not positive.
    """
    if len(signal) < 2:
        return 0.0

    samples = np.asarray(signal)
    transitions = int(np.count_nonzero(samples[1:] != samples[:-1]))

    # Total time in seconds = number of samples * sample duration.
    total_time = len(signal) * oneSampleTime / 1000000
    return (transitions // 2) / total_time if total_time > 0 else 0.0
|
||
|
||
def run(self):
    """Main acquisition loop: receive frames, assemble a cycle, save it.

    After sending stop and start commands to the board, frames are read from
    ``self.sock`` one at a time. The first 42 bytes of every packet are
    discarded (presumably Ethernet + IPv4 + UDP headers -- TODO confirm).
    Bytes 4..7 of the remainder are read as a big-endian frame number and
    everything from byte 24 on is appended to ``self.buffer``.

    ``optFlag`` is local to this method:
      * 0 -- waiting for frame number 1 of a cycle;
      * 1 -- collecting frames until the cycle ends, then ``save_data()``.

    Any exception stops the acquisition, closes the socket, logs the error
    and ends the loop.

    NOTE(review): indentation was lost in this copy of the file; the block
    structure below is a reconstruction -- confirm against the original.
    """
    self.logger.info(f"Start DAQ thread.")
    frame_size = self.config.get('frame_size_max', 1464)
    # FILESIZE and DATA_DIR are read here but not used in this method.
    FILESIZE = self.config.get('file_size', 32000000)
    DATA_DIR = self.config.get('output_dir', 'data')
    optFlag = 0
    # Clear the receive buffer, then restart the acquisition on the board.
    self.buffer = b''
    self.sampleNum = 0
    self.stop_DAQ()
    time.sleep(0.01)
    self.start_DAQ()
    lastFrameNo = 0
    cycles = 0
    while(True):
        try:
            data, addr = self.sock.recvfrom(frame_size+42)
            # Empty reads are skipped; otherwise strip the 42 leading bytes
            # and extract the frame number.
            if data:
                data = data[42:]
                nowFrameNo = int.from_bytes(data[4:8], 'big')
                if nowFrameNo != lastFrameNo + 1:
                    print(f"Received data: len={len(data)}, lastFrameNo={lastFrameNo}, nowFrameNo={nowFrameNo}")
                    # self.logger.info(f"Received data: len={len(data)}, frame NO.={int.from_bytes(data[4:8], 'big')}")
            else:
                continue
            # self.logger.debug(f"Head Data:(20 byte) ={' '.join(data[i:i+2].hex() for i in range(0, 20, 2))}")

            if optFlag == 0:
                # A cycle starts with a full-size frame that carries the
                # A5 5A A5 5A marker and frame number 1.
                if data and data[0:4] == bytearray([0xa5, 0x5a, 0xa5, 0x5a]) and len(data) == frame_size and data[4:8] == bytearray([0x00, 0x00, 0x00, 0x01]):
                    self.buffer += data[24:]
                    self.sampleNum += ((len(data)-24)/32)
                    # if self.sensorType != int.from_bytes(data[8:12], 'big'):
                    # self.logger.error(f"In daq_thread(): SensorType in return data doesn't match with config.")
                    optFlag = 1
            elif optFlag == 1:
                if data and data[0:4] == bytearray([0xa5, 0x5a, 0xa5, 0x5a]):
                    # The cycle ends on a frame that is not full-size, or on
                    # a frame whose number has 00 00 01 in bytes 5..7.
                    if len(data) != frame_size or data[5:8] == bytearray([0x00, 0x00, 0x01]):
                        # The closing frame's payload is kept unless the
                        # frame number is exactly 1; that payload is not
                        # stored anywhere.
                        if data[4:8] != bytearray([0x00, 0x00, 0x00, 0x01]):
                            self.buffer += data[24:]
                            self.sampleNum += ((len(data)-24)/32)

                        cycles += 1
                        self.save_data()
                        self.buffer = bytearray()
                        self.sampleNum = 0
                        optFlag = 0
                    else:
                        self.buffer += data[24:]
                        self.sampleNum += ((len(data)-24)/32)

            # Report a forward jump in the frame numbering.
            if nowFrameNo - lastFrameNo != 1 and lastFrameNo < nowFrameNo:
                if lastFrameNo != 0:
                    pass
                self.logger.warning(f"cycles= {cycles}, lastFrameNo={lastFrameNo}, nowFrameNo={nowFrameNo}")
                # raise Exception()
            # A frame number of 0 does not update the reference.
            if nowFrameNo != 0:
                lastFrameNo = nowFrameNo
        except Exception as e:
            self.stop_DAQ()
            self.sock.close()
            self.running = False
            self.buffer = bytearray()
            self.logger.error(f"Error in daq_thread(): {e}")
            self.logger.info(f"Stop DAQ thread.")
            break
|
||
|
||
class InfluxDBWriter:
    """Small synchronous writer on top of ``InfluxDBClient``."""

    def __init__(self, url="http://localhost:8086", token="", org="my-org", bucket="my-bucket"):
        """Create the client and a synchronous write API.

        Args:
            url: Address of the InfluxDB server.
            token: API token passed to the client.
            org: Organisation name.
            bucket: Bucket that every write goes to.
        """
        self.org = org
        self.bucket = bucket
        self.client = InfluxDBClient(url=url, token=token, org=org)
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)

    @staticmethod
    def _build_point(measurement, tags, fields):
        """Assemble one ``Point`` from a measurement name, tags and fields."""
        record = Point(measurement)
        for tag_name, tag_value in tags.items():
            record = record.tag(tag_name, tag_value)
        for field_name, field_value in fields.items():
            record = record.field(field_name, field_value)
        return record

    def write_sensor_data(self, measurement, tags, fields):
        """Write a single data point.

        Args:
            measurement: Measurement name.
            tags: Mapping of tag names to values.
            fields: Mapping of field names to values.

        Failures are printed to stdout and otherwise ignored.
        """
        try:
            record = self._build_point(measurement, tags, fields)
            self.write_api.write(bucket=self.bucket, record=record)
            print(f"[{datetime.now()}]数据写入成功: {record.to_line_protocol()}")
        except Exception as e:
            print(f"写入数据时出错: {e}")

    def write_batch_data(self, points):
        """Write several data points with one call.

        Failures are printed to stdout and otherwise ignored.
        """
        try:
            self.write_api.write(bucket=self.bucket, record=points)
            print(f"[{datetime.now()}]批量写入成功,共 {len(points)} 个数据点")
        except Exception as e:
            print(f"批量写入时出错: {e}")

    def close(self):
        """Close the underlying client."""
        self.client.close()
|
||
|
||
class ModbusGateway:
|
||
def __init__(self):
    """Build the gateway and start all of its background workers.

    In order: logging, configuration manager, InfluxDB writer, PLC client,
    local Modbus TCP server thread, device workers (GPS, breaker, low- and
    high-speed DAQ) each on a daemon thread, and finally the HTTP
    configuration service.
    """
    config_file = 'src/config-1.2-debug.yaml'
    config_file_temp = 'config-1.2-debugcopy.yaml'

    # Logging.
    with open('src/logging-config.json', 'r') as log_cfg_fh:
        logging.config.dictConfig(json.load(log_cfg_fh))
    self.logger = logging.getLogger('PCM')

    self.config_manager = ConfigManager(
        regs_config_file='src/regs-mapping-1.2-debug.yaml',
        config_file=config_file,
        logger=self.logger)

    # The configured period is multiplied by 60 before being stored.
    self.taskInfo = {'status':0x0000, 'running_time':0, 'period':0}
    self.taskInfo['period'] = self.config_manager.config['task']['period']*60

    # InfluxDB writer.
    # NOTE(review): the fallback token below is a credential embedded in
    # the source -- consider requiring it from the configuration instead.
    influx_cfg = self.config_manager.config['influxdb']
    self.influx_client = InfluxDBWriter(
        url=influx_cfg.get('url', 'http://localhost:8086'),
        token=influx_cfg.get('token', 'PCM:1842moon'),
        org=influx_cfg.get('org', 'MEASCON'),
        bucket=influx_cfg.get('bucket', 'PCM')
    )

    # Client for the PLC server; it is only created here.
    plc_cfg = self.config_manager.config['plc-server']
    self.plc_host = plc_cfg.get('host', '172.22.0.3')
    self.plc_port = plc_cfg.get('port', 5020)
    self.plc_client = ModbusTcpClient(self.plc_host, port=self.plc_port)
    self.plc_measurements = plc_cfg.get('measurements', {})

    # Local Modbus TCP server.
    self.max_address = 1300
    server_cfg = self.config_manager.config['modbus-server']
    self.host = server_cfg['host']
    self.port = server_cfg['port']

    self.holding_registers = ModbusSequentialDataBlockForPCM(self.config_manager, self.logger, 0x00, [0]*(self.max_address+1))

    # Only the holding registers carry data; the other tables hold one
    # register each.
    store = ModbusSlaveContext(
        di=ModbusSequentialDataBlock(0, [0]*1),
        co=ModbusSequentialDataBlock(0, [0]*1),
        hr=self.holding_registers,
        ir=ModbusSequentialDataBlock(0, [0]*1))
    self.context = ModbusServerContext(slaves={1:store}, single=False)

    # The server listens on all interfaces; self.host is only reported in
    # the log line below.
    self.modbus_sever = threading.Thread(
        target=StartTcpServer,
        kwargs={"context": self.context, "address": ('0.0.0.0', self.port)},
        daemon=True)
    self.modbus_sever.start()
    self.logger.info(f"Local modbusTCP service starts, IP={self.host}, port={self.port}")

    # Device workers: all are constructed first, then started in order.
    self.gps = GPS(self.config_manager.config['gps'], self.logger)
    self.breaker = Breaker(self.config_manager.config['breaker'], self.logger)
    self.lsdaq = LSDAQ(self.config_manager.config['lsdaq'], self.logger)
    self.hsdaq = HSDAQ(self.config_manager.config['hsdaq'], self.logger)

    workers = (
        ('gps_thread', self.gps),
        ('breaker_thread', self.breaker),
        ('lsdaq_thread', self.lsdaq),
        ('hsdaq_thread', self.hsdaq),
    )
    for attr_name, worker in workers:
        thread = threading.Thread(target=worker.run, daemon=True)
        setattr(self, attr_name, thread)
        thread.start()

    # HTTP configuration service. Its config path comes from the
    # configuration when given, otherwise the temporary YAML copy is used.
    service_cfg = self.config_manager.config.get('config-server', {})
    config_service_port = service_cfg.get('port', 5000)
    config_service_host = service_cfg.get('host', '127.0.0.1')
    config_service_config_path = service_cfg.get('config_path', config_file_temp)

    self.config_service = ConfigService(
        default_config_path=config_service_config_path,
        host=config_service_host,
        port=config_service_port,
        debug=False,
        logger=self.logger,
    )
    self.config_service.start()
    self.logger.info(f"Config service started on {config_service_host}:{config_service_port}")

    # Separate file that holds the persisted task state.
    self.task_state_file = '.task_state.json'
|
||
|
||
def _load_task_running_time(self) -> float:
|
||
"""
|
||
从独立的持久化文件加载任务累计运行时间
|
||
|
||
Returns:
|
||
float: 累计运行时间(秒),文件不存在时返回0
|
||
"""
|
||
try:
|
||
if not os.path.exists(self.task_state_file):
|
||
self.logger.info(f"Task state file not found: {self.task_state_file}, starting from 0")
|
||
return 0
|
||
|
||
with open(self.task_state_file, 'r', encoding='utf-8') as f:
|
||
data = json.load(f)
|
||
|
||
running_time = data.get('running_time', 0)
|
||
if running_time > 0:
|
||
self.logger.info(f"Loaded task running_time from {self.task_state_file}: {running_time:.2f}s")
|
||
return float(running_time)
|
||
|
||
except json.JSONDecodeError as e:
|
||
self.logger.error(f"Task state file corrupted: {e}, starting from 0")
|
||
return 0
|
||
except Exception as e:
|
||
self.logger.error(f"Error loading task running_time: {e}, starting from 0")
|
||
return 0
|
||
|
||
def _save_task_running_time(self, running_time: float) -> bool:
    """Persist the accumulated task running time to the standalone state file.

    The payload is written to ``<task_state_file>.tmp``, flushed and fsynced,
    and then moved over the real file with ``os.replace``. A crash or power
    loss therefore leaves either the previous file or the complete new one.

    Args:
        running_time: Accumulated running time in seconds.

    Returns:
        bool: True when the file was written, False on any failure (the
        error is logged and the temporary file is removed).
    """
    temp_file = self.task_state_file + '.tmp'
    try:
        data = {
            'running_time': running_time,
            'last_update': datetime.now().isoformat(),
            'version': '1.0'
        }

        with open(temp_file, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
            # Push the bytes to disk before the rename, otherwise the rename
            # can become durable while the contents are still only cached.
            f.flush()
            os.fsync(f.fileno())

        os.replace(temp_file, self.task_state_file)
        return True

    except Exception as e:
        self.logger.error(f"Error saving task running_time: {e}")
        # Best-effort cleanup of the partially written temporary file.
        try:
            os.remove(temp_file)
        except FileNotFoundError:
            pass
        except OSError as cleanup_error:
            self.logger.warning(f"Could not remove temporary task state file {temp_file}: {cleanup_error}")
        return False
|
||
|
||
def _reset_task_running_time(self) -> bool:
    """Write a running time of 0 to the task state file.

    Returns:
        bool: The result of ``_save_task_running_time(0)``: True when the
        reset was persisted, False otherwise.
    """
    self.logger.info("Resetting task running_time to 0")
    persisted = self._save_task_running_time(0)
    return persisted
|
||
|
||
def _update_modbus_datas(self):
    """Refresh the local Modbus server's holding registers from the latest readings.

    Addresses, as passed to ``holding_registers.server_set_values``:

    * ``1 + 2*i``    GPS values, one float (two registers) each
    * ``9 + 2*i``    low-speed DAQ values, one float each
    * ``51 + 2*i``   CPU thermal-zone temperatures in degrees Celsius, one float each
    * ``61 + 2*i``   high-speed DAQ values, one float each
    * ``1130 + i``   low-speed DAQ warning flags, one register each
    * ``1146 + i``   high-speed DAQ warning flags, one register each
    * ``1221 + i``   breaker values, one register each

    The five thermal-zone readings are also stored in ``self.cpu_temperatures``.
    Any exception is logged and ends the refresh at that point; registers
    written before the failure keep their new values.
    """
    if not hasattr(self, 'context'):
        self.logger.error("Local modbus tcp service isn't initilized.")
        return

    try:
        # Slave context of the local server. It is looked up here but not
        # used by the rest of this method.
        slave = self.context[1]
        registers = self.holding_registers

        def put_floats(base, values):
            # Each float occupies two consecutive registers.
            for offset, value in enumerate(values):
                registers.server_set_values(base + 2 * offset, float_to_registers(value))

        def put_words(base, values):
            # Each value occupies a single register.
            for offset, value in enumerate(values):
                registers.server_set_values(base + offset, int(value))

        gps_values = list(self.gps.gps_data.values())
        put_floats(1, gps_values)

        breaker_values = list(self.breaker.reg_values.values())
        put_words(1220 + 1, breaker_values)

        lsdaq_values = list(self.lsdaq.reg_values.values())
        put_floats(8 + 1, lsdaq_values)

        lsdaq_warnings = list(self.lsdaq.warning_values.values())
        put_words(1129 + 1, lsdaq_warnings)

        # hsdaq.reg_values is accessed by position, exactly as before, because
        # its concrete container type is not visible from this method.
        hsdaq_values = self.hsdaq.reg_values
        for position in range(len(hsdaq_values)):
            registers.server_set_values(60 + 2 * position + 1, float_to_registers(hsdaq_values[position]))

        hsdaq_warnings = list(self.hsdaq.warning_values.values())
        put_words(1145 + 1, hsdaq_warnings)

        # Publishing of the PLC measurements is currently disabled:
        # packed_data = bytearray()
        # for k, v in self.plc_measurements.items():
        #     packed_data.extend(struct.pack('>fHffH', v['value'], v['warning_param']['enable'], v['warning_param']['lower'], v['warning_param']['upper'], v['warning']))
        # register_values = []
        # for i in range(0, len(packed_data), 2):
        #     if i + 1 < len(packed_data):
        #         # combine two bytes into one 16-bit integer
        #         value = (packed_data[i] << 8) | packed_data[i + 1]
        #     else:
        #         # odd byte count: pad the last byte with 0
        #         value = packed_data[i] << 8
        #     register_values.append(value)
        # holding_registers.server_set_values(1161+1, register_values)

        # Read the temperature of every CPU thermal zone.
        zone_count = 5
        self.cpu_temperatures = {}
        for zone in range(zone_count):
            with open(f"/sys/class/thermal/thermal_zone{zone}/temp", 'r') as f:
                # The kernel reports the value in millidegrees Celsius.
                millidegrees = int(f.read().strip())
            celsius = millidegrees / 1000.0
            self.cpu_temperatures[f'zone{zone}'] = celsius
            registers.server_set_values(50 + 2 * zone + 1, float_to_registers(celsius))

    except Exception as e:
        self.logger.error(f"Error in _update_modbus_datas(): {e}")
|
||
|
||
def _write_to_influxdb(self):
    """Build one batch of InfluxDB points from the current readings and write it.

    All points use the measurement ``PCM_Measurement`` and are told apart by
    their ``data_type`` tag: ``cpu_temperatures``, ``GPS``, ``Breaker``,
    ``TaskInfo``, ``LSDAQ``, one point per aliased high-speed channel
    (tagged with the channel alias) and ``PLC``.

    If no client is available, a new one is created first (after a one
    second pause). Any error while creating the client or while building or
    writing the batch is logged; nothing is raised to the caller.
    """
    if not getattr(self, 'influx_client', None):
        self.logger.error("Influxdb isn't initilized. Try to initilize after 1 seconds.")
        time.sleep(1)

        # Only close a client object that actually exists. Calling close()
        # unconditionally fails with AttributeError when the attribute is None.
        stale_client = getattr(self, 'influx_client', None)
        if stale_client is not None:
            try:
                stale_client.close()
            except Exception as e:
                self.logger.warning(f"Error closing previous influxdb client: {e}")

        try:
            influx_cfg = self.config_manager.config['influxdb']
            # NOTE(review): the fallback token below is a credential embedded
            # in source; consider requiring it from configuration instead.
            self.influx_client = InfluxDBClient(
                url=influx_cfg.get('url', 'http://localhost:8086'),
                token=influx_cfg.get('token', 'PCM:1842moon'),
                org=influx_cfg.get('org', 'MEASCON'),
            )
            self.write_api = self.influx_client.write_api(write_options=SYNCHRONOUS)
        except Exception as e:
            self.logger.error(f"Error in _write_to_influxdb(): {e}")
            return

    try:
        points = []

        def new_point(data_type):
            # Every point shares the measurement name; data_type selects the series.
            point = Point("PCM_Measurement")
            point.tag("data_type", data_type)
            return point

        # CPU temperatures of the core board.
        point = new_point('cpu_temperatures')
        for field_name, field_value in self.cpu_temperatures.items():
            point.field(field_name, float(field_value))
        points.append(point)

        # GPS readings.
        point = new_point('GPS')
        for field_name, field_value in self.gps.gps_data.items():
            point.field(field_name, float(field_value))
        points.append(point)

        # Breaker readings.
        point = new_point('Breaker')
        for field_name, field_value in self.breaker.reg_values.items():
            point.field(field_name, float(field_value))
        points.append(point)

        # Task information; unset entries (None) are written as 0.
        point = new_point('TaskInfo')
        self.logger.debug(f"taskInfo = {self.taskInfo}")
        for field_name, field_value in self.taskInfo.items():
            point.field(field_name, float(0 if field_value is None else field_value))
        points.append(point)

        # Low-speed DAQ readings: only channels with a non-empty alias are
        # written, under the alias name.
        point = new_point('LSDAQ')
        lsdaq_alias = self.lsdaq.alias
        for field_name, field_value in self.lsdaq.reg_values.items():
            alias = lsdaq_alias.get(field_name)
            if alias:
                point.field(alias, float(field_value))
        for field_name, field_value in self.lsdaq.warning_values.items():
            alias = lsdaq_alias.get(field_name)
            if alias:
                point.field(alias + '.warning', field_value)
        points.append(point)

        # High-speed DAQ readings: one point per channel CH1..CH16.
        # NOTE(review): the original nesting under the alias check was lost in
        # the source this was recovered from. Channels without an alias are
        # skipped here (matching the LSDAQ handling), which also avoids
        # untagged points that would all land in one series - confirm.
        for channel in (f'CH{number}' for number in range(1, 17)):
            alias = self.hsdaq.alias.get(channel)
            if not alias:
                continue
            point = new_point(alias)
            for field_name, field_value in self.hsdaq.feature_data[channel].items():
                point.field(field_name, float(field_value))
            point.field('warning', self.hsdaq.warning_values[channel])
            points.append(point)

        # Readings collected from the PLC.
        point = new_point('PLC')
        for name, measurement in self.plc_measurements.items():
            point.field(name, float(measurement['value']))
        points.append(point)

        # NOTE(review): write_batch_data() is not part of influxdb_client's
        # InfluxDBClient, which is the type created by the reconnect path
        # above. Presumably the client built at start-up is a project wrapper
        # that provides it - verify.
        self.influx_client.write_batch_data(points)

    except Exception as e:
        self.logger.error(f"Error in _write_to_influxdb(): {e}")
|
||
|
||
def _read_plc_datas(self):
    """Poll every configured PLC measurement and refresh its value and warning flag.

    For each entry of ``self.plc_measurements`` two holding registers are
    read at the entry's ``address`` and decoded as one float (byte order
    ``ABCD``). When the entry's ``warning_param['enable']`` is 1, ``warning``
    becomes 1 if the value lies outside ``[lower, upper]`` and 0 otherwise;
    when warnings are disabled it is set to 0. Entries whose read fails are
    left untouched.

    Nothing is done when the connection cannot be established or when there
    are no measurements configured.
    """
    if not self.plc_client:
        self.plc_client = ModbusTcpClient(self.plc_host, port=self.plc_port)

    if not (self.plc_client.connect() and self.plc_measurements):
        return

    for name, measurement in self.plc_measurements.items():
        # NOTE(review): `slave=` is the keyword used by pymodbus 3.x, while
        # this file imports the client from pymodbus.client.sync (2.x layout,
        # where the keyword is `unit=`) - confirm against the installed version.
        ret = self.plc_client.read_holding_registers(address=measurement['address'], count=2, slave=1)
        # Diagnostic output goes through the logger instead of stdout.
        self.logger.debug(f"{ret}")

        if ret.isError() or len(ret.registers) != 2:
            continue

        value = registers_to_float(ret.registers, byte_order='ABCD')
        measurement['value'] = value

        # NOTE(review): the nesting of the warning evaluation was lost in the
        # source this was recovered from; it is assumed to run only after a
        # successful read - confirm.
        limits = measurement['warning_param']
        if limits['enable'] == 1:
            out_of_range = value < limits['lower'] or value > limits['upper']
            measurement['warning'] = 1 if out_of_range else 0
        else:
            measurement['warning'] = 0
|
||
|
||
def run(self):
|
||
"""主运行循环"""
|
||
timestamp = time.time()
|
||
task_control_reg_addr = self.config_manager.config['task']['control_reg_addr']
|
||
task_control_reg_value = 0x0000
|
||
self.taskInfo['start_time'] = None
|
||
|
||
# 从配置文件恢复运行时间(如果有)
|
||
saved_running_time = self._load_task_running_time()
|
||
if saved_running_time > 0:
|
||
self.taskInfo['running_time'] = saved_running_time
|
||
self.logger.info(f"Restored running_time from config: {self.taskInfo['running_time']:.2f}s")
|
||
else:
|
||
self.taskInfo['running_time'] = 0
|
||
|
||
# 持久化保存相关变量
|
||
last_save_time = time.time()
|
||
SAVE_INTERVAL = 5 # 每5秒保存一次
|
||
last_running_time = self.taskInfo['running_time'] # 用于检测变化
|
||
|
||
while True:
|
||
self.gps.config = self.config_manager.config['gps']
|
||
self.lsdaq.config = self.config_manager.config['lsdaq']
|
||
self.lsdaq.update_config()
|
||
self.hsdaq.config = self.config_manager.config['hsdaq']
|
||
self.hsdaq.update_config()
|
||
self.taskInfo['status'] = self.holding_registers.values[task_control_reg_addr+1]
|
||
match self.taskInfo['status']:
|
||
case 0x0000:
|
||
self.breaker.openBreaker()
|
||
# 实验停止时重置运行时间
|
||
if self.taskInfo['running_time'] > 0:
|
||
self._reset_task_running_time()
|
||
self.taskInfo['start_time'] = None
|
||
self.taskInfo['running_time'] = 0
|
||
case 0x5555:
|
||
self.breaker.closeBreaker()
|
||
if self.breaker.reg_values['load_status'] == 1:
|
||
current_time = time.time()
|
||
self.taskInfo['running_time'] += ( current_time - self.taskInfo['start_time'] )
|
||
self.taskInfo['start_time'] = current_time
|
||
else:
|
||
self.taskInfo['start_time'] = time.time()
|
||
|
||
if self.taskInfo['running_time'] > self.taskInfo['period']:
|
||
# 实验完成时重置运行时间
|
||
self._reset_task_running_time()
|
||
self.holding_registers.values[task_control_reg_addr+1] = 0xFFFF
|
||
continue
|
||
|
||
if 1 in self.lsdaq.warning_values.values() or 1 in self.hsdaq.warning_values.values():
|
||
self.breaker.alarming()
|
||
self.holding_registers.values[task_control_reg_addr+1] = 0xFFFF
|
||
continue
|
||
else:
|
||
self.breaker.unalarming()
|
||
case 0xAAAA:
|
||
self.breaker.openBreaker()
|
||
self.taskInfo['start_time'] = time.time()
|
||
case 0xFFFF:
|
||
self.holding_registers.values[task_control_reg_addr+1] = 0x0000
|
||
|
||
# time.sleep(0.01)
|
||
nowtime = time.time()
|
||
# print(f"{timestamp}, {nowtime}")
|
||
if (nowtime-timestamp) > 1:
|
||
timestamp = nowtime
|
||
#将数据写入influxdb
|
||
self._write_to_influxdb()
|
||
# self._read_plc_datas()
|
||
self._update_modbus_datas()
|
||
|
||
# 周期性保存运行时间(仅在运行状态且时间有变化时保存)
|
||
if self.taskInfo['status'] == 0x5555:
|
||
if (nowtime - last_save_time) >= SAVE_INTERVAL:
|
||
if self.taskInfo['running_time'] != last_running_time:
|
||
self._save_task_running_time(self.taskInfo['running_time'])
|
||
last_running_time = self.taskInfo['running_time']
|
||
last_save_time = nowtime
|
||
|
||
# 控制寄存器
|
||
|
||
|
||
if __name__ == "__main__":
    # Script entry point: build the gateway, then enter its main loop.
    gateway = ModbusGateway()
    gateway.run()