#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Test script: verify the timestamp de-duplication feature.

Builds three identically configured detectors, generates a time series with
``TimeSeriesGenerator`` (work mode ``"CO"``) and prints, per detector, whether
any timestamp occurs more than once.
"""
import os
import sys
from collections import Counter

# Add the project root to the Python path. This has to happen BEFORE the
# ``core`` import below; otherwise the path tweak cannot help that import and
# the script fails unless the project root is already importable.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from core.simulator import (  # noqa: E402  (depends on the sys.path tweak)
    DetectorSampler,
    SampleSpaceGenerator,
    TimeSeriesGenerator,
)


def _make_detector_config(name):
    """Return a fresh configuration dict for the detector called *name*.

    All detectors in this test share the same settings and differ only by
    name. A new dict (including the nested dicts) is built on every call so
    the detectors never share mutable state.
    """
    return {
        "name": name,
        "energy_distribution": {
            "type": "uniform",
            "params": {
                "low": 100,
                "high": 1000,
            },
        },
        "timestamp_distribution": {
            "type": "uniform",
            "params": {
                "low": 0,
                "high": 100,
            },
        },
        "sample_space_size": 1000,
        "allow_replacement": True,
    }


# Test configuration: detector name -> detector settings.
detector_configs = {
    name: _make_detector_config(name)
    for name in ("detector1", "detector2", "detector3")
}

sync_pulses = [
    {
        "count": 10,
        "period": 10000,  # 10000 * 10ns = 100us
    },
    {
        "count": 5,
        "period": 20000,  # 20000 * 10ns = 200us
    },
    {
        "count": 0,
        "period": 0,
    },
    {
        "count": 0,
        "period": 0,
    },
]


def _find_duplicates(timestamps):
    """Return the sorted list of values occurring more than once in *timestamps*."""
    counts = Counter(timestamps)
    return sorted(ts for ts, occurrences in counts.items() if occurrences > 1)


# Run the test
def test_timestamp_deduplication():
    """Generate a time series and report duplicate timestamps per detector.

    For every entry in ``detector_configs`` a sample space and a
    ``DetectorSampler`` are created, then a ``TimeSeriesGenerator`` produces
    the time series. The first element of each generated signal is treated as
    its timestamp. Results are only printed; the function returns ``None``.
    Detectors missing from the generated time series are skipped silently.
    """
    print("开始测试时间戳去重功能...")

    # Create the sample-space generator
    sample_gen = SampleSpaceGenerator()

    # Generate one sample space and one sampler per detector
    sample_spaces = {}
    samplers = {}
    for det_name, det_config in detector_configs.items():
        sample_space = sample_gen.generate_sample_space(det_config)
        sample_spaces[det_name] = sample_space

        # Create the DetectorSampler instance
        allow_replacement = det_config.get('allow_replacement', True)
        samplers[det_name] = DetectorSampler(sample_space, allow_replacement)

    # Create the time-series generator
    ts_gen = TimeSeriesGenerator(
        work_mode="CO",
        sync_pulses=sync_pulses,
        sample_spaces=sample_spaces,
        samplers=samplers,
        detectors=detector_configs,
        energy_K=1.0,
        energy_B=0.0,
    )

    # Generate the time series
    time_series = ts_gen.generate_time_series()

    # Check the signals of every detector
    for det_name in detector_configs:
        if det_name not in time_series:
            continue

        signals = time_series[det_name]
        print(f"\n探测器 {det_name} 生成了 {len(signals)} 个信号")

        # Check for duplicate timestamps
        timestamps = [signal[0] for signal in signals]
        unique_timestamps = set(timestamps)

        if len(timestamps) != len(unique_timestamps):
            print(f"❌ 发现重复时间戳!总时间戳数: {len(timestamps)}, 唯一时间戳数: {len(unique_timestamps)}")
            # List the timestamps that occur more than once
            print(f"重复的时间戳: {_find_duplicates(timestamps)}")
        else:
            print("✅ 未发现重复时间戳")


if __name__ == "__main__":
    test_timestamp_deduplication()