RDSS/test_timestamp_deduplicatio...

153 lines
4.1 KiB
Python
Raw Permalink Normal View History

2026-03-13 14:16:00 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Test script that verifies the timestamp de-duplication feature.
"""
import os
import sys

# Make the directory two levels above this file importable (the original
# comment calls it the project root).  This has to run BEFORE the project
# import below: extending sys.path after the import has already executed
# cannot help resolve ``core``.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from core.simulator import (  # noqa: E402  (must follow the sys.path setup)
    DetectorSampler,
    SampleSpaceGenerator,
    TimeSeriesGenerator,
)
# Test configuration: three detectors whose settings are identical apart from
# their name.  The literal lives inside the comprehension so every detector
# gets its own, independent nested dictionaries.
detector_configs = {
    detector_name: {
        "name": detector_name,
        "energy_distribution": {
            "type": "uniform",
            "params": {"low": 100, "high": 1000},
        },
        "timestamp_distribution": {
            "type": "uniform",
            "params": {"low": 0, "high": 100},
        },
        "sample_space_size": 1000,
        "allow_replacement": True,
    }
    for detector_name in ("detector1", "detector2", "detector3")
}
# Sync pulse groups, written as (count, period) pairs and expanded into one
# fresh dict per group.  The last two groups are all-zero.
sync_pulses = [
    {"count": pulse_count, "period": pulse_period}
    for pulse_count, pulse_period in (
        (10, 10000),  # 10000 * 10ns = 100us
        (5, 20000),  # 20000 * 10ns = 200us
        (0, 0),
        (0, 0),
    )
]
# Run the test
def test_timestamp_deduplication():
    """Generate a time series and check that no detector repeats a timestamp.

    For every entry in ``detector_configs`` a sample space and a
    ``DetectorSampler`` are created; these are handed to a
    ``TimeSeriesGenerator`` (work mode ``"CO"``) whose output is then
    inspected.  The first element of each signal is taken to be its
    timestamp.  Detectors missing from the generated series are skipped.

    A per-detector report is printed, and the function fails at the end if
    any detector produced duplicates, so the check is meaningful both under
    pytest and when run as a script.

    Raises:
        AssertionError: if at least one detector has duplicated timestamps.
    """
    # Local import keeps this block self-contained.
    from collections import Counter

    print("开始测试时间戳去重功能...")

    # Build one sample space and one sampler per configured detector.
    sample_gen = SampleSpaceGenerator()
    sample_spaces = {}
    samplers = {}
    for det_name, det_config in detector_configs.items():
        sample_space = sample_gen.generate_sample_space(det_config)
        sample_spaces[det_name] = sample_space
        allow_replacement = det_config.get('allow_replacement', True)
        samplers[det_name] = DetectorSampler(sample_space, allow_replacement)

    # Create the time-series generator and produce the series.
    ts_gen = TimeSeriesGenerator(
        work_mode="CO",
        sync_pulses=sync_pulses,
        sample_spaces=sample_spaces,
        samplers=samplers,
        detectors=detector_configs,
        energy_K=1.0,
        energy_B=0.0,
    )
    time_series = ts_gen.generate_time_series()

    # Inspect every configured detector; remember the ones that fail so that
    # all detectors are reported before the test is marked as failed.
    detectors_with_duplicates = []
    for det_name in detector_configs:
        if det_name not in time_series:
            continue

        signals = time_series[det_name]
        print(f"\n探测器 {det_name} 生成了 {len(signals)} 个信号")

        timestamps = [signal[0] for signal in signals]
        occurrences = Counter(timestamps)
        if len(occurrences) == len(timestamps):
            print("✅ 未发现重复时间戳")
            continue

        print(f"❌ 发现重复时间戳!总时间戳数: {len(timestamps)}, 唯一时间戳数: {len(occurrences)}")
        duplicates = sorted(ts for ts, seen in occurrences.items() if seen > 1)
        print(f"重复的时间戳: {duplicates}")
        detectors_with_duplicates.append(det_name)

    # Explicit raise (not ``assert``) so the check survives ``python -O``.
    if detectors_with_duplicates:
        raise AssertionError(
            "duplicate timestamps found for: "
            + ", ".join(detectors_with_duplicates)
        )
if __name__ == "__main__":
    # Script entry point: run the timestamp de-duplication check directly.
    test_timestamp_deduplication()