RDSS/core/data_writer.py

1277 lines
55 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
"""
数据写入器模块
"""
import struct
import logging
from pathlib import Path
from core.encoder import BitFieldEncoder
# 设置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class InstructionConfigV4:
"""指令配置类 V4"""
# 指令部分各字段定义
INSTRUCTION_FORMAT = {
'header': {
'bytes': 2,
'value': [0xAA, 0xAA],
'description': '包头'
},
'signal_type': {
'bytes': 2,
'value': [0x00, 0x07],
'description': '信号类型'
},
'ddr_start_addr': {
'bytes': 4,
'value': [0x00, 0x00, 0x00, 0x00],
'description': 'DDR起始地址'
},
'ddr_end_addr': {
'bytes': 4,
'value': None, # 根据数据长度计算
'description': 'DDR结束地址'
},
'sync_pulse1_count': {
'bytes': 4,
'value': [0x00, 0x00, 0x01, 0xDB], # 475
'description': '同步脉冲1个数'
},
'sync_pulse1_period': {
'bytes': 4,
'value': [0x00, 0x00, 0x3E, 0x80], # 16000
'description': '同步脉冲1周期'
},
'sync_pulse2_count': {
'bytes': 4,
'value': [0x00, 0x00, 0x00, 0x14], # 20
'description': '同步脉冲2个数'
},
'sync_pulse2_period': {
'bytes': 4,
'value': [0x00, 0x01, 0x86, 0xA0], # 100000
'description': '同步脉冲2周期'
},
'sync_pulse3_count': {
'bytes': 4,
'value': [0x00, 0x00, 0x00, 0x01], # 1
'description': '同步脉冲3个数'
},
'sync_pulse3_period': {
'bytes': 4,
'value': [0x00, 0x06, 0x1A, 0x80], # 400000
'description': '同步脉冲3周期'
},
'sync_pulse4_count': {
'bytes': 4,
'value': [0x00, 0x00, 0x00, 0x00], # 0
'description': '同步脉冲4个数'
},
'sync_pulse4_period': {
'bytes': 4,
'value': [0x00, 0x00, 0x00, 0x00], # 0
'description': '同步脉冲4周期'
},
'start_stop_control': {
'bytes': 2,
'value': [0x00, 0x00],
'description': '启停控制'
},
'crc16': {
'bytes': 2,
'value': None, # 根据前面数据计算
'description': 'Modbus CRC16校验和'
}
}
# 打包配置
PACKING_CONFIG = {
'signals_per_packet': 800, # 每包800个点火信号
'bytes_per_signal': 8, # 每个信号8字节
'crc_bytes': 2, # CRC16校验和2字节
'bytes_per_packet': 6402, # 800*8 + 2 = 6402字节
'group_size': 8, # 8个信号为一组
'padding_byte': 0x00 # 填充字节
}
@classmethod
def get_instruction_length(cls) -> int:
"""获取指令部分的总长度(字节)"""
total = 0
for field_info in cls.INSTRUCTION_FORMAT.values():
total += field_info['bytes']
return total
@classmethod
def calculate_ddr_end_addr(cls, total_data_bytes: int) -> list:
"""
计算DDR结束地址
DDR结束地址以信号为单位每个信号8字节
每包数据6400字节 = 800个信号
参数:
total_data_bytes: 文件总字节数(包括指令部分)
返回:
DDR结束地址的4字节列表以信号为单位
"""
# 减去两个指令的大小启动指令48字节 + 停止指令48字节 = 96字节得到纯数据大小
instruction_size = cls.get_instruction_length() * 2 # 两个指令
data_size = total_data_bytes - instruction_size
# 计算包的数量每包6400字节向上取整
packet_size = 6400
packet_count = (data_size + packet_size - 1) // packet_size
# DDR结束地址 = 包的数量 * 800每包800个信号
end_addr = packet_count * 800
# 转换为4字节列表大端序
return [(end_addr >> (8 * (3 - i))) & 0xFF for i in range(4)]
@classmethod
def calculate_crc16(cls, data: bytes) -> list:
"""
计算Modbus CRC16校验和
使用标准Modbus CRC16算法
"""
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
# 返回低字节在前
return [crc & 0xFF, (crc >> 8) & 0xFF]
@classmethod
def generate_instruction_bytes(cls, total_data_bytes: int) -> bytes:
"""
生成完整的指令部分字节流
参数:
total_data_bytes: 总数据字节数(不包括指令部分)
返回:
指令部分的字节流
"""
instruction_data = bytearray()
# 1. 包头
instruction_data.extend(bytes(cls.INSTRUCTION_FORMAT['header']['value']))
# 2. 信号类型
instruction_data.extend(bytes(cls.INSTRUCTION_FORMAT['signal_type']['value']))
# 3. DDR起始地址
instruction_data.extend(bytes(cls.INSTRUCTION_FORMAT['ddr_start_addr']['value']))
# 4. DDR结束地址根据总数据大小计算
ddr_end_addr = cls.calculate_ddr_end_addr(total_data_bytes)
instruction_data.extend(bytes(ddr_end_addr))
# 5. 同步脉冲1个数
instruction_data.extend(bytes(cls.INSTRUCTION_FORMAT['sync_pulse1_count']['value']))
# 6. 同步脉冲1周期
instruction_data.extend(bytes(cls.INSTRUCTION_FORMAT['sync_pulse1_period']['value']))
# 7. 同步脉冲2个数
instruction_data.extend(bytes(cls.INSTRUCTION_FORMAT['sync_pulse2_count']['value']))
# 8. 同步脉冲2周期
instruction_data.extend(bytes(cls.INSTRUCTION_FORMAT['sync_pulse2_period']['value']))
# 9. 同步脉冲3个数
instruction_data.extend(bytes(cls.INSTRUCTION_FORMAT['sync_pulse3_count']['value']))
# 10. 同步脉冲3周期
instruction_data.extend(bytes(cls.INSTRUCTION_FORMAT['sync_pulse3_period']['value']))
# 11. 同步脉冲4个数
instruction_data.extend(bytes(cls.INSTRUCTION_FORMAT['sync_pulse4_count']['value']))
# 12. 同步脉冲4周期
instruction_data.extend(bytes(cls.INSTRUCTION_FORMAT['sync_pulse4_period']['value']))
# 13. 启停控制
instruction_data.extend(bytes(cls.INSTRUCTION_FORMAT['start_stop_control']['value']))
# 14. 计算CRC16不包括CRC字段本身
crc_bytes = cls.calculate_crc16(instruction_data)
instruction_data.extend(bytes(crc_bytes))
return bytes(instruction_data)
@classmethod
def print_instruction_info(cls, total_data_bytes: int):
"""打印指令信息"""
print("=" * 60)
print("指令配置信息")
print("=" * 60)
# 生成指令字节流
instruction_bytes = cls.generate_instruction_bytes(total_data_bytes)
# 解析并打印每个字段
offset = 0
for field_name, field_info in cls.INSTRUCTION_FORMAT.items():
field_bytes = field_info['bytes']
field_data = instruction_bytes[offset:offset + field_bytes]
# 格式化显示
hex_str = ' '.join(f'0x{b:02X}' for b in field_data)
int_str = ' '.join(f'{b:3d}' for b in field_data)
if field_name == 'ddr_end_addr' and field_info['value'] is None:
# 计算DDR结束地址
end_addr_bytes = cls.calculate_ddr_end_addr(total_data_bytes)
end_addr = sum(end_addr_bytes[i] << (8 * (3 - i)) for i in range(4))
print(f"{field_info['description']:20s}: {hex_str} ({int_str}) -> 地址: 0x{end_addr:08X} ({end_addr})")
elif field_name == 'crc16' and field_info['value'] is None:
crc_value = (field_data[1] << 8) | field_data[0]
print(f"{field_info['description']:20s}: {hex_str} ({int_str}) -> CRC16: 0x{crc_value:04X}")
else:
print(f"{field_info['description']:20s}: {hex_str} ({int_str})")
offset += field_bytes
# 打印内存信息
end_addr_bytes = cls.calculate_ddr_end_addr(total_data_bytes)
end_addr = sum(end_addr_bytes[i] << (8 * (3 - i)) for i in range(4))
print("\n内存信息:")
print(f" 总数据字节: {total_data_bytes} 字节")
print(f" DDR起始地址: 0x{0:08X}")
print(f" DDR结束地址: 0x{end_addr:08X} ({end_addr})")
print(f" 验证 (end_addr + 8) % 800: {(end_addr + 8) % 800} (应为0)")
print("=" * 60)
@classmethod
def generate_start_instruction(cls, signal_type: int, ddr_start_addr: int, ddr_end_addr: int,
sync_pulse1_count: int, sync_pulse1_period: int,
sync_pulse2_count: int, sync_pulse2_period: int,
sync_pulse3_count: int, sync_pulse3_period: int,
sync_pulse4_count: int, sync_pulse4_period: int) -> bytes:
"""
生成启动指令
参数:
signal_type: 信号类型 (0x07 或 0x70)
ddr_start_addr: DDR起始地址
ddr_end_addr: DDR结束地址
sync_pulse1_count: 同步脉冲1个数
sync_pulse1_period: 同步脉冲1周期(10ns)
sync_pulse2_count: 同步脉冲2个数
sync_pulse2_period: 同步脉冲2周期(10ns)
sync_pulse3_count: 同步脉冲3个数
sync_pulse3_period: 同步脉冲3周期(10ns)
sync_pulse4_count: 同步脉冲4个数
sync_pulse4_period: 同步脉冲4周期(10ns)
返回:
启动指令的字节流48字节
"""
instruction_data = bytearray()
# 1. 包头
instruction_data.extend(bytes(cls.INSTRUCTION_FORMAT['header']['value']))
# 2. 信号类型
instruction_data.extend([(signal_type >> 8) & 0xFF, signal_type & 0xFF])
# 3. DDR起始地址
instruction_data.extend([(ddr_start_addr >> (8 * (3 - i))) & 0xFF for i in range(4)])
# 4. DDR结束地址
instruction_data.extend([(ddr_end_addr >> (8 * (3 - i))) & 0xFF for i in range(4)])
# 5. 同步脉冲1个数
instruction_data.extend([(sync_pulse1_count >> (8 * (3 - i))) & 0xFF for i in range(4)])
# 6. 同步脉冲1周期
instruction_data.extend([(sync_pulse1_period >> (8 * (3 - i))) & 0xFF for i in range(4)])
# 7. 同步脉冲2个数
instruction_data.extend([(sync_pulse2_count >> (8 * (3 - i))) & 0xFF for i in range(4)])
# 8. 同步脉冲2周期
instruction_data.extend([(sync_pulse2_period >> (8 * (3 - i))) & 0xFF for i in range(4)])
# 9. 同步脉冲3个数
instruction_data.extend([(sync_pulse3_count >> (8 * (3 - i))) & 0xFF for i in range(4)])
# 10. 同步脉冲3周期
instruction_data.extend([(sync_pulse3_period >> (8 * (3 - i))) & 0xFF for i in range(4)])
# 11. 同步脉冲4个数
instruction_data.extend([(sync_pulse4_count >> (8 * (3 - i))) & 0xFF for i in range(4)])
# 12. 同步脉冲4周期
instruction_data.extend([(sync_pulse4_period >> (8 * (3 - i))) & 0xFF for i in range(4)])
# 13. 启停控制启动0x0001
instruction_data.extend([0x00, 0x01])
# 14. 计算CRC16不包括CRC字段本身
crc_bytes = cls.calculate_crc16(instruction_data)
instruction_data.extend(bytes(crc_bytes))
return bytes(instruction_data)
@classmethod
def generate_stop_instruction(cls, signal_type: int, ddr_start_addr: int, ddr_end_addr: int,
sync_pulse1_count: int, sync_pulse1_period: int,
sync_pulse2_count: int, sync_pulse2_period: int,
sync_pulse3_count: int, sync_pulse3_period: int,
sync_pulse4_count: int, sync_pulse4_period: int) -> bytes:
"""
生成停止指令
参数:
signal_type: 信号类型 (0x07 或 0x70)
ddr_start_addr: DDR起始地址
ddr_end_addr: DDR结束地址
sync_pulse1_count: 同步脉冲1个数
sync_pulse1_period: 同步脉冲1周期(10ns)
sync_pulse2_count: 同步脉冲2个数
sync_pulse2_period: 同步脉冲2周期(10ns)
sync_pulse3_count: 同步脉冲3个数
sync_pulse3_period: 同步脉冲3周期(10ns)
sync_pulse4_count: 同步脉冲4个数
sync_pulse4_period: 同步脉冲4周期(10ns)
返回:
停止指令的字节流48字节
"""
instruction_data = bytearray()
# 1. 包头
instruction_data.extend(bytes(cls.INSTRUCTION_FORMAT['header']['value']))
# 2. 信号类型
instruction_data.extend([(signal_type >> 8) & 0xFF, signal_type & 0xFF])
# 3. DDR起始地址
instruction_data.extend([(ddr_start_addr >> (8 * (3 - i))) & 0xFF for i in range(4)])
# 4. DDR结束地址
instruction_data.extend([(ddr_end_addr >> (8 * (3 - i))) & 0xFF for i in range(4)])
# 5. 同步脉冲1个数
instruction_data.extend([(sync_pulse1_count >> (8 * (3 - i))) & 0xFF for i in range(4)])
# 6. 同步脉冲1周期
instruction_data.extend([(sync_pulse1_period >> (8 * (3 - i))) & 0xFF for i in range(4)])
# 7. 同步脉冲2个数
instruction_data.extend([(sync_pulse2_count >> (8 * (3 - i))) & 0xFF for i in range(4)])
# 8. 同步脉冲2周期
instruction_data.extend([(sync_pulse2_period >> (8 * (3 - i))) & 0xFF for i in range(4)])
# 9. 同步脉冲3个数
instruction_data.extend([(sync_pulse3_count >> (8 * (3 - i))) & 0xFF for i in range(4)])
# 10. 同步脉冲3周期
instruction_data.extend([(sync_pulse3_period >> (8 * (3 - i))) & 0xFF for i in range(4)])
# 11. 同步脉冲4个数
instruction_data.extend([(sync_pulse4_count >> (8 * (3 - i))) & 0xFF for i in range(4)])
# 12. 同步脉冲4周期
instruction_data.extend([(sync_pulse4_period >> (8 * (3 - i))) & 0xFF for i in range(4)])
# 13. 启停控制停止0x0000
instruction_data.extend([0x00, 0x02])
# 14. 计算CRC16不包括CRC字段本身
crc_bytes = cls.calculate_crc16(instruction_data)
instruction_data.extend(bytes(crc_bytes))
return bytes(instruction_data)
def calculate_crc16(data: bytes) -> bytes:
"""计算Modbus CRC16校验和"""
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
# 返回低字节在前的2字节
return struct.pack('<H', crc)
class DataWriterV4:
"""数据写入器 V4支持两种打包模式"""
def __init__(self, config: dict):
self.config = config
self.encoder = BitFieldEncoder()
self.instruction_config = InstructionConfigV4()
self.packing_config = self.instruction_config.PACKING_CONFIG
# 获取打包模式
self.packing_mode = config['simulation'].get('packing_mode', 'separate')
if self.packing_mode not in ['separate', 'combined']:
raise ValueError(f"不支持的打包模式: {self.packing_mode}")
# 获取能量转换参数
energy_conversion = config['simulation'].get('energy_conversion', {})
self.energy_K = energy_conversion.get('K', 1.0)
self.energy_B = energy_conversion.get('B', 0.0)
logger.info(f"使用打包模式: {self.packing_mode}")
logger.info(f"能量转换参数: K={self.energy_K}, B={self.energy_B}")
def _encode_signal_to_bytes(self, signal: int) -> bytes:
"""
将64位整数编码为8字节高字节在前
"""
return struct.pack('>Q', signal) # 大端序
def _reorganize_signals_for_output(self, signals: list) -> list:
"""
重新组织信号用于输出
8个信号为一组组内按照信号序号逆序排列
例如8,7,6,5,4,3,2,1, 16,15,14,13,12,11,10,9, ...
"""
group_size = 8
num_groups = len(signals) // group_size
reorganized = []
for group_idx in range(num_groups):
# 计算当前组的结束索引(即该组的最后一个信号索引+1
end_idx = (group_idx + 1) * group_size
start_idx = end_idx - group_size
group = signals[start_idx:end_idx]
# 组内逆序排列:从后往前取信号
reversed_group = list(reversed(group))
reorganized.extend(reversed_group)
return reorganized
def _create_packet_bytes(self, packet_signals: list) -> bytes:
"""
创建一个数据包的字节流6402字节
参数:
packet_signals: 一个数据包的信号列表必须为800个
返回:
数据包的字节流6400数据字节 + 2字节CRC16
"""
if len(packet_signals) != 800:
raise ValueError(f"数据包必须包含800个信号当前为{len(packet_signals)}")
# 1. 重新组织信号8个一组从后往前
reorganized_signals = self._reorganize_signals_for_output(packet_signals)
# 2. 编码为字节流
packet_data = bytearray()
for signal in reorganized_signals:
packet_data.extend(self._encode_signal_to_bytes(signal))
# 3. 计算CRC16对整个6400字节数据
crc_bytes = calculate_crc16(packet_data)
# 4. 添加CRC16
packet_data.extend(crc_bytes)
# 验证大小
if len(packet_data) != 6402:
raise ValueError(f"数据包字节流大小错误: {len(packet_data)} != 6402")
return bytes(packet_data)
def _pack_events_separately(self, all_events: list) -> list:
"""
单独打包模式:每个事件单独打包成一个数据包
参数:
all_events: 所有事件的信号列表
返回:
数据包字节流列表
"""
packets = []
for event_idx, event_signals in enumerate(all_events):
# 每个事件单独处理
signals_needed = 800
# 如果事件信号不足800个补零
if len(event_signals) < signals_needed:
padding_needed = signals_needed - len(event_signals)
padded_signals = event_signals + [0] * padding_needed
elif len(event_signals) > signals_needed:
# 如果超过800个截断理论上不应该发生
logger.warning(f"事件{event_idx+1}信号数({len(event_signals)})超过800将截断")
padded_signals = event_signals[:signals_needed]
else:
padded_signals = event_signals
# 设置最后一个信号的事件结束标志位
if padded_signals[-1] != 0:
padded_signals[-1] = self.encoder.set_event_end_flag(padded_signals[-1])
else:
# 如果最后一个信号是0设置为事件结束标记
padded_signals[-1] = self.encoder.encode(0, [0, 0, 0], event_end=True)
# 创建数据包
packet_bytes = self._create_packet_bytes(padded_signals)
packets.append(packet_bytes)
logger.info(f"已打包事件 {event_idx + 1}/{len(all_events)} (单独打包模式)")
return packets
def _pack_events_combined(self, all_events: list) -> list:
"""
整体打包模式所有事件信号拼接每800个信号一包
参数:
all_events: 所有事件的信号列表
返回:
数据包字节流列表
"""
# 1. 收集所有信号,并设置每个事件的最后一个信号为结束标志
all_signals = []
for event_idx, event_signals in enumerate(all_events):
if event_signals:
# 设置最后一个信号的事件结束标志位
if event_signals[-1] != 0:
event_signals[-1] = self.encoder.set_event_end_flag(event_signals[-1])
else:
# 如果最后一个信号是0设置为事件结束标记
event_signals[-1] = self.encoder.encode(0, [0, 0, 0], event_end=True)
all_signals.extend(event_signals)
else:
# 如果事件没有信号,添加一个结束标记信号
end_signal = self.encoder.encode(0, [0, 0, 0], event_end=True)
all_signals.append(end_signal)
logger.info(f"已处理事件 {event_idx + 1}/{len(all_events)},信号数: {len(event_signals)}")
# 2. 将信号分组成800个一包
packets = []
signals_per_packet = 800
total_signals = len(all_signals)
num_full_packets = total_signals // signals_per_packet
remaining_signals = total_signals % signals_per_packet
logger.info(f"总信号数: {total_signals}, 完整包: {num_full_packets}, 剩余信号: {remaining_signals}")
# 3. 处理完整的数据包
for i in range(num_full_packets):
start_idx = i * signals_per_packet
end_idx = start_idx + signals_per_packet
packet_signals = all_signals[start_idx:end_idx]
# 创建数据包
packet_bytes = self._create_packet_bytes(packet_signals)
packets.append(packet_bytes)
logger.info(f"已打包完整数据包 {i + 1}/{num_full_packets}")
# 4. 处理最后一个不完整的数据包(如果有)
if remaining_signals > 0:
start_idx = num_full_packets * signals_per_packet
last_packet_signals = all_signals[start_idx:]
# 补零到800个信号
padding_needed = signals_per_packet - remaining_signals
padded_signals = last_packet_signals + [0] * padding_needed
# 创建最后一个数据包
packet_bytes = self._create_packet_bytes(padded_signals)
packets.append(packet_bytes)
logger.info(f"已打包最后一个数据包 (补零{padding_needed}个信号)")
return packets
def write_events_binary_v4(self, all_events: list, output_file: str):
"""
将事件写入二进制文件 V4
注意: 此方法已弃用,请使用 write_time_series_with_instructions 方法
"""
import warnings
warnings.warn("write_events_binary_v4 方法已弃用,请使用 write_time_series_with_instructions 方法",
DeprecationWarning, stacklevel=2)
# 根据打包模式创建数据包
if self.packing_mode == 'separate':
packets = self._pack_events_separately(all_events)
print(f"打包模式: 单独打包 - 每个事件一个数据包")
else: # combined
packets = self._pack_events_combined(all_events)
print(f"打包模式: 整体打包 - 所有事件信号拼接后分包")
# 计算总数据字节数(不包括指令部分)
total_data_bytes = sum(len(packet) for packet in packets)
# 生成指令部分
instruction_bytes = self.instruction_config.generate_instruction_bytes(total_data_bytes)
# 写入文件
with open(output_file, 'wb') as f:
# 写入指令部分
# f.write(instruction_bytes)
# 写入所有数据包
for packet_idx, packet in enumerate(packets):
f.write(packet)
# 统计信息
print(f"二进制文件结构:")
print(f" 指令部分: {len(instruction_bytes)} 字节")
print(f" 数据包数量: {len(packets)}")
print(f" 总数据字节: {total_data_bytes} 字节")
print(f" 每数据包: {self.packing_config['bytes_per_packet']} 字节")
print(f" 总大小: {len(instruction_bytes) + total_data_bytes} 字节")
logger.info(f"已将 {len(packets)} 个数据包写入二进制文件 V4: {output_file}")
def write_events_text_v4(self, all_events: list, output_file: str):
"""
将事件写入文本文件 V4
每行一个十进制整数0-255
注意: 此方法已弃用,请使用 write_time_series_with_instructions 方法
"""
import warnings
warnings.warn("write_events_text_v4 方法已弃用,请使用 write_time_series_with_instructions 方法",
DeprecationWarning, stacklevel=2)
# 根据打包模式创建数据包
if self.packing_mode == 'separate':
packets = self._pack_events_separately(all_events)
print(f"打包模式: 单独打包 - 每个事件一个数据包")
else: # combined
packets = self._pack_events_combined(all_events)
print(f"打包模式: 整体打包 - 所有事件信号拼接后分包")
# 计算总数据字节数(不包括指令部分)
total_data_bytes = sum(len(packet) for packet in packets)
# 生成指令部分
instruction_bytes = self.instruction_config.generate_instruction_bytes(total_data_bytes)
# 将所有字节合并
all_bytes = instruction_bytes
for packet in packets:
all_bytes += packet
# 转换为整数列表
int_list = list(all_bytes)
# 写入文件(每行一个十进制整数)
with open(output_file, 'w', encoding='utf-8') as f:
for i, byte_value in enumerate(int_list):
f.write(f"{byte_value}\n")
# 打印信息
print(f"文本文件结构:")
print(f" 总字节数: {len(int_list)}")
print(f" 总行数: {len(int_list)}")
print(f" 指令部分: {len(instruction_bytes)} 字节 (行 1-{len(instruction_bytes)})")
print(f" 数据部分: {total_data_bytes} 字节 (行 {len(instruction_bytes)+1}-{len(int_list)})")
print(f" 数据包数量: {len(packets)}")
print(f" 每数据包: {self.packing_config['bytes_per_packet']} 字节 = {self.packing_config['bytes_per_packet']}")
print(f" 每行一个0-255的十进制整数")
logger.info(f"已将 {len(int_list)} 个字节写入文本文件: {output_file}")
def write_events_debug_text(self, all_events: list, output_file: str):
"""
写入调试文本文件(详细事件信息)
8个信号为一组组内按照信号序号逆序排列
"""
with open(output_file, 'w', encoding='utf-8') as f:
f.write(f"# 探测器信号数据 V4.2 - 打包模式: {self.packing_mode}\n")
f.write("# 每个数据包: 800个点火信号 + 2字节CRC16 = 6402字节\n")
f.write("# 8个信号为一组每组内从后往前导出逆序排列: 8,7,6,5,4,3,2,1, 16,15...\n")
f.write("# 格式: 事件ID | 组内序号 | 十六进制编码 | 解析结果\n")
f.write(f"# 能量转换: mV = keV × {self.energy_K} + {self.energy_B}\n")
# 写入模拟参数
f.write("#\n")
f.write("# 模拟参数:\n")
# 同步脉冲参数
sync_pulses = self.config.get('simulation', {}).get('sync_pulses', [])
for i, pulse in enumerate(sync_pulses, 1):
count = pulse.get('count', 0)
period = pulse.get('period', 0)
f.write(f"# 同步脉冲{i}: 个数={count}, 周期={period} (10ns)\n")
# 能量和AD转换参数
energy_conversion = self.config.get('simulation', {}).get('energy_conversion', {})
energy_K = energy_conversion.get('K', 1.0)
energy_B = energy_conversion.get('B', 0.0)
f.write(f"# 能量转换参数: K={energy_K}, B={energy_B} (mV = keV × K + B)\n")
f.write("#" * 120 + "\n")
for event_idx, event_signals in enumerate(all_events):
valid_count = 0
# 处理事件内的信号每8个一组组内逆序
num_groups = len(event_signals) // 8
for group_idx in range(num_groups):
# 计算当前组的结束索引(即该组的最后一个信号索引+1
end_idx = (group_idx + 1) * 8
start_idx = end_idx - 8
group_signals = event_signals[start_idx:end_idx]
# 组内逆序处理从后往前即从第8个到第1个
for j in range(len(group_signals) - 1, -1, -1):
sig_idx = start_idx + j # 原始信号索引0-based
display_idx = sig_idx + 1 # 显示的信号序号1-based
encoded = group_signals[j]
hex_str = self.encoder.to_hex_string(encoded)
if encoded == 0:
info = "填充信号"
else:
valid_count += 1
decoded = self.encoder.decode(encoded)
info_parts = []
if decoded['event_end']:
info_parts.append("事件结束")
info_parts.append(f"时间戳:{decoded['timestamp']}us")
energies = decoded['energies']
active_dets = []
for k in range(3):
if energies[k] > 0:
energy_mv = energies[k]
# 反向计算keV值: keV = (mV - B) / K
if self.energy_K != 0:
energy_kev = (energy_mv - self.energy_B) / self.energy_K
else:
energy_kev = 0
active_dets.append(f"Det{k+1}:{energy_kev:.1f}keV({energy_mv}mV)")
if active_dets:
info_parts.append(f"信号:{', '.join(active_dets)}")
info = ", ".join(info_parts)
marker = " [事件结束]" if sig_idx == len(event_signals) - 1 else ""
f.write(f"{event_idx:4d} | {display_idx:4d} | 0x{hex_str} | {info}{marker}\n")
# 每组信号后空一行
if group_idx < num_groups - 1:
f.write("\n")
# 事件结束后添加总结
f.write(f"# 总结: 有效信号={valid_count}, 总信号={len(event_signals)}\n\n")
logger.info(f"已将 {len(all_events)} 个事件写入调试文本文件: {output_file}")
def write_events(self, all_events: list, output_file: str = None):
"""
根据配置写入所有格式的文件
注意: 此方法已弃用,请使用 write_time_series_with_instructions 方法
"""
import warnings
warnings.warn("write_events 方法已弃用,请使用 write_time_series_with_instructions 方法",
DeprecationWarning, stacklevel=2)
if output_file is None:
output_file = self.config['simulation']['output_file']
output_format = self.config['simulation'].get('output_format', 'all')
base_name = Path(output_file).stem
if output_format == 'binary':
binary_file = f"{base_name}.bin"
self.write_events_binary_v4(all_events, binary_file)
elif output_format == 'text':
text_file = f"{base_name}_v4.txt"
self.write_events_text_v4(all_events, text_file)
elif output_format == 'debug':
debug_file = f"{base_name}_debug.txt"
self.write_events_debug_text(all_events, debug_file)
elif output_format == 'all':
binary_file = f"{base_name}.bin"
text_file = f"{base_name}_v4.txt"
debug_file = f"{base_name}_debug.txt"
print("生成所有格式文件:")
print(f" 1. {binary_file} - 二进制格式(指令+数据)")
print(f" 2. {text_file} - 文本格式(每行一个十进制整数)")
print(f" 3. {debug_file} - 调试文本格式(详细解析)")
print()
self.write_events_binary_v4(all_events, binary_file)
print()
self.write_events_text_v4(all_events, text_file)
print()
self.write_events_debug_text(all_events, debug_file)
else:
raise ValueError(f"不支持的输出格式: {output_format}")
def print_file_info(self, all_events: list):
"""
打印文件信息
"""
num_events = len(all_events)
print("=" * 80)
print(f"文件输出信息 V4.2 - 打包模式: {self.packing_mode}")
print("=" * 80)
# 计算数据包数量
if self.packing_mode == 'separate':
num_packets = num_events
else: # combined
total_signals = sum(len(event) for event in all_events)
num_full_packets = total_signals // 800
remaining_signals = total_signals % 800
num_packets = num_full_packets + (1 if remaining_signals > 0 else 0)
# 计算总数据字节数
total_data_bytes = num_packets * self.packing_config['bytes_per_packet']
# 打印指令信息
self.instruction_config.print_instruction_info(total_data_bytes)
# 打印事件统计
total_valid_signals = 0
total_all_signals = 0
for event_idx, event_signals in enumerate(all_events):
valid_in_event = sum(1 for sig in event_signals if sig != 0)
total_valid_signals += valid_in_event
total_all_signals += len(event_signals)
# 检查最后一个信号的事件结束标志
if event_signals:
last_signal = event_signals[-1]
if last_signal != 0:
last_decoded = self.encoder.decode(last_signal)
if not last_decoded['event_end']:
print(f"警告: 事件{event_idx+1}的最后一个信号没有设置事件结束标志")
print("\n事件统计:")
print(f" 总事件数: {num_events}")
print(f" 总信号数: {total_all_signals}")
print(f" 有效信号数: {total_valid_signals}")
print(f" 平均每事件信号数: {total_all_signals/num_events:.1f}")
print(f"\n数据包统计 ({self.packing_mode}模式):")
print(f" 数据包数量: {num_packets}")
print(f" 每数据包: {self.packing_config['bytes_per_packet']} 字节")
print(f" 总数据字节: {total_data_bytes} 字节")
if self.packing_mode == 'combined':
print(f" 完整数据包: {total_all_signals // 800}")
print(f" 最后一个数据包信号数: {total_all_signals % 800 if total_all_signals % 800 != 0 else 800}")
print(f" 最后一个数据包补零数: {800 - (total_all_signals % 800) if total_all_signals % 800 != 0 else 0}")
print("=" * 80)
def write_time_series_with_instructions(self, time_series: dict, output_file: str,
signal_type: int,
simulation_mode: str,
sync_pulse1_count: int, sync_pulse1_period: int,
sync_pulse2_count: int, sync_pulse2_period: int,
sync_pulse3_count: int, sync_pulse3_period: int,
sync_pulse4_count: int, sync_pulse4_period: int,
encoded_signals: list = None):
"""
写入时间序列数据文件(包含模拟参数和时间序列数据)
参数:
time_series: 时间序列字典
output_file: 输出文件路径
signal_type: 信号类型 (0x07 或 0x70)
simulation_mode: 模拟模式 (CO 或 MO)
sync_pulse1_count: 同步脉冲1个数
sync_pulse1_period: 同步脉冲1周期(10ns)
sync_pulse2_count: 同步脉冲2个数
sync_pulse2_period: 同步脉冲2周期(10ns)
sync_pulse3_count: 同步脉冲3个数
sync_pulse3_period: 同步脉冲3周期(10ns)
sync_pulse4_count: 同步脉冲4个数
sync_pulse4_period: 同步脉冲4周期(10ns)
encoded_signals: 可选的预编码信号列表(包含事件结束标志)
"""
# 1. 生成模拟参数(替代启动/停止指令)
# 参数格式:
# 字节0-3: 魔法数 0x52445353 (RDSS)
# 字节4-7: 版本号 0x00010000 (v1.0.0)
# 字节8-9: 信号类型
# 字节10-13: 实际有效信号数
# 字节14-15: 模拟模式
# 字节16-19: 同步脉冲1个数
# 字节20-23: 同步脉冲1周期
# 字节24-27: 同步脉冲2个数
# 字节28-31: 同步脉冲2周期
# 字节32-35: 同步脉冲3个数
# 字节36-39: 同步脉冲3周期
# 字节40-43: 同步脉冲4个数
# 字节44-47: 同步脉冲4周期
# 字节48-53: 保留
# 字节54-57: 校验和(简单累加和)
param_data = bytearray()
# 魔法数
param_data.extend(struct.pack('>I', 0x52445353)) # RDSS
# 版本号
param_data.extend(struct.pack('>I', 0x00010000)) # v1.0.0
# 信号类型
param_data.extend(struct.pack('>H', signal_type))
# 实际有效信号数
actual_signal_count = len(encoded_signals) if encoded_signals else 0
param_data.extend(struct.pack('>I', actual_signal_count))
# 模拟模式 (0=CO, 1=MO)
mode_value = 0 if simulation_mode.upper() == 'CO' else 1
param_data.extend(struct.pack('>H', mode_value))
# 同步脉冲1
param_data.extend(struct.pack('>I', sync_pulse1_count))
param_data.extend(struct.pack('>I', sync_pulse1_period))
# 同步脉冲2
param_data.extend(struct.pack('>I', sync_pulse2_count))
param_data.extend(struct.pack('>I', sync_pulse2_period))
# 同步脉冲3
param_data.extend(struct.pack('>I', sync_pulse3_count))
param_data.extend(struct.pack('>I', sync_pulse3_period))
# 同步脉冲4
param_data.extend(struct.pack('>I', sync_pulse4_count))
param_data.extend(struct.pack('>I', sync_pulse4_period))
# 保留
param_data.extend(struct.pack('>I', 0))
param_data.extend(struct.pack('>H', 0))
# 计算校验和(简单累加和)
checksum = sum(param_data) % 0xFFFFFFFF
param_data.extend(struct.pack('>I', checksum))
# 确保参数部分长度为96字节与原来的两个指令长度一致
padding = 96 - len(param_data)
if padding > 0:
param_data.extend(b'\x00' * padding)
# 3. 编码时间序列信号
encoder = BitFieldEncoder()
overflow_count = 0
# 如果已经提供了编码信号,则直接使用
if encoded_signals is None:
encoded_signals = []
detector_signals = {
"detector1": time_series["detector1"],
"detector2": time_series["detector2"],
"detector3": time_series["detector3"]
}
# 合并探测器信号
time_signal_map = {}
for det_idx, det_name in enumerate(['detector1', 'detector2', 'detector3']):
signals = detector_signals.get(det_name, [])
for signal in signals:
if len(signal) == 2:
timestamp_10ns = signal[0]
energy_mv = signal[1]
if timestamp_10ns not in time_signal_map:
time_signal_map[timestamp_10ns] = [0, 0, 0]
time_signal_map[timestamp_10ns][det_idx] = energy_mv
# 按时间戳排序并编码
sorted_timestamps = sorted(time_signal_map.keys())
overflow_count = 0
for timestamp in sorted_timestamps:
energies = time_signal_map[timestamp]
try:
encoded = encoder.encode(timestamp, energies, event_end=False)
encoded_signals.append(encoded)
except ValueError as e:
if "溢出" in str(e):
overflow_count += 1
continue
raise
if overflow_count > 0:
logger.warning(f"跳过了{overflow_count}个溢出信号")
# 4. 将信号打包成数据包
packets = []
signals_per_packet = 800
total_signals = len(encoded_signals)
num_full_packets = total_signals // signals_per_packet
remaining_signals = total_signals % signals_per_packet
# 处理完整的数据包
for i in range(num_full_packets):
start_idx = i * signals_per_packet
end_idx = start_idx + signals_per_packet
packet_signals = encoded_signals[start_idx:end_idx]
packet_bytes = self._create_packet_bytes(packet_signals)
packets.append(packet_bytes)
# 处理最后一个不完整的数据包
if remaining_signals > 0:
start_idx = num_full_packets * signals_per_packet
last_packet_signals = encoded_signals[start_idx:]
# 计算最后一个同步脉冲的结束时间戳
# 找到最后一个非零的同步脉冲
last_sync_pulse = None
if sync_pulse4_count > 0:
last_sync_pulse = {'count': sync_pulse4_count, 'period': sync_pulse4_period}
elif sync_pulse3_count > 0:
last_sync_pulse = {'count': sync_pulse3_count, 'period': sync_pulse3_period}
elif sync_pulse2_count > 0:
last_sync_pulse = {'count': sync_pulse2_count, 'period': sync_pulse2_period}
elif sync_pulse1_count > 0:
last_sync_pulse = {'count': sync_pulse1_count, 'period': sync_pulse1_period}
# 计算最后一个同步脉冲的结束时间戳10ns单位
last_sync_end_timestamp = 0
if last_sync_pulse:
last_sync_end_timestamp = (last_sync_pulse['count'] - 1) * last_sync_pulse['period']
# 补零前的最后一个信号将事件结束标志置0
if last_packet_signals:
last_packet_signals[-1] = last_packet_signals[-1] & ~(1 << 63)
# 计算需要补零到8的整数倍的数量
padding_to_8 = (8 - (remaining_signals % 8)) % 8
# 创建补齐到8的信号使用同步脉冲结束时间戳减一
padding_signals_8 = []
for i in range(padding_to_8):
padding_timestamp = max(0, last_sync_end_timestamp - 1)
if i == padding_to_8 - 1:
# 最后一个补齐信号事件结束标志置1
padding_signal = encoder.encode(3990+i, [0, 0, 0], event_end=True)
else:
# 其他补齐信号事件结束标志置0
padding_signal = encoder.encode(3990+i, [0, 0, 0], event_end=False)
padding_signals_8.append(padding_signal)
remaining_padding = signals_per_packet - remaining_signals - padding_to_8
padding_signals_zero = [0] * remaining_padding
# 合并所有填充信号
padded_signals = last_packet_signals + padding_signals_8 + padding_signals_zero
packet_bytes = self._create_packet_bytes(padded_signals)
packets.append(packet_bytes)
# 构建包含填充信号的完整信号列表用于debug输出
full_encoded_signals = encoded_signals[:start_idx] + padded_signals
else:
# 没有填充信号,使用原始信号
full_encoded_signals = encoded_signals
# 5. 写入文件
with open(output_file, 'wb') as f:
# 写入模拟参数96字节
f.write(param_data)
# 写入所有数据包
for packet in packets:
f.write(packet)
# 6. 获取输出格式设置
output_format = self.config.get('simulation', {}).get('output_format', 'all')
# 7. 如果需要,生成文本格式文件
if output_format in ['text', 'all']:
text_file = str(Path(output_file).with_suffix('')) + '_v4.txt'
self._write_time_series_text(encoded_signals, param_data, b'', text_file)
# 8. 如果需要,生成调试文件
if output_format in ['debug', 'all']:
debug_file = str(Path(output_file).with_suffix('')) + '_debug.txt'
# self._write_time_series_debug(time_series, encoded_signals, debug_file)
self._write_time_series_debug(time_series, full_encoded_signals, debug_file)
# 9. 打印统计信息
total_data_bytes = sum(len(packet) for packet in packets)
print("=" * 80)
print("时间序列数据文件结构")
print("=" * 80)
print(f"模拟参数: {len(param_data)} 字节 (偏移: 0x0000-0x005F)")
print(f"时间序列数据: {total_data_bytes} 字节 (偏移: 0x0060-0x{0x60 + total_data_bytes - 1:04X})")
print(f"总文件大小: {len(param_data) + total_data_bytes} 字节")
print(f"\n时间序列统计:")
print(f" 总时长: {time_series['total_duration']} (10ns)")
print(f" 探测器1信号数: {len(time_series['detector1'])}")
print(f" 探测器2信号数: {len(time_series['detector2'])}")
print(f" 探测器3信号数: {len(time_series['detector3'])}")
print(f" 编码信号总数: {total_signals}")
print(f" 数据包数量: {len(packets)}")
print(f" 每数据包: {self.packing_config['bytes_per_packet']} 字节")
print(f"\n指令参数:")
print(f" 信号类型: 0x{signal_type:04X}")
print(f" 同步脉冲1: {sync_pulse1_count}个, 周期={sync_pulse1_period} (10ns)")
print(f" 同步脉冲2: {sync_pulse2_count}个, 周期={sync_pulse2_period} (10ns)")
print(f" 同步脉冲3: {sync_pulse3_count}个, 周期={sync_pulse3_period} (10ns)")
print(f" 同步脉冲4: {sync_pulse4_count}个, 周期={sync_pulse4_period} (10ns)")
print("=" * 80)
logger.info(f"已写入时间序列数据文件: {output_file}")
def _write_time_series_text(self, encoded_signals: list, param_data: bytes,
stop_instruction: bytes, output_file: str):
"""写入时间序列文本文件(每行一个十进制整数)"""
all_bytes = param_data
# 重新组织信号8个一组组内逆序
reorganized_signals = self._reorganize_signals_for_output(encoded_signals)
# 将所有编码信号转换为字节
for signal in reorganized_signals:
all_bytes += struct.pack('>Q', signal)
# 写入文件(每行一个十进制整数)
with open(output_file, 'w', encoding='utf-8') as f:
for byte_value in all_bytes:
f.write(f"{byte_value}\n")
print(f"文本文件已生成: {output_file} ({len(all_bytes)} 字节)")
logger.info(f"已写入时间序列文本文件: {output_file}")
def _write_time_series_debug(self, time_series: dict, encoded_signals: list, output_file: str):
"""写入时间序列调试文件"""
encoder = BitFieldEncoder()
with open(output_file, 'w', encoding='utf-8') as f:
# 写入头部信息
f.write("# 探测器信号数据 V4.2 - 打包模式: combined\n")
f.write("# 每个数据包: 800个点火信号 + 2字节CRC16 = 6402字节\n")
f.write("# 8个信号为一组每组内从后往前导出逆序排列: 8,7,6,5,4,3,2,1, 16,15...\n")
f.write("# 格式: 事件ID | 信号ID | 十六进制编码 | 解析结果\n")
f.write("# 注: 能量为0的探测器不显示\n")
f.write("#" * 120 + "\n\n")
# 写入模拟参数
f.write("#\n")
f.write("# 模拟参数:\n")
# 同步脉冲参数
sync_pulses = self.config.get('simulation', {}).get('sync_pulses', [])
for i, pulse in enumerate(sync_pulses, 1):
count = pulse.get('count', 0)
period = pulse.get('period', 0)
f.write(f"# 同步脉冲{i}: 个数={count}, 周期={period} (10ns)\n")
# 能量和AD转换参数
energy_conversion = self.config.get('simulation', {}).get('energy_conversion', {})
energy_K = energy_conversion.get('K', 1.0)
energy_B = energy_conversion.get('B', 0.0)
f.write(f"# 能量转换参数: K={energy_K}, B={energy_B} (mV = keV × K + B)\n")
f.write("#" * 120 + "\n\n")
total_signals = len(encoded_signals)
# 预先计算每个信号对应的事件ID
signal_event_ids = []
current_event_id = 0
for signal_idx in range(total_signals):
# 检查是否是事件开始
if signal_idx == 0:
current_event_id = 1 # 第一个信号事件ID从1开始
elif encoder.decode(encoded_signals[signal_idx - 1])['event_end']:
current_event_id += 1 # 前一个信号是事件结束,新事件开始
signal_event_ids.append(current_event_id)
# 处理每组8个信号组内逆序导出
num_groups = total_signals // 8
for group_idx in range(num_groups):
# 计算当前组的结束索引(即该组的最后一个信号索引+1
end_idx = (group_idx + 1) * 8
start_idx = end_idx - 8
group_signals = encoded_signals[start_idx:end_idx]
# 组内逆序处理从后往前即从第8个到第1个
for j in range(len(group_signals) - 1, -1, -1):
signal_idx = start_idx + j
signal = group_signals[j]
display_idx = signal_idx + 1 # 显示的信号序号1-based
event_id = signal_event_ids[signal_idx] # 获取预先计算的事件ID
hex_value = f"0x{signal:016X}"
# 解析信号
decoded = encoder.decode(signal)
timestamp = decoded['timestamp']
event_end = decoded['event_end']
energies = decoded['energies']
# 构建信号描述只显示能量不为0的探测器
signal_desc = []
if energies[0] > 0:
det1_kev = (energies[0] - self.energy_B) / self.energy_K if self.energy_K != 0 else 0
signal_desc.append(f"Det1:{det1_kev:.0f}keV")
if energies[1] > 0:
det2_kev = (energies[1] - self.energy_B) / self.energy_K if self.energy_K != 0 else 0
signal_desc.append(f"Det2:{det2_kev:.0f}keV")
if energies[2] > 0:
det3_kev = (energies[2] - self.energy_B) / self.energy_K if self.energy_K != 0 else 0
signal_desc.append(f"Det3:{det3_kev:.0f}keV")
# 如果没有探测器有能量,显示无信号
if not signal_desc:
signal_desc_str = "无信号"
else:
signal_desc_str = ", ".join(signal_desc)
# 写入信号行
f.write(f" {event_id:4d} | {display_idx:4d} | {hex_value} | 时间戳:{timestamp}us, 信号:{signal_desc_str}\n")
# 如果是事件结束,添加分隔线
if event_end:
f.write("-" * 120 + "\n")
# 每组信号后空一行
if group_idx < num_groups - 1:
f.write("\n")
# 写入数据包信息
total_packets = (total_signals + 799) // 800 # 每800个信号一个数据包
f.write("\n" + "#" * 120 + "\n")
f.write(f"# 数据包统计: 共 {total_packets} 个数据包\n")
f.write(f"# 每个数据包包含 800 个信号 + 2 字节 CRC16\n")
f.write(f"# 总信号数: {total_signals}\n")
f.write(f"# 总事件数: {current_event_id}\n")
print(f"调试文件已生成: {output_file}")
logger.info(f"已写入时间序列调试文件: {output_file}")