#!/usr/bin/env python3 """ 数据写入器模块 """ import struct import logging from pathlib import Path from core.encoder import BitFieldEncoder # 设置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) class InstructionConfigV4: """指令配置类 V4""" # 指令部分各字段定义 INSTRUCTION_FORMAT = { 'header': { 'bytes': 2, 'value': [0xAA, 0xAA], 'description': '包头' }, 'signal_type': { 'bytes': 2, 'value': [0x00, 0x07], 'description': '信号类型' }, 'ddr_start_addr': { 'bytes': 4, 'value': [0x00, 0x00, 0x00, 0x00], 'description': 'DDR起始地址' }, 'ddr_end_addr': { 'bytes': 4, 'value': None, # 根据数据长度计算 'description': 'DDR结束地址' }, 'sync_pulse1_count': { 'bytes': 4, 'value': [0x00, 0x00, 0x01, 0xDB], # 475 'description': '同步脉冲1个数' }, 'sync_pulse1_period': { 'bytes': 4, 'value': [0x00, 0x00, 0x3E, 0x80], # 16000 'description': '同步脉冲1周期' }, 'sync_pulse2_count': { 'bytes': 4, 'value': [0x00, 0x00, 0x00, 0x14], # 20 'description': '同步脉冲2个数' }, 'sync_pulse2_period': { 'bytes': 4, 'value': [0x00, 0x01, 0x86, 0xA0], # 100000 'description': '同步脉冲2周期' }, 'sync_pulse3_count': { 'bytes': 4, 'value': [0x00, 0x00, 0x00, 0x01], # 1 'description': '同步脉冲3个数' }, 'sync_pulse3_period': { 'bytes': 4, 'value': [0x00, 0x06, 0x1A, 0x80], # 400000 'description': '同步脉冲3周期' }, 'sync_pulse4_count': { 'bytes': 4, 'value': [0x00, 0x00, 0x00, 0x00], # 0 'description': '同步脉冲4个数' }, 'sync_pulse4_period': { 'bytes': 4, 'value': [0x00, 0x00, 0x00, 0x00], # 0 'description': '同步脉冲4周期' }, 'start_stop_control': { 'bytes': 2, 'value': [0x00, 0x00], 'description': '启停控制' }, 'crc16': { 'bytes': 2, 'value': None, # 根据前面数据计算 'description': 'Modbus CRC16校验和' } } # 打包配置 PACKING_CONFIG = { 'signals_per_packet': 800, # 每包800个点火信号 'bytes_per_signal': 8, # 每个信号8字节 'crc_bytes': 2, # CRC16校验和2字节 'bytes_per_packet': 6402, # 800*8 + 2 = 6402字节 'group_size': 8, # 8个信号为一组 'padding_byte': 0x00 # 填充字节 } @classmethod def get_instruction_length(cls) -> int: """获取指令部分的总长度(字节)""" total = 0 for field_info in cls.INSTRUCTION_FORMAT.values(): total += field_info['bytes'] return total @classmethod def calculate_ddr_end_addr(cls, total_data_bytes: int) -> list: """ 计算DDR结束地址 DDR结束地址以信号为单位,每个信号8字节 每包数据6400字节 = 800个信号 参数: total_data_bytes: 文件总字节数(包括指令部分) 返回: DDR结束地址的4字节列表(以信号为单位) """ # 减去两个指令的大小(启动指令48字节 + 停止指令48字节 = 96字节),得到纯数据大小 instruction_size = cls.get_instruction_length() * 2 # 两个指令 data_size = total_data_bytes - instruction_size # 计算包的数量(每包6400字节,向上取整) packet_size = 6400 packet_count = (data_size + packet_size - 1) // packet_size # DDR结束地址 = 包的数量 * 800(每包800个信号) end_addr = packet_count * 800 # 转换为4字节列表(大端序) return [(end_addr >> (8 * (3 - i))) & 0xFF for i in range(4)] @classmethod def calculate_crc16(cls, data: bytes) -> list: """ 计算Modbus CRC16校验和 使用标准Modbus CRC16算法 """ crc = 0xFFFF for byte in data: crc ^= byte for _ in range(8): if crc & 0x0001: crc = (crc >> 1) ^ 0xA001 else: crc >>= 1 # 返回低字节在前 return [crc & 0xFF, (crc >> 8) & 0xFF] @classmethod def generate_instruction_bytes(cls, total_data_bytes: int) -> bytes: """ 生成完整的指令部分字节流 参数: total_data_bytes: 总数据字节数(不包括指令部分) 返回: 指令部分的字节流 """ instruction_data = bytearray() # 1. 包头 instruction_data.extend(bytes(cls.INSTRUCTION_FORMAT['header']['value'])) # 2. 信号类型 instruction_data.extend(bytes(cls.INSTRUCTION_FORMAT['signal_type']['value'])) # 3. DDR起始地址 instruction_data.extend(bytes(cls.INSTRUCTION_FORMAT['ddr_start_addr']['value'])) # 4. DDR结束地址(根据总数据大小计算) ddr_end_addr = cls.calculate_ddr_end_addr(total_data_bytes) instruction_data.extend(bytes(ddr_end_addr)) # 5. 同步脉冲1个数 instruction_data.extend(bytes(cls.INSTRUCTION_FORMAT['sync_pulse1_count']['value'])) # 6. 同步脉冲1周期 instruction_data.extend(bytes(cls.INSTRUCTION_FORMAT['sync_pulse1_period']['value'])) # 7. 同步脉冲2个数 instruction_data.extend(bytes(cls.INSTRUCTION_FORMAT['sync_pulse2_count']['value'])) # 8. 同步脉冲2周期 instruction_data.extend(bytes(cls.INSTRUCTION_FORMAT['sync_pulse2_period']['value'])) # 9. 同步脉冲3个数 instruction_data.extend(bytes(cls.INSTRUCTION_FORMAT['sync_pulse3_count']['value'])) # 10. 同步脉冲3周期 instruction_data.extend(bytes(cls.INSTRUCTION_FORMAT['sync_pulse3_period']['value'])) # 11. 同步脉冲4个数 instruction_data.extend(bytes(cls.INSTRUCTION_FORMAT['sync_pulse4_count']['value'])) # 12. 同步脉冲4周期 instruction_data.extend(bytes(cls.INSTRUCTION_FORMAT['sync_pulse4_period']['value'])) # 13. 启停控制 instruction_data.extend(bytes(cls.INSTRUCTION_FORMAT['start_stop_control']['value'])) # 14. 计算CRC16(不包括CRC字段本身) crc_bytes = cls.calculate_crc16(instruction_data) instruction_data.extend(bytes(crc_bytes)) return bytes(instruction_data) @classmethod def print_instruction_info(cls, total_data_bytes: int): """打印指令信息""" print("=" * 60) print("指令配置信息") print("=" * 60) # 生成指令字节流 instruction_bytes = cls.generate_instruction_bytes(total_data_bytes) # 解析并打印每个字段 offset = 0 for field_name, field_info in cls.INSTRUCTION_FORMAT.items(): field_bytes = field_info['bytes'] field_data = instruction_bytes[offset:offset + field_bytes] # 格式化显示 hex_str = ' '.join(f'0x{b:02X}' for b in field_data) int_str = ' '.join(f'{b:3d}' for b in field_data) if field_name == 'ddr_end_addr' and field_info['value'] is None: # 计算DDR结束地址 end_addr_bytes = cls.calculate_ddr_end_addr(total_data_bytes) end_addr = sum(end_addr_bytes[i] << (8 * (3 - i)) for i in range(4)) print(f"{field_info['description']:20s}: {hex_str} ({int_str}) -> 地址: 0x{end_addr:08X} ({end_addr})") elif field_name == 'crc16' and field_info['value'] is None: crc_value = (field_data[1] << 8) | field_data[0] print(f"{field_info['description']:20s}: {hex_str} ({int_str}) -> CRC16: 0x{crc_value:04X}") else: print(f"{field_info['description']:20s}: {hex_str} ({int_str})") offset += field_bytes # 打印内存信息 end_addr_bytes = cls.calculate_ddr_end_addr(total_data_bytes) end_addr = sum(end_addr_bytes[i] << (8 * (3 - i)) for i in range(4)) print("\n内存信息:") print(f" 总数据字节: {total_data_bytes} 字节") print(f" DDR起始地址: 0x{0:08X}") print(f" DDR结束地址: 0x{end_addr:08X} ({end_addr})") print(f" 验证 (end_addr + 8) % 800: {(end_addr + 8) % 800} (应为0)") print("=" * 60) @classmethod def generate_start_instruction(cls, signal_type: int, ddr_start_addr: int, ddr_end_addr: int, sync_pulse1_count: int, sync_pulse1_period: int, sync_pulse2_count: int, sync_pulse2_period: int, sync_pulse3_count: int, sync_pulse3_period: int, sync_pulse4_count: int, sync_pulse4_period: int) -> bytes: """ 生成启动指令 参数: signal_type: 信号类型 (0x07 或 0x70) ddr_start_addr: DDR起始地址 ddr_end_addr: DDR结束地址 sync_pulse1_count: 同步脉冲1个数 sync_pulse1_period: 同步脉冲1周期(10ns) sync_pulse2_count: 同步脉冲2个数 sync_pulse2_period: 同步脉冲2周期(10ns) sync_pulse3_count: 同步脉冲3个数 sync_pulse3_period: 同步脉冲3周期(10ns) sync_pulse4_count: 同步脉冲4个数 sync_pulse4_period: 同步脉冲4周期(10ns) 返回: 启动指令的字节流(48字节) """ instruction_data = bytearray() # 1. 包头 instruction_data.extend(bytes(cls.INSTRUCTION_FORMAT['header']['value'])) # 2. 信号类型 instruction_data.extend([(signal_type >> 8) & 0xFF, signal_type & 0xFF]) # 3. DDR起始地址 instruction_data.extend([(ddr_start_addr >> (8 * (3 - i))) & 0xFF for i in range(4)]) # 4. DDR结束地址 instruction_data.extend([(ddr_end_addr >> (8 * (3 - i))) & 0xFF for i in range(4)]) # 5. 同步脉冲1个数 instruction_data.extend([(sync_pulse1_count >> (8 * (3 - i))) & 0xFF for i in range(4)]) # 6. 同步脉冲1周期 instruction_data.extend([(sync_pulse1_period >> (8 * (3 - i))) & 0xFF for i in range(4)]) # 7. 同步脉冲2个数 instruction_data.extend([(sync_pulse2_count >> (8 * (3 - i))) & 0xFF for i in range(4)]) # 8. 同步脉冲2周期 instruction_data.extend([(sync_pulse2_period >> (8 * (3 - i))) & 0xFF for i in range(4)]) # 9. 同步脉冲3个数 instruction_data.extend([(sync_pulse3_count >> (8 * (3 - i))) & 0xFF for i in range(4)]) # 10. 同步脉冲3周期 instruction_data.extend([(sync_pulse3_period >> (8 * (3 - i))) & 0xFF for i in range(4)]) # 11. 同步脉冲4个数 instruction_data.extend([(sync_pulse4_count >> (8 * (3 - i))) & 0xFF for i in range(4)]) # 12. 同步脉冲4周期 instruction_data.extend([(sync_pulse4_period >> (8 * (3 - i))) & 0xFF for i in range(4)]) # 13. 启停控制(启动:0x0001) instruction_data.extend([0x00, 0x01]) # 14. 计算CRC16(不包括CRC字段本身) crc_bytes = cls.calculate_crc16(instruction_data) instruction_data.extend(bytes(crc_bytes)) return bytes(instruction_data) @classmethod def generate_stop_instruction(cls, signal_type: int, ddr_start_addr: int, ddr_end_addr: int, sync_pulse1_count: int, sync_pulse1_period: int, sync_pulse2_count: int, sync_pulse2_period: int, sync_pulse3_count: int, sync_pulse3_period: int, sync_pulse4_count: int, sync_pulse4_period: int) -> bytes: """ 生成停止指令 参数: signal_type: 信号类型 (0x07 或 0x70) ddr_start_addr: DDR起始地址 ddr_end_addr: DDR结束地址 sync_pulse1_count: 同步脉冲1个数 sync_pulse1_period: 同步脉冲1周期(10ns) sync_pulse2_count: 同步脉冲2个数 sync_pulse2_period: 同步脉冲2周期(10ns) sync_pulse3_count: 同步脉冲3个数 sync_pulse3_period: 同步脉冲3周期(10ns) sync_pulse4_count: 同步脉冲4个数 sync_pulse4_period: 同步脉冲4周期(10ns) 返回: 停止指令的字节流(48字节) """ instruction_data = bytearray() # 1. 包头 instruction_data.extend(bytes(cls.INSTRUCTION_FORMAT['header']['value'])) # 2. 信号类型 instruction_data.extend([(signal_type >> 8) & 0xFF, signal_type & 0xFF]) # 3. DDR起始地址 instruction_data.extend([(ddr_start_addr >> (8 * (3 - i))) & 0xFF for i in range(4)]) # 4. DDR结束地址 instruction_data.extend([(ddr_end_addr >> (8 * (3 - i))) & 0xFF for i in range(4)]) # 5. 同步脉冲1个数 instruction_data.extend([(sync_pulse1_count >> (8 * (3 - i))) & 0xFF for i in range(4)]) # 6. 同步脉冲1周期 instruction_data.extend([(sync_pulse1_period >> (8 * (3 - i))) & 0xFF for i in range(4)]) # 7. 同步脉冲2个数 instruction_data.extend([(sync_pulse2_count >> (8 * (3 - i))) & 0xFF for i in range(4)]) # 8. 同步脉冲2周期 instruction_data.extend([(sync_pulse2_period >> (8 * (3 - i))) & 0xFF for i in range(4)]) # 9. 同步脉冲3个数 instruction_data.extend([(sync_pulse3_count >> (8 * (3 - i))) & 0xFF for i in range(4)]) # 10. 同步脉冲3周期 instruction_data.extend([(sync_pulse3_period >> (8 * (3 - i))) & 0xFF for i in range(4)]) # 11. 同步脉冲4个数 instruction_data.extend([(sync_pulse4_count >> (8 * (3 - i))) & 0xFF for i in range(4)]) # 12. 同步脉冲4周期 instruction_data.extend([(sync_pulse4_period >> (8 * (3 - i))) & 0xFF for i in range(4)]) # 13. 启停控制(停止:0x0000) instruction_data.extend([0x00, 0x02]) # 14. 计算CRC16(不包括CRC字段本身) crc_bytes = cls.calculate_crc16(instruction_data) instruction_data.extend(bytes(crc_bytes)) return bytes(instruction_data) def calculate_crc16(data: bytes) -> bytes: """计算Modbus CRC16校验和""" crc = 0xFFFF for byte in data: crc ^= byte for _ in range(8): if crc & 0x0001: crc = (crc >> 1) ^ 0xA001 else: crc >>= 1 # 返回低字节在前的2字节 return struct.pack(' bytes: """ 将64位整数编码为8字节(高字节在前) """ return struct.pack('>Q', signal) # 大端序 def _reorganize_signals_for_output(self, signals: list) -> list: """ 重新组织信号用于输出 8个信号为一组,组内按照信号序号逆序排列 例如:8,7,6,5,4,3,2,1, 16,15,14,13,12,11,10,9, ... """ group_size = 8 num_groups = len(signals) // group_size reorganized = [] for group_idx in range(num_groups): # 计算当前组的结束索引(即该组的最后一个信号索引+1) end_idx = (group_idx + 1) * group_size start_idx = end_idx - group_size group = signals[start_idx:end_idx] # 组内逆序排列:从后往前取信号 reversed_group = list(reversed(group)) reorganized.extend(reversed_group) return reorganized def _create_packet_bytes(self, packet_signals: list) -> bytes: """ 创建一个数据包的字节流(6402字节) 参数: packet_signals: 一个数据包的信号列表(必须为800个) 返回: 数据包的字节流(6400数据字节 + 2字节CRC16) """ if len(packet_signals) != 800: raise ValueError(f"数据包必须包含800个信号,当前为{len(packet_signals)}") # 1. 重新组织信号(8个一组,从后往前) reorganized_signals = self._reorganize_signals_for_output(packet_signals) # 2. 编码为字节流 packet_data = bytearray() for signal in reorganized_signals: packet_data.extend(self._encode_signal_to_bytes(signal)) # 3. 计算CRC16(对整个6400字节数据) crc_bytes = calculate_crc16(packet_data) # 4. 添加CRC16 packet_data.extend(crc_bytes) # 验证大小 if len(packet_data) != 6402: raise ValueError(f"数据包字节流大小错误: {len(packet_data)} != 6402") return bytes(packet_data) def _pack_events_separately(self, all_events: list) -> list: """ 单独打包模式:每个事件单独打包成一个数据包 参数: all_events: 所有事件的信号列表 返回: 数据包字节流列表 """ packets = [] for event_idx, event_signals in enumerate(all_events): # 每个事件单独处理 signals_needed = 800 # 如果事件信号不足800个,补零 if len(event_signals) < signals_needed: padding_needed = signals_needed - len(event_signals) padded_signals = event_signals + [0] * padding_needed elif len(event_signals) > signals_needed: # 如果超过800个,截断(理论上不应该发生) logger.warning(f"事件{event_idx+1}信号数({len(event_signals)})超过800,将截断") padded_signals = event_signals[:signals_needed] else: padded_signals = event_signals # 设置最后一个信号的事件结束标志位 if padded_signals[-1] != 0: padded_signals[-1] = self.encoder.set_event_end_flag(padded_signals[-1]) else: # 如果最后一个信号是0,设置为事件结束标记 padded_signals[-1] = self.encoder.encode(0, [0, 0, 0], event_end=True) # 创建数据包 packet_bytes = self._create_packet_bytes(padded_signals) packets.append(packet_bytes) logger.info(f"已打包事件 {event_idx + 1}/{len(all_events)} (单独打包模式)") return packets def _pack_events_combined(self, all_events: list) -> list: """ 整体打包模式:所有事件信号拼接,每800个信号一包 参数: all_events: 所有事件的信号列表 返回: 数据包字节流列表 """ # 1. 收集所有信号,并设置每个事件的最后一个信号为结束标志 all_signals = [] for event_idx, event_signals in enumerate(all_events): if event_signals: # 设置最后一个信号的事件结束标志位 if event_signals[-1] != 0: event_signals[-1] = self.encoder.set_event_end_flag(event_signals[-1]) else: # 如果最后一个信号是0,设置为事件结束标记 event_signals[-1] = self.encoder.encode(0, [0, 0, 0], event_end=True) all_signals.extend(event_signals) else: # 如果事件没有信号,添加一个结束标记信号 end_signal = self.encoder.encode(0, [0, 0, 0], event_end=True) all_signals.append(end_signal) logger.info(f"已处理事件 {event_idx + 1}/{len(all_events)},信号数: {len(event_signals)}") # 2. 将信号分组成800个一包 packets = [] signals_per_packet = 800 total_signals = len(all_signals) num_full_packets = total_signals // signals_per_packet remaining_signals = total_signals % signals_per_packet logger.info(f"总信号数: {total_signals}, 完整包: {num_full_packets}, 剩余信号: {remaining_signals}") # 3. 处理完整的数据包 for i in range(num_full_packets): start_idx = i * signals_per_packet end_idx = start_idx + signals_per_packet packet_signals = all_signals[start_idx:end_idx] # 创建数据包 packet_bytes = self._create_packet_bytes(packet_signals) packets.append(packet_bytes) logger.info(f"已打包完整数据包 {i + 1}/{num_full_packets}") # 4. 处理最后一个不完整的数据包(如果有) if remaining_signals > 0: start_idx = num_full_packets * signals_per_packet last_packet_signals = all_signals[start_idx:] # 补零到800个信号 padding_needed = signals_per_packet - remaining_signals padded_signals = last_packet_signals + [0] * padding_needed # 创建最后一个数据包 packet_bytes = self._create_packet_bytes(padded_signals) packets.append(packet_bytes) logger.info(f"已打包最后一个数据包 (补零{padding_needed}个信号)") return packets def write_events_binary_v4(self, all_events: list, output_file: str): """ 将事件写入二进制文件 V4 注意: 此方法已弃用,请使用 write_time_series_with_instructions 方法 """ import warnings warnings.warn("write_events_binary_v4 方法已弃用,请使用 write_time_series_with_instructions 方法", DeprecationWarning, stacklevel=2) # 根据打包模式创建数据包 if self.packing_mode == 'separate': packets = self._pack_events_separately(all_events) print(f"打包模式: 单独打包 - 每个事件一个数据包") else: # combined packets = self._pack_events_combined(all_events) print(f"打包模式: 整体打包 - 所有事件信号拼接后分包") # 计算总数据字节数(不包括指令部分) total_data_bytes = sum(len(packet) for packet in packets) # 生成指令部分 instruction_bytes = self.instruction_config.generate_instruction_bytes(total_data_bytes) # 写入文件 with open(output_file, 'wb') as f: # 写入指令部分 # f.write(instruction_bytes) # 写入所有数据包 for packet_idx, packet in enumerate(packets): f.write(packet) # 统计信息 print(f"二进制文件结构:") print(f" 指令部分: {len(instruction_bytes)} 字节") print(f" 数据包数量: {len(packets)}") print(f" 总数据字节: {total_data_bytes} 字节") print(f" 每数据包: {self.packing_config['bytes_per_packet']} 字节") print(f" 总大小: {len(instruction_bytes) + total_data_bytes} 字节") logger.info(f"已将 {len(packets)} 个数据包写入二进制文件 V4: {output_file}") def write_events_text_v4(self, all_events: list, output_file: str): """ 将事件写入文本文件 V4 每行一个十进制整数(0-255) 注意: 此方法已弃用,请使用 write_time_series_with_instructions 方法 """ import warnings warnings.warn("write_events_text_v4 方法已弃用,请使用 write_time_series_with_instructions 方法", DeprecationWarning, stacklevel=2) # 根据打包模式创建数据包 if self.packing_mode == 'separate': packets = self._pack_events_separately(all_events) print(f"打包模式: 单独打包 - 每个事件一个数据包") else: # combined packets = self._pack_events_combined(all_events) print(f"打包模式: 整体打包 - 所有事件信号拼接后分包") # 计算总数据字节数(不包括指令部分) total_data_bytes = sum(len(packet) for packet in packets) # 生成指令部分 instruction_bytes = self.instruction_config.generate_instruction_bytes(total_data_bytes) # 将所有字节合并 all_bytes = instruction_bytes for packet in packets: all_bytes += packet # 转换为整数列表 int_list = list(all_bytes) # 写入文件(每行一个十进制整数) with open(output_file, 'w', encoding='utf-8') as f: for i, byte_value in enumerate(int_list): f.write(f"{byte_value}\n") # 打印信息 print(f"文本文件结构:") print(f" 总字节数: {len(int_list)}") print(f" 总行数: {len(int_list)}") print(f" 指令部分: {len(instruction_bytes)} 字节 (行 1-{len(instruction_bytes)})") print(f" 数据部分: {total_data_bytes} 字节 (行 {len(instruction_bytes)+1}-{len(int_list)})") print(f" 数据包数量: {len(packets)}") print(f" 每数据包: {self.packing_config['bytes_per_packet']} 字节 = {self.packing_config['bytes_per_packet']} 行") print(f" 每行一个0-255的十进制整数") logger.info(f"已将 {len(int_list)} 个字节写入文本文件: {output_file}") def write_events_debug_text(self, all_events: list, output_file: str): """ 写入调试文本文件(详细事件信息) 8个信号为一组,组内按照信号序号逆序排列 """ with open(output_file, 'w', encoding='utf-8') as f: f.write(f"# 探测器信号数据 V4.2 - 打包模式: {self.packing_mode}\n") f.write("# 每个数据包: 800个点火信号 + 2字节CRC16 = 6402字节\n") f.write("# 8个信号为一组,每组内从后往前导出(逆序排列: 8,7,6,5,4,3,2,1, 16,15...)\n") f.write("# 格式: 事件ID | 组内序号 | 十六进制编码 | 解析结果\n") f.write(f"# 能量转换: mV = keV × {self.energy_K} + {self.energy_B}\n") # 写入模拟参数 f.write("#\n") f.write("# 模拟参数:\n") # 同步脉冲参数 sync_pulses = self.config.get('simulation', {}).get('sync_pulses', []) for i, pulse in enumerate(sync_pulses, 1): count = pulse.get('count', 0) period = pulse.get('period', 0) f.write(f"# 同步脉冲{i}: 个数={count}, 周期={period} (10ns)\n") # 能量和AD转换参数 energy_conversion = self.config.get('simulation', {}).get('energy_conversion', {}) energy_K = energy_conversion.get('K', 1.0) energy_B = energy_conversion.get('B', 0.0) f.write(f"# 能量转换参数: K={energy_K}, B={energy_B} (mV = keV × K + B)\n") f.write("#" * 120 + "\n") for event_idx, event_signals in enumerate(all_events): valid_count = 0 # 处理事件内的信号,每8个一组,组内逆序 num_groups = len(event_signals) // 8 for group_idx in range(num_groups): # 计算当前组的结束索引(即该组的最后一个信号索引+1) end_idx = (group_idx + 1) * 8 start_idx = end_idx - 8 group_signals = event_signals[start_idx:end_idx] # 组内逆序处理:从后往前(即从第8个到第1个) for j in range(len(group_signals) - 1, -1, -1): sig_idx = start_idx + j # 原始信号索引(0-based) display_idx = sig_idx + 1 # 显示的信号序号(1-based) encoded = group_signals[j] hex_str = self.encoder.to_hex_string(encoded) if encoded == 0: info = "填充信号" else: valid_count += 1 decoded = self.encoder.decode(encoded) info_parts = [] if decoded['event_end']: info_parts.append("事件结束") info_parts.append(f"时间戳:{decoded['timestamp']}us") energies = decoded['energies'] active_dets = [] for k in range(3): if energies[k] > 0: energy_mv = energies[k] # 反向计算keV值: keV = (mV - B) / K if self.energy_K != 0: energy_kev = (energy_mv - self.energy_B) / self.energy_K else: energy_kev = 0 active_dets.append(f"Det{k+1}:{energy_kev:.1f}keV({energy_mv}mV)") if active_dets: info_parts.append(f"信号:{', '.join(active_dets)}") info = ", ".join(info_parts) marker = " [事件结束]" if sig_idx == len(event_signals) - 1 else "" f.write(f"{event_idx:4d} | {display_idx:4d} | 0x{hex_str} | {info}{marker}\n") # 每组信号后空一行 if group_idx < num_groups - 1: f.write("\n") # 事件结束后添加总结 f.write(f"# 总结: 有效信号={valid_count}, 总信号={len(event_signals)}\n\n") logger.info(f"已将 {len(all_events)} 个事件写入调试文本文件: {output_file}") def write_events(self, all_events: list, output_file: str = None): """ 根据配置写入所有格式的文件 注意: 此方法已弃用,请使用 write_time_series_with_instructions 方法 """ import warnings warnings.warn("write_events 方法已弃用,请使用 write_time_series_with_instructions 方法", DeprecationWarning, stacklevel=2) if output_file is None: output_file = self.config['simulation']['output_file'] output_format = self.config['simulation'].get('output_format', 'all') base_name = Path(output_file).stem if output_format == 'binary': binary_file = f"{base_name}.bin" self.write_events_binary_v4(all_events, binary_file) elif output_format == 'text': text_file = f"{base_name}_v4.txt" self.write_events_text_v4(all_events, text_file) elif output_format == 'debug': debug_file = f"{base_name}_debug.txt" self.write_events_debug_text(all_events, debug_file) elif output_format == 'all': binary_file = f"{base_name}.bin" text_file = f"{base_name}_v4.txt" debug_file = f"{base_name}_debug.txt" print("生成所有格式文件:") print(f" 1. {binary_file} - 二进制格式(指令+数据)") print(f" 2. {text_file} - 文本格式(每行一个十进制整数)") print(f" 3. {debug_file} - 调试文本格式(详细解析)") print() self.write_events_binary_v4(all_events, binary_file) print() self.write_events_text_v4(all_events, text_file) print() self.write_events_debug_text(all_events, debug_file) else: raise ValueError(f"不支持的输出格式: {output_format}") def print_file_info(self, all_events: list): """ 打印文件信息 """ num_events = len(all_events) print("=" * 80) print(f"文件输出信息 V4.2 - 打包模式: {self.packing_mode}") print("=" * 80) # 计算数据包数量 if self.packing_mode == 'separate': num_packets = num_events else: # combined total_signals = sum(len(event) for event in all_events) num_full_packets = total_signals // 800 remaining_signals = total_signals % 800 num_packets = num_full_packets + (1 if remaining_signals > 0 else 0) # 计算总数据字节数 total_data_bytes = num_packets * self.packing_config['bytes_per_packet'] # 打印指令信息 self.instruction_config.print_instruction_info(total_data_bytes) # 打印事件统计 total_valid_signals = 0 total_all_signals = 0 for event_idx, event_signals in enumerate(all_events): valid_in_event = sum(1 for sig in event_signals if sig != 0) total_valid_signals += valid_in_event total_all_signals += len(event_signals) # 检查最后一个信号的事件结束标志 if event_signals: last_signal = event_signals[-1] if last_signal != 0: last_decoded = self.encoder.decode(last_signal) if not last_decoded['event_end']: print(f"警告: 事件{event_idx+1}的最后一个信号没有设置事件结束标志") print("\n事件统计:") print(f" 总事件数: {num_events}") print(f" 总信号数: {total_all_signals}") print(f" 有效信号数: {total_valid_signals}") print(f" 平均每事件信号数: {total_all_signals/num_events:.1f}") print(f"\n数据包统计 ({self.packing_mode}模式):") print(f" 数据包数量: {num_packets}") print(f" 每数据包: {self.packing_config['bytes_per_packet']} 字节") print(f" 总数据字节: {total_data_bytes} 字节") if self.packing_mode == 'combined': print(f" 完整数据包: {total_all_signals // 800}") print(f" 最后一个数据包信号数: {total_all_signals % 800 if total_all_signals % 800 != 0 else 800}") print(f" 最后一个数据包补零数: {800 - (total_all_signals % 800) if total_all_signals % 800 != 0 else 0}") print("=" * 80) def write_time_series_with_instructions(self, time_series: dict, output_file: str, signal_type: int, simulation_mode: str, sync_pulse1_count: int, sync_pulse1_period: int, sync_pulse2_count: int, sync_pulse2_period: int, sync_pulse3_count: int, sync_pulse3_period: int, sync_pulse4_count: int, sync_pulse4_period: int, encoded_signals: list = None): """ 写入时间序列数据文件(包含模拟参数和时间序列数据) 参数: time_series: 时间序列字典 output_file: 输出文件路径 signal_type: 信号类型 (0x07 或 0x70) simulation_mode: 模拟模式 (CO 或 MO) sync_pulse1_count: 同步脉冲1个数 sync_pulse1_period: 同步脉冲1周期(10ns) sync_pulse2_count: 同步脉冲2个数 sync_pulse2_period: 同步脉冲2周期(10ns) sync_pulse3_count: 同步脉冲3个数 sync_pulse3_period: 同步脉冲3周期(10ns) sync_pulse4_count: 同步脉冲4个数 sync_pulse4_period: 同步脉冲4周期(10ns) encoded_signals: 可选的预编码信号列表(包含事件结束标志) """ # 1. 生成模拟参数(替代启动/停止指令) # 参数格式: # 字节0-3: 魔法数 0x52445353 (RDSS) # 字节4-7: 版本号 0x00010000 (v1.0.0) # 字节8-9: 信号类型 # 字节10-13: 实际有效信号数 # 字节14-15: 模拟模式 # 字节16-19: 同步脉冲1个数 # 字节20-23: 同步脉冲1周期 # 字节24-27: 同步脉冲2个数 # 字节28-31: 同步脉冲2周期 # 字节32-35: 同步脉冲3个数 # 字节36-39: 同步脉冲3周期 # 字节40-43: 同步脉冲4个数 # 字节44-47: 同步脉冲4周期 # 字节48-53: 保留 # 字节54-57: 校验和(简单累加和) param_data = bytearray() # 魔法数 param_data.extend(struct.pack('>I', 0x52445353)) # RDSS # 版本号 param_data.extend(struct.pack('>I', 0x00010000)) # v1.0.0 # 信号类型 param_data.extend(struct.pack('>H', signal_type)) # 实际有效信号数 actual_signal_count = len(encoded_signals) if encoded_signals else 0 param_data.extend(struct.pack('>I', actual_signal_count)) # 模拟模式 (0=CO, 1=MO) mode_value = 0 if simulation_mode.upper() == 'CO' else 1 param_data.extend(struct.pack('>H', mode_value)) # 同步脉冲1 param_data.extend(struct.pack('>I', sync_pulse1_count)) param_data.extend(struct.pack('>I', sync_pulse1_period)) # 同步脉冲2 param_data.extend(struct.pack('>I', sync_pulse2_count)) param_data.extend(struct.pack('>I', sync_pulse2_period)) # 同步脉冲3 param_data.extend(struct.pack('>I', sync_pulse3_count)) param_data.extend(struct.pack('>I', sync_pulse3_period)) # 同步脉冲4 param_data.extend(struct.pack('>I', sync_pulse4_count)) param_data.extend(struct.pack('>I', sync_pulse4_period)) # 保留 param_data.extend(struct.pack('>I', 0)) param_data.extend(struct.pack('>H', 0)) # 计算校验和(简单累加和) checksum = sum(param_data) % 0xFFFFFFFF param_data.extend(struct.pack('>I', checksum)) # 确保参数部分长度为96字节(与原来的两个指令长度一致) padding = 96 - len(param_data) if padding > 0: param_data.extend(b'\x00' * padding) # 3. 编码时间序列信号 encoder = BitFieldEncoder() overflow_count = 0 # 如果已经提供了编码信号,则直接使用 if encoded_signals is None: encoded_signals = [] detector_signals = { "detector1": time_series["detector1"], "detector2": time_series["detector2"], "detector3": time_series["detector3"] } # 合并探测器信号 time_signal_map = {} for det_idx, det_name in enumerate(['detector1', 'detector2', 'detector3']): signals = detector_signals.get(det_name, []) for signal in signals: if len(signal) == 2: timestamp_10ns = signal[0] energy_mv = signal[1] if timestamp_10ns not in time_signal_map: time_signal_map[timestamp_10ns] = [0, 0, 0] time_signal_map[timestamp_10ns][det_idx] = energy_mv # 按时间戳排序并编码 sorted_timestamps = sorted(time_signal_map.keys()) overflow_count = 0 for timestamp in sorted_timestamps: energies = time_signal_map[timestamp] try: encoded = encoder.encode(timestamp, energies, event_end=False) encoded_signals.append(encoded) except ValueError as e: if "溢出" in str(e): overflow_count += 1 continue raise if overflow_count > 0: logger.warning(f"跳过了{overflow_count}个溢出信号") # 4. 将信号打包成数据包 packets = [] signals_per_packet = 800 total_signals = len(encoded_signals) num_full_packets = total_signals // signals_per_packet remaining_signals = total_signals % signals_per_packet # 处理完整的数据包 for i in range(num_full_packets): start_idx = i * signals_per_packet end_idx = start_idx + signals_per_packet packet_signals = encoded_signals[start_idx:end_idx] packet_bytes = self._create_packet_bytes(packet_signals) packets.append(packet_bytes) # 处理最后一个不完整的数据包 if remaining_signals > 0: start_idx = num_full_packets * signals_per_packet last_packet_signals = encoded_signals[start_idx:] # 计算最后一个同步脉冲的结束时间戳 # 找到最后一个非零的同步脉冲 last_sync_pulse = None if sync_pulse4_count > 0: last_sync_pulse = {'count': sync_pulse4_count, 'period': sync_pulse4_period} elif sync_pulse3_count > 0: last_sync_pulse = {'count': sync_pulse3_count, 'period': sync_pulse3_period} elif sync_pulse2_count > 0: last_sync_pulse = {'count': sync_pulse2_count, 'period': sync_pulse2_period} elif sync_pulse1_count > 0: last_sync_pulse = {'count': sync_pulse1_count, 'period': sync_pulse1_period} # 计算最后一个同步脉冲的结束时间戳(10ns单位) last_sync_end_timestamp = 0 if last_sync_pulse: last_sync_end_timestamp = (last_sync_pulse['count'] - 1) * last_sync_pulse['period'] # 补零前的最后一个信号:将事件结束标志置0 if last_packet_signals: last_packet_signals[-1] = last_packet_signals[-1] & ~(1 << 63) # 计算需要补零到8的整数倍的数量 padding_to_8 = (8 - (remaining_signals % 8)) % 8 # 创建补齐到8的信号(使用同步脉冲结束时间戳减一) padding_signals_8 = [] for i in range(padding_to_8): padding_timestamp = max(0, last_sync_end_timestamp - 1) if i == padding_to_8 - 1: # 最后一个补齐信号,事件结束标志置1 padding_signal = encoder.encode(3990+i, [0, 0, 0], event_end=True) else: # 其他补齐信号,事件结束标志置0 padding_signal = encoder.encode(3990+i, [0, 0, 0], event_end=False) padding_signals_8.append(padding_signal) remaining_padding = signals_per_packet - remaining_signals - padding_to_8 padding_signals_zero = [0] * remaining_padding # 合并所有填充信号 padded_signals = last_packet_signals + padding_signals_8 + padding_signals_zero packet_bytes = self._create_packet_bytes(padded_signals) packets.append(packet_bytes) # 构建包含填充信号的完整信号列表用于debug输出 full_encoded_signals = encoded_signals[:start_idx] + padded_signals else: # 没有填充信号,使用原始信号 full_encoded_signals = encoded_signals # 5. 写入文件 with open(output_file, 'wb') as f: # 写入模拟参数(96字节) f.write(param_data) # 写入所有数据包 for packet in packets: f.write(packet) # 6. 获取输出格式设置 output_format = self.config.get('simulation', {}).get('output_format', 'all') # 7. 如果需要,生成文本格式文件 if output_format in ['text', 'all']: text_file = str(Path(output_file).with_suffix('')) + '_v4.txt' self._write_time_series_text(encoded_signals, param_data, b'', text_file) # 8. 如果需要,生成调试文件 if output_format in ['debug', 'all']: debug_file = str(Path(output_file).with_suffix('')) + '_debug.txt' # self._write_time_series_debug(time_series, encoded_signals, debug_file) self._write_time_series_debug(time_series, full_encoded_signals, debug_file) # 9. 打印统计信息 total_data_bytes = sum(len(packet) for packet in packets) print("=" * 80) print("时间序列数据文件结构") print("=" * 80) print(f"模拟参数: {len(param_data)} 字节 (偏移: 0x0000-0x005F)") print(f"时间序列数据: {total_data_bytes} 字节 (偏移: 0x0060-0x{0x60 + total_data_bytes - 1:04X})") print(f"总文件大小: {len(param_data) + total_data_bytes} 字节") print(f"\n时间序列统计:") print(f" 总时长: {time_series['total_duration']} (10ns)") print(f" 探测器1信号数: {len(time_series['detector1'])}") print(f" 探测器2信号数: {len(time_series['detector2'])}") print(f" 探测器3信号数: {len(time_series['detector3'])}") print(f" 编码信号总数: {total_signals}") print(f" 数据包数量: {len(packets)}") print(f" 每数据包: {self.packing_config['bytes_per_packet']} 字节") print(f"\n指令参数:") print(f" 信号类型: 0x{signal_type:04X}") print(f" 同步脉冲1: {sync_pulse1_count}个, 周期={sync_pulse1_period} (10ns)") print(f" 同步脉冲2: {sync_pulse2_count}个, 周期={sync_pulse2_period} (10ns)") print(f" 同步脉冲3: {sync_pulse3_count}个, 周期={sync_pulse3_period} (10ns)") print(f" 同步脉冲4: {sync_pulse4_count}个, 周期={sync_pulse4_period} (10ns)") print("=" * 80) logger.info(f"已写入时间序列数据文件: {output_file}") def _write_time_series_text(self, encoded_signals: list, param_data: bytes, stop_instruction: bytes, output_file: str): """写入时间序列文本文件(每行一个十进制整数)""" all_bytes = param_data # 重新组织信号(8个一组,组内逆序) reorganized_signals = self._reorganize_signals_for_output(encoded_signals) # 将所有编码信号转换为字节 for signal in reorganized_signals: all_bytes += struct.pack('>Q', signal) # 写入文件(每行一个十进制整数) with open(output_file, 'w', encoding='utf-8') as f: for byte_value in all_bytes: f.write(f"{byte_value}\n") print(f"文本文件已生成: {output_file} ({len(all_bytes)} 字节)") logger.info(f"已写入时间序列文本文件: {output_file}") def _write_time_series_debug(self, time_series: dict, encoded_signals: list, output_file: str): """写入时间序列调试文件""" encoder = BitFieldEncoder() with open(output_file, 'w', encoding='utf-8') as f: # 写入头部信息 f.write("# 探测器信号数据 V4.2 - 打包模式: combined\n") f.write("# 每个数据包: 800个点火信号 + 2字节CRC16 = 6402字节\n") f.write("# 8个信号为一组,每组内从后往前导出(逆序排列: 8,7,6,5,4,3,2,1, 16,15...)\n") f.write("# 格式: 事件ID | 信号ID | 十六进制编码 | 解析结果\n") f.write("# 注: 能量为0的探测器不显示\n") f.write("#" * 120 + "\n\n") # 写入模拟参数 f.write("#\n") f.write("# 模拟参数:\n") # 同步脉冲参数 sync_pulses = self.config.get('simulation', {}).get('sync_pulses', []) for i, pulse in enumerate(sync_pulses, 1): count = pulse.get('count', 0) period = pulse.get('period', 0) f.write(f"# 同步脉冲{i}: 个数={count}, 周期={period} (10ns)\n") # 能量和AD转换参数 energy_conversion = self.config.get('simulation', {}).get('energy_conversion', {}) energy_K = energy_conversion.get('K', 1.0) energy_B = energy_conversion.get('B', 0.0) f.write(f"# 能量转换参数: K={energy_K}, B={energy_B} (mV = keV × K + B)\n") f.write("#" * 120 + "\n\n") total_signals = len(encoded_signals) # 预先计算每个信号对应的事件ID signal_event_ids = [] current_event_id = 0 for signal_idx in range(total_signals): # 检查是否是事件开始 if signal_idx == 0: current_event_id = 1 # 第一个信号,事件ID从1开始 elif encoder.decode(encoded_signals[signal_idx - 1])['event_end']: current_event_id += 1 # 前一个信号是事件结束,新事件开始 signal_event_ids.append(current_event_id) # 处理每组8个信号,组内逆序导出 num_groups = total_signals // 8 for group_idx in range(num_groups): # 计算当前组的结束索引(即该组的最后一个信号索引+1) end_idx = (group_idx + 1) * 8 start_idx = end_idx - 8 group_signals = encoded_signals[start_idx:end_idx] # 组内逆序处理:从后往前(即从第8个到第1个) for j in range(len(group_signals) - 1, -1, -1): signal_idx = start_idx + j signal = group_signals[j] display_idx = signal_idx + 1 # 显示的信号序号(1-based) event_id = signal_event_ids[signal_idx] # 获取预先计算的事件ID hex_value = f"0x{signal:016X}" # 解析信号 decoded = encoder.decode(signal) timestamp = decoded['timestamp'] event_end = decoded['event_end'] energies = decoded['energies'] # 构建信号描述,只显示能量不为0的探测器 signal_desc = [] if energies[0] > 0: det1_kev = (energies[0] - self.energy_B) / self.energy_K if self.energy_K != 0 else 0 signal_desc.append(f"Det1:{det1_kev:.0f}keV") if energies[1] > 0: det2_kev = (energies[1] - self.energy_B) / self.energy_K if self.energy_K != 0 else 0 signal_desc.append(f"Det2:{det2_kev:.0f}keV") if energies[2] > 0: det3_kev = (energies[2] - self.energy_B) / self.energy_K if self.energy_K != 0 else 0 signal_desc.append(f"Det3:{det3_kev:.0f}keV") # 如果没有探测器有能量,显示无信号 if not signal_desc: signal_desc_str = "无信号" else: signal_desc_str = ", ".join(signal_desc) # 写入信号行 f.write(f" {event_id:4d} | {display_idx:4d} | {hex_value} | 时间戳:{timestamp}us, 信号:{signal_desc_str}\n") # 如果是事件结束,添加分隔线 if event_end: f.write("-" * 120 + "\n") # 每组信号后空一行 if group_idx < num_groups - 1: f.write("\n") # 写入数据包信息 total_packets = (total_signals + 799) // 800 # 每800个信号一个数据包 f.write("\n" + "#" * 120 + "\n") f.write(f"# 数据包统计: 共 {total_packets} 个数据包\n") f.write(f"# 每个数据包包含 800 个信号 + 2 字节 CRC16\n") f.write(f"# 总信号数: {total_signals}\n") f.write(f"# 总事件数: {current_event_id}\n") print(f"调试文件已生成: {output_file}") logger.info(f"已写入时间序列调试文件: {output_file}")