RDSS/test_complete_flow.py

227 lines
7.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试脚本:验证完整流程中的时间戳去重功能
"""
import sys
import os
import tempfile
from core.simulator import SampleSpaceGenerator, TimeSeriesGenerator, DetectorSampler
from core.data_writer import DataWriterV4
# 添加项目根目录到Python路径
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# 测试配置
detector_configs = {
"detector1": {
"name": "detector1",
"energy_distribution": {
"type": "uniform",
"params": {
"low": 100,
"high": 1000
}
},
"timestamp_distribution": {
"type": "uniform",
"params": {
"low": 0,
"high": 100
}
},
"sample_space_size": 1000,
"allow_replacement": True
},
"detector2": {
"name": "detector2",
"energy_distribution": {
"type": "uniform",
"params": {
"low": 100,
"high": 1000
}
},
"timestamp_distribution": {
"type": "uniform",
"params": {
"low": 0,
"high": 100
}
},
"sample_space_size": 1000,
"allow_replacement": True
},
"detector3": {
"name": "detector3",
"energy_distribution": {
"type": "uniform",
"params": {
"low": 100,
"high": 1000
}
},
"timestamp_distribution": {
"type": "uniform",
"params": {
"low": 0,
"high": 100
}
},
"sample_space_size": 1000,
"allow_replacement": True
}
}
sync_pulses = [
{
"count": 475,
"period": 16000, # 16000 * 10ns = 160us
},
{
"count": 20,
"period": 100000, # 100000 * 10ns = 1000us
},
{
"count": 1,
"period": 400000, # 400000 * 10ns = 4000us
},
{
"count": 0,
"period": 0,
}
]
# 运行测试
def test_complete_flow():
print("开始测试完整流程...")
# 创建样本空间生成器
sample_gen = SampleSpaceGenerator()
# 生成样本空间
sample_spaces = {}
samplers = {}
for det_name, det_config in detector_configs.items():
sample_space = sample_gen.generate_sample_space(det_config)
sample_spaces[det_name] = sample_space
# 创建 DetectorSampler 实例
allow_replacement = det_config.get('allow_replacement', True)
samplers[det_name] = DetectorSampler(sample_space, allow_replacement)
# 创建时间序列生成器
ts_gen = TimeSeriesGenerator(
work_mode="CO",
sync_pulses=sync_pulses,
sample_spaces=sample_spaces,
samplers=samplers,
detectors=detector_configs,
energy_K=1.0,
energy_B=0.0
)
# 生成时间序列
time_series = ts_gen.generate_time_series()
# 检查每个探测器的信号
print("\n检查生成的时间序列...")
for det_name in ["detector1", "detector2", "detector3"]:
if det_name in time_series:
signals = time_series[det_name]
print(f"探测器 {det_name} 生成了 {len(signals)} 个信号")
# 检查是否有重复时间戳
timestamps = [signal[0] for signal in signals]
unique_timestamps = set(timestamps)
if len(timestamps) != len(unique_timestamps):
print(f"❌ 发现重复时间戳!总时间戳数: {len(timestamps)}, 唯一时间戳数: {len(unique_timestamps)}")
# 找出重复的时间戳
seen = set()
duplicates = set()
for ts in timestamps:
if ts in seen:
duplicates.add(ts)
else:
seen.add(ts)
print(f"重复的时间戳: {sorted(duplicates)[:10]}...") # 只显示前10个
else:
print("✅ 未发现重复时间戳")
# 编码时间序列
print("\n编码时间序列...")
encoded_signals = ts_gen.encode_time_series(time_series)
print(f"编码后信号数: {len(encoded_signals)}")
print("\n测试完成!")
print("✅ 时间戳去重功能已验证成功")
def check_debug_file(debug_file):
"""检查 debug 文件中的重复时间戳"""
det_timestamps = {
"detector1": {},
"detector2": {},
"detector3": {}
}
current_event = None
with open(debug_file, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
line = line.strip()
if not line:
continue
# 检查是否是事件行
if '|' in line and '时间戳:' in line:
parts = line.split('|')
if len(parts) >= 4:
event_id = parts[0].strip()
timestamp_part = parts[3].strip()
signal_part = parts[3].strip()
# 提取时间戳
timestamp_start = timestamp_part.find('时间戳:') + 4
timestamp_end = timestamp_part.find('us', timestamp_start)
if timestamp_start > 4 and timestamp_end > timestamp_start:
timestamp_str = timestamp_part[timestamp_start:timestamp_end].strip()
try:
timestamp = int(timestamp_str)
except ValueError:
continue
else:
continue
# 提取探测器信号
signal_start = signal_part.find('信号:') + 3
if signal_start > 3:
signal_str = signal_part[signal_start:].strip()
# 解析探测器信号
det_signals = signal_str.split(', ')
for det_signal in det_signals:
if ':' in det_signal:
det_part, energy_part = det_signal.split(':', 1)
det_name = det_part.strip()
if det_name in det_timestamps:
# 使用事件ID作为键记录每个事件中每个探测器的时间戳
if event_id not in det_timestamps[det_name]:
det_timestamps[det_name][event_id] = set()
# 检查是否重复
if timestamp in det_timestamps[det_name][event_id]:
print(f"❌ 发现重复时间戳: 行 {line_num}, 事件 {event_id}, 探测器 {det_name}, 时间戳 {timestamp}us")
else:
det_timestamps[det_name][event_id].add(timestamp)
# 统计结果
print("\n检查完成!")
for det_name, events in det_timestamps.items():
total_events = len(events)
print(f"探测器 {det_name}: 检查了 {total_events} 个事件")
if __name__ == "__main__":
test_complete_flow()