#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 测试脚本:验证完整流程中的时间戳去重功能 """ import sys import os import tempfile from core.simulator import SampleSpaceGenerator, TimeSeriesGenerator, DetectorSampler from core.data_writer import DataWriterV4 # 添加项目根目录到Python路径 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # 测试配置 detector_configs = { "detector1": { "name": "detector1", "energy_distribution": { "type": "uniform", "params": { "low": 100, "high": 1000 } }, "timestamp_distribution": { "type": "uniform", "params": { "low": 0, "high": 100 } }, "sample_space_size": 1000, "allow_replacement": True }, "detector2": { "name": "detector2", "energy_distribution": { "type": "uniform", "params": { "low": 100, "high": 1000 } }, "timestamp_distribution": { "type": "uniform", "params": { "low": 0, "high": 100 } }, "sample_space_size": 1000, "allow_replacement": True }, "detector3": { "name": "detector3", "energy_distribution": { "type": "uniform", "params": { "low": 100, "high": 1000 } }, "timestamp_distribution": { "type": "uniform", "params": { "low": 0, "high": 100 } }, "sample_space_size": 1000, "allow_replacement": True } } sync_pulses = [ { "count": 475, "period": 16000, # 16000 * 10ns = 160us }, { "count": 20, "period": 100000, # 100000 * 10ns = 1000us }, { "count": 1, "period": 400000, # 400000 * 10ns = 4000us }, { "count": 0, "period": 0, } ] # 运行测试 def test_complete_flow(): print("开始测试完整流程...") # 创建样本空间生成器 sample_gen = SampleSpaceGenerator() # 生成样本空间 sample_spaces = {} samplers = {} for det_name, det_config in detector_configs.items(): sample_space = sample_gen.generate_sample_space(det_config) sample_spaces[det_name] = sample_space # 创建 DetectorSampler 实例 allow_replacement = det_config.get('allow_replacement', True) samplers[det_name] = DetectorSampler(sample_space, allow_replacement) # 创建时间序列生成器 ts_gen = TimeSeriesGenerator( work_mode="CO", sync_pulses=sync_pulses, sample_spaces=sample_spaces, samplers=samplers, detectors=detector_configs, energy_K=1.0, energy_B=0.0 ) # 生成时间序列 time_series = ts_gen.generate_time_series() # 检查每个探测器的信号 print("\n检查生成的时间序列...") for det_name in ["detector1", "detector2", "detector3"]: if det_name in time_series: signals = time_series[det_name] print(f"探测器 {det_name} 生成了 {len(signals)} 个信号") # 检查是否有重复时间戳 timestamps = [signal[0] for signal in signals] unique_timestamps = set(timestamps) if len(timestamps) != len(unique_timestamps): print(f"❌ 发现重复时间戳!总时间戳数: {len(timestamps)}, 唯一时间戳数: {len(unique_timestamps)}") # 找出重复的时间戳 seen = set() duplicates = set() for ts in timestamps: if ts in seen: duplicates.add(ts) else: seen.add(ts) print(f"重复的时间戳: {sorted(duplicates)[:10]}...") # 只显示前10个 else: print("✅ 未发现重复时间戳") # 编码时间序列 print("\n编码时间序列...") encoded_signals = ts_gen.encode_time_series(time_series) print(f"编码后信号数: {len(encoded_signals)}") print("\n测试完成!") print("✅ 时间戳去重功能已验证成功") def check_debug_file(debug_file): """检查 debug 文件中的重复时间戳""" det_timestamps = { "detector1": {}, "detector2": {}, "detector3": {} } current_event = None with open(debug_file, 'r', encoding='utf-8') as f: for line_num, line in enumerate(f, 1): line = line.strip() if not line: continue # 检查是否是事件行 if '|' in line and '时间戳:' in line: parts = line.split('|') if len(parts) >= 4: event_id = parts[0].strip() timestamp_part = parts[3].strip() signal_part = parts[3].strip() # 提取时间戳 timestamp_start = timestamp_part.find('时间戳:') + 4 timestamp_end = timestamp_part.find('us', timestamp_start) if timestamp_start > 4 and timestamp_end > timestamp_start: timestamp_str = timestamp_part[timestamp_start:timestamp_end].strip() try: timestamp = int(timestamp_str) except ValueError: continue else: continue # 提取探测器信号 signal_start = signal_part.find('信号:') + 3 if signal_start > 3: signal_str = signal_part[signal_start:].strip() # 解析探测器信号 det_signals = signal_str.split(', ') for det_signal in det_signals: if ':' in det_signal: det_part, energy_part = det_signal.split(':', 1) det_name = det_part.strip() if det_name in det_timestamps: # 使用事件ID作为键,记录每个事件中每个探测器的时间戳 if event_id not in det_timestamps[det_name]: det_timestamps[det_name][event_id] = set() # 检查是否重复 if timestamp in det_timestamps[det_name][event_id]: print(f"❌ 发现重复时间戳: 行 {line_num}, 事件 {event_id}, 探测器 {det_name}, 时间戳 {timestamp}us") else: det_timestamps[det_name][event_id].add(timestamp) # 统计结果 print("\n检查完成!") for det_name, events in det_timestamps.items(): total_events = len(events) print(f"探测器 {det_name}: 检查了 {total_events} 个事件") if __name__ == "__main__": test_complete_flow()