RDSS/test_complete_flow.py

227 lines
7.4 KiB
Python
Raw Permalink Normal View History

2026-03-13 14:16:00 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试脚本验证完整流程中的时间戳去重功能
"""
import sys
import os
import tempfile
from core.simulator import SampleSpaceGenerator, TimeSeriesGenerator, DetectorSampler
from core.data_writer import DataWriterV4
# 添加项目根目录到Python路径
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# 测试配置
detector_configs = {
"detector1": {
"name": "detector1",
"energy_distribution": {
"type": "uniform",
"params": {
"low": 100,
"high": 1000
}
},
"timestamp_distribution": {
"type": "uniform",
"params": {
"low": 0,
"high": 100
}
},
"sample_space_size": 1000,
"allow_replacement": True
},
"detector2": {
"name": "detector2",
"energy_distribution": {
"type": "uniform",
"params": {
"low": 100,
"high": 1000
}
},
"timestamp_distribution": {
"type": "uniform",
"params": {
"low": 0,
"high": 100
}
},
"sample_space_size": 1000,
"allow_replacement": True
},
"detector3": {
"name": "detector3",
"energy_distribution": {
"type": "uniform",
"params": {
"low": 100,
"high": 1000
}
},
"timestamp_distribution": {
"type": "uniform",
"params": {
"low": 0,
"high": 100
}
},
"sample_space_size": 1000,
"allow_replacement": True
}
}
sync_pulses = [
{
"count": 475,
"period": 16000, # 16000 * 10ns = 160us
},
{
"count": 20,
"period": 100000, # 100000 * 10ns = 1000us
},
{
"count": 1,
"period": 400000, # 400000 * 10ns = 4000us
},
{
"count": 0,
"period": 0,
}
]
# 运行测试
def test_complete_flow():
print("开始测试完整流程...")
# 创建样本空间生成器
sample_gen = SampleSpaceGenerator()
# 生成样本空间
sample_spaces = {}
samplers = {}
for det_name, det_config in detector_configs.items():
sample_space = sample_gen.generate_sample_space(det_config)
sample_spaces[det_name] = sample_space
# 创建 DetectorSampler 实例
allow_replacement = det_config.get('allow_replacement', True)
samplers[det_name] = DetectorSampler(sample_space, allow_replacement)
# 创建时间序列生成器
ts_gen = TimeSeriesGenerator(
work_mode="CO",
sync_pulses=sync_pulses,
sample_spaces=sample_spaces,
samplers=samplers,
detectors=detector_configs,
energy_K=1.0,
energy_B=0.0
)
# 生成时间序列
time_series = ts_gen.generate_time_series()
# 检查每个探测器的信号
print("\n检查生成的时间序列...")
for det_name in ["detector1", "detector2", "detector3"]:
if det_name in time_series:
signals = time_series[det_name]
print(f"探测器 {det_name} 生成了 {len(signals)} 个信号")
# 检查是否有重复时间戳
timestamps = [signal[0] for signal in signals]
unique_timestamps = set(timestamps)
if len(timestamps) != len(unique_timestamps):
print(f"❌ 发现重复时间戳!总时间戳数: {len(timestamps)}, 唯一时间戳数: {len(unique_timestamps)}")
# 找出重复的时间戳
seen = set()
duplicates = set()
for ts in timestamps:
if ts in seen:
duplicates.add(ts)
else:
seen.add(ts)
print(f"重复的时间戳: {sorted(duplicates)[:10]}...") # 只显示前10个
else:
print("✅ 未发现重复时间戳")
# 编码时间序列
print("\n编码时间序列...")
encoded_signals = ts_gen.encode_time_series(time_series)
print(f"编码后信号数: {len(encoded_signals)}")
print("\n测试完成!")
print("✅ 时间戳去重功能已验证成功")
def check_debug_file(debug_file):
"""检查 debug 文件中的重复时间戳"""
det_timestamps = {
"detector1": {},
"detector2": {},
"detector3": {}
}
current_event = None
with open(debug_file, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
# 检查是否是事件行
if '|' in line and '时间戳:' in line:
parts = line.split('|')
if len(parts) >= 4:
event_id = parts[0].strip()
timestamp_part = parts[3].strip()
signal_part = parts[3].strip()
# 提取时间戳
timestamp_start = timestamp_part.find('时间戳:') + 4
timestamp_end = timestamp_part.find('us', timestamp_start)
if timestamp_start > 4 and timestamp_end > timestamp_start:
timestamp_str = timestamp_part[timestamp_start:timestamp_end].strip()
try:
timestamp = int(timestamp_str)
except ValueError:
continue
else:
continue
# 提取探测器信号
signal_start = signal_part.find('信号:') + 3
if signal_start > 3:
signal_str = signal_part[signal_start:].strip()
# 解析探测器信号
det_signals = signal_str.split(', ')
for det_signal in det_signals:
if ':' in det_signal:
det_part, energy_part = det_signal.split(':', 1)
det_name = det_part.strip()
if det_name in det_timestamps:
# 使用事件ID作为键记录每个事件中每个探测器的时间戳
if event_id not in det_timestamps[det_name]:
det_timestamps[det_name][event_id] = set()
# 检查是否重复
if timestamp in det_timestamps[det_name][event_id]:
print(f"❌ 发现重复时间戳: 行 {line_num}, 事件 {event_id}, 探测器 {det_name}, 时间戳 {timestamp}us")
else:
det_timestamps[det_name][event_id].add(timestamp)
# 统计结果
print("\n检查完成!")
for det_name, events in det_timestamps.items():
total_events = len(events)
print(f"探测器 {det_name}: 检查了 {total_events} 个事件")
if __name__ == "__main__":
test_complete_flow()