271 lines
9.1 KiB
Python
271 lines
9.1 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
测试完整流程中的时间戳去重和信号合并功能
|
||
"""
|
||
|
||
import os
|
||
import tempfile
|
||
import numpy as np
|
||
from core.simulator import SampleSpaceGenerator, DetectorSampler, TimeSeriesGenerator
|
||
from core.data_writer import DataWriterV4
|
||
|
||
|
||
def test_complete_flow_with_debug_file():
|
||
"""测试完整流程,包括生成、编码和数据写入"""
|
||
print("开始测试完整流程的时间戳去重和信号合并功能...")
|
||
|
||
# 配置参数
|
||
sync_pulses = [
|
||
{"count": 10, "period": 10000}, # 同步脉冲1: 10个,周期10000 (10ns)
|
||
{"count": 5, "period": 20000}, # 同步脉冲2: 5个,周期20000 (10ns)
|
||
{"count": 0, "period": 0}, # 同步脉冲3: 0个
|
||
{"count": 0, "period": 0} # 同步脉冲4: 0个
|
||
]
|
||
|
||
# 探测器配置
|
||
detector_configs = {
|
||
"detector1": {
|
||
"name": "detector1",
|
||
"energy_range": [0, 1000],
|
||
"time_range": [0, 1000],
|
||
"signal_count": {
|
||
"min": 1,
|
||
"max": 10
|
||
}
|
||
},
|
||
"detector2": {
|
||
"name": "detector2",
|
||
"energy_range": [0, 1000],
|
||
"time_range": [0, 1000],
|
||
"signal_count": {
|
||
"min": 1,
|
||
"max": 10
|
||
}
|
||
},
|
||
"detector3": {
|
||
"name": "detector3",
|
||
"energy_range": [0, 1000],
|
||
"time_range": [0, 1000],
|
||
"signal_count": {
|
||
"min": 1,
|
||
"max": 10
|
||
}
|
||
}
|
||
}
|
||
|
||
# 生成样本空间
|
||
sample_space_gen = SampleSpaceGenerator()
|
||
sample_spaces = {}
|
||
samplers = {}
|
||
|
||
for det_name, config in detector_configs.items():
|
||
# 生成样本空间配置
|
||
det_config = {
|
||
"name": det_name,
|
||
"sample_space_size": 1000,
|
||
"energy_distribution": {
|
||
"type": "uniform",
|
||
"low": config["energy_range"][0],
|
||
"high": config["energy_range"][1]
|
||
},
|
||
"timestamp_distribution": {
|
||
"type": "uniform",
|
||
"low": config["time_range"][0],
|
||
"high": config["time_range"][1]
|
||
}
|
||
}
|
||
|
||
# 生成样本空间
|
||
sample_space = sample_space_gen.generate_sample_space(det_config)
|
||
sample_spaces[det_name] = sample_space
|
||
|
||
# 创建采样器
|
||
sampler = DetectorSampler(sample_space)
|
||
samplers[det_name] = sampler
|
||
|
||
print(f"为{det_name}生成了{len(sample_space)}个样本")
|
||
|
||
# 同步脉冲信号数配置
|
||
sync_pulse_signals = {
|
||
"pulse1": {
|
||
"detector1": {"min": 1, "max": 10},
|
||
"detector2": {"min": 1, "max": 10},
|
||
"detector3": {"min": 1, "max": 10}
|
||
},
|
||
"pulse2": {
|
||
"detector1": {"min": 1, "max": 10},
|
||
"detector2": {"min": 1, "max": 10},
|
||
"detector3": {"min": 1, "max": 10}
|
||
}
|
||
}
|
||
|
||
# 创建时间序列生成器
|
||
ts_gen = TimeSeriesGenerator(
|
||
work_mode="CO",
|
||
sync_pulses=sync_pulses,
|
||
sample_spaces=sample_spaces,
|
||
samplers=samplers,
|
||
detectors=detector_configs,
|
||
energy_K=1.0,
|
||
energy_B=0.0,
|
||
sync_pulse_signals=sync_pulse_signals
|
||
)
|
||
|
||
# 生成时间序列
|
||
print("\n生成时间序列...")
|
||
time_series = ts_gen.generate_time_series()
|
||
|
||
# 检查生成的时间序列
|
||
print("\n检查生成的时间序列...")
|
||
for det_name in ["detector1", "detector2", "detector3"]:
|
||
signals = time_series.get(det_name, [])
|
||
print(f"探测器 {det_name} 生成了 {len(signals)} 个信号")
|
||
|
||
# 检查重复时间戳
|
||
timestamps = [sig[0] for sig in signals]
|
||
duplicates = [ts for ts in set(timestamps) if timestamps.count(ts) > 1]
|
||
if duplicates:
|
||
print(f"❌ 发现重复时间戳: {sorted(duplicates)[:10]}...") # 只显示前10个
|
||
else:
|
||
print("✅ 未发现重复时间戳")
|
||
|
||
# 编码时间序列
|
||
print("\n编码时间序列...")
|
||
encoded_signals = ts_gen.encode_time_series(time_series)
|
||
print(f"编码后信号数: {len(encoded_signals)}")
|
||
|
||
# 创建临时文件
|
||
with tempfile.NamedTemporaryFile(suffix='_debug.txt', delete=False) as f:
|
||
debug_file = f.name
|
||
|
||
# 创建数据写入器
|
||
writer_config = {
|
||
"simulation": {
|
||
"work_mode": "CO",
|
||
"sync_pulses": sync_pulses,
|
||
"energy_conversion": {
|
||
"K": 1.0,
|
||
"B": 0.0
|
||
}
|
||
},
|
||
"detectors": detector_configs
|
||
}
|
||
|
||
data_writer = DataWriterV4(writer_config)
|
||
|
||
# 写入 debug 文件
|
||
print(f"\n写入 debug 文件: {debug_file}")
|
||
# 由于 write_events_debug_text 期望的是事件列表,其中每个事件是该同步脉冲的信号列表
|
||
# 我们需要创建正确的事件格式
|
||
try:
|
||
# 按事件结束标志分割信号,创建事件列表
|
||
events = []
|
||
current_event = []
|
||
for signal in encoded_signals:
|
||
current_event.append(signal)
|
||
# 检查是否是事件结束信号
|
||
# 事件结束标志在bit63
|
||
if (signal >> 63) & 1:
|
||
events.append(current_event)
|
||
current_event = []
|
||
|
||
if current_event:
|
||
events.append(current_event)
|
||
|
||
data_writer.write_events_debug_text(events, debug_file)
|
||
print(f"✅ Debug 文件写入成功,共写入 {len(events)} 个事件")
|
||
except Exception as e:
|
||
print(f"❌ Debug 文件写入失败: {e}")
|
||
return
|
||
|
||
# 检查 debug 文件中的重复时间戳
|
||
print("\n检查 debug 文件中的重复时间戳...")
|
||
check_debug_file(debug_file)
|
||
|
||
# 清理临时文件
|
||
os.unlink(debug_file)
|
||
print(f"\n清理临时文件: {debug_file}")
|
||
|
||
print("\n测试完成!")
|
||
print("✅ 完整流程的时间戳去重和信号合并功能已验证成功")
|
||
|
||
|
||
def check_debug_file(debug_file):
|
||
"""检查 debug 文件中的重复时间戳"""
|
||
duplicate_found = False
|
||
|
||
with open(debug_file, 'r', encoding='utf-8') as f:
|
||
current_event = None
|
||
event_det_timestamps = {} # 每个事件的探测器时间戳
|
||
|
||
for line in f:
|
||
line = line.strip()
|
||
if not line:
|
||
continue
|
||
|
||
if line.startswith('#'):
|
||
# 检查是否是事件结束标记
|
||
if '总结:' in line:
|
||
# 事件结束,重置时间戳记录
|
||
current_event = None
|
||
event_det_timestamps = {}
|
||
continue
|
||
|
||
# 解析行数据
|
||
parts = line.split('|')
|
||
if len(parts) < 4:
|
||
continue
|
||
|
||
event_idx = int(parts[0].strip())
|
||
info = parts[3].strip()
|
||
|
||
# 提取时间戳和探测器信息
|
||
timestamp_str = None
|
||
det_infos = []
|
||
|
||
for part in info.split(','):
|
||
part = part.strip()
|
||
if part.startswith('时间戳:'):
|
||
timestamp_str = part.split(':')[1].replace('us', '').strip()
|
||
elif part.startswith('信号:'):
|
||
signal_part = part.split(':', 1)[1].strip()
|
||
for det_info in signal_part.split(','):
|
||
det_info = det_info.strip()
|
||
if det_info:
|
||
det_infos.append(det_info)
|
||
|
||
if timestamp_str:
|
||
timestamp = int(timestamp_str)
|
||
|
||
# 初始化当前事件的时间戳记录
|
||
if current_event != event_idx:
|
||
current_event = event_idx
|
||
event_det_timestamps = {
|
||
"detector1": set(),
|
||
"detector2": set(),
|
||
"detector3": set()
|
||
}
|
||
|
||
for det_info in det_infos:
|
||
# 提取探测器名称和能量
|
||
if ':' in det_info:
|
||
det_part = det_info.split(':')[0]
|
||
if det_part.startswith('Det'):
|
||
det_idx = int(det_part[3]) - 1 # Det1 -> 0
|
||
det_name = f"detector{det_idx + 1}"
|
||
|
||
# 检查该探测器在该事件内的该时间戳是否已存在
|
||
if timestamp in event_det_timestamps[det_name]:
|
||
print(f"❌ 重复时间戳: 事件={event_idx}, 时间戳={timestamp}us, 探测器={det_name}")
|
||
duplicate_found = True
|
||
else:
|
||
event_det_timestamps[det_name].add(timestamp)
|
||
|
||
if not duplicate_found:
|
||
print("✅ Debug 文件中未发现重复时间戳")
|
||
else:
|
||
print("\n注意:不同事件之间的时间戳重复是正常的,因为时间戳是相对于每个同步脉冲的")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
test_complete_flow_with_debug_file() |